diff options
Diffstat (limited to 'src/garage/cli')
-rw-r--r-- | src/garage/cli/cmd.rs | 18 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 53 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 163 |
3 files changed, 200 insertions, 34 deletions
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index e352ddf2..0d180ecd 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -41,6 +41,9 @@ pub async fn cli_command_dispatch( } Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await, Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await, + Command::Block(bo) => { + cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await + } _ => unreachable!(), } } @@ -186,7 +189,20 @@ pub async fn cmd_admin( print_key_info(&key, &rb); } AdminRpc::WorkerList(wi, wlo) => { - print_worker_info(wi, wlo); + print_worker_list(wi, wlo); + } + AdminRpc::WorkerInfo(tid, wi) => { + print_worker_info(tid, wi); + } + AdminRpc::BlockErrorList(el) => { + print_block_error_list(el); + } + AdminRpc::BlockInfo { + hash, + refcount, + versions, + } => { + print_block_info(hash, refcount, versions); } r => { error!("Unexpected response: {:?}", r); diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 49a1f267..825c1813 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -49,7 +49,11 @@ pub enum Command { /// Manage background workers #[structopt(name = "worker", version = garage_version())] - Worker(WorkerOpt), + Worker(WorkerOperation), + + /// Low-level debug operations on data blocks + #[structopt(name = "block", version = garage_version())] + Block(BlockOperation), } #[derive(StructOpt, Debug)] @@ -513,20 +517,17 @@ pub struct StatsOpt { pub detailed: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] -pub struct WorkerOpt { - #[structopt(subcommand)] - pub cmd: WorkerCmd, -} - #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] -pub enum WorkerCmd { +pub enum WorkerOperation { /// List all workers on Garage node #[structopt(name = "list", version = garage_version())] List { #[structopt(flatten)] opt: WorkerListOpt, }, + /// Get detailed information about a worker + #[structopt(name = "info", version = garage_version())] + Info { tid: usize }, /// Set worker parameter #[structopt(name = "set", version = garage_version())] Set { @@ -551,9 +552,41 @@ pub enum WorkerSetCmd { #[structopt(name = "scrub-tranquility", version = garage_version())] ScrubTranquility { tranquility: u32 }, /// Set number of concurrent block resync workers - #[structopt(name = "resync-n-workers", version = garage_version())] - ResyncNWorkers { n_workers: usize }, + #[structopt(name = "resync-worker-count", version = garage_version())] + ResyncWorkerCount { worker_count: usize }, /// Set tranquility of block resync operations #[structopt(name = "resync-tranquility", version = garage_version())] ResyncTranquility { tranquility: u32 }, } + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum BlockOperation { + /// List all blocks that currently have a resync error + #[structopt(name = "list-errors", version = garage_version())] + ListErrors, + /// Get detailed information about a single block + #[structopt(name = "info", version = garage_version())] + Info { + /// Hash of the block for which to retrieve information + hash: String, + }, + /// Retry now the resync of one or many blocks + #[structopt(name = "retry-now", version = garage_version())] + RetryNow { + /// Retry all blocks that have a resync error + #[structopt(long = "all")] + all: bool, + /// Hashes of the block to retry to resync now + blocks: Vec<String>, + }, + /// Delete all objects referencing a missing block + #[structopt(name = "purge", version = garage_version())] + Purge { + /// Mandatory to confirm this operation + #[structopt(long = "yes")] + yes: bool, + /// Hashes of the block to purge + #[structopt(required = true)] + blocks: Vec<String>, + }, +} diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 396938ae..63fd9eba 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -3,14 +3,17 @@ use std::time::Duration; use garage_util::background::*; use garage_util::crdt::*; -use garage_util::data::Uuid; +use garage_util::data::*; use garage_util::error::*; use garage_util::formater::format_table; use garage_util::time::*; +use garage_block::manager::BlockResyncErrorInfo; + use garage_model::bucket_table::*; use garage_model::key_table::*; use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; +use garage_model::s3::version_table::Version; use crate::cli::structs::WorkerListOpt; @@ -241,7 +244,7 @@ pub fn find_matching_node( } } -pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { +pub fn print_worker_list(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { let mut wi = wi.into_iter().collect::<Vec<_>>(); wi.sort_by_key(|(tid, info)| { ( @@ -254,7 +257,7 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { ) }); - let mut table = vec![]; + let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()]; for (tid, info) in wi.iter() { if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) { continue; @@ -263,33 +266,147 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { continue; } - table.push(format!("{}\t{}\t{}", tid, info.state, info.name)); - if let Some(i) = &info.info { - table.push(format!("\t\t {}", i)); - } let tf = timeago::Formatter::new(); - let (err_ago, err_msg) = info + let err_ago = info .last_error .as_ref() - .map(|(m, t)| { - ( - tf.convert(Duration::from_millis(now_msec() - t)), - m.as_str(), - ) - }) - .unwrap_or(("(?) ago".into(), "(?)")); - if info.consecutive_errors > 0 { + .map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t))) + .unwrap_or_default(); + let (total_err, consec_err) = if info.errors > 0 { + (info.errors.to_string(), info.consecutive_errors.to_string()) + } else { + ("-".into(), "-".into()) + }; + + table.push(format!( + "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}", + tid, + info.state, + info.name, + info.status + .tranquility + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| "-".into()), + info.status.progress.as_deref().unwrap_or("-"), + info.status + .queue_length + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| "-".into()), + total_err, + consec_err, + err_ago, + )); + } + format_table(table); +} + +pub fn print_worker_info(tid: usize, info: WorkerInfo) { + let mut table = vec![]; + table.push(format!("Task id:\t{}", tid)); + table.push(format!("Worker name:\t{}", info.name)); + match info.state { + WorkerState::Throttled(t) => { table.push(format!( - "\t\t {} consecutive errors ({} total), last {}", - info.consecutive_errors, info.errors, err_ago, + "Worker state:\tBusy (throttled, paused for {:.3}s)", + t )); - table.push(format!("\t\t {}", err_msg)); - } else if info.errors > 0 { - table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,)); - if wlo.errors { - table.push(format!("\t\t {}", err_msg)); + } + s => { + table.push(format!("Worker state:\t{}", s)); + } + }; + if let Some(tql) = info.status.tranquility { + table.push(format!("Tranquility:\t{}", tql)); + } + + table.push("".into()); + table.push(format!("Total errors:\t{}", info.errors)); + table.push(format!("Consecutive errs:\t{}", info.consecutive_errors)); + if let Some((s, t)) = info.last_error { + table.push(format!("Last error:\t{}", s)); + let tf = timeago::Formatter::new(); + table.push(format!( + "Last error time:\t{}", + tf.convert(Duration::from_millis(now_msec() - t)) + )); + } + + table.push("".into()); + if let Some(p) = info.status.progress { + table.push(format!("Progress:\t{}", p)); + } + if let Some(ql) = info.status.queue_length { + table.push(format!("Queue length:\t{}", ql)); + } + if let Some(pe) = info.status.persistent_errors { + table.push(format!("Persistent errors:\t{}", pe)); + } + + for (i, s) in info.status.freeform.iter().enumerate() { + if i == 0 { + if table.last() != Some(&"".into()) { + table.push("".into()); } + table.push(format!("Message:\t{}", s)); + } else { + table.push(format!("\t{}", s)); } } format_table(table); } + +pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) { + let now = now_msec(); + let tf = timeago::Formatter::new(); + let mut tf2 = timeago::Formatter::new(); + tf2.ago(""); + + let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()]; + for e in el { + table.push(format!( + "{}\t{}\t{}\t{}\tin {}", + hex::encode(e.hash.as_slice()), + e.refcount, + e.error_count, + tf.convert(Duration::from_millis(now - e.last_try)), + tf2.convert(Duration::from_millis(e.next_try - now)) + )); + } + format_table(table); +} + +pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>) { + println!("Block hash: {}", hex::encode(hash.as_slice())); + println!("Refcount: {}", refcount); + println!(); + + let mut table = vec!["Version\tBucket\tKey\tDeleted".into()]; + let mut nondeleted_count = 0; + for v in versions.iter() { + match v { + Ok(ver) => { + table.push(format!( + "{:?}\t{:?}\t{}\t{:?}", + ver.uuid, + ver.bucket_id, + ver.key, + ver.deleted.get() + )); + if !ver.deleted.get() { + nondeleted_count += 1; + } + } + Err(vh) => { + table.push(format!("{:?}\t\t\tyes", vh)); + } + } + } + format_table(table); + + if refcount != nondeleted_count { + println!(); + println!("Warning: refcount does not match number of non-deleted versions"); + } +} |