aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--exec2/driver.go812
-rw-r--r--exec2/handle.go35
-rwxr-xr-xexec2/pull-upstream.sh7
-rw-r--r--exec2/state.go5
4 files changed, 466 insertions, 393 deletions
diff --git a/exec2/driver.go b/exec2/driver.go
index 312cb76..f035d15 100644
--- a/exec2/driver.go
+++ b/exec2/driver.go
@@ -1,276 +1,316 @@
-package hello
+package exec
import (
"context"
- "errors"
"fmt"
"os"
- "os/exec"
"path/filepath"
- "regexp"
+ "runtime"
+ "sync"
"time"
"github.com/hashicorp/consul-template/signals"
- "github.com/hashicorp/go-hclog"
+ hclog "github.com/hashicorp/go-hclog"
+ "github.com/hashicorp/nomad/client/lib/cgutil"
+ "github.com/hashicorp/nomad/drivers/shared/capabilities"
"github.com/hashicorp/nomad/drivers/shared/eventer"
"github.com/hashicorp/nomad/drivers/shared/executor"
+ "github.com/hashicorp/nomad/drivers/shared/resolvconf"
+ "github.com/hashicorp/nomad/helper/pluginutils/loader"
+ "github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
+ "github.com/hashicorp/nomad/plugins/drivers/utils"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
- "github.com/hashicorp/nomad/plugins/shared/structs"
+ pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
)
const (
// pluginName is the name of the plugin
- // this is used for logging and (along with the version) for uniquely
- // identifying plugin binaries fingerprinted by the client
- pluginName = "hello-world-example"
+ pluginName = "exec"
- // pluginVersion allows the client to identify and use newer versions of
- // an installed plugin
- pluginVersion = "v0.1.0"
-
- // fingerprintPeriod is the interval at which the plugin will send
- // fingerprint responses
+ // fingerprintPeriod is the interval at which the driver will send fingerprint responses
fingerprintPeriod = 30 * time.Second
- // taskHandleVersion is the version of task handle which this plugin sets
- // and understands how to decode
- // this is used to allow modification and migration of the task schema
- // used by the plugin
+ // taskHandleVersion is the version of task handle which this driver sets
+ // and understands how to decode driver state
taskHandleVersion = 1
)
var (
- // pluginInfo describes the plugin
+ // PluginID is the exec plugin metadata registered in the plugin
+ // catalog.
+ PluginID = loader.PluginID{
+ Name: pluginName,
+ PluginType: base.PluginTypeDriver,
+ }
+
+ // PluginConfig is the exec driver factory function registered in the
+ // plugin catalog.
+ PluginConfig = &loader.InternalPluginConfig{
+ Config: map[string]interface{}{},
+ Factory: func(ctx context.Context, l hclog.Logger) interface{} { return NewExecDriver(ctx, l) },
+ }
+
+ // pluginInfo is the response returned for the PluginInfo RPC
pluginInfo = &base.PluginInfoResponse{
Type: base.PluginTypeDriver,
PluginApiVersions: []string{drivers.ApiVersion010},
- PluginVersion: pluginVersion,
+ PluginVersion: "0.1.0",
Name: pluginName,
}
- // configSpec is the specification of the plugin's configuration
- // this is used to validate the configuration specified for the plugin
- // on the client.
- // this is not global, but can be specified on a per-client basis.
+ // configSpec is the hcl specification returned by the ConfigSchema RPC
configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
- // TODO: define plugin's agent configuration schema.
- //
- // The schema should be defined using HCL specs and it will be used to
- // validate the agent configuration provided by the user in the
- // `plugin` stanza (https://www.nomadproject.io/docs/configuration/plugin.html).
- //
- // For example, for the schema below a valid configuration would be:
- //
- // plugin "hello-driver-plugin" {
- // config {
- // shell = "fish"
- // }
- // }
- "shell": hclspec.NewDefault(
- hclspec.NewAttr("shell", "string", false),
- hclspec.NewLiteral(`"bash"`),
+ "no_pivot_root": hclspec.NewDefault(
+ hclspec.NewAttr("no_pivot_root", "bool", false),
+ hclspec.NewLiteral("false"),
+ ),
+ "default_pid_mode": hclspec.NewDefault(
+ hclspec.NewAttr("default_pid_mode", "string", false),
+ hclspec.NewLiteral(`"private"`),
+ ),
+ "default_ipc_mode": hclspec.NewDefault(
+ hclspec.NewAttr("default_ipc_mode", "string", false),
+ hclspec.NewLiteral(`"private"`),
+ ),
+ "allow_caps": hclspec.NewDefault(
+ hclspec.NewAttr("allow_caps", "list(string)", false),
+ hclspec.NewLiteral(capabilities.HCLSpecLiteral),
),
})
- // taskConfigSpec is the specification of the plugin's configuration for
- // a task
- // this is used to validated the configuration specified for the plugin
- // when a job is submitted.
+ // taskConfigSpec is the hcl specification for the driver config section of
+ // a task within a job. It is returned in the TaskConfigSchema RPC
taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
- // TODO: define plugin's task configuration schema
- //
- // The schema should be defined using HCL specs and it will be used to
- // validate the task configuration provided by the user when they
- // submit a job.
- //
- // For example, for the schema below a valid task would be:
- // job "example" {
- // group "example" {
- // task "say-hi" {
- // driver = "hello-driver-plugin"
- // config {
- // greeting = "Hi"
- // }
- // }
- // }
- // }
- "greeting": hclspec.NewDefault(
- hclspec.NewAttr("greeting", "string", false),
- hclspec.NewLiteral(`"Hello, World!"`),
- ),
+ "command": hclspec.NewAttr("command", "string", true),
+ "args": hclspec.NewAttr("args", "list(string)", false),
+ "pid_mode": hclspec.NewAttr("pid_mode", "string", false),
+ "ipc_mode": hclspec.NewAttr("ipc_mode", "string", false),
+ "cap_add": hclspec.NewAttr("cap_add", "list(string)", false),
+ "cap_drop": hclspec.NewAttr("cap_drop", "list(string)", false),
})
- // capabilities indicates what optional features this driver supports
- // this should be set according to the target run time.
- capabilities = &drivers.Capabilities{
- // TODO: set plugin's capabilities
- //
- // The plugin's capabilities signal Nomad which extra functionalities
- // are supported. For a list of available options check the docs page:
- // https://godoc.org/github.com/hashicorp/nomad/plugins/drivers#Capabilities
+ // driverCapabilities represents the RPC response for what features are
+ // implemented by the exec task driver
+ driverCapabilities = &drivers.Capabilities{
SendSignals: true,
- Exec: false,
+ Exec: true,
+ FSIsolation: drivers.FSIsolationChroot,
+ NetIsolationModes: []drivers.NetIsolationMode{
+ drivers.NetIsolationModeHost,
+ drivers.NetIsolationModeGroup,
+ },
+ MountConfigs: drivers.MountConfigSupportAll,
}
)
-// Config contains configuration information for the plugin
-type Config struct {
- // TODO: create decoded plugin configuration struct
- //
- // This struct is the decoded version of the schema defined in the
- // configSpec variable above. It's used to convert the HCL configuration
- // passed by the Nomad agent into Go contructs.
- Shell string `codec:"shell"`
-}
-
-// TaskConfig contains configuration information for a task that runs with
-// this plugin
-type TaskConfig struct {
- // TODO: create decoded plugin task configuration struct
- //
- // This struct is the decoded version of the schema defined in the
- // taskConfigSpec variable above. It's used to convert the string
- // configuration for the task into Go contructs.
- Greeting string `codec:"greeting"`
-}
-
-// TaskState is the runtime state which is encoded in the handle returned to
-// Nomad client.
-// This information is needed to rebuild the task state and handler during
-// recovery.
-type TaskState struct {
- ReattachConfig *structs.ReattachConfig
- TaskConfig *drivers.TaskConfig
- StartedAt time.Time
-
- // TODO: add any extra important values that must be persisted in order
- // to restore a task.
- //
- // The plugin keeps track of its running tasks in a in-memory data
- // structure. If the plugin crashes, this data will be lost, so Nomad
- // will respawn a new instance of the plugin and try to restore its
- // in-memory representation of the running tasks using the RecoverTask()
- // method below.
- Pid int
-}
-
-// HelloDriverPlugin is an example driver plugin. When provisioned in a job,
-// the taks will output a greet specified by the user.
-type HelloDriverPlugin struct {
+// Driver fork/execs tasks using many of the underlying OS's isolation
+// features where configured.
+type Driver struct {
// eventer is used to handle multiplexing of TaskEvents calls such that an
// event can be broadcast to all callers
eventer *eventer.Eventer
- // config is the plugin configuration set by the SetConfig RPC
- config *Config
+ // config is the driver configuration set by the SetConfig RPC
+ config Config
- // nomadConfig is the client config from Nomad
+ // nomadConfig is the client config from nomad
nomadConfig *base.ClientDriverConfig
- // tasks is the in memory datastore mapping taskIDs to driver handles
+ // tasks is the in memory datastore mapping taskIDs to driverHandles
tasks *taskStore
// ctx is the context for the driver. It is passed to other subsystems to
// coordinate shutdown
ctx context.Context
- // signalShutdown is called when the driver is shutting down and cancels
- // the ctx passed to any subsystems
- signalShutdown context.CancelFunc
-
// logger will log to the Nomad agent
logger hclog.Logger
+
+ // A tri-state boolean to know if the fingerprinting has happened and
+ // whether it has been successful
+ fingerprintSuccess *bool
+ fingerprintLock sync.Mutex
}
-// NewPlugin returns a new example driver plugin
-func NewPlugin(logger hclog.Logger) drivers.DriverPlugin {
- ctx, cancel := context.WithCancel(context.Background())
- logger = logger.Named(pluginName)
+// Config is the driver configuration set by the SetConfig RPC call
+type Config struct {
+ // NoPivotRoot disables the use of pivot_root, useful when the root partition
+ // is on ramdisk
+ NoPivotRoot bool `codec:"no_pivot_root"`
+
+ // DefaultModePID is the default PID isolation set for all tasks using
+ // exec-based task drivers.
+ DefaultModePID string `codec:"default_pid_mode"`
+
+ // DefaultModeIPC is the default IPC isolation set for all tasks using
+ // exec-based task drivers.
+ DefaultModeIPC string `codec:"default_ipc_mode"`
+
+ // AllowCaps configures which Linux Capabilities are enabled for tasks
+ // running on this node.
+ AllowCaps []string `codec:"allow_caps"`
+}
+
+func (c *Config) validate() error {
+ switch c.DefaultModePID {
+ case executor.IsolationModePrivate, executor.IsolationModeHost:
+ default:
+ return fmt.Errorf("default_pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModePID)
+ }
+
+ switch c.DefaultModeIPC {
+ case executor.IsolationModePrivate, executor.IsolationModeHost:
+ default:
+ return fmt.Errorf("default_ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModeIPC)
+ }
+
+ badCaps := capabilities.Supported().Difference(capabilities.New(c.AllowCaps))
+ if !badCaps.Empty() {
+ return fmt.Errorf("allow_caps configured with capabilities not supported by system: %s", badCaps)
+ }
+
+ return nil
+}
+
+// TaskConfig is the driver configuration of a task within a job
+type TaskConfig struct {
+ // Command is the thing to exec.
+ Command string `codec:"command"`
+
+ // Args are passed along to Command.
+ Args []string `codec:"args"`
+
+ // ModePID indicates whether PID namespace isolation is enabled for the task.
+ // Must be "private" or "host" if set.
+ ModePID string `codec:"pid_mode"`
+
+ // ModeIPC indicates whether IPC namespace isolation is enabled for the task.
+ // Must be "private" or "host" if set.
+ ModeIPC string `codec:"ipc_mode"`
+
+ // CapAdd is a set of linux capabilities to enable.
+ CapAdd []string `codec:"cap_add"`
+
+ // CapDrop is a set of linux capabilities to disable.
+ CapDrop []string `codec:"cap_drop"`
+}
+
+func (tc *TaskConfig) validate() error {
+ switch tc.ModePID {
+ case "", executor.IsolationModePrivate, executor.IsolationModeHost:
+ default:
+ return fmt.Errorf("pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModePID)
+ }
+
+ switch tc.ModeIPC {
+ case "", executor.IsolationModePrivate, executor.IsolationModeHost:
+ default:
+ return fmt.Errorf("ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModeIPC)
+ }
+
+ supported := capabilities.Supported()
+ badAdds := supported.Difference(capabilities.New(tc.CapAdd))
+ if !badAdds.Empty() {
+ return fmt.Errorf("cap_add configured with capabilities not supported by system: %s", badAdds)
+ }
+ badDrops := supported.Difference(capabilities.New(tc.CapDrop))
+ if !badDrops.Empty() {
+ return fmt.Errorf("cap_drop configured with capabilities not supported by system: %s", badDrops)
+ }
- return &HelloDriverPlugin{
- eventer: eventer.NewEventer(ctx, logger),
- config: &Config{},
- tasks: newTaskStore(),
- ctx: ctx,
- signalShutdown: cancel,
- logger: logger,
+ return nil
+}
+
+// TaskState is the state which is encoded in the handle returned in
+// StartTask. This information is needed to rebuild the task state and handler
+// during recovery.
+type TaskState struct {
+ ReattachConfig *pstructs.ReattachConfig
+ TaskConfig *drivers.TaskConfig
+ Pid int
+ StartedAt time.Time
+}
+
+// NewExecDriver returns a new DrivePlugin implementation
+func NewExecDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin {
+ logger = logger.Named(pluginName)
+ return &Driver{
+ eventer: eventer.NewEventer(ctx, logger),
+ tasks: newTaskStore(),
+ ctx: ctx,
+ logger: logger,
}
}
-// PluginInfo returns information describing the plugin.
-func (d *HelloDriverPlugin) PluginInfo() (*base.PluginInfoResponse, error) {
+// setFingerprintSuccess marks the driver as having fingerprinted successfully
+func (d *Driver) setFingerprintSuccess() {
+ d.fingerprintLock.Lock()
+ d.fingerprintSuccess = pointer.Of(true)
+ d.fingerprintLock.Unlock()
+}
+
+// setFingerprintFailure marks the driver as having failed fingerprinting
+func (d *Driver) setFingerprintFailure() {
+ d.fingerprintLock.Lock()
+ d.fingerprintSuccess = pointer.Of(false)
+ d.fingerprintLock.Unlock()
+}
+
+// fingerprintSuccessful returns true if the driver has
+// never fingerprinted or has successfully fingerprinted
+func (d *Driver) fingerprintSuccessful() bool {
+ d.fingerprintLock.Lock()
+ defer d.fingerprintLock.Unlock()
+ return d.fingerprintSuccess == nil || *d.fingerprintSuccess
+}
+
+func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) {
return pluginInfo, nil
}
-// ConfigSchema returns the plugin configuration schema.
-func (d *HelloDriverPlugin) ConfigSchema() (*hclspec.Spec, error) {
+func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {
return configSpec, nil
}
-// SetConfig is called by the client to pass the configuration for the plugin.
-func (d *HelloDriverPlugin) SetConfig(cfg *base.Config) error {
+func (d *Driver) SetConfig(cfg *base.Config) error {
+ // unpack, validate, and set agent plugin config
var config Config
if len(cfg.PluginConfig) != 0 {
if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil {
return err
}
}
-
- // Save the configuration to the plugin
- d.config = &config
-
- // TODO: parse and validated any configuration value if necessary.
- //
- // If your driver agent configuration requires any complex validation
- // (some dependency between attributes) or special data parsing (the
- // string "10s" into a time.Interval) you can do it here and update the
- // value in d.config.
- //
- // In the example below we check if the shell specified by the user is
- // supported by the plugin.
- shell := d.config.Shell
- if shell != "bash" && shell != "fish" {
- return fmt.Errorf("invalid shell %s", d.config.Shell)
+ if err := config.validate(); err != nil {
+ return err
}
+ d.config = config
- // Save the Nomad agent configuration
- if cfg.AgentConfig != nil {
+ if cfg != nil && cfg.AgentConfig != nil {
d.nomadConfig = cfg.AgentConfig.Driver
}
-
- // TODO: initialize any extra requirements if necessary.
- //
- // Here you can use the config values to initialize any resources that are
- // shared by all tasks that use this driver, such as a daemon process.
-
return nil
}
-// TaskConfigSchema returns the HCL schema for the configuration of a task.
-func (d *HelloDriverPlugin) TaskConfigSchema() (*hclspec.Spec, error) {
+func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
return taskConfigSpec, nil
}
-// Capabilities returns the features supported by the driver.
-func (d *HelloDriverPlugin) Capabilities() (*drivers.Capabilities, error) {
- return capabilities, nil
+// Capabilities is returned by the Capabilities RPC and indicates what
+// optional features this driver supports
+func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
+ return driverCapabilities, nil
}
-// Fingerprint returns a channel that will be used to send health information
-// and other driver specific node attributes.
-func (d *HelloDriverPlugin) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
+func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
ch := make(chan *drivers.Fingerprint)
go d.handleFingerprint(ctx, ch)
return ch, nil
-}
-// handleFingerprint manages the channel and the flow of fingerprint data.
-func (d *HelloDriverPlugin) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) {
+}
+func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) {
defer close(ch)
-
- // Nomad expects the initial fingerprint to be sent immediately
ticker := time.NewTimer(0)
for {
select {
@@ -279,65 +319,110 @@ func (d *HelloDriverPlugin) handleFingerprint(ctx context.Context, ch chan<- *dr
case <-d.ctx.Done():
return
case <-ticker.C:
- // after the initial fingerprint we can set the proper fingerprint
- // period
ticker.Reset(fingerprintPeriod)
ch <- d.buildFingerprint()
}
}
}
-// buildFingerprint returns the driver's fingerprint data
-func (d *HelloDriverPlugin) buildFingerprint() *drivers.Fingerprint {
+func (d *Driver) buildFingerprint() *drivers.Fingerprint {
+ if runtime.GOOS != "linux" {
+ d.setFingerprintFailure()
+ return &drivers.Fingerprint{
+ Health: drivers.HealthStateUndetected,
+ HealthDescription: "exec driver unsupported on client OS",
+ }
+ }
+
fp := &drivers.Fingerprint{
- Attributes: map[string]*structs.Attribute{},
+ Attributes: map[string]*pstructs.Attribute{},
Health: drivers.HealthStateHealthy,
HealthDescription: drivers.DriverHealthy,
}
- // TODO: implement fingerprinting logic to populate health and driver
- // attributes.
- //
- // Fingerprinting is used by the plugin to relay two important information
- // to Nomad: health state and node attributes.
- //
- // If the plugin reports to be unhealthy, or doesn't send any fingerprint
- // data in the expected interval of time, Nomad will restart it.
- //
- // Node attributes can be used to report any relevant information about
- // the node in which the plugin is running (specific library availability,
- // installed versions of a software etc.). These attributes can then be
- // used by an operator to set job constrains.
- //
- // In the example below we check if the shell specified by the user exists
- // in the node.
- shell := d.config.Shell
-
- cmd := exec.Command("which", shell)
- if err := cmd.Run(); err != nil {
- return &drivers.Fingerprint{
- Health: drivers.HealthStateUndetected,
- HealthDescription: fmt.Sprintf("shell %s not found", shell),
- }
+ if !utils.IsUnixRoot() {
+ fp.Health = drivers.HealthStateUndetected
+ fp.HealthDescription = drivers.DriverRequiresRootMessage
+ d.setFingerprintFailure()
+ return fp
}
- // We also set the shell and its version as attributes
- cmd = exec.Command(shell, "--version")
- if out, err := cmd.Output(); err != nil {
- d.logger.Warn("failed to find shell version: %v", err)
- } else {
- re := regexp.MustCompile("[0-9]\\.[0-9]\\.[0-9]")
- version := re.FindString(string(out))
+ mount, err := cgutil.FindCgroupMountpointDir()
+ if err != nil {
+ fp.Health = drivers.HealthStateUnhealthy
+ fp.HealthDescription = drivers.NoCgroupMountMessage
+ if d.fingerprintSuccessful() {
+ d.logger.Warn(fp.HealthDescription, "error", err)
+ }
+ d.setFingerprintFailure()
+ return fp
+ }
- fp.Attributes["driver.hello.shell_version"] = structs.NewStringAttribute(version)
- fp.Attributes["driver.hello.shell"] = structs.NewStringAttribute(shell)
+ if mount == "" {
+ fp.Health = drivers.HealthStateUnhealthy
+ fp.HealthDescription = drivers.CgroupMountEmpty
+ d.setFingerprintFailure()
+ return fp
}
+ fp.Attributes["driver.exec"] = pstructs.NewBoolAttribute(true)
+ d.setFingerprintSuccess()
return fp
}
-// StartTask returns a task handle and a driver network if necessary.
-func (d *HelloDriverPlugin) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
+func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
+ if handle == nil {
+ return fmt.Errorf("handle cannot be nil")
+ }
+
+ // If already attached to handle there's nothing to recover.
+ if _, ok := d.tasks.Get(handle.Config.ID); ok {
+ d.logger.Trace("nothing to recover; task already exists",
+ "task_id", handle.Config.ID,
+ "task_name", handle.Config.Name,
+ )
+ return nil
+ }
+
+ // Handle doesn't already exist, try to reattach
+ var taskState TaskState
+ if err := handle.GetDriverState(&taskState); err != nil {
+ d.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID)
+ return fmt.Errorf("failed to decode task state from handle: %v", err)
+ }
+
+ // Create client for reattached executor
+ plugRC, err := pstructs.ReattachConfigToGoPlugin(taskState.ReattachConfig)
+ if err != nil {
+ d.logger.Error("failed to build ReattachConfig from task state", "error", err, "task_id", handle.Config.ID)
+ return fmt.Errorf("failed to build ReattachConfig from task state: %v", err)
+ }
+
+ exec, pluginClient, err := executor.ReattachToExecutor(plugRC,
+ d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID))
+ if err != nil {
+ d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
+ return fmt.Errorf("failed to reattach to executor: %v", err)
+ }
+
+ h := &taskHandle{
+ exec: exec,
+ pid: taskState.Pid,
+ pluginClient: pluginClient,
+ taskConfig: taskState.TaskConfig,
+ procState: drivers.TaskStateRunning,
+ startedAt: taskState.StartedAt,
+ exitResult: &drivers.ExitResult{},
+ logger: d.logger,
+ }
+
+ d.tasks.Set(taskState.TaskConfig.ID, h)
+
+ go h.run()
+ return nil
+}
+
+func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
if _, ok := d.tasks.Get(cfg.ID); ok {
return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
}
@@ -347,42 +432,66 @@ func (d *HelloDriverPlugin) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHan
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
}
+ if err := driverConfig.validate(); err != nil {
+ return nil, nil, fmt.Errorf("failed driver config validation: %v", err)
+ }
+
d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig))
handle := drivers.NewTaskHandle(taskHandleVersion)
handle.Config = cfg
- // TODO: implement driver specific mechanism to start the task.
- //
- // Once the task is started you will need to store any relevant runtime
- // information in a taskHandle and TaskState. The taskHandle will be
- // stored in-memory in the plugin and will be used to interact with the
- // task.
- //
- // The TaskState will be returned to the Nomad client inside a
- // drivers.TaskHandle instance. This TaskHandle will be sent back to plugin
- // if the task ever needs to be recovered, so the TaskState should contain
- // enough information to handle that.
- //
- // In the example below we use an executor to fork a process to run our
- // greeter. The executor is then stored in the handle so we can access it
- // later and the the plugin.Client is used to generate a reattach
- // configuration that can be used to recover communication with the task.
+ pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")
executorConfig := &executor.ExecutorConfig{
- LogFile: filepath.Join(cfg.TaskDir().Dir, "executor.out"),
- LogLevel: "debug",
+ LogFile: pluginLogFile,
+ LogLevel: "debug",
+ FSIsolation: true,
}
- exec, pluginClient, err := executor.CreateExecutor(d.logger, d.nomadConfig, executorConfig)
+ exec, pluginClient, err := executor.CreateExecutor(
+ d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID),
+ d.nomadConfig, executorConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to create executor: %v", err)
}
- echoCmd := fmt.Sprintf(`echo "%s"`, driverConfig.Greeting)
+ user := cfg.User
+ if user == "" {
+ user = "nobody"
+ }
+
+ if cfg.DNS != nil {
+ dnsMount, err := resolvconf.GenerateDNSMount(cfg.TaskDir().Dir, cfg.DNS)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to build mount for resolv.conf: %v", err)
+ }
+ cfg.Mounts = append(cfg.Mounts, dnsMount)
+ }
+
+ caps, err := capabilities.Calculate(
+ capabilities.NomadDefaults(), d.config.AllowCaps, driverConfig.CapAdd, driverConfig.CapDrop,
+ )
+ if err != nil {
+ return nil, nil, err
+ }
+ d.logger.Debug("task capabilities", "capabilities", caps)
+
execCmd := &executor.ExecCommand{
- Cmd: d.config.Shell,
- Args: []string{"-c", echoCmd},
- StdoutPath: cfg.StdoutPath,
- StderrPath: cfg.StderrPath,
+ Cmd: driverConfig.Command,
+ Args: driverConfig.Args,
+ Env: cfg.EnvList(),
+ User: user,
+ ResourceLimits: true,
+ NoPivotRoot: d.config.NoPivotRoot,
+ Resources: cfg.Resources,
+ TaskDir: cfg.TaskDir().Dir,
+ StdoutPath: cfg.StdoutPath,
+ StderrPath: cfg.StderrPath,
+ Mounts: cfg.Mounts,
+ Devices: cfg.Devices,
+ NetworkIsolation: cfg.NetworkIsolation,
+ ModePID: executor.IsolationMode(d.config.DefaultModePID, driverConfig.ModePID),
+ ModeIPC: executor.IsolationMode(d.config.DefaultModeIPC, driverConfig.ModeIPC),
+ Capabilities: caps,
}
ps, err := exec.Launch(execCmd)
@@ -402,13 +511,16 @@ func (d *HelloDriverPlugin) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHan
}
driverState := TaskState{
- ReattachConfig: structs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
+ ReattachConfig: pstructs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
Pid: ps.Pid,
TaskConfig: cfg,
StartedAt: h.startedAt,
}
if err := handle.SetDriverState(&driverState); err != nil {
+ d.logger.Error("failed to start task, error setting driver state", "error", err)
+ _ = exec.Shutdown("", 0)
+ pluginClient.Kill()
return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
}
@@ -417,61 +529,7 @@ func (d *HelloDriverPlugin) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHan
return handle, nil, nil
}
-// RecoverTask recreates the in-memory state of a task from a TaskHandle.
-func (d *HelloDriverPlugin) RecoverTask(handle *drivers.TaskHandle) error {
- if handle == nil {
- return errors.New("error: handle cannot be nil")
- }
-
- if _, ok := d.tasks.Get(handle.Config.ID); ok {
- return nil
- }
-
- var taskState TaskState
- if err := handle.GetDriverState(&taskState); err != nil {
- return fmt.Errorf("failed to decode task state from handle: %v", err)
- }
-
- var driverConfig TaskConfig
- if err := taskState.TaskConfig.DecodeDriverConfig(&driverConfig); err != nil {
- return fmt.Errorf("failed to decode driver config: %v", err)
- }
-
- // TODO: implement driver specific logic to recover a task.
- //
- // Recovering a task involves recreating and storing a taskHandle as if the
- // task was just started.
- //
- // In the example below we use the executor to re-attach to the process
- // that was created when the task first started.
- plugRC, err := structs.ReattachConfigToGoPlugin(taskState.ReattachConfig)
- if err != nil {
- return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err)
- }
-
- execImpl, pluginClient, err := executor.ReattachToExecutor(plugRC, d.logger)
- if err != nil {
- return fmt.Errorf("failed to reattach to executor: %v", err)
- }
-
- h := &taskHandle{
- exec: execImpl,
- pid: taskState.Pid,
- pluginClient: pluginClient,
- taskConfig: taskState.TaskConfig,
- procState: drivers.TaskStateRunning,
- startedAt: taskState.StartedAt,
- exitResult: &drivers.ExitResult{},
- }
-
- d.tasks.Set(taskState.TaskConfig.ID, h)
-
- go h.run()
- return nil
-}
-
-// WaitTask returns a channel used to notify Nomad when a task exits.
-func (d *HelloDriverPlugin) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
+func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
@@ -479,23 +537,13 @@ func (d *HelloDriverPlugin) WaitTask(ctx context.Context, taskID string) (<-chan
ch := make(chan *drivers.ExitResult)
go d.handleWait(ctx, handle, ch)
+
return ch, nil
}
-func (d *HelloDriverPlugin) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
+func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
defer close(ch)
var result *drivers.ExitResult
-
- // TODO: implement driver specific logic to notify Nomad the task has been
- // completed and what was the exit result.
- //
- // When a result is sent in the result channel Nomad will stop the task and
- // emit an event that an operator can use to get an insight on why the task
- // stopped.
- //
- // In the example below we block and wait until the executor finishes
- // running, at which point we send the exit code and signal in the result
- // channel.
ps, err := handle.exec.Wait(ctx)
if err != nil {
result = &drivers.ExitResult{
@@ -508,33 +556,21 @@ func (d *HelloDriverPlugin) handleWait(ctx context.Context, handle *taskHandle,
}
}
- for {
- select {
- case <-ctx.Done():
- return
- case <-d.ctx.Done():
- return
- case ch <- result:
- }
+ select {
+ case <-ctx.Done():
+ return
+ case <-d.ctx.Done():
+ return
+ case ch <- result:
}
}
-// StopTask stops a running task with the given signal and within the timeout window.
-func (d *HelloDriverPlugin) StopTask(taskID string, timeout time.Duration, signal string) error {
+func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
- // TODO: implement driver specific logic to stop a task.
- //
- // The StopTask function is expected to stop a running task by sending the
- // given signal to it. If the task does not stop during the given timeout,
- // the driver must forcefully kill the task.
- //
- // In the example below we let the executor handle the task shutdown
- // process for us, but you might need to customize this for your own
- // implementation.
if err := handle.exec.Shutdown(signal, timeout); err != nil {
if handle.pluginClient.Exited() {
return nil
@@ -545,39 +581,51 @@ func (d *HelloDriverPlugin) StopTask(taskID string, timeout time.Duration, signa
return nil
}
-// DestroyTask cleans up and removes a task that has terminated.
-func (d *HelloDriverPlugin) DestroyTask(taskID string, force bool) error {
+// resetCgroup will re-create the v2 cgroup for the task after the task has been
+// destroyed by libcontainer. In the case of a task restart we call DestroyTask
+// which removes the cgroup - but we still need it!
+//
+// Ideally the cgroup management would be more unified - and we could do the creation
+// on a task runner pre-start hook, eliminating the need for this hack.
+func (d *Driver) resetCgroup(handle *taskHandle) {
+ if cgutil.UseV2 {
+ if handle.taskConfig.Resources != nil &&
+ handle.taskConfig.Resources.LinuxResources != nil &&
+ handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath != "" {
+ err := os.Mkdir(handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath, 0755)
+ if err != nil {
+ d.logger.Trace("failed to reset cgroup", "path", handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath)
+ }
+ }
+ }
+}
+
+func (d *Driver) DestroyTask(taskID string, force bool) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
if handle.IsRunning() && !force {
- return errors.New("cannot destroy running task")
+ return fmt.Errorf("cannot destroy running task")
}
- // TODO: implement driver specific logic to destroy a complete task.
- //
- // Destroying a task includes removing any resources used by task and any
- // local references in the plugin. If force is set to true the task should
- // be destroyed even if it's currently running.
- //
- // In the example below we use the executor to force shutdown the task
- // (timeout equals 0).
if !handle.pluginClient.Exited() {
if err := handle.exec.Shutdown("", 0); err != nil {
- handle.logger.Error("destroying executor failed", "err", err)
+ handle.logger.Error("destroying executor failed", "error", err)
}
handle.pluginClient.Kill()
}
+ // workaround for the case where DestroyTask was issued on task restart
+ d.resetCgroup(handle)
+
d.tasks.Delete(taskID)
return nil
}
-// InspectTask returns detailed status information for the referenced taskID.
-func (d *HelloDriverPlugin) InspectTask(taskID string) (*drivers.TaskStatus, error) {
+func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
@@ -586,42 +634,25 @@ func (d *HelloDriverPlugin) InspectTask(taskID string) (*drivers.TaskStatus, err
return handle.TaskStatus(), nil
}
-// TaskStats returns a channel which the driver should send stats to at the given interval.
-func (d *HelloDriverPlugin) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
+func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
- // TODO: implement driver specific logic to send task stats.
- //
- // This function returns a channel that Nomad will use to listen for task
- // stats (e.g., CPU and memory usage) in a given interval. It should send
- // stats until the context is canceled or the task stops running.
- //
- // In the example below we use the Stats function provided by the executor,
- // but you can build a set of functions similar to the fingerprint process.
return handle.exec.Stats(ctx, interval)
}
-// TaskEvents returns a channel that the plugin can use to emit task related events.
-func (d *HelloDriverPlugin) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
+func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
return d.eventer.TaskEvents(ctx)
}
-// SignalTask forwards a signal to a task.
-// This is an optional capability.
-func (d *HelloDriverPlugin) SignalTask(taskID string, signal string) error {
+func (d *Driver) SignalTask(taskID string, signal string) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
- // TODO: implement driver specific signal handling logic.
- //
- // The given signal must be forwarded to the target taskID. If this plugin
- // doesn't support receiving signals (capability SendSignals is set to
- // false) you can just return nil.
sig := os.Interrupt
if s, ok := signals.SignalLookup[signal]; ok {
sig = s
@@ -632,9 +663,48 @@ func (d *HelloDriverPlugin) SignalTask(taskID string, signal string) error {
return handle.exec.Signal(sig)
}
-// ExecTask returns the result of executing the given command inside a task.
-// This is an optional capability.
-func (d *HelloDriverPlugin) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
- // TODO: implement driver specific logic to execute commands in a task.
- return nil, errors.New("This driver does not support exec")
+func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
+ if len(cmd) == 0 {
+ return nil, fmt.Errorf("error cmd must have at least one value")
+ }
+ handle, ok := d.tasks.Get(taskID)
+ if !ok {
+ return nil, drivers.ErrTaskNotFound
+ }
+
+ args := []string{}
+ if len(cmd) > 1 {
+ args = cmd[1:]
+ }
+
+ out, exitCode, err := handle.exec.Exec(time.Now().Add(timeout), cmd[0], args)
+ if err != nil {
+ return nil, err
+ }
+
+ return &drivers.ExecTaskResult{
+ Stdout: out,
+ ExitResult: &drivers.ExitResult{
+ ExitCode: exitCode,
+ },
+ }, nil
+}
+
+var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil)
+
+func (d *Driver) ExecTaskStreamingRaw(ctx context.Context,
+ taskID string,
+ command []string,
+ tty bool,
+ stream drivers.ExecTaskStream) error {
+
+ if len(command) == 0 {
+ return fmt.Errorf("error cmd must have at least one value")
+ }
+ handle, ok := d.tasks.Get(taskID)
+ if !ok {
+ return drivers.ErrTaskNotFound
+ }
+
+ return handle.exec.ExecStreaming(ctx, command, tty, stream)
}
diff --git a/exec2/handle.go b/exec2/handle.go
index 6dfb976..606406b 100644
--- a/exec2/handle.go
+++ b/exec2/handle.go
@@ -1,4 +1,4 @@
-package hello
+package exec
import (
"context"
@@ -6,30 +6,26 @@ import (
"sync"
"time"
- "github.com/hashicorp/go-hclog"
- "github.com/hashicorp/go-plugin"
+ hclog "github.com/hashicorp/go-hclog"
+ plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/plugins/drivers"
)
-// taskHandle should store all relevant runtime information
-// such as process ID if this is a local task or other meta
-// data if this driver deals with external APIs
type taskHandle struct {
- // stateLock syncs access to all fields below
- stateLock sync.RWMutex
-
- logger hclog.Logger
exec executor.Executor
+ pid int
pluginClient *plugin.Client
- taskConfig *drivers.TaskConfig
- procState drivers.TaskState
- startedAt time.Time
- completedAt time.Time
- exitResult *drivers.ExitResult
+ logger hclog.Logger
- // TODO: add any extra relevant information about the task.
- pid int
+ // stateLock syncs access to all fields below
+ stateLock sync.RWMutex
+
+ taskConfig *drivers.TaskConfig
+ procState drivers.TaskState
+ startedAt time.Time
+ completedAt time.Time
+ exitResult *drivers.ExitResult
}
func (h *taskHandle) TaskStatus() *drivers.TaskStatus {
@@ -62,8 +58,9 @@ func (h *taskHandle) run() {
}
h.stateLock.Unlock()
- // TODO: wait for your task to complete and upate its state.
+ // Block until process exits
ps, err := h.exec.Wait(context.Background())
+
h.stateLock.Lock()
defer h.stateLock.Unlock()
@@ -77,4 +74,6 @@ func (h *taskHandle) run() {
h.exitResult.ExitCode = ps.ExitCode
h.exitResult.Signal = ps.Signal
h.completedAt = ps.Time
+
+ // TODO: detect if the task OOMed
}
diff --git a/exec2/pull-upstream.sh b/exec2/pull-upstream.sh
new file mode 100755
index 0000000..a797951
--- /dev/null
+++ b/exec2/pull-upstream.sh
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+
+REF=v1.4.3
+
+wget https://github.com/hashicorp/nomad/raw/${REF}/drivers/exec/driver.go -O driver.go
+wget https://github.com/hashicorp/nomad/raw/${REF}/drivers/exec/handle.go -O handle.go
+wget https://github.com/hashicorp/nomad/raw/${REF}/drivers/exec/state.go -O state.go
diff --git a/exec2/state.go b/exec2/state.go
index 30f10d7..08cdee8 100644
--- a/exec2/state.go
+++ b/exec2/state.go
@@ -1,12 +1,9 @@
-package hello
+package exec
import (
"sync"
)
-// taskStore provides a mechanism to store and retrieve
-// task handles given a string identifier. The ID should
-// be unique per task
type taskStore struct {
store map[string]*taskHandle
lock sync.RWMutex