aboutsummaryrefslogblamecommitdiff
path: root/exec2/driver.go
blob: f035d1568de02af2875297e2d119a418e5ad6981 (plain) (tree)
1
2
3
4
5
6
7
8
9
            


                 

             
                       

                 


                                                      


                                                                

                                                            


                                                              

                                                    
                                                          
                                                           
                                                                    



                                               
                           
 
                                                                                                

                                            

                                                                                 



                             














                                                                                                                


                                                                   
                                           


                                              
                                                                               
                                                                














                                                                             


                  

                                                                                   
                                                                    





                                                                               

          


                                                                               
                                  






                                                              


         


                                                                      



                                                                                  

                                                                      
 
                                                      

                                            
                                                                            





                                                                                 

                                             




                                                                             

 




















































































                                                                                                                                                           
 




















                                                                                   


         






















                                                                              


                              
                                                        


                              

                                                        





                                                                                     

                                                 
         
                         
 
                                                 

                                                      


                  
                                                            


                                  



                                                                      

 
                                                                                        


                                             
 

                                                                                         
                       







                                    





                                                       








                                                                                  
                                   
                                                                    



                                                              




                                                                        

         









                                                                         
 




                                                               

         

                                                                      


                 




















































                                                                                                                           








                                                                                      



                                                                                       



                                                                                    
                                                                         
                                                   


                                           

         


                                                                                                  



                                                                                 




















                                                                                                            
                                         















                                                                                                        


















                                                                                              
                                                                                                   





                                                                   


                                                                                                







                                                                                  
                                                                                                   






                                                   
 


                      
                                                                                                   

                                      











                                                                                       





                            


         
                                                                                      




                                              









                                                                      



















                                                                                                                                             





                                              
                                                                

         

                                                                   
                                                                                       




                                          


                                                                               



                              
                                                                          







                                                   
                                                                                                                                   




                                                   


                                               
                                                                                     


                                        
                                                                 




                                              









                                                                                                                                        











































                                                                                                                
 
package exec

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"time"

	"github.com/hashicorp/consul-template/signals"
	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/client/lib/cgutil"
	"github.com/hashicorp/nomad/drivers/shared/capabilities"
	"github.com/hashicorp/nomad/drivers/shared/eventer"
	"github.com/hashicorp/nomad/drivers/shared/executor"
	"github.com/hashicorp/nomad/drivers/shared/resolvconf"
	"github.com/hashicorp/nomad/helper/pluginutils/loader"
	"github.com/hashicorp/nomad/helper/pointer"
	"github.com/hashicorp/nomad/plugins/base"
	"github.com/hashicorp/nomad/plugins/drivers"
	"github.com/hashicorp/nomad/plugins/drivers/utils"
	"github.com/hashicorp/nomad/plugins/shared/hclspec"
	pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
)

const (
	// pluginName is the name of the plugin. It is used as the Name of both
	// PluginID and pluginInfo, and as the name of the driver's sub-logger.
	pluginName = "exec"

	// fingerprintPeriod is the interval at which the driver will send
	// fingerprint responses after the initial, immediate one.
	fingerprintPeriod = 30 * time.Second

	// taskHandleVersion is the version of task handle which this driver sets
	// and understands how to decode driver state. It is passed to
	// drivers.NewTaskHandle when a task is started.
	taskHandleVersion = 1
)

