aboutsummaryrefslogtreecommitdiff
path: root/src/garage/cli
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-01-04 11:34:43 +0100
committerAlex Auvolat <alex@adnab.me>2023-01-04 11:34:43 +0100
commit570e5e5bbb7a3eac41350db9433e28ed289b97f4 (patch)
treea7fc299ba180098be5a3bef28a39256870ce697b /src/garage/cli
parent6e44369cbc810b8912ca0f7f5fd293e87f10c851 (diff)
parent4eb8ca3a528dae2848141f5cc3eb607eb7d40114 (diff)
downloadgarage-570e5e5bbb7a3eac41350db9433e28ed289b97f4.tar.gz
garage-570e5e5bbb7a3eac41350db9433e28ed289b97f4.zip
Merge branch 'main' into next
Diffstat (limited to 'src/garage/cli')
-rw-r--r--src/garage/cli/cmd.rs18
-rw-r--r--src/garage/cli/structs.rs53
-rw-r--r--src/garage/cli/util.rs163
3 files changed, 200 insertions, 34 deletions
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index e352ddf2..0d180ecd 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -41,6 +41,9 @@ pub async fn cli_command_dispatch(
}
Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await,
+ Command::Block(bo) => {
+ cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await
+ }
_ => unreachable!(),
}
}
@@ -186,7 +189,20 @@ pub async fn cmd_admin(
print_key_info(&key, &rb);
}
AdminRpc::WorkerList(wi, wlo) => {
- print_worker_info(wi, wlo);
+ print_worker_list(wi, wlo);
+ }
+ AdminRpc::WorkerInfo(tid, wi) => {
+ print_worker_info(tid, wi);
+ }
+ AdminRpc::BlockErrorList(el) => {
+ print_block_error_list(el);
+ }
+ AdminRpc::BlockInfo {
+ hash,
+ refcount,
+ versions,
+ } => {
+ print_block_info(hash, refcount, versions);
}
r => {
error!("Unexpected response: {:?}", r);
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 49a1f267..825c1813 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -49,7 +49,11 @@ pub enum Command {
/// Manage background workers
#[structopt(name = "worker", version = garage_version())]
- Worker(WorkerOpt),
+ Worker(WorkerOperation),
+
+ /// Low-level debug operations on data blocks
+ #[structopt(name = "block", version = garage_version())]
+ Block(BlockOperation),
}
#[derive(StructOpt, Debug)]
@@ -513,20 +517,17 @@ pub struct StatsOpt {
pub detailed: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
-pub struct WorkerOpt {
- #[structopt(subcommand)]
- pub cmd: WorkerCmd,
-}
-
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
-pub enum WorkerCmd {
+pub enum WorkerOperation {
/// List all workers on Garage node
#[structopt(name = "list", version = garage_version())]
List {
#[structopt(flatten)]
opt: WorkerListOpt,
},
+ /// Get detailed information about a worker
+ #[structopt(name = "info", version = garage_version())]
+ Info { tid: usize },
/// Set worker parameter
#[structopt(name = "set", version = garage_version())]
Set {
@@ -551,9 +552,41 @@ pub enum WorkerSetCmd {
#[structopt(name = "scrub-tranquility", version = garage_version())]
ScrubTranquility { tranquility: u32 },
/// Set number of concurrent block resync workers
- #[structopt(name = "resync-n-workers", version = garage_version())]
- ResyncNWorkers { n_workers: usize },
+ #[structopt(name = "resync-worker-count", version = garage_version())]
+ ResyncWorkerCount { worker_count: usize },
/// Set tranquility of block resync operations
#[structopt(name = "resync-tranquility", version = garage_version())]
ResyncTranquility { tranquility: u32 },
}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum BlockOperation {
+ /// List all blocks that currently have a resync error
+ #[structopt(name = "list-errors", version = garage_version())]
+ ListErrors,
+ /// Get detailed information about a single block
+ #[structopt(name = "info", version = garage_version())]
+ Info {
+ /// Hash of the block for which to retrieve information
+ hash: String,
+ },
+ /// Retry now the resync of one or many blocks
+ #[structopt(name = "retry-now", version = garage_version())]
+ RetryNow {
+ /// Retry all blocks that have a resync error
+ #[structopt(long = "all")]
+ all: bool,
+ /// Hashes of the block to retry to resync now
+ blocks: Vec<String>,
+ },
+ /// Delete all objects referencing a missing block
+ #[structopt(name = "purge", version = garage_version())]
+ Purge {
+ /// Mandatory to confirm this operation
+ #[structopt(long = "yes")]
+ yes: bool,
+ /// Hashes of the block to purge
+ #[structopt(required = true)]
+ blocks: Vec<String>,
+ },
+}
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 396938ae..63fd9eba 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -3,14 +3,17 @@ use std::time::Duration;
use garage_util::background::*;
use garage_util::crdt::*;
-use garage_util::data::Uuid;
+use garage_util::data::*;
use garage_util::error::*;
use garage_util::formater::format_table;
use garage_util::time::*;
+use garage_block::manager::BlockResyncErrorInfo;
+
use garage_model::bucket_table::*;
use garage_model::key_table::*;
use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
+use garage_model::s3::version_table::Version;
use crate::cli::structs::WorkerListOpt;
@@ -241,7 +244,7 @@ pub fn find_matching_node(
}
}
-pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
+pub fn print_worker_list(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
let mut wi = wi.into_iter().collect::<Vec<_>>();
wi.sort_by_key(|(tid, info)| {
(
@@ -254,7 +257,7 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
)
});
- let mut table = vec![];
+ let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
for (tid, info) in wi.iter() {
if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) {
continue;
@@ -263,33 +266,147 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
continue;
}
- table.push(format!("{}\t{}\t{}", tid, info.state, info.name));
- if let Some(i) = &info.info {
- table.push(format!("\t\t {}", i));
- }
let tf = timeago::Formatter::new();
- let (err_ago, err_msg) = info
+ let err_ago = info
.last_error
.as_ref()
- .map(|(m, t)| {
- (
- tf.convert(Duration::from_millis(now_msec() - t)),
- m.as_str(),
- )
- })
- .unwrap_or(("(?) ago".into(), "(?)"));
- if info.consecutive_errors > 0 {
+ .map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t)))
+ .unwrap_or_default();
+ let (total_err, consec_err) = if info.errors > 0 {
+ (info.errors.to_string(), info.consecutive_errors.to_string())
+ } else {
+ ("-".into(), "-".into())
+ };
+
+ table.push(format!(
+ "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
+ tid,
+ info.state,
+ info.name,
+ info.status
+ .tranquility
+ .as_ref()
+ .map(ToString::to_string)
+ .unwrap_or_else(|| "-".into()),
+ info.status.progress.as_deref().unwrap_or("-"),
+ info.status
+ .queue_length
+ .as_ref()
+ .map(ToString::to_string)
+ .unwrap_or_else(|| "-".into()),
+ total_err,
+ consec_err,
+ err_ago,
+ ));
+ }
+ format_table(table);
+}
+
+pub fn print_worker_info(tid: usize, info: WorkerInfo) {
+ let mut table = vec![];
+ table.push(format!("Task id:\t{}", tid));
+ table.push(format!("Worker name:\t{}", info.name));
+ match info.state {
+ WorkerState::Throttled(t) => {
table.push(format!(
- "\t\t {} consecutive errors ({} total), last {}",
- info.consecutive_errors, info.errors, err_ago,
+ "Worker state:\tBusy (throttled, paused for {:.3}s)",
+ t
));
- table.push(format!("\t\t {}", err_msg));
- } else if info.errors > 0 {
- table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,));
- if wlo.errors {
- table.push(format!("\t\t {}", err_msg));
+ }
+ s => {
+ table.push(format!("Worker state:\t{}", s));
+ }
+ };
+ if let Some(tql) = info.status.tranquility {
+ table.push(format!("Tranquility:\t{}", tql));
+ }
+
+ table.push("".into());
+ table.push(format!("Total errors:\t{}", info.errors));
+ table.push(format!("Consecutive errs:\t{}", info.consecutive_errors));
+ if let Some((s, t)) = info.last_error {
+ table.push(format!("Last error:\t{}", s));
+ let tf = timeago::Formatter::new();
+ table.push(format!(
+ "Last error time:\t{}",
+ tf.convert(Duration::from_millis(now_msec() - t))
+ ));
+ }
+
+ table.push("".into());
+ if let Some(p) = info.status.progress {
+ table.push(format!("Progress:\t{}", p));
+ }
+ if let Some(ql) = info.status.queue_length {
+ table.push(format!("Queue length:\t{}", ql));
+ }
+ if let Some(pe) = info.status.persistent_errors {
+ table.push(format!("Persistent errors:\t{}", pe));
+ }
+
+ for (i, s) in info.status.freeform.iter().enumerate() {
+ if i == 0 {
+ if table.last() != Some(&"".into()) {
+ table.push("".into());
}
+ table.push(format!("Message:\t{}", s));
+ } else {
+ table.push(format!("\t{}", s));
}
}
format_table(table);
}
+
+pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
+ let now = now_msec();
+ let tf = timeago::Formatter::new();
+ let mut tf2 = timeago::Formatter::new();
+ tf2.ago("");
+
+ let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
+ for e in el {
+ table.push(format!(
+ "{}\t{}\t{}\t{}\tin {}",
+ hex::encode(e.hash.as_slice()),
+ e.refcount,
+ e.error_count,
+ tf.convert(Duration::from_millis(now - e.last_try)),
+ tf2.convert(Duration::from_millis(e.next_try - now))
+ ));
+ }
+ format_table(table);
+}
+
+pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>) {
+ println!("Block hash: {}", hex::encode(hash.as_slice()));
+ println!("Refcount: {}", refcount);
+ println!();
+
+ let mut table = vec!["Version\tBucket\tKey\tDeleted".into()];
+ let mut nondeleted_count = 0;
+ for v in versions.iter() {
+ match v {
+ Ok(ver) => {
+ table.push(format!(
+ "{:?}\t{:?}\t{}\t{:?}",
+ ver.uuid,
+ ver.bucket_id,
+ ver.key,
+ ver.deleted.get()
+ ));
+ if !ver.deleted.get() {
+ nondeleted_count += 1;
+ }
+ }
+ Err(vh) => {
+ table.push(format!("{:?}\t\t\tyes", vh));
+ }
+ }
+ }
+ format_table(table);
+
+ if refcount != nondeleted_count {
+ println!();
+ println!("Warning: refcount does not match number of non-deleted versions");
+ }
+}