diff options
-rw-r--r-- | exec2/driver.go | 2 | ||||
-rw-r--r-- | exec2/handle.go | 2 | ||||
-rw-r--r-- | executor/exec_utils.go | 285 | ||||
-rw-r--r-- | executor/executor.go | 722 | ||||
-rw-r--r-- | executor/executor_basic.go | 33 | ||||
-rw-r--r-- | executor/executor_linux.go | 926 | ||||
-rw-r--r-- | executor/executor_plugin.go | 34 | ||||
-rw-r--r-- | executor/executor_universal_linux.go | 154 | ||||
-rw-r--r-- | executor/executor_unix.go | 50 | ||||
-rw-r--r-- | executor/grpc_client.go | 267 | ||||
-rw-r--r-- | executor/grpc_server.go | 178 | ||||
-rw-r--r-- | executor/libcontainer_nsenter_linux.go | 29 | ||||
-rw-r--r-- | executor/pid_collector.go | 211 | ||||
-rw-r--r-- | executor/plugins.go | 55 | ||||
-rw-r--r-- | executor/pty_unix.go | 43 | ||||
-rw-r--r-- | executor/resource_container_default.go | 12 | ||||
-rw-r--r-- | executor/utils.go | 138 | ||||
-rw-r--r-- | executor/utils_unix.go | 19 | ||||
-rw-r--r-- | executor/z_executor_cmd.go | 55 |
19 files changed, 3213 insertions, 2 deletions
diff --git a/exec2/driver.go b/exec2/driver.go index 827d351..3624c7c 100644 --- a/exec2/driver.go +++ b/exec2/driver.go @@ -13,7 +13,7 @@ import ( "github.com/hashicorp/nomad/client/lib/cgutil" "github.com/hashicorp/nomad/drivers/shared/capabilities" "github.com/hashicorp/nomad/drivers/shared/eventer" - "github.com/hashicorp/nomad/drivers/shared/executor" + "github.com/Alexis211/nomad-driver-exec2/executor" "github.com/hashicorp/nomad/drivers/shared/resolvconf" "github.com/hashicorp/nomad/helper/pluginutils/hclutils" "github.com/hashicorp/nomad/helper/pluginutils/loader" diff --git a/exec2/handle.go b/exec2/handle.go index b4d55a2..9cd1cc3 100644 --- a/exec2/handle.go +++ b/exec2/handle.go @@ -7,7 +7,7 @@ import ( "time" hclog "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/drivers/shared/executor" + "github.com/Alexis211/nomad-driver-exec2/executor" "github.com/hashicorp/nomad/plugins/drivers" ) diff --git a/executor/exec_utils.go b/executor/exec_utils.go new file mode 100644 index 0000000..1a048eb --- /dev/null +++ b/executor/exec_utils.go @@ -0,0 +1,285 @@ +package executor + +import ( + "context" + "fmt" + "io" + "os" + "os/exec" + "sync" + "syscall" + + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/plugins/drivers" + dproto "github.com/hashicorp/nomad/plugins/drivers/proto" +) + +// execHelper is a convenient wrapper for starting and executing commands, and handling their output +type execHelper struct { + logger hclog.Logger + + // newTerminal function creates a tty appropriate for the command + // The returned pty end of tty function is to be called after process start. + newTerminal func() (pty func() (*os.File, error), tty *os.File, err error) + + // setTTY is a callback to configure the command with slave end of the tty of the terminal, when tty is enabled + setTTY func(tty *os.File) error + + // setTTY is a callback to configure the command with std{in|out|err}, when tty is disabled + setIO func(stdin io.Reader, stdout, stderr io.Writer) error + + // processStart starts the process, like `exec.Cmd.Start()` + processStart func() error + + // processWait blocks until command terminates and returns its final state + processWait func() (*os.ProcessState, error) +} + +func (e *execHelper) run(ctx context.Context, tty bool, stream drivers.ExecTaskStream) error { + if tty { + return e.runTTY(ctx, stream) + } + return e.runNoTTY(ctx, stream) +} + +func (e *execHelper) runTTY(ctx context.Context, stream drivers.ExecTaskStream) error { + ptyF, tty, err := e.newTerminal() + if err != nil { + return fmt.Errorf("failed to open a tty: %v", err) + } + defer tty.Close() + + if err := e.setTTY(tty); err != nil { + return fmt.Errorf("failed to set command tty: %v", err) + } + if err := e.processStart(); err != nil { + return fmt.Errorf("failed to start command: %v", err) + } + + var wg sync.WaitGroup + errCh := make(chan error, 3) + + pty, err := ptyF() + if err != nil { + return fmt.Errorf("failed to get pty: %v", err) + } + + defer pty.Close() + wg.Add(1) + go handleStdin(e.logger, pty, stream, errCh) + // when tty is on, stdout and stderr point to the same pty so only read once + go handleStdout(e.logger, pty, &wg, stream.Send, errCh) + + ps, err := e.processWait() + + // force close streams to close out the stream copying goroutines + tty.Close() + + // wait until we get all process output + wg.Wait() + + // wait to flush out output + stream.Send(cmdExitResult(ps, err)) + + select { + case cerr := <-errCh: + return cerr + default: + return nil + } +} + +func (e *execHelper) runNoTTY(ctx context.Context, stream drivers.ExecTaskStream) error { + var sendLock sync.Mutex + send := func(v *drivers.ExecTaskStreamingResponseMsg) error { + sendLock.Lock() + defer sendLock.Unlock() + + return stream.Send(v) + } + + stdinPr, stdinPw := io.Pipe() + stdoutPr, stdoutPw := io.Pipe() + stderrPr, stderrPw := io.Pipe() + + defer stdoutPw.Close() + defer stderrPw.Close() + + if err := e.setIO(stdinPr, stdoutPw, stderrPw); err != nil { + return fmt.Errorf("failed to set command io: %v", err) + } + + if err := e.processStart(); err != nil { + return fmt.Errorf("failed to start command: %v", err) + } + + var wg sync.WaitGroup + errCh := make(chan error, 3) + + wg.Add(2) + go handleStdin(e.logger, stdinPw, stream, errCh) + go handleStdout(e.logger, stdoutPr, &wg, send, errCh) + go handleStderr(e.logger, stderrPr, &wg, send, errCh) + + ps, err := e.processWait() + + // force close streams to close out the stream copying goroutines + stdinPr.Close() + stdoutPw.Close() + stderrPw.Close() + + // wait until we get all process output + wg.Wait() + + // wait to flush out output + stream.Send(cmdExitResult(ps, err)) + + select { + case cerr := <-errCh: + return cerr + default: + return nil + } +} +func cmdExitResult(ps *os.ProcessState, err error) *drivers.ExecTaskStreamingResponseMsg { + exitCode := -1 + + if ps == nil { + if ee, ok := err.(*exec.ExitError); ok { + ps = ee.ProcessState + } + } + + if ps == nil { + exitCode = -2 + } else if status, ok := ps.Sys().(syscall.WaitStatus); ok { + exitCode = status.ExitStatus() + if status.Signaled() { + const exitSignalBase = 128 + signal := int(status.Signal()) + exitCode = exitSignalBase + signal + } + } + + return &drivers.ExecTaskStreamingResponseMsg{ + Exited: true, + Result: &dproto.ExitResult{ + ExitCode: int32(exitCode), + }, + } +} + +func handleStdin(logger hclog.Logger, stdin io.WriteCloser, stream drivers.ExecTaskStream, errCh chan<- error) { + for { + m, err := stream.Recv() + if isClosedError(err) { + return + } else if err != nil { + errCh <- err + return + } + + if m.Stdin != nil { + if len(m.Stdin.Data) != 0 { + _, err := stdin.Write(m.Stdin.Data) + if err != nil { + errCh <- err + return + } + } + if m.Stdin.Close { + stdin.Close() + } + } else if m.TtySize != nil { + err := setTTYSize(stdin, m.TtySize.Height, m.TtySize.Width) + if err != nil { + errCh <- fmt.Errorf("failed to resize tty: %v", err) + return + } + } + } +} + +func handleStdout(logger hclog.Logger, reader io.Reader, wg *sync.WaitGroup, send func(*drivers.ExecTaskStreamingResponseMsg) error, errCh chan<- error) { + defer wg.Done() + + buf := make([]byte, 4096) + for { + n, err := reader.Read(buf) + // always send output first if we read something + if n > 0 { + if err := send(&drivers.ExecTaskStreamingResponseMsg{ + Stdout: &dproto.ExecTaskStreamingIOOperation{ + Data: buf[:n], + }, + }); err != nil { + errCh <- err + return + } + } + + // then process error + if isClosedError(err) { + if err := send(&drivers.ExecTaskStreamingResponseMsg{ + Stdout: &dproto.ExecTaskStreamingIOOperation{ + Close: true, + }, + }); err != nil { + errCh <- err + return + } + return + } else if err != nil { + errCh <- err + return + } + + } +} + +func handleStderr(logger hclog.Logger, reader io.Reader, wg *sync.WaitGroup, send func(*drivers.ExecTaskStreamingResponseMsg) error, errCh chan<- error) { + defer wg.Done() + + buf := make([]byte, 4096) + for { + n, err := reader.Read(buf) + // always send output first if we read something + if n > 0 { + if err := send(&drivers.ExecTaskStreamingResponseMsg{ + Stderr: &dproto.ExecTaskStreamingIOOperation{ + Data: buf[:n], + }, + }); err != nil { + errCh <- err + return + } + } + + // then process error + if isClosedError(err) { + if err := send(&drivers.ExecTaskStreamingResponseMsg{ + Stderr: &dproto.ExecTaskStreamingIOOperation{ + Close: true, + }, + }); err != nil { + errCh <- err + return + } + return + } else if err != nil { + errCh <- err + return + } + + } +} + +func isClosedError(err error) bool { + if err == nil { + return false + } + + return err == io.EOF || + err == io.ErrClosedPipe || + isUnixEIOErr(err) +} diff --git a/executor/executor.go b/executor/executor.go new file mode 100644 index 0000000..a3fe56c --- /dev/null +++ b/executor/executor.go @@ -0,0 +1,722 @@ +package executor + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + "syscall" + "time" + + "github.com/armon/circbuf" + "github.com/creack/pty" + "github.com/hashicorp/consul-template/signals" + hclog "github.com/hashicorp/go-hclog" + multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/lib/fifo" + "github.com/hashicorp/nomad/client/lib/resources" + "github.com/hashicorp/nomad/client/stats" + cstructs "github.com/hashicorp/nomad/client/structs" + shelpers "github.com/hashicorp/nomad/helper/stats" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/syndtr/gocapability/capability" +) + +const ( + // ExecutorVersionLatest is the current and latest version of the executor + ExecutorVersionLatest = "2.0.0" + + // ExecutorVersionPre0_9 is the version of executor use prior to the release + // of 0.9.x + ExecutorVersionPre0_9 = "1.1.0" + + // IsolationModePrivate represents the private isolation mode for a namespace + IsolationModePrivate = "private" + + // IsolationModeHost represents the host isolation mode for a namespace + IsolationModeHost = "host" +) + +var ( + // The statistics the basic executor exposes + ExecutorBasicMeasuredMemStats = []string{"RSS", "Swap"} + ExecutorBasicMeasuredCpuStats = []string{"System Mode", "User Mode", "Percent"} +) + +// Executor is the interface which allows a driver to launch and supervise +// a process +type Executor interface { + // Launch a user process configured by the given ExecCommand + Launch(launchCmd *ExecCommand) (*ProcessState, error) + + // Wait blocks until the process exits or an error occures + Wait(ctx context.Context) (*ProcessState, error) + + // Shutdown will shutdown the executor by stopping the user process, + // cleaning up and resources created by the executor. The shutdown sequence + // will first send the given signal to the process. This defaults to "SIGINT" + // if not specified. The executor will then wait for the process to exit + // before cleaning up other resources. If the executor waits longer than the + // given grace period, the process is forcefully killed. + // + // To force kill the user process, gracePeriod can be set to 0. + Shutdown(signal string, gracePeriod time.Duration) error + + // UpdateResources updates any resource isolation enforcement with new + // constraints if supported. + UpdateResources(*drivers.Resources) error + + // Version returns the executor API version + Version() (*ExecutorVersion, error) + + // Returns a channel of stats. Stats are collected and + // pushed to the channel on the given interval + Stats(context.Context, time.Duration) (<-chan *cstructs.TaskResourceUsage, error) + + // Signal sends the given signal to the user process + Signal(os.Signal) error + + // Exec executes the given command and args inside the executor context + // and returns the output and exit code. + Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error) + + ExecStreaming(ctx context.Context, cmd []string, tty bool, + stream drivers.ExecTaskStream) error +} + +// ExecCommand holds the user command, args, and other isolation related +// settings. +// +// Important (!): when adding fields, make sure to update the RPC methods in +// grpcExecutorClient.Launch and grpcExecutorServer.Launch. Number of hours +// spent tracking this down: too many. +type ExecCommand struct { + // Cmd is the command that the user wants to run. + Cmd string + + // Args is the args of the command that the user wants to run. + Args []string + + // Resources defined by the task + Resources *drivers.Resources + + // StdoutPath is the path the process stdout should be written to + StdoutPath string + stdout io.WriteCloser + + // StderrPath is the path the process stderr should be written to + StderrPath string + stderr io.WriteCloser + + // Env is the list of KEY=val pairs of environment variables to be set + Env []string + + // User is the user which the executor uses to run the command. + User string + + // TaskDir is the directory path on the host where for the task + TaskDir string + + // ResourceLimits determines whether resource limits are enforced by the + // executor. + ResourceLimits bool + + // Cgroup marks whether we put the process in a cgroup. Setting this field + // doesn't enforce resource limits. To enforce limits, set ResourceLimits. + // Using the cgroup does allow more precise cleanup of processes. + BasicProcessCgroup bool + + // NoPivotRoot disables using pivot_root for isolation, useful when the root + // partition is on a ramdisk which does not support pivot_root, + // see man 2 pivot_root + NoPivotRoot bool + + // Mounts are the host paths to be be made available inside rootfs + Mounts []*drivers.MountConfig + + // Devices are the the device nodes to be created in isolation environment + Devices []*drivers.DeviceConfig + + // NetworkIsolation is the network isolation configuration. + NetworkIsolation *drivers.NetworkIsolationSpec + + // ModePID is the PID isolation mode (private or host). + ModePID string + + // ModeIPC is the IPC isolation mode (private or host). + ModeIPC string + + // Capabilities are the linux capabilities to be enabled by the task driver. + Capabilities []string +} + +// SetWriters sets the writer for the process stdout and stderr. This should +// not be used if writing to a file path such as a fifo file. SetStdoutWriter +// is mainly used for unit testing purposes. +func (c *ExecCommand) SetWriters(out io.WriteCloser, err io.WriteCloser) { + c.stdout = out + c.stderr = err +} + +// GetWriters returns the unexported io.WriteCloser for the stdout and stderr +// handles. This is mainly used for unit testing purposes. +func (c *ExecCommand) GetWriters() (stdout io.WriteCloser, stderr io.WriteCloser) { + return c.stdout, c.stderr +} + +type nopCloser struct { + io.Writer +} + +func (nopCloser) Close() error { return nil } + +// Stdout returns a writer for the configured file descriptor +func (c *ExecCommand) Stdout() (io.WriteCloser, error) { + if c.stdout == nil { + if c.StdoutPath != "" { + f, err := fifo.OpenWriter(c.StdoutPath) + if err != nil { + return nil, fmt.Errorf("failed to create stdout: %v", err) + } + c.stdout = f + } else { + c.stdout = nopCloser{ioutil.Discard} + } + } + return c.stdout, nil +} + +// Stderr returns a writer for the configured file descriptor +func (c *ExecCommand) Stderr() (io.WriteCloser, error) { + if c.stderr == nil { + if c.StderrPath != "" { + f, err := fifo.OpenWriter(c.StderrPath) + if err != nil { + return nil, fmt.Errorf("failed to create stderr: %v", err) + } + c.stderr = f + } else { + c.stderr = nopCloser{ioutil.Discard} + } + } + return c.stderr, nil +} + +func (c *ExecCommand) Close() { + if c.stdout != nil { + c.stdout.Close() + } + if c.stderr != nil { + c.stderr.Close() + } +} + +// ProcessState holds information about the state of a user process. +type ProcessState struct { + Pid int + ExitCode int + Signal int + Time time.Time +} + +// ExecutorVersion is the version of the executor +type ExecutorVersion struct { + Version string +} + +func (v *ExecutorVersion) GoString() string { + return v.Version +} + +// UniversalExecutor is an implementation of the Executor which launches and +// supervises processes. In addition to process supervision it provides resource +// and file system isolation +type UniversalExecutor struct { + childCmd exec.Cmd + commandCfg *ExecCommand + + exitState *ProcessState + processExited chan interface{} + + // containment is used to cleanup resources created by the executor + // currently only used for killing pids via freezer cgroup on linux + containment resources.Containment + + totalCpuStats *stats.CpuStats + userCpuStats *stats.CpuStats + systemCpuStats *stats.CpuStats + pidCollector *pidCollector + + logger hclog.Logger +} + +// NewExecutor returns an Executor +func NewExecutor(logger hclog.Logger) Executor { + logger = logger.Named("executor") + if err := shelpers.Init(); err != nil { + logger.Error("unable to initialize stats", "error", err) + } + + return &UniversalExecutor{ + logger: logger, + processExited: make(chan interface{}), + totalCpuStats: stats.NewCpuStats(), + userCpuStats: stats.NewCpuStats(), + systemCpuStats: stats.NewCpuStats(), + pidCollector: newPidCollector(logger), + } +} + +// Version returns the api version of the executor +func (e *UniversalExecutor) Version() (*ExecutorVersion, error) { + return &ExecutorVersion{Version: ExecutorVersionLatest}, nil +} + +// Launch launches the main process and returns its state. It also +// configures an applies isolation on certain platforms. +func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error) { + e.logger.Trace("preparing to launch command", "command", command.Cmd, "args", strings.Join(command.Args, " ")) + + e.commandCfg = command + + // setting the user of the process + if command.User != "" { + e.logger.Debug("running command as user", "user", command.User) + if err := setCmdUser(&e.childCmd, command.User); err != nil { + return nil, err + } + } + + // set the task dir as the working directory for the command + e.childCmd.Dir = e.commandCfg.TaskDir + + // start command in separate process group + if err := e.setNewProcessGroup(); err != nil { + return nil, err + } + + // Maybe setup containment (for now, cgroups only only on linux) + if e.commandCfg.ResourceLimits || e.commandCfg.BasicProcessCgroup { + pid := os.Getpid() + if err := e.configureResourceContainer(pid); err != nil { + e.logger.Error("failed to configure resource container", "pid", pid, "error", err) + return nil, err + } + } + + stdout, err := e.commandCfg.Stdout() + if err != nil { + return nil, err + } + stderr, err := e.commandCfg.Stderr() + if err != nil { + return nil, err + } + + e.childCmd.Stdout = stdout + e.childCmd.Stderr = stderr + + // Look up the binary path and make it executable + absPath, err := lookupBin(command.TaskDir, command.Cmd) + if err != nil { + return nil, err + } + + if err := makeExecutable(absPath); err != nil { + return nil, err + } + + path := absPath + + // Set the commands arguments + e.childCmd.Path = path + e.childCmd.Args = append([]string{e.childCmd.Path}, command.Args...) + e.childCmd.Env = e.commandCfg.Env + + // Start the process + if err = withNetworkIsolation(e.childCmd.Start, command.NetworkIsolation); err != nil { + return nil, fmt.Errorf("failed to start command path=%q --- args=%q: %v", path, e.childCmd.Args, err) + } + + go e.pidCollector.collectPids(e.processExited, e.getAllPids) + go e.wait() + return &ProcessState{Pid: e.childCmd.Process.Pid, ExitCode: -1, Time: time.Now()}, nil +} + +// Exec a command inside a container for exec and java drivers. +func (e *UniversalExecutor) Exec(deadline time.Time, name string, args []string) ([]byte, int, error) { + ctx, cancel := context.WithDeadline(context.Background(), deadline) + defer cancel() + return ExecScript(ctx, e.childCmd.Dir, e.commandCfg.Env, e.childCmd.SysProcAttr, e.commandCfg.NetworkIsolation, name, args) +} + +// ExecScript executes cmd with args and returns the output, exit code, and +// error. Output is truncated to drivers/shared/structs.CheckBufSize +func ExecScript(ctx context.Context, dir string, env []string, attrs *syscall.SysProcAttr, + netSpec *drivers.NetworkIsolationSpec, name string, args []string) ([]byte, int, error) { + + cmd := exec.CommandContext(ctx, name, args...) + + // Copy runtime environment from the main command + cmd.SysProcAttr = attrs + cmd.Dir = dir + cmd.Env = env + + // Capture output + buf, _ := circbuf.NewBuffer(int64(drivers.CheckBufSize)) + cmd.Stdout = buf + cmd.Stderr = buf + + if err := withNetworkIsolation(cmd.Run, netSpec); err != nil { + exitErr, ok := err.(*exec.ExitError) + if !ok { + // Non-exit error, return it and let the caller treat + // it as a critical failure + return nil, 0, err + } + + // Some kind of error happened; default to critical + exitCode := 2 + if status, ok := exitErr.Sys().(syscall.WaitStatus); ok { + exitCode = status.ExitStatus() + } + + // Don't return the exitError as the caller only needs the + // output and code. + return buf.Bytes(), exitCode, nil + } + return buf.Bytes(), 0, nil +} + +func (e *UniversalExecutor) ExecStreaming(ctx context.Context, command []string, tty bool, + stream drivers.ExecTaskStream) error { + + if len(command) == 0 { + return fmt.Errorf("command is required") + } + + cmd := exec.CommandContext(ctx, command[0], command[1:]...) + + cmd.Dir = "/" + cmd.Env = e.childCmd.Env + + execHelper := &execHelper{ + logger: e.logger, + + newTerminal: func() (func() (*os.File, error), *os.File, error) { + pty, tty, err := pty.Open() + if err != nil { + return nil, nil, err + } + + return func() (*os.File, error) { return pty, nil }, tty, err + }, + setTTY: func(tty *os.File) error { + cmd.SysProcAttr = sessionCmdAttr(tty) + + cmd.Stdin = tty + cmd.Stdout = tty + cmd.Stderr = tty + return nil + }, + setIO: func(stdin io.Reader, stdout, stderr io.Writer) error { + cmd.Stdin = stdin + cmd.Stdout = stdout + cmd.Stderr = stderr + return nil + }, + processStart: func() error { + if u := e.commandCfg.User; u != "" { + if err := setCmdUser(cmd, u); err != nil { + return err + } + } + + return withNetworkIsolation(cmd.Start, e.commandCfg.NetworkIsolation) + }, + processWait: func() (*os.ProcessState, error) { + err := cmd.Wait() + return cmd.ProcessState, err + }, + } + + return execHelper.run(ctx, tty, stream) +} + +// Wait waits until a process has exited and returns it's exitcode and errors +func (e *UniversalExecutor) Wait(ctx context.Context) (*ProcessState, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-e.processExited: + return e.exitState, nil + } +} + +func (e *UniversalExecutor) UpdateResources(resources *drivers.Resources) error { + return nil +} + +func (e *UniversalExecutor) wait() { + defer close(e.processExited) + defer e.commandCfg.Close() + pid := e.childCmd.Process.Pid + err := e.childCmd.Wait() + if err == nil { + e.exitState = &ProcessState{Pid: pid, ExitCode: 0, Time: time.Now()} + return + } + + exitCode := 1 + var signal int + if exitErr, ok := err.(*exec.ExitError); ok { + if status, ok := exitErr.Sys().(syscall.WaitStatus); ok { + exitCode = status.ExitStatus() + if status.Signaled() { + // bash(1) uses the lower 7 bits of a uint8 + // to indicate normal program failure (see + // <sysexits.h>). If a process terminates due + // to a signal, encode the signal number to + // indicate which signal caused the process + // to terminate. Mirror this exit code + // encoding scheme. + const exitSignalBase = 128 + signal = int(status.Signal()) + exitCode = exitSignalBase + signal + } + } + } else { + e.logger.Warn("unexpected Cmd.Wait() error type", "error", err) + } + + e.exitState = &ProcessState{Pid: pid, ExitCode: exitCode, Signal: signal, Time: time.Now()} +} + +var ( + // finishedErr is the error message received when trying to kill and already + // exited process. + finishedErr = "os: process already finished" + + // noSuchProcessErr is the error message received when trying to kill a non + // existing process (e.g. when killing a process group). + noSuchProcessErr = "no such process" +) + +// Shutdown cleans up the alloc directory, destroys resource container and +// kills the user process. +func (e *UniversalExecutor) Shutdown(signal string, grace time.Duration) error { + e.logger.Debug("shutdown requested", "signal", signal, "grace_period_ms", grace.Round(time.Millisecond)) + var merr multierror.Error + + // If the executor did not launch a process, return. + if e.commandCfg == nil { + return nil + } + + // If there is no process we can't shutdown + if e.childCmd.Process == nil { + e.logger.Warn("failed to shutdown due to missing process", "error", "no process found") + return fmt.Errorf("executor failed to shutdown error: no process found") + } + + proc, err := os.FindProcess(e.childCmd.Process.Pid) + if err != nil { + err = fmt.Errorf("executor failed to find process: %v", err) + e.logger.Warn("failed to shutdown due to inability to find process", "pid", e.childCmd.Process.Pid, "error", err) + return err + } + + // If grace is 0 then skip shutdown logic + if grace > 0 { + // Default signal to SIGINT if not set + if signal == "" { + signal = "SIGINT" + } + + sig, ok := signals.SignalLookup[signal] + if !ok { + err = fmt.Errorf("error unknown signal given for shutdown: %s", signal) + e.logger.Warn("failed to shutdown", "error", err) + return err + } + + if err := e.shutdownProcess(sig, proc); err != nil { + e.logger.Warn("failed to shutdown process", "pid", proc.Pid, "error", err) + return err + } + + select { + case <-e.processExited: + case <-time.After(grace): + proc.Kill() + } + } else { + proc.Kill() + } + + // Wait for process to exit + select { + case <-e.processExited: + case <-time.After(time.Second * 15): + e.logger.Warn("process did not exit after 15 seconds") + merr.Errors = append(merr.Errors, fmt.Errorf("process did not exit after 15 seconds")) + } + + // prefer killing the process via platform-dependent resource containment + killByContainment := e.commandCfg.ResourceLimits || e.commandCfg.BasicProcessCgroup + + if !killByContainment { + // there is no containment, so kill the group the old fashioned way by sending + // SIGKILL to the negative pid + if cleanupChildrenErr := e.killProcessTree(proc); cleanupChildrenErr != nil && cleanupChildrenErr.Error() != finishedErr { + merr.Errors = append(merr.Errors, + fmt.Errorf("can't kill process with pid %d: %v", e.childCmd.Process.Pid, cleanupChildrenErr)) + } + } else { + // there is containment available (e.g. cgroups) so defer to that implementation + // for killing the processes + if cleanupErr := e.containment.Cleanup(); cleanupErr != nil { + e.logger.Warn("containment cleanup failed", "error", cleanupErr) + merr.Errors = append(merr.Errors, cleanupErr) + } + } + + if err = merr.ErrorOrNil(); err != nil { + e.logger.Warn("failed to shutdown due to some error", "error", err.Error()) + return err + } + + return nil +} + +// Signal sends the passed signal to the task +func (e *UniversalExecutor) Signal(s os.Signal) error { + if e.childCmd.Process == nil { + return fmt.Errorf("Task not yet run") + } + + e.logger.Debug("sending signal to PID", "signal", s, "pid", e.childCmd.Process.Pid) + err := e.childCmd.Process.Signal(s) + if err != nil { + e.logger.Error("sending signal failed", "signal", s, "error", err) + return err + } + + return nil +} + +func (e *UniversalExecutor) Stats(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) { + ch := make(chan *cstructs.TaskResourceUsage) + go e.handleStats(ch, ctx, interval) + return ch, nil +} + +func (e *UniversalExecutor) handleStats(ch chan *cstructs.TaskResourceUsage, ctx context.Context, interval time.Duration) { + defer close(ch) + timer := time.NewTimer(0) + for { + select { + case <-ctx.Done(): + return + + case <-timer.C: + timer.Reset(interval) + } + + pidStats, err := e.pidCollector.pidStats() + if err != nil { + e.logger.Warn("error collecting stats", "error", err) + return + } + + select { + case <-ctx.Done(): + return + case ch <- aggregatedResourceUsage(e.systemCpuStats, pidStats): + } + } +} + +// lookupBin looks for path to the binary to run by looking for the binary in +// the following locations, in-order: +// task/local/, task/, on the host file system, in host $PATH +// The return path is absolute. +func lookupBin(taskDir string, bin string) (string, error) { + // Check in the local directory + local := filepath.Join(taskDir, allocdir.TaskLocal, bin) + if _, err := os.Stat(local); err == nil { + return local, nil + } + + // Check at the root of the task's directory + root := filepath.Join(taskDir, bin) + if _, err := os.Stat(root); err == nil { + return root, nil + } + + // when checking host paths, check with Stat first if path is absolute + // as exec.LookPath only considers files already marked as executable + // and only consider this for absolute paths to avoid depending on + // current directory of nomad which may cause unexpected behavior + if _, err := os.Stat(bin); err == nil && filepath.IsAbs(bin) { + return bin, nil + } + + // Check the $PATH + if host, err := exec.LookPath(bin); err == nil { + return host, nil + } + + return "", fmt.Errorf("binary %q could not be found", bin) +} + +// makeExecutable makes the given file executable for root,group,others. +func makeExecutable(binPath string) error { + if runtime.GOOS == "windows" { + return nil + } + + fi, err := os.Stat(binPath) + if err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("binary %q does not exist", binPath) + } + return fmt.Errorf("specified binary is invalid: %v", err) + } + + // If it is not executable, make it so. + perm := fi.Mode().Perm() + req := os.FileMode(0555) + if perm&req != req { + if err := os.Chmod(binPath, perm|req); err != nil { + return fmt.Errorf("error making %q executable: %s", binPath, err) + } + } + return nil +} + +// SupportedCaps returns a list of all supported capabilities in kernel. +func SupportedCaps(allowNetRaw bool) []string { + var allCaps []string + last := capability.CAP_LAST_CAP + // workaround for RHEL6 which has no /proc/sys/kernel/cap_last_cap + if last == capability.Cap(63) { + last = capability.CAP_BLOCK_SUSPEND + } + for _, cap := range capability.List() { + if cap > last { + continue + } + if !allowNetRaw && cap == capability.CAP_NET_RAW { + continue + } + allCaps = append(allCaps, fmt.Sprintf("CAP_%s", strings.ToUpper(cap.String()))) + } + return allCaps +} diff --git a/executor/executor_basic.go b/executor/executor_basic.go new file mode 100644 index 0000000..ad42792 --- /dev/null +++ b/executor/executor_basic.go @@ -0,0 +1,33 @@ +//go:build !linux + +package executor + +import ( + "os/exec" + + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/lib/resources" + "github.com/hashicorp/nomad/plugins/drivers" +) + +func NewExecutorWithIsolation(logger hclog.Logger) Executor { + logger = logger.Named("executor") + logger.Error("isolation executor is not supported on this platform, using default") + return NewExecutor(logger) +} + +func (e *UniversalExecutor) configureResourceContainer(_ int) error { return nil } + +func (e *UniversalExecutor) getAllPids() (resources.PIDs, error) { + return getAllPidsByScanning() +} + +func (e *UniversalExecutor) start(command *ExecCommand) error { + return e.childCmd.Start() +} + +func withNetworkIsolation(f func() error, _ *drivers.NetworkIsolationSpec) error { + return f() +} + +func setCmdUser(*exec.Cmd, string) error { return nil } diff --git a/executor/executor_linux.go b/executor/executor_linux.go new file mode 100644 index 0000000..4ab8367 --- /dev/null +++ b/executor/executor_linux.go @@ -0,0 +1,926 @@ +//go:build linux + +package executor + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "os/exec" + "path" + "path/filepath" + "strings" + "syscall" + "time" + + "github.com/armon/circbuf" + "github.com/hashicorp/consul-template/signals" + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/lib/cgutil" + "github.com/hashicorp/nomad/client/lib/resources" + "github.com/hashicorp/nomad/client/stats" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/drivers/shared/capabilities" + shelpers "github.com/hashicorp/nomad/helper/stats" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/opencontainers/runc/libcontainer" + "github.com/opencontainers/runc/libcontainer/cgroups" + lconfigs "github.com/opencontainers/runc/libcontainer/configs" + "github.com/opencontainers/runc/libcontainer/devices" + ldevices "github.com/opencontainers/runc/libcontainer/devices" + "github.com/opencontainers/runc/libcontainer/specconv" + lutils "github.com/opencontainers/runc/libcontainer/utils" + "github.com/opencontainers/runtime-spec/specs-go" + "golang.org/x/sys/unix" +) + +var ( + // ExecutorCgroupV1MeasuredMemStats is the list of memory stats captured by the executor with cgroup-v1 + ExecutorCgroupV1MeasuredMemStats = []string{"RSS", "Cache", "Swap", "Usage", "Max Usage", "Kernel Usage", "Kernel Max Usage"} + + // ExecutorCgroupV2MeasuredMemStats is the list of memory stats captured by the executor with cgroup-v2. cgroup-v2 exposes different memory stats and no longer reports rss or max usage. + ExecutorCgroupV2MeasuredMemStats = []string{"Cache", "Swap", "Usage"} + + // ExecutorCgroupMeasuredCpuStats is the list of CPU stats captures by the executor + ExecutorCgroupMeasuredCpuStats = []string{"System Mode", "User Mode", "Throttled Periods", "Throttled Time", "Percent"} +) + +// LibcontainerExecutor implements an Executor with the runc/libcontainer api +type LibcontainerExecutor struct { + id string + command *ExecCommand + + logger hclog.Logger + + totalCpuStats *stats.CpuStats + userCpuStats *stats.CpuStats + systemCpuStats *stats.CpuStats + pidCollector *pidCollector + + container libcontainer.Container + userProc *libcontainer.Process + userProcExited chan interface{} + exitState *ProcessState +} + +func NewExecutorWithIsolation(logger hclog.Logger) Executor { + logger = logger.Named("isolated_executor") + if err := shelpers.Init(); err != nil { + logger.Error("unable to initialize stats", "error", err) + } + return &LibcontainerExecutor{ + id: strings.ReplaceAll(uuid.Generate(), "-", "_"), + logger: logger, + totalCpuStats: stats.NewCpuStats(), + userCpuStats: stats.NewCpuStats(), + systemCpuStats: stats.NewCpuStats(), + pidCollector: newPidCollector(logger), + } +} + +// Launch creates a new container in libcontainer and starts a new process with it +func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, error) { + l.logger.Trace("preparing to launch command", "command", command.Cmd, "args", strings.Join(command.Args, " ")) + + if command.Resources == nil { + command.Resources = &drivers.Resources{ + NomadResources: &structs.AllocatedTaskResources{}, + } + } + + l.command = command + + // create a new factory which will store the container state in the allocDir + factory, err := libcontainer.New( + path.Join(command.TaskDir, "../alloc/container"), + // note that os.Args[0] refers to the executor shim typically + // and first args arguments is ignored now due + // until https://github.com/opencontainers/runc/pull/1888 is merged + libcontainer.InitArgs(os.Args[0], "libcontainer-shim"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create factory: %v", err) + } + + // A container groups processes under the same isolation enforcement + containerCfg, err := newLibcontainerConfig(command) + if err != nil { + return nil, fmt.Errorf("failed to configure container(%s): %v", l.id, err) + } + + container, err := factory.Create(l.id, containerCfg) + if err != nil { + return nil, fmt.Errorf("failed to create container(%s): %v", l.id, err) + } + l.container = container + + // Look up the binary path and make it executable + taskPath, hostPath, err := lookupTaskBin(command) + if err != nil { + return nil, err + } + if err := makeExecutable(hostPath); err != nil { + return nil, err + } + + combined := append([]string{taskPath}, command.Args...) + stdout, err := command.Stdout() + if err != nil { + return nil, err + } + stderr, err := command.Stderr() + if err != nil { + return nil, err + } + + l.logger.Debug("launching", "command", command.Cmd, "args", strings.Join(command.Args, " ")) + + // the task process will be started by the container + process := &libcontainer.Process{ + Args: combined, + Env: command.Env, + Stdout: stdout, + Stderr: stderr, + Init: true, + } + + if command.User != "" { + process.User = command.User + } + l.userProc = process + + l.totalCpuStats = stats.NewCpuStats() + l.userCpuStats = stats.NewCpuStats() + l.systemCpuStats = stats.NewCpuStats() + + // Starts the task + if err := container.Run(process); err != nil { + container.Destroy() + return nil, err + } + + pid, err := process.Pid() + if err != nil { + container.Destroy() + return nil, err + } + + // start a goroutine to wait on the process to complete, so Wait calls can + // be multiplexed + l.userProcExited = make(chan interface{}) + go l.pidCollector.collectPids(l.userProcExited, l.getAllPids) + go l.wait() + + return &ProcessState{ + Pid: pid, + ExitCode: -1, + Time: time.Now(), + }, nil +} + +func (l *LibcontainerExecutor) getAllPids() (resources.PIDs, error) { + pids, err := l.container.Processes() + if err != nil { + return nil, err + } + m := make(resources.PIDs, 1) + for _, pid := range pids { + m[pid] = resources.NewPID(pid) + } + return m, nil +} + +// Wait waits until a process has exited and returns it's exitcode and errors +func (l *LibcontainerExecutor) Wait(ctx context.Context) (*ProcessState, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-l.userProcExited: + return l.exitState, nil + } +} + +func (l *LibcontainerExecutor) wait() { + defer close(l.userProcExited) + + ps, err := l.userProc.Wait() + if err != nil { + // If the process has exited before we called wait an error is returned + // the process state is embedded in the error + if exitErr, ok := err.(*exec.ExitError); ok { + ps = exitErr.ProcessState + } else { + l.logger.Error("failed to call wait on user process", "error", err) + l.exitState = &ProcessState{Pid: 0, ExitCode: 1, Time: time.Now()} + return + } + } + + l.command.Close() + + exitCode := 1 + var signal int + if status, ok := ps.Sys().(syscall.WaitStatus); ok { + exitCode = status.ExitStatus() + if status.Signaled() { + const exitSignalBase = 128 + signal = int(status.Signal()) + exitCode = exitSignalBase + signal + } + } + + l.exitState = &ProcessState{ + Pid: ps.Pid(), + ExitCode: exitCode, + Signal: signal, + Time: time.Now(), + } +} + +// Shutdown stops all processes started and cleans up any resources +// created (such as mountpoints, devices, etc). +func (l *LibcontainerExecutor) Shutdown(signal string, grace time.Duration) error { + if l.container == nil { + return nil + } + + status, err := l.container.Status() + if err != nil { + return err + } + + defer l.container.Destroy() + + if status == libcontainer.Stopped { + return nil + } + + if grace > 0 { + if signal == "" { + signal = "SIGINT" + } + + sig, ok := signals.SignalLookup[signal] + if !ok { + return fmt.Errorf("error unknown signal given for shutdown: %s", signal) + } + + // Signal initial container processes only during graceful + // shutdown; hence `false` arg. + err = l.container.Signal(sig, false) + if err != nil { + return err + } + + select { + case <-l.userProcExited: + return nil + case <-time.After(grace): + // Force kill all container processes after grace period, + // hence `true` argument. + if err := l.container.Signal(os.Kill, true); err != nil { + return err + } + } + } else { + err := l.container.Signal(os.Kill, true) + if err != nil { + return err + } + } + + select { + case <-l.userProcExited: + return nil + case <-time.After(time.Second * 15): + return fmt.Errorf("process failed to exit after 15 seconds") + } +} + +// UpdateResources updates the resource isolation with new values to be enforced +func (l *LibcontainerExecutor) UpdateResources(resources *drivers.Resources) error { + return nil +} + +// Version returns the api version of the executor +func (l *LibcontainerExecutor) Version() (*ExecutorVersion, error) { + return &ExecutorVersion{Version: ExecutorVersionLatest}, nil +} + +// Stats returns the resource statistics for processes managed by the executor +func (l *LibcontainerExecutor) Stats(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) { + ch := make(chan *cstructs.TaskResourceUsage) + go l.handleStats(ch, ctx, interval) + return ch, nil + +} + +func (l *LibcontainerExecutor) handleStats(ch chan *cstructs.TaskResourceUsage, ctx context.Context, interval time.Duration) { + defer close(ch) + timer := time.NewTimer(0) + + measuredMemStats := ExecutorCgroupV1MeasuredMemStats + if cgroups.IsCgroup2UnifiedMode() { + measuredMemStats = ExecutorCgroupV2MeasuredMemStats + } + + for { + select { + case <-ctx.Done(): + return + + case <-timer.C: + timer.Reset(interval) + } + + lstats, err := l.container.Stats() + if err != nil { + l.logger.Warn("error collecting stats", "error", err) + return + } + + pidStats, err := l.pidCollector.pidStats() + if err != nil { + l.logger.Warn("error collecting stats", "error", err) + return + } + + ts := time.Now() + stats := lstats.CgroupStats + + // Memory Related Stats + swap := stats.MemoryStats.SwapUsage + maxUsage := stats.MemoryStats.Usage.MaxUsage + rss := stats.MemoryStats.Stats["rss"] + cache := stats.MemoryStats.Stats["cache"] + mapped_file := stats.MemoryStats.Stats["mapped_file"] + ms := &cstructs.MemoryStats{ + RSS: rss, + Cache: cache, + Swap: swap.Usage, + MappedFile: mapped_file, + Usage: stats.MemoryStats.Usage.Usage, + MaxUsage: maxUsage, + KernelUsage: stats.MemoryStats.KernelUsage.Usage, + KernelMaxUsage: stats.MemoryStats.KernelUsage.MaxUsage, + Measured: measuredMemStats, + } + + // CPU Related Stats + totalProcessCPUUsage := float64(stats.CpuStats.CpuUsage.TotalUsage) + userModeTime := float64(stats.CpuStats.CpuUsage.UsageInUsermode) + kernelModeTime := float64(stats.CpuStats.CpuUsage.UsageInKernelmode) + + totalPercent := l.totalCpuStats.Percent(totalProcessCPUUsage) + cs := &cstructs.CpuStats{ + SystemMode: l.systemCpuStats.Percent(kernelModeTime), + UserMode: l.userCpuStats.Percent(userModeTime), + Percent: totalPercent, + ThrottledPeriods: stats.CpuStats.ThrottlingData.ThrottledPeriods, + ThrottledTime: stats.CpuStats.ThrottlingData.ThrottledTime, + TotalTicks: l.systemCpuStats.TicksConsumed(totalPercent), + Measured: ExecutorCgroupMeasuredCpuStats, + } + taskResUsage := cstructs.TaskResourceUsage{ + ResourceUsage: &cstructs.ResourceUsage{ + MemoryStats: ms, + CpuStats: cs, + }, + Timestamp: ts.UTC().UnixNano(), + Pids: pidStats, + } + + select { + case <-ctx.Done(): + return + case ch <- &taskResUsage: + } + + } +} + +// Signal sends a signal to the process managed by the executor +func (l *LibcontainerExecutor) Signal(s os.Signal) error { + return l.userProc.Signal(s) +} + +// Exec starts an additional process inside the container +func (l *LibcontainerExecutor) Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error) { + combined := append([]string{cmd}, args...) + // Capture output + buf, _ := circbuf.NewBuffer(int64(drivers.CheckBufSize)) + + process := &libcontainer.Process{ + Args: combined, + Env: l.command.Env, + Stdout: buf, + Stderr: buf, + } + + err := l.container.Run(process) + if err != nil { + return nil, 0, err + } + + waitCh := make(chan *waitResult) + defer close(waitCh) + go l.handleExecWait(waitCh, process) + + select { + case result := <-waitCh: + ps := result.ps + if result.err != nil { + if exitErr, ok := result.err.(*exec.ExitError); ok { + ps = exitErr.ProcessState + } else { + return nil, 0, result.err + } + } + var exitCode int + if status, ok := ps.Sys().(syscall.WaitStatus); ok { + exitCode = status.ExitStatus() + } + return buf.Bytes(), exitCode, nil + + case <-time.After(time.Until(deadline)): + process.Signal(os.Kill) + return nil, 0, context.DeadlineExceeded + } + +} + +func (l *LibcontainerExecutor) newTerminalSocket() (pty func() (*os.File, error), tty *os.File, err error) { + parent, child, err := lutils.NewSockPair("socket") + if err != nil { + return nil, nil, fmt.Errorf("failed to create terminal: %v", err) + } + + return func() (*os.File, error) { return lutils.RecvFd(parent) }, child, err + +} + +func (l *LibcontainerExecutor) ExecStreaming(ctx context.Context, cmd []string, tty bool, + stream drivers.ExecTaskStream) error { + + // the task process will be started by the container + process := &libcontainer.Process{ + Args: cmd, + Env: l.userProc.Env, + User: l.userProc.User, + Init: false, + Cwd: "/", + } + + execHelper := &execHelper{ + logger: l.logger, + + newTerminal: l.newTerminalSocket, + setTTY: func(tty *os.File) error { + process.ConsoleSocket = tty + return nil + }, + setIO: func(stdin io.Reader, stdout, stderr io.Writer) error { + process.Stdin = stdin + process.Stdout = stdout + process.Stderr = stderr + return nil + }, + + processStart: func() error { return l.container.Run(process) }, + processWait: func() (*os.ProcessState, error) { + return process.Wait() + }, + } + + return execHelper.run(ctx, tty, stream) + +} + +type waitResult struct { + ps *os.ProcessState + err error +} + +func (l *LibcontainerExecutor) handleExecWait(ch chan *waitResult, process *libcontainer.Process) { + ps, err := process.Wait() + ch <- &waitResult{ps, err} +} + +func configureCapabilities(cfg *lconfigs.Config, command *ExecCommand) { + switch command.User { + case "root": + // when running as root, use the legacy set of system capabilities, so + // that we do not break existing nomad clusters using this "feature" + legacyCaps := capabilities.LegacySupported().Slice(true) + cfg.Capabilities = &lconfigs.Capabilities{ + Bounding: legacyCaps, + Permitted: legacyCaps, + Effective: legacyCaps, + Ambient: nil, + Inheritable: nil, + } + default: + // otherwise apply the plugin + task capability configuration + cfg.Capabilities = &lconfigs.Capabilities{ + Bounding: command.Capabilities, + } + } +} + +func configureNamespaces(pidMode, ipcMode string) lconfigs.Namespaces { + namespaces := lconfigs.Namespaces{{Type: lconfigs.NEWNS}} + if pidMode == IsolationModePrivate { + namespaces = append(namespaces, lconfigs.Namespace{Type: lconfigs.NEWPID}) + } + if ipcMode == IsolationModePrivate { + namespaces = append(namespaces, lconfigs.Namespace{Type: lconfigs.NEWIPC}) + } + return namespaces +} + +// configureIsolation prepares the isolation primitives of the container. +// The process runs in a container configured with the following: +// +// * the task directory as the chroot +// * dedicated mount points namespace, but shares the PID, User, domain, network namespaces with host +// * small subset of devices (e.g. stdout/stderr/stdin, tty, shm, pts); default to using the same set of devices as Docker +// * some special filesystems: `/proc`, `/sys`. Some case is given to avoid exec escaping or setting malicious values through them. +func configureIsolation(cfg *lconfigs.Config, command *ExecCommand) error { + defaultMountFlags := syscall.MS_NOEXEC | syscall.MS_NOSUID | syscall.MS_NODEV + + // set the new root directory for the container + cfg.Rootfs = command.TaskDir + + // disable pivot_root if set in the driver's configuration + cfg.NoPivotRoot = command.NoPivotRoot + + // set up default namespaces as configured + cfg.Namespaces = configureNamespaces(command.ModePID, command.ModeIPC) + + if command.NetworkIsolation != nil { + cfg.Namespaces = append(cfg.Namespaces, lconfigs.Namespace{ + Type: lconfigs.NEWNET, + Path: command.NetworkIsolation.Path, + }) + } + + // paths to mask using a bind mount to /dev/null to prevent reading + cfg.MaskPaths = []string{ + "/proc/kcore", + "/sys/firmware", + } + + // paths that should be remounted as readonly inside the container + cfg.ReadonlyPaths = []string{ + "/proc/sys", "/proc/sysrq-trigger", "/proc/irq", "/proc/bus", + } + + cfg.Devices = specconv.AllowedDevices + if len(command.Devices) > 0 { + devs, err := cmdDevices(command.Devices) + if err != nil { + return err + } + cfg.Devices = append(cfg.Devices, devs...) + } + + cfg.Mounts = []*lconfigs.Mount{ + { + Source: "tmpfs", + Destination: "/dev", + Device: "tmpfs", + Flags: syscall.MS_NOSUID | syscall.MS_STRICTATIME, + Data: "mode=755", + }, + { + Source: "proc", + Destination: "/proc", + Device: "proc", + Flags: defaultMountFlags, + }, + { + Source: "devpts", + Destination: "/dev/pts", + Device: "devpts", + Flags: syscall.MS_NOSUID | syscall.MS_NOEXEC, + Data: "newinstance,ptmxmode=0666,mode=0620,gid=5", + }, + { + Device: "tmpfs", + Source: "shm", + Destination: "/dev/shm", + Data: "mode=1777,size=65536k", + Flags: defaultMountFlags, + }, + { + Source: "mqueue", + Destination: "/dev/mqueue", + Device: "mqueue", + Flags: defaultMountFlags, + }, + { + Source: "sysfs", + Destination: "/sys", + Device: "sysfs", + Flags: defaultMountFlags | syscall.MS_RDONLY, + }, + } + + if len(command.Mounts) > 0 { + cfg.Mounts = append(cfg.Mounts, cmdMounts(command.Mounts)...) + } + + return nil +} + +func configureCgroups(cfg *lconfigs.Config, command *ExecCommand) error { + // If resources are not limited then manually create cgroups needed + if !command.ResourceLimits { + return cgutil.ConfigureBasicCgroups(cfg) + } + + // set cgroups path + if cgutil.UseV2 { + // in v2, the cgroup must have been created by the client already, + // which breaks a lot of existing tests that run drivers without a client + if command.Resources == nil || command.Resources.LinuxResources == nil || command.Resources.LinuxResources.CpusetCgroupPath == "" { + return errors.New("cgroup path must be set") + } + parent, cgroup := cgutil.SplitPath(command.Resources.LinuxResources.CpusetCgroupPath) + cfg.Cgroups.Path = filepath.Join("/", parent, cgroup) + } else { + // in v1, the cgroup is created using /nomad, which is a bug because it + // does not respect the cgroup_parent client configuration + // (but makes testing easy) + id := uuid.Generate() + cfg.Cgroups.Path = filepath.Join("/", cgutil.DefaultCgroupV1Parent, id) + } + + if command.Resources == nil || command.Resources.NomadResources == nil { + return nil + } + + // Total amount of memory allowed to consume + res := command.Resources.NomadResources + memHard, memSoft := res.Memory.MemoryMaxMB, res.Memory.MemoryMB + if memHard <= 0 { + memHard = res.Memory.MemoryMB + memSoft = 0 + } + + if memHard > 0 { + cfg.Cgroups.Resources.Memory = memHard * 1024 * 1024 + cfg.Cgroups.Resources.MemoryReservation = memSoft * 1024 * 1024 + + // Disable swap to avoid issues on the machine + var memSwappiness uint64 + cfg.Cgroups.Resources.MemorySwappiness = &memSwappiness + } + + cpuShares := res.Cpu.CpuShares + if cpuShares < 2 { + return fmt.Errorf("resources.Cpu.CpuShares must be equal to or greater than 2: %v", cpuShares) + } + + // Set the relative CPU shares for this cgroup, and convert for cgroupv2 + cfg.Cgroups.Resources.CpuShares = uint64(cpuShares) + cfg.Cgroups.Resources.CpuWeight = cgroups.ConvertCPUSharesToCgroupV2Value(uint64(cpuShares)) + + if command.Resources.LinuxResources != nil && command.Resources.LinuxResources.CpusetCgroupPath != "" { + cfg.Hooks = lconfigs.Hooks{ + lconfigs.CreateRuntime: lconfigs.HookList{ + newSetCPUSetCgroupHook(command.Resources.LinuxResources.CpusetCgroupPath), + }, + } + } + + return nil +} + +func newLibcontainerConfig(command *ExecCommand) (*lconfigs.Config, error) { + cfg := &lconfigs.Config{ + Cgroups: &lconfigs.Cgroup{ + Resources: &lconfigs.Resources{ + MemorySwappiness: nil, + }, + }, + Version: "1.0.0", + } + + for _, device := range specconv.AllowedDevices { + cfg.Cgroups.Resources.Devices = append(cfg.Cgroups.Resources.Devices, &device.Rule) + } + + configureCapabilities(cfg, command) + + // children should not inherit Nomad agent oom_score_adj value + oomScoreAdj := 0 + cfg.OomScoreAdj = &oomScoreAdj + + if err := configureIsolation(cfg, command); err != nil { + return nil, err + } + + if err := configureCgroups(cfg, command); err != nil { + return nil, err + } + + return cfg, nil +} + +// cmdDevices converts a list of driver.DeviceConfigs into excutor.Devices. +func cmdDevices(driverDevices []*drivers.DeviceConfig) ([]*devices.Device, error) { + if len(driverDevices) == 0 { + return nil, nil + } + + r := make([]*devices.Device, len(driverDevices)) + + for i, d := range driverDevices { + ed, err := ldevices.DeviceFromPath(d.HostPath, d.Permissions) + if err != nil { + return nil, fmt.Errorf("failed to make device out for %s: %v", d.HostPath, err) + } + ed.Path = d.TaskPath + r[i] = ed + } + + return r, nil +} + +var userMountToUnixMount = map[string]int{ + // Empty string maps to `rprivate` for backwards compatibility in restored + // older tasks, where mount propagation will not be present. + "": unix.MS_PRIVATE | unix.MS_REC, // rprivate + structs.VolumeMountPropagationPrivate: unix.MS_PRIVATE | unix.MS_REC, // rprivate + structs.VolumeMountPropagationHostToTask: unix.MS_SLAVE | unix.MS_REC, // rslave + structs.VolumeMountPropagationBidirectional: unix.MS_SHARED | unix.MS_REC, // rshared +} + +// cmdMounts converts a list of driver.MountConfigs into excutor.Mounts. +func cmdMounts(mounts []*drivers.MountConfig) []*lconfigs.Mount { + if len(mounts) == 0 { + return nil + } + + r := make([]*lconfigs.Mount, len(mounts)) + + for i, m := range mounts { + flags := unix.MS_BIND + if m.Readonly { + flags |= unix.MS_RDONLY + } + + r[i] = &lconfigs.Mount{ + Source: m.HostPath, + Destination: m.TaskPath, + Device: "bind", + Flags: flags, + PropagationFlags: []int{userMountToUnixMount[m.PropagationMode]}, + } + } + + return r +} + +// lookupTaskBin finds the file `bin`, searching in order: +// - taskDir/local +// - taskDir +// - each mount, in order listed in the jobspec +// - a PATH-like search of usr/local/bin/, usr/bin/, and bin/ inside the taskDir +// +// Returns an absolute path inside the container that will get passed as arg[0] +// to the launched process, and the absolute path to that binary as seen by the +// host (these will be identical for binaries that don't come from mounts). +// +// See also executor.lookupBin for a version used by non-isolated drivers. +func lookupTaskBin(command *ExecCommand) (string, string, error) { + taskDir := command.TaskDir + bin := command.Cmd + + // Check in the local directory + localDir := filepath.Join(taskDir, allocdir.TaskLocal) + taskPath, hostPath, err := getPathInTaskDir(command.TaskDir, localDir, bin) + if err == nil { + return taskPath, hostPath, nil + } + + // Check at the root of the task's directory + taskPath, hostPath, err = getPathInTaskDir(command.TaskDir, command.TaskDir, bin) + if err == nil { + return taskPath, hostPath, nil + } + + // Check in our mounts + for _, mount := range command.Mounts { + taskPath, hostPath, err = getPathInMount(mount.HostPath, mount.TaskPath, bin) + if err == nil { + return taskPath, hostPath, nil + } + } + + // If there's a / in the binary's path, we can't fallback to a PATH search + if strings.Contains(bin, "/") { + return "", "", fmt.Errorf("file %s not found under path %s", bin, taskDir) + } + + // look for a file using a PATH-style lookup inside the directory + // root. Similar to the stdlib's exec.LookPath except: + // - uses a restricted lookup PATH rather than the agent process's PATH env var. + // - does not require that the file is already executable (this will be ensured + // by the caller) + // - does not prevent using relative path as added to exec.LookPath in go1.19 + // (this gets fixed-up in the caller) + + // This is a fake PATH so that we're not using the agent's PATH + restrictedPaths := []string{"/usr/local/bin", "/usr/bin", "/bin"} + + for _, dir := range restrictedPaths { + pathDir := filepath.Join(command.TaskDir, dir) + taskPath, hostPath, err = getPathInTaskDir(command.TaskDir, pathDir, bin) + if err == nil { + return taskPath, hostPath, nil + } + } + + return "", "", fmt.Errorf("file %s not found under path", bin) +} + +// getPathInTaskDir searches for the binary in the task directory and nested +// search directory. It returns the absolute path rooted inside the container +// and the absolute path on the host. +func getPathInTaskDir(taskDir, searchDir, bin string) (string, string, error) { + + hostPath := filepath.Join(searchDir, bin) + err := filepathIsRegular(hostPath) + if err != nil { + return "", "", err + } + + // Find the path relative to the task directory + rel, err := filepath.Rel(taskDir, hostPath) + if rel == "" || err != nil { + return "", "", fmt.Errorf( + "failed to determine relative path base=%q target=%q: %v", + taskDir, hostPath, err) + } + + // Turn relative-to-taskdir path into re-rooted absolute path to avoid + // libcontainer trying to resolve the binary using $PATH. + // Do *not* use filepath.Join as it will translate ".."s returned by + // filepath.Rel. Prepending "/" will cause the path to be rooted in the + // chroot which is the desired behavior. + return filepath.Clean("/" + rel), hostPath, nil +} + +// getPathInMount for the binary in the mount's host path, constructing the path +// considering that the bin path is rooted in the mount's task path and not its +// host path. It returns the absolute path rooted inside the container and the +// absolute path on the host. +func getPathInMount(mountHostPath, mountTaskPath, bin string) (string, string, error) { + + // Find the path relative to the mount point in the task so that we can + // trim off any shared prefix when we search on the host path + mountRel, err := filepath.Rel(mountTaskPath, bin) + if mountRel == "" || err != nil { + return "", "", fmt.Errorf("path was not relative to the mount task path") + } + + hostPath := filepath.Join(mountHostPath, mountRel) + + err = filepathIsRegular(hostPath) + if err != nil { + return "", "", err + } + + // Turn relative-to-taskdir path into re-rooted absolute path to avoid + // libcontainer trying to resolve the binary using $PATH. + // Do *not* use filepath.Join as it will translate ".."s returned by + // filepath.Rel. Prepending "/" will cause the path to be rooted in the + // chroot which is the desired behavior. + return filepath.Clean("/" + bin), hostPath, nil +} + +// filepathIsRegular verifies that a filepath is a regular file (i.e. not a +// directory, socket, device, etc.) +func filepathIsRegular(path string) error { + f, err := os.Stat(path) + if err != nil { + return err + } + if !f.Mode().Type().IsRegular() { + return fmt.Errorf("path was not a regular file") + } + return nil +} + +func newSetCPUSetCgroupHook(cgroupPath string) lconfigs.Hook { + return lconfigs.NewFunctionHook(func(state *specs.State) error { + return cgroups.WriteCgroupProc(cgroupPath, state.Pid) + }) +} diff --git a/executor/executor_plugin.go b/executor/executor_plugin.go new file mode 100644 index 0000000..6eb7b35 --- /dev/null +++ b/executor/executor_plugin.go @@ -0,0 +1,34 @@ +package executor + +import ( + "context" + + hclog "github.com/hashicorp/go-hclog" + plugin "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/drivers/shared/executor/proto" + "google.golang.org/grpc" +) + +type ExecutorPlugin struct { + // TODO: support backwards compatibility with pre 0.9 NetRPC plugin + plugin.NetRPCUnsupportedPlugin + logger hclog.Logger + fsIsolation bool +} + +func (p *ExecutorPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { + if p.fsIsolation { + proto.RegisterExecutorServer(s, &grpcExecutorServer{impl: NewExecutorWithIsolation(p.logger)}) + } else { + proto.RegisterExecutorServer(s, &grpcExecutorServer{impl: NewExecutor(p.logger)}) + } + return nil +} + +func (p *ExecutorPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { + return &grpcExecutorClient{ + client: proto.NewExecutorClient(c), + doneCtx: ctx, + logger: p.logger, + }, nil +} diff --git a/executor/executor_universal_linux.go b/executor/executor_universal_linux.go new file mode 100644 index 0000000..2e6bf87 --- /dev/null +++ b/executor/executor_universal_linux.go @@ -0,0 +1,154 @@ +package executor + +import ( + "fmt" + "os/exec" + "path/filepath" + "strconv" + "strings" + "syscall" + + "github.com/containernetworking/plugins/pkg/ns" + "github.com/hashicorp/nomad/client/lib/cgutil" + "github.com/hashicorp/nomad/client/lib/resources" + "github.com/hashicorp/nomad/client/taskenv" + "github.com/hashicorp/nomad/helper/users" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/opencontainers/runc/libcontainer/configs" + "github.com/opencontainers/runc/libcontainer/specconv" +) + +// setCmdUser takes a user id as a string and looks up the user, and sets the command +// to execute as that user. +func setCmdUser(cmd *exec.Cmd, userid string) error { + u, err := users.Lookup(userid) + if err != nil { + return fmt.Errorf("failed to identify user %v: %v", userid, err) + } + + // Get the groups the user is a part of + gidStrings, err := u.GroupIds() + if err != nil { + return fmt.Errorf("unable to lookup user's group membership: %v", err) + } + + gids := make([]uint32, len(gidStrings)) + for _, gidString := range gidStrings { + u, err := strconv.ParseUint(gidString, 10, 32) + if err != nil { + return fmt.Errorf("unable to convert user's group to uint32 %s: %v", gidString, err) + } + + gids = append(gids, uint32(u)) + } + + // Convert the uid and gid + uid, err := strconv.ParseUint(u.Uid, 10, 32) + if err != nil { + return fmt.Errorf("unable to convert userid to uint32: %s", err) + } + gid, err := strconv.ParseUint(u.Gid, 10, 32) + if err != nil { + return fmt.Errorf("unable to convert groupid to uint32: %s", err) + } + + // Set the command to run as that user and group. + if cmd.SysProcAttr == nil { + cmd.SysProcAttr = &syscall.SysProcAttr{} + } + if cmd.SysProcAttr.Credential == nil { + cmd.SysProcAttr.Credential = &syscall.Credential{} + } + cmd.SysProcAttr.Credential.Uid = uint32(uid) + cmd.SysProcAttr.Credential.Gid = uint32(gid) + cmd.SysProcAttr.Credential.Groups = gids + + return nil +} + +// configureResourceContainer configured the cgroups to be used to track pids +// created by the executor +func (e *UniversalExecutor) configureResourceContainer(pid int) error { + cfg := &configs.Config{ + Cgroups: &configs.Cgroup{ + Resources: &configs.Resources{}, + }, + } + + // note: this was always here, but not used until cgroups v2 support + for _, device := range specconv.AllowedDevices { + cfg.Cgroups.Resources.Devices = append(cfg.Cgroups.Resources.Devices, &device.Rule) + } + + lookup := func(env []string, name string) (result string) { + for _, s := range env { + if strings.HasPrefix(s, name+"=") { + result = strings.TrimLeft(s, name+"=") + return + } + } + return + } + + if cgutil.UseV2 { + // in v2 we have the definitive cgroup; create and enter it + + // use the task environment variables for determining the cgroup path - + // not ideal but plumbing the values directly requires grpc protobuf changes + parent := lookup(e.commandCfg.Env, taskenv.CgroupParent) + allocID := lookup(e.commandCfg.Env, taskenv.AllocID) + task := lookup(e.commandCfg.Env, taskenv.TaskName) + if parent == "" || allocID == "" || task == "" { + return fmt.Errorf( + "environment variables %s must be set", + strings.Join([]string{taskenv.CgroupParent, taskenv.AllocID, taskenv.TaskName}, ","), + ) + } + scope := cgutil.CgroupScope(allocID, task) + path := filepath.Join("/", cgutil.GetCgroupParent(parent), scope) + cfg.Cgroups.Path = path + e.containment = resources.Contain(e.logger, cfg.Cgroups) + return e.containment.Apply(pid) + + } else { + // in v1 create a freezer cgroup for use by containment + + if err := cgutil.ConfigureBasicCgroups(cfg); err != nil { + // Log this error to help diagnose cases where nomad is run with too few + // permissions, but don't return an error. There is no separate check for + // cgroup creation permissions, so this may be the happy path. + e.logger.Warn("failed to create cgroup", + "docs", "https://www.nomadproject.io/docs/drivers/raw_exec.html#no_cgroups", + "error", err) + return nil + } + path := cfg.Cgroups.Path + e.logger.Trace("cgroup created, now need to apply", "path", path) + e.containment = resources.Contain(e.logger, cfg.Cgroups) + return e.containment.Apply(pid) + } +} + +func (e *UniversalExecutor) getAllPids() (resources.PIDs, error) { + if e.containment == nil { + return getAllPidsByScanning() + } + return e.containment.GetPIDs(), nil +} + +// withNetworkIsolation calls the passed function the network namespace `spec` +func withNetworkIsolation(f func() error, spec *drivers.NetworkIsolationSpec) error { + if spec != nil && spec.Path != "" { + // Get a handle to the target network namespace + netNS, err := ns.GetNS(spec.Path) + if err != nil { + return err + } + + // Start the container in the network namespace + return netNS.Do(func(ns.NetNS) error { + return f() + }) + } + return f() +} diff --git a/executor/executor_unix.go b/executor/executor_unix.go new file mode 100644 index 0000000..d93c8fb --- /dev/null +++ b/executor/executor_unix.go @@ -0,0 +1,50 @@ +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris + +package executor + +import ( + "fmt" + "os" + "syscall" +) + +// configure new process group for child process +func (e *UniversalExecutor) setNewProcessGroup() error { + if e.childCmd.SysProcAttr == nil { + e.childCmd.SysProcAttr = &syscall.SysProcAttr{} + } + e.childCmd.SysProcAttr.Setpgid = true + return nil +} + +// SIGKILL the process group starting at process.Pid +func (e *UniversalExecutor) killProcessTree(process *os.Process) error { + pid := process.Pid + negative := -pid // tells unix to kill entire process group + signal := syscall.SIGKILL + + // If new process group was created upon command execution + // we can kill the whole process group now to cleanup any leftovers. + if e.childCmd.SysProcAttr != nil && e.childCmd.SysProcAttr.Setpgid { + e.logger.Trace("sending sigkill to process group", "pid", pid, "negative", negative, "signal", signal) + if err := syscall.Kill(negative, signal); err != nil && err.Error() != noSuchProcessErr { + return err + } + return nil + } + return process.Kill() +} + +// Only send the process a shutdown signal (default INT), doesn't +// necessarily kill it. +func (e *UniversalExecutor) shutdownProcess(sig os.Signal, proc *os.Process) error { + if sig == nil { + sig = os.Interrupt + } + + if err := proc.Signal(sig); err != nil && err.Error() != finishedErr { + return fmt.Errorf("executor shutdown error: %v", err) + } + + return nil +} diff --git a/executor/grpc_client.go b/executor/grpc_client.go new file mode 100644 index 0000000..7ab2dbf --- /dev/null +++ b/executor/grpc_client.go @@ -0,0 +1,267 @@ +package executor + +import ( + "context" + "fmt" + "io" + "os" + "syscall" + "time" + + "github.com/LK4D4/joincontext" + "github.com/golang/protobuf/ptypes" + hclog "github.com/hashicorp/go-hclog" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/drivers/shared/executor/proto" + "github.com/hashicorp/nomad/helper/pluginutils/grpcutils" + "github.com/hashicorp/nomad/plugins/drivers" + dproto "github.com/hashicorp/nomad/plugins/drivers/proto" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var _ Executor = (*grpcExecutorClient)(nil) + +type grpcExecutorClient struct { + client proto.ExecutorClient + logger hclog.Logger + + // doneCtx is close when the plugin exits + doneCtx context.Context +} + +func (c *grpcExecutorClient) Launch(cmd *ExecCommand) (*ProcessState, error) { + ctx := context.Background() + req := &proto.LaunchRequest{ + Cmd: cmd.Cmd, + Args: cmd.Args, + Resources: drivers.ResourcesToProto(cmd.Resources), + StdoutPath: cmd.StdoutPath, + StderrPath: cmd.StderrPath, + Env: cmd.Env, + User: cmd.User, + TaskDir: cmd.TaskDir, + ResourceLimits: cmd.ResourceLimits, + BasicProcessCgroup: cmd.BasicProcessCgroup, + NoPivotRoot: cmd.NoPivotRoot, + Mounts: drivers.MountsToProto(cmd.Mounts), + Devices: drivers.DevicesToProto(cmd.Devices), + NetworkIsolation: drivers.NetworkIsolationSpecToProto(cmd.NetworkIsolation), + DefaultPidMode: cmd.ModePID, + DefaultIpcMode: cmd.ModeIPC, + Capabilities: cmd.Capabilities, + } + resp, err := c.client.Launch(ctx, req) + if err != nil { + return nil, err + } + + ps, err := processStateFromProto(resp.Process) + if err != nil { + return nil, err + } + return ps, nil +} + +func (c *grpcExecutorClient) Wait(ctx context.Context) (*ProcessState, error) { + // Join the passed context and the shutdown context + ctx, _ = joincontext.Join(ctx, c.doneCtx) + + resp, err := c.client.Wait(ctx, &proto.WaitRequest{}) + if err != nil { + return nil, err + } + + ps, err := processStateFromProto(resp.Process) + if err != nil { + return nil, err + } + + return ps, nil +} + +func (c *grpcExecutorClient) Shutdown(signal string, gracePeriod time.Duration) error { + ctx := context.Background() + req := &proto.ShutdownRequest{ + Signal: signal, + GracePeriod: gracePeriod.Nanoseconds(), + } + if _, err := c.client.Shutdown(ctx, req); err != nil { + return err + } + + return nil +} + +func (c *grpcExecutorClient) UpdateResources(r *drivers.Resources) error { + ctx := context.Background() + req := &proto.UpdateResourcesRequest{Resources: drivers.ResourcesToProto(r)} + if _, err := c.client.UpdateResources(ctx, req); err != nil { + return err + } + + return nil +} + +func (c *grpcExecutorClient) Version() (*ExecutorVersion, error) { + ctx := context.Background() + resp, err := c.client.Version(ctx, &proto.VersionRequest{}) + if err != nil { + return nil, err + } + return &ExecutorVersion{Version: resp.Version}, nil +} + +func (c *grpcExecutorClient) Stats(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) { + stream, err := c.client.Stats(ctx, &proto.StatsRequest{ + Interval: int64(interval), + }) + if err != nil { + return nil, err + } + + ch := make(chan *cstructs.TaskResourceUsage) + go c.handleStats(ctx, stream, ch) + return ch, nil +} + +func (c *grpcExecutorClient) handleStats(ctx context.Context, stream proto.Executor_StatsClient, ch chan<- *cstructs.TaskResourceUsage) { + defer close(ch) + for { + resp, err := stream.Recv() + if ctx.Err() != nil { + // Context canceled; exit gracefully + return + } + + if err == io.EOF || + status.Code(err) == codes.Unavailable || + status.Code(err) == codes.Canceled || + err == context.Canceled { + c.logger.Trace("executor Stats stream closed", "msg", err) + return + } else if err != nil { + c.logger.Warn("failed to receive Stats executor RPC stream, closing stream", "error", err) + return + } + + stats, err := drivers.TaskStatsFromProto(resp.Stats) + if err != nil { + c.logger.Error("failed to decode stats from RPC", "error", err, "stats", resp.Stats) + continue + } + + select { + case ch <- stats: + case <-ctx.Done(): + return + } + } +} + +func (c *grpcExecutorClient) Signal(s os.Signal) error { + ctx := context.Background() + sig, ok := s.(syscall.Signal) + if !ok { + return fmt.Errorf("unsupported signal type: %q", s.String()) + } + req := &proto.SignalRequest{ + Signal: int32(sig), + } + if _, err := c.client.Signal(ctx, req); err != nil { + return err + } + + return nil +} + +func (c *grpcExecutorClient) Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error) { + ctx := context.Background() + pbDeadline, err := ptypes.TimestampProto(deadline) + if err != nil { + return nil, 0, err + } + req := &proto.ExecRequest{ + Deadline: pbDeadline, + Cmd: cmd, + Args: args, + } + + resp, err := c.client.Exec(ctx, req) + if err != nil { + return nil, 0, err + } + + return resp.Output, int(resp.ExitCode), nil +} + +func (c *grpcExecutorClient) ExecStreaming(ctx context.Context, + command []string, + tty bool, + execStream drivers.ExecTaskStream) error { + + err := c.execStreaming(ctx, command, tty, execStream) + if err != nil { + return grpcutils.HandleGrpcErr(err, c.doneCtx) + } + return nil +} + +func (c *grpcExecutorClient) execStreaming(ctx context.Context, + command []string, + tty bool, + execStream drivers.ExecTaskStream) error { + + stream, err := c.client.ExecStreaming(ctx) + if err != nil { + return err + } + + err = stream.Send(&dproto.ExecTaskStreamingRequest{ + Setup: &dproto.ExecTaskStreamingRequest_Setup{ + Command: command, + Tty: tty, + }, + }) + if err != nil { + return err + } + + errCh := make(chan error, 1) + go func() { + for { + m, err := execStream.Recv() + if err == io.EOF { + return + } else if err != nil { + errCh <- err + return + } + + if err := stream.Send(m); err != nil { + errCh <- err + return + } + + } + }() + + for { + select { + case err := <-errCh: + return err + default: + } + + m, err := stream.Recv() + if err == io.EOF { + return nil + } else if err != nil { + return err + } + + if err := execStream.Send(m); err != nil { + return err + } + } +} diff --git a/executor/grpc_server.go b/executor/grpc_server.go new file mode 100644 index 0000000..231d650 --- /dev/null +++ b/executor/grpc_server.go @@ -0,0 +1,178 @@ +package executor + +import ( + "context" + "fmt" + "syscall" + "time" + + "github.com/golang/protobuf/ptypes" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/hashicorp/nomad/drivers/shared/executor/proto" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/drivers" + sproto "github.com/hashicorp/nomad/plugins/shared/structs/proto" +) + +type grpcExecutorServer struct { + impl Executor +} + +func (s *grpcExecutorServer) Launch(ctx context.Context, req *proto.LaunchRequest) (*proto.LaunchResponse, error) { + ps, err := s.impl.Launch(&ExecCommand{ + Cmd: req.Cmd, + Args: req.Args, + Resources: drivers.ResourcesFromProto(req.Resources), + StdoutPath: req.StdoutPath, + StderrPath: req.StderrPath, + Env: req.Env, + User: req.User, + TaskDir: req.TaskDir, + ResourceLimits: req.ResourceLimits, + BasicProcessCgroup: req.BasicProcessCgroup, + NoPivotRoot: req.NoPivotRoot, + Mounts: drivers.MountsFromProto(req.Mounts), + Devices: drivers.DevicesFromProto(req.Devices), + NetworkIsolation: drivers.NetworkIsolationSpecFromProto(req.NetworkIsolation), + ModePID: req.DefaultPidMode, + ModeIPC: req.DefaultIpcMode, + Capabilities: req.Capabilities, + }) + + if err != nil { + return nil, err + } + + process, err := processStateToProto(ps) + if err != nil { + return nil, err + } + + return &proto.LaunchResponse{ + Process: process, + }, nil +} + +func (s *grpcExecutorServer) Wait(ctx context.Context, req *proto.WaitRequest) (*proto.WaitResponse, error) { + ps, err := s.impl.Wait(ctx) + if err != nil { + return nil, err + } + + process, err := processStateToProto(ps) + if err != nil { + return nil, err + } + + return &proto.WaitResponse{ + Process: process, + }, nil +} + +func (s *grpcExecutorServer) Shutdown(ctx context.Context, req *proto.ShutdownRequest) (*proto.ShutdownResponse, error) { + if err := s.impl.Shutdown(req.Signal, time.Duration(req.GracePeriod)); err != nil { + return nil, err + } + + return &proto.ShutdownResponse{}, nil +} + +func (s *grpcExecutorServer) UpdateResources(ctx context.Context, req *proto.UpdateResourcesRequest) (*proto.UpdateResourcesResponse, error) { + if err := s.impl.UpdateResources(drivers.ResourcesFromProto(req.Resources)); err != nil { + return nil, err + } + + return &proto.UpdateResourcesResponse{}, nil +} + +func (s *grpcExecutorServer) Version(context.Context, *proto.VersionRequest) (*proto.VersionResponse, error) { + v, err := s.impl.Version() + if err != nil { + return nil, err + } + + return &proto.VersionResponse{ + Version: v.Version, + }, nil +} + +func (s *grpcExecutorServer) Stats(req *proto.StatsRequest, stream proto.Executor_StatsServer) error { + interval := time.Duration(req.Interval) + if interval == 0 { + interval = time.Second + } + + outCh, err := s.impl.Stats(stream.Context(), interval) + if err != nil { + if rec, ok := err.(structs.Recoverable); ok { + st := status.New(codes.FailedPrecondition, rec.Error()) + st, err := st.WithDetails(&sproto.RecoverableError{Recoverable: rec.IsRecoverable()}) + if err != nil { + // If this error, it will always error + panic(err) + } + return st.Err() + } + return err + } + + for resp := range outCh { + pbStats, err := drivers.TaskStatsToProto(resp) + if err != nil { + return err + } + + presp := &proto.StatsResponse{ + Stats: pbStats, + } + + // Send the stats + if err := stream.Send(presp); err != nil { + return err + } + } + + return nil +} + +func (s *grpcExecutorServer) Signal(ctx context.Context, req *proto.SignalRequest) (*proto.SignalResponse, error) { + sig := syscall.Signal(req.Signal) + if err := s.impl.Signal(sig); err != nil { + return nil, err + } + return &proto.SignalResponse{}, nil +} + +func (s *grpcExecutorServer) Exec(ctx context.Context, req *proto.ExecRequest) (*proto.ExecResponse, error) { + deadline, err := ptypes.Timestamp(req.Deadline) + if err != nil { + return nil, err + } + + out, exit, err := s.impl.Exec(deadline, req.Cmd, req.Args) + if err != nil { + return nil, err + } + + return &proto.ExecResponse{ + Output: out, + ExitCode: int32(exit), + }, nil +} + +func (s *grpcExecutorServer) ExecStreaming(server proto.Executor_ExecStreamingServer) error { + msg, err := server.Recv() + if err != nil { + return fmt.Errorf("failed to receive initial message: %v", err) + } + + if msg.Setup == nil { + return fmt.Errorf("first message should always be setup") + } + + return s.impl.ExecStreaming(server.Context(), + msg.Setup.Command, msg.Setup.Tty, + server) +} diff --git a/executor/libcontainer_nsenter_linux.go b/executor/libcontainer_nsenter_linux.go new file mode 100644 index 0000000..9ecada4 --- /dev/null +++ b/executor/libcontainer_nsenter_linux.go @@ -0,0 +1,29 @@ +package executor + +import ( + "os" + "runtime" + + hclog "github.com/hashicorp/go-hclog" + "github.com/opencontainers/runc/libcontainer" + _ "github.com/opencontainers/runc/libcontainer/nsenter" +) + +// init is only run on linux and is used when the LibcontainerExecutor starts +// a new process. The libcontainer shim takes over the process, setting up the +// configured isolation and limitions before execve into the user process +// +// This subcommand handler is implemented as an `init`, libcontainer shim is handled anywhere +// this package is used (including tests) without needing to write special command handler. +func init() { + if len(os.Args) > 1 && os.Args[1] == "libcontainer-shim" { + runtime.GOMAXPROCS(1) + runtime.LockOSThread() + factory, _ := libcontainer.New("") + if err := factory.StartInitialization(); err != nil { + hclog.L().Error("failed to initialize libcontainer-shim", "error", err) + os.Exit(1) + } + panic("--this line should have never been executed, congratulations--") + } +} diff --git a/executor/pid_collector.go b/executor/pid_collector.go new file mode 100644 index 0000000..2413f8e --- /dev/null +++ b/executor/pid_collector.go @@ -0,0 +1,211 @@ +package executor + +import ( + "os" + "strconv" + "sync" + "time" + + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/lib/resources" + "github.com/hashicorp/nomad/client/stats" + "github.com/hashicorp/nomad/plugins/drivers" + ps "github.com/mitchellh/go-ps" + "github.com/shirou/gopsutil/v3/process" +) + +var ( + // pidScanInterval is the interval at which the executor scans the process + // tree for finding out the pids that the executor and it's child processes + // have forked + pidScanInterval = 5 * time.Second +) + +// pidCollector is a utility that can be embedded in an executor to collect pid +// stats +type pidCollector struct { + pids map[int]*resources.PID + pidLock sync.RWMutex + logger hclog.Logger +} + +// allPidGetter is a func which is used by the pid collector to gather +// stats on +type allPidGetter func() (resources.PIDs, error) + +func newPidCollector(logger hclog.Logger) *pidCollector { + return &pidCollector{ + pids: make(map[int]*resources.PID), + logger: logger.Named("pid_collector"), + } +} + +// collectPids collects the pids of the child processes that the executor is +// running every 5 seconds +func (c *pidCollector) collectPids(stopCh chan interface{}, pidGetter allPidGetter) { + // Fire the timer right away when the executor starts from there on the pids + // are collected every scan interval + timer := time.NewTimer(0) + defer timer.Stop() + for { + select { + case <-timer.C: + pids, err := pidGetter() + if err != nil { + c.logger.Debug("error collecting pids", "error", err) + } + c.pidLock.Lock() + + // Adding pids which are not being tracked + for pid, np := range pids { + if _, ok := c.pids[pid]; !ok { + c.pids[pid] = np + } + } + // Removing pids which are no longer present + for pid := range c.pids { + if _, ok := pids[pid]; !ok { + delete(c.pids, pid) + } + } + c.pidLock.Unlock() + timer.Reset(pidScanInterval) + case <-stopCh: + return + } + } +} + +// scanPids scans all the pids on the machine running the current executor and +// returns the child processes of the executor. +func scanPids(parentPid int, allPids []ps.Process) (map[int]*resources.PID, error) { + processFamily := make(map[int]struct{}) + processFamily[parentPid] = struct{}{} + + // A mapping of pids to their parent pids. It is used to build the process + // tree of the executing task + pidsRemaining := make(map[int]int, len(allPids)) + for _, pid := range allPids { + pidsRemaining[pid.Pid()] = pid.PPid() + } + + for { + // flag to indicate if we have found a match + foundNewPid := false + + for pid, ppid := range pidsRemaining { + _, childPid := processFamily[ppid] + + // checking if the pid is a child of any of the parents + if childPid { + processFamily[pid] = struct{}{} + delete(pidsRemaining, pid) + foundNewPid = true + } + } + + // not scanning anymore if we couldn't find a single match + if !foundNewPid { + break + } + } + + res := make(map[int]*resources.PID) + for pid := range processFamily { + res[pid] = &resources.PID{ + PID: pid, + StatsTotalCPU: stats.NewCpuStats(), + StatsUserCPU: stats.NewCpuStats(), + StatsSysCPU: stats.NewCpuStats(), + } + } + return res, nil +} + +// pidStats returns the resource usage stats per pid +func (c *pidCollector) pidStats() (map[string]*drivers.ResourceUsage, error) { + stats := make(map[string]*drivers.ResourceUsage) + c.pidLock.RLock() + pids := make(map[int]*resources.PID, len(c.pids)) + for k, v := range c.pids { + pids[k] = v + } + c.pidLock.RUnlock() + for pid, np := range pids { + p, err := process.NewProcess(int32(pid)) + if err != nil { + c.logger.Trace("unable to create new process", "pid", pid, "error", err) + continue + } + ms := &drivers.MemoryStats{} + if memInfo, err := p.MemoryInfo(); err == nil { + ms.RSS = memInfo.RSS + ms.Swap = memInfo.Swap + ms.Measured = ExecutorBasicMeasuredMemStats + } + + cs := &drivers.CpuStats{} + if cpuStats, err := p.Times(); err == nil { + cs.SystemMode = np.StatsSysCPU.Percent(cpuStats.System * float64(time.Second)) + cs.UserMode = np.StatsUserCPU.Percent(cpuStats.User * float64(time.Second)) + cs.Measured = ExecutorBasicMeasuredCpuStats + + // calculate cpu usage percent + cs.Percent = np.StatsTotalCPU.Percent(cpuStats.Total() * float64(time.Second)) + } + stats[strconv.Itoa(pid)] = &drivers.ResourceUsage{MemoryStats: ms, CpuStats: cs} + } + + return stats, nil +} + +// aggregatedResourceUsage aggregates the resource usage of all the pids and +// returns a TaskResourceUsage data point +func aggregatedResourceUsage(systemCpuStats *stats.CpuStats, pidStats map[string]*drivers.ResourceUsage) *drivers.TaskResourceUsage { + ts := time.Now().UTC().UnixNano() + var ( + systemModeCPU, userModeCPU, percent float64 + totalRSS, totalSwap uint64 + ) + + for _, pidStat := range pidStats { + systemModeCPU += pidStat.CpuStats.SystemMode + userModeCPU += pidStat.CpuStats.UserMode + percent += pidStat.CpuStats.Percent + + totalRSS += pidStat.MemoryStats.RSS + totalSwap += pidStat.MemoryStats.Swap + } + + totalCPU := &drivers.CpuStats{ + SystemMode: systemModeCPU, + UserMode: userModeCPU, + Percent: percent, + Measured: ExecutorBasicMeasuredCpuStats, + TotalTicks: systemCpuStats.TicksConsumed(percent), + } + + totalMemory := &drivers.MemoryStats{ + RSS: totalRSS, + Swap: totalSwap, + Measured: ExecutorBasicMeasuredMemStats, + } + + resourceUsage := drivers.ResourceUsage{ + MemoryStats: totalMemory, + CpuStats: totalCPU, + } + return &drivers.TaskResourceUsage{ + ResourceUsage: &resourceUsage, + Timestamp: ts, + Pids: pidStats, + } +} + +func getAllPidsByScanning() (resources.PIDs, error) { + allProcesses, err := ps.Processes() + if err != nil { + return nil, err + } + return scanPids(os.Getpid(), allProcesses) +} diff --git a/executor/plugins.go b/executor/plugins.go new file mode 100644 index 0000000..0e3b977 --- /dev/null +++ b/executor/plugins.go @@ -0,0 +1,55 @@ +package executor + +import ( + "net" + + hclog "github.com/hashicorp/go-hclog" + plugin "github.com/hashicorp/go-plugin" +) + +// ExecutorConfig is the config that Nomad passes to the executor +type ExecutorConfig struct { + + // LogFile is the file to which Executor logs + LogFile string + + // LogLevel is the level of the logs to putout + LogLevel string + + // FSIsolation if set will use an executor implementation that support + // filesystem isolation + FSIsolation bool +} + +func GetPluginMap(logger hclog.Logger, fsIsolation bool) map[string]plugin.Plugin { + return map[string]plugin.Plugin{ + "executor": &ExecutorPlugin{ + logger: logger, + fsIsolation: fsIsolation, + }, + } +} + +// ExecutorReattachConfig is the config that we serialize and de-serialize and +// store in disk +type PluginReattachConfig struct { + Pid int + AddrNet string + AddrName string +} + +// PluginConfig returns a config from an ExecutorReattachConfig +func (c *PluginReattachConfig) PluginConfig() *plugin.ReattachConfig { + var addr net.Addr + switch c.AddrNet { + case "unix", "unixgram", "unixpacket": + addr, _ = net.ResolveUnixAddr(c.AddrNet, c.AddrName) + case "tcp", "tcp4", "tcp6": + addr, _ = net.ResolveTCPAddr(c.AddrNet, c.AddrName) + } + return &plugin.ReattachConfig{Pid: c.Pid, Addr: addr} +} + +func NewPluginReattachConfig(c *plugin.ReattachConfig) *PluginReattachConfig { + return &PluginReattachConfig{Pid: c.Pid, AddrNet: c.Addr.Network(), AddrName: c.Addr.String()} +} diff --git a/executor/pty_unix.go b/executor/pty_unix.go new file mode 100644 index 0000000..2df23e3 --- /dev/null +++ b/executor/pty_unix.go @@ -0,0 +1,43 @@ +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris +// +build darwin dragonfly freebsd linux netbsd openbsd solaris + +package executor + +import ( + "fmt" + "io" + "os" + "strings" + "syscall" + + "github.com/creack/pty" + "golang.org/x/sys/unix" +) + +func sessionCmdAttr(tty *os.File) *syscall.SysProcAttr { + return &syscall.SysProcAttr{ + Setsid: true, + Setctty: true, + } +} + +func setTTYSize(w io.Writer, height, width int32) error { + f, ok := w.(*os.File) + if !ok { + return fmt.Errorf("attempted to resize a non-tty session") + } + + return pty.Setsize(f, &pty.Winsize{ + Rows: uint16(height), + Cols: uint16(width), + }) + +} + +func isUnixEIOErr(err error) bool { + if err == nil { + return false + } + + return strings.Contains(err.Error(), unix.EIO.Error()) +} diff --git a/executor/resource_container_default.go b/executor/resource_container_default.go new file mode 100644 index 0000000..0274e1b --- /dev/null +++ b/executor/resource_container_default.go @@ -0,0 +1,12 @@ +//go:build !linux + +package executor + +// resourceContainerContext is a platform-specific struct for managing a +// resource container. +type resourceContainerContext struct { +} + +func (rc *resourceContainerContext) executorCleanup() error { + return nil +} diff --git a/executor/utils.go b/executor/utils.go new file mode 100644 index 0000000..237152a --- /dev/null +++ b/executor/utils.go @@ -0,0 +1,138 @@ +package executor + +import ( + "encoding/json" + "fmt" + "os" + "os/exec" + + "github.com/golang/protobuf/ptypes" + hclog "github.com/hashicorp/go-hclog" + plugin "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/drivers/shared/executor/proto" + "github.com/hashicorp/nomad/plugins/base" +) + +const ( + // ExecutorDefaultMaxPort is the default max port used by the executor for + // searching for an available port + ExecutorDefaultMaxPort = 14512 + + // ExecutorDefaultMinPort is the default min port used by the executor for + // searching for an available port + ExecutorDefaultMinPort = 14000 +) + +// CreateExecutor launches an executor plugin and returns an instance of the +// Executor interface +func CreateExecutor(logger hclog.Logger, driverConfig *base.ClientDriverConfig, + executorConfig *ExecutorConfig) (Executor, *plugin.Client, error) { + + c, err := json.Marshal(executorConfig) + if err != nil { + return nil, nil, fmt.Errorf("unable to create executor config: %v", err) + } + bin, err := os.Executable() + if err != nil { + return nil, nil, fmt.Errorf("unable to find the nomad binary: %v", err) + } + + p := &ExecutorPlugin{ + logger: logger, + fsIsolation: executorConfig.FSIsolation, + } + + config := &plugin.ClientConfig{ + HandshakeConfig: base.Handshake, + Plugins: map[string]plugin.Plugin{"executor": p}, + Cmd: exec.Command(bin, "executor", string(c)), + AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, + Logger: logger.Named("executor"), + } + + if driverConfig != nil { + config.MaxPort = driverConfig.ClientMaxPort + config.MinPort = driverConfig.ClientMinPort + } else { + config.MaxPort = ExecutorDefaultMaxPort + config.MinPort = ExecutorDefaultMinPort + } + + // setting the setsid of the plugin process so that it doesn't get signals sent to + // the nomad client. + if config.Cmd != nil { + isolateCommand(config.Cmd) + } + + return newExecutorClient(config, logger) +} + +// ReattachToExecutor launches a plugin with a given plugin config +func ReattachToExecutor(reattachConfig *plugin.ReattachConfig, logger hclog.Logger) (Executor, *plugin.Client, error) { + config := &plugin.ClientConfig{ + HandshakeConfig: base.Handshake, + Reattach: reattachConfig, + Plugins: GetPluginMap(logger, false), + AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, + Logger: logger.Named("executor"), + } + + return newExecutorClient(config, logger) +} + +func newExecutorClient(config *plugin.ClientConfig, logger hclog.Logger) (Executor, *plugin.Client, error) { + executorClient := plugin.NewClient(config) + rpcClient, err := executorClient.Client() + if err != nil { + return nil, nil, fmt.Errorf("error creating rpc client for executor plugin: %v", err) + } + + raw, err := rpcClient.Dispense("executor") + if err != nil { + return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err) + } + executorPlugin, ok := raw.(Executor) + if !ok { + return nil, nil, fmt.Errorf("unexpected executor rpc type: %T", raw) + } + return executorPlugin, executorClient, nil +} + +func processStateToProto(ps *ProcessState) (*proto.ProcessState, error) { + timestamp, err := ptypes.TimestampProto(ps.Time) + if err != nil { + return nil, err + } + pb := &proto.ProcessState{ + Pid: int32(ps.Pid), + ExitCode: int32(ps.ExitCode), + Signal: int32(ps.Signal), + Time: timestamp, + } + + return pb, nil +} + +func processStateFromProto(pb *proto.ProcessState) (*ProcessState, error) { + timestamp, err := ptypes.Timestamp(pb.Time) + if err != nil { + return nil, err + } + + return &ProcessState{ + Pid: int(pb.Pid), + ExitCode: int(pb.ExitCode), + Signal: int(pb.Signal), + Time: timestamp, + }, nil +} + +// IsolationMode returns the namespace isolation mode as determined from agent +// plugin configuration and task driver configuration. The task configuration +// takes precedence, if it is configured. +func IsolationMode(plugin, task string) string { + if task != "" { + return task + } + return plugin +} diff --git a/executor/utils_unix.go b/executor/utils_unix.go new file mode 100644 index 0000000..6f45ccf --- /dev/null +++ b/executor/utils_unix.go @@ -0,0 +1,19 @@ +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris +// +build darwin dragonfly freebsd linux netbsd openbsd solaris + +package executor + +import ( + "os/exec" + "syscall" +) + +// isolateCommand sets the setsid flag in exec.Cmd to true so that the process +// becomes the process leader in a new session and doesn't receive signals that +// are sent to the parent process. +func isolateCommand(cmd *exec.Cmd) { + if cmd.SysProcAttr == nil { + cmd.SysProcAttr = &syscall.SysProcAttr{} + } + cmd.SysProcAttr.Setsid = true +} diff --git a/executor/z_executor_cmd.go b/executor/z_executor_cmd.go new file mode 100644 index 0000000..5a5f13b --- /dev/null +++ b/executor/z_executor_cmd.go @@ -0,0 +1,55 @@ +package executor + +import ( + "encoding/json" + "os" + + hclog "github.com/hashicorp/go-hclog" + plugin "github.com/hashicorp/go-plugin" + + "github.com/hashicorp/nomad/plugins/base" +) + +// Install a plugin cli handler to ease working with tests +// and external plugins. +// This init() must be initialized last in package required by the child plugin +// process. It's recommended to avoid any other `init()` or inline any necessary calls +// here. See eeaa95d commit message for more details. +func init() { + if len(os.Args) > 1 && os.Args[1] == "executor" { + if len(os.Args) != 3 { + hclog.L().Error("json configuration not provided") + os.Exit(1) + } + + config := os.Args[2] + var executorConfig ExecutorConfig + if err := json.Unmarshal([]byte(config), &executorConfig); err != nil { + os.Exit(1) + } + + f, err := os.OpenFile(executorConfig.LogFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + hclog.L().Error(err.Error()) + os.Exit(1) + } + + // Create the logger + logger := hclog.New(&hclog.LoggerOptions{ + Level: hclog.LevelFromString(executorConfig.LogLevel), + JSONFormat: true, + Output: f, + }) + + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: base.Handshake, + Plugins: GetPluginMap( + logger, + executorConfig.FSIsolation, + ), + GRPCServer: plugin.DefaultGRPCServer, + Logger: logger, + }) + os.Exit(0) + } +} |