aboutsummaryrefslogtreecommitdiff
path: root/executor/grpc_server.go
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-11-28 17:15:12 +0100
committerAlex Auvolat <alex@adnab.me>2022-11-28 17:15:12 +0100
commitbf3165a7069fc6dcf9ae3a28be3af07fe8b4e1c2 (patch)
tree32f52eeb5d60ae33e8a40c2d8b26d70cac19a473 /executor/grpc_server.go
parent63e31b9ed97f34f4ea709f505c37f5e8968a0f36 (diff)
downloadnomad-driver-nix2-bf3165a7069fc6dcf9ae3a28be3af07fe8b4e1c2.tar.gz
nomad-driver-nix2-bf3165a7069fc6dcf9ae3a28be3af07fe8b4e1c2.zip
Vendor executor module so that we can patch it
Diffstat (limited to 'executor/grpc_server.go')
-rw-r--r--executor/grpc_server.go178
1 files changed, 178 insertions, 0 deletions
diff --git a/executor/grpc_server.go b/executor/grpc_server.go
new file mode 100644
index 0000000..231d650
--- /dev/null
+++ b/executor/grpc_server.go
@@ -0,0 +1,178 @@
+package executor
+
+import (
+ "context"
+ "fmt"
+ "syscall"
+ "time"
+
+ "github.com/golang/protobuf/ptypes"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "github.com/hashicorp/nomad/drivers/shared/executor/proto"
+ "github.com/hashicorp/nomad/nomad/structs"
+ "github.com/hashicorp/nomad/plugins/drivers"
+ sproto "github.com/hashicorp/nomad/plugins/shared/structs/proto"
+)
+
+type grpcExecutorServer struct {
+ impl Executor
+}
+
+func (s *grpcExecutorServer) Launch(ctx context.Context, req *proto.LaunchRequest) (*proto.LaunchResponse, error) {
+ ps, err := s.impl.Launch(&ExecCommand{
+ Cmd: req.Cmd,
+ Args: req.Args,
+ Resources: drivers.ResourcesFromProto(req.Resources),
+ StdoutPath: req.StdoutPath,
+ StderrPath: req.StderrPath,
+ Env: req.Env,
+ User: req.User,
+ TaskDir: req.TaskDir,
+ ResourceLimits: req.ResourceLimits,
+ BasicProcessCgroup: req.BasicProcessCgroup,
+ NoPivotRoot: req.NoPivotRoot,
+ Mounts: drivers.MountsFromProto(req.Mounts),
+ Devices: drivers.DevicesFromProto(req.Devices),
+ NetworkIsolation: drivers.NetworkIsolationSpecFromProto(req.NetworkIsolation),
+ ModePID: req.DefaultPidMode,
+ ModeIPC: req.DefaultIpcMode,
+ Capabilities: req.Capabilities,
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ process, err := processStateToProto(ps)
+ if err != nil {
+ return nil, err
+ }
+
+ return &proto.LaunchResponse{
+ Process: process,
+ }, nil
+}
+
+func (s *grpcExecutorServer) Wait(ctx context.Context, req *proto.WaitRequest) (*proto.WaitResponse, error) {
+ ps, err := s.impl.Wait(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ process, err := processStateToProto(ps)
+ if err != nil {
+ return nil, err
+ }
+
+ return &proto.WaitResponse{
+ Process: process,
+ }, nil
+}
+
+func (s *grpcExecutorServer) Shutdown(ctx context.Context, req *proto.ShutdownRequest) (*proto.ShutdownResponse, error) {
+ if err := s.impl.Shutdown(req.Signal, time.Duration(req.GracePeriod)); err != nil {
+ return nil, err
+ }
+
+ return &proto.ShutdownResponse{}, nil
+}
+
+func (s *grpcExecutorServer) UpdateResources(ctx context.Context, req *proto.UpdateResourcesRequest) (*proto.UpdateResourcesResponse, error) {
+ if err := s.impl.UpdateResources(drivers.ResourcesFromProto(req.Resources)); err != nil {
+ return nil, err
+ }
+
+ return &proto.UpdateResourcesResponse{}, nil
+}
+
+func (s *grpcExecutorServer) Version(context.Context, *proto.VersionRequest) (*proto.VersionResponse, error) {
+ v, err := s.impl.Version()
+ if err != nil {
+ return nil, err
+ }
+
+ return &proto.VersionResponse{
+ Version: v.Version,
+ }, nil
+}
+
+func (s *grpcExecutorServer) Stats(req *proto.StatsRequest, stream proto.Executor_StatsServer) error {
+ interval := time.Duration(req.Interval)
+ if interval == 0 {
+ interval = time.Second
+ }
+
+ outCh, err := s.impl.Stats(stream.Context(), interval)
+ if err != nil {
+ if rec, ok := err.(structs.Recoverable); ok {
+ st := status.New(codes.FailedPrecondition, rec.Error())
+ st, err := st.WithDetails(&sproto.RecoverableError{Recoverable: rec.IsRecoverable()})
+ if err != nil {
+ // If this error, it will always error
+ panic(err)
+ }
+ return st.Err()
+ }
+ return err
+ }
+
+ for resp := range outCh {
+ pbStats, err := drivers.TaskStatsToProto(resp)
+ if err != nil {
+ return err
+ }
+
+ presp := &proto.StatsResponse{
+ Stats: pbStats,
+ }
+
+ // Send the stats
+ if err := stream.Send(presp); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (s *grpcExecutorServer) Signal(ctx context.Context, req *proto.SignalRequest) (*proto.SignalResponse, error) {
+ sig := syscall.Signal(req.Signal)
+ if err := s.impl.Signal(sig); err != nil {
+ return nil, err
+ }
+ return &proto.SignalResponse{}, nil
+}
+
+func (s *grpcExecutorServer) Exec(ctx context.Context, req *proto.ExecRequest) (*proto.ExecResponse, error) {
+ deadline, err := ptypes.Timestamp(req.Deadline)
+ if err != nil {
+ return nil, err
+ }
+
+ out, exit, err := s.impl.Exec(deadline, req.Cmd, req.Args)
+ if err != nil {
+ return nil, err
+ }
+
+ return &proto.ExecResponse{
+ Output: out,
+ ExitCode: int32(exit),
+ }, nil
+}
+
+func (s *grpcExecutorServer) ExecStreaming(server proto.Executor_ExecStreamingServer) error {
+ msg, err := server.Recv()
+ if err != nil {
+ return fmt.Errorf("failed to receive initial message: %v", err)
+ }
+
+ if msg.Setup == nil {
+ return fmt.Errorf("first message should always be setup")
+ }
+
+ return s.impl.ExecStreaming(server.Context(),
+ msg.Setup.Command, msg.Setup.Tty,
+ server)
+}