diff options
Diffstat (limited to 'src/garage/cli_v2')
-rw-r--r-- | src/garage/cli_v2/block.rs | 145 | ||||
-rw-r--r-- | src/garage/cli_v2/bucket.rs | 48 | ||||
-rw-r--r-- | src/garage/cli_v2/cluster.rs | 2 | ||||
-rw-r--r-- | src/garage/cli_v2/key.rs | 2 | ||||
-rw-r--r-- | src/garage/cli_v2/layout.rs | 2 | ||||
-rw-r--r-- | src/garage/cli_v2/mod.rs | 104 | ||||
-rw-r--r-- | src/garage/cli_v2/node.rs | 113 | ||||
-rw-r--r-- | src/garage/cli_v2/worker.rs | 213 |
8 files changed, 564 insertions, 65 deletions
diff --git a/src/garage/cli_v2/block.rs b/src/garage/cli_v2/block.rs new file mode 100644 index 00000000..bfc0db4a --- /dev/null +++ b/src/garage/cli_v2/block.rs @@ -0,0 +1,145 @@ +//use bytesize::ByteSize; +use format_table::format_table; + +use garage_util::error::*; + +use garage_api_admin::api::*; + +use crate::cli::structs::*; +use crate::cli_v2::*; + +impl Cli { + pub async fn cmd_block(&self, cmd: BlockOperation) -> Result<(), Error> { + match cmd { + BlockOperation::ListErrors => self.cmd_list_block_errors().await, + BlockOperation::Info { hash } => self.cmd_get_block_info(hash).await, + BlockOperation::RetryNow { all, blocks } => self.cmd_block_retry_now(all, blocks).await, + BlockOperation::Purge { yes, blocks } => self.cmd_block_purge(yes, blocks).await, + } + } + + pub async fn cmd_list_block_errors(&self) -> Result<(), Error> { + let errors = self.local_api_request(LocalListBlockErrorsRequest).await?.0; + + let tf = timeago::Formatter::new(); + let mut tf2 = timeago::Formatter::new(); + tf2.ago(""); + + let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()]; + for e in errors { + let next_try = if e.next_try_in_secs > 0 { + tf2.convert(Duration::from_secs(e.next_try_in_secs)) + } else { + "asap".to_string() + }; + table.push(format!( + "{}\t{}\t{}\t{}\tin {}", + e.block_hash, + e.refcount, + e.error_count, + tf.convert(Duration::from_secs(e.last_try_secs_ago)), + next_try + )); + } + format_table(table); + + Ok(()) + } + + pub async fn cmd_get_block_info(&self, hash: String) -> Result<(), Error> { + let info = self + .local_api_request(LocalGetBlockInfoRequest { block_hash: hash }) + .await?; + + println!("Block hash: {}", info.block_hash); + println!("Refcount: {}", info.refcount); + println!(); + + let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()]; + let mut nondeleted_count = 0; + for ver in info.versions.iter() { + match &ver.backlink { + Some(BlockVersionBacklink::Object { bucket_id, key }) => { + table.push(format!( + "{:.16}\t{:.16}\t{}\t\t{:?}", + ver.version_id, bucket_id, key, ver.deleted + )); + } + Some(BlockVersionBacklink::Upload { + upload_id, + upload_deleted: _, + upload_garbage_collected: _, + bucket_id, + key, + }) => { + table.push(format!( + "{:.16}\t{:.16}\t{}\t{:.16}\t{:.16}", + ver.version_id, + bucket_id.as_deref().unwrap_or(""), + key.as_deref().unwrap_or(""), + upload_id, + ver.deleted + )); + } + None => { + table.push(format!("{:.16}\t\t\tyes", ver.version_id)); + } + } + if !ver.deleted { + nondeleted_count += 1; + } + } + format_table(table); + + if info.refcount != nondeleted_count { + println!(); + println!( + "Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`." + ); + } + + Ok(()) + } + + pub async fn cmd_block_retry_now(&self, all: bool, blocks: Vec<String>) -> Result<(), Error> { + let req = match (all, blocks.len()) { + (true, 0) => LocalRetryBlockResyncRequest::All { all: true }, + (false, n) if n > 0 => LocalRetryBlockResyncRequest::Blocks { + block_hashes: blocks, + }, + _ => { + return Err(Error::Message( + "Please specify block hashes or --all (not both)".into(), + )) + } + }; + + let res = self.local_api_request(req).await?; + + println!( + "{} blocks returned in queue for a retry now (check logs to see results)", + res.count + ); + + Ok(()) + } + + pub async fn cmd_block_purge(&self, yes: bool, blocks: Vec<String>) -> Result<(), Error> { + if !yes { + return Err(Error::Message( + "Pass the --yes flag to confirm block purge operation.".into(), + )); + } + + let res = self + .local_api_request(LocalPurgeBlocksRequest(blocks)) + .await?; + + println!( + "Purged {} blocks: deleted {} versions, {} objects, {} multipart uploads", + res.blocks_purged, res.versions_deleted, res.objects_deleted, res.uploads_deleted, + ); + + Ok(()) + } +} diff --git a/src/garage/cli_v2/bucket.rs b/src/garage/cli_v2/bucket.rs index 837ce783..c25c2c3e 100644 --- a/src/garage/cli_v2/bucket.rs +++ b/src/garage/cli_v2/bucket.rs @@ -3,9 +3,8 @@ use format_table::format_table; use garage_util::error::*; -use garage_api::admin::api::*; +use garage_api_admin::api::*; -use crate::cli as cli_v1; use crate::cli::structs::*; use crate::cli_v2::*; @@ -22,15 +21,9 @@ impl Cli { BucketOperation::Deny(query) => self.cmd_bucket_deny(query).await, BucketOperation::Website(query) => self.cmd_bucket_website(query).await, BucketOperation::SetQuotas(query) => self.cmd_bucket_set_quotas(query).await, - - // TODO - x => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::BucketOperation(x), - ) - .await - .ok_or_message("old error"), + BucketOperation::CleanupIncompleteUploads(query) => { + self.cmd_cleanup_incomplete_uploads(query).await + } } } @@ -520,4 +513,37 @@ impl Cli { Ok(()) } + + pub async fn cmd_cleanup_incomplete_uploads( + &self, + opt: CleanupIncompleteUploadsOpt, + ) -> Result<(), Error> { + let older_than = parse_duration::parse::parse(&opt.older_than) + .ok_or_message("Invalid duration passed for --older-than parameter")?; + + for b in opt.buckets.iter() { + let bucket = self + .api_request(GetBucketInfoRequest { + id: None, + global_alias: None, + search: Some(b.clone()), + }) + .await?; + + let res = self + .api_request(CleanupIncompleteUploadsRequest { + bucket_id: bucket.id.clone(), + older_than_secs: older_than.as_secs(), + }) + .await?; + + if res.uploads_deleted > 0 { + println!("{:.16}: {} uploads deleted", bucket.id, res.uploads_deleted); + } else { + println!("{:.16}: no uploads deleted", bucket.id); + } + } + + Ok(()) + } } diff --git a/src/garage/cli_v2/cluster.rs b/src/garage/cli_v2/cluster.rs index 34a28674..6eb65d12 100644 --- a/src/garage/cli_v2/cluster.rs +++ b/src/garage/cli_v2/cluster.rs @@ -2,7 +2,7 @@ use format_table::format_table; use garage_util::error::*; -use garage_api::admin::api::*; +use garage_api_admin::api::*; use crate::cli::structs::*; use crate::cli_v2::layout::*; diff --git a/src/garage/cli_v2/key.rs b/src/garage/cli_v2/key.rs index ff403a9a..b956906d 100644 --- a/src/garage/cli_v2/key.rs +++ b/src/garage/cli_v2/key.rs @@ -2,7 +2,7 @@ use format_table::format_table; use garage_util::error::*; -use garage_api::admin::api::*; +use garage_api_admin::api::*; use crate::cli::structs::*; use crate::cli_v2::*; diff --git a/src/garage/cli_v2/layout.rs b/src/garage/cli_v2/layout.rs index d44771c7..2f14b332 100644 --- a/src/garage/cli_v2/layout.rs +++ b/src/garage/cli_v2/layout.rs @@ -3,7 +3,7 @@ use format_table::format_table; use garage_util::error::*; -use garage_api::admin::api::*; +use garage_api_admin::api::*; use crate::cli::layout as cli_v1; use crate::cli::structs::*; diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs index 692b7c1c..28c7c824 100644 --- a/src/garage/cli_v2/mod.rs +++ b/src/garage/cli_v2/mod.rs @@ -3,6 +3,10 @@ pub mod cluster; pub mod key; pub mod layout; +pub mod block; +pub mod node; +pub mod worker; + use std::convert::TryFrom; use std::sync::Arc; use std::time::Duration; @@ -12,17 +16,15 @@ use garage_util::error::*; use garage_rpc::system::*; use garage_rpc::*; -use garage_api::admin::api::*; -use garage_api::admin::EndpointHandler as AdminApiEndpoint; +use garage_api_admin::api::*; +use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse}; +use garage_api_admin::RequestHandler; -use crate::admin::*; -use crate::cli as cli_v1; use crate::cli::structs::*; -use crate::cli::Command; pub struct Cli { pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>, - pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>, + pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>, pub rpc_host: NodeID, } @@ -36,63 +38,35 @@ impl Cli { Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await, Command::Bucket(bo) => self.cmd_bucket(bo).await, Command::Key(ko) => self.cmd_key(ko).await, - - // TODO - Command::Repair(ro) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::LaunchRepair(ro), - ) - .await - .ok_or_message("cli_v1"), - Command::Stats(so) => { - cli_v1::cmd_admin(&self.admin_rpc_endpoint, self.rpc_host, AdminRpc::Stats(so)) - .await - .ok_or_message("cli_v1") - } - Command::Worker(wo) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::Worker(wo), - ) - .await - .ok_or_message("cli_v1"), - Command::Block(bo) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::BlockOperation(bo), - ) - .await - .ok_or_message("cli_v1"), - Command::Meta(mo) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::MetaOperation(mo), - ) - .await - .ok_or_message("cli_v1"), + Command::Worker(wo) => self.cmd_worker(wo).await, + Command::Block(bo) => self.cmd_block(bo).await, + Command::Meta(mo) => self.cmd_meta(mo).await, + Command::Stats(so) => self.cmd_stats(so).await, + Command::Repair(ro) => self.cmd_repair(ro).await, _ => unreachable!(), } } - pub async fn api_request<T>(&self, req: T) -> Result<<T as AdminApiEndpoint>::Response, Error> + pub async fn api_request<T>(&self, req: T) -> Result<<T as RequestHandler>::Response, Error> where - T: AdminApiEndpoint, + T: RequestHandler, AdminApiRequest: From<T>, - <T as AdminApiEndpoint>::Response: TryFrom<TaggedAdminApiResponse>, + <T as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>, { let req = AdminApiRequest::from(req); let req_name = req.name(); match self - .admin_rpc_endpoint - .call(&self.rpc_host, AdminRpc::ApiRequest(req), PRIO_NORMAL) - .await? - .ok_or_message("rpc")? + .proxy_rpc_endpoint + .call(&self.rpc_host, ProxyRpc::Proxy(req), PRIO_NORMAL) + .await?? { - AdminRpc::ApiOkResponse(resp) => <T as AdminApiEndpoint>::Response::try_from(resp) - .map_err(|_| Error::Message(format!("{} returned unexpected response", req_name))), - AdminRpc::ApiErrorResponse { + ProxyRpcResponse::ProxyApiOkResponse(resp) => { + <T as RequestHandler>::Response::try_from(resp).map_err(|_| { + Error::Message(format!("{} returned unexpected response", req_name)) + }) + } + ProxyRpcResponse::ApiErrorResponse { http_code, error_code, message, @@ -103,4 +77,32 @@ impl Cli { m => Err(Error::unexpected_rpc_message(m)), } } + + pub async fn local_api_request<T>( + &self, + req: T, + ) -> Result<<T as RequestHandler>::Response, Error> + where + T: RequestHandler, + MultiRequest<T>: RequestHandler<Response = MultiResponse<<T as RequestHandler>::Response>>, + AdminApiRequest: From<MultiRequest<T>>, + <MultiRequest<T> as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>, + { + let req = MultiRequest { + node: hex::encode(self.rpc_host), + body: req, + }; + let resp = self.api_request(req).await?; + + if let Some((_, e)) = resp.error.into_iter().next() { + return Err(Error::Message(e)); + } + if resp.success.len() != 1 { + return Err(Error::Message(format!( + "{} responses returned, expected 1", + resp.success.len() + ))); + } + Ok(resp.success.into_iter().next().unwrap().1) + } } diff --git a/src/garage/cli_v2/node.rs b/src/garage/cli_v2/node.rs new file mode 100644 index 00000000..c5d0cdea --- /dev/null +++ b/src/garage/cli_v2/node.rs @@ -0,0 +1,113 @@ +use format_table::format_table; + +use garage_util::error::*; + +use garage_api_admin::api::*; + +use crate::cli::structs::*; +use crate::cli_v2::*; + +impl Cli { + pub async fn cmd_meta(&self, cmd: MetaOperation) -> Result<(), Error> { + let MetaOperation::Snapshot { all } = cmd; + + let res = self + .api_request(CreateMetadataSnapshotRequest { + node: if all { + "*".to_string() + } else { + hex::encode(self.rpc_host) + }, + body: LocalCreateMetadataSnapshotRequest, + }) + .await?; + + let mut table = vec![]; + for (node, err) in res.error.iter() { + table.push(format!("{:.16}\tError: {}", node, err)); + } + for (node, _) in res.success.iter() { + table.push(format!("{:.16}\tSnapshot created", node)); + } + format_table(table); + + Ok(()) + } + + pub async fn cmd_stats(&self, cmd: StatsOpt) -> Result<(), Error> { + let res = self + .api_request(GetNodeStatisticsRequest { + node: if cmd.all_nodes { + "*".to_string() + } else { + hex::encode(self.rpc_host) + }, + body: LocalGetNodeStatisticsRequest, + }) + .await?; + + for (node, res) in res.success.iter() { + println!("======================"); + println!("Stats for node {:.16}:\n", node); + println!("{}\n", res.freeform); + } + + for (node, err) in res.error.iter() { + println!("======================"); + println!("Node {:.16}: error: {}\n", node, err); + } + + let res = self.api_request(GetClusterStatisticsRequest).await?; + println!("======================"); + println!("Cluster statistics:\n"); + println!("{}\n", res.freeform); + + Ok(()) + } + + pub async fn cmd_repair(&self, cmd: RepairOpt) -> Result<(), Error> { + if !cmd.yes { + return Err(Error::Message( + "Please add --yes to start the repair operation".into(), + )); + } + + let repair_type = match cmd.what { + RepairWhat::Tables => RepairType::Tables, + RepairWhat::Blocks => RepairType::Blocks, + RepairWhat::Versions => RepairType::Versions, + RepairWhat::MultipartUploads => RepairType::MultipartUploads, + RepairWhat::BlockRefs => RepairType::BlockRefs, + RepairWhat::BlockRc => RepairType::BlockRc, + RepairWhat::Rebalance => RepairType::Rebalance, + RepairWhat::Scrub { cmd } => RepairType::Scrub(match cmd { + ScrubCmd::Start => ScrubCommand::Start, + ScrubCmd::Cancel => ScrubCommand::Cancel, + ScrubCmd::Pause => ScrubCommand::Pause, + ScrubCmd::Resume => ScrubCommand::Resume, + }), + }; + + let res = self + .api_request(LaunchRepairOperationRequest { + node: if cmd.all_nodes { + "*".to_string() + } else { + hex::encode(self.rpc_host) + }, + body: LocalLaunchRepairOperationRequest { repair_type }, + }) + .await?; + + let mut table = vec![]; + for (node, err) in res.error.iter() { + table.push(format!("{:.16}\tError: {}", node, err)); + } + for (node, _) in res.success.iter() { + table.push(format!("{:.16}\tRepair launched", node)); + } + format_table(table); + + Ok(()) + } +} diff --git a/src/garage/cli_v2/worker.rs b/src/garage/cli_v2/worker.rs new file mode 100644 index 00000000..9c248a39 --- /dev/null +++ b/src/garage/cli_v2/worker.rs @@ -0,0 +1,213 @@ +use format_table::format_table; + +use garage_util::error::*; + +use garage_api_admin::api::*; + +use crate::cli::structs::*; +use crate::cli_v2::*; + +impl Cli { + pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> { + match cmd { + WorkerOperation::List { opt } => self.cmd_list_workers(opt).await, + WorkerOperation::Info { tid } => self.cmd_worker_info(tid).await, + WorkerOperation::Get { + all_nodes, + variable, + } => self.cmd_get_var(all_nodes, variable).await, + WorkerOperation::Set { + all_nodes, + variable, + value, + } => self.cmd_set_var(all_nodes, variable, value).await, + } + } + + pub async fn cmd_list_workers(&self, opt: WorkerListOpt) -> Result<(), Error> { + let mut list = self + .local_api_request(LocalListWorkersRequest { + busy_only: opt.busy, + error_only: opt.errors, + }) + .await? + .0; + + list.sort_by_key(|info| { + ( + match info.state { + WorkerStateResp::Busy | WorkerStateResp::Throttled { .. } => 0, + WorkerStateResp::Idle => 1, + WorkerStateResp::Done => 2, + }, + info.id, + ) + }); + + let mut table = + vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()]; + let tf = timeago::Formatter::new(); + for info in list.iter() { + let err_ago = info + .last_error + .as_ref() + .map(|x| tf.convert(Duration::from_secs(x.secs_ago))) + .unwrap_or_default(); + let (total_err, consec_err) = if info.errors > 0 { + (info.errors.to_string(), info.consecutive_errors.to_string()) + } else { + ("-".into(), "-".into()) + }; + + table.push(format!( + "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}", + info.id, + format_worker_state(&info.state), + info.name, + info.tranquility + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| "-".into()), + info.progress.as_deref().unwrap_or("-"), + info.queue_length + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| "-".into()), + total_err, + consec_err, + err_ago, + )); + } + format_table(table); + + Ok(()) + } + + pub async fn cmd_worker_info(&self, tid: usize) -> Result<(), Error> { + let info = self + .local_api_request(LocalGetWorkerInfoRequest { id: tid as u64 }) + .await? + .0; + + let mut table = vec![]; + table.push(format!("Task id:\t{}", info.id)); + table.push(format!("Worker name:\t{}", info.name)); + match &info.state { + WorkerStateResp::Throttled { duration_secs } => { + table.push(format!( + "Worker state:\tBusy (throttled, paused for {:.3}s)", + duration_secs + )); + } + s => { + table.push(format!("Worker state:\t{}", format_worker_state(s))); + } + }; + if let Some(tql) = info.tranquility { + table.push(format!("Tranquility:\t{}", tql)); + } + + table.push("".into()); + table.push(format!("Total errors:\t{}", info.errors)); + table.push(format!("Consecutive errs:\t{}", info.consecutive_errors)); + if let Some(err) = info.last_error { + table.push(format!("Last error:\t{}", err.message)); + let tf = timeago::Formatter::new(); + table.push(format!( + "Last error time:\t{}", + tf.convert(Duration::from_secs(err.secs_ago)) + )); + } + + table.push("".into()); + if let Some(p) = info.progress { + table.push(format!("Progress:\t{}", p)); + } + if let Some(ql) = info.queue_length { + table.push(format!("Queue length:\t{}", ql)); + } + if let Some(pe) = info.persistent_errors { + table.push(format!("Persistent errors:\t{}", pe)); + } + + for (i, s) in info.freeform.iter().enumerate() { + if i == 0 { + if table.last() != Some(&"".into()) { + table.push("".into()); + } + table.push(format!("Message:\t{}", s)); + } else { + table.push(format!("\t{}", s)); + } + } + format_table(table); + + Ok(()) + } + + pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> { + let res = self + .api_request(GetWorkerVariableRequest { + node: if all { + "*".to_string() + } else { + hex::encode(self.rpc_host) + }, + body: LocalGetWorkerVariableRequest { variable: var }, + }) + .await?; + + let mut table = vec![]; + for (node, vars) in res.success.iter() { + for (key, val) in vars.0.iter() { + table.push(format!("{:.16}\t{}\t{}", node, key, val)); + } + } + format_table(table); + + for (node, err) in res.error.iter() { + eprintln!("{:.16}: error: {}", node, err); + } + + Ok(()) + } + + pub async fn cmd_set_var( + &self, + all: bool, + variable: String, + value: String, + ) -> Result<(), Error> { + let res = self + .api_request(SetWorkerVariableRequest { + node: if all { + "*".to_string() + } else { + hex::encode(self.rpc_host) + }, + body: LocalSetWorkerVariableRequest { variable, value }, + }) + .await?; + + let mut table = vec![]; + for (node, kv) in res.success.iter() { + table.push(format!("{:.16}\t{}\t{}", node, kv.variable, kv.value)); + } + format_table(table); + + for (node, err) in res.error.iter() { + eprintln!("{:.16}: error: {}", node, err); + } + + Ok(()) + } +} + +fn format_worker_state(s: &WorkerStateResp) -> &'static str { + match s { + WorkerStateResp::Busy => "Busy", + WorkerStateResp::Throttled { .. } => "Busy*", + WorkerStateResp::Idle => "Idle", + WorkerStateResp::Done => "Done", + } +} |