diff options
author | Alex Auvolat <alex@adnab.me> | 2022-11-28 17:15:12 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-11-28 17:15:12 +0100 |
commit | bf3165a7069fc6dcf9ae3a28be3af07fe8b4e1c2 (patch) | |
tree | 32f52eeb5d60ae33e8a40c2d8b26d70cac19a473 /executor/executor.go | |
parent | 63e31b9ed97f34f4ea709f505c37f5e8968a0f36 (diff) | |
download | nomad-driver-nix2-bf3165a7069fc6dcf9ae3a28be3af07fe8b4e1c2.tar.gz nomad-driver-nix2-bf3165a7069fc6dcf9ae3a28be3af07fe8b4e1c2.zip |
Vendor executor module so that we can patch it
Diffstat (limited to 'executor/executor.go')
-rw-r--r-- | executor/executor.go | 722 |
1 files changed, 722 insertions, 0 deletions
diff --git a/executor/executor.go b/executor/executor.go new file mode 100644 index 0000000..a3fe56c --- /dev/null +++ b/executor/executor.go @@ -0,0 +1,722 @@ +package executor + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + "syscall" + "time" + + "github.com/armon/circbuf" + "github.com/creack/pty" + "github.com/hashicorp/consul-template/signals" + hclog "github.com/hashicorp/go-hclog" + multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/lib/fifo" + "github.com/hashicorp/nomad/client/lib/resources" + "github.com/hashicorp/nomad/client/stats" + cstructs "github.com/hashicorp/nomad/client/structs" + shelpers "github.com/hashicorp/nomad/helper/stats" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/syndtr/gocapability/capability" +) + +const ( + // ExecutorVersionLatest is the current and latest version of the executor + ExecutorVersionLatest = "2.0.0" + + // ExecutorVersionPre0_9 is the version of executor use prior to the release + // of 0.9.x + ExecutorVersionPre0_9 = "1.1.0" + + // IsolationModePrivate represents the private isolation mode for a namespace + IsolationModePrivate = "private" + + // IsolationModeHost represents the host isolation mode for a namespace + IsolationModeHost = "host" +) + +var ( + // The statistics the basic executor exposes + ExecutorBasicMeasuredMemStats = []string{"RSS", "Swap"} + ExecutorBasicMeasuredCpuStats = []string{"System Mode", "User Mode", "Percent"} +) + +// Executor is the interface which allows a driver to launch and supervise +// a process +type Executor interface { + // Launch a user process configured by the given ExecCommand + Launch(launchCmd *ExecCommand) (*ProcessState, error) + + // Wait blocks until the process exits or an error occures + Wait(ctx context.Context) (*ProcessState, error) + + // Shutdown will shutdown the executor by stopping the user process, + // cleaning up and resources created by the executor. The shutdown sequence + // will first send the given signal to the process. This defaults to "SIGINT" + // if not specified. The executor will then wait for the process to exit + // before cleaning up other resources. If the executor waits longer than the + // given grace period, the process is forcefully killed. + // + // To force kill the user process, gracePeriod can be set to 0. + Shutdown(signal string, gracePeriod time.Duration) error + + // UpdateResources updates any resource isolation enforcement with new + // constraints if supported. + UpdateResources(*drivers.Resources) error + + // Version returns the executor API version + Version() (*ExecutorVersion, error) + + // Returns a channel of stats. Stats are collected and + // pushed to the channel on the given interval + Stats(context.Context, time.Duration) (<-chan *cstructs.TaskResourceUsage, error) + + // Signal sends the given signal to the user process + Signal(os.Signal) error + + // Exec executes the given command and args inside the executor context + // and returns the output and exit code. + Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error) + + ExecStreaming(ctx context.Context, cmd []string, tty bool, + stream drivers.ExecTaskStream) error +} + +// ExecCommand holds the user command, args, and other isolation related +// settings. +// +// Important (!): when adding fields, make sure to update the RPC methods in +// grpcExecutorClient.Launch and grpcExecutorServer.Launch. Number of hours +// spent tracking this down: too many. +type ExecCommand struct { + // Cmd is the command that the user wants to run. + Cmd string + + // Args is the args of the command that the user wants to run. + Args []string + + // Resources defined by the task + Resources *drivers.Resources + + // StdoutPath is the path the process stdout should be written to + StdoutPath string + stdout io.WriteCloser + + // StderrPath is the path the process stderr should be written to + StderrPath string + stderr io.WriteCloser + + // Env is the list of KEY=val pairs of environment variables to be set + Env []string + + // User is the user which the executor uses to run the command. + User string + + // TaskDir is the directory path on the host where for the task + TaskDir string + + // ResourceLimits determines whether resource limits are enforced by the + // executor. + ResourceLimits bool + + // Cgroup marks whether we put the process in a cgroup. Setting this field + // doesn't enforce resource limits. To enforce limits, set ResourceLimits. + // Using the cgroup does allow more precise cleanup of processes. + BasicProcessCgroup bool + + // NoPivotRoot disables using pivot_root for isolation, useful when the root + // partition is on a ramdisk which does not support pivot_root, + // see man 2 pivot_root + NoPivotRoot bool + + // Mounts are the host paths to be be made available inside rootfs + Mounts []*drivers.MountConfig + + // Devices are the the device nodes to be created in isolation environment + Devices []*drivers.DeviceConfig + + // NetworkIsolation is the network isolation configuration. + NetworkIsolation *drivers.NetworkIsolationSpec + + // ModePID is the PID isolation mode (private or host). + ModePID string + + // ModeIPC is the IPC isolation mode (private or host). + ModeIPC string + + // Capabilities are the linux capabilities to be enabled by the task driver. + Capabilities []string +} + +// SetWriters sets the writer for the process stdout and stderr. This should +// not be used if writing to a file path such as a fifo file. SetStdoutWriter +// is mainly used for unit testing purposes. +func (c *ExecCommand) SetWriters(out io.WriteCloser, err io.WriteCloser) { + c.stdout = out + c.stderr = err +} + +// GetWriters returns the unexported io.WriteCloser for the stdout and stderr +// handles. This is mainly used for unit testing purposes. +func (c *ExecCommand) GetWriters() (stdout io.WriteCloser, stderr io.WriteCloser) { + return c.stdout, c.stderr +} + +type nopCloser struct { + io.Writer +} + +func (nopCloser) Close() error { return nil } + +// Stdout returns a writer for the configured file descriptor +func (c *ExecCommand) Stdout() (io.WriteCloser, error) { + if c.stdout == nil { + if c.StdoutPath != "" { + f, err := fifo.OpenWriter(c.StdoutPath) + if err != nil { + return nil, fmt.Errorf("failed to create stdout: %v", err) + } + c.stdout = f + } else { + c.stdout = nopCloser{ioutil.Discard} + } + } + return c.stdout, nil +} + +// Stderr returns a writer for the configured file descriptor +func (c *ExecCommand) Stderr() (io.WriteCloser, error) { + if c.stderr == nil { + if c.StderrPath != "" { + f, err := fifo.OpenWriter(c.StderrPath) + if err != nil { + return nil, fmt.Errorf("failed to create stderr: %v", err) + } + c.stderr = f + } else { + c.stderr = nopCloser{ioutil.Discard} + } + } + return c.stderr, nil +} + +func (c *ExecCommand) Close() { + if c.stdout != nil { + c.stdout.Close() + } + if c.stderr != nil { + c.stderr.Close() + } +} + +// ProcessState holds information about the state of a user process. +type ProcessState struct { + Pid int + ExitCode int + Signal int + Time time.Time +} + +// ExecutorVersion is the version of the executor +type ExecutorVersion struct { + Version string +} + +func (v *ExecutorVersion) GoString() string { + return v.Version +} + +// UniversalExecutor is an implementation of the Executor which launches and +// supervises processes. In addition to process supervision it provides resource +// and file system isolation +type UniversalExecutor struct { + childCmd exec.Cmd + commandCfg *ExecCommand + + exitState *ProcessState + processExited chan interface{} + + // containment is used to cleanup resources created by the executor + // currently only used for killing pids via freezer cgroup on linux + containment resources.Containment + + totalCpuStats *stats.CpuStats + userCpuStats *stats.CpuStats + systemCpuStats *stats.CpuStats + pidCollector *pidCollector + + logger hclog.Logger +} + +// NewExecutor returns an Executor +func NewExecutor(logger hclog.Logger) Executor { + logger = logger.Named("executor") + if err := shelpers.Init(); err != nil { + logger.Error("unable to initialize stats", "error", err) + } + + return &UniversalExecutor{ + logger: logger, + processExited: make(chan interface{}), + totalCpuStats: stats.NewCpuStats(), + userCpuStats: stats.NewCpuStats(), + systemCpuStats: stats.NewCpuStats(), + pidCollector: newPidCollector(logger), + } +} + +// Version returns the api version of the executor +func (e *UniversalExecutor) Version() (*ExecutorVersion, error) { + return &ExecutorVersion{Version: ExecutorVersionLatest}, nil +} + +// Launch launches the main process and returns its state. It also +// configures an applies isolation on certain platforms. +func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error) { + e.logger.Trace("preparing to launch command", "command", command.Cmd, "args", strings.Join(command.Args, " ")) + + e.commandCfg = command + + // setting the user of the process + if command.User != "" { + e.logger.Debug("running command as user", "user", command.User) + if err := setCmdUser(&e.childCmd, command.User); err != nil { + return nil, err + } + } + + // set the task dir as the working directory for the command + e.childCmd.Dir = e.commandCfg.TaskDir + + // start command in separate process group + if err := e.setNewProcessGroup(); err != nil { + return nil, err + } + + // Maybe setup containment (for now, cgroups only only on linux) + if e.commandCfg.ResourceLimits || e.commandCfg.BasicProcessCgroup { + pid := os.Getpid() + if err := e.configureResourceContainer(pid); err != nil { + e.logger.Error("failed to configure resource container", "pid", pid, "error", err) + return nil, err + } + } + + stdout, err := e.commandCfg.Stdout() + if err != nil { + return nil, err + } + stderr, err := e.commandCfg.Stderr() + if err != nil { + return nil, err + } + + e.childCmd.Stdout = stdout + e.childCmd.Stderr = stderr + + // Look up the binary path and make it executable + absPath, err := lookupBin(command.TaskDir, command.Cmd) + if err != nil { + return nil, err + } + + if err := makeExecutable(absPath); err != nil { + return nil, err + } + + path := absPath + + // Set the commands arguments + e.childCmd.Path = path + e.childCmd.Args = append([]string{e.childCmd.Path}, command.Args...) + e.childCmd.Env = e.commandCfg.Env + + // Start the process + if err = withNetworkIsolation(e.childCmd.Start, command.NetworkIsolation); err != nil { + return nil, fmt.Errorf("failed to start command path=%q --- args=%q: %v", path, e.childCmd.Args, err) + } + + go e.pidCollector.collectPids(e.processExited, e.getAllPids) + go e.wait() + return &ProcessState{Pid: e.childCmd.Process.Pid, ExitCode: -1, Time: time.Now()}, nil +} + +// Exec a command inside a container for exec and java drivers. +func (e *UniversalExecutor) Exec(deadline time.Time, name string, args []string) ([]byte, int, error) { + ctx, cancel := context.WithDeadline(context.Background(), deadline) + defer cancel() + return ExecScript(ctx, e.childCmd.Dir, e.commandCfg.Env, e.childCmd.SysProcAttr, e.commandCfg.NetworkIsolation, name, args) +} + +// ExecScript executes cmd with args and returns the output, exit code, and +// error. Output is truncated to drivers/shared/structs.CheckBufSize +func ExecScript(ctx context.Context, dir string, env []string, attrs *syscall.SysProcAttr, + netSpec *drivers.NetworkIsolationSpec, name string, args []string) ([]byte, int, error) { + + cmd := exec.CommandContext(ctx, name, args...) + + // Copy runtime environment from the main command + cmd.SysProcAttr = attrs + cmd.Dir = dir + cmd.Env = env + + // Capture output + buf, _ := circbuf.NewBuffer(int64(drivers.CheckBufSize)) + cmd.Stdout = buf + cmd.Stderr = buf + + if err := withNetworkIsolation(cmd.Run, netSpec); err != nil { + exitErr, ok := err.(*exec.ExitError) + if !ok { + // Non-exit error, return it and let the caller treat + // it as a critical failure + return nil, 0, err + } + + // Some kind of error happened; default to critical + exitCode := 2 + if status, ok := exitErr.Sys().(syscall.WaitStatus); ok { + exitCode = status.ExitStatus() + } + + // Don't return the exitError as the caller only needs the + // output and code. + return buf.Bytes(), exitCode, nil + } + return buf.Bytes(), 0, nil +} + +func (e *UniversalExecutor) ExecStreaming(ctx context.Context, command []string, tty bool, + stream drivers.ExecTaskStream) error { + + if len(command) == 0 { + return fmt.Errorf("command is required") + } + + cmd := exec.CommandContext(ctx, command[0], command[1:]...) + + cmd.Dir = "/" + cmd.Env = e.childCmd.Env + + execHelper := &execHelper{ + logger: e.logger, + + newTerminal: func() (func() (*os.File, error), *os.File, error) { + pty, tty, err := pty.Open() + if err != nil { + return nil, nil, err + } + + return func() (*os.File, error) { return pty, nil }, tty, err + }, + setTTY: func(tty *os.File) error { + cmd.SysProcAttr = sessionCmdAttr(tty) + + cmd.Stdin = tty + cmd.Stdout = tty + cmd.Stderr = tty + return nil + }, + setIO: func(stdin io.Reader, stdout, stderr io.Writer) error { + cmd.Stdin = stdin + cmd.Stdout = stdout + cmd.Stderr = stderr + return nil + }, + processStart: func() error { + if u := e.commandCfg.User; u != "" { + if err := setCmdUser(cmd, u); err != nil { + return err + } + } + + return withNetworkIsolation(cmd.Start, e.commandCfg.NetworkIsolation) + }, + processWait: func() (*os.ProcessState, error) { + err := cmd.Wait() + return cmd.ProcessState, err + }, + } + + return execHelper.run(ctx, tty, stream) +} + +// Wait waits until a process has exited and returns it's exitcode and errors +func (e *UniversalExecutor) Wait(ctx context.Context) (*ProcessState, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-e.processExited: + return e.exitState, nil + } +} + +func (e *UniversalExecutor) UpdateResources(resources *drivers.Resources) error { + return nil +} + +func (e *UniversalExecutor) wait() { + defer close(e.processExited) + defer e.commandCfg.Close() + pid := e.childCmd.Process.Pid + err := e.childCmd.Wait() + if err == nil { + e.exitState = &ProcessState{Pid: pid, ExitCode: 0, Time: time.Now()} + return + } + + exitCode := 1 + var signal int + if exitErr, ok := err.(*exec.ExitError); ok { + if status, ok := exitErr.Sys().(syscall.WaitStatus); ok { + exitCode = status.ExitStatus() + if status.Signaled() { + // bash(1) uses the lower 7 bits of a uint8 + // to indicate normal program failure (see + // <sysexits.h>). If a process terminates due + // to a signal, encode the signal number to + // indicate which signal caused the process + // to terminate. Mirror this exit code + // encoding scheme. + const exitSignalBase = 128 + signal = int(status.Signal()) + exitCode = exitSignalBase + signal + } + } + } else { + e.logger.Warn("unexpected Cmd.Wait() error type", "error", err) + } + + e.exitState = &ProcessState{Pid: pid, ExitCode: exitCode, Signal: signal, Time: time.Now()} +} + +var ( + // finishedErr is the error message received when trying to kill and already + // exited process. + finishedErr = "os: process already finished" + + // noSuchProcessErr is the error message received when trying to kill a non + // existing process (e.g. when killing a process group). + noSuchProcessErr = "no such process" +) + +// Shutdown cleans up the alloc directory, destroys resource container and +// kills the user process. +func (e *UniversalExecutor) Shutdown(signal string, grace time.Duration) error { + e.logger.Debug("shutdown requested", "signal", signal, "grace_period_ms", grace.Round(time.Millisecond)) + var merr multierror.Error + + // If the executor did not launch a process, return. + if e.commandCfg == nil { + return nil + } + + // If there is no process we can't shutdown + if e.childCmd.Process == nil { + e.logger.Warn("failed to shutdown due to missing process", "error", "no process found") + return fmt.Errorf("executor failed to shutdown error: no process found") + } + + proc, err := os.FindProcess(e.childCmd.Process.Pid) + if err != nil { + err = fmt.Errorf("executor failed to find process: %v", err) + e.logger.Warn("failed to shutdown due to inability to find process", "pid", e.childCmd.Process.Pid, "error", err) + return err + } + + // If grace is 0 then skip shutdown logic + if grace > 0 { + // Default signal to SIGINT if not set + if signal == "" { + signal = "SIGINT" + } + + sig, ok := signals.SignalLookup[signal] + if !ok { + err = fmt.Errorf("error unknown signal given for shutdown: %s", signal) + e.logger.Warn("failed to shutdown", "error", err) + return err + } + + if err := e.shutdownProcess(sig, proc); err != nil { + e.logger.Warn("failed to shutdown process", "pid", proc.Pid, "error", err) + return err + } + + select { + case <-e.processExited: + case <-time.After(grace): + proc.Kill() + } + } else { + proc.Kill() + } + + // Wait for process to exit + select { + case <-e.processExited: + case <-time.After(time.Second * 15): + e.logger.Warn("process did not exit after 15 seconds") + merr.Errors = append(merr.Errors, fmt.Errorf("process did not exit after 15 seconds")) + } + + // prefer killing the process via platform-dependent resource containment + killByContainment := e.commandCfg.ResourceLimits || e.commandCfg.BasicProcessCgroup + + if !killByContainment { + // there is no containment, so kill the group the old fashioned way by sending + // SIGKILL to the negative pid + if cleanupChildrenErr := e.killProcessTree(proc); cleanupChildrenErr != nil && cleanupChildrenErr.Error() != finishedErr { + merr.Errors = append(merr.Errors, + fmt.Errorf("can't kill process with pid %d: %v", e.childCmd.Process.Pid, cleanupChildrenErr)) + } + } else { + // there is containment available (e.g. cgroups) so defer to that implementation + // for killing the processes + if cleanupErr := e.containment.Cleanup(); cleanupErr != nil { + e.logger.Warn("containment cleanup failed", "error", cleanupErr) + merr.Errors = append(merr.Errors, cleanupErr) + } + } + + if err = merr.ErrorOrNil(); err != nil { + e.logger.Warn("failed to shutdown due to some error", "error", err.Error()) + return err + } + + return nil +} + +// Signal sends the passed signal to the task +func (e *UniversalExecutor) Signal(s os.Signal) error { + if e.childCmd.Process == nil { + return fmt.Errorf("Task not yet run") + } + + e.logger.Debug("sending signal to PID", "signal", s, "pid", e.childCmd.Process.Pid) + err := e.childCmd.Process.Signal(s) + if err != nil { + e.logger.Error("sending signal failed", "signal", s, "error", err) + return err + } + + return nil +} + +func (e *UniversalExecutor) Stats(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) { + ch := make(chan *cstructs.TaskResourceUsage) + go e.handleStats(ch, ctx, interval) + return ch, nil +} + +func (e *UniversalExecutor) handleStats(ch chan *cstructs.TaskResourceUsage, ctx context.Context, interval time.Duration) { + defer close(ch) + timer := time.NewTimer(0) + for { + select { + case <-ctx.Done(): + return + + case <-timer.C: + timer.Reset(interval) + } + + pidStats, err := e.pidCollector.pidStats() + if err != nil { + e.logger.Warn("error collecting stats", "error", err) + return + } + + select { + case <-ctx.Done(): + return + case ch <- aggregatedResourceUsage(e.systemCpuStats, pidStats): + } + } +} + +// lookupBin looks for path to the binary to run by looking for the binary in +// the following locations, in-order: +// task/local/, task/, on the host file system, in host $PATH +// The return path is absolute. +func lookupBin(taskDir string, bin string) (string, error) { + // Check in the local directory + local := filepath.Join(taskDir, allocdir.TaskLocal, bin) + if _, err := os.Stat(local); err == nil { + return local, nil + } + + // Check at the root of the task's directory + root := filepath.Join(taskDir, bin) + if _, err := os.Stat(root); err == nil { + return root, nil + } + + // when checking host paths, check with Stat first if path is absolute + // as exec.LookPath only considers files already marked as executable + // and only consider this for absolute paths to avoid depending on + // current directory of nomad which may cause unexpected behavior + if _, err := os.Stat(bin); err == nil && filepath.IsAbs(bin) { + return bin, nil + } + + // Check the $PATH + if host, err := exec.LookPath(bin); err == nil { + return host, nil + } + + return "", fmt.Errorf("binary %q could not be found", bin) +} + +// makeExecutable makes the given file executable for root,group,others. +func makeExecutable(binPath string) error { + if runtime.GOOS == "windows" { + return nil + } + + fi, err := os.Stat(binPath) + if err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("binary %q does not exist", binPath) + } + return fmt.Errorf("specified binary is invalid: %v", err) + } + + // If it is not executable, make it so. + perm := fi.Mode().Perm() + req := os.FileMode(0555) + if perm&req != req { + if err := os.Chmod(binPath, perm|req); err != nil { + return fmt.Errorf("error making %q executable: %s", binPath, err) + } + } + return nil +} + +// SupportedCaps returns a list of all supported capabilities in kernel. +func SupportedCaps(allowNetRaw bool) []string { + var allCaps []string + last := capability.CAP_LAST_CAP + // workaround for RHEL6 which has no /proc/sys/kernel/cap_last_cap + if last == capability.Cap(63) { + last = capability.CAP_BLOCK_SUSPEND + } + for _, cap := range capability.List() { + if cap > last { + continue + } + if !allowNetRaw && cap == capability.CAP_NET_RAW { + continue + } + allCaps = append(allCaps, fmt.Sprintf("CAP_%s", strings.ToUpper(cap.String()))) + } + return allCaps +} |