diff options
Diffstat (limited to 'exec2/driver.go')
-rw-r--r-- | exec2/driver.go | 747 |
1 files changed, 0 insertions, 747 deletions
diff --git a/exec2/driver.go b/exec2/driver.go deleted file mode 100644 index fb84f1a..0000000 --- a/exec2/driver.go +++ /dev/null @@ -1,747 +0,0 @@ -package exec2 - -import ( - "context" - "fmt" - "os" - "runtime" - "sync" - "time" - - "github.com/hashicorp/consul-template/signals" - hclog "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/client/lib/cgutil" - "github.com/hashicorp/nomad/drivers/shared/capabilities" - "github.com/hashicorp/nomad/drivers/shared/eventer" - "github.com/Alexis211/nomad-driver-exec2/executor" - "github.com/hashicorp/nomad/drivers/shared/resolvconf" - "github.com/hashicorp/nomad/helper/pluginutils/hclutils" - "github.com/hashicorp/nomad/helper/pluginutils/loader" - "github.com/hashicorp/nomad/helper/pointer" - "github.com/hashicorp/nomad/plugins/base" - "github.com/hashicorp/nomad/plugins/drivers" - "github.com/hashicorp/nomad/plugins/drivers/utils" - "github.com/hashicorp/nomad/plugins/shared/hclspec" - pstructs "github.com/hashicorp/nomad/plugins/shared/structs" -) - -const ( - // pluginName is the name of the plugin - pluginName = "exec2" - - // fingerprintPeriod is the interval at which the driver will send fingerprint responses - fingerprintPeriod = 30 * time.Second - - // taskHandleVersion is the version of task handle which this driver sets - // and understands how to decode driver state - taskHandleVersion = 1 -) - -var ( - // PluginID is the exec plugin metadata registered in the plugin - // catalog. - PluginID = loader.PluginID{ - Name: pluginName, - PluginType: base.PluginTypeDriver, - } - - // pluginInfo is the response returned for the PluginInfo RPC - pluginInfo = &base.PluginInfoResponse{ - Type: base.PluginTypeDriver, - PluginApiVersions: []string{drivers.ApiVersion010}, - PluginVersion: "0.1.0", - Name: pluginName, - } - - // configSpec is the hcl specification returned by the ConfigSchema RPC - configSpec = hclspec.NewObject(map[string]*hclspec.Spec{ - "no_pivot_root": hclspec.NewDefault( - hclspec.NewAttr("no_pivot_root", "bool", false), - hclspec.NewLiteral("false"), - ), - "default_pid_mode": hclspec.NewDefault( - hclspec.NewAttr("default_pid_mode", "string", false), - hclspec.NewLiteral(`"private"`), - ), - "default_ipc_mode": hclspec.NewDefault( - hclspec.NewAttr("default_ipc_mode", "string", false), - hclspec.NewLiteral(`"private"`), - ), - "allow_caps": hclspec.NewDefault( - hclspec.NewAttr("allow_caps", "list(string)", false), - hclspec.NewLiteral(capabilities.HCLSpecLiteral), - ), - // Default host directories to bind in tasks - "bind": hclspec.NewDefault( - hclspec.NewAttr("bind", "list(map(string))", false), - hclspec.NewLiteral("{}"), - ), - "bind_read_only": hclspec.NewDefault( - hclspec.NewAttr("bind_read_only", "list(map(string))", false), - hclspec.NewLiteral("{}"), - ), - }) - - // taskConfigSpec is the hcl specification for the driver config section of - // a task within a job. It is returned in the TaskConfigSchema RPC - taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{ - "command": hclspec.NewAttr("command", "string", true), - "args": hclspec.NewAttr("args", "list(string)", false), - "bind": hclspec.NewAttr("bind", "list(map(string))", false), - "bind_read_only": hclspec.NewAttr("bind_read_only", "list(map(string))", false), - "pid_mode": hclspec.NewAttr("pid_mode", "string", false), - "ipc_mode": hclspec.NewAttr("ipc_mode", "string", false), - "cap_add": hclspec.NewAttr("cap_add", "list(string)", false), - "cap_drop": hclspec.NewAttr("cap_drop", "list(string)", false), - }) - - // driverCapabilities represents the RPC response for what features are - // implemented by the exec task driver - driverCapabilities = &drivers.Capabilities{ - SendSignals: true, - Exec: true, - FSIsolation: drivers.FSIsolationNone, - NetIsolationModes: []drivers.NetIsolationMode{ - drivers.NetIsolationModeHost, - drivers.NetIsolationModeGroup, - }, - MountConfigs: drivers.MountConfigSupportAll, - } -) - -// Driver fork/execs tasks using many of the underlying OS's isolation -// features where configured. -type Driver struct { - // eventer is used to handle multiplexing of TaskEvents calls such that an - // event can be broadcast to all callers - eventer *eventer.Eventer - - // config is the driver configuration set by the SetConfig RPC - config Config - - // tasks is the in memory datastore mapping taskIDs to driverHandles - tasks *taskStore - - // ctx is the context for the driver. It is passed to other subsystems to - // coordinate shutdown - ctx context.Context - - // signalShutdown is called when the driver is shutting down and cancels - // the ctx passed to any subsystems - signalShutdown context.CancelFunc - - // logger will log to the Nomad agent - logger hclog.Logger - - // A tri-state boolean to know if the fingerprinting has happened and - // whether it has been successful - fingerprintSuccess *bool - fingerprintLock sync.Mutex -} - -// Config is the driver configuration set by the SetConfig RPC call -type Config struct { - // NoPivotRoot disables the use of pivot_root, useful when the root partition - // is on ramdisk - NoPivotRoot bool `codec:"no_pivot_root"` - - // DefaultModePID is the default PID isolation set for all tasks using - // exec-based task drivers. - DefaultModePID string `codec:"default_pid_mode"` - - // DefaultModeIPC is the default IPC isolation set for all tasks using - // exec-based task drivers. - DefaultModeIPC string `codec:"default_ipc_mode"` - - // AllowCaps configures which Linux Capabilities are enabled for tasks - // running on this node. - AllowCaps []string `codec:"allow_caps"` - - // Paths to bind for read-write acess in all jobs - Bind hclutils.MapStrStr `codec:"bind"` - - // Paths to bind for read-only acess in all jobs - BindReadOnly hclutils.MapStrStr `codec:"bind_read_only"` -} - -func (c *Config) validate() error { - switch c.DefaultModePID { - case executor.IsolationModePrivate, executor.IsolationModeHost: - default: - return fmt.Errorf("default_pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModePID) - } - - switch c.DefaultModeIPC { - case executor.IsolationModePrivate, executor.IsolationModeHost: - default: - return fmt.Errorf("default_ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModeIPC) - } - - badCaps := capabilities.Supported().Difference(capabilities.New(c.AllowCaps)) - if !badCaps.Empty() { - return fmt.Errorf("allow_caps configured with capabilities not supported by system: %s", badCaps) - } - - return nil -} - -// TaskConfig is the driver configuration of a task within a job -type TaskConfig struct { - // Command is the thing to exec. - Command string `codec:"command"` - - // Args are passed along to Command. - Args []string `codec:"args"` - - // Paths to bind for read-write acess - Bind hclutils.MapStrStr `codec:"bind"` - - // Paths to bind for read-only acess - BindReadOnly hclutils.MapStrStr `codec:"bind_read_only"` - - // ModePID indicates whether PID namespace isolation is enabled for the task. - // Must be "private" or "host" if set. - ModePID string `codec:"pid_mode"` - - // ModeIPC indicates whether IPC namespace isolation is enabled for the task. - // Must be "private" or "host" if set. - ModeIPC string `codec:"ipc_mode"` - - // CapAdd is a set of linux capabilities to enable. - CapAdd []string `codec:"cap_add"` - - // CapDrop is a set of linux capabilities to disable. - CapDrop []string `codec:"cap_drop"` -} - -func (tc *TaskConfig) validate() error { - switch tc.ModePID { - case "", executor.IsolationModePrivate, executor.IsolationModeHost: - default: - return fmt.Errorf("pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModePID) - } - - switch tc.ModeIPC { - case "", executor.IsolationModePrivate, executor.IsolationModeHost: - default: - return fmt.Errorf("ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModeIPC) - } - - supported := capabilities.Supported() - badAdds := supported.Difference(capabilities.New(tc.CapAdd)) - if !badAdds.Empty() { - return fmt.Errorf("cap_add configured with capabilities not supported by system: %s", badAdds) - } - badDrops := supported.Difference(capabilities.New(tc.CapDrop)) - if !badDrops.Empty() { - return fmt.Errorf("cap_drop configured with capabilities not supported by system: %s", badDrops) - } - - return nil -} - -// TaskState is the state which is encoded in the handle returned in -// StartTask. This information is needed to rebuild the task state and handler -// during recovery. -type TaskState struct { - TaskConfig *drivers.TaskConfig - Pid int - StartedAt time.Time -} - -// NewPlugin returns a new DrivePlugin implementation -func NewPlugin(logger hclog.Logger) drivers.DriverPlugin { - ctx, cancel := context.WithCancel(context.Background()) - logger = logger.Named(pluginName) - return &Driver{ - eventer: eventer.NewEventer(ctx, logger), - tasks: newTaskStore(), - ctx: ctx, - signalShutdown: cancel, - logger: logger, - } -} - -// setFingerprintSuccess marks the driver as having fingerprinted successfully -func (d *Driver) setFingerprintSuccess() { - d.fingerprintLock.Lock() - d.fingerprintSuccess = pointer.Of(true) - d.fingerprintLock.Unlock() -} - -// setFingerprintFailure marks the driver as having failed fingerprinting -func (d *Driver) setFingerprintFailure() { - d.fingerprintLock.Lock() - d.fingerprintSuccess = pointer.Of(false) - d.fingerprintLock.Unlock() -} - -// fingerprintSuccessful returns true if the driver has -// never fingerprinted or has successfully fingerprinted -func (d *Driver) fingerprintSuccessful() bool { - d.fingerprintLock.Lock() - defer d.fingerprintLock.Unlock() - return d.fingerprintSuccess == nil || *d.fingerprintSuccess -} - -func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) { - return pluginInfo, nil -} - -func (d *Driver) ConfigSchema() (*hclspec.Spec, error) { - return configSpec, nil -} - -func (d *Driver) SetConfig(cfg *base.Config) error { - // unpack, validate, and set agent plugin config - var config Config - if len(cfg.PluginConfig) != 0 { - if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil { - return err - } - } - if err := config.validate(); err != nil { - return err - } - d.logger.Info("Got config", "driver_config", hclog.Fmt("%+v", config)) - d.config = config - - return nil -} - -func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) { - return taskConfigSpec, nil -} - -// Capabilities is returned by the Capabilities RPC and indicates what -// optional features this driver supports -func (d *Driver) Capabilities() (*drivers.Capabilities, error) { - return driverCapabilities, nil -} - -func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) { - ch := make(chan *drivers.Fingerprint) - go d.handleFingerprint(ctx, ch) - return ch, nil - -} -func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) { - defer close(ch) - ticker := time.NewTimer(0) - for { - select { - case <-ctx.Done(): - return - case <-d.ctx.Done(): - return - case <-ticker.C: - ticker.Reset(fingerprintPeriod) - ch <- d.buildFingerprint() - } - } -} - -func (d *Driver) buildFingerprint() *drivers.Fingerprint { - if runtime.GOOS != "linux" { - d.setFingerprintFailure() - return &drivers.Fingerprint{ - Health: drivers.HealthStateUndetected, - HealthDescription: "exec driver unsupported on client OS", - } - } - - fp := &drivers.Fingerprint{ - Attributes: map[string]*pstructs.Attribute{}, - Health: drivers.HealthStateHealthy, - HealthDescription: drivers.DriverHealthy, - } - - if !utils.IsUnixRoot() { - fp.Health = drivers.HealthStateUndetected - fp.HealthDescription = drivers.DriverRequiresRootMessage - d.setFingerprintFailure() - return fp - } - - mount, err := cgutil.FindCgroupMountpointDir() - if err != nil { - fp.Health = drivers.HealthStateUnhealthy - fp.HealthDescription = drivers.NoCgroupMountMessage - if d.fingerprintSuccessful() { - d.logger.Warn(fp.HealthDescription, "error", err) - } - d.setFingerprintFailure() - return fp - } - - if mount == "" { - fp.Health = drivers.HealthStateUnhealthy - fp.HealthDescription = drivers.CgroupMountEmpty - d.setFingerprintFailure() - return fp - } - - fp.Attributes["driver.exec"] = pstructs.NewBoolAttribute(true) - d.setFingerprintSuccess() - return fp -} - -func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { - if handle == nil { - return fmt.Errorf("handle cannot be nil") - } - - // If already attached to handle there's nothing to recover. - if _, ok := d.tasks.Get(handle.Config.ID); ok { - d.logger.Trace("nothing to recover; task already exists", - "task_id", handle.Config.ID, - "task_name", handle.Config.Name, - ) - return nil - } - - // Handle doesn't already exist, try to reattach - var taskState TaskState - if err := handle.GetDriverState(&taskState); err != nil { - d.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID) - return fmt.Errorf("failed to decode task state from handle: %v", err) - } - - // Create new executor - exec := executor.NewExecutorWithIsolation( - d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID),) - - h := &taskHandle{ - exec: exec, - pid: taskState.Pid, - taskConfig: taskState.TaskConfig, - procState: drivers.TaskStateRunning, - startedAt: taskState.StartedAt, - exitResult: &drivers.ExitResult{}, - logger: d.logger, - } - - d.tasks.Set(taskState.TaskConfig.ID, h) - - go h.run() - return nil -} - -func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) { - if _, ok := d.tasks.Get(cfg.ID); ok { - return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID) - } - - var driverConfig TaskConfig - if err := cfg.DecodeDriverConfig(&driverConfig); err != nil { - return nil, nil, fmt.Errorf("failed to decode driver config: %v", err) - } - - if err := driverConfig.validate(); err != nil { - return nil, nil, fmt.Errorf("failed driver config validation: %v", err) - } - - d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig)) - handle := drivers.NewTaskHandle(taskHandleVersion) - handle.Config = cfg - - exec := executor.NewExecutorWithIsolation( - d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID),) - - user := cfg.User - if user == "" { - user = "0" - } - - if cfg.DNS != nil { - dnsMount, err := resolvconf.GenerateDNSMount(cfg.TaskDir().Dir, cfg.DNS) - if err != nil { - return nil, nil, fmt.Errorf("failed to build mount for resolv.conf: %v", err) - } - cfg.Mounts = append(cfg.Mounts, dnsMount) - } - - // Bind mounts specified in driver config - if d.config.Bind != nil { - for host, task := range d.config.Bind { - mount_config := drivers.MountConfig{ - TaskPath: task, - HostPath: host, - Readonly: false, - PropagationMode: "private", - } - d.logger.Info("adding RW mount from driver config", "mount_config", hclog.Fmt("%+v", mount_config)) - cfg.Mounts = append(cfg.Mounts, &mount_config) - } - } - if d.config.BindReadOnly != nil { - for host, task := range d.config.BindReadOnly { - mount_config := drivers.MountConfig{ - TaskPath: task, - HostPath: host, - Readonly: true, - PropagationMode: "private", - } - d.logger.Info("adding RO mount from driver config", "mount_config", hclog.Fmt("%+v", mount_config)) - cfg.Mounts = append(cfg.Mounts, &mount_config) - } - } - - // Bind mounts specified in task config - if driverConfig.Bind != nil { - for host, task := range driverConfig.Bind { - mount_config := drivers.MountConfig{ - TaskPath: task, - HostPath: host, - Readonly: false, - PropagationMode: "private", - } - d.logger.Info("adding RW mount from task spec", "mount_config", hclog.Fmt("%+v", mount_config)) - cfg.Mounts = append(cfg.Mounts, &mount_config) - } - } - if driverConfig.BindReadOnly != nil { - for host, task := range driverConfig.BindReadOnly { - mount_config := drivers.MountConfig{ - TaskPath: task, - HostPath: host, - Readonly: true, - PropagationMode: "private", - } - d.logger.Info("adding RO mount from task spec", "mount_config", hclog.Fmt("%+v", mount_config)) - cfg.Mounts = append(cfg.Mounts, &mount_config) - } - } - - caps, err := capabilities.Calculate( - capabilities.NomadDefaults(), d.config.AllowCaps, driverConfig.CapAdd, driverConfig.CapDrop, - ) - if err != nil { - return nil, nil, err - } - d.logger.Debug("task capabilities", "capabilities", caps) - - execCmd := &executor.ExecCommand{ - Cmd: driverConfig.Command, - Args: driverConfig.Args, - Env: cfg.EnvList(), - User: user, - ResourceLimits: true, - NoPivotRoot: d.config.NoPivotRoot, - Resources: cfg.Resources, - TaskDir: cfg.TaskDir().Dir, - StdoutPath: cfg.StdoutPath, - StderrPath: cfg.StderrPath, - Mounts: cfg.Mounts, - Devices: cfg.Devices, - NetworkIsolation: cfg.NetworkIsolation, - ModePID: executor.IsolationMode(d.config.DefaultModePID, driverConfig.ModePID), - ModeIPC: executor.IsolationMode(d.config.DefaultModeIPC, driverConfig.ModeIPC), - Capabilities: caps, - } - - d.logger.Info("launching with", "exec_cmd", hclog.Fmt("%+v", execCmd)) - - ps, err := exec.Launch(execCmd) - if err != nil { - return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err) - } - - h := &taskHandle{ - exec: exec, - pid: ps.Pid, - taskConfig: cfg, - procState: drivers.TaskStateRunning, - startedAt: time.Now().Round(time.Millisecond), - logger: d.logger, - } - - driverState := TaskState{ - Pid: ps.Pid, - TaskConfig: cfg, - StartedAt: h.startedAt, - } - - if err := handle.SetDriverState(&driverState); err != nil { - d.logger.Error("failed to start task, error setting driver state", "error", err) - _ = exec.Shutdown("", 0) - return nil, nil, fmt.Errorf("failed to set driver state: %v", err) - } - - d.tasks.Set(cfg.ID, h) - go h.run() - return handle, nil, nil -} - -func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) { - handle, ok := d.tasks.Get(taskID) - if !ok { - return nil, drivers.ErrTaskNotFound - } - - ch := make(chan *drivers.ExitResult) - go d.handleWait(ctx, handle, ch) - - return ch, nil -} - -func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) { - defer close(ch) - var result *drivers.ExitResult - ps, err := handle.exec.Wait(ctx) - if err != nil { - result = &drivers.ExitResult{ - Err: fmt.Errorf("executor: error waiting on process: %v", err), - } - } else { - result = &drivers.ExitResult{ - ExitCode: ps.ExitCode, - Signal: ps.Signal, - } - } - - select { - case <-ctx.Done(): - return - case <-d.ctx.Done(): - return - case ch <- result: - } -} - -func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error { - handle, ok := d.tasks.Get(taskID) - if !ok { - return drivers.ErrTaskNotFound - } - - if err := handle.exec.Shutdown(signal, timeout); err != nil { - return fmt.Errorf("executor Shutdown failed: %v", err) - } - - return nil -} - -// resetCgroup will re-create the v2 cgroup for the task after the task has been -// destroyed by libcontainer. In the case of a task restart we call DestroyTask -// which removes the cgroup - but we still need it! -// -// Ideally the cgroup management would be more unified - and we could do the creation -// on a task runner pre-start hook, eliminating the need for this hack. -func (d *Driver) resetCgroup(handle *taskHandle) { - if cgutil.UseV2 { - if handle.taskConfig.Resources != nil && - handle.taskConfig.Resources.LinuxResources != nil && - handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath != "" { - err := os.Mkdir(handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath, 0755) - if err != nil { - d.logger.Trace("failed to reset cgroup", "path", handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath) - } - } - } -} - -func (d *Driver) DestroyTask(taskID string, force bool) error { - handle, ok := d.tasks.Get(taskID) - if !ok { - return drivers.ErrTaskNotFound - } - - if handle.IsRunning() && !force { - return fmt.Errorf("cannot destroy running task") - } - - if err := handle.exec.Shutdown("", 0); err != nil { - handle.logger.Error("destroying executor failed", "error", err) - } - - // workaround for the case where DestroyTask was issued on task restart - d.resetCgroup(handle) - - d.tasks.Delete(taskID) - return nil -} - -func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) { - handle, ok := d.tasks.Get(taskID) - if !ok { - return nil, drivers.ErrTaskNotFound - } - - return handle.TaskStatus(), nil -} - -func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) { - handle, ok := d.tasks.Get(taskID) - if !ok { - return nil, drivers.ErrTaskNotFound - } - - return handle.exec.Stats(ctx, interval) -} - -func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) { - return d.eventer.TaskEvents(ctx) -} - -func (d *Driver) SignalTask(taskID string, signal string) error { - handle, ok := d.tasks.Get(taskID) - if !ok { - return drivers.ErrTaskNotFound - } - - sig := os.Interrupt - if s, ok := signals.SignalLookup[signal]; ok { - sig = s - } else { - d.logger.Warn("unknown signal to send to task, using SIGINT instead", "signal", signal, "task_id", handle.taskConfig.ID) - - } - return handle.exec.Signal(sig) -} - -func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) { - if len(cmd) == 0 { - return nil, fmt.Errorf("error cmd must have at least one value") - } - handle, ok := d.tasks.Get(taskID) - if !ok { - return nil, drivers.ErrTaskNotFound - } - - args := []string{} - if len(cmd) > 1 { - args = cmd[1:] - } - - out, exitCode, err := handle.exec.Exec(time.Now().Add(timeout), cmd[0], args) - if err != nil { - return nil, err - } - - return &drivers.ExecTaskResult{ - Stdout: out, - ExitResult: &drivers.ExitResult{ - ExitCode: exitCode, - }, - }, nil -} - -var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil) - -func (d *Driver) ExecTaskStreamingRaw(ctx context.Context, - taskID string, - command []string, - tty bool, - stream drivers.ExecTaskStream) error { - - if len(command) == 0 { - return fmt.Errorf("error cmd must have at least one value") - } - handle, ok := d.tasks.Get(taskID) - if !ok { - return drivers.ErrTaskNotFound - } - - return handle.exec.ExecStreaming(ctx, command, tty, stream) -} |