aboutsummaryrefslogblamecommitdiff
path: root/src/garage/cli_v2/worker.rs
blob: 9c248a396f9492bb231431b2a7a2561f24c4fa47 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12











                                                                                   

                                                                                          








                                                                                




                                                                                       


                                                                    

                               









                                                                                                       
                         


































                                                                                                       
                 






                                                                              
                                                                                        
                               























































                                                                                                 


























































                                                                                              








                                                             
use format_table::format_table;

use garage_util::error::*;

use garage_api_admin::api::*;

use crate::cli::structs::*;
use crate::cli_v2::*;

impl Cli {
	/// Dispatches a `WorkerOperation` to the method of `Cli` that implements it.
	///
	/// Any error returned by the selected handler is propagated to the caller;
	/// otherwise `Ok(())` is returned.
	pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> {
		match cmd {
			WorkerOperation::List { opt } => self.cmd_list_workers(opt).await?,
			WorkerOperation::Info { tid } => self.cmd_worker_info(tid).await?,
			WorkerOperation::Get {
				all_nodes,
				variable,
			} => self.cmd_get_var(all_nodes, variable).await?,
			WorkerOperation::Set {
				all_nodes,
				variable,
				value,
			} => self.cmd_set_var(all_nodes, variable, value).await?,
		}

		Ok(())
	}

	/// Prints a table of workers, one row per worker.
	///
	/// The list is fetched with a `LocalListWorkersRequest`, passing `opt.busy`
	/// and `opt.errors` as the `busy_only` / `error_only` filters. Rows are
	/// ordered so that busy and throttled workers come first, idle ones next
	/// and done ones last; within a group they are ordered by worker id.
	///
	/// Optional values that are absent are rendered as `-`, except for the
	/// "Last" column which is left empty when no error has been recorded.
	pub async fn cmd_list_workers(&self, opt: WorkerListOpt) -> Result<(), Error> {
		let request = LocalListWorkersRequest {
			busy_only: opt.busy,
			error_only: opt.errors,
		};
		let mut workers = self.local_api_request(request).await?.0;

		// Rank by activity first, then by id.
		workers.sort_by_key(|w| {
			let rank = match w.state {
				WorkerStateResp::Busy | WorkerStateResp::Throttled { .. } => 0,
				WorkerStateResp::Idle => 1,
				WorkerStateResp::Done => 2,
			};
			(rank, w.id)
		});

		let formatter = timeago::Formatter::new();
		let mut rows =
			vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
		rows.extend(workers.iter().map(|w| {
			let tranquility = w
				.tranquility
				.as_ref()
				.map_or_else(|| "-".to_string(), ToString::to_string);
			let progress = w.progress.as_deref().unwrap_or("-");
			let queue_length = w
				.queue_length
				.as_ref()
				.map_or_else(|| "-".to_string(), ToString::to_string);

			// Error counters are only shown once at least one error occurred.
			let has_errors = w.errors > 0;
			let total_errors = if has_errors {
				w.errors.to_string()
			} else {
				"-".to_string()
			};
			let consecutive_errors = if has_errors {
				w.consecutive_errors.to_string()
			} else {
				"-".to_string()
			};

			let last_error_age = match &w.last_error {
				Some(e) => formatter.convert(Duration::from_secs(e.secs_ago)),
				None => String::new(),
			};

			format!(
				"{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
				w.id,
				format_worker_state(&w.state),
				w.name,
				tranquility,
				progress,
				queue_length,
				total_errors,
				consecutive_errors,
				last_error_age,
			)
		}));
		format_table(rows);

		Ok(())
	}