var (
	// PluginID is the exec plugin metadata registered in the plugin
	// catalog.
	PluginID = loader.PluginID{
		Name:       pluginName,
		PluginType: base.PluginTypeDriver,
	}

	// PluginConfig is the exec driver factory function registered in the
	// plugin catalog. The factory simply delegates to NewExecDriver and the
	// initial plugin config map is empty.
	PluginConfig = &loader.InternalPluginConfig{
		Config:  map[string]interface{}{},
		Factory: func(ctx context.Context, l hclog.Logger) interface{} { return NewExecDriver(ctx, l) },
	}

	// pluginInfo is the response returned for the PluginInfo RPC
	pluginInfo = &base.PluginInfoResponse{
		Type:              base.PluginTypeDriver,
		PluginApiVersions: []string{drivers.ApiVersion010},
		PluginVersion:     "0.1.0",
		Name:              pluginName,
	}

	// configSpec is the hcl specification returned by the ConfigSchema RPC.
	// Every attribute is wrapped in NewDefault, so each one carries a
	// fallback literal: no_pivot_root -> false, default_pid_mode and
	// default_ipc_mode -> "private", allow_caps -> capabilities.HCLSpecLiteral.
	configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
		"no_pivot_root": hclspec.NewDefault(
			hclspec.NewAttr("no_pivot_root", "bool", false),
			hclspec.NewLiteral("false"),
		),
		"default_pid_mode": hclspec.NewDefault(
			hclspec.NewAttr("default_pid_mode", "string", false),
			hclspec.NewLiteral(`"private"`),
		),
		"default_ipc_mode": hclspec.NewDefault(
			hclspec.NewAttr("default_ipc_mode", "string", false),
			hclspec.NewLiteral(`"private"`),
		),
		"allow_caps": hclspec.NewDefault(
			hclspec.NewAttr("allow_caps", "list(string)", false),
			hclspec.NewLiteral(capabilities.HCLSpecLiteral),
		),
	})

	// taskConfigSpec is the hcl specification for the driver config section of
	// a task within a job. It is returned in the TaskConfigSchema RPC.
	// NOTE(review): the final NewAttr argument presumably marks the attribute
	// as required, which would make "command" the only mandatory field --
	// confirm against the hclspec package.
	taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
		"command":  hclspec.NewAttr("command", "string", true),
		"args":     hclspec.NewAttr("args", "list(string)", false),
		"pid_mode": hclspec.NewAttr("pid_mode", "string", false),
		"ipc_mode": hclspec.NewAttr("ipc_mode", "string", false),
		"cap_add":  hclspec.NewAttr("cap_add", "list(string)", false),
		"cap_drop": hclspec.NewAttr("cap_drop", "list(string)", false),
	})

	// driverCapabilities represents the RPC response for what features are
	// implemented by the exec task driver: signals, exec, chroot filesystem
	// isolation, host and group network isolation modes, and all mount
	// configs.
	driverCapabilities = &drivers.Capabilities{
		SendSignals: true,
		Exec:        true,
		FSIsolation: drivers.FSIsolationChroot,
		NetIsolationModes: []drivers.NetIsolationMode{
			drivers.NetIsolationModeHost,
			drivers.NetIsolationModeGroup,
		},
		MountConfigs: drivers.MountConfigSupportAll,
	}
)

// Driver fork/execs tasks using many of the underlying OS's isolation
// features where configured.
type Driver struct {
	// eventer is used to handle multiplexing of TaskEvents calls such that an
	// event can be broadcast to all callers
	eventer *eventer.Eventer

	// config is the driver configuration set by the SetConfig RPC
	config Config

	// nomadConfig is the client config from nomad. It is only populated by
	// SetConfig when the supplied base.Config carries an AgentConfig.
	nomadConfig *base.ClientDriverConfig

	// tasks is the in memory datastore mapping taskIDs to driverHandles
	tasks *taskStore

	// ctx is the context for the driver. It is passed to other subsystems to
	// coordinate shutdown
	ctx context.Context

	// logger will log to the Nomad agent
	logger hclog.Logger

	// fingerprintSuccess is a tri-state boolean recording the outcome of
	// fingerprinting: nil means no fingerprint has completed yet, otherwise
	// it points at the result of the most recent one.
	fingerprintSuccess *bool
	// fingerprintLock guards fingerprintSuccess.
	fingerprintLock    sync.Mutex
}

// Config is the driver configuration set by the SetConfig RPC call. It is
// decoded from the msgpack-encoded plugin config using the codec tags below,
// which match the attribute names declared in configSpec.
type Config struct {
	// NoPivotRoot disables the use of pivot_root, useful when the root partition
	// is on ramdisk
	NoPivotRoot bool `codec:"no_pivot_root"`

	// DefaultModePID is the default PID isolation set for all tasks using
	// exec-based task drivers. A task may override it with pid_mode.
	DefaultModePID string `codec:"default_pid_mode"`

	// DefaultModeIPC is the default IPC isolation set for all tasks using
	// exec-based task drivers. A task may override it with ipc_mode.
	DefaultModeIPC string `codec:"default_ipc_mode"`

	// AllowCaps configures which Linux Capabilities are enabled for tasks
	// running on this node.
	AllowCaps []string `codec:"allow_caps"`
}

