From 153b8f1b9d52e7e5a6d35dfbd4ff4ff359a0dee7 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 29 Nov 2022 09:46:43 +0100 Subject: Rename driver to nix2 --- .gitignore | 4 +- GNUmakefile | 2 +- exec2/driver.go | 747 ------------------------------------------------- exec2/handle.go | 77 ----- exec2/pull-upstream.sh | 7 - exec2/state.go | 33 --- main.go | 4 +- nix2/driver.go | 747 +++++++++++++++++++++++++++++++++++++++++++++++++ nix2/handle.go | 77 +++++ nix2/pull-upstream.sh | 7 + nix2/state.go | 33 +++ 11 files changed, 869 insertions(+), 869 deletions(-) delete mode 100644 exec2/driver.go delete mode 100644 exec2/handle.go delete mode 100755 exec2/pull-upstream.sh delete mode 100644 exec2/state.go create mode 100644 nix2/driver.go create mode 100644 nix2/handle.go create mode 100755 nix2/pull-upstream.sh create mode 100644 nix2/state.go diff --git a/.gitignore b/.gitignore index 39a5816..cb0937f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ -nomad-driver-exec2 -exec2-driver +nomad-driver-* +*-driver diff --git a/GNUmakefile b/GNUmakefile index 6d84e6b..fc9e222 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -1,4 +1,4 @@ -PLUGIN_BINARY=hello-driver +PLUGIN_BINARY=nix2-driver export GO111MODULE=on default: build diff --git a/exec2/driver.go b/exec2/driver.go deleted file mode 100644 index fb84f1a..0000000 --- a/exec2/driver.go +++ /dev/null @@ -1,747 +0,0 @@ -package exec2 - -import ( - "context" - "fmt" - "os" - "runtime" - "sync" - "time" - - "github.com/hashicorp/consul-template/signals" - hclog "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/client/lib/cgutil" - "github.com/hashicorp/nomad/drivers/shared/capabilities" - "github.com/hashicorp/nomad/drivers/shared/eventer" - "github.com/Alexis211/nomad-driver-exec2/executor" - "github.com/hashicorp/nomad/drivers/shared/resolvconf" - "github.com/hashicorp/nomad/helper/pluginutils/hclutils" - "github.com/hashicorp/nomad/helper/pluginutils/loader" - 
"github.com/hashicorp/nomad/helper/pointer" - "github.com/hashicorp/nomad/plugins/base" - "github.com/hashicorp/nomad/plugins/drivers" - "github.com/hashicorp/nomad/plugins/drivers/utils" - "github.com/hashicorp/nomad/plugins/shared/hclspec" - pstructs "github.com/hashicorp/nomad/plugins/shared/structs" -) - -const ( - // pluginName is the name of the plugin - pluginName = "exec2" - - // fingerprintPeriod is the interval at which the driver will send fingerprint responses - fingerprintPeriod = 30 * time.Second - - // taskHandleVersion is the version of task handle which this driver sets - // and understands how to decode driver state - taskHandleVersion = 1 -) - -var ( - // PluginID is the exec plugin metadata registered in the plugin - // catalog. - PluginID = loader.PluginID{ - Name: pluginName, - PluginType: base.PluginTypeDriver, - } - - // pluginInfo is the response returned for the PluginInfo RPC - pluginInfo = &base.PluginInfoResponse{ - Type: base.PluginTypeDriver, - PluginApiVersions: []string{drivers.ApiVersion010}, - PluginVersion: "0.1.0", - Name: pluginName, - } - - // configSpec is the hcl specification returned by the ConfigSchema RPC - configSpec = hclspec.NewObject(map[string]*hclspec.Spec{ - "no_pivot_root": hclspec.NewDefault( - hclspec.NewAttr("no_pivot_root", "bool", false), - hclspec.NewLiteral("false"), - ), - "default_pid_mode": hclspec.NewDefault( - hclspec.NewAttr("default_pid_mode", "string", false), - hclspec.NewLiteral(`"private"`), - ), - "default_ipc_mode": hclspec.NewDefault( - hclspec.NewAttr("default_ipc_mode", "string", false), - hclspec.NewLiteral(`"private"`), - ), - "allow_caps": hclspec.NewDefault( - hclspec.NewAttr("allow_caps", "list(string)", false), - hclspec.NewLiteral(capabilities.HCLSpecLiteral), - ), - // Default host directories to bind in tasks - "bind": hclspec.NewDefault( - hclspec.NewAttr("bind", "list(map(string))", false), - hclspec.NewLiteral("{}"), - ), - "bind_read_only": hclspec.NewDefault( - 
hclspec.NewAttr("bind_read_only", "list(map(string))", false), - hclspec.NewLiteral("{}"), - ), - }) - - // taskConfigSpec is the hcl specification for the driver config section of - // a task within a job. It is returned in the TaskConfigSchema RPC - taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{ - "command": hclspec.NewAttr("command", "string", true), - "args": hclspec.NewAttr("args", "list(string)", false), - "bind": hclspec.NewAttr("bind", "list(map(string))", false), - "bind_read_only": hclspec.NewAttr("bind_read_only", "list(map(string))", false), - "pid_mode": hclspec.NewAttr("pid_mode", "string", false), - "ipc_mode": hclspec.NewAttr("ipc_mode", "string", false), - "cap_add": hclspec.NewAttr("cap_add", "list(string)", false), - "cap_drop": hclspec.NewAttr("cap_drop", "list(string)", false), - }) - - // driverCapabilities represents the RPC response for what features are - // implemented by the exec task driver - driverCapabilities = &drivers.Capabilities{ - SendSignals: true, - Exec: true, - FSIsolation: drivers.FSIsolationNone, - NetIsolationModes: []drivers.NetIsolationMode{ - drivers.NetIsolationModeHost, - drivers.NetIsolationModeGroup, - }, - MountConfigs: drivers.MountConfigSupportAll, - } -) - -// Driver fork/execs tasks using many of the underlying OS's isolation -// features where configured. -type Driver struct { - // eventer is used to handle multiplexing of TaskEvents calls such that an - // event can be broadcast to all callers - eventer *eventer.Eventer - - // config is the driver configuration set by the SetConfig RPC - config Config - - // tasks is the in memory datastore mapping taskIDs to driverHandles - tasks *taskStore - - // ctx is the context for the driver. 
It is passed to other subsystems to - // coordinate shutdown - ctx context.Context - - // signalShutdown is called when the driver is shutting down and cancels - // the ctx passed to any subsystems - signalShutdown context.CancelFunc - - // logger will log to the Nomad agent - logger hclog.Logger - - // A tri-state boolean to know if the fingerprinting has happened and - // whether it has been successful - fingerprintSuccess *bool - fingerprintLock sync.Mutex -} - -// Config is the driver configuration set by the SetConfig RPC call -type Config struct { - // NoPivotRoot disables the use of pivot_root, useful when the root partition - // is on ramdisk - NoPivotRoot bool `codec:"no_pivot_root"` - - // DefaultModePID is the default PID isolation set for all tasks using - // exec-based task drivers. - DefaultModePID string `codec:"default_pid_mode"` - - // DefaultModeIPC is the default IPC isolation set for all tasks using - // exec-based task drivers. - DefaultModeIPC string `codec:"default_ipc_mode"` - - // AllowCaps configures which Linux Capabilities are enabled for tasks - // running on this node. 
- AllowCaps []string `codec:"allow_caps"` - - // Paths to bind for read-write acess in all jobs - Bind hclutils.MapStrStr `codec:"bind"` - - // Paths to bind for read-only acess in all jobs - BindReadOnly hclutils.MapStrStr `codec:"bind_read_only"` -} - -func (c *Config) validate() error { - switch c.DefaultModePID { - case executor.IsolationModePrivate, executor.IsolationModeHost: - default: - return fmt.Errorf("default_pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModePID) - } - - switch c.DefaultModeIPC { - case executor.IsolationModePrivate, executor.IsolationModeHost: - default: - return fmt.Errorf("default_ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModeIPC) - } - - badCaps := capabilities.Supported().Difference(capabilities.New(c.AllowCaps)) - if !badCaps.Empty() { - return fmt.Errorf("allow_caps configured with capabilities not supported by system: %s", badCaps) - } - - return nil -} - -// TaskConfig is the driver configuration of a task within a job -type TaskConfig struct { - // Command is the thing to exec. - Command string `codec:"command"` - - // Args are passed along to Command. - Args []string `codec:"args"` - - // Paths to bind for read-write acess - Bind hclutils.MapStrStr `codec:"bind"` - - // Paths to bind for read-only acess - BindReadOnly hclutils.MapStrStr `codec:"bind_read_only"` - - // ModePID indicates whether PID namespace isolation is enabled for the task. - // Must be "private" or "host" if set. - ModePID string `codec:"pid_mode"` - - // ModeIPC indicates whether IPC namespace isolation is enabled for the task. - // Must be "private" or "host" if set. - ModeIPC string `codec:"ipc_mode"` - - // CapAdd is a set of linux capabilities to enable. - CapAdd []string `codec:"cap_add"` - - // CapDrop is a set of linux capabilities to disable. 
- CapDrop []string `codec:"cap_drop"` -} - -func (tc *TaskConfig) validate() error { - switch tc.ModePID { - case "", executor.IsolationModePrivate, executor.IsolationModeHost: - default: - return fmt.Errorf("pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModePID) - } - - switch tc.ModeIPC { - case "", executor.IsolationModePrivate, executor.IsolationModeHost: - default: - return fmt.Errorf("ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModeIPC) - } - - supported := capabilities.Supported() - badAdds := supported.Difference(capabilities.New(tc.CapAdd)) - if !badAdds.Empty() { - return fmt.Errorf("cap_add configured with capabilities not supported by system: %s", badAdds) - } - badDrops := supported.Difference(capabilities.New(tc.CapDrop)) - if !badDrops.Empty() { - return fmt.Errorf("cap_drop configured with capabilities not supported by system: %s", badDrops) - } - - return nil -} - -// TaskState is the state which is encoded in the handle returned in -// StartTask. This information is needed to rebuild the task state and handler -// during recovery. 
-type TaskState struct { - TaskConfig *drivers.TaskConfig - Pid int - StartedAt time.Time -} - -// NewPlugin returns a new DrivePlugin implementation -func NewPlugin(logger hclog.Logger) drivers.DriverPlugin { - ctx, cancel := context.WithCancel(context.Background()) - logger = logger.Named(pluginName) - return &Driver{ - eventer: eventer.NewEventer(ctx, logger), - tasks: newTaskStore(), - ctx: ctx, - signalShutdown: cancel, - logger: logger, - } -} - -// setFingerprintSuccess marks the driver as having fingerprinted successfully -func (d *Driver) setFingerprintSuccess() { - d.fingerprintLock.Lock() - d.fingerprintSuccess = pointer.Of(true) - d.fingerprintLock.Unlock() -} - -// setFingerprintFailure marks the driver as having failed fingerprinting -func (d *Driver) setFingerprintFailure() { - d.fingerprintLock.Lock() - d.fingerprintSuccess = pointer.Of(false) - d.fingerprintLock.Unlock() -} - -// fingerprintSuccessful returns true if the driver has -// never fingerprinted or has successfully fingerprinted -func (d *Driver) fingerprintSuccessful() bool { - d.fingerprintLock.Lock() - defer d.fingerprintLock.Unlock() - return d.fingerprintSuccess == nil || *d.fingerprintSuccess -} - -func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) { - return pluginInfo, nil -} - -func (d *Driver) ConfigSchema() (*hclspec.Spec, error) { - return configSpec, nil -} - -func (d *Driver) SetConfig(cfg *base.Config) error { - // unpack, validate, and set agent plugin config - var config Config - if len(cfg.PluginConfig) != 0 { - if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil { - return err - } - } - if err := config.validate(); err != nil { - return err - } - d.logger.Info("Got config", "driver_config", hclog.Fmt("%+v", config)) - d.config = config - - return nil -} - -func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) { - return taskConfigSpec, nil -} - -// Capabilities is returned by the Capabilities RPC and indicates what -// optional features 
this driver supports -func (d *Driver) Capabilities() (*drivers.Capabilities, error) { - return driverCapabilities, nil -} - -func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) { - ch := make(chan *drivers.Fingerprint) - go d.handleFingerprint(ctx, ch) - return ch, nil - -} -func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) { - defer close(ch) - ticker := time.NewTimer(0) - for { - select { - case <-ctx.Done(): - return - case <-d.ctx.Done(): - return - case <-ticker.C: - ticker.Reset(fingerprintPeriod) - ch <- d.buildFingerprint() - } - } -} - -func (d *Driver) buildFingerprint() *drivers.Fingerprint { - if runtime.GOOS != "linux" { - d.setFingerprintFailure() - return &drivers.Fingerprint{ - Health: drivers.HealthStateUndetected, - HealthDescription: "exec driver unsupported on client OS", - } - } - - fp := &drivers.Fingerprint{ - Attributes: map[string]*pstructs.Attribute{}, - Health: drivers.HealthStateHealthy, - HealthDescription: drivers.DriverHealthy, - } - - if !utils.IsUnixRoot() { - fp.Health = drivers.HealthStateUndetected - fp.HealthDescription = drivers.DriverRequiresRootMessage - d.setFingerprintFailure() - return fp - } - - mount, err := cgutil.FindCgroupMountpointDir() - if err != nil { - fp.Health = drivers.HealthStateUnhealthy - fp.HealthDescription = drivers.NoCgroupMountMessage - if d.fingerprintSuccessful() { - d.logger.Warn(fp.HealthDescription, "error", err) - } - d.setFingerprintFailure() - return fp - } - - if mount == "" { - fp.Health = drivers.HealthStateUnhealthy - fp.HealthDescription = drivers.CgroupMountEmpty - d.setFingerprintFailure() - return fp - } - - fp.Attributes["driver.exec"] = pstructs.NewBoolAttribute(true) - d.setFingerprintSuccess() - return fp -} - -func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { - if handle == nil { - return fmt.Errorf("handle cannot be nil") - } - - // If already attached to handle there's nothing to recover. 
- if _, ok := d.tasks.Get(handle.Config.ID); ok { - d.logger.Trace("nothing to recover; task already exists", - "task_id", handle.Config.ID, - "task_name", handle.Config.Name, - ) - return nil - } - - // Handle doesn't already exist, try to reattach - var taskState TaskState - if err := handle.GetDriverState(&taskState); err != nil { - d.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID) - return fmt.Errorf("failed to decode task state from handle: %v", err) - } - - // Create new executor - exec := executor.NewExecutorWithIsolation( - d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID),) - - h := &taskHandle{ - exec: exec, - pid: taskState.Pid, - taskConfig: taskState.TaskConfig, - procState: drivers.TaskStateRunning, - startedAt: taskState.StartedAt, - exitResult: &drivers.ExitResult{}, - logger: d.logger, - } - - d.tasks.Set(taskState.TaskConfig.ID, h) - - go h.run() - return nil -} - -func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) { - if _, ok := d.tasks.Get(cfg.ID); ok { - return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID) - } - - var driverConfig TaskConfig - if err := cfg.DecodeDriverConfig(&driverConfig); err != nil { - return nil, nil, fmt.Errorf("failed to decode driver config: %v", err) - } - - if err := driverConfig.validate(); err != nil { - return nil, nil, fmt.Errorf("failed driver config validation: %v", err) - } - - d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig)) - handle := drivers.NewTaskHandle(taskHandleVersion) - handle.Config = cfg - - exec := executor.NewExecutorWithIsolation( - d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID),) - - user := cfg.User - if user == "" { - user = "0" - } - - if cfg.DNS != nil { - dnsMount, err := resolvconf.GenerateDNSMount(cfg.TaskDir().Dir, cfg.DNS) - if err != nil { - return nil, nil, 
fmt.Errorf("failed to build mount for resolv.conf: %v", err) - } - cfg.Mounts = append(cfg.Mounts, dnsMount) - } - - // Bind mounts specified in driver config - if d.config.Bind != nil { - for host, task := range d.config.Bind { - mount_config := drivers.MountConfig{ - TaskPath: task, - HostPath: host, - Readonly: false, - PropagationMode: "private", - } - d.logger.Info("adding RW mount from driver config", "mount_config", hclog.Fmt("%+v", mount_config)) - cfg.Mounts = append(cfg.Mounts, &mount_config) - } - } - if d.config.BindReadOnly != nil { - for host, task := range d.config.BindReadOnly { - mount_config := drivers.MountConfig{ - TaskPath: task, - HostPath: host, - Readonly: true, - PropagationMode: "private", - } - d.logger.Info("adding RO mount from driver config", "mount_config", hclog.Fmt("%+v", mount_config)) - cfg.Mounts = append(cfg.Mounts, &mount_config) - } - } - - // Bind mounts specified in task config - if driverConfig.Bind != nil { - for host, task := range driverConfig.Bind { - mount_config := drivers.MountConfig{ - TaskPath: task, - HostPath: host, - Readonly: false, - PropagationMode: "private", - } - d.logger.Info("adding RW mount from task spec", "mount_config", hclog.Fmt("%+v", mount_config)) - cfg.Mounts = append(cfg.Mounts, &mount_config) - } - } - if driverConfig.BindReadOnly != nil { - for host, task := range driverConfig.BindReadOnly { - mount_config := drivers.MountConfig{ - TaskPath: task, - HostPath: host, - Readonly: true, - PropagationMode: "private", - } - d.logger.Info("adding RO mount from task spec", "mount_config", hclog.Fmt("%+v", mount_config)) - cfg.Mounts = append(cfg.Mounts, &mount_config) - } - } - - caps, err := capabilities.Calculate( - capabilities.NomadDefaults(), d.config.AllowCaps, driverConfig.CapAdd, driverConfig.CapDrop, - ) - if err != nil { - return nil, nil, err - } - d.logger.Debug("task capabilities", "capabilities", caps) - - execCmd := &executor.ExecCommand{ - Cmd: driverConfig.Command, - Args: 
driverConfig.Args, - Env: cfg.EnvList(), - User: user, - ResourceLimits: true, - NoPivotRoot: d.config.NoPivotRoot, - Resources: cfg.Resources, - TaskDir: cfg.TaskDir().Dir, - StdoutPath: cfg.StdoutPath, - StderrPath: cfg.StderrPath, - Mounts: cfg.Mounts, - Devices: cfg.Devices, - NetworkIsolation: cfg.NetworkIsolation, - ModePID: executor.IsolationMode(d.config.DefaultModePID, driverConfig.ModePID), - ModeIPC: executor.IsolationMode(d.config.DefaultModeIPC, driverConfig.ModeIPC), - Capabilities: caps, - } - - d.logger.Info("launching with", "exec_cmd", hclog.Fmt("%+v", execCmd)) - - ps, err := exec.Launch(execCmd) - if err != nil { - return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err) - } - - h := &taskHandle{ - exec: exec, - pid: ps.Pid, - taskConfig: cfg, - procState: drivers.TaskStateRunning, - startedAt: time.Now().Round(time.Millisecond), - logger: d.logger, - } - - driverState := TaskState{ - Pid: ps.Pid, - TaskConfig: cfg, - StartedAt: h.startedAt, - } - - if err := handle.SetDriverState(&driverState); err != nil { - d.logger.Error("failed to start task, error setting driver state", "error", err) - _ = exec.Shutdown("", 0) - return nil, nil, fmt.Errorf("failed to set driver state: %v", err) - } - - d.tasks.Set(cfg.ID, h) - go h.run() - return handle, nil, nil -} - -func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) { - handle, ok := d.tasks.Get(taskID) - if !ok { - return nil, drivers.ErrTaskNotFound - } - - ch := make(chan *drivers.ExitResult) - go d.handleWait(ctx, handle, ch) - - return ch, nil -} - -func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) { - defer close(ch) - var result *drivers.ExitResult - ps, err := handle.exec.Wait(ctx) - if err != nil { - result = &drivers.ExitResult{ - Err: fmt.Errorf("executor: error waiting on process: %v", err), - } - } else { - result = &drivers.ExitResult{ - ExitCode: ps.ExitCode, - Signal: 
ps.Signal, - } - } - - select { - case <-ctx.Done(): - return - case <-d.ctx.Done(): - return - case ch <- result: - } -} - -func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error { - handle, ok := d.tasks.Get(taskID) - if !ok { - return drivers.ErrTaskNotFound - } - - if err := handle.exec.Shutdown(signal, timeout); err != nil { - return fmt.Errorf("executor Shutdown failed: %v", err) - } - - return nil -} - -// resetCgroup will re-create the v2 cgroup for the task after the task has been -// destroyed by libcontainer. In the case of a task restart we call DestroyTask -// which removes the cgroup - but we still need it! -// -// Ideally the cgroup management would be more unified - and we could do the creation -// on a task runner pre-start hook, eliminating the need for this hack. -func (d *Driver) resetCgroup(handle *taskHandle) { - if cgutil.UseV2 { - if handle.taskConfig.Resources != nil && - handle.taskConfig.Resources.LinuxResources != nil && - handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath != "" { - err := os.Mkdir(handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath, 0755) - if err != nil { - d.logger.Trace("failed to reset cgroup", "path", handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath) - } - } - } -} - -func (d *Driver) DestroyTask(taskID string, force bool) error { - handle, ok := d.tasks.Get(taskID) - if !ok { - return drivers.ErrTaskNotFound - } - - if handle.IsRunning() && !force { - return fmt.Errorf("cannot destroy running task") - } - - if err := handle.exec.Shutdown("", 0); err != nil { - handle.logger.Error("destroying executor failed", "error", err) - } - - // workaround for the case where DestroyTask was issued on task restart - d.resetCgroup(handle) - - d.tasks.Delete(taskID) - return nil -} - -func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) { - handle, ok := d.tasks.Get(taskID) - if !ok { - return nil, drivers.ErrTaskNotFound - } - - return 
handle.TaskStatus(), nil -} - -func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) { - handle, ok := d.tasks.Get(taskID) - if !ok { - return nil, drivers.ErrTaskNotFound - } - - return handle.exec.Stats(ctx, interval) -} - -func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) { - return d.eventer.TaskEvents(ctx) -} - -func (d *Driver) SignalTask(taskID string, signal string) error { - handle, ok := d.tasks.Get(taskID) - if !ok { - return drivers.ErrTaskNotFound - } - - sig := os.Interrupt - if s, ok := signals.SignalLookup[signal]; ok { - sig = s - } else { - d.logger.Warn("unknown signal to send to task, using SIGINT instead", "signal", signal, "task_id", handle.taskConfig.ID) - - } - return handle.exec.Signal(sig) -} - -func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) { - if len(cmd) == 0 { - return nil, fmt.Errorf("error cmd must have at least one value") - } - handle, ok := d.tasks.Get(taskID) - if !ok { - return nil, drivers.ErrTaskNotFound - } - - args := []string{} - if len(cmd) > 1 { - args = cmd[1:] - } - - out, exitCode, err := handle.exec.Exec(time.Now().Add(timeout), cmd[0], args) - if err != nil { - return nil, err - } - - return &drivers.ExecTaskResult{ - Stdout: out, - ExitResult: &drivers.ExitResult{ - ExitCode: exitCode, - }, - }, nil -} - -var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil) - -func (d *Driver) ExecTaskStreamingRaw(ctx context.Context, - taskID string, - command []string, - tty bool, - stream drivers.ExecTaskStream) error { - - if len(command) == 0 { - return fmt.Errorf("error cmd must have at least one value") - } - handle, ok := d.tasks.Get(taskID) - if !ok { - return drivers.ErrTaskNotFound - } - - return handle.exec.ExecStreaming(ctx, command, tty, stream) -} diff --git a/exec2/handle.go b/exec2/handle.go deleted file mode 100644 index 9cd1cc3..0000000 
--- a/exec2/handle.go +++ /dev/null @@ -1,77 +0,0 @@ -package exec2 - -import ( - "context" - "strconv" - "sync" - "time" - - hclog "github.com/hashicorp/go-hclog" - "github.com/Alexis211/nomad-driver-exec2/executor" - "github.com/hashicorp/nomad/plugins/drivers" -) - -type taskHandle struct { - exec executor.Executor - pid int - logger hclog.Logger - - // stateLock syncs access to all fields below - stateLock sync.RWMutex - - taskConfig *drivers.TaskConfig - procState drivers.TaskState - startedAt time.Time - completedAt time.Time - exitResult *drivers.ExitResult -} - -func (h *taskHandle) TaskStatus() *drivers.TaskStatus { - h.stateLock.RLock() - defer h.stateLock.RUnlock() - - return &drivers.TaskStatus{ - ID: h.taskConfig.ID, - Name: h.taskConfig.Name, - State: h.procState, - StartedAt: h.startedAt, - CompletedAt: h.completedAt, - ExitResult: h.exitResult, - DriverAttributes: map[string]string{ - "pid": strconv.Itoa(h.pid), - }, - } -} - -func (h *taskHandle) IsRunning() bool { - h.stateLock.RLock() - defer h.stateLock.RUnlock() - return h.procState == drivers.TaskStateRunning -} - -func (h *taskHandle) run() { - h.stateLock.Lock() - if h.exitResult == nil { - h.exitResult = &drivers.ExitResult{} - } - h.stateLock.Unlock() - - // Block until process exits - ps, err := h.exec.Wait(context.Background()) - - h.stateLock.Lock() - defer h.stateLock.Unlock() - - if err != nil { - h.exitResult.Err = err - h.procState = drivers.TaskStateUnknown - h.completedAt = time.Now() - return - } - h.procState = drivers.TaskStateExited - h.exitResult.ExitCode = ps.ExitCode - h.exitResult.Signal = ps.Signal - h.completedAt = ps.Time - - // TODO: detect if the task OOMed -} diff --git a/exec2/pull-upstream.sh b/exec2/pull-upstream.sh deleted file mode 100755 index a797951..0000000 --- a/exec2/pull-upstream.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env bash - -REF=v1.4.3 - -wget https://github.com/hashicorp/nomad/raw/${REF}/drivers/exec/driver.go -O driver.go -wget 
https://github.com/hashicorp/nomad/raw/${REF}/drivers/exec/handle.go -O handle.go -wget https://github.com/hashicorp/nomad/raw/${REF}/drivers/exec/state.go -O state.go diff --git a/exec2/state.go b/exec2/state.go deleted file mode 100644 index 277e336..0000000 --- a/exec2/state.go +++ /dev/null @@ -1,33 +0,0 @@ -package exec2 - -import ( - "sync" -) - -type taskStore struct { - store map[string]*taskHandle - lock sync.RWMutex -} - -func newTaskStore() *taskStore { - return &taskStore{store: map[string]*taskHandle{}} -} - -func (ts *taskStore) Set(id string, handle *taskHandle) { - ts.lock.Lock() - defer ts.lock.Unlock() - ts.store[id] = handle -} - -func (ts *taskStore) Get(id string) (*taskHandle, bool) { - ts.lock.RLock() - defer ts.lock.RUnlock() - t, ok := ts.store[id] - return t, ok -} - -func (ts *taskStore) Delete(id string) { - ts.lock.Lock() - defer ts.lock.Unlock() - delete(ts.store, id) -} diff --git a/main.go b/main.go index 3e1e36c..99d94b1 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,7 @@ package main import ( - "github.com/Alexis211/nomad-driver-exec2/exec2" + "github.com/Alexis211/nomad-driver-exec2/nix2" "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/plugins" @@ -14,5 +14,5 @@ func main() { // factory returns a new instance of a nomad driver plugin func factory(log hclog.Logger) interface{} { - return exec2.NewPlugin(log) + return nix2.NewPlugin(log) } diff --git a/nix2/driver.go b/nix2/driver.go new file mode 100644 index 0000000..964ff56 --- /dev/null +++ b/nix2/driver.go @@ -0,0 +1,747 @@ +package nix2 + +import ( + "context" + "fmt" + "os" + "runtime" + "sync" + "time" + + "github.com/hashicorp/consul-template/signals" + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/lib/cgutil" + "github.com/hashicorp/nomad/drivers/shared/capabilities" + "github.com/hashicorp/nomad/drivers/shared/eventer" + "github.com/Alexis211/nomad-driver-exec2/executor" + "github.com/hashicorp/nomad/drivers/shared/resolvconf" + 
"github.com/hashicorp/nomad/helper/pluginutils/hclutils" + "github.com/hashicorp/nomad/helper/pluginutils/loader" + "github.com/hashicorp/nomad/helper/pointer" + "github.com/hashicorp/nomad/plugins/base" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/hashicorp/nomad/plugins/drivers/utils" + "github.com/hashicorp/nomad/plugins/shared/hclspec" + pstructs "github.com/hashicorp/nomad/plugins/shared/structs" +) + +const ( + // pluginName is the name of the plugin + pluginName = "nix2" + + // fingerprintPeriod is the interval at which the driver will send fingerprint responses + fingerprintPeriod = 30 * time.Second + + // taskHandleVersion is the version of task handle which this driver sets + // and understands how to decode driver state + taskHandleVersion = 1 +) + +var ( + // PluginID is the exec plugin metadata registered in the plugin + // catalog. + PluginID = loader.PluginID{ + Name: pluginName, + PluginType: base.PluginTypeDriver, + } + + // pluginInfo is the response returned for the PluginInfo RPC + pluginInfo = &base.PluginInfoResponse{ + Type: base.PluginTypeDriver, + PluginApiVersions: []string{drivers.ApiVersion010}, + PluginVersion: "0.1.0", + Name: pluginName, + } + + // configSpec is the hcl specification returned by the ConfigSchema RPC + configSpec = hclspec.NewObject(map[string]*hclspec.Spec{ + "no_pivot_root": hclspec.NewDefault( + hclspec.NewAttr("no_pivot_root", "bool", false), + hclspec.NewLiteral("false"), + ), + "default_pid_mode": hclspec.NewDefault( + hclspec.NewAttr("default_pid_mode", "string", false), + hclspec.NewLiteral(`"private"`), + ), + "default_ipc_mode": hclspec.NewDefault( + hclspec.NewAttr("default_ipc_mode", "string", false), + hclspec.NewLiteral(`"private"`), + ), + "allow_caps": hclspec.NewDefault( + hclspec.NewAttr("allow_caps", "list(string)", false), + hclspec.NewLiteral(capabilities.HCLSpecLiteral), + ), + // Default host directories to bind in tasks + "bind": hclspec.NewDefault( + hclspec.NewAttr("bind", 
"list(map(string))", false), + hclspec.NewLiteral("{}"), + ), + "bind_read_only": hclspec.NewDefault( + hclspec.NewAttr("bind_read_only", "list(map(string))", false), + hclspec.NewLiteral("{}"), + ), + }) + + // taskConfigSpec is the hcl specification for the driver config section of + // a task within a job. It is returned in the TaskConfigSchema RPC + taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{ + "command": hclspec.NewAttr("command", "string", true), + "args": hclspec.NewAttr("args", "list(string)", false), + "bind": hclspec.NewAttr("bind", "list(map(string))", false), + "bind_read_only": hclspec.NewAttr("bind_read_only", "list(map(string))", false), + "pid_mode": hclspec.NewAttr("pid_mode", "string", false), + "ipc_mode": hclspec.NewAttr("ipc_mode", "string", false), + "cap_add": hclspec.NewAttr("cap_add", "list(string)", false), + "cap_drop": hclspec.NewAttr("cap_drop", "list(string)", false), + }) + + // driverCapabilities represents the RPC response for what features are + // implemented by the exec task driver + driverCapabilities = &drivers.Capabilities{ + SendSignals: true, + Exec: true, + FSIsolation: drivers.FSIsolationNone, + NetIsolationModes: []drivers.NetIsolationMode{ + drivers.NetIsolationModeHost, + drivers.NetIsolationModeGroup, + }, + MountConfigs: drivers.MountConfigSupportAll, + } +) + +// Driver fork/execs tasks using many of the underlying OS's isolation +// features where configured. +type Driver struct { + // eventer is used to handle multiplexing of TaskEvents calls such that an + // event can be broadcast to all callers + eventer *eventer.Eventer + + // config is the driver configuration set by the SetConfig RPC + config Config + + // tasks is the in memory datastore mapping taskIDs to driverHandles + tasks *taskStore + + // ctx is the context for the driver. 
It is passed to other subsystems to + // coordinate shutdown + ctx context.Context + + // signalShutdown is called when the driver is shutting down and cancels + // the ctx passed to any subsystems + signalShutdown context.CancelFunc + + // logger will log to the Nomad agent + logger hclog.Logger + + // A tri-state boolean to know if the fingerprinting has happened and + // whether it has been successful + fingerprintSuccess *bool + fingerprintLock sync.Mutex +} + +// Config is the driver configuration set by the SetConfig RPC call +type Config struct { + // NoPivotRoot disables the use of pivot_root, useful when the root partition + // is on ramdisk + NoPivotRoot bool `codec:"no_pivot_root"` + + // DefaultModePID is the default PID isolation set for all tasks using + // exec-based task drivers. + DefaultModePID string `codec:"default_pid_mode"` + + // DefaultModeIPC is the default IPC isolation set for all tasks using + // exec-based task drivers. + DefaultModeIPC string `codec:"default_ipc_mode"` + + // AllowCaps configures which Linux Capabilities are enabled for tasks + // running on this node. 
+ AllowCaps []string `codec:"allow_caps"` + + // Paths to bind for read-write access in all jobs + Bind hclutils.MapStrStr `codec:"bind"` + + // Paths to bind for read-only access in all jobs + BindReadOnly hclutils.MapStrStr `codec:"bind_read_only"` +} + +func (c *Config) validate() error { + switch c.DefaultModePID { + case executor.IsolationModePrivate, executor.IsolationModeHost: + default: + return fmt.Errorf("default_pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModePID) + } + + switch c.DefaultModeIPC { + case executor.IsolationModePrivate, executor.IsolationModeHost: + default: + return fmt.Errorf("default_ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModeIPC) + } + + badCaps := capabilities.Supported().Difference(capabilities.New(c.AllowCaps)) + if !badCaps.Empty() { + return fmt.Errorf("allow_caps configured with capabilities not supported by system: %s", badCaps) + } + + return nil +} + +// TaskConfig is the driver configuration of a task within a job +type TaskConfig struct { + // Command is the thing to exec. + Command string `codec:"command"` + + // Args are passed along to Command. + Args []string `codec:"args"` + + // Paths to bind for read-write access + Bind hclutils.MapStrStr `codec:"bind"` + + // Paths to bind for read-only access + BindReadOnly hclutils.MapStrStr `codec:"bind_read_only"` + + // ModePID indicates whether PID namespace isolation is enabled for the task. + // Must be "private" or "host" if set. + ModePID string `codec:"pid_mode"` + + // ModeIPC indicates whether IPC namespace isolation is enabled for the task. + // Must be "private" or "host" if set. + ModeIPC string `codec:"ipc_mode"` + + // CapAdd is a set of linux capabilities to enable. + CapAdd []string `codec:"cap_add"` + + // CapDrop is a set of linux capabilities to disable. 
+ CapDrop []string `codec:"cap_drop"` +} + +func (tc *TaskConfig) validate() error { + switch tc.ModePID { + case "", executor.IsolationModePrivate, executor.IsolationModeHost: + default: + return fmt.Errorf("pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModePID) + } + + switch tc.ModeIPC { + case "", executor.IsolationModePrivate, executor.IsolationModeHost: + default: + return fmt.Errorf("ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModeIPC) + } + + supported := capabilities.Supported() + badAdds := supported.Difference(capabilities.New(tc.CapAdd)) + if !badAdds.Empty() { + return fmt.Errorf("cap_add configured with capabilities not supported by system: %s", badAdds) + } + badDrops := supported.Difference(capabilities.New(tc.CapDrop)) + if !badDrops.Empty() { + return fmt.Errorf("cap_drop configured with capabilities not supported by system: %s", badDrops) + } + + return nil +} + +// TaskState is the state which is encoded in the handle returned in +// StartTask. This information is needed to rebuild the task state and handler +// during recovery. 
+type TaskState struct { + TaskConfig *drivers.TaskConfig + Pid int + StartedAt time.Time +} + +// NewPlugin returns a new DriverPlugin implementation +func NewPlugin(logger hclog.Logger) drivers.DriverPlugin { + ctx, cancel := context.WithCancel(context.Background()) + logger = logger.Named(pluginName) + return &Driver{ + eventer: eventer.NewEventer(ctx, logger), + tasks: newTaskStore(), + ctx: ctx, + signalShutdown: cancel, + logger: logger, + } +} + +// setFingerprintSuccess marks the driver as having fingerprinted successfully +func (d *Driver) setFingerprintSuccess() { + d.fingerprintLock.Lock() + d.fingerprintSuccess = pointer.Of(true) + d.fingerprintLock.Unlock() +} + +// setFingerprintFailure marks the driver as having failed fingerprinting +func (d *Driver) setFingerprintFailure() { + d.fingerprintLock.Lock() + d.fingerprintSuccess = pointer.Of(false) + d.fingerprintLock.Unlock() +} + +// fingerprintSuccessful returns true if the driver has +// never fingerprinted or has successfully fingerprinted +func (d *Driver) fingerprintSuccessful() bool { + d.fingerprintLock.Lock() + defer d.fingerprintLock.Unlock() + return d.fingerprintSuccess == nil || *d.fingerprintSuccess +} + +func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) { + return pluginInfo, nil +} + +func (d *Driver) ConfigSchema() (*hclspec.Spec, error) { + return configSpec, nil +} + +func (d *Driver) SetConfig(cfg *base.Config) error { + // unpack, validate, and set agent plugin config + var config Config + if len(cfg.PluginConfig) != 0 { + if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil { + return err + } + } + if err := config.validate(); err != nil { + return err + } + d.logger.Info("Got config", "driver_config", hclog.Fmt("%+v", config)) + d.config = config + + return nil +} + +func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) { + return taskConfigSpec, nil +} + +// Capabilities is returned by the Capabilities RPC and indicates what +// optional features 
this driver supports +func (d *Driver) Capabilities() (*drivers.Capabilities, error) { + return driverCapabilities, nil +} + +func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) { + ch := make(chan *drivers.Fingerprint) + go d.handleFingerprint(ctx, ch) + return ch, nil + +} +func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) { + defer close(ch) + ticker := time.NewTimer(0) + for { + select { + case <-ctx.Done(): + return + case <-d.ctx.Done(): + return + case <-ticker.C: + ticker.Reset(fingerprintPeriod) + ch <- d.buildFingerprint() + } + } +} + +func (d *Driver) buildFingerprint() *drivers.Fingerprint { + if runtime.GOOS != "linux" { + d.setFingerprintFailure() + return &drivers.Fingerprint{ + Health: drivers.HealthStateUndetected, + HealthDescription: "exec driver unsupported on client OS", + } + } + + fp := &drivers.Fingerprint{ + Attributes: map[string]*pstructs.Attribute{}, + Health: drivers.HealthStateHealthy, + HealthDescription: drivers.DriverHealthy, + } + + if !utils.IsUnixRoot() { + fp.Health = drivers.HealthStateUndetected + fp.HealthDescription = drivers.DriverRequiresRootMessage + d.setFingerprintFailure() + return fp + } + + mount, err := cgutil.FindCgroupMountpointDir() + if err != nil { + fp.Health = drivers.HealthStateUnhealthy + fp.HealthDescription = drivers.NoCgroupMountMessage + if d.fingerprintSuccessful() { + d.logger.Warn(fp.HealthDescription, "error", err) + } + d.setFingerprintFailure() + return fp + } + + if mount == "" { + fp.Health = drivers.HealthStateUnhealthy + fp.HealthDescription = drivers.CgroupMountEmpty + d.setFingerprintFailure() + return fp + } + + fp.Attributes["driver.exec"] = pstructs.NewBoolAttribute(true) + d.setFingerprintSuccess() + return fp +} + +func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { + if handle == nil { + return fmt.Errorf("handle cannot be nil") + } + + // If already attached to handle there's nothing to recover. 
+ if _, ok := d.tasks.Get(handle.Config.ID); ok { + d.logger.Trace("nothing to recover; task already exists", + "task_id", handle.Config.ID, + "task_name", handle.Config.Name, + ) + return nil + } + + // Handle doesn't already exist, try to reattach + var taskState TaskState + if err := handle.GetDriverState(&taskState); err != nil { + d.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID) + return fmt.Errorf("failed to decode task state from handle: %v", err) + } + + // Create new executor + exec := executor.NewExecutorWithIsolation( + d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID),) + + h := &taskHandle{ + exec: exec, + pid: taskState.Pid, + taskConfig: taskState.TaskConfig, + procState: drivers.TaskStateRunning, + startedAt: taskState.StartedAt, + exitResult: &drivers.ExitResult{}, + logger: d.logger, + } + + d.tasks.Set(taskState.TaskConfig.ID, h) + + go h.run() + return nil +} + +func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) { + if _, ok := d.tasks.Get(cfg.ID); ok { + return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID) + } + + var driverConfig TaskConfig + if err := cfg.DecodeDriverConfig(&driverConfig); err != nil { + return nil, nil, fmt.Errorf("failed to decode driver config: %v", err) + } + + if err := driverConfig.validate(); err != nil { + return nil, nil, fmt.Errorf("failed driver config validation: %v", err) + } + + d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig)) + handle := drivers.NewTaskHandle(taskHandleVersion) + handle.Config = cfg + + exec := executor.NewExecutorWithIsolation( + d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID),) + + user := cfg.User + if user == "" { + user = "0" + } + + if cfg.DNS != nil { + dnsMount, err := resolvconf.GenerateDNSMount(cfg.TaskDir().Dir, cfg.DNS) + if err != nil { + return nil, nil, 
fmt.Errorf("failed to build mount for resolv.conf: %v", err) + } + cfg.Mounts = append(cfg.Mounts, dnsMount) + } + + // Bind mounts specified in driver config + if d.config.Bind != nil { + for host, task := range d.config.Bind { + mount_config := drivers.MountConfig{ + TaskPath: task, + HostPath: host, + Readonly: false, + PropagationMode: "private", + } + d.logger.Info("adding RW mount from driver config", "mount_config", hclog.Fmt("%+v", mount_config)) + cfg.Mounts = append(cfg.Mounts, &mount_config) + } + } + if d.config.BindReadOnly != nil { + for host, task := range d.config.BindReadOnly { + mount_config := drivers.MountConfig{ + TaskPath: task, + HostPath: host, + Readonly: true, + PropagationMode: "private", + } + d.logger.Info("adding RO mount from driver config", "mount_config", hclog.Fmt("%+v", mount_config)) + cfg.Mounts = append(cfg.Mounts, &mount_config) + } + } + + // Bind mounts specified in task config + if driverConfig.Bind != nil { + for host, task := range driverConfig.Bind { + mount_config := drivers.MountConfig{ + TaskPath: task, + HostPath: host, + Readonly: false, + PropagationMode: "private", + } + d.logger.Info("adding RW mount from task spec", "mount_config", hclog.Fmt("%+v", mount_config)) + cfg.Mounts = append(cfg.Mounts, &mount_config) + } + } + if driverConfig.BindReadOnly != nil { + for host, task := range driverConfig.BindReadOnly { + mount_config := drivers.MountConfig{ + TaskPath: task, + HostPath: host, + Readonly: true, + PropagationMode: "private", + } + d.logger.Info("adding RO mount from task spec", "mount_config", hclog.Fmt("%+v", mount_config)) + cfg.Mounts = append(cfg.Mounts, &mount_config) + } + } + + caps, err := capabilities.Calculate( + capabilities.NomadDefaults(), d.config.AllowCaps, driverConfig.CapAdd, driverConfig.CapDrop, + ) + if err != nil { + return nil, nil, err + } + d.logger.Debug("task capabilities", "capabilities", caps) + + execCmd := &executor.ExecCommand{ + Cmd: driverConfig.Command, + Args: 
driverConfig.Args, + Env: cfg.EnvList(), + User: user, + ResourceLimits: true, + NoPivotRoot: d.config.NoPivotRoot, + Resources: cfg.Resources, + TaskDir: cfg.TaskDir().Dir, + StdoutPath: cfg.StdoutPath, + StderrPath: cfg.StderrPath, + Mounts: cfg.Mounts, + Devices: cfg.Devices, + NetworkIsolation: cfg.NetworkIsolation, + ModePID: executor.IsolationMode(d.config.DefaultModePID, driverConfig.ModePID), + ModeIPC: executor.IsolationMode(d.config.DefaultModeIPC, driverConfig.ModeIPC), + Capabilities: caps, + } + + d.logger.Info("launching with", "exec_cmd", hclog.Fmt("%+v", execCmd)) + + ps, err := exec.Launch(execCmd) + if err != nil { + return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err) + } + + h := &taskHandle{ + exec: exec, + pid: ps.Pid, + taskConfig: cfg, + procState: drivers.TaskStateRunning, + startedAt: time.Now().Round(time.Millisecond), + logger: d.logger, + } + + driverState := TaskState{ + Pid: ps.Pid, + TaskConfig: cfg, + StartedAt: h.startedAt, + } + + if err := handle.SetDriverState(&driverState); err != nil { + d.logger.Error("failed to start task, error setting driver state", "error", err) + _ = exec.Shutdown("", 0) + return nil, nil, fmt.Errorf("failed to set driver state: %v", err) + } + + d.tasks.Set(cfg.ID, h) + go h.run() + return handle, nil, nil +} + +func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) { + handle, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + + ch := make(chan *drivers.ExitResult) + go d.handleWait(ctx, handle, ch) + + return ch, nil +} + +func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) { + defer close(ch) + var result *drivers.ExitResult + ps, err := handle.exec.Wait(ctx) + if err != nil { + result = &drivers.ExitResult{ + Err: fmt.Errorf("executor: error waiting on process: %v", err), + } + } else { + result = &drivers.ExitResult{ + ExitCode: ps.ExitCode, + Signal: 
ps.Signal, + } + } + + select { + case <-ctx.Done(): + return + case <-d.ctx.Done(): + return + case ch <- result: + } +} + +func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error { + handle, ok := d.tasks.Get(taskID) + if !ok { + return drivers.ErrTaskNotFound + } + + if err := handle.exec.Shutdown(signal, timeout); err != nil { + return fmt.Errorf("executor Shutdown failed: %v", err) + } + + return nil +} + +// resetCgroup will re-create the v2 cgroup for the task after the task has been +// destroyed by libcontainer. In the case of a task restart we call DestroyTask +// which removes the cgroup - but we still need it! +// +// Ideally the cgroup management would be more unified - and we could do the creation +// on a task runner pre-start hook, eliminating the need for this hack. +func (d *Driver) resetCgroup(handle *taskHandle) { + if cgutil.UseV2 { + if handle.taskConfig.Resources != nil && + handle.taskConfig.Resources.LinuxResources != nil && + handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath != "" { + err := os.Mkdir(handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath, 0755) + if err != nil { + d.logger.Trace("failed to reset cgroup", "path", handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath) + } + } + } +} + +func (d *Driver) DestroyTask(taskID string, force bool) error { + handle, ok := d.tasks.Get(taskID) + if !ok { + return drivers.ErrTaskNotFound + } + + if handle.IsRunning() && !force { + return fmt.Errorf("cannot destroy running task") + } + + if err := handle.exec.Shutdown("", 0); err != nil { + handle.logger.Error("destroying executor failed", "error", err) + } + + // workaround for the case where DestroyTask was issued on task restart + d.resetCgroup(handle) + + d.tasks.Delete(taskID) + return nil +} + +func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) { + handle, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + + return 
handle.TaskStatus(), nil +} + +func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) { + handle, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + + return handle.exec.Stats(ctx, interval) +} + +func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) { + return d.eventer.TaskEvents(ctx) +} + +func (d *Driver) SignalTask(taskID string, signal string) error { + handle, ok := d.tasks.Get(taskID) + if !ok { + return drivers.ErrTaskNotFound + } + + sig := os.Interrupt + if s, ok := signals.SignalLookup[signal]; ok { + sig = s + } else { + d.logger.Warn("unknown signal to send to task, using SIGINT instead", "signal", signal, "task_id", handle.taskConfig.ID) + + } + return handle.exec.Signal(sig) +} + +func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) { + if len(cmd) == 0 { + return nil, fmt.Errorf("error cmd must have at least one value") + } + handle, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + + args := []string{} + if len(cmd) > 1 { + args = cmd[1:] + } + + out, exitCode, err := handle.exec.Exec(time.Now().Add(timeout), cmd[0], args) + if err != nil { + return nil, err + } + + return &drivers.ExecTaskResult{ + Stdout: out, + ExitResult: &drivers.ExitResult{ + ExitCode: exitCode, + }, + }, nil +} + +var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil) + +func (d *Driver) ExecTaskStreamingRaw(ctx context.Context, + taskID string, + command []string, + tty bool, + stream drivers.ExecTaskStream) error { + + if len(command) == 0 { + return fmt.Errorf("error cmd must have at least one value") + } + handle, ok := d.tasks.Get(taskID) + if !ok { + return drivers.ErrTaskNotFound + } + + return handle.exec.ExecStreaming(ctx, command, tty, stream) +} diff --git a/nix2/handle.go b/nix2/handle.go new file mode 100644 index 0000000..0bd7a2e --- 
/dev/null +++ b/nix2/handle.go @@ -0,0 +1,77 @@ +package nix2 + +import ( + "context" + "strconv" + "sync" + "time" + + hclog "github.com/hashicorp/go-hclog" + "github.com/Alexis211/nomad-driver-exec2/executor" + "github.com/hashicorp/nomad/plugins/drivers" +) + +type taskHandle struct { + exec executor.Executor + pid int + logger hclog.Logger + + // stateLock syncs access to all fields below + stateLock sync.RWMutex + + taskConfig *drivers.TaskConfig + procState drivers.TaskState + startedAt time.Time + completedAt time.Time + exitResult *drivers.ExitResult +} + +func (h *taskHandle) TaskStatus() *drivers.TaskStatus { + h.stateLock.RLock() + defer h.stateLock.RUnlock() + + return &drivers.TaskStatus{ + ID: h.taskConfig.ID, + Name: h.taskConfig.Name, + State: h.procState, + StartedAt: h.startedAt, + CompletedAt: h.completedAt, + ExitResult: h.exitResult, + DriverAttributes: map[string]string{ + "pid": strconv.Itoa(h.pid), + }, + } +} + +func (h *taskHandle) IsRunning() bool { + h.stateLock.RLock() + defer h.stateLock.RUnlock() + return h.procState == drivers.TaskStateRunning +} + +func (h *taskHandle) run() { + h.stateLock.Lock() + if h.exitResult == nil { + h.exitResult = &drivers.ExitResult{} + } + h.stateLock.Unlock() + + // Block until process exits + ps, err := h.exec.Wait(context.Background()) + + h.stateLock.Lock() + defer h.stateLock.Unlock() + + if err != nil { + h.exitResult.Err = err + h.procState = drivers.TaskStateUnknown + h.completedAt = time.Now() + return + } + h.procState = drivers.TaskStateExited + h.exitResult.ExitCode = ps.ExitCode + h.exitResult.Signal = ps.Signal + h.completedAt = ps.Time + + // TODO: detect if the task OOMed +} diff --git a/nix2/pull-upstream.sh b/nix2/pull-upstream.sh new file mode 100755 index 0000000..a797951 --- /dev/null +++ b/nix2/pull-upstream.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +REF=v1.4.3 + +wget https://github.com/hashicorp/nomad/raw/${REF}/drivers/exec/driver.go -O driver.go +wget 
https://github.com/hashicorp/nomad/raw/${REF}/drivers/exec/handle.go -O handle.go +wget https://github.com/hashicorp/nomad/raw/${REF}/drivers/exec/state.go -O state.go diff --git a/nix2/state.go b/nix2/state.go new file mode 100644 index 0000000..a846ea4 --- /dev/null +++ b/nix2/state.go @@ -0,0 +1,33 @@ +package nix2 + +import ( + "sync" +) + +type taskStore struct { + store map[string]*taskHandle + lock sync.RWMutex +} + +func newTaskStore() *taskStore { + return &taskStore{store: map[string]*taskHandle{}} +} + +func (ts *taskStore) Set(id string, handle *taskHandle) { + ts.lock.Lock() + defer ts.lock.Unlock() + ts.store[id] = handle +} + +func (ts *taskStore) Get(id string) (*taskHandle, bool) { + ts.lock.RLock() + defer ts.lock.RUnlock() + t, ok := ts.store[id] + return t, ok +} + +func (ts *taskStore) Delete(id string) { + ts.lock.Lock() + defer ts.lock.Unlock() + delete(ts.store, id) +} -- cgit v1.2.3