From dc9af4e04ff3381b8e034ecf81a7159b79613d4a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 29 Nov 2022 10:05:29 +0100 Subject: Revert some wrong changes to make this closer to upstream exec driver --- exec2/driver.go | 91 +++++++++++++++++++++++++++++++++++++++++---------------- exec2/handle.go | 8 +++-- 2 files changed, 71 insertions(+), 28 deletions(-) (limited to 'exec2') diff --git a/exec2/driver.go b/exec2/driver.go index 57efd3a..38d5415 100644 --- a/exec2/driver.go +++ b/exec2/driver.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "path/filepath" "runtime" "sync" "time" @@ -119,6 +120,9 @@ type Driver struct { // config is the driver configuration set by the SetConfig RPC config Config + // nomadConfig is the client config from nomad + nomadConfig *base.ClientDriverConfig + // tasks is the in memory datastore mapping taskIDs to driverHandles tasks *taskStore @@ -244,9 +248,10 @@ func (tc *TaskConfig) validate() error { // StartTask. This information is needed to rebuild the task state and handler // during recovery. type TaskState struct { - TaskConfig *drivers.TaskConfig - Pid int - StartedAt time.Time + ReattachConfig *pstructs.ReattachConfig + TaskConfig *drivers.TaskConfig + Pid int + StartedAt time.Time } // NewPlugin returns a new DrivePlugin implementation @@ -306,6 +311,9 @@ func (d *Driver) SetConfig(cfg *base.Config) error { d.logger.Info("Got config", "driver_config", hclog.Fmt("%+v", config)) d.config = config + if cfg != nil && cfg.AgentConfig != nil { + d.nomadConfig = cfg.AgentConfig.Driver + } return nil } @@ -407,18 +415,29 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { return fmt.Errorf("failed to decode task state from handle: %v", err) } - // Create new executor - exec := executor.NewExecutorWithIsolation( + // Create client for reattached executor + plugRC, err := pstructs.ReattachConfigToGoPlugin(taskState.ReattachConfig) + if err != nil { + d.logger.Error("failed to build ReattachConfig from task state", "error", err, "task_id", handle.Config.ID) + return fmt.Errorf("failed to build ReattachConfig from task state: %v", err) + } + + exec, pluginClient, err := executor.ReattachToExecutor(plugRC, d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID)) + if err != nil { + d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID) + return fmt.Errorf("failed to reattach to executor: %v", err) + } h := &taskHandle{ - exec: exec, - pid: taskState.Pid, - taskConfig: taskState.TaskConfig, - procState: drivers.TaskStateRunning, - startedAt: taskState.StartedAt, - exitResult: &drivers.ExitResult{}, - logger: d.logger, + exec: exec, + pid: taskState.Pid, + pluginClient: pluginClient, + taskConfig: taskState.TaskConfig, + procState: drivers.TaskStateRunning, + startedAt: taskState.StartedAt, + exitResult: &drivers.ExitResult{}, + logger: d.logger, } d.tasks.Set(taskState.TaskConfig.ID, h) @@ -445,8 +464,19 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive handle := drivers.NewTaskHandle(taskHandleVersion) handle.Config = cfg - exec := executor.NewExecutorWithIsolation( - d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID)) + pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out") + executorConfig := &executor.ExecutorConfig{ + LogFile: pluginLogFile, + LogLevel: "debug", + FSIsolation: true, + } + + exec, pluginClient, err := executor.CreateExecutor( + d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID), + d.nomadConfig, executorConfig) + if err != nil { + return nil, nil, fmt.Errorf("failed to create executor: %v", err) + } user := cfg.User if user == "" { @@ -544,27 +574,31 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive ps, err := exec.Launch(execCmd) if err != nil { + pluginClient.Kill() return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err) } h := &taskHandle{ - exec: exec, - pid: ps.Pid, - taskConfig: cfg, - procState: drivers.TaskStateRunning, - startedAt: time.Now().Round(time.Millisecond), - logger: d.logger, + exec: exec, + pid: ps.Pid, + pluginClient: pluginClient, + taskConfig: cfg, + procState: drivers.TaskStateRunning, + startedAt: time.Now().Round(time.Millisecond), + logger: d.logger, } driverState := TaskState{ - Pid: ps.Pid, - TaskConfig: cfg, - StartedAt: h.startedAt, + ReattachConfig: pstructs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()), + Pid: ps.Pid, + TaskConfig: cfg, + StartedAt: h.startedAt, } if err := handle.SetDriverState(&driverState); err != nil { d.logger.Error("failed to start task, error setting driver state", "error", err) _ = exec.Shutdown("", 0) + pluginClient.Kill() return nil, nil, fmt.Errorf("failed to set driver state: %v", err) } @@ -616,6 +650,9 @@ func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) e } if err := handle.exec.Shutdown(signal, timeout); err != nil { + if handle.pluginClient.Exited() { + return nil + } return fmt.Errorf("executor Shutdown failed: %v", err) } @@ -651,8 +688,12 @@ func (d *Driver) DestroyTask(taskID string, force bool) error { return fmt.Errorf("cannot destroy running task") } - if err := handle.exec.Shutdown("", 0); err != nil { - handle.logger.Error("destroying executor failed", "error", err) + if !handle.pluginClient.Exited() { + if err := handle.exec.Shutdown("", 0); err != nil { + handle.logger.Error("destroying executor failed", "error", err) + } + + handle.pluginClient.Kill() } // workaround for the case where DestroyTask was issued on task restart diff --git a/exec2/handle.go b/exec2/handle.go index 695f49d..edb3346 100644 --- a/exec2/handle.go +++ b/exec2/handle.go @@ -8,13 +8,15 @@ import ( "github.com/Alexis211/nomad-driver-exec2/executor" hclog "github.com/hashicorp/go-hclog" + plugin "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/plugins/drivers" ) type taskHandle struct { - exec executor.Executor - pid int - logger hclog.Logger + exec executor.Executor + pid int + pluginClient *plugin.Client + logger hclog.Logger // stateLock syncs access to all fields below stateLock sync.RWMutex -- cgit v1.2.3