// validate checks the agent-level plugin configuration.
//
// Both default isolation modes must be exactly one of the executor's
// "private" or "host" modes (an empty value is rejected), and allow_caps is
// checked against the capability set the system reports as supported. The
// first violation found is returned as an error; nil means the config is
// acceptable.
func (c *Config) validate() error {
	if mode := c.DefaultModePID; mode != executor.IsolationModePrivate && mode != executor.IsolationModeHost {
		return fmt.Errorf("default_pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, mode)
	}

	if mode := c.DefaultModeIPC; mode != executor.IsolationModePrivate && mode != executor.IsolationModeHost {
		return fmt.Errorf("default_ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, mode)
	}

	// Difference presumably yields the configured capabilities that are
	// missing from the supported set; anything left over is an error.
	supported := capabilities.Supported()
	if unsupported := supported.Difference(capabilities.New(c.AllowCaps)); !unsupported.Empty() {
		return fmt.Errorf("allow_caps configured with capabilities not supported by system: %s", unsupported)
	}

	return nil
}

// TaskConfig is the driver configuration of a task within a job. The codec
// tags match the attribute names declared in taskConfigSpec.
type TaskConfig struct {
	// Command is the thing to exec.
	Command string `codec:"command"`

	// Args are passed along to Command.
	Args []string `codec:"args"`

	// ModePID indicates whether PID namespace isolation is enabled for the task.
	// Must be "private" or "host" if set. When empty, the driver-level
	// default_pid_mode is used instead.
	ModePID string `codec:"pid_mode"`

	// ModeIPC indicates whether IPC namespace isolation is enabled for the task.
	// Must be "private" or "host" if set. When empty, the driver-level
	// default_ipc_mode is used instead.
	ModeIPC string `codec:"ipc_mode"`

	// CapAdd is a set of linux capabilities to enable.
	CapAdd []string `codec:"cap_add"`

	// CapDrop is a set of linux capabilities to disable.
	CapDrop []string `codec:"cap_drop"`
}

// validate checks the per-task driver configuration.
//
// pid_mode and ipc_mode may be left empty; otherwise each must be one of the
// executor's "private" or "host" modes. cap_add and cap_drop are each checked
// against the capability set the system reports as supported. The first
// violation found is returned as an error; nil means the config is
// acceptable.
func (tc *TaskConfig) validate() error {
	if mode := tc.ModePID; mode != "" && mode != executor.IsolationModePrivate && mode != executor.IsolationModeHost {
		return fmt.Errorf("pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, mode)
	}

	if mode := tc.ModeIPC; mode != "" && mode != executor.IsolationModePrivate && mode != executor.IsolationModeHost {
		return fmt.Errorf("ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, mode)
	}

	systemCaps := capabilities.Supported()

	if unsupported := systemCaps.Difference(capabilities.New(tc.CapAdd)); !unsupported.Empty() {
		return fmt.Errorf("cap_add configured with capabilities not supported by system: %s", unsupported)
	}

	if unsupported := systemCaps.Difference(capabilities.New(tc.CapDrop)); !unsupported.Empty() {
		return fmt.Errorf("cap_drop configured with capabilities not supported by system: %s", unsupported)
	}

	return nil
}

// TaskState is the state which is encoded in the handle returned in
// StartTask. This information is needed to rebuild the task state and handler
// during recovery.
type TaskState struct {
	// ReattachConfig describes how to reconnect to the executor plugin. It is
	// derived from the plugin client's reattach config in StartTask and
	// converted back in RecoverTask.
	ReattachConfig *pstructs.ReattachConfig
	// TaskConfig is the task configuration that was passed to StartTask.
	TaskConfig     *drivers.TaskConfig
	// Pid is the process ID reported by the executor when the command was
	// launched.
	Pid            int
	// StartedAt is the time the task was started, rounded to the millisecond.
	StartedAt      time.Time
}

// NewExecDriver returns a new DriverPlugin implementation backed by Driver.
//
// The supplied logger is scoped with the plugin name, and ctx is retained by
// the driver and handed to the eventer to coordinate shutdown.
func NewExecDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin {
	named := logger.Named(pluginName)

	d := &Driver{ctx: ctx, logger: named}
	d.eventer = eventer.NewEventer(ctx, named)
	d.tasks = newTaskStore()
	return d
}

// setFingerprintSuccess records, under fingerprintLock, that the most recent
// fingerprint succeeded.
func (d *Driver) setFingerprintSuccess() {
	succeeded := pointer.Of(true)

	d.fingerprintLock.Lock()
	defer d.fingerprintLock.Unlock()
	d.fingerprintSuccess = succeeded
}

// setFingerprintFailure records, under fingerprintLock, that the most recent
// fingerprint failed.
func (d *Driver) setFingerprintFailure() {
	failed := pointer.Of(false)

	d.fingerprintLock.Lock()
	defer d.fingerprintLock.Unlock()
	d.fingerprintSuccess = failed
}

// fingerprintSuccessful reports whether the driver is not known to be in a
// failed fingerprint state: it returns true when no fingerprint has been
// recorded yet, or when the last recorded fingerprint succeeded.
func (d *Driver) fingerprintSuccessful() bool {
	d.fingerprintLock.Lock()
	defer d.fingerprintLock.Unlock()

	if d.fingerprintSuccess == nil {
		// Never fingerprinted: treated as successful.
		return true
	}
	return *d.fingerprintSuccess
}

// PluginInfo returns the static plugin metadata (type, supported API
// versions, plugin version and name) for the PluginInfo RPC. It never
// returns an error.
func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) {
	return pluginInfo, nil
}

// ConfigSchema returns the hcl specification of the driver's agent-level
// plugin configuration for the ConfigSchema RPC. It never returns an error.
func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {
	return configSpec, nil
}

// SetConfig decodes, validates and stores the agent plugin configuration.
//
// When cfg carries a non-empty PluginConfig it is msgpack-decoded into a
// Config; otherwise the zero-value Config is used. Either way the result must
// pass Config.validate before it replaces the driver's current config, so a
// failed call leaves the previous configuration untouched. If cfg carries an
// AgentConfig, its Driver section is stored as the Nomad client config.
//
// A nil cfg is tolerated and treated like a config with no plugin or agent
// section instead of causing a nil-pointer panic.
func (d *Driver) SetConfig(cfg *base.Config) error {
	// unpack, validate, and set agent plugin config
	var config Config
	if cfg != nil && len(cfg.PluginConfig) != 0 {
		if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil {
			return err
		}
	}
	if err := config.validate(); err != nil {
		return err
	}
	d.config = config

	if cfg != nil && cfg.AgentConfig != nil {
		d.nomadConfig = cfg.AgentConfig.Driver
	}
	return nil
}

// TaskConfigSchema returns the hcl specification of the per-task driver
// config block for the TaskConfigSchema RPC. It never returns an error.
func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
	return taskConfigSpec, nil
}

// Capabilities is returned by the Capabilities RPC and indicates what
// optional features this driver supports. The response is the static
// driverCapabilities value; it never returns an error.
func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
	return driverCapabilities, nil
}

// Fingerprint starts a background goroutine that periodically publishes the
// driver's fingerprint and returns the unbuffered channel it publishes on.
// The goroutine closes the channel when it exits. The returned error is
// always nil.
func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
	fingerprints := make(chan *drivers.Fingerprint)
	go d.handleFingerprint(ctx, fingerprints)
	return fingerprints, nil
}
// handleFingerprint publishes a fingerprint on ch immediately and then once
// every fingerprintPeriod, until either the caller's ctx or the driver's own
// context is cancelled. ch is closed on return.
//
// The send on ch is itself guarded by both contexts so that the goroutine
// cannot block forever (and leak) when the consumer stops receiving after
// cancellation, and the timer is stopped on exit.
func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) {
	defer close(ch)

	// A zero-duration timer fires right away, giving the first fingerprint
	// without waiting a full period.
	timer := time.NewTimer(0)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-d.ctx.Done():
			return
		case <-timer.C:
			// The channel has just been drained, so Reset is safe here.
			timer.Reset(fingerprintPeriod)

			fp := d.buildFingerprint()
			select {
			case ch <- fp:
			case <-ctx.Done():
				return
			case <-d.ctx.Done():
				return
			}
		}
	}
}

