diff options
Diffstat (limited to 'nix2')
-rw-r--r-- | nix2/driver.go | 762 | ||||
-rw-r--r-- | nix2/handle.go | 79 | ||||
-rwxr-xr-x | nix2/pull-upstream.sh | 7 | ||||
-rw-r--r-- | nix2/state.go | 33 |
4 files changed, 881 insertions, 0 deletions
diff --git a/nix2/driver.go b/nix2/driver.go new file mode 100644 index 0000000..c97efc5 --- /dev/null +++ b/nix2/driver.go @@ -0,0 +1,762 @@ +package nix2 + +import ( + "context" + "fmt" + "os" + "path/filepath" + "runtime" + "sync" + "time" + + "github.com/Alexis211/nomad-driver-exec2/executor" + "github.com/hashicorp/consul-template/signals" + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/lib/cgutil" + "github.com/hashicorp/nomad/drivers/shared/capabilities" + "github.com/hashicorp/nomad/drivers/shared/eventer" + "github.com/hashicorp/nomad/drivers/shared/resolvconf" + "github.com/hashicorp/nomad/helper/pluginutils/hclutils" + "github.com/hashicorp/nomad/helper/pluginutils/loader" + "github.com/hashicorp/nomad/helper/pointer" + "github.com/hashicorp/nomad/plugins/base" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/hashicorp/nomad/plugins/drivers/utils" + "github.com/hashicorp/nomad/plugins/shared/hclspec" + pstructs "github.com/hashicorp/nomad/plugins/shared/structs" +) + +const ( + // pluginName is the name of the plugin + pluginName = "nix2" + + // fingerprintPeriod is the interval at which the driver will send fingerprint responses + fingerprintPeriod = 30 * time.Second + + // taskHandleVersion is the version of task handle which this driver sets + // and understands how to decode driver state + taskHandleVersion = 1 +) + +var ( + // PluginID is the exec plugin metadata registered in the plugin + // catalog. + PluginID = loader.PluginID{ + Name: pluginName, + PluginType: base.PluginTypeDriver, + } + + // pluginInfo is the response returned for the PluginInfo RPC + pluginInfo = &base.PluginInfoResponse{ + Type: base.PluginTypeDriver, + PluginApiVersions: []string{drivers.ApiVersion010}, + PluginVersion: "0.1.0", + Name: pluginName, + } + + // configSpec is the hcl specification returned by the ConfigSchema RPC + configSpec = hclspec.NewObject(map[string]*hclspec.Spec{ + "no_pivot_root": hclspec.NewDefault( + hclspec.NewAttr("no_pivot_root", "bool", false), + hclspec.NewLiteral("false"), + ), + "default_pid_mode": hclspec.NewDefault( + hclspec.NewAttr("default_pid_mode", "string", false), + hclspec.NewLiteral(`"private"`), + ), + "default_ipc_mode": hclspec.NewDefault( + hclspec.NewAttr("default_ipc_mode", "string", false), + hclspec.NewLiteral(`"private"`), + ), + "allow_caps": hclspec.NewDefault( + hclspec.NewAttr("allow_caps", "list(string)", false), + hclspec.NewLiteral(capabilities.HCLSpecLiteral), + ), + "allow_bind": hclspec.NewDefault( + hclspec.NewAttr("allow_bind", "bool", false), + hclspec.NewLiteral("true"), + ), + }) + + // taskConfigSpec is the hcl specification for the driver config section of + // a task within a job. It is returned in the TaskConfigSchema RPC + taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{ + "command": hclspec.NewAttr("command", "string", true), + "args": hclspec.NewAttr("args", "list(string)", false), + "bind": hclspec.NewAttr("bind", "list(map(string))", false), + "bind_read_only": hclspec.NewAttr("bind_read_only", "list(map(string))", false), + "pid_mode": hclspec.NewAttr("pid_mode", "string", false), + "ipc_mode": hclspec.NewAttr("ipc_mode", "string", false), + "cap_add": hclspec.NewAttr("cap_add", "list(string)", false), + "cap_drop": hclspec.NewAttr("cap_drop", "list(string)", false), + }) + + // driverCapabilities represents the RPC response for what features are + // implemented by the exec task driver + driverCapabilities = &drivers.Capabilities{ + SendSignals: true, + Exec: true, + FSIsolation: drivers.FSIsolationNone, + NetIsolationModes: []drivers.NetIsolationMode{ + drivers.NetIsolationModeHost, + drivers.NetIsolationModeGroup, + }, + MountConfigs: drivers.MountConfigSupportAll, + } +) + +// Driver fork/execs tasks using many of the underlying OS's isolation +// features where configured. +type Driver struct { + // eventer is used to handle multiplexing of TaskEvents calls such that an + // event can be broadcast to all callers + eventer *eventer.Eventer + + // config is the driver configuration set by the SetConfig RPC + config Config + + // nomadConfig is the client config from nomad + nomadConfig *base.ClientDriverConfig + + // tasks is the in memory datastore mapping taskIDs to driverHandles + tasks *taskStore + + // ctx is the context for the driver. It is passed to other subsystems to + // coordinate shutdown + ctx context.Context + + // signalShutdown is called when the driver is shutting down and cancels + // the ctx passed to any subsystems + signalShutdown context.CancelFunc + + // logger will log to the Nomad agent + logger hclog.Logger + + // A tri-state boolean to know if the fingerprinting has happened and + // whether it has been successful + fingerprintSuccess *bool + fingerprintLock sync.Mutex +} + +// Config is the driver configuration set by the SetConfig RPC call +type Config struct { + // NoPivotRoot disables the use of pivot_root, useful when the root partition + // is on ramdisk + NoPivotRoot bool `codec:"no_pivot_root"` + + // DefaultModePID is the default PID isolation set for all tasks using + // exec-based task drivers. + DefaultModePID string `codec:"default_pid_mode"` + + // DefaultModeIPC is the default IPC isolation set for all tasks using + // exec-based task drivers. + DefaultModeIPC string `codec:"default_ipc_mode"` + + // AllowCaps configures which Linux Capabilities are enabled for tasks + // running on this node. + AllowCaps []string `codec:"allow_caps"` + + // AllowBind defines whether users may bind host directories + AllowBind bool `codec:"allow_bind"` +} + +func (c *Config) validate() error { + switch c.DefaultModePID { + case executor.IsolationModePrivate, executor.IsolationModeHost: + default: + return fmt.Errorf("default_pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModePID) + } + + switch c.DefaultModeIPC { + case executor.IsolationModePrivate, executor.IsolationModeHost: + default: + return fmt.Errorf("default_ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModeIPC) + } + + badCaps := capabilities.Supported().Difference(capabilities.New(c.AllowCaps)) + if !badCaps.Empty() { + return fmt.Errorf("allow_caps configured with capabilities not supported by system: %s", badCaps) + } + + return nil +} + +// TaskConfig is the driver configuration of a task within a job +type TaskConfig struct { + // Command is the thing to exec. + Command string `codec:"command"` + + // Args are passed along to Command. + Args []string `codec:"args"` + + // Paths to bind for read-write acess + Bind hclutils.MapStrStr `codec:"bind"` + + // Paths to bind for read-only acess + BindReadOnly hclutils.MapStrStr `codec:"bind_read_only"` + + // ModePID indicates whether PID namespace isolation is enabled for the task. + // Must be "private" or "host" if set. + ModePID string `codec:"pid_mode"` + + // ModeIPC indicates whether IPC namespace isolation is enabled for the task. + // Must be "private" or "host" if set. + ModeIPC string `codec:"ipc_mode"` + + // CapAdd is a set of linux capabilities to enable. + CapAdd []string `codec:"cap_add"` + + // CapDrop is a set of linux capabilities to disable. + CapDrop []string `codec:"cap_drop"` +} + +func (tc *TaskConfig) validate() error { + switch tc.ModePID { + case "", executor.IsolationModePrivate, executor.IsolationModeHost: + default: + return fmt.Errorf("pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModePID) + } + + switch tc.ModeIPC { + case "", executor.IsolationModePrivate, executor.IsolationModeHost: + default: + return fmt.Errorf("ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModeIPC) + } + + supported := capabilities.Supported() + badAdds := supported.Difference(capabilities.New(tc.CapAdd)) + if !badAdds.Empty() { + return fmt.Errorf("cap_add configured with capabilities not supported by system: %s", badAdds) + } + badDrops := supported.Difference(capabilities.New(tc.CapDrop)) + if !badDrops.Empty() { + return fmt.Errorf("cap_drop configured with capabilities not supported by system: %s", badDrops) + } + + return nil +} + +// TaskState is the state which is encoded in the handle returned in +// StartTask. This information is needed to rebuild the task state and handler +// during recovery. +type TaskState struct { + ReattachConfig *pstructs.ReattachConfig + TaskConfig *drivers.TaskConfig + Pid int + StartedAt time.Time +} + +// NewPlugin returns a new DrivePlugin implementation +func NewPlugin(logger hclog.Logger) drivers.DriverPlugin { + ctx, cancel := context.WithCancel(context.Background()) + logger = logger.Named(pluginName) + return &Driver{ + eventer: eventer.NewEventer(ctx, logger), + tasks: newTaskStore(), + ctx: ctx, + signalShutdown: cancel, + logger: logger, + } +} + +// setFingerprintSuccess marks the driver as having fingerprinted successfully +func (d *Driver) setFingerprintSuccess() { + d.fingerprintLock.Lock() + d.fingerprintSuccess = pointer.Of(true) + d.fingerprintLock.Unlock() +} + +// setFingerprintFailure marks the driver as having failed fingerprinting +func (d *Driver) setFingerprintFailure() { + d.fingerprintLock.Lock() + d.fingerprintSuccess = pointer.Of(false) + d.fingerprintLock.Unlock() +} + +// fingerprintSuccessful returns true if the driver has +// never fingerprinted or has successfully fingerprinted +func (d *Driver) fingerprintSuccessful() bool { + d.fingerprintLock.Lock() + defer d.fingerprintLock.Unlock() + return d.fingerprintSuccess == nil || *d.fingerprintSuccess +} + +func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) { + return pluginInfo, nil +} + +func (d *Driver) ConfigSchema() (*hclspec.Spec, error) { + return configSpec, nil +} + +func (d *Driver) SetConfig(cfg *base.Config) error { + // unpack, validate, and set agent plugin config + var config Config + if len(cfg.PluginConfig) != 0 { + if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil { + return err + } + } + if err := config.validate(); err != nil { + return err + } + d.logger.Info("Got config", "driver_config", hclog.Fmt("%+v", config)) + d.config = config + + if cfg != nil && cfg.AgentConfig != nil { + d.nomadConfig = cfg.AgentConfig.Driver + } + return nil +} + +func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) { + return taskConfigSpec, nil +} + +// Capabilities is returned by the Capabilities RPC and indicates what +// optional features this driver supports +func (d *Driver) Capabilities() (*drivers.Capabilities, error) { + return driverCapabilities, nil +} + +func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) { + ch := make(chan *drivers.Fingerprint) + go d.handleFingerprint(ctx, ch) + return ch, nil + +} +func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) { + defer close(ch) + ticker := time.NewTimer(0) + for { + select { + case <-ctx.Done(): + return + case <-d.ctx.Done(): + return + case <-ticker.C: + ticker.Reset(fingerprintPeriod) + ch <- d.buildFingerprint() + } + } +} + +func (d *Driver) buildFingerprint() *drivers.Fingerprint { + if runtime.GOOS != "linux" { + d.setFingerprintFailure() + return &drivers.Fingerprint{ + Health: drivers.HealthStateUndetected, + HealthDescription: "exec driver unsupported on client OS", + } + } + + fp := &drivers.Fingerprint{ + Attributes: map[string]*pstructs.Attribute{}, + Health: drivers.HealthStateHealthy, + HealthDescription: drivers.DriverHealthy, + } + + if !utils.IsUnixRoot() { + fp.Health = drivers.HealthStateUndetected + fp.HealthDescription = drivers.DriverRequiresRootMessage + d.setFingerprintFailure() + return fp + } + + mount, err := cgutil.FindCgroupMountpointDir() + if err != nil { + fp.Health = drivers.HealthStateUnhealthy + fp.HealthDescription = drivers.NoCgroupMountMessage + if d.fingerprintSuccessful() { + d.logger.Warn(fp.HealthDescription, "error", err) + } + d.setFingerprintFailure() + return fp + } + + if mount == "" { + fp.Health = drivers.HealthStateUnhealthy + fp.HealthDescription = drivers.CgroupMountEmpty + d.setFingerprintFailure() + return fp + } + + fp.Attributes["driver.exec"] = pstructs.NewBoolAttribute(true) + d.setFingerprintSuccess() + return fp +} + +func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { + if handle == nil { + return fmt.Errorf("handle cannot be nil") + } + + // If already attached to handle there's nothing to recover. + if _, ok := d.tasks.Get(handle.Config.ID); ok { + d.logger.Trace("nothing to recover; task already exists", + "task_id", handle.Config.ID, + "task_name", handle.Config.Name, + ) + return nil + } + + // Handle doesn't already exist, try to reattach + var taskState TaskState + if err := handle.GetDriverState(&taskState); err != nil { + d.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID) + return fmt.Errorf("failed to decode task state from handle: %v", err) + } + + // Create client for reattached executor + plugRC, err := pstructs.ReattachConfigToGoPlugin(taskState.ReattachConfig) + if err != nil { + d.logger.Error("failed to build ReattachConfig from task state", "error", err, "task_id", handle.Config.ID) + return fmt.Errorf("failed to build ReattachConfig from task state: %v", err) + } + + exec, pluginClient, err := executor.ReattachToExecutor(plugRC, + d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID)) + if err != nil { + d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID) + return fmt.Errorf("failed to reattach to executor: %v", err) + } + + h := &taskHandle{ + exec: exec, + pid: taskState.Pid, + pluginClient: pluginClient, + taskConfig: taskState.TaskConfig, + procState: drivers.TaskStateRunning, + startedAt: taskState.StartedAt, + exitResult: &drivers.ExitResult{}, + logger: d.logger, + } + + d.tasks.Set(taskState.TaskConfig.ID, h) + + go h.run() + return nil +} + +func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) { + if _, ok := d.tasks.Get(cfg.ID); ok { + return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID) + } + + var driverConfig TaskConfig + if err := cfg.DecodeDriverConfig(&driverConfig); err != nil { + return nil, nil, fmt.Errorf("failed to decode driver config: %v", err) + } + + if err := driverConfig.validate(); err != nil { + return nil, nil, fmt.Errorf("failed driver config validation: %v", err) + } + + d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig)) + handle := drivers.NewTaskHandle(taskHandleVersion) + handle.Config = cfg + + pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out") + executorConfig := &executor.ExecutorConfig{ + LogFile: pluginLogFile, + LogLevel: "debug", + FSIsolation: true, + } + + exec, pluginClient, err := executor.CreateExecutor( + d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID), + d.nomadConfig, executorConfig) + if err != nil { + return nil, nil, fmt.Errorf("failed to create executor: %v", err) + } + + user := cfg.User + if user == "" { + user = "0" + } + + if cfg.DNS != nil { + dnsMount, err := resolvconf.GenerateDNSMount(cfg.TaskDir().Dir, cfg.DNS) + if err != nil { + return nil, nil, fmt.Errorf("failed to build mount for resolv.conf: %v", err) + } + cfg.Mounts = append(cfg.Mounts, dnsMount) + } + + // Bind mounts specified in driver config + + // Bind mounts specified in task config + if d.config.AllowBind { + if driverConfig.Bind != nil { + for host, task := range driverConfig.Bind { + mount_config := drivers.MountConfig{ + TaskPath: task, + HostPath: host, + Readonly: false, + PropagationMode: "private", + } + d.logger.Info("adding RW mount from task spec", "mount_config", hclog.Fmt("%+v", mount_config)) + cfg.Mounts = append(cfg.Mounts, &mount_config) + } + } + if driverConfig.BindReadOnly != nil { + for host, task := range driverConfig.BindReadOnly { + mount_config := drivers.MountConfig{ + TaskPath: task, + HostPath: host, + Readonly: true, + PropagationMode: "private", + } + d.logger.Info("adding RO mount from task spec", "mount_config", hclog.Fmt("%+v", mount_config)) + cfg.Mounts = append(cfg.Mounts, &mount_config) + } + } + } else { + if len(driverConfig.Bind) > 0 || len(driverConfig.BindReadOnly) > 0 { + return nil, nil, fmt.Errorf("bind and bind_read_only are deactivated for the %s driver", pluginName) + } + } + + caps, err := capabilities.Calculate( + capabilities.NomadDefaults(), d.config.AllowCaps, driverConfig.CapAdd, driverConfig.CapDrop, + ) + if err != nil { + return nil, nil, err + } + d.logger.Debug("task capabilities", "capabilities", caps) + + execCmd := &executor.ExecCommand{ + Cmd: driverConfig.Command, + Args: driverConfig.Args, + Env: cfg.EnvList(), + User: user, + ResourceLimits: true, + NoPivotRoot: d.config.NoPivotRoot, + Resources: cfg.Resources, + TaskDir: cfg.TaskDir().Dir, + StdoutPath: cfg.StdoutPath, + StderrPath: cfg.StderrPath, + Mounts: cfg.Mounts, + Devices: cfg.Devices, + NetworkIsolation: cfg.NetworkIsolation, + ModePID: executor.IsolationMode(d.config.DefaultModePID, driverConfig.ModePID), + ModeIPC: executor.IsolationMode(d.config.DefaultModeIPC, driverConfig.ModeIPC), + Capabilities: caps, + } + + d.logger.Info("launching with", "exec_cmd", hclog.Fmt("%+v", execCmd)) + + ps, err := exec.Launch(execCmd) + if err != nil { + pluginClient.Kill() + return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err) + } + + h := &taskHandle{ + exec: exec, + pid: ps.Pid, + pluginClient: pluginClient, + taskConfig: cfg, + procState: drivers.TaskStateRunning, + startedAt: time.Now().Round(time.Millisecond), + logger: d.logger, + } + + driverState := TaskState{ + ReattachConfig: pstructs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()), + Pid: ps.Pid, + TaskConfig: cfg, + StartedAt: h.startedAt, + } + + if err := handle.SetDriverState(&driverState); err != nil { + d.logger.Error("failed to start task, error setting driver state", "error", err) + _ = exec.Shutdown("", 0) + pluginClient.Kill() + return nil, nil, fmt.Errorf("failed to set driver state: %v", err) + } + + d.tasks.Set(cfg.ID, h) + go h.run() + return handle, nil, nil +} + +func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) { + handle, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + + ch := make(chan *drivers.ExitResult) + go d.handleWait(ctx, handle, ch) + + return ch, nil +} + +func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) { + defer close(ch) + var result *drivers.ExitResult + ps, err := handle.exec.Wait(ctx) + if err != nil { + result = &drivers.ExitResult{ + Err: fmt.Errorf("executor: error waiting on process: %v", err), + } + } else { + result = &drivers.ExitResult{ + ExitCode: ps.ExitCode, + Signal: ps.Signal, + } + } + + select { + case <-ctx.Done(): + return + case <-d.ctx.Done(): + return + case ch <- result: + } +} + +func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error { + handle, ok := d.tasks.Get(taskID) + if !ok { + return drivers.ErrTaskNotFound + } + + if err := handle.exec.Shutdown(signal, timeout); err != nil { + if handle.pluginClient.Exited() { + return nil + } + return fmt.Errorf("executor Shutdown failed: %v", err) + } + + return nil +} + +// resetCgroup will re-create the v2 cgroup for the task after the task has been +// destroyed by libcontainer. In the case of a task restart we call DestroyTask +// which removes the cgroup - but we still need it! +// +// Ideally the cgroup management would be more unified - and we could do the creation +// on a task runner pre-start hook, eliminating the need for this hack. +func (d *Driver) resetCgroup(handle *taskHandle) { + if cgutil.UseV2 { + if handle.taskConfig.Resources != nil && + handle.taskConfig.Resources.LinuxResources != nil && + handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath != "" { + err := os.Mkdir(handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath, 0755) + if err != nil { + d.logger.Trace("failed to reset cgroup", "path", handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath) + } + } + } +} + +func (d *Driver) DestroyTask(taskID string, force bool) error { + handle, ok := d.tasks.Get(taskID) + if !ok { + return drivers.ErrTaskNotFound + } + + if handle.IsRunning() && !force { + return fmt.Errorf("cannot destroy running task") + } + + if !handle.pluginClient.Exited() { + if err := handle.exec.Shutdown("", 0); err != nil { + handle.logger.Error("destroying executor failed", "error", err) + } + + handle.pluginClient.Kill() + } + + // workaround for the case where DestroyTask was issued on task restart + d.resetCgroup(handle) + + d.tasks.Delete(taskID) + return nil +} + +func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) { + handle, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + + return handle.TaskStatus(), nil +} + +func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) { + handle, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + + return handle.exec.Stats(ctx, interval) +} + +func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) { + return d.eventer.TaskEvents(ctx) +} + +func (d *Driver) SignalTask(taskID string, signal string) error { + handle, ok := d.tasks.Get(taskID) + if !ok { + return drivers.ErrTaskNotFound + } + + sig := os.Interrupt + if s, ok := signals.SignalLookup[signal]; ok { + sig = s + } else { + d.logger.Warn("unknown signal to send to task, using SIGINT instead", "signal", signal, "task_id", handle.taskConfig.ID) + + } + return handle.exec.Signal(sig) +} + +func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) { + if len(cmd) == 0 { + return nil, fmt.Errorf("error cmd must have at least one value") + } + handle, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + + args := []string{} + if len(cmd) > 1 { + args = cmd[1:] + } + + out, exitCode, err := handle.exec.Exec(time.Now().Add(timeout), cmd[0], args) + if err != nil { + return nil, err + } + + return &drivers.ExecTaskResult{ + Stdout: out, + ExitResult: &drivers.ExitResult{ + ExitCode: exitCode, + }, + }, nil +} + +var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil) + +func (d *Driver) ExecTaskStreamingRaw(ctx context.Context, + taskID string, + command []string, + tty bool, + stream drivers.ExecTaskStream) error { + + if len(command) == 0 { + return fmt.Errorf("error cmd must have at least one value") + } + handle, ok := d.tasks.Get(taskID) + if !ok { + return drivers.ErrTaskNotFound + } + + return handle.exec.ExecStreaming(ctx, command, tty, stream) +} diff --git a/nix2/handle.go b/nix2/handle.go new file mode 100644 index 0000000..0e54f4e --- /dev/null +++ b/nix2/handle.go @@ -0,0 +1,79 @@ +package nix2 + +import ( + "context" + "strconv" + "sync" + "time" + + "github.com/Alexis211/nomad-driver-exec2/executor" + hclog "github.com/hashicorp/go-hclog" + plugin "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/plugins/drivers" +) + +type taskHandle struct { + exec executor.Executor + pid int + pluginClient *plugin.Client + logger hclog.Logger + + // stateLock syncs access to all fields below + stateLock sync.RWMutex + + taskConfig *drivers.TaskConfig + procState drivers.TaskState + startedAt time.Time + completedAt time.Time + exitResult *drivers.ExitResult +} + +func (h *taskHandle) TaskStatus() *drivers.TaskStatus { + h.stateLock.RLock() + defer h.stateLock.RUnlock() + + return &drivers.TaskStatus{ + ID: h.taskConfig.ID, + Name: h.taskConfig.Name, + State: h.procState, + StartedAt: h.startedAt, + CompletedAt: h.completedAt, + ExitResult: h.exitResult, + DriverAttributes: map[string]string{ + "pid": strconv.Itoa(h.pid), + }, + } +} + +func (h *taskHandle) IsRunning() bool { + h.stateLock.RLock() + defer h.stateLock.RUnlock() + return h.procState == drivers.TaskStateRunning +} + +func (h *taskHandle) run() { + h.stateLock.Lock() + if h.exitResult == nil { + h.exitResult = &drivers.ExitResult{} + } + h.stateLock.Unlock() + + // Block until process exits + ps, err := h.exec.Wait(context.Background()) + + h.stateLock.Lock() + defer h.stateLock.Unlock() + + if err != nil { + h.exitResult.Err = err + h.procState = drivers.TaskStateUnknown + h.completedAt = time.Now() + return + } + h.procState = drivers.TaskStateExited + h.exitResult.ExitCode = ps.ExitCode + h.exitResult.Signal = ps.Signal + h.completedAt = ps.Time + + // TODO: detect if the task OOMed +} diff --git a/nix2/pull-upstream.sh b/nix2/pull-upstream.sh new file mode 100755 index 0000000..a797951 --- /dev/null +++ b/nix2/pull-upstream.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +REF=v1.4.3 + +wget https://github.com/hashicorp/nomad/raw/${REF}/drivers/exec/driver.go -O driver.go +wget https://github.com/hashicorp/nomad/raw/${REF}/drivers/exec/handle.go -O handle.go +wget https://github.com/hashicorp/nomad/raw/${REF}/drivers/exec/state.go -O state.go diff --git a/nix2/state.go b/nix2/state.go new file mode 100644 index 0000000..a846ea4 --- /dev/null +++ b/nix2/state.go @@ -0,0 +1,33 @@ +package nix2 + +import ( + "sync" +) + +type taskStore struct { + store map[string]*taskHandle + lock sync.RWMutex +} + +func newTaskStore() *taskStore { + return &taskStore{store: map[string]*taskHandle{}} +} + +func (ts *taskStore) Set(id string, handle *taskHandle) { + ts.lock.Lock() + defer ts.lock.Unlock() + ts.store[id] = handle +} + +func (ts *taskStore) Get(id string) (*taskHandle, bool) { + ts.lock.RLock() + defer ts.lock.RUnlock() + t, ok := ts.store[id] + return t, ok +} + +func (ts *taskStore) Delete(id string) { + ts.lock.Lock() + defer ts.lock.Unlock() + delete(ts.store, id) +} |