aboutsummaryrefslogtreecommitdiff
path: root/executor/pid_collector.go
diff options
context:
space:
mode:
Diffstat (limited to 'executor/pid_collector.go')
-rw-r--r--executor/pid_collector.go211
1 files changed, 211 insertions, 0 deletions
diff --git a/executor/pid_collector.go b/executor/pid_collector.go
new file mode 100644
index 0000000..2413f8e
--- /dev/null
+++ b/executor/pid_collector.go
@@ -0,0 +1,211 @@
+package executor
+
+import (
+ "os"
+ "strconv"
+ "sync"
+ "time"
+
+ hclog "github.com/hashicorp/go-hclog"
+ "github.com/hashicorp/nomad/client/lib/resources"
+ "github.com/hashicorp/nomad/client/stats"
+ "github.com/hashicorp/nomad/plugins/drivers"
+ ps "github.com/mitchellh/go-ps"
+ "github.com/shirou/gopsutil/v3/process"
+)
+
+var (
+ // pidScanInterval is the interval at which the executor scans the process
+ // tree to find the pids that the executor and its child processes have
+ // forked
+ pidScanInterval = 5 * time.Second
+)
+
+// pidCollector is a utility that can be embedded in an executor to collect pid
+// stats
+type pidCollector struct {
+ // pids maps each tracked process id to its resources.PID entry
+ pids map[int]*resources.PID
+ // pidLock guards reads and writes of the pids map
+ pidLock sync.RWMutex
+ // logger receives the collector's debug and trace output
+ logger hclog.Logger
+}
+
+// allPidGetter is a func which is used by the pid collector to obtain the
+// set of pids to gather stats on
+type allPidGetter func() (resources.PIDs, error)
+
+// newPidCollector builds a pidCollector with an empty pid table and a logger
+// derived from the given one under the name "pid_collector".
+func newPidCollector(logger hclog.Logger) *pidCollector {
+	collector := &pidCollector{
+		pids: make(map[int]*resources.PID),
+	}
+	collector.logger = logger.Named("pid_collector")
+	return collector
+}
+
+// collectPids periodically refreshes the set of tracked pids. It blocks until
+// a receive on stopCh succeeds (a value is sent or the channel is closed), so
+// callers are expected to run it in its own goroutine.
+//
+// The first scan fires immediately; every following scan runs pidScanInterval
+// after the previous one completed. On each scan pidGetter supplies the
+// current set of pids: pids that are not yet tracked are added, and tracked
+// pids that are no longer reported are dropped. Entries of pids that are
+// already tracked are kept rather than replaced, so the resources.PID stored
+// for a long-running process persists across scans.
+//
+// If pidGetter returns an error, the error is logged and the tracked set is
+// left untouched until the next scan.
+func (c *pidCollector) collectPids(stopCh chan interface{}, pidGetter allPidGetter) {
+	// Fire the timer right away when the executor starts; from there on the
+	// pids are collected every scan interval
+	timer := time.NewTimer(0)
+	defer timer.Stop()
+	for {
+		select {
+		case <-timer.C:
+			pids, err := pidGetter()
+			if err != nil {
+				// A failed scan carries no information about which processes
+				// are alive. Reconciling against its result would remove
+				// every tracked pid, so skip this round and try again later.
+				c.logger.Debug("error collecting pids", "error", err)
+				timer.Reset(pidScanInterval)
+				continue
+			}
+
+			c.pidLock.Lock()
+
+			// Adding pids which are not being tracked
+			for pid, np := range pids {
+				if _, ok := c.pids[pid]; !ok {
+					c.pids[pid] = np
+				}
+			}
+			// Removing pids which are no longer present
+			for pid := range c.pids {
+				if _, ok := pids[pid]; !ok {
+					delete(c.pids, pid)
+				}
+			}
+			c.pidLock.Unlock()
+
+			// timer.C has just been drained, so Reset can be called directly
+			timer.Reset(pidScanInterval)
+		case <-stopCh:
+			return
+		}
+	}
+}
+
+// scanPids takes a snapshot of the process table (allPids) and returns an
+// entry for parentPid and for every process that descends from it, directly
+// or transitively. Each entry is a freshly allocated resources.PID with new
+// CPU stat trackers. The returned error is always nil.
+func scanPids(parentPid int, allPids []ps.Process) (map[int]*resources.PID, error) {
+	// Parent lookup for every process that has not yet been attributed to the
+	// family
+	unclaimed := make(map[int]int, len(allPids))
+	for _, proc := range allPids {
+		unclaimed[proc.Pid()] = proc.PPid()
+	}
+
+	// The family starts out as just the root of the tree
+	family := map[int]struct{}{parentPid: {}}
+
+	// Sweep the unclaimed processes repeatedly, claiming each one whose parent
+	// is already in the family, until a full sweep claims nothing. Several
+	// sweeps may be needed because a grandchild can be visited before the
+	// child that links it to the family.
+	for grew := true; grew; {
+		grew = false
+		for pid, ppid := range unclaimed {
+			if _, known := family[ppid]; !known {
+				continue
+			}
+			family[pid] = struct{}{}
+			delete(unclaimed, pid)
+			grew = true
+		}
+	}
+
+	tracked := make(map[int]*resources.PID)
+	for pid := range family {
+		entry := &resources.PID{PID: pid}
+		entry.StatsTotalCPU = stats.NewCpuStats()
+		entry.StatsUserCPU = stats.NewCpuStats()
+		entry.StatsSysCPU = stats.NewCpuStats()
+		tracked[pid] = entry
+	}
+	return tracked, nil
+}
+
+// pidStats returns the resource usage stats per pid
+func (c *pidCollector) pidStats() (map[string]*drivers.ResourceUsage, error) {
+ stats := make(map[string]*drivers.ResourceUsage)
+ c.pidLock.RLock()
+ pids := make(map[int]*resources.PID, len(c.pids))
+ for k, v := range c.pids {
+ pids[k] = v
+ }
+ c.pidLock.RUnlock()
+ for pid, np := range pids {
+ p, err := process.NewProcess(int32(pid))
+ if err != nil {
+ c.logger.Trace("unable to create new process", "pid", pid, "error", err)
+ continue
+ }
+ ms := &drivers.MemoryStats{}
+ if memInfo, err := p.MemoryInfo(); err == nil {
+ ms.RSS = memInfo.RSS
+ ms.Swap = memInfo.Swap
+ ms.Measured = ExecutorBasicMeasuredMemStats
+ }
+
+ cs := &drivers.CpuStats{}
+ if cpuStats, err := p.Times(); err == nil {
+ cs.SystemMode = np.StatsSysCPU.Percent(cpuStats.System * float64(time.Second))
+ cs.UserMode = np.StatsUserCPU.Percent(cpuStats.User * float64(time.Second))
+ cs.Measured = ExecutorBasicMeasuredCpuStats
+
+ // calculate cpu usage percent
+ cs.Percent = np.StatsTotalCPU.Percent(cpuStats.Total() * float64(time.Second))
+ }
+ stats[strconv.Itoa(pid)] = &drivers.ResourceUsage{MemoryStats: ms, CpuStats: cs}
+ }
+
+ return stats, nil
+}
+
+// aggregatedResourceUsage aggregates the resource usage of all the pids and
+// returns a TaskResourceUsage data point
+func aggregatedResourceUsage(systemCpuStats *stats.CpuStats, pidStats map[string]*drivers.ResourceUsage) *drivers.TaskResourceUsage {
+ ts := time.Now().UTC().UnixNano()
+ var (
+ systemModeCPU, userModeCPU, percent float64
+ totalRSS, totalSwap uint64
+ )
+
+ for _, pidStat := range pidStats {
+ systemModeCPU += pidStat.CpuStats.SystemMode
+ userModeCPU += pidStat.CpuStats.UserMode
+ percent += pidStat.CpuStats.Percent
+
+ totalRSS += pidStat.MemoryStats.RSS
+ totalSwap += pidStat.MemoryStats.Swap
+ }
+
+ totalCPU := &drivers.CpuStats{
+ SystemMode: systemModeCPU,
+ UserMode: userModeCPU,
+ Percent: percent,
+ Measured: ExecutorBasicMeasuredCpuStats,
+ TotalTicks: systemCpuStats.TicksConsumed(percent),
+ }
+
+ totalMemory := &drivers.MemoryStats{
+ RSS: totalRSS,
+ Swap: totalSwap,
+ Measured: ExecutorBasicMeasuredMemStats,
+ }
+
+ resourceUsage := drivers.ResourceUsage{
+ MemoryStats: totalMemory,
+ CpuStats: totalCPU,
+ }
+ return &drivers.TaskResourceUsage{
+ ResourceUsage: &resourceUsage,
+ Timestamp: ts,
+ Pids: pidStats,
+ }
+}
+
+// getAllPidsByScanning fetches the process list from ps.Processes and hands
+// it to scanPids with the current process as the root, yielding entries for
+// this process and all of its descendants. A failure to list processes is
+// returned as-is with a nil result.
+func getAllPidsByScanning() (resources.PIDs, error) {
+	self := os.Getpid()
+
+	procs, err := ps.Processes()
+	if err != nil {
+		return nil, err
+	}
+
+	return scanPids(self, procs)
+}