// buildFingerprint evaluates whether the exec driver can run on this host and
// returns the corresponding fingerprint, recording success or failure on the
// driver as a side effect.
//
// The driver is reported as undetected on non-linux hosts and when not
// running as root, and as unhealthy when the cgroup mountpoint cannot be
// found or is empty. Only a fully healthy fingerprint carries the
// "driver.exec" attribute.
func (d *Driver) buildFingerprint() *drivers.Fingerprint {
	if runtime.GOOS != "linux" {
		d.setFingerprintFailure()
		return &drivers.Fingerprint{
			Health:            drivers.HealthStateUndetected,
			HealthDescription: "exec driver unsupported on client OS",
		}
	}

	// Start from a healthy fingerprint and downgrade it as checks fail.
	fp := &drivers.Fingerprint{
		Attributes:        map[string]*pstructs.Attribute{},
		Health:            drivers.HealthStateHealthy,
		HealthDescription: drivers.DriverHealthy,
	}

	if !utils.IsUnixRoot() {
		fp.Health, fp.HealthDescription = drivers.HealthStateUndetected, drivers.DriverRequiresRootMessage
		d.setFingerprintFailure()
		return fp
	}

	mount, err := cgutil.FindCgroupMountpointDir()
	switch {
	case err != nil:
		fp.Health, fp.HealthDescription = drivers.HealthStateUnhealthy, drivers.NoCgroupMountMessage
		// Only warn on the transition into failure, not on every period.
		if d.fingerprintSuccessful() {
			d.logger.Warn(fp.HealthDescription, "error", err)
		}
		d.setFingerprintFailure()

	case mount == "":
		fp.Health, fp.HealthDescription = drivers.HealthStateUnhealthy, drivers.CgroupMountEmpty
		d.setFingerprintFailure()

	default:
		fp.Attributes["driver.exec"] = pstructs.NewBoolAttribute(true)
		d.setFingerprintSuccess()
	}

	return fp
}

