diff options
author | Alex Auvolat <alex@adnab.me> | 2022-11-29 10:10:22 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-11-29 10:10:22 +0100 |
commit | 50412d4cf0c2ae780bc8a1acce8dd7aa1d0b19b0 (patch) | |
tree | 5fc54877cd87897e48741149209397859a4ba195 /nix2/driver.go | |
parent | c2af63186de8214e9abc077b093d4dacc780372d (diff) | |
parent | dc9af4e04ff3381b8e034ecf81a7159b79613d4a (diff) | |
download | nomad-driver-nix2-50412d4cf0c2ae780bc8a1acce8dd7aa1d0b19b0.tar.gz nomad-driver-nix2-50412d4cf0c2ae780bc8a1acce8dd7aa1d0b19b0.zip |
Merge branch 'driver-exec2'
Diffstat (limited to 'nix2/driver.go')
-rw-r--r-- | nix2/driver.go | 91 |
1 files changed, 66 insertions, 25 deletions
diff --git a/nix2/driver.go b/nix2/driver.go index 8118f94..c97efc5 100644 --- a/nix2/driver.go +++ b/nix2/driver.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "path/filepath" "runtime" "sync" "time" @@ -114,6 +115,9 @@ type Driver struct { // config is the driver configuration set by the SetConfig RPC config Config + // nomadConfig is the client config from nomad + nomadConfig *base.ClientDriverConfig + // tasks is the in memory datastore mapping taskIDs to driverHandles tasks *taskStore @@ -236,9 +240,10 @@ func (tc *TaskConfig) validate() error { // StartTask. This information is needed to rebuild the task state and handler // during recovery. type TaskState struct { - TaskConfig *drivers.TaskConfig - Pid int - StartedAt time.Time + ReattachConfig *pstructs.ReattachConfig + TaskConfig *drivers.TaskConfig + Pid int + StartedAt time.Time } // NewPlugin returns a new DrivePlugin implementation @@ -298,6 +303,9 @@ func (d *Driver) SetConfig(cfg *base.Config) error { d.logger.Info("Got config", "driver_config", hclog.Fmt("%+v", config)) d.config = config + if cfg != nil && cfg.AgentConfig != nil { + d.nomadConfig = cfg.AgentConfig.Driver + } return nil } @@ -399,18 +407,29 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { return fmt.Errorf("failed to decode task state from handle: %v", err) } - // Create new executor - exec := executor.NewExecutorWithIsolation( + // Create client for reattached executor + plugRC, err := pstructs.ReattachConfigToGoPlugin(taskState.ReattachConfig) + if err != nil { + d.logger.Error("failed to build ReattachConfig from task state", "error", err, "task_id", handle.Config.ID) + return fmt.Errorf("failed to build ReattachConfig from task state: %v", err) + } + + exec, pluginClient, err := executor.ReattachToExecutor(plugRC, d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID)) + if err != nil { + d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID) + return fmt.Errorf("failed to reattach to executor: %v", err) + } h := &taskHandle{ - exec: exec, - pid: taskState.Pid, - taskConfig: taskState.TaskConfig, - procState: drivers.TaskStateRunning, - startedAt: taskState.StartedAt, - exitResult: &drivers.ExitResult{}, - logger: d.logger, + exec: exec, + pid: taskState.Pid, + pluginClient: pluginClient, + taskConfig: taskState.TaskConfig, + procState: drivers.TaskStateRunning, + startedAt: taskState.StartedAt, + exitResult: &drivers.ExitResult{}, + logger: d.logger, } d.tasks.Set(taskState.TaskConfig.ID, h) @@ -437,8 +456,19 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive handle := drivers.NewTaskHandle(taskHandleVersion) handle.Config = cfg - exec := executor.NewExecutorWithIsolation( - d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID)) + pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out") + executorConfig := &executor.ExecutorConfig{ + LogFile: pluginLogFile, + LogLevel: "debug", + FSIsolation: true, + } + + exec, pluginClient, err := executor.CreateExecutor( + d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID), + d.nomadConfig, executorConfig) + if err != nil { + return nil, nil, fmt.Errorf("failed to create executor: %v", err) + } user := cfg.User if user == "" { @@ -518,27 +548,31 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive ps, err := exec.Launch(execCmd) if err != nil { + pluginClient.Kill() return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err) } h := &taskHandle{ - exec: exec, - pid: ps.Pid, - taskConfig: cfg, - procState: drivers.TaskStateRunning, - startedAt: time.Now().Round(time.Millisecond), - logger: d.logger, + exec: exec, + pid: ps.Pid, + pluginClient: pluginClient, + taskConfig: cfg, + procState: drivers.TaskStateRunning, + startedAt: time.Now().Round(time.Millisecond), + logger: d.logger, } driverState := TaskState{ - Pid: ps.Pid, - TaskConfig: cfg, - StartedAt: h.startedAt, + ReattachConfig: pstructs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()), + Pid: ps.Pid, + TaskConfig: cfg, + StartedAt: h.startedAt, } if err := handle.SetDriverState(&driverState); err != nil { d.logger.Error("failed to start task, error setting driver state", "error", err) _ = exec.Shutdown("", 0) + pluginClient.Kill() return nil, nil, fmt.Errorf("failed to set driver state: %v", err) } @@ -590,6 +624,9 @@ func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) e } if err := handle.exec.Shutdown(signal, timeout); err != nil { + if handle.pluginClient.Exited() { + return nil + } return fmt.Errorf("executor Shutdown failed: %v", err) } @@ -625,8 +662,12 @@ func (d *Driver) DestroyTask(taskID string, force bool) error { return fmt.Errorf("cannot destroy running task") } - if err := handle.exec.Shutdown("", 0); err != nil { - handle.logger.Error("destroying executor failed", "error", err) + if !handle.pluginClient.Exited() { + if err := handle.exec.Shutdown("", 0); err != nil { + handle.logger.Error("destroying executor failed", "error", err) + } + + handle.pluginClient.Kill() } // workaround for the case where DestroyTask was issued on task restart |