aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-11-29 10:10:22 +0100
committerAlex Auvolat <alex@adnab.me>2022-11-29 10:10:22 +0100
commit50412d4cf0c2ae780bc8a1acce8dd7aa1d0b19b0 (patch)
tree5fc54877cd87897e48741149209397859a4ba195
parentc2af63186de8214e9abc077b093d4dacc780372d (diff)
parentdc9af4e04ff3381b8e034ecf81a7159b79613d4a (diff)
downloadnomad-driver-nix2-50412d4cf0c2ae780bc8a1acce8dd7aa1d0b19b0.tar.gz
nomad-driver-nix2-50412d4cf0c2ae780bc8a1acce8dd7aa1d0b19b0.zip
Merge branch 'driver-exec2'
-rw-r--r--nix2/driver.go91
-rw-r--r--nix2/handle.go8
2 files changed, 71 insertions, 28 deletions
diff --git a/nix2/driver.go b/nix2/driver.go
index 8118f94..c97efc5 100644
--- a/nix2/driver.go
+++ b/nix2/driver.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
+ "path/filepath"
"runtime"
"sync"
"time"
@@ -114,6 +115,9 @@ type Driver struct {
// config is the driver configuration set by the SetConfig RPC
config Config
+ // nomadConfig is the client config from nomad
+ nomadConfig *base.ClientDriverConfig
+
// tasks is the in memory datastore mapping taskIDs to driverHandles
tasks *taskStore
@@ -236,9 +240,10 @@ func (tc *TaskConfig) validate() error {
// StartTask. This information is needed to rebuild the task state and handler
// during recovery.
type TaskState struct {
- TaskConfig *drivers.TaskConfig
- Pid int
- StartedAt time.Time
+ ReattachConfig *pstructs.ReattachConfig
+ TaskConfig *drivers.TaskConfig
+ Pid int
+ StartedAt time.Time
}
// NewPlugin returns a new DrivePlugin implementation
@@ -298,6 +303,9 @@ func (d *Driver) SetConfig(cfg *base.Config) error {
d.logger.Info("Got config", "driver_config", hclog.Fmt("%+v", config))
d.config = config
+ if cfg != nil && cfg.AgentConfig != nil {
+ d.nomadConfig = cfg.AgentConfig.Driver
+ }
return nil
}
@@ -399,18 +407,29 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return fmt.Errorf("failed to decode task state from handle: %v", err)
}
- // Create new executor
- exec := executor.NewExecutorWithIsolation(
+ // Create client for reattached executor
+ plugRC, err := pstructs.ReattachConfigToGoPlugin(taskState.ReattachConfig)
+ if err != nil {
+ d.logger.Error("failed to build ReattachConfig from task state", "error", err, "task_id", handle.Config.ID)
+ return fmt.Errorf("failed to build ReattachConfig from task state: %v", err)
+ }
+
+ exec, pluginClient, err := executor.ReattachToExecutor(plugRC,
d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID))
+ if err != nil {
+ d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
+ return fmt.Errorf("failed to reattach to executor: %v", err)
+ }
h := &taskHandle{
- exec: exec,
- pid: taskState.Pid,
- taskConfig: taskState.TaskConfig,
- procState: drivers.TaskStateRunning,
- startedAt: taskState.StartedAt,
- exitResult: &drivers.ExitResult{},
- logger: d.logger,
+ exec: exec,
+ pid: taskState.Pid,
+ pluginClient: pluginClient,
+ taskConfig: taskState.TaskConfig,
+ procState: drivers.TaskStateRunning,
+ startedAt: taskState.StartedAt,
+ exitResult: &drivers.ExitResult{},
+ logger: d.logger,
}
d.tasks.Set(taskState.TaskConfig.ID, h)
@@ -437,8 +456,19 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
handle := drivers.NewTaskHandle(taskHandleVersion)
handle.Config = cfg
- exec := executor.NewExecutorWithIsolation(
- d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID))
+ pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")
+ executorConfig := &executor.ExecutorConfig{
+ LogFile: pluginLogFile,
+ LogLevel: "debug",
+ FSIsolation: true,
+ }
+
+ exec, pluginClient, err := executor.CreateExecutor(
+ d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID),
+ d.nomadConfig, executorConfig)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create executor: %v", err)
+ }
user := cfg.User
if user == "" {
@@ -518,27 +548,31 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
ps, err := exec.Launch(execCmd)
if err != nil {
+ pluginClient.Kill()
return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err)
}
h := &taskHandle{
- exec: exec,
- pid: ps.Pid,
- taskConfig: cfg,
- procState: drivers.TaskStateRunning,
- startedAt: time.Now().Round(time.Millisecond),
- logger: d.logger,
+ exec: exec,
+ pid: ps.Pid,
+ pluginClient: pluginClient,
+ taskConfig: cfg,
+ procState: drivers.TaskStateRunning,
+ startedAt: time.Now().Round(time.Millisecond),
+ logger: d.logger,
}
driverState := TaskState{
- Pid: ps.Pid,
- TaskConfig: cfg,
- StartedAt: h.startedAt,
+ ReattachConfig: pstructs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
+ Pid: ps.Pid,
+ TaskConfig: cfg,
+ StartedAt: h.startedAt,
}
if err := handle.SetDriverState(&driverState); err != nil {
d.logger.Error("failed to start task, error setting driver state", "error", err)
_ = exec.Shutdown("", 0)
+ pluginClient.Kill()
return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
}
@@ -590,6 +624,9 @@ func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) e
}
if err := handle.exec.Shutdown(signal, timeout); err != nil {
+ if handle.pluginClient.Exited() {
+ return nil
+ }
return fmt.Errorf("executor Shutdown failed: %v", err)
}
@@ -625,8 +662,12 @@ func (d *Driver) DestroyTask(taskID string, force bool) error {
return fmt.Errorf("cannot destroy running task")
}
- if err := handle.exec.Shutdown("", 0); err != nil {
- handle.logger.Error("destroying executor failed", "error", err)
+ if !handle.pluginClient.Exited() {
+ if err := handle.exec.Shutdown("", 0); err != nil {
+ handle.logger.Error("destroying executor failed", "error", err)
+ }
+
+ handle.pluginClient.Kill()
}
// workaround for the case where DestroyTask was issued on task restart
diff --git a/nix2/handle.go b/nix2/handle.go
index 9de5d3e..0e54f4e 100644
--- a/nix2/handle.go
+++ b/nix2/handle.go
@@ -8,13 +8,15 @@ import (
"github.com/Alexis211/nomad-driver-exec2/executor"
hclog "github.com/hashicorp/go-hclog"
+ plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/plugins/drivers"
)
type taskHandle struct {
- exec executor.Executor
- pid int
- logger hclog.Logger
+ exec executor.Executor
+ pid int
+ pluginClient *plugin.Client
+ logger hclog.Logger
// stateLock syncs access to all fields below
stateLock sync.RWMutex