// RecoverTask rebuilds the in-memory handle for a task from the driver state
// stored in handle, reattaching to the executor plugin that is expected to
// still be running.
//
// It is a no-op when the task is already tracked. It returns an error when
// handle (or its Config) is nil, when the driver state cannot be decoded or
// lacks the task config, or when reattaching to the executor fails. The
// state is validated before reattaching so that a malformed handle cannot
// cause a panic after the executor connection has been established.
func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
	if handle == nil {
		return fmt.Errorf("handle cannot be nil")
	}
	if handle.Config == nil {
		return fmt.Errorf("handle config cannot be nil")
	}

	// If already attached to handle there's nothing to recover.
	if _, ok := d.tasks.Get(handle.Config.ID); ok {
		d.logger.Trace("nothing to recover; task already exists",
			"task_id", handle.Config.ID,
			"task_name", handle.Config.Name,
		)
		return nil
	}

	// Handle doesn't already exist, try to reattach
	var taskState TaskState
	if err := handle.GetDriverState(&taskState); err != nil {
		d.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID)
		return fmt.Errorf("failed to decode task state from handle: %v", err)
	}
	if taskState.TaskConfig == nil {
		// The task store is keyed by the decoded task config's ID below.
		d.logger.Error("task state from handle is missing task config", "task_id", handle.Config.ID)
		return fmt.Errorf("task state from handle is missing task config")
	}

	// Create client for reattached executor
	plugRC, err := pstructs.ReattachConfigToGoPlugin(taskState.ReattachConfig)
	if err != nil {
		d.logger.Error("failed to build ReattachConfig from task state", "error", err, "task_id", handle.Config.ID)
		return fmt.Errorf("failed to build ReattachConfig from task state: %v", err)
	}

	exec, pluginClient, err := executor.ReattachToExecutor(plugRC,
		d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID))
	if err != nil {
		d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
		return fmt.Errorf("failed to reattach to executor: %v", err)
	}

	// A recovered task is assumed to be running; h.run observes its exit.
	h := &taskHandle{
		exec:         exec,
		pid:          taskState.Pid,
		pluginClient: pluginClient,
		taskConfig:   taskState.TaskConfig,
		procState:    drivers.TaskStateRunning,
		startedAt:    taskState.StartedAt,
		exitResult:   &drivers.ExitResult{},
		logger:       d.logger,
	}

	d.tasks.Set(taskState.TaskConfig.ID, h)

	go h.run()
	return nil
}

