diff options
author | Alex Auvolat <lx@deuxfleurs.fr> | 2025-01-31 15:39:31 +0100 |
---|---|---|
committer | Alex Auvolat <lx@deuxfleurs.fr> | 2025-02-03 18:54:51 +0100 |
commit | 10bbb26b303e7bd58ca3396009a66b70a1673c0f (patch) | |
tree | 82b718338b1f6e2693d1345ca872bc2fbb652dcb /src/garage | |
parent | 89ff9f5576f91dc127ba3cc1fae96543e27b9468 (diff) | |
download | garage-10bbb26b303e7bd58ca3396009a66b70a1673c0f.tar.gz garage-10bbb26b303e7bd58ca3396009a66b70a1673c0f.zip |
cli_v2: implement ListWorkers and GetWorkerInfo
Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/admin/mod.rs | 30 | ||||
-rw-r--r-- | src/garage/cli/cmd.rs | 6 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 117 | ||||
-rw-r--r-- | src/garage/cli_v2/worker.rs | 145 | ||||
-rw-r--r-- | src/garage/server.rs | 3 |
5 files changed, 141 insertions, 160 deletions
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index f493d0c5..c0e63524 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -22,7 +22,7 @@ use garage_rpc::*; use garage_block::manager::BlockResyncErrorInfo; use garage_model::garage::Garage; -use garage_model::helper::error::{Error, OkOrBadRequest}; +use garage_model::helper::error::Error; use garage_model::s3::mpu_table::MultipartUpload; use garage_model::s3::version_table::Version; @@ -40,17 +40,11 @@ pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; pub enum AdminRpc { LaunchRepair(RepairOpt), Stats(StatsOpt), - Worker(WorkerOperation), BlockOperation(BlockOperation), MetaOperation(MetaOperation), // Replies Ok(String), - WorkerList( - HashMap<usize, garage_util::background::WorkerInfo>, - WorkerListOpt, - ), - WorkerInfo(usize, garage_util::background::WorkerInfo), BlockErrorList(Vec<BlockResyncErrorInfo>), BlockInfo { hash: Hash, @@ -340,27 +334,6 @@ impl AdminRpcHandler { )) } - // ================ WORKER COMMANDS ==================== - - async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> { - match cmd { - WorkerOperation::List { opt } => { - let workers = self.background.get_worker_info(); - Ok(AdminRpc::WorkerList(workers, *opt)) - } - WorkerOperation::Info { tid } => { - let info = self - .background - .get_worker_info() - .get(tid) - .ok_or_bad_request(format!("No worker with TID {}", tid))? - .clone(); - Ok(AdminRpc::WorkerInfo(*tid, info)) - } - _ => unreachable!(), - } - } - // ================ META DB COMMANDS ==================== async fn handle_meta_cmd(self: &Arc<Self>, mo: &MetaOperation) -> Result<AdminRpc, Error> { @@ -409,7 +382,6 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler { match message { AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, - AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await, AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await, AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await, m => Err(GarageError::unexpected_rpc_message(m).into()), diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 6f1b0681..bc34d014 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -17,12 +17,6 @@ pub async fn cmd_admin( AdminRpc::Ok(msg) => { println!("{}", msg); } - AdminRpc::WorkerList(wi, wlo) => { - print_worker_list(wi, wlo); - } - AdminRpc::WorkerInfo(tid, wi) => { - print_worker_info(tid, wi); - } AdminRpc::BlockErrorList(el) => { print_block_error_list(el); } diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 8261fb3e..43b28623 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -1,8 +1,6 @@ -use std::collections::HashMap; use std::time::Duration; use format_table::format_table; -use garage_util::background::*; use garage_util::data::*; use garage_util::time::*; @@ -11,121 +9,6 @@ use garage_block::manager::BlockResyncErrorInfo; use garage_model::s3::mpu_table::MultipartUpload; use garage_model::s3::version_table::*; -use crate::cli::structs::WorkerListOpt; - -pub fn print_worker_list(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { - let mut wi = wi.into_iter().collect::<Vec<_>>(); - wi.sort_by_key(|(tid, info)| { - ( - match info.state { - WorkerState::Busy | WorkerState::Throttled(_) => 0, - WorkerState::Idle => 1, - WorkerState::Done => 2, - }, - *tid, - ) - }); - - let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()]; - for (tid, info) in wi.iter() { - if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) { - continue; - } - if wlo.errors && info.errors == 0 { - continue; - } - - let tf = timeago::Formatter::new(); - let err_ago = info - .last_error - .as_ref() - .map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t))) - .unwrap_or_default(); - let (total_err, consec_err) = if info.errors > 0 { - (info.errors.to_string(), info.consecutive_errors.to_string()) - } else { - ("-".into(), "-".into()) - }; - - table.push(format!( - "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}", - tid, - info.state, - info.name, - info.status - .tranquility - .as_ref() - .map(ToString::to_string) - .unwrap_or_else(|| "-".into()), - info.status.progress.as_deref().unwrap_or("-"), - info.status - .queue_length - .as_ref() - .map(ToString::to_string) - .unwrap_or_else(|| "-".into()), - total_err, - consec_err, - err_ago, - )); - } - format_table(table); -} - -pub fn print_worker_info(tid: usize, info: WorkerInfo) { - let mut table = vec![]; - table.push(format!("Task id:\t{}", tid)); - table.push(format!("Worker name:\t{}", info.name)); - match info.state { - WorkerState::Throttled(t) => { - table.push(format!( - "Worker state:\tBusy (throttled, paused for {:.3}s)", - t - )); - } - s => { - table.push(format!("Worker state:\t{}", s)); - } - }; - if let Some(tql) = info.status.tranquility { - table.push(format!("Tranquility:\t{}", tql)); - } - - table.push("".into()); - table.push(format!("Total errors:\t{}", info.errors)); - table.push(format!("Consecutive errs:\t{}", info.consecutive_errors)); - if let Some((s, t)) = info.last_error { - table.push(format!("Last error:\t{}", s)); - let tf = timeago::Formatter::new(); - table.push(format!( - "Last error time:\t{}", - tf.convert(Duration::from_millis(now_msec() - t)) - )); - } - - table.push("".into()); - if let Some(p) = info.status.progress { - table.push(format!("Progress:\t{}", p)); - } - if let Some(ql) = info.status.queue_length { - table.push(format!("Queue length:\t{}", ql)); - } - if let Some(pe) = info.status.persistent_errors { - table.push(format!("Persistent errors:\t{}", pe)); - } - - for (i, s) in info.status.freeform.iter().enumerate() { - if i == 0 { - if table.last() != Some(&"".into()) { - table.push("".into()); - } - table.push(format!("Message:\t{}", s)); - } else { - table.push(format!("\t{}", s)); - } - } - format_table(table); -} - pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) { let now = now_msec(); let tf = timeago::Formatter::new(); diff --git a/src/garage/cli_v2/worker.rs b/src/garage/cli_v2/worker.rs index 0dfe3e96..9db729ec 100644 --- a/src/garage/cli_v2/worker.rs +++ b/src/garage/cli_v2/worker.rs @@ -11,6 +11,8 @@ use crate::cli_v2::*; impl Cli { pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> { match cmd { + WorkerOperation::List { opt } => self.cmd_list_workers(opt).await, + WorkerOperation::Info { tid } => self.cmd_worker_info(tid).await, WorkerOperation::Get { all_nodes, variable, @@ -20,14 +22,136 @@ impl Cli { variable, value, } => self.cmd_set_var(all_nodes, variable, value).await, - wo => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::Worker(wo), + } + } + + pub async fn cmd_list_workers(&self, opt: WorkerListOpt) -> Result<(), Error> { + let mut list = self + .api_request(ListWorkersRequest { + node: hex::encode(self.rpc_host), + body: LocalListWorkersRequest { + busy_only: opt.busy, + error_only: opt.errors, + }, + }) + .await? + .into_single_response()? + .0; + + list.sort_by_key(|info| { + ( + match info.state { + WorkerStateResp::Busy | WorkerStateResp::Throttled { .. } => 0, + WorkerStateResp::Idle => 1, + WorkerStateResp::Done => 2, + }, + info.id, ) - .await - .ok_or_message("cli_v1"), + }); + + let mut table = + vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()]; + let tf = timeago::Formatter::new(); + for info in list.iter() { + let err_ago = info + .last_error + .as_ref() + .map(|x| tf.convert(Duration::from_secs(x.secs_ago))) + .unwrap_or_default(); + let (total_err, consec_err) = if info.errors > 0 { + (info.errors.to_string(), info.consecutive_errors.to_string()) + } else { + ("-".into(), "-".into()) + }; + + table.push(format!( + "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}", + info.id, + format_worker_state(&info.state), + info.name, + info.tranquility + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| "-".into()), + info.progress.as_deref().unwrap_or("-"), + info.queue_length + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| "-".into()), + total_err, + consec_err, + err_ago, + )); } + format_table(table); + + Ok(()) + } + + pub async fn cmd_worker_info(&self, tid: usize) -> Result<(), Error> { + let info = self + .api_request(GetWorkerInfoRequest { + node: hex::encode(self.rpc_host), + body: LocalGetWorkerInfoRequest { id: tid as u64 }, + }) + .await? + .into_single_response()? + .0; + + let mut table = vec![]; + table.push(format!("Task id:\t{}", info.id)); + table.push(format!("Worker name:\t{}", info.name)); + match &info.state { + WorkerStateResp::Throttled { duration_secs } => { + table.push(format!( + "Worker state:\tBusy (throttled, paused for {:.3}s)", + duration_secs + )); + } + s => { + table.push(format!("Worker state:\t{}", format_worker_state(s))); + } + }; + if let Some(tql) = info.tranquility { + table.push(format!("Tranquility:\t{}", tql)); + } + + table.push("".into()); + table.push(format!("Total errors:\t{}", info.errors)); + table.push(format!("Consecutive errs:\t{}", info.consecutive_errors)); + if let Some(err) = info.last_error { + table.push(format!("Last error:\t{}", err.message)); + let tf = timeago::Formatter::new(); + table.push(format!( + "Last error time:\t{}", + tf.convert(Duration::from_secs(err.secs_ago)) + )); + } + + table.push("".into()); + if let Some(p) = info.progress { + table.push(format!("Progress:\t{}", p)); + } + if let Some(ql) = info.queue_length { + table.push(format!("Queue length:\t{}", ql)); + } + if let Some(pe) = info.persistent_errors { + table.push(format!("Persistent errors:\t{}", pe)); + } + + for (i, s) in info.freeform.iter().enumerate() { + if i == 0 { + if table.last() != Some(&"".into()) { + table.push("".into()); + } + table.push(format!("Message:\t{}", s)); + } else { + table.push(format!("\t{}", s)); + } + } + format_table(table); + + Ok(()) } pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> { @@ -87,3 +211,12 @@ impl Cli { Ok(()) } } + +fn format_worker_state(s: &WorkerStateResp) -> &'static str { + match s { + WorkerStateResp::Busy => "Busy", + WorkerStateResp::Throttled { .. } => "Busy*", + WorkerStateResp::Idle => "Idle", + WorkerStateResp::Done => "Done", + } +} diff --git a/src/garage/server.rs b/src/garage/server.rs index f17f641b..e629041c 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -1,5 +1,4 @@ use std::path::PathBuf; -use std::sync::Arc; use tokio::sync::watch; @@ -65,7 +64,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er } info!("Initialize Admin API server and metrics collector..."); - let admin_server: Arc<AdminApiServer> = AdminApiServer::new( + let admin_server = AdminApiServer::new( garage.clone(), background.clone(), #[cfg(feature = "metrics")] |