aboutsummaryrefslogtreecommitdiff
path: root/src/garage/cli_v2
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage/cli_v2')
-rw-r--r--src/garage/cli_v2/block.rs145
-rw-r--r--src/garage/cli_v2/bucket.rs48
-rw-r--r--src/garage/cli_v2/cluster.rs2
-rw-r--r--src/garage/cli_v2/key.rs2
-rw-r--r--src/garage/cli_v2/layout.rs2
-rw-r--r--src/garage/cli_v2/mod.rs104
-rw-r--r--src/garage/cli_v2/node.rs113
-rw-r--r--src/garage/cli_v2/worker.rs213
8 files changed, 564 insertions, 65 deletions
diff --git a/src/garage/cli_v2/block.rs b/src/garage/cli_v2/block.rs
new file mode 100644
index 00000000..bfc0db4a
--- /dev/null
+++ b/src/garage/cli_v2/block.rs
@@ -0,0 +1,145 @@
+//use bytesize::ByteSize;
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api_admin::api::*;
+
+use crate::cli::structs::*;
+use crate::cli_v2::*;
+
+impl Cli {
+ pub async fn cmd_block(&self, cmd: BlockOperation) -> Result<(), Error> {
+ match cmd {
+ BlockOperation::ListErrors => self.cmd_list_block_errors().await,
+ BlockOperation::Info { hash } => self.cmd_get_block_info(hash).await,
+ BlockOperation::RetryNow { all, blocks } => self.cmd_block_retry_now(all, blocks).await,
+ BlockOperation::Purge { yes, blocks } => self.cmd_block_purge(yes, blocks).await,
+ }
+ }
+
+ pub async fn cmd_list_block_errors(&self) -> Result<(), Error> {
+ let errors = self.local_api_request(LocalListBlockErrorsRequest).await?.0;
+
+ let tf = timeago::Formatter::new();
+ let mut tf2 = timeago::Formatter::new();
+ tf2.ago("");
+
+ let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
+ for e in errors {
+ let next_try = if e.next_try_in_secs > 0 {
+ tf2.convert(Duration::from_secs(e.next_try_in_secs))
+ } else {
+ "asap".to_string()
+ };
+ table.push(format!(
+ "{}\t{}\t{}\t{}\tin {}",
+ e.block_hash,
+ e.refcount,
+ e.error_count,
+ tf.convert(Duration::from_secs(e.last_try_secs_ago)),
+ next_try
+ ));
+ }
+ format_table(table);
+
+ Ok(())
+ }
+
+ pub async fn cmd_get_block_info(&self, hash: String) -> Result<(), Error> {
+ let info = self
+ .local_api_request(LocalGetBlockInfoRequest { block_hash: hash })
+ .await?;
+
+ println!("Block hash: {}", info.block_hash);
+ println!("Refcount: {}", info.refcount);
+ println!();
+
+ let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()];
+ let mut nondeleted_count = 0;
+ for ver in info.versions.iter() {
+ match &ver.backlink {
+ Some(BlockVersionBacklink::Object { bucket_id, key }) => {
+ table.push(format!(
+ "{:.16}\t{:.16}\t{}\t\t{:?}",
+ ver.version_id, bucket_id, key, ver.deleted
+ ));
+ }
+ Some(BlockVersionBacklink::Upload {
+ upload_id,
+ upload_deleted: _,
+ upload_garbage_collected: _,
+ bucket_id,
+ key,
+ }) => {
+ table.push(format!(
+ "{:.16}\t{:.16}\t{}\t{:.16}\t{:.16}",
+ ver.version_id,
+ bucket_id.as_deref().unwrap_or(""),
+ key.as_deref().unwrap_or(""),
+ upload_id,
+ ver.deleted
+ ));
+ }
+ None => {
+ table.push(format!("{:.16}\t\t\tyes", ver.version_id));
+ }
+ }
+ if !ver.deleted {
+ nondeleted_count += 1;
+ }
+ }
+ format_table(table);
+
+ if info.refcount != nondeleted_count {
+ println!();
+ println!(
+ "Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`."
+ );
+ }
+
+ Ok(())
+ }
+
+ pub async fn cmd_block_retry_now(&self, all: bool, blocks: Vec<String>) -> Result<(), Error> {
+ let req = match (all, blocks.len()) {
+ (true, 0) => LocalRetryBlockResyncRequest::All { all: true },
+ (false, n) if n > 0 => LocalRetryBlockResyncRequest::Blocks {
+ block_hashes: blocks,
+ },
+ _ => {
+ return Err(Error::Message(
+ "Please specify block hashes or --all (not both)".into(),
+ ))
+ }
+ };
+
+ let res = self.local_api_request(req).await?;
+
+ println!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ res.count
+ );
+
+ Ok(())
+ }
+
+ pub async fn cmd_block_purge(&self, yes: bool, blocks: Vec<String>) -> Result<(), Error> {
+ if !yes {
+ return Err(Error::Message(
+ "Pass the --yes flag to confirm block purge operation.".into(),
+ ));
+ }
+
+ let res = self
+ .local_api_request(LocalPurgeBlocksRequest(blocks))
+ .await?;
+
+ println!(
+ "Purged {} blocks: deleted {} versions, {} objects, {} multipart uploads",
+ res.blocks_purged, res.versions_deleted, res.objects_deleted, res.uploads_deleted,
+ );
+
+ Ok(())
+ }
+}
diff --git a/src/garage/cli_v2/bucket.rs b/src/garage/cli_v2/bucket.rs
index 837ce783..c25c2c3e 100644
--- a/src/garage/cli_v2/bucket.rs
+++ b/src/garage/cli_v2/bucket.rs
@@ -3,9 +3,8 @@ use format_table::format_table;
use garage_util::error::*;
-use garage_api::admin::api::*;
+use garage_api_admin::api::*;
-use crate::cli as cli_v1;
use crate::cli::structs::*;
use crate::cli_v2::*;
@@ -22,15 +21,9 @@ impl Cli {
BucketOperation::Deny(query) => self.cmd_bucket_deny(query).await,
BucketOperation::Website(query) => self.cmd_bucket_website(query).await,
BucketOperation::SetQuotas(query) => self.cmd_bucket_set_quotas(query).await,
-
- // TODO
- x => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::BucketOperation(x),
- )
- .await
- .ok_or_message("old error"),
+ BucketOperation::CleanupIncompleteUploads(query) => {
+ self.cmd_cleanup_incomplete_uploads(query).await
+ }
}
}
@@ -520,4 +513,37 @@ impl Cli {
Ok(())
}
+
+ pub async fn cmd_cleanup_incomplete_uploads(
+ &self,
+ opt: CleanupIncompleteUploadsOpt,
+ ) -> Result<(), Error> {
+ let older_than = parse_duration::parse::parse(&opt.older_than)
+ .ok_or_message("Invalid duration passed for --older-than parameter")?;
+
+ for b in opt.buckets.iter() {
+ let bucket = self
+ .api_request(GetBucketInfoRequest {
+ id: None,
+ global_alias: None,
+ search: Some(b.clone()),
+ })
+ .await?;
+
+ let res = self
+ .api_request(CleanupIncompleteUploadsRequest {
+ bucket_id: bucket.id.clone(),
+ older_than_secs: older_than.as_secs(),
+ })
+ .await?;
+
+ if res.uploads_deleted > 0 {
+ println!("{:.16}: {} uploads deleted", bucket.id, res.uploads_deleted);
+ } else {
+ println!("{:.16}: no uploads deleted", bucket.id);
+ }
+ }
+
+ Ok(())
+ }
}
diff --git a/src/garage/cli_v2/cluster.rs b/src/garage/cli_v2/cluster.rs
index 34a28674..6eb65d12 100644
--- a/src/garage/cli_v2/cluster.rs
+++ b/src/garage/cli_v2/cluster.rs
@@ -2,7 +2,7 @@ use format_table::format_table;
use garage_util::error::*;
-use garage_api::admin::api::*;
+use garage_api_admin::api::*;
use crate::cli::structs::*;
use crate::cli_v2::layout::*;
diff --git a/src/garage/cli_v2/key.rs b/src/garage/cli_v2/key.rs
index ff403a9a..b956906d 100644
--- a/src/garage/cli_v2/key.rs
+++ b/src/garage/cli_v2/key.rs
@@ -2,7 +2,7 @@ use format_table::format_table;
use garage_util::error::*;
-use garage_api::admin::api::*;
+use garage_api_admin::api::*;
use crate::cli::structs::*;
use crate::cli_v2::*;
diff --git a/src/garage/cli_v2/layout.rs b/src/garage/cli_v2/layout.rs
index d44771c7..2f14b332 100644
--- a/src/garage/cli_v2/layout.rs
+++ b/src/garage/cli_v2/layout.rs
@@ -3,7 +3,7 @@ use format_table::format_table;
use garage_util::error::*;
-use garage_api::admin::api::*;
+use garage_api_admin::api::*;
use crate::cli::layout as cli_v1;
use crate::cli::structs::*;
diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs
index 692b7c1c..28c7c824 100644
--- a/src/garage/cli_v2/mod.rs
+++ b/src/garage/cli_v2/mod.rs
@@ -3,6 +3,10 @@ pub mod cluster;
pub mod key;
pub mod layout;
+pub mod block;
+pub mod node;
+pub mod worker;
+
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
@@ -12,17 +16,15 @@ use garage_util::error::*;
use garage_rpc::system::*;
use garage_rpc::*;
-use garage_api::admin::api::*;
-use garage_api::admin::EndpointHandler as AdminApiEndpoint;
+use garage_api_admin::api::*;
+use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse};
+use garage_api_admin::RequestHandler;
-use crate::admin::*;
-use crate::cli as cli_v1;
use crate::cli::structs::*;
-use crate::cli::Command;
pub struct Cli {
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
- pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>,
+ pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>,
pub rpc_host: NodeID,
}
@@ -36,63 +38,35 @@ impl Cli {
Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await,
Command::Bucket(bo) => self.cmd_bucket(bo).await,
Command::Key(ko) => self.cmd_key(ko).await,
-
- // TODO
- Command::Repair(ro) => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::LaunchRepair(ro),
- )
- .await
- .ok_or_message("cli_v1"),
- Command::Stats(so) => {
- cli_v1::cmd_admin(&self.admin_rpc_endpoint, self.rpc_host, AdminRpc::Stats(so))
- .await
- .ok_or_message("cli_v1")
- }
- Command::Worker(wo) => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::Worker(wo),
- )
- .await
- .ok_or_message("cli_v1"),
- Command::Block(bo) => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::BlockOperation(bo),
- )
- .await
- .ok_or_message("cli_v1"),
- Command::Meta(mo) => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::MetaOperation(mo),
- )
- .await
- .ok_or_message("cli_v1"),
+ Command::Worker(wo) => self.cmd_worker(wo).await,
+ Command::Block(bo) => self.cmd_block(bo).await,
+ Command::Meta(mo) => self.cmd_meta(mo).await,
+ Command::Stats(so) => self.cmd_stats(so).await,
+ Command::Repair(ro) => self.cmd_repair(ro).await,
_ => unreachable!(),
}
}
- pub async fn api_request<T>(&self, req: T) -> Result<<T as AdminApiEndpoint>::Response, Error>
+ pub async fn api_request<T>(&self, req: T) -> Result<<T as RequestHandler>::Response, Error>
where
- T: AdminApiEndpoint,
+ T: RequestHandler,
AdminApiRequest: From<T>,
- <T as AdminApiEndpoint>::Response: TryFrom<TaggedAdminApiResponse>,
+ <T as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>,
{
let req = AdminApiRequest::from(req);
let req_name = req.name();
match self
- .admin_rpc_endpoint
- .call(&self.rpc_host, AdminRpc::ApiRequest(req), PRIO_NORMAL)
- .await?
- .ok_or_message("rpc")?
+ .proxy_rpc_endpoint
+ .call(&self.rpc_host, ProxyRpc::Proxy(req), PRIO_NORMAL)
+ .await??
{
- AdminRpc::ApiOkResponse(resp) => <T as AdminApiEndpoint>::Response::try_from(resp)
- .map_err(|_| Error::Message(format!("{} returned unexpected response", req_name))),
- AdminRpc::ApiErrorResponse {
+ ProxyRpcResponse::ProxyApiOkResponse(resp) => {
+ <T as RequestHandler>::Response::try_from(resp).map_err(|_| {
+ Error::Message(format!("{} returned unexpected response", req_name))
+ })
+ }
+ ProxyRpcResponse::ApiErrorResponse {
http_code,
error_code,
message,
@@ -103,4 +77,32 @@ impl Cli {
m => Err(Error::unexpected_rpc_message(m)),
}
}
+
+ pub async fn local_api_request<T>(
+ &self,
+ req: T,
+ ) -> Result<<T as RequestHandler>::Response, Error>
+ where
+ T: RequestHandler,
+ MultiRequest<T>: RequestHandler<Response = MultiResponse<<T as RequestHandler>::Response>>,
+ AdminApiRequest: From<MultiRequest<T>>,
+ <MultiRequest<T> as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>,
+ {
+ let req = MultiRequest {
+ node: hex::encode(self.rpc_host),
+ body: req,
+ };
+ let resp = self.api_request(req).await?;
+
+ if let Some((_, e)) = resp.error.into_iter().next() {
+ return Err(Error::Message(e));
+ }
+ if resp.success.len() != 1 {
+ return Err(Error::Message(format!(
+ "{} responses returned, expected 1",
+ resp.success.len()
+ )));
+ }
+ Ok(resp.success.into_iter().next().unwrap().1)
+ }
}
diff --git a/src/garage/cli_v2/node.rs b/src/garage/cli_v2/node.rs
new file mode 100644
index 00000000..c5d0cdea
--- /dev/null
+++ b/src/garage/cli_v2/node.rs
@@ -0,0 +1,113 @@
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api_admin::api::*;
+
+use crate::cli::structs::*;
+use crate::cli_v2::*;
+
+impl Cli {
+ pub async fn cmd_meta(&self, cmd: MetaOperation) -> Result<(), Error> {
+ let MetaOperation::Snapshot { all } = cmd;
+
+ let res = self
+ .api_request(CreateMetadataSnapshotRequest {
+ node: if all {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalCreateMetadataSnapshotRequest,
+ })
+ .await?;
+
+ let mut table = vec![];
+ for (node, err) in res.error.iter() {
+ table.push(format!("{:.16}\tError: {}", node, err));
+ }
+ for (node, _) in res.success.iter() {
+ table.push(format!("{:.16}\tSnapshot created", node));
+ }
+ format_table(table);
+
+ Ok(())
+ }
+
+ pub async fn cmd_stats(&self, cmd: StatsOpt) -> Result<(), Error> {
+ let res = self
+ .api_request(GetNodeStatisticsRequest {
+ node: if cmd.all_nodes {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalGetNodeStatisticsRequest,
+ })
+ .await?;
+
+ for (node, res) in res.success.iter() {
+ println!("======================");
+ println!("Stats for node {:.16}:\n", node);
+ println!("{}\n", res.freeform);
+ }
+
+ for (node, err) in res.error.iter() {
+ println!("======================");
+ println!("Node {:.16}: error: {}\n", node, err);
+ }
+
+ let res = self.api_request(GetClusterStatisticsRequest).await?;
+ println!("======================");
+ println!("Cluster statistics:\n");
+ println!("{}\n", res.freeform);
+
+ Ok(())
+ }
+
+ pub async fn cmd_repair(&self, cmd: RepairOpt) -> Result<(), Error> {
+ if !cmd.yes {
+ return Err(Error::Message(
+ "Please add --yes to start the repair operation".into(),
+ ));
+ }
+
+ let repair_type = match cmd.what {
+ RepairWhat::Tables => RepairType::Tables,
+ RepairWhat::Blocks => RepairType::Blocks,
+ RepairWhat::Versions => RepairType::Versions,
+ RepairWhat::MultipartUploads => RepairType::MultipartUploads,
+ RepairWhat::BlockRefs => RepairType::BlockRefs,
+ RepairWhat::BlockRc => RepairType::BlockRc,
+ RepairWhat::Rebalance => RepairType::Rebalance,
+ RepairWhat::Scrub { cmd } => RepairType::Scrub(match cmd {
+ ScrubCmd::Start => ScrubCommand::Start,
+ ScrubCmd::Cancel => ScrubCommand::Cancel,
+ ScrubCmd::Pause => ScrubCommand::Pause,
+ ScrubCmd::Resume => ScrubCommand::Resume,
+ }),
+ };
+
+ let res = self
+ .api_request(LaunchRepairOperationRequest {
+ node: if cmd.all_nodes {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalLaunchRepairOperationRequest { repair_type },
+ })
+ .await?;
+
+ let mut table = vec![];
+ for (node, err) in res.error.iter() {
+ table.push(format!("{:.16}\tError: {}", node, err));
+ }
+ for (node, _) in res.success.iter() {
+ table.push(format!("{:.16}\tRepair launched", node));
+ }
+ format_table(table);
+
+ Ok(())
+ }
+}
diff --git a/src/garage/cli_v2/worker.rs b/src/garage/cli_v2/worker.rs
new file mode 100644
index 00000000..9c248a39
--- /dev/null
+++ b/src/garage/cli_v2/worker.rs
@@ -0,0 +1,213 @@
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api_admin::api::*;
+
+use crate::cli::structs::*;
+use crate::cli_v2::*;
+
+impl Cli {
+ pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> {
+ match cmd {
+ WorkerOperation::List { opt } => self.cmd_list_workers(opt).await,
+ WorkerOperation::Info { tid } => self.cmd_worker_info(tid).await,
+ WorkerOperation::Get {
+ all_nodes,
+ variable,
+ } => self.cmd_get_var(all_nodes, variable).await,
+ WorkerOperation::Set {
+ all_nodes,
+ variable,
+ value,
+ } => self.cmd_set_var(all_nodes, variable, value).await,
+ }
+ }
+
+ pub async fn cmd_list_workers(&self, opt: WorkerListOpt) -> Result<(), Error> {
+ let mut list = self
+ .local_api_request(LocalListWorkersRequest {
+ busy_only: opt.busy,
+ error_only: opt.errors,
+ })
+ .await?
+ .0;
+
+ list.sort_by_key(|info| {
+ (
+ match info.state {
+ WorkerStateResp::Busy | WorkerStateResp::Throttled { .. } => 0,
+ WorkerStateResp::Idle => 1,
+ WorkerStateResp::Done => 2,
+ },
+ info.id,
+ )
+ });
+
+ let mut table =
+ vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
+ let tf = timeago::Formatter::new();
+ for info in list.iter() {
+ let err_ago = info
+ .last_error
+ .as_ref()
+ .map(|x| tf.convert(Duration::from_secs(x.secs_ago)))
+ .unwrap_or_default();
+ let (total_err, consec_err) = if info.errors > 0 {
+ (info.errors.to_string(), info.consecutive_errors.to_string())
+ } else {
+ ("-".into(), "-".into())
+ };
+
+ table.push(format!(
+ "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
+ info.id,
+ format_worker_state(&info.state),
+ info.name,
+ info.tranquility
+ .as_ref()
+ .map(ToString::to_string)
+ .unwrap_or_else(|| "-".into()),
+ info.progress.as_deref().unwrap_or("-"),
+ info.queue_length
+ .as_ref()
+ .map(ToString::to_string)
+ .unwrap_or_else(|| "-".into()),
+ total_err,
+ consec_err,
+ err_ago,
+ ));
+ }
+ format_table(table);
+
+ Ok(())
+ }
+
+ pub async fn cmd_worker_info(&self, tid: usize) -> Result<(), Error> {
+ let info = self
+ .local_api_request(LocalGetWorkerInfoRequest { id: tid as u64 })
+ .await?
+ .0;
+
+ let mut table = vec![];
+ table.push(format!("Task id:\t{}", info.id));
+ table.push(format!("Worker name:\t{}", info.name));
+ match &info.state {
+ WorkerStateResp::Throttled { duration_secs } => {
+ table.push(format!(
+ "Worker state:\tBusy (throttled, paused for {:.3}s)",
+ duration_secs
+ ));
+ }
+ s => {
+ table.push(format!("Worker state:\t{}", format_worker_state(s)));
+ }
+ };
+ if let Some(tql) = info.tranquility {
+ table.push(format!("Tranquility:\t{}", tql));
+ }
+
+ table.push("".into());
+ table.push(format!("Total errors:\t{}", info.errors));
+ table.push(format!("Consecutive errs:\t{}", info.consecutive_errors));
+ if let Some(err) = info.last_error {
+ table.push(format!("Last error:\t{}", err.message));
+ let tf = timeago::Formatter::new();
+ table.push(format!(
+ "Last error time:\t{}",
+ tf.convert(Duration::from_secs(err.secs_ago))
+ ));
+ }
+
+ table.push("".into());
+ if let Some(p) = info.progress {
+ table.push(format!("Progress:\t{}", p));
+ }
+ if let Some(ql) = info.queue_length {
+ table.push(format!("Queue length:\t{}", ql));
+ }
+ if let Some(pe) = info.persistent_errors {
+ table.push(format!("Persistent errors:\t{}", pe));
+ }
+
+ for (i, s) in info.freeform.iter().enumerate() {
+ if i == 0 {
+ if table.last() != Some(&"".into()) {
+ table.push("".into());
+ }
+ table.push(format!("Message:\t{}", s));
+ } else {
+ table.push(format!("\t{}", s));
+ }
+ }
+ format_table(table);
+
+ Ok(())
+ }
+
+ pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> {
+ let res = self
+ .api_request(GetWorkerVariableRequest {
+ node: if all {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalGetWorkerVariableRequest { variable: var },
+ })
+ .await?;
+
+ let mut table = vec![];
+ for (node, vars) in res.success.iter() {
+ for (key, val) in vars.0.iter() {
+ table.push(format!("{:.16}\t{}\t{}", node, key, val));
+ }
+ }
+ format_table(table);
+
+ for (node, err) in res.error.iter() {
+ eprintln!("{:.16}: error: {}", node, err);
+ }
+
+ Ok(())
+ }
+
+ pub async fn cmd_set_var(
+ &self,
+ all: bool,
+ variable: String,
+ value: String,
+ ) -> Result<(), Error> {
+ let res = self
+ .api_request(SetWorkerVariableRequest {
+ node: if all {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalSetWorkerVariableRequest { variable, value },
+ })
+ .await?;
+
+ let mut table = vec![];
+ for (node, kv) in res.success.iter() {
+ table.push(format!("{:.16}\t{}\t{}", node, kv.variable, kv.value));
+ }
+ format_table(table);
+
+ for (node, err) in res.error.iter() {
+ eprintln!("{:.16}: error: {}", node, err);
+ }
+
+ Ok(())
+ }
+}
+
+fn format_worker_state(s: &WorkerStateResp) -> &'static str {
+ match s {
+ WorkerStateResp::Busy => "Busy",
+ WorkerStateResp::Throttled { .. } => "Busy*",
+ WorkerStateResp::Idle => "Idle",
+ WorkerStateResp::Done => "Done",
+ }
+}