From 8977e5122acdb41e5e04f9d32338baf27fda6f49 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 28 Nov 2022 12:24:07 +0100 Subject: rename things --- .gitignore | 4 +- exec2/driver.go | 640 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ exec2/handle.go | 80 +++++++ exec2/state.go | 36 ++++ go.mod | 3 +- hello/driver.go | 640 -------------------------------------------------------- hello/handle.go | 80 ------- hello/state.go | 36 ---- main.go | 5 +- 9 files changed, 761 insertions(+), 763 deletions(-) create mode 100644 exec2/driver.go create mode 100644 exec2/handle.go create mode 100644 exec2/state.go delete mode 100644 hello/driver.go delete mode 100644 hello/handle.go delete mode 100644 hello/state.go diff --git a/.gitignore b/.gitignore index dd66f8f..39a5816 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ -nomad-skeleton-driver-plugin -hello-driver +nomad-driver-exec2 +exec2-driver diff --git a/exec2/driver.go b/exec2/driver.go new file mode 100644 index 0000000..312cb76 --- /dev/null +++ b/exec2/driver.go @@ -0,0 +1,640 @@ +package hello + +import ( + "context" + "errors" + "fmt" + "os" + "os/exec" + "path/filepath" + "regexp" + "time" + + "github.com/hashicorp/consul-template/signals" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/drivers/shared/eventer" + "github.com/hashicorp/nomad/drivers/shared/executor" + "github.com/hashicorp/nomad/plugins/base" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/structs" +) + +const ( + // pluginName is the name of the plugin + // this is used for logging and (along with the version) for uniquely + // identifying plugin binaries fingerprinted by the client + pluginName = "hello-world-example" + + // pluginVersion allows the client to identify and use newer versions of + // an installed plugin + pluginVersion = "v0.1.0" + + // fingerprintPeriod is the interval at which the plugin will 
send + // fingerprint responses + fingerprintPeriod = 30 * time.Second + + // taskHandleVersion is the version of task handle which this plugin sets + // and understands how to decode + // this is used to allow modification and migration of the task schema + // used by the plugin + taskHandleVersion = 1 +) + +var ( + // pluginInfo describes the plugin + pluginInfo = &base.PluginInfoResponse{ + Type: base.PluginTypeDriver, + PluginApiVersions: []string{drivers.ApiVersion010}, + PluginVersion: pluginVersion, + Name: pluginName, + } + + // configSpec is the specification of the plugin's configuration + // this is used to validate the configuration specified for the plugin + // on the client. + // this is not global, but can be specified on a per-client basis. + configSpec = hclspec.NewObject(map[string]*hclspec.Spec{ + // TODO: define plugin's agent configuration schema. + // + // The schema should be defined using HCL specs and it will be used to + // validate the agent configuration provided by the user in the + // `plugin` stanza (https://www.nomadproject.io/docs/configuration/plugin.html). + // + // For example, for the schema below a valid configuration would be: + // + // plugin "hello-driver-plugin" { + // config { + // shell = "fish" + // } + // } + "shell": hclspec.NewDefault( + hclspec.NewAttr("shell", "string", false), + hclspec.NewLiteral(`"bash"`), + ), + }) + + // taskConfigSpec is the specification of the plugin's configuration for + // a task + // this is used to validated the configuration specified for the plugin + // when a job is submitted. + taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{ + // TODO: define plugin's task configuration schema + // + // The schema should be defined using HCL specs and it will be used to + // validate the task configuration provided by the user when they + // submit a job. 
+ // + // For example, for the schema below a valid task would be: + // job "example" { + // group "example" { + // task "say-hi" { + // driver = "hello-driver-plugin" + // config { + // greeting = "Hi" + // } + // } + // } + // } + "greeting": hclspec.NewDefault( + hclspec.NewAttr("greeting", "string", false), + hclspec.NewLiteral(`"Hello, World!"`), + ), + }) + + // capabilities indicates what optional features this driver supports + // this should be set according to the target run time. + capabilities = &drivers.Capabilities{ + // TODO: set plugin's capabilities + // + // The plugin's capabilities signal Nomad which extra functionalities + // are supported. For a list of available options check the docs page: + // https://godoc.org/github.com/hashicorp/nomad/plugins/drivers#Capabilities + SendSignals: true, + Exec: false, + } +) + +// Config contains configuration information for the plugin +type Config struct { + // TODO: create decoded plugin configuration struct + // + // This struct is the decoded version of the schema defined in the + // configSpec variable above. It's used to convert the HCL configuration + // passed by the Nomad agent into Go contructs. + Shell string `codec:"shell"` +} + +// TaskConfig contains configuration information for a task that runs with +// this plugin +type TaskConfig struct { + // TODO: create decoded plugin task configuration struct + // + // This struct is the decoded version of the schema defined in the + // taskConfigSpec variable above. It's used to convert the string + // configuration for the task into Go contructs. + Greeting string `codec:"greeting"` +} + +// TaskState is the runtime state which is encoded in the handle returned to +// Nomad client. +// This information is needed to rebuild the task state and handler during +// recovery. 
+type TaskState struct {
+	// ReattachConfig, TaskConfig and StartedAt are read back by RecoverTask to
+	// re-attach to the executor and rebuild the in-memory task handle.
+	ReattachConfig *structs.ReattachConfig
+	TaskConfig     *drivers.TaskConfig
+	StartedAt      time.Time
+
+	// TODO: add any extra important values that must be persisted in order
+	// to restore a task.
+	//
+	// Running tasks are tracked in an in-memory data structure only. If the
+	// plugin crashes that data is lost, so Nomad respawns a new instance of
+	// the plugin and rebuilds the in-memory representation of the running
+	// tasks through the RecoverTask() method below.
+	Pid int
+}
+
+// HelloDriverPlugin is an example driver plugin. When provisioned in a job,
+// the task will output a greeting specified by the user.
+type HelloDriverPlugin struct {
+	// eventer multiplexes TaskEvents calls so that a single event can be
+	// broadcast to every caller.
+	eventer *eventer.Eventer
+
+	// config holds the plugin configuration received through the SetConfig
+	// RPC.
+	config *Config
+
+	// nomadConfig holds the client configuration handed over by Nomad.
+	nomadConfig *base.ClientDriverConfig
+
+	// tasks is the in-memory store mapping task IDs to driver handles.
+	tasks *taskStore
+
+	// ctx is the driver-wide context. Subsystems receive it so that they can
+	// coordinate shutdown.
+	ctx context.Context
+
+	// signalShutdown cancels ctx; it is meant to be called when the driver is
+	// shutting down.
+	signalShutdown context.CancelFunc
+
+	// logger writes to the Nomad agent log.
+	logger hclog.Logger
+}
+
+// NewPlugin returns a new example driver plugin. The supplied logger is
+// scoped with the plugin name, and the plugin starts out with an empty
+// configuration and an empty task store.
+func NewPlugin(logger hclog.Logger) drivers.DriverPlugin {
+	driverCtx, shutdown := context.WithCancel(context.Background())
+	named := logger.Named(pluginName)
+
+	plugin := &HelloDriverPlugin{
+		config:         &Config{},
+		tasks:          newTaskStore(),
+		ctx:            driverCtx,
+		signalShutdown: shutdown,
+		logger:         named,
+	}
+	// The eventer shares the driver context and the named logger.
+	plugin.eventer = eventer.NewEventer(driverCtx, named)
+
+	return plugin
+}
+
+// PluginInfo returns information describing the plugin.
+func (d *HelloDriverPlugin) PluginInfo() (*base.PluginInfoResponse, error) {
+	return pluginInfo, nil
+}
+
+// ConfigSchema returns the plugin configuration schema.
+func (d *HelloDriverPlugin) ConfigSchema() (*hclspec.Spec, error) {
+	return configSpec, nil
+}
+
+// SetConfig is called by the client to pass the configuration for the plugin.
+//
+// The plugin configuration is decoded and validated first. d.config and
+// d.nomadConfig are only updated once validation has passed, so a rejected
+// configuration never replaces the one currently in use. An error is returned
+// when the payload cannot be decoded or when the configured shell is not
+// supported.
+func (d *HelloDriverPlugin) SetConfig(cfg *base.Config) error {
+	var config Config
+	if len(cfg.PluginConfig) != 0 {
+		if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil {
+			return fmt.Errorf("failed to decode plugin config: %w", err)
+		}
+	}
+
+	// TODO: parse and validate any configuration value if necessary.
+	//
+	// If your driver agent configuration requires any complex validation
+	// (some dependency between attributes) or special data parsing (the
+	// string "10s" into a time.Duration) you can do it here and update the
+	// value in config before it is stored.
+	//
+	// In the example below we check if the shell specified by the user is
+	// supported by the plugin.
+	switch config.Shell {
+	case "bash", "fish":
+	default:
+		return fmt.Errorf("invalid shell %s", config.Shell)
+	}
+
+	// Save the validated configuration to the plugin.
+	d.config = &config
+
+	// Save the Nomad agent configuration
+	if cfg.AgentConfig != nil {
+		d.nomadConfig = cfg.AgentConfig.Driver
+	}
+
+	// TODO: initialize any extra requirements if necessary.
+	//
+	// Here you can use the config values to initialize any resources that are
+	// shared by all tasks that use this driver, such as a daemon process.
+
+	return nil
+}
+
+// TaskConfigSchema returns the HCL schema for the configuration of a task.
+func (d *HelloDriverPlugin) TaskConfigSchema() (*hclspec.Spec, error) {
+	return taskConfigSpec, nil
+}
+
+// Capabilities returns the features supported by the driver.
+func (d *HelloDriverPlugin) Capabilities() (*drivers.Capabilities, error) {
+	return capabilities, nil
+}
+
+// Fingerprint returns a channel that will be used to send health information
+// and other driver specific node attributes. The channel is fed by a
+// goroutine running handleFingerprint, which closes it when it returns.
+func (d *HelloDriverPlugin) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
+	ch := make(chan *drivers.Fingerprint)
+	go d.handleFingerprint(ctx, ch)
+	return ch, nil
+}
+
+// handleFingerprint manages the channel and the flow of fingerprint data. It
+// sends one fingerprint immediately and then one every fingerprintPeriod,
+// and returns (closing ch) as soon as either ctx or the driver context is
+// done.
+func (d *HelloDriverPlugin) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) {
+	defer close(ch)
+
+	// Nomad expects the initial fingerprint to be sent immediately, so the
+	// timer starts out already expired.
+	timer := time.NewTimer(0)
+	defer timer.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-d.ctx.Done():
+			return
+		case <-timer.C:
+			// after the initial fingerprint we can set the proper fingerprint
+			// period
+			timer.Reset(fingerprintPeriod)
+
+			// Guard the send as well: if the receiver is gone, an unguarded
+			// send on the unbuffered channel would block this goroutine
+			// forever.
+			fp := d.buildFingerprint()
+			select {
+			case ch <- fp:
+			case <-ctx.Done():
+				return
+			case <-d.ctx.Done():
+				return
+			}
+		}
+	}
+}
+
+// shellVersionRe extracts a dotted three-component version such as "5.1.16"
+// from the output of "<shell> --version". It is compiled once instead of on
+// every fingerprint.
+var shellVersionRe = regexp.MustCompile(`[0-9]+\.[0-9]+\.[0-9]+`)
+
+// buildFingerprint returns the driver's fingerprint data. The driver is
+// reported as undetected when the configured shell cannot be found on PATH;
+// otherwise it is reported healthy and, when the version lookup succeeds, the
+// shell name and version are attached as node attributes.
+func (d *HelloDriverPlugin) buildFingerprint() *drivers.Fingerprint {
+	fp := &drivers.Fingerprint{
+		Attributes:        map[string]*structs.Attribute{},
+		Health:            drivers.HealthStateHealthy,
+		HealthDescription: drivers.DriverHealthy,
+	}
+
+	// TODO: implement fingerprinting logic to populate health and driver
+	// attributes.
+	//
+	// Fingerprinting is used by the plugin to relay two important pieces of
+	// information to Nomad: health state and node attributes.
+	//
+	// If the plugin reports to be unhealthy, or doesn't send any fingerprint
+	// data in the expected interval of time, Nomad will restart it.
+	//
+	// Node attributes can be used to report any relevant information about
+	// the node in which the plugin is running (specific library availability,
+	// installed versions of a software etc.). These attributes can then be
+	// used by an operator to set job constraints.
+	//
+	// In the example below we check if the shell specified by the user exists
+	// in the node.
+	shell := d.config.Shell
+
+	// Resolve the shell through PATH directly rather than spawning `which`.
+	if _, err := exec.LookPath(shell); err != nil {
+		return &drivers.Fingerprint{
+			Health:            drivers.HealthStateUndetected,
+			HealthDescription: fmt.Sprintf("shell %s not found", shell),
+		}
+	}
+
+	// We also set the shell and its version as attributes
+	out, err := exec.Command(shell, "--version").Output()
+	if err != nil {
+		// hclog takes key/value pairs, not printf-style verbs.
+		d.logger.Warn("failed to find shell version", "error", err)
+		return fp
+	}
+
+	version := shellVersionRe.FindString(string(out))
+	fp.Attributes["driver.hello.shell_version"] = structs.NewStringAttribute(version)
+	fp.Attributes["driver.hello.shell"] = structs.NewStringAttribute(shell)
+
+	return fp
+}
+
+// StartTask returns a task handle and a driver network if necessary.
+//
+// It rejects a task ID that is already tracked, decodes the task's driver
+// configuration, launches the greeting command through an executor, stores
+// the resulting taskHandle in d.tasks and starts its run loop. The returned
+// driver network is always nil.
+func (d *HelloDriverPlugin) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
+	if _, ok := d.tasks.Get(cfg.ID); ok {
+		return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
+	}
+
+	var driverConfig TaskConfig
+	if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
+		return nil, nil, fmt.Errorf("failed to decode driver config: %w", err)
+	}
+
+	d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig))
+	handle := drivers.NewTaskHandle(taskHandleVersion)
+	handle.Config = cfg
+
+	// TODO: implement driver specific mechanism to start the task.
+	//
+	// Once the task is started you will need to store any relevant runtime
+	// information in a taskHandle and TaskState. The taskHandle will be
+	// stored in-memory in the plugin and will be used to interact with the
+	// task.
+	//
+	// The TaskState will be returned to the Nomad client inside a
+	// drivers.TaskHandle instance. This TaskHandle will be sent back to plugin
+	// if the task ever needs to be recovered, so the TaskState should contain
+	// enough information to handle that.
+	//
+	// In the example below we use an executor to fork a process to run our
+	// greeter. The executor is then stored in the handle so we can access it
+	// later and the plugin.Client is used to generate a reattach
+	// configuration that can be used to recover communication with the task.
+	executorConfig := &executor.ExecutorConfig{
+		LogFile:  filepath.Join(cfg.TaskDir().Dir, "executor.out"),
+		LogLevel: "debug",
+	}
+
+	// Named execImpl (as in RecoverTask) so the os/exec package is not
+	// shadowed inside this function.
+	execImpl, pluginClient, err := executor.CreateExecutor(d.logger, d.nomadConfig, executorConfig)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to create executor: %w", err)
+	}
+
+	// NOTE(review): the greeting is interpolated into a shell command without
+	// escaping, so quotes, `$(...)` or backticks in the job's greeting are
+	// interpreted by the shell. Confirm this is acceptable for whoever is
+	// allowed to submit jobs before reusing this pattern.
+	echoCmd := fmt.Sprintf(`echo "%s"`, driverConfig.Greeting)
+	execCmd := &executor.ExecCommand{
+		Cmd:        d.config.Shell,
+		Args:       []string{"-c", echoCmd},
+		StdoutPath: cfg.StdoutPath,
+		StderrPath: cfg.StderrPath,
+	}
+
+	ps, err := execImpl.Launch(execCmd)
+	if err != nil {
+		pluginClient.Kill()
+		return nil, nil, fmt.Errorf("failed to launch command with executor: %w", err)
+	}
+
+	h := &taskHandle{
+		exec:         execImpl,
+		pid:          ps.Pid,
+		pluginClient: pluginClient,
+		taskConfig:   cfg,
+		procState:    drivers.TaskStateRunning,
+		startedAt:    time.Now().Round(time.Millisecond),
+		logger:       d.logger,
+	}
+
+	driverState := TaskState{
+		ReattachConfig: structs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
+		Pid:            ps.Pid,
+		TaskConfig:     cfg,
+		StartedAt:      h.startedAt,
+	}
+
+	if err := handle.SetDriverState(&driverState); err != nil {
+		// The task is never handed to Nomad on this path, so stop the
+		// already-launched process and its executor instead of leaking them.
+		if shutdownErr := execImpl.Shutdown("", 0); shutdownErr != nil {
+			d.logger.Error("failed to shut down executor after driver state error", "error", shutdownErr)
+		}
+		pluginClient.Kill()
+		return nil, nil, fmt.Errorf("failed to set driver state: %w", err)
+	}
+
+	d.tasks.Set(cfg.ID, h)
+	go h.run()
+	return handle, nil, nil
+}
+
+// RecoverTask recreates the in-memory state of a task from a TaskHandle.
+func (d *HelloDriverPlugin) RecoverTask(handle *drivers.TaskHandle) error {
+	if handle == nil {
+		return errors.New("error: handle cannot be nil")
+	}
+	if handle.Config == nil {
+		return errors.New("error: handle config cannot be nil")
+	}
+
+	// The task is already tracked in memory: nothing to recover.
+	if _, ok := d.tasks.Get(handle.Config.ID); ok {
+		return nil
+	}
+
+	var taskState TaskState
+	if err := handle.GetDriverState(&taskState); err != nil {
+		return fmt.Errorf("failed to decode task state from handle: %w", err)
+	}
+	if taskState.TaskConfig == nil {
+		return errors.New("task state from handle has no task config")
+	}
+
+	// The decoded value is not used below; decoding only verifies that the
+	// persisted driver configuration is still readable.
+	var driverConfig TaskConfig
+	if err := taskState.TaskConfig.DecodeDriverConfig(&driverConfig); err != nil {
+		return fmt.Errorf("failed to decode driver config: %w", err)
+	}
+
+	// TODO: implement driver specific logic to recover a task.
+	//
+	// Recovering a task involves recreating and storing a taskHandle as if the
+	// task was just started.
+	//
+	// In the example below we use the executor to re-attach to the process
+	// that was created when the task first started.
+	plugRC, err := structs.ReattachConfigToGoPlugin(taskState.ReattachConfig)
+	if err != nil {
+		return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %w", err)
+	}
+
+	execImpl, pluginClient, err := executor.ReattachToExecutor(plugRC, d.logger)
+	if err != nil {
+		return fmt.Errorf("failed to reattach to executor: %w", err)
+	}
+
+	h := &taskHandle{
+		exec:         execImpl,
+		pid:          taskState.Pid,
+		pluginClient: pluginClient,
+		taskConfig:   taskState.TaskConfig,
+		procState:    drivers.TaskStateRunning,
+		startedAt:    taskState.StartedAt,
+		exitResult:   &drivers.ExitResult{},
+		// Set the logger as StartTask does: DestroyTask logs through
+		// handle.logger, which would otherwise be nil for recovered tasks.
+		logger: d.logger,
+	}
+
+	d.tasks.Set(taskState.TaskConfig.ID, h)
+
+	go h.run()
+	return nil
+}
+
+// WaitTask returns a channel used to notify Nomad when a task exits.
+func (d *HelloDriverPlugin) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) { + handle, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + + ch := make(chan *drivers.ExitResult) + go d.handleWait(ctx, handle, ch) + return ch, nil +} + +func (d *HelloDriverPlugin) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) { + defer close(ch) + var result *drivers.ExitResult + + // TODO: implement driver specific logic to notify Nomad the task has been + // completed and what was the exit result. + // + // When a result is sent in the result channel Nomad will stop the task and + // emit an event that an operator can use to get an insight on why the task + // stopped. + // + // In the example below we block and wait until the executor finishes + // running, at which point we send the exit code and signal in the result + // channel. + ps, err := handle.exec.Wait(ctx) + if err != nil { + result = &drivers.ExitResult{ + Err: fmt.Errorf("executor: error waiting on process: %v", err), + } + } else { + result = &drivers.ExitResult{ + ExitCode: ps.ExitCode, + Signal: ps.Signal, + } + } + + for { + select { + case <-ctx.Done(): + return + case <-d.ctx.Done(): + return + case ch <- result: + } + } +} + +// StopTask stops a running task with the given signal and within the timeout window. +func (d *HelloDriverPlugin) StopTask(taskID string, timeout time.Duration, signal string) error { + handle, ok := d.tasks.Get(taskID) + if !ok { + return drivers.ErrTaskNotFound + } + + // TODO: implement driver specific logic to stop a task. + // + // The StopTask function is expected to stop a running task by sending the + // given signal to it. If the task does not stop during the given timeout, + // the driver must forcefully kill the task. 
+ // + // In the example below we let the executor handle the task shutdown + // process for us, but you might need to customize this for your own + // implementation. + if err := handle.exec.Shutdown(signal, timeout); err != nil { + if handle.pluginClient.Exited() { + return nil + } + return fmt.Errorf("executor Shutdown failed: %v", err) + } + + return nil +} + +// DestroyTask cleans up and removes a task that has terminated. +func (d *HelloDriverPlugin) DestroyTask(taskID string, force bool) error { + handle, ok := d.tasks.Get(taskID) + if !ok { + return drivers.ErrTaskNotFound + } + + if handle.IsRunning() && !force { + return errors.New("cannot destroy running task") + } + + // TODO: implement driver specific logic to destroy a complete task. + // + // Destroying a task includes removing any resources used by task and any + // local references in the plugin. If force is set to true the task should + // be destroyed even if it's currently running. + // + // In the example below we use the executor to force shutdown the task + // (timeout equals 0). + if !handle.pluginClient.Exited() { + if err := handle.exec.Shutdown("", 0); err != nil { + handle.logger.Error("destroying executor failed", "err", err) + } + + handle.pluginClient.Kill() + } + + d.tasks.Delete(taskID) + return nil +} + +// InspectTask returns detailed status information for the referenced taskID. +func (d *HelloDriverPlugin) InspectTask(taskID string) (*drivers.TaskStatus, error) { + handle, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + + return handle.TaskStatus(), nil +} + +// TaskStats returns a channel which the driver should send stats to at the given interval. +func (d *HelloDriverPlugin) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) { + handle, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + + // TODO: implement driver specific logic to send task stats. 
+ // + // This function returns a channel that Nomad will use to listen for task + // stats (e.g., CPU and memory usage) in a given interval. It should send + // stats until the context is canceled or the task stops running. + // + // In the example below we use the Stats function provided by the executor, + // but you can build a set of functions similar to the fingerprint process. + return handle.exec.Stats(ctx, interval) +} + +// TaskEvents returns a channel that the plugin can use to emit task related events. +func (d *HelloDriverPlugin) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) { + return d.eventer.TaskEvents(ctx) +} + +// SignalTask forwards a signal to a task. +// This is an optional capability. +func (d *HelloDriverPlugin) SignalTask(taskID string, signal string) error { + handle, ok := d.tasks.Get(taskID) + if !ok { + return drivers.ErrTaskNotFound + } + + // TODO: implement driver specific signal handling logic. + // + // The given signal must be forwarded to the target taskID. If this plugin + // doesn't support receiving signals (capability SendSignals is set to + // false) you can just return nil. + sig := os.Interrupt + if s, ok := signals.SignalLookup[signal]; ok { + sig = s + } else { + d.logger.Warn("unknown signal to send to task, using SIGINT instead", "signal", signal, "task_id", handle.taskConfig.ID) + + } + return handle.exec.Signal(sig) +} + +// ExecTask returns the result of executing the given command inside a task. +// This is an optional capability. +func (d *HelloDriverPlugin) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) { + // TODO: implement driver specific logic to execute commands in a task. 
+ return nil, errors.New("This driver does not support exec") +} diff --git a/exec2/handle.go b/exec2/handle.go new file mode 100644 index 0000000..6dfb976 --- /dev/null +++ b/exec2/handle.go @@ -0,0 +1,80 @@ +package hello + +import ( + "context" + "strconv" + "sync" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/drivers/shared/executor" + "github.com/hashicorp/nomad/plugins/drivers" +) + +// taskHandle should store all relevant runtime information +// such as process ID if this is a local task or other meta +// data if this driver deals with external APIs +type taskHandle struct { + // stateLock syncs access to all fields below + stateLock sync.RWMutex + + logger hclog.Logger + exec executor.Executor + pluginClient *plugin.Client + taskConfig *drivers.TaskConfig + procState drivers.TaskState + startedAt time.Time + completedAt time.Time + exitResult *drivers.ExitResult + + // TODO: add any extra relevant information about the task. + pid int +} + +func (h *taskHandle) TaskStatus() *drivers.TaskStatus { + h.stateLock.RLock() + defer h.stateLock.RUnlock() + + return &drivers.TaskStatus{ + ID: h.taskConfig.ID, + Name: h.taskConfig.Name, + State: h.procState, + StartedAt: h.startedAt, + CompletedAt: h.completedAt, + ExitResult: h.exitResult, + DriverAttributes: map[string]string{ + "pid": strconv.Itoa(h.pid), + }, + } +} + +func (h *taskHandle) IsRunning() bool { + h.stateLock.RLock() + defer h.stateLock.RUnlock() + return h.procState == drivers.TaskStateRunning +} + +func (h *taskHandle) run() { + h.stateLock.Lock() + if h.exitResult == nil { + h.exitResult = &drivers.ExitResult{} + } + h.stateLock.Unlock() + + // TODO: wait for your task to complete and upate its state. 
+ ps, err := h.exec.Wait(context.Background()) + h.stateLock.Lock() + defer h.stateLock.Unlock() + + if err != nil { + h.exitResult.Err = err + h.procState = drivers.TaskStateUnknown + h.completedAt = time.Now() + return + } + h.procState = drivers.TaskStateExited + h.exitResult.ExitCode = ps.ExitCode + h.exitResult.Signal = ps.Signal + h.completedAt = ps.Time +} diff --git a/exec2/state.go b/exec2/state.go new file mode 100644 index 0000000..30f10d7 --- /dev/null +++ b/exec2/state.go @@ -0,0 +1,36 @@ +package hello + +import ( + "sync" +) + +// taskStore provides a mechanism to store and retrieve +// task handles given a string identifier. The ID should +// be unique per task +type taskStore struct { + store map[string]*taskHandle + lock sync.RWMutex +} + +func newTaskStore() *taskStore { + return &taskStore{store: map[string]*taskHandle{}} +} + +func (ts *taskStore) Set(id string, handle *taskHandle) { + ts.lock.Lock() + defer ts.lock.Unlock() + ts.store[id] = handle +} + +func (ts *taskStore) Get(id string) (*taskHandle, bool) { + ts.lock.RLock() + defer ts.lock.RUnlock() + t, ok := ts.store[id] + return t, ok +} + +func (ts *taskStore) Delete(id string) { + ts.lock.Lock() + defer ts.lock.Unlock() + delete(ts.store, id) +} diff --git a/go.mod b/go.mod index 69e2869..2d4bcc5 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,4 @@ -// TODO: update the module path below to match your own repository -module github.com/hashicorp/nomad-skeleton-driver-plugin +module github.com/Alexis211/nomad-driver-exec2 go 1.19 diff --git a/hello/driver.go b/hello/driver.go deleted file mode 100644 index 312cb76..0000000 --- a/hello/driver.go +++ /dev/null @@ -1,640 +0,0 @@ -package hello - -import ( - "context" - "errors" - "fmt" - "os" - "os/exec" - "path/filepath" - "regexp" - "time" - - "github.com/hashicorp/consul-template/signals" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/drivers/shared/eventer" - "github.com/hashicorp/nomad/drivers/shared/executor" - 
"github.com/hashicorp/nomad/plugins/base" - "github.com/hashicorp/nomad/plugins/drivers" - "github.com/hashicorp/nomad/plugins/shared/hclspec" - "github.com/hashicorp/nomad/plugins/shared/structs" -) - -const ( - // pluginName is the name of the plugin - // this is used for logging and (along with the version) for uniquely - // identifying plugin binaries fingerprinted by the client - pluginName = "hello-world-example" - - // pluginVersion allows the client to identify and use newer versions of - // an installed plugin - pluginVersion = "v0.1.0" - - // fingerprintPeriod is the interval at which the plugin will send - // fingerprint responses - fingerprintPeriod = 30 * time.Second - - // taskHandleVersion is the version of task handle which this plugin sets - // and understands how to decode - // this is used to allow modification and migration of the task schema - // used by the plugin - taskHandleVersion = 1 -) - -var ( - // pluginInfo describes the plugin - pluginInfo = &base.PluginInfoResponse{ - Type: base.PluginTypeDriver, - PluginApiVersions: []string{drivers.ApiVersion010}, - PluginVersion: pluginVersion, - Name: pluginName, - } - - // configSpec is the specification of the plugin's configuration - // this is used to validate the configuration specified for the plugin - // on the client. - // this is not global, but can be specified on a per-client basis. - configSpec = hclspec.NewObject(map[string]*hclspec.Spec{ - // TODO: define plugin's agent configuration schema. - // - // The schema should be defined using HCL specs and it will be used to - // validate the agent configuration provided by the user in the - // `plugin` stanza (https://www.nomadproject.io/docs/configuration/plugin.html). 
- // - // For example, for the schema below a valid configuration would be: - // - // plugin "hello-driver-plugin" { - // config { - // shell = "fish" - // } - // } - "shell": hclspec.NewDefault( - hclspec.NewAttr("shell", "string", false), - hclspec.NewLiteral(`"bash"`), - ), - }) - - // taskConfigSpec is the specification of the plugin's configuration for - // a task - // this is used to validated the configuration specified for the plugin - // when a job is submitted. - taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{ - // TODO: define plugin's task configuration schema - // - // The schema should be defined using HCL specs and it will be used to - // validate the task configuration provided by the user when they - // submit a job. - // - // For example, for the schema below a valid task would be: - // job "example" { - // group "example" { - // task "say-hi" { - // driver = "hello-driver-plugin" - // config { - // greeting = "Hi" - // } - // } - // } - // } - "greeting": hclspec.NewDefault( - hclspec.NewAttr("greeting", "string", false), - hclspec.NewLiteral(`"Hello, World!"`), - ), - }) - - // capabilities indicates what optional features this driver supports - // this should be set according to the target run time. - capabilities = &drivers.Capabilities{ - // TODO: set plugin's capabilities - // - // The plugin's capabilities signal Nomad which extra functionalities - // are supported. For a list of available options check the docs page: - // https://godoc.org/github.com/hashicorp/nomad/plugins/drivers#Capabilities - SendSignals: true, - Exec: false, - } -) - -// Config contains configuration information for the plugin -type Config struct { - // TODO: create decoded plugin configuration struct - // - // This struct is the decoded version of the schema defined in the - // configSpec variable above. It's used to convert the HCL configuration - // passed by the Nomad agent into Go contructs. 
- Shell string `codec:"shell"` -} - -// TaskConfig contains configuration information for a task that runs with -// this plugin -type TaskConfig struct { - // TODO: create decoded plugin task configuration struct - // - // This struct is the decoded version of the schema defined in the - // taskConfigSpec variable above. It's used to convert the string - // configuration for the task into Go contructs. - Greeting string `codec:"greeting"` -} - -// TaskState is the runtime state which is encoded in the handle returned to -// Nomad client. -// This information is needed to rebuild the task state and handler during -// recovery. -type TaskState struct { - ReattachConfig *structs.ReattachConfig - TaskConfig *drivers.TaskConfig - StartedAt time.Time - - // TODO: add any extra important values that must be persisted in order - // to restore a task. - // - // The plugin keeps track of its running tasks in a in-memory data - // structure. If the plugin crashes, this data will be lost, so Nomad - // will respawn a new instance of the plugin and try to restore its - // in-memory representation of the running tasks using the RecoverTask() - // method below. - Pid int -} - -// HelloDriverPlugin is an example driver plugin. When provisioned in a job, -// the taks will output a greet specified by the user. -type HelloDriverPlugin struct { - // eventer is used to handle multiplexing of TaskEvents calls such that an - // event can be broadcast to all callers - eventer *eventer.Eventer - - // config is the plugin configuration set by the SetConfig RPC - config *Config - - // nomadConfig is the client config from Nomad - nomadConfig *base.ClientDriverConfig - - // tasks is the in memory datastore mapping taskIDs to driver handles - tasks *taskStore - - // ctx is the context for the driver. 
It is passed to other subsystems to - // coordinate shutdown - ctx context.Context - - // signalShutdown is called when the driver is shutting down and cancels - // the ctx passed to any subsystems - signalShutdown context.CancelFunc - - // logger will log to the Nomad agent - logger hclog.Logger -} - -// NewPlugin returns a new example driver plugin -func NewPlugin(logger hclog.Logger) drivers.DriverPlugin { - ctx, cancel := context.WithCancel(context.Background()) - logger = logger.Named(pluginName) - - return &HelloDriverPlugin{ - eventer: eventer.NewEventer(ctx, logger), - config: &Config{}, - tasks: newTaskStore(), - ctx: ctx, - signalShutdown: cancel, - logger: logger, - } -} - -// PluginInfo returns information describing the plugin. -func (d *HelloDriverPlugin) PluginInfo() (*base.PluginInfoResponse, error) { - return pluginInfo, nil -} - -// ConfigSchema returns the plugin configuration schema. -func (d *HelloDriverPlugin) ConfigSchema() (*hclspec.Spec, error) { - return configSpec, nil -} - -// SetConfig is called by the client to pass the configuration for the plugin. -func (d *HelloDriverPlugin) SetConfig(cfg *base.Config) error { - var config Config - if len(cfg.PluginConfig) != 0 { - if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil { - return err - } - } - - // Save the configuration to the plugin - d.config = &config - - // TODO: parse and validated any configuration value if necessary. - // - // If your driver agent configuration requires any complex validation - // (some dependency between attributes) or special data parsing (the - // string "10s" into a time.Interval) you can do it here and update the - // value in d.config. - // - // In the example below we check if the shell specified by the user is - // supported by the plugin. 
- shell := d.config.Shell - if shell != "bash" && shell != "fish" { - return fmt.Errorf("invalid shell %s", d.config.Shell) - } - - // Save the Nomad agent configuration - if cfg.AgentConfig != nil { - d.nomadConfig = cfg.AgentConfig.Driver - } - - // TODO: initialize any extra requirements if necessary. - // - // Here you can use the config values to initialize any resources that are - // shared by all tasks that use this driver, such as a daemon process. - - return nil -} - -// TaskConfigSchema returns the HCL schema for the configuration of a task. -func (d *HelloDriverPlugin) TaskConfigSchema() (*hclspec.Spec, error) { - return taskConfigSpec, nil -} - -// Capabilities returns the features supported by the driver. -func (d *HelloDriverPlugin) Capabilities() (*drivers.Capabilities, error) { - return capabilities, nil -} - -// Fingerprint returns a channel that will be used to send health information -// and other driver specific node attributes. -func (d *HelloDriverPlugin) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) { - ch := make(chan *drivers.Fingerprint) - go d.handleFingerprint(ctx, ch) - return ch, nil -} - -// handleFingerprint manages the channel and the flow of fingerprint data. 
-func (d *HelloDriverPlugin) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) { - defer close(ch) - - // Nomad expects the initial fingerprint to be sent immediately - ticker := time.NewTimer(0) - for { - select { - case <-ctx.Done(): - return - case <-d.ctx.Done(): - return - case <-ticker.C: - // after the initial fingerprint we can set the proper fingerprint - // period - ticker.Reset(fingerprintPeriod) - ch <- d.buildFingerprint() - } - } -} - -// buildFingerprint returns the driver's fingerprint data -func (d *HelloDriverPlugin) buildFingerprint() *drivers.Fingerprint { - fp := &drivers.Fingerprint{ - Attributes: map[string]*structs.Attribute{}, - Health: drivers.HealthStateHealthy, - HealthDescription: drivers.DriverHealthy, - } - - // TODO: implement fingerprinting logic to populate health and driver - // attributes. - // - // Fingerprinting is used by the plugin to relay two important information - // to Nomad: health state and node attributes. - // - // If the plugin reports to be unhealthy, or doesn't send any fingerprint - // data in the expected interval of time, Nomad will restart it. - // - // Node attributes can be used to report any relevant information about - // the node in which the plugin is running (specific library availability, - // installed versions of a software etc.). These attributes can then be - // used by an operator to set job constrains. - // - // In the example below we check if the shell specified by the user exists - // in the node. 
- shell := d.config.Shell - - cmd := exec.Command("which", shell) - if err := cmd.Run(); err != nil { - return &drivers.Fingerprint{ - Health: drivers.HealthStateUndetected, - HealthDescription: fmt.Sprintf("shell %s not found", shell), - } - } - - // We also set the shell and its version as attributes - cmd = exec.Command(shell, "--version") - if out, err := cmd.Output(); err != nil { - d.logger.Warn("failed to find shell version: %v", err) - } else { - re := regexp.MustCompile("[0-9]\\.[0-9]\\.[0-9]") - version := re.FindString(string(out)) - - fp.Attributes["driver.hello.shell_version"] = structs.NewStringAttribute(version) - fp.Attributes["driver.hello.shell"] = structs.NewStringAttribute(shell) - } - - return fp -} - -// StartTask returns a task handle and a driver network if necessary. -func (d *HelloDriverPlugin) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) { - if _, ok := d.tasks.Get(cfg.ID); ok { - return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID) - } - - var driverConfig TaskConfig - if err := cfg.DecodeDriverConfig(&driverConfig); err != nil { - return nil, nil, fmt.Errorf("failed to decode driver config: %v", err) - } - - d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig)) - handle := drivers.NewTaskHandle(taskHandleVersion) - handle.Config = cfg - - // TODO: implement driver specific mechanism to start the task. - // - // Once the task is started you will need to store any relevant runtime - // information in a taskHandle and TaskState. The taskHandle will be - // stored in-memory in the plugin and will be used to interact with the - // task. - // - // The TaskState will be returned to the Nomad client inside a - // drivers.TaskHandle instance. This TaskHandle will be sent back to plugin - // if the task ever needs to be recovered, so the TaskState should contain - // enough information to handle that. 
- // - // In the example below we use an executor to fork a process to run our - // greeter. The executor is then stored in the handle so we can access it - // later and the the plugin.Client is used to generate a reattach - // configuration that can be used to recover communication with the task. - executorConfig := &executor.ExecutorConfig{ - LogFile: filepath.Join(cfg.TaskDir().Dir, "executor.out"), - LogLevel: "debug", - } - - exec, pluginClient, err := executor.CreateExecutor(d.logger, d.nomadConfig, executorConfig) - if err != nil { - return nil, nil, fmt.Errorf("failed to create executor: %v", err) - } - - echoCmd := fmt.Sprintf(`echo "%s"`, driverConfig.Greeting) - execCmd := &executor.ExecCommand{ - Cmd: d.config.Shell, - Args: []string{"-c", echoCmd}, - StdoutPath: cfg.StdoutPath, - StderrPath: cfg.StderrPath, - } - - ps, err := exec.Launch(execCmd) - if err != nil { - pluginClient.Kill() - return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err) - } - - h := &taskHandle{ - exec: exec, - pid: ps.Pid, - pluginClient: pluginClient, - taskConfig: cfg, - procState: drivers.TaskStateRunning, - startedAt: time.Now().Round(time.Millisecond), - logger: d.logger, - } - - driverState := TaskState{ - ReattachConfig: structs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()), - Pid: ps.Pid, - TaskConfig: cfg, - StartedAt: h.startedAt, - } - - if err := handle.SetDriverState(&driverState); err != nil { - return nil, nil, fmt.Errorf("failed to set driver state: %v", err) - } - - d.tasks.Set(cfg.ID, h) - go h.run() - return handle, nil, nil -} - -// RecoverTask recreates the in-memory state of a task from a TaskHandle. 
-func (d *HelloDriverPlugin) RecoverTask(handle *drivers.TaskHandle) error { - if handle == nil { - return errors.New("error: handle cannot be nil") - } - - if _, ok := d.tasks.Get(handle.Config.ID); ok { - return nil - } - - var taskState TaskState - if err := handle.GetDriverState(&taskState); err != nil { - return fmt.Errorf("failed to decode task state from handle: %v", err) - } - - var driverConfig TaskConfig - if err := taskState.TaskConfig.DecodeDriverConfig(&driverConfig); err != nil { - return fmt.Errorf("failed to decode driver config: %v", err) - } - - // TODO: implement driver specific logic to recover a task. - // - // Recovering a task involves recreating and storing a taskHandle as if the - // task was just started. - // - // In the example below we use the executor to re-attach to the process - // that was created when the task first started. - plugRC, err := structs.ReattachConfigToGoPlugin(taskState.ReattachConfig) - if err != nil { - return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err) - } - - execImpl, pluginClient, err := executor.ReattachToExecutor(plugRC, d.logger) - if err != nil { - return fmt.Errorf("failed to reattach to executor: %v", err) - } - - h := &taskHandle{ - exec: execImpl, - pid: taskState.Pid, - pluginClient: pluginClient, - taskConfig: taskState.TaskConfig, - procState: drivers.TaskStateRunning, - startedAt: taskState.StartedAt, - exitResult: &drivers.ExitResult{}, - } - - d.tasks.Set(taskState.TaskConfig.ID, h) - - go h.run() - return nil -} - -// WaitTask returns a channel used to notify Nomad when a task exits. 
-func (d *HelloDriverPlugin) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) { - handle, ok := d.tasks.Get(taskID) - if !ok { - return nil, drivers.ErrTaskNotFound - } - - ch := make(chan *drivers.ExitResult) - go d.handleWait(ctx, handle, ch) - return ch, nil -} - -func (d *HelloDriverPlugin) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) { - defer close(ch) - var result *drivers.ExitResult - - // TODO: implement driver specific logic to notify Nomad the task has been - // completed and what was the exit result. - // - // When a result is sent in the result channel Nomad will stop the task and - // emit an event that an operator can use to get an insight on why the task - // stopped. - // - // In the example below we block and wait until the executor finishes - // running, at which point we send the exit code and signal in the result - // channel. - ps, err := handle.exec.Wait(ctx) - if err != nil { - result = &drivers.ExitResult{ - Err: fmt.Errorf("executor: error waiting on process: %v", err), - } - } else { - result = &drivers.ExitResult{ - ExitCode: ps.ExitCode, - Signal: ps.Signal, - } - } - - for { - select { - case <-ctx.Done(): - return - case <-d.ctx.Done(): - return - case ch <- result: - } - } -} - -// StopTask stops a running task with the given signal and within the timeout window. -func (d *HelloDriverPlugin) StopTask(taskID string, timeout time.Duration, signal string) error { - handle, ok := d.tasks.Get(taskID) - if !ok { - return drivers.ErrTaskNotFound - } - - // TODO: implement driver specific logic to stop a task. - // - // The StopTask function is expected to stop a running task by sending the - // given signal to it. If the task does not stop during the given timeout, - // the driver must forcefully kill the task. 
- // - // In the example below we let the executor handle the task shutdown - // process for us, but you might need to customize this for your own - // implementation. - if err := handle.exec.Shutdown(signal, timeout); err != nil { - if handle.pluginClient.Exited() { - return nil - } - return fmt.Errorf("executor Shutdown failed: %v", err) - } - - return nil -} - -// DestroyTask cleans up and removes a task that has terminated. -func (d *HelloDriverPlugin) DestroyTask(taskID string, force bool) error { - handle, ok := d.tasks.Get(taskID) - if !ok { - return drivers.ErrTaskNotFound - } - - if handle.IsRunning() && !force { - return errors.New("cannot destroy running task") - } - - // TODO: implement driver specific logic to destroy a complete task. - // - // Destroying a task includes removing any resources used by task and any - // local references in the plugin. If force is set to true the task should - // be destroyed even if it's currently running. - // - // In the example below we use the executor to force shutdown the task - // (timeout equals 0). - if !handle.pluginClient.Exited() { - if err := handle.exec.Shutdown("", 0); err != nil { - handle.logger.Error("destroying executor failed", "err", err) - } - - handle.pluginClient.Kill() - } - - d.tasks.Delete(taskID) - return nil -} - -// InspectTask returns detailed status information for the referenced taskID. -func (d *HelloDriverPlugin) InspectTask(taskID string) (*drivers.TaskStatus, error) { - handle, ok := d.tasks.Get(taskID) - if !ok { - return nil, drivers.ErrTaskNotFound - } - - return handle.TaskStatus(), nil -} - -// TaskStats returns a channel which the driver should send stats to at the given interval. -func (d *HelloDriverPlugin) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) { - handle, ok := d.tasks.Get(taskID) - if !ok { - return nil, drivers.ErrTaskNotFound - } - - // TODO: implement driver specific logic to send task stats. 
- // - // This function returns a channel that Nomad will use to listen for task - // stats (e.g., CPU and memory usage) in a given interval. It should send - // stats until the context is canceled or the task stops running. - // - // In the example below we use the Stats function provided by the executor, - // but you can build a set of functions similar to the fingerprint process. - return handle.exec.Stats(ctx, interval) -} - -// TaskEvents returns a channel that the plugin can use to emit task related events. -func (d *HelloDriverPlugin) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) { - return d.eventer.TaskEvents(ctx) -} - -// SignalTask forwards a signal to a task. -// This is an optional capability. -func (d *HelloDriverPlugin) SignalTask(taskID string, signal string) error { - handle, ok := d.tasks.Get(taskID) - if !ok { - return drivers.ErrTaskNotFound - } - - // TODO: implement driver specific signal handling logic. - // - // The given signal must be forwarded to the target taskID. If this plugin - // doesn't support receiving signals (capability SendSignals is set to - // false) you can just return nil. - sig := os.Interrupt - if s, ok := signals.SignalLookup[signal]; ok { - sig = s - } else { - d.logger.Warn("unknown signal to send to task, using SIGINT instead", "signal", signal, "task_id", handle.taskConfig.ID) - - } - return handle.exec.Signal(sig) -} - -// ExecTask returns the result of executing the given command inside a task. -// This is an optional capability. -func (d *HelloDriverPlugin) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) { - // TODO: implement driver specific logic to execute commands in a task. 
- return nil, errors.New("This driver does not support exec") -} diff --git a/hello/handle.go b/hello/handle.go deleted file mode 100644 index 6dfb976..0000000 --- a/hello/handle.go +++ /dev/null @@ -1,80 +0,0 @@ -package hello - -import ( - "context" - "strconv" - "sync" - "time" - - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-plugin" - "github.com/hashicorp/nomad/drivers/shared/executor" - "github.com/hashicorp/nomad/plugins/drivers" -) - -// taskHandle should store all relevant runtime information -// such as process ID if this is a local task or other meta -// data if this driver deals with external APIs -type taskHandle struct { - // stateLock syncs access to all fields below - stateLock sync.RWMutex - - logger hclog.Logger - exec executor.Executor - pluginClient *plugin.Client - taskConfig *drivers.TaskConfig - procState drivers.TaskState - startedAt time.Time - completedAt time.Time - exitResult *drivers.ExitResult - - // TODO: add any extra relevant information about the task. - pid int -} - -func (h *taskHandle) TaskStatus() *drivers.TaskStatus { - h.stateLock.RLock() - defer h.stateLock.RUnlock() - - return &drivers.TaskStatus{ - ID: h.taskConfig.ID, - Name: h.taskConfig.Name, - State: h.procState, - StartedAt: h.startedAt, - CompletedAt: h.completedAt, - ExitResult: h.exitResult, - DriverAttributes: map[string]string{ - "pid": strconv.Itoa(h.pid), - }, - } -} - -func (h *taskHandle) IsRunning() bool { - h.stateLock.RLock() - defer h.stateLock.RUnlock() - return h.procState == drivers.TaskStateRunning -} - -func (h *taskHandle) run() { - h.stateLock.Lock() - if h.exitResult == nil { - h.exitResult = &drivers.ExitResult{} - } - h.stateLock.Unlock() - - // TODO: wait for your task to complete and upate its state. 
- ps, err := h.exec.Wait(context.Background()) - h.stateLock.Lock() - defer h.stateLock.Unlock() - - if err != nil { - h.exitResult.Err = err - h.procState = drivers.TaskStateUnknown - h.completedAt = time.Now() - return - } - h.procState = drivers.TaskStateExited - h.exitResult.ExitCode = ps.ExitCode - h.exitResult.Signal = ps.Signal - h.completedAt = ps.Time -} diff --git a/hello/state.go b/hello/state.go deleted file mode 100644 index 30f10d7..0000000 --- a/hello/state.go +++ /dev/null @@ -1,36 +0,0 @@ -package hello - -import ( - "sync" -) - -// taskStore provides a mechanism to store and retrieve -// task handles given a string identifier. The ID should -// be unique per task -type taskStore struct { - store map[string]*taskHandle - lock sync.RWMutex -} - -func newTaskStore() *taskStore { - return &taskStore{store: map[string]*taskHandle{}} -} - -func (ts *taskStore) Set(id string, handle *taskHandle) { - ts.lock.Lock() - defer ts.lock.Unlock() - ts.store[id] = handle -} - -func (ts *taskStore) Get(id string) (*taskHandle, bool) { - ts.lock.RLock() - defer ts.lock.RUnlock() - t, ok := ts.store[id] - return t, ok -} - -func (ts *taskStore) Delete(id string) { - ts.lock.Lock() - defer ts.lock.Unlock() - delete(ts.store, id) -} diff --git a/main.go b/main.go index 44781ae..7963a63 100644 --- a/main.go +++ b/main.go @@ -1,8 +1,7 @@ package main import ( - // TODO: update the path below to match your own repository - "github.com/hashicorp/nomad-skeleton-driver-plugin/hello" + exec2 "github.com/Alexis211/nomad-driver-exec2/exec2" "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/plugins" @@ -15,5 +14,5 @@ func main() { // factory returns a new instance of a nomad driver plugin func factory(log hclog.Logger) interface{} { - return hello.NewPlugin(log) + return exec2.NewPlugin(log) } -- cgit v1.2.3