diff options
Diffstat (limited to 'executor/pid_collector.go')
-rw-r--r-- | executor/pid_collector.go | 211 |
1 files changed, 211 insertions, 0 deletions
diff --git a/executor/pid_collector.go b/executor/pid_collector.go new file mode 100644 index 0000000..2413f8e --- /dev/null +++ b/executor/pid_collector.go @@ -0,0 +1,211 @@ +package executor + +import ( + "os" + "strconv" + "sync" + "time" + + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/lib/resources" + "github.com/hashicorp/nomad/client/stats" + "github.com/hashicorp/nomad/plugins/drivers" + ps "github.com/mitchellh/go-ps" + "github.com/shirou/gopsutil/v3/process" +) + +var ( + // pidScanInterval is the interval at which the executor scans the process + // tree for finding out the pids that the executor and it's child processes + // have forked + pidScanInterval = 5 * time.Second +) + +// pidCollector is a utility that can be embedded in an executor to collect pid +// stats +type pidCollector struct { + pids map[int]*resources.PID + pidLock sync.RWMutex + logger hclog.Logger +} + +// allPidGetter is a func which is used by the pid collector to gather +// stats on +type allPidGetter func() (resources.PIDs, error) + +func newPidCollector(logger hclog.Logger) *pidCollector { + return &pidCollector{ + pids: make(map[int]*resources.PID), + logger: logger.Named("pid_collector"), + } +} + +// collectPids collects the pids of the child processes that the executor is +// running every 5 seconds +func (c *pidCollector) collectPids(stopCh chan interface{}, pidGetter allPidGetter) { + // Fire the timer right away when the executor starts from there on the pids + // are collected every scan interval + timer := time.NewTimer(0) + defer timer.Stop() + for { + select { + case <-timer.C: + pids, err := pidGetter() + if err != nil { + c.logger.Debug("error collecting pids", "error", err) + } + c.pidLock.Lock() + + // Adding pids which are not being tracked + for pid, np := range pids { + if _, ok := c.pids[pid]; !ok { + c.pids[pid] = np + } + } + // Removing pids which are no longer present + for pid := range c.pids { + if _, ok := pids[pid]; !ok { + delete(c.pids, pid) + } + } + c.pidLock.Unlock() + timer.Reset(pidScanInterval) + case <-stopCh: + return + } + } +} + +// scanPids scans all the pids on the machine running the current executor and +// returns the child processes of the executor. +func scanPids(parentPid int, allPids []ps.Process) (map[int]*resources.PID, error) { + processFamily := make(map[int]struct{}) + processFamily[parentPid] = struct{}{} + + // A mapping of pids to their parent pids. It is used to build the process + // tree of the executing task + pidsRemaining := make(map[int]int, len(allPids)) + for _, pid := range allPids { + pidsRemaining[pid.Pid()] = pid.PPid() + } + + for { + // flag to indicate if we have found a match + foundNewPid := false + + for pid, ppid := range pidsRemaining { + _, childPid := processFamily[ppid] + + // checking if the pid is a child of any of the parents + if childPid { + processFamily[pid] = struct{}{} + delete(pidsRemaining, pid) + foundNewPid = true + } + } + + // not scanning anymore if we couldn't find a single match + if !foundNewPid { + break + } + } + + res := make(map[int]*resources.PID) + for pid := range processFamily { + res[pid] = &resources.PID{ + PID: pid, + StatsTotalCPU: stats.NewCpuStats(), + StatsUserCPU: stats.NewCpuStats(), + StatsSysCPU: stats.NewCpuStats(), + } + } + return res, nil +} + +// pidStats returns the resource usage stats per pid +func (c *pidCollector) pidStats() (map[string]*drivers.ResourceUsage, error) { + stats := make(map[string]*drivers.ResourceUsage) + c.pidLock.RLock() + pids := make(map[int]*resources.PID, len(c.pids)) + for k, v := range c.pids { + pids[k] = v + } + c.pidLock.RUnlock() + for pid, np := range pids { + p, err := process.NewProcess(int32(pid)) + if err != nil { + c.logger.Trace("unable to create new process", "pid", pid, "error", err) + continue + } + ms := &drivers.MemoryStats{} + if memInfo, err := p.MemoryInfo(); err == nil { + ms.RSS = memInfo.RSS + ms.Swap = memInfo.Swap + ms.Measured = ExecutorBasicMeasuredMemStats + } + + cs := &drivers.CpuStats{} + if cpuStats, err := p.Times(); err == nil { + cs.SystemMode = np.StatsSysCPU.Percent(cpuStats.System * float64(time.Second)) + cs.UserMode = np.StatsUserCPU.Percent(cpuStats.User * float64(time.Second)) + cs.Measured = ExecutorBasicMeasuredCpuStats + + // calculate cpu usage percent + cs.Percent = np.StatsTotalCPU.Percent(cpuStats.Total() * float64(time.Second)) + } + stats[strconv.Itoa(pid)] = &drivers.ResourceUsage{MemoryStats: ms, CpuStats: cs} + } + + return stats, nil +} + +// aggregatedResourceUsage aggregates the resource usage of all the pids and +// returns a TaskResourceUsage data point +func aggregatedResourceUsage(systemCpuStats *stats.CpuStats, pidStats map[string]*drivers.ResourceUsage) *drivers.TaskResourceUsage { + ts := time.Now().UTC().UnixNano() + var ( + systemModeCPU, userModeCPU, percent float64 + totalRSS, totalSwap uint64 + ) + + for _, pidStat := range pidStats { + systemModeCPU += pidStat.CpuStats.SystemMode + userModeCPU += pidStat.CpuStats.UserMode + percent += pidStat.CpuStats.Percent + + totalRSS += pidStat.MemoryStats.RSS + totalSwap += pidStat.MemoryStats.Swap + } + + totalCPU := &drivers.CpuStats{ + SystemMode: systemModeCPU, + UserMode: userModeCPU, + Percent: percent, + Measured: ExecutorBasicMeasuredCpuStats, + TotalTicks: systemCpuStats.TicksConsumed(percent), + } + + totalMemory := &drivers.MemoryStats{ + RSS: totalRSS, + Swap: totalSwap, + Measured: ExecutorBasicMeasuredMemStats, + } + + resourceUsage := drivers.ResourceUsage{ + MemoryStats: totalMemory, + CpuStats: totalCPU, + } + return &drivers.TaskResourceUsage{ + ResourceUsage: &resourceUsage, + Timestamp: ts, + Pids: pidStats, + } +} + +func getAllPidsByScanning() (resources.PIDs, error) { + allProcesses, err := ps.Processes() + if err != nil { + return nil, err + } + return scanPids(os.Getpid(), allProcesses) +} |