diff options
author | Alex Auvolat <lx@deuxfleurs.fr> | 2025-01-31 15:39:31 +0100 |
---|---|---|
committer | Alex Auvolat <lx@deuxfleurs.fr> | 2025-02-03 18:54:51 +0100 |
commit | 10bbb26b303e7bd58ca3396009a66b70a1673c0f (patch) | |
tree | 82b718338b1f6e2693d1345ca872bc2fbb652dcb /src/garage/cli_v2/worker.rs | |
parent | 89ff9f5576f91dc127ba3cc1fae96543e27b9468 (diff) | |
download | garage-10bbb26b303e7bd58ca3396009a66b70a1673c0f.tar.gz garage-10bbb26b303e7bd58ca3396009a66b70a1673c0f.zip |
cli_v2: implement ListWorkers and GetWorkerInfo
Diffstat (limited to 'src/garage/cli_v2/worker.rs')
-rw-r--r-- | src/garage/cli_v2/worker.rs | 145 |
1 files changed, 139 insertions, 6 deletions
diff --git a/src/garage/cli_v2/worker.rs b/src/garage/cli_v2/worker.rs index 0dfe3e96..9db729ec 100644 --- a/src/garage/cli_v2/worker.rs +++ b/src/garage/cli_v2/worker.rs @@ -11,6 +11,8 @@ use crate::cli_v2::*; impl Cli { pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> { match cmd { + WorkerOperation::List { opt } => self.cmd_list_workers(opt).await, + WorkerOperation::Info { tid } => self.cmd_worker_info(tid).await, WorkerOperation::Get { all_nodes, variable, @@ -20,14 +22,136 @@ impl Cli { variable, value, } => self.cmd_set_var(all_nodes, variable, value).await, - wo => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::Worker(wo), + } + } + + pub async fn cmd_list_workers(&self, opt: WorkerListOpt) -> Result<(), Error> { + let mut list = self + .api_request(ListWorkersRequest { + node: hex::encode(self.rpc_host), + body: LocalListWorkersRequest { + busy_only: opt.busy, + error_only: opt.errors, + }, + }) + .await? + .into_single_response()? + .0; + + list.sort_by_key(|info| { + ( + match info.state { + WorkerStateResp::Busy | WorkerStateResp::Throttled { .. } => 0, + WorkerStateResp::Idle => 1, + WorkerStateResp::Done => 2, + }, + info.id, ) - .await - .ok_or_message("cli_v1"), + }); + + let mut table = + vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()]; + let tf = timeago::Formatter::new(); + for info in list.iter() { + let err_ago = info + .last_error + .as_ref() + .map(|x| tf.convert(Duration::from_secs(x.secs_ago))) + .unwrap_or_default(); + let (total_err, consec_err) = if info.errors > 0 { + (info.errors.to_string(), info.consecutive_errors.to_string()) + } else { + ("-".into(), "-".into()) + }; + + table.push(format!( + "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}", + info.id, + format_worker_state(&info.state), + info.name, + info.tranquility + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| "-".into()), + info.progress.as_deref().unwrap_or("-"), + info.queue_length + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| "-".into()), + total_err, + consec_err, + err_ago, + )); } + format_table(table); + + Ok(()) + } + + pub async fn cmd_worker_info(&self, tid: usize) -> Result<(), Error> { + let info = self + .api_request(GetWorkerInfoRequest { + node: hex::encode(self.rpc_host), + body: LocalGetWorkerInfoRequest { id: tid as u64 }, + }) + .await? + .into_single_response()? + .0; + + let mut table = vec![]; + table.push(format!("Task id:\t{}", info.id)); + table.push(format!("Worker name:\t{}", info.name)); + match &info.state { + WorkerStateResp::Throttled { duration_secs } => { + table.push(format!( + "Worker state:\tBusy (throttled, paused for {:.3}s)", + duration_secs + )); + } + s => { + table.push(format!("Worker state:\t{}", format_worker_state(s))); + } + }; + if let Some(tql) = info.tranquility { + table.push(format!("Tranquility:\t{}", tql)); + } + + table.push("".into()); + table.push(format!("Total errors:\t{}", info.errors)); + table.push(format!("Consecutive errs:\t{}", info.consecutive_errors)); + if let Some(err) = info.last_error { + table.push(format!("Last error:\t{}", err.message)); + let tf = timeago::Formatter::new(); + table.push(format!( + "Last error time:\t{}", + tf.convert(Duration::from_secs(err.secs_ago)) + )); + } + + table.push("".into()); + if let Some(p) = info.progress { + table.push(format!("Progress:\t{}", p)); + } + if let Some(ql) = info.queue_length { + table.push(format!("Queue length:\t{}", ql)); + } + if let Some(pe) = info.persistent_errors { + table.push(format!("Persistent errors:\t{}", pe)); + } + + for (i, s) in info.freeform.iter().enumerate() { + if i == 0 { + if table.last() != Some(&"".into()) { + table.push("".into()); + } + table.push(format!("Message:\t{}", s)); + } else { + table.push(format!("\t{}", s)); + } + } + format_table(table); + + Ok(()) } pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> { @@ -87,3 +211,12 @@ impl Cli { Ok(()) } } + +fn format_worker_state(s: &WorkerStateResp) -> &'static str { + match s { + WorkerStateResp::Busy => "Busy", + WorkerStateResp::Throttled { .. } => "Busy*", + WorkerStateResp::Idle => "Idle", + WorkerStateResp::Done => "Done", + } +} |