// StartTask launches the task described by cfg under a newly created executor
// plugin and returns a task handle whose driver state lets RecoverTask
// reattach later. The returned DriverNetwork is always nil.
//
// It fails when a task with the same ID is already tracked, when the driver
// config cannot be decoded or validated, or when any step of setting up and
// launching the executor fails. Every failure after the executor plugin has
// been created kills that plugin so no executor process is leaked.
//
// Side effects on cfg: when cfg.DNS is set, a resolv.conf mount is appended
// to cfg.Mounts.
func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
	if _, ok := d.tasks.Get(cfg.ID); ok {
		return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
	}

	var driverConfig TaskConfig
	if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
		return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
	}

	if err := driverConfig.validate(); err != nil {
		return nil, nil, fmt.Errorf("failed driver config validation: %v", err)
	}

	d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig))
	handle := drivers.NewTaskHandle(taskHandleVersion)
	handle.Config = cfg

	pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")
	executorConfig := &executor.ExecutorConfig{
		LogFile:     pluginLogFile,
		LogLevel:    "debug",
		FSIsolation: true,
	}

	exec, pluginClient, err := executor.CreateExecutor(
		d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID),
		d.nomadConfig, executorConfig)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create executor: %v", err)
	}

	// Tasks without an explicit user run as "nobody".
	user := cfg.User
	if user == "" {
		user = "nobody"
	}

	if cfg.DNS != nil {
		dnsMount, err := resolvconf.GenerateDNSMount(cfg.TaskDir().Dir, cfg.DNS)
		if err != nil {
			// The executor plugin is already running; don't leak it.
			pluginClient.Kill()
			return nil, nil, fmt.Errorf("failed to build mount for resolv.conf: %v", err)
		}
		cfg.Mounts = append(cfg.Mounts, dnsMount)
	}

	caps, err := capabilities.Calculate(
		capabilities.NomadDefaults(), d.config.AllowCaps, driverConfig.CapAdd, driverConfig.CapDrop,
	)
	if err != nil {
		// The executor plugin is already running; don't leak it.
		pluginClient.Kill()
		return nil, nil, err
	}
	d.logger.Debug("task capabilities", "capabilities", caps)

	execCmd := &executor.ExecCommand{
		Cmd:              driverConfig.Command,
		Args:             driverConfig.Args,
		Env:              cfg.EnvList(),
		User:             user,
		ResourceLimits:   true,
		NoPivotRoot:      d.config.NoPivotRoot,
		Resources:        cfg.Resources,
		TaskDir:          cfg.TaskDir().Dir,
		StdoutPath:       cfg.StdoutPath,
		StderrPath:       cfg.StderrPath,
		Mounts:           cfg.Mounts,
		Devices:          cfg.Devices,
		NetworkIsolation: cfg.NetworkIsolation,
		// Task-level modes take part in the choice alongside the
		// driver-level defaults.
		ModePID:      executor.IsolationMode(d.config.DefaultModePID, driverConfig.ModePID),
		ModeIPC:      executor.IsolationMode(d.config.DefaultModeIPC, driverConfig.ModeIPC),
		Capabilities: caps,
	}

	ps, err := exec.Launch(execCmd)
	if err != nil {
		pluginClient.Kill()
		return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err)
	}

	h := &taskHandle{
		exec:         exec,
		pid:          ps.Pid,
		pluginClient: pluginClient,
		taskConfig:   cfg,
		procState:    drivers.TaskStateRunning,
		startedAt:    time.Now().Round(time.Millisecond),
		logger:       d.logger,
	}

	// Persisted in the handle so RecoverTask can reattach to the executor.
	driverState := TaskState{
		ReattachConfig: pstructs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
		Pid:            ps.Pid,
		TaskConfig:     cfg,
		StartedAt:      h.startedAt,
	}

	if err := handle.SetDriverState(&driverState); err != nil {
		d.logger.Error("failed to start task, error setting driver state", "error", err)
		// Best-effort shutdown of the already launched task; the plugin is
		// killed right after regardless of the outcome.
		_ = exec.Shutdown("", 0)
		pluginClient.Kill()
		return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
	}

	d.tasks.Set(cfg.ID, h)
	go h.run()
	return handle, nil, nil
}