	/// Prints the detailed information of the worker whose id is `tid`.
	///
	/// The data is fetched with a `LocalGetWorkerInfoRequest`. The output is a
	/// key/value table made of three groups separated by blank rows: identity
	/// and state, error counters (plus the last error if any), and the optional
	/// progress / queue length / persistent error values. Freeform messages,
	/// if present, are appended under a single `Message:` label.
	pub async fn cmd_worker_info(&self, tid: usize) -> Result<(), Error> {
		let request = LocalGetWorkerInfoRequest { id: tid as u64 };
		let worker = self.local_api_request(request).await?.0;

		// A throttled worker gets a more detailed state line that includes the
		// pause duration; every other state uses the short label.
		let state_line = match &worker.state {
			WorkerStateResp::Throttled { duration_secs } => format!(
				"Worker state:\tBusy (throttled, paused for {:.3}s)",
				duration_secs
			),
			other => format!("Worker state:\t{}", format_worker_state(other)),
		};

		let mut lines = vec![
			format!("Task id:\t{}", worker.id),
			format!("Worker name:\t{}", worker.name),
			state_line,
		];
		lines.extend(
			worker
				.tranquility
				.map(|tql| format!("Tranquility:\t{}", tql)),
		);

		lines.push(String::new());
		lines.push(format!("Total errors:\t{}", worker.errors));
		lines.push(format!("Consecutive errs:\t{}", worker.consecutive_errors));
		if let Some(last) = worker.last_error {
			let age = timeago::Formatter::new().convert(Duration::from_secs(last.secs_ago));
			lines.push(format!("Last error:\t{}", last.message));
			lines.push(format!("Last error time:\t{}", age));
		}

		lines.push(String::new());
		lines.extend(worker.progress.map(|p| format!("Progress:\t{}", p)));
		lines.extend(
			worker
				.queue_length
				.map(|ql| format!("Queue length:\t{}", ql)),
		);
		lines.extend(
			worker
				.persistent_errors
				.map(|pe| format!("Persistent errors:\t{}", pe)),
		);

		let mut messages = worker.freeform.iter();
		if let Some(first) = messages.next() {
			// Separate the messages from the previous group, unless a blank
			// row is already the last one in the table.
			if lines.last().map_or(true, |l| !l.is_empty()) {
				lines.push(String::new());
			}
			lines.push(format!("Message:\t{}", first));
			lines.extend(messages.map(|m| format!("\t{}", m)));
		}
		format_table(lines);

		Ok(())
	}

	/// Prints worker variables as a `node / variable / value` table.
	///
	/// When `all` is true the request targets node `"*"`; otherwise it targets
	/// the hex encoding of `self.rpc_host`. `var` is forwarded unchanged as the
	/// `variable` field of the request body.
	///
	/// Per-node failures reported in the response are written to stderr and do
	/// not turn the result into an `Err`.
	pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> {
		let node = if all {
			"*".to_string()
		} else {
			hex::encode(self.rpc_host)
		};
		let request = GetWorkerVariableRequest {
			node,
			body: LocalGetWorkerVariableRequest { variable: var },
		};
		let response = self.api_request(request).await?;

		// One row per (node, variable) pair.
		let mut rows: Vec<String> = Vec::new();
		for (node, vars) in response.success.iter() {
			rows.extend(
				vars.0
					.iter()
					.map(|(key, val)| format!("{:.16}\t{}\t{}", node, key, val)),
			);
		}
		format_table(rows);

		response
			.error
			.iter()
			.for_each(|(node, err)| eprintln!("{:.16}: error: {}", node, err));

		Ok(())
	}

	/// Sets a worker variable and prints the resulting value for each node
	/// that answered successfully.
	///
	/// When `all` is true the request targets node `"*"`; otherwise it targets
	/// the hex encoding of `self.rpc_host`. `variable` and `value` are sent as
	/// the request body.
	///
	/// Per-node failures reported in the response are written to stderr and do
	/// not turn the result into an `Err`.
	pub async fn cmd_set_var(
		&self,
		all: bool,
		variable: String,
		value: String,
	) -> Result<(), Error> {
		let node = if all {
			"*".to_string()
		} else {
			hex::encode(self.rpc_host)
		};
		let request = SetWorkerVariableRequest {
			node,
			body: LocalSetWorkerVariableRequest { variable, value },
		};
		let response = self.api_request(request).await?;

		// One row per node, echoing the variable and value from the response.
		let rows: Vec<String> = response
			.success
			.iter()
			.map(|(node, kv)| format!("{:.16}\t{}\t{}", node, kv.variable, kv.value))
			.collect();
		format_table(rows);

		response
			.error
			.iter()
			.for_each(|(node, err)| eprintln!("{:.16}: error: {}", node, err));

		Ok(())
	}
}

/// Returns the short label used to display a worker state.
///
/// A throttled worker is shown as `Busy*`, i.e. busy with a marker.
fn format_worker_state(s: &WorkerStateResp) -> &'static str {
	match *s {
		WorkerStateResp::Done => "Done",
		WorkerStateResp::Idle => "Idle",
		WorkerStateResp::Throttled { .. } => "Busy*",
		WorkerStateResp::Busy => "Busy",
	}
}