// WaitTask returns a channel on which the exit result of the task identified
// by taskID is delivered. The wait itself happens in a background goroutine,
// which closes the channel when it finishes. drivers.ErrTaskNotFound is
// returned when the task is not tracked by this driver.
func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
	if handle, found := d.tasks.Get(taskID); found {
		results := make(chan *drivers.ExitResult)
		go d.handleWait(ctx, handle, results)
		return results, nil
	}

	return nil, drivers.ErrTaskNotFound
}

// handleWait blocks on the executor until the task's process exits (or the
// wait fails), then attempts to deliver a single ExitResult on ch. Delivery
// is abandoned if ctx or the driver's context is cancelled first. ch is
// always closed on return.
func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
	defer close(ch)

	result := &drivers.ExitResult{}
	if ps, err := handle.exec.Wait(ctx); err != nil {
		result.Err = fmt.Errorf("executor: error waiting on process: %v", err)
	} else {
		result.ExitCode = ps.ExitCode
		result.Signal = ps.Signal
	}

	select {
	case ch <- result:
	case <-ctx.Done():
	case <-d.ctx.Done():
	}
}

// StopTask asks the executor to shut the task down using the given signal
// and timeout. A shutdown error is ignored when the executor plugin has
// already exited; otherwise it is returned wrapped. drivers.ErrTaskNotFound
// is returned when the task is not tracked by this driver.
func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
	handle, found := d.tasks.Get(taskID)
	if !found {
		return drivers.ErrTaskNotFound
	}

	err := handle.exec.Shutdown(signal, timeout)
	if err == nil || handle.pluginClient.Exited() {
		return nil
	}

	return fmt.Errorf("executor Shutdown failed: %v", err)
}

// resetCgroup will re-create the v2 cgroup for the task after the task has been
// destroyed by libcontainer. In the case of a task restart we call DestroyTask
// which removes the cgroup - but we still need it!
//
// Ideally the cgroup management would be more unified - and we could do the creation
// on a task runner pre-start hook, eliminating the need for this hack.
//
// It does nothing unless cgroups v2 is in use and the task's resources carry
// a non-empty cpuset cgroup path. Re-creation is best-effort: a failure is
// only logged at trace level, together with the underlying error.
func (d *Driver) resetCgroup(handle *taskHandle) {
	if !cgutil.UseV2 {
		return
	}

	resources := handle.taskConfig.Resources
	if resources == nil || resources.LinuxResources == nil {
		return
	}

	path := resources.LinuxResources.CpusetCgroupPath
	if path == "" {
		return
	}

	if err := os.Mkdir(path, 0755); err != nil {
		d.logger.Trace("failed to reset cgroup", "path", path, "error", err)
	}
}

// DestroyTask tears down the executor for the task and removes the task from
// the driver's task store. A task that is still running is only destroyed
// when force is true; otherwise an error is returned.
// drivers.ErrTaskNotFound is returned when the task is not tracked by this
// driver.
func (d *Driver) DestroyTask(taskID string, force bool) error {
	handle, found := d.tasks.Get(taskID)
	if !found {
		return drivers.ErrTaskNotFound
	}

	if running := handle.IsRunning(); running && !force {
		return fmt.Errorf("cannot destroy running task")
	}

	if client := handle.pluginClient; !client.Exited() {
		// Shutdown failures are logged only; the plugin is killed regardless.
		if err := handle.exec.Shutdown("", 0); err != nil {
			handle.logger.Error("destroying executor failed", "error", err)
		}

		client.Kill()
	}

	// workaround for the case where DestroyTask was issued on task restart
	d.resetCgroup(handle)
	d.tasks.Delete(taskID)

	return nil
}

// InspectTask returns the current status of the task identified by taskID,
// or drivers.ErrTaskNotFound when the task is not tracked by this driver.
func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
	if handle, found := d.tasks.Get(taskID); found {
		return handle.TaskStatus(), nil
	}

	return nil, drivers.ErrTaskNotFound
}

// TaskStats delegates to the task's executor to obtain a channel of resource
// usage samples for the given ctx and interval. drivers.ErrTaskNotFound is
// returned when the task is not tracked by this driver.
func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
	if handle, found := d.tasks.Get(taskID); found {
		return handle.exec.Stats(ctx, interval)
	}

	return nil, drivers.ErrTaskNotFound
}

// TaskEvents returns a channel of task events for the caller by delegating
// to the driver's eventer, which multiplexes events to all callers.
func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
	return d.eventer.TaskEvents(ctx)
}

// SignalTask sends the named signal to the task via its executor. A name
// that is not present in the signal lookup table is logged as a warning and
// replaced by os.Interrupt. drivers.ErrTaskNotFound is returned when the
// task is not tracked by this driver.
func (d *Driver) SignalTask(taskID string, signal string) error {
	handle, found := d.tasks.Get(taskID)
	if !found {
		return drivers.ErrTaskNotFound
	}

	var sig os.Signal
	if known, ok := signals.SignalLookup[signal]; ok {
		sig = known
	} else {
		d.logger.Warn("unknown signal to send to task, using SIGINT instead", "signal", signal, "task_id", handle.taskConfig.ID)
		sig = os.Interrupt
	}

	return handle.exec.Signal(sig)
}

// ExecTask runs cmd inside the task's environment through the executor and
// returns its captured output and exit code. cmd[0] is the program and the
// remaining elements are its arguments; the deadline passed to the executor
// is the current time plus timeout.
//
// An error is returned when cmd is empty, when the task is not tracked by
// this driver (drivers.ErrTaskNotFound), or when the executor reports one.
func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
	if len(cmd) == 0 {
		return nil, fmt.Errorf("error cmd must have at least one value")
	}

	handle, found := d.tasks.Get(taskID)
	if !found {
		return nil, drivers.ErrTaskNotFound
	}

	// cmd is non-empty here, so cmd[1:] is a valid (possibly empty,
	// never nil) argument list.
	program, args := cmd[0], cmd[1:]
	deadline := time.Now().Add(timeout)

	out, exitCode, err := handle.exec.Exec(deadline, program, args)
	if err != nil {
		return nil, err
	}

	result := &drivers.ExecTaskResult{
		Stdout:     out,
		ExitResult: &drivers.ExitResult{ExitCode: exitCode},
	}
	return result, nil
}

// Compile-time check that *Driver implements
// drivers.ExecTaskStreamingRawDriver.
var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil)

// ExecTaskStreamingRaw runs command inside the task through the executor's
// streaming exec, wiring it to stream and optionally allocating a tty.
//
// An error is returned when command is empty, when the task is not tracked
// by this driver (drivers.ErrTaskNotFound), or when the executor reports
// one.
func (d *Driver) ExecTaskStreamingRaw(ctx context.Context,
	taskID string,
	command []string,
	tty bool,
	stream drivers.ExecTaskStream) error {

	if len(command) == 0 {
		return fmt.Errorf("error cmd must have at least one value")
	}

	if handle, found := d.tasks.Get(taskID); found {
		return handle.exec.ExecStreaming(ctx, command, tty, stream)
	}

	return drivers.ErrTaskNotFound
}