diff options
Diffstat (limited to 'src/garage/cli')
-rw-r--r-- | src/garage/cli/cmd.rs | 30 | ||||
-rw-r--r-- | src/garage/cli/layout.rs | 79 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 202 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 137 |
4 files changed, 313 insertions, 135 deletions
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index a90277a0..c8b96489 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -1,6 +1,8 @@ use std::collections::HashSet; +use std::time::Duration; use garage_util::error::*; +use garage_util::formater::format_table; use garage_rpc::layout::*; use garage_rpc::system::*; @@ -38,13 +40,14 @@ pub async fn cli_command_dispatch( cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await } Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await, + Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await, _ => unreachable!(), } } pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> { let status = match rpc_cli - .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) + .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) .await?? { SystemRpc::ReturnKnownNodes(nodes) => nodes, @@ -85,19 +88,21 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> format_table(healthy_nodes); let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>(); - let failure_case_1 = status.iter().any(|adv| !adv.is_up); + let failure_case_1 = status + .iter() + .any(|adv| !adv.is_up && matches!(layout.roles.get(&adv.id), Some(NodeRoleV(Some(_))))); let failure_case_2 = layout .roles .items() .iter() - .filter(|(_, _, v)| v.0.is_some()) - .any(|(id, _, _)| !status_keys.contains(id)); + .any(|(id, _, v)| !status_keys.contains(id) && v.0.is_some()); if failure_case_1 || failure_case_2 { println!("\n==== FAILED NODES ===="); let mut failed_nodes = vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()]; for adv in status.iter().filter(|adv| !adv.is_up) { if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) { + let tf = timeago::Formatter::new(); failed_nodes.push(format!( "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}", id = adv.id, @@ -108,7 +113,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> capacity = cfg.capacity_string(), last_seen = adv .last_seen_secs_ago - .map(|s| format!("{}s ago", s)) + .map(|s| tf.convert(Duration::from_secs(s))) .unwrap_or_else(|| "never seen".into()), )); } @@ -144,7 +149,7 @@ pub async fn cmd_connect( args: ConnectNodeOpt, ) -> Result<(), Error> { match rpc_cli - .call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL) + .call(&rpc_host, SystemRpc::Connect(args.node), PRIO_NORMAL) .await?? { SystemRpc::Ok => { @@ -160,15 +165,19 @@ pub async fn cmd_admin( rpc_host: NodeID, args: AdminRpc, ) -> Result<(), HelperError> { - match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? { + match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? { AdminRpc::Ok(msg) => { println!("{}", msg); } AdminRpc::BucketList(bl) => { print_bucket_list(bl); } - AdminRpc::BucketInfo(bucket, rk) => { - print_bucket_info(&bucket, &rk); + AdminRpc::BucketInfo { + bucket, + relevant_keys, + counters, + } => { + print_bucket_info(&bucket, &relevant_keys, &counters); } AdminRpc::KeyList(kl) => { print_key_list(kl); @@ -176,6 +185,9 @@ pub async fn cmd_admin( AdminRpc::KeyInfo(key, rb) => { print_key_info(&key, &rb); } + AdminRpc::WorkerList(wi, wlo) => { + print_worker_info(wi, wlo); + } r => { error!("Unexpected response: {:?}", r); } diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index e76f7737..3884bb92 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -1,6 +1,6 @@ use garage_util::crdt::Crdt; -use garage_util::data::*; use garage_util::error::*; +use garage_util::formater::format_table; use garage_rpc::layout::*; use garage_rpc::system::*; @@ -36,21 +36,29 @@ pub async fn cmd_assign_role( args: AssignRoleOpt, ) -> Result<(), Error> { let status = match rpc_cli - .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) + .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) .await?? { SystemRpc::ReturnKnownNodes(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; + let mut layout = fetch_layout(rpc_cli, rpc_host).await?; + let added_nodes = args .node_ids .iter() - .map(|node_id| find_matching_node(status.iter().map(|adv| adv.id), node_id)) + .map(|node_id| { + find_matching_node( + status + .iter() + .map(|adv| adv.id) + .chain(layout.node_ids().iter().cloned()), + node_id, + ) + }) .collect::<Result<Vec<_>, _>>()?; - let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - let mut roles = layout.roles.clone(); roles.merge(&layout.staging); @@ -203,31 +211,9 @@ pub async fn cmd_apply_layout( rpc_host: NodeID, apply_opt: ApplyLayoutOpt, ) -> Result<(), Error> { - let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - - match apply_opt.version { - None => { - println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout."); - println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes."); - return Err(Error::Message("--version flag is missing".into())); - } - Some(v) => { - if v != layout.version + 1 { - return Err(Error::Message("Invalid value of --version flag".into())); - } - } - } - - layout.roles.merge(&layout.staging); - - if !layout.calculate_partition_assignation() { - return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into())); - } + let layout = fetch_layout(rpc_cli, rpc_host).await?; - layout.staging.clear(); - layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]); - - layout.version += 1; + let layout = layout.apply_staged_changes(apply_opt.version)?; send_layout(rpc_cli, rpc_host, layout).await?; @@ -242,25 +228,9 @@ pub async fn cmd_revert_layout( rpc_host: NodeID, revert_opt: RevertLayoutOpt, ) -> Result<(), Error> { - let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - - match revert_opt.version { - None => { - println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout."); - println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes."); - return Err(Error::Message("--version flag is missing".into())); - } - Some(v) => { - if v != layout.version + 1 { - return Err(Error::Message("Invalid value of --version flag".into())); - } - } - } - - layout.staging.clear(); - layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]); + let layout = fetch_layout(rpc_cli, rpc_host).await?; - layout.version += 1; + let layout = layout.revert_staged_changes(revert_opt.version)?; send_layout(rpc_cli, rpc_host, layout).await?; @@ -275,7 +245,7 @@ pub async fn fetch_layout( rpc_host: NodeID, ) -> Result<ClusterLayout, Error> { match rpc_cli - .call(&rpc_host, &SystemRpc::PullClusterLayout, PRIO_NORMAL) + .call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL) .await?? { SystemRpc::AdvertiseClusterLayout(t) => Ok(t), @@ -291,7 +261,7 @@ pub async fn send_layout( rpc_cli .call( &rpc_host, - &SystemRpc::AdvertiseClusterLayout(layout), + SystemRpc::AdvertiseClusterLayout(layout), PRIO_NORMAL, ) .await??; @@ -323,11 +293,20 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool { } pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool { - if !layout.staging.items().is_empty() { + let has_changes = layout + .staging + .items() + .iter() + .any(|(k, _, v)| layout.roles.get(k) != Some(v)); + + if has_changes { println!(); println!("==== STAGED ROLE CHANGES ===="); let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; for (id, _, role) in layout.staging.items().iter() { + if layout.roles.get(id) == Some(role) { + continue; + } if let Some(role) = &role.0 { let tags = role.tags.join(","); table.push(format!( diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index a0c49aeb..06548e89 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -1,55 +1,65 @@ use serde::{Deserialize, Serialize}; - use structopt::StructOpt; +use garage_util::version::garage_version; + #[derive(StructOpt, Debug)] pub enum Command { /// Run Garage server - #[structopt(name = "server")] + #[structopt(name = "server", version = garage_version())] Server, /// Get network status - #[structopt(name = "status")] + #[structopt(name = "status", version = garage_version())] Status, /// Operations on individual Garage nodes - #[structopt(name = "node")] + #[structopt(name = "node", version = garage_version())] Node(NodeOperation), /// Operations on the assignation of node roles in the cluster layout - #[structopt(name = "layout")] + #[structopt(name = "layout", version = garage_version())] Layout(LayoutOperation), /// Operations on buckets - #[structopt(name = "bucket")] + #[structopt(name = "bucket", version = garage_version())] Bucket(BucketOperation), /// Operations on S3 access keys - #[structopt(name = "key")] + #[structopt(name = "key", version = garage_version())] Key(KeyOperation), /// Run migrations from previous Garage version /// (DO NOT USE WITHOUT READING FULL DOCUMENTATION) - #[structopt(name = "migrate")] + #[structopt(name = "migrate", version = garage_version())] Migrate(MigrateOpt), - /// Start repair of node data - #[structopt(name = "repair")] + /// Start repair of node data on remote node + #[structopt(name = "repair", version = garage_version())] Repair(RepairOpt), + /// Offline reparation of node data (these repairs must be run offline + /// directly on the server node) + #[structopt(name = "offline-repair", version = garage_version())] + OfflineRepair(OfflineRepairOpt), + /// Gather node statistics - #[structopt(name = "stats")] + #[structopt(name = "stats", version = garage_version())] Stats(StatsOpt), + + /// Manage background workers + #[structopt(name = "worker", version = garage_version())] + Worker(WorkerOpt), } #[derive(StructOpt, Debug)] pub enum NodeOperation { /// Print identifier (public key) of this Garage node - #[structopt(name = "id")] + #[structopt(name = "id", version = garage_version())] NodeId(NodeIdOpt), /// Connect to Garage node that is currently isolated from the system - #[structopt(name = "connect")] + #[structopt(name = "connect", version = garage_version())] Connect(ConnectNodeOpt), } @@ -70,23 +80,23 @@ pub struct ConnectNodeOpt { #[derive(StructOpt, Debug)] pub enum LayoutOperation { /// Assign role to Garage node - #[structopt(name = "assign")] + #[structopt(name = "assign", version = garage_version())] Assign(AssignRoleOpt), /// Remove role from Garage cluster node - #[structopt(name = "remove")] + #[structopt(name = "remove", version = garage_version())] Remove(RemoveRoleOpt), /// Show roles currently assigned to nodes and changes staged for commit - #[structopt(name = "show")] + #[structopt(name = "show", version = garage_version())] Show, /// Apply staged changes to cluster layout - #[structopt(name = "apply")] + #[structopt(name = "apply", version = garage_version())] Apply(ApplyLayoutOpt), /// Revert staged changes to cluster layout - #[structopt(name = "revert")] + #[structopt(name = "revert", version = garage_version())] Revert(RevertLayoutOpt), } @@ -141,40 +151,44 @@ pub struct RevertLayoutOpt { #[derive(Serialize, Deserialize, StructOpt, Debug)] pub enum BucketOperation { /// List buckets - #[structopt(name = "list")] + #[structopt(name = "list", version = garage_version())] List, /// Get bucket info - #[structopt(name = "info")] + #[structopt(name = "info", version = garage_version())] Info(BucketOpt), /// Create bucket - #[structopt(name = "create")] + #[structopt(name = "create", version = garage_version())] Create(BucketOpt), /// Delete bucket - #[structopt(name = "delete")] + #[structopt(name = "delete", version = garage_version())] Delete(DeleteBucketOpt), /// Alias bucket under new name - #[structopt(name = "alias")] + #[structopt(name = "alias", version = garage_version())] Alias(AliasBucketOpt), /// Remove bucket alias - #[structopt(name = "unalias")] + #[structopt(name = "unalias", version = garage_version())] Unalias(UnaliasBucketOpt), /// Allow key to read or write to bucket - #[structopt(name = "allow")] + #[structopt(name = "allow", version = garage_version())] Allow(PermBucketOpt), /// Deny key from reading or writing to bucket - #[structopt(name = "deny")] + #[structopt(name = "deny", version = garage_version())] Deny(PermBucketOpt), /// Expose as website or not - #[structopt(name = "website")] + #[structopt(name = "website", version = garage_version())] Website(WebsiteOpt), + + /// Set the quotas for this bucket + #[structopt(name = "set-quotas", version = garage_version())] + SetQuotas(SetQuotasOpt), } #[derive(Serialize, Deserialize, StructOpt, Debug)] @@ -262,37 +276,52 @@ pub struct PermBucketOpt { } #[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct SetQuotasOpt { + /// Bucket name + pub bucket: String, + + /// Set a maximum size for the bucket (specify a size e.g. in MiB or GiB, + /// or `none` for no size restriction) + #[structopt(long = "max-size")] + pub max_size: Option<String>, + + /// Set a maximum number of objects for the bucket (or `none` for no restriction) + #[structopt(long = "max-objects")] + pub max_objects: Option<String>, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] pub enum KeyOperation { /// List keys - #[structopt(name = "list")] + #[structopt(name = "list", version = garage_version())] List, /// Get key info - #[structopt(name = "info")] + #[structopt(name = "info", version = garage_version())] Info(KeyOpt), /// Create new key - #[structopt(name = "new")] + #[structopt(name = "new", version = garage_version())] New(KeyNewOpt), /// Rename key - #[structopt(name = "rename")] + #[structopt(name = "rename", version = garage_version())] Rename(KeyRenameOpt), /// Delete key - #[structopt(name = "delete")] + #[structopt(name = "delete", version = garage_version())] Delete(KeyDeleteOpt), /// Set permission flags for key - #[structopt(name = "allow")] + #[structopt(name = "allow", version = garage_version())] Allow(KeyPermOpt), /// Unset permission flags for key - #[structopt(name = "deny")] + #[structopt(name = "deny", version = garage_version())] Deny(KeyPermOpt), /// Import key - #[structopt(name = "import")] + #[structopt(name = "import", version = garage_version())] Import(KeyImportOpt), } @@ -364,7 +393,7 @@ pub struct MigrateOpt { #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] pub enum MigrateWhat { /// Migrate buckets and permissions from v0.5.0 - #[structopt(name = "buckets050")] + #[structopt(name = "buckets050", version = garage_version())] Buckets050, } @@ -385,27 +414,69 @@ pub struct RepairOpt { #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] pub enum RepairWhat { /// Only do a full sync of metadata tables - #[structopt(name = "tables")] + #[structopt(name = "tables", version = garage_version())] Tables, /// Only repair (resync/rebalance) the set of stored blocks - #[structopt(name = "blocks")] + #[structopt(name = "blocks", version = garage_version())] Blocks, /// Only redo the propagation of object deletions to the version table (slow) - #[structopt(name = "versions")] + #[structopt(name = "versions", version = garage_version())] Versions, /// Only redo the propagation of version deletions to the block ref table (extremely slow) - #[structopt(name = "block_refs")] + #[structopt(name = "block_refs", version = garage_version())] BlockRefs, /// Verify integrity of all blocks on disc (extremely slow, i/o intensive) - #[structopt(name = "scrub")] + #[structopt(name = "scrub", version = garage_version())] Scrub { - /// Tranquility factor (see tranquilizer documentation) - #[structopt(name = "tranquility", default_value = "2")] + #[structopt(subcommand)] + cmd: ScrubCmd, + }, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum ScrubCmd { + /// Start scrub + #[structopt(name = "start", version = garage_version())] + Start, + /// Pause scrub (it will resume automatically after 24 hours) + #[structopt(name = "pause", version = garage_version())] + Pause, + /// Resume paused scrub + #[structopt(name = "resume", version = garage_version())] + Resume, + /// Cancel scrub in progress + #[structopt(name = "cancel", version = garage_version())] + Cancel, + /// Set tranquility level for in-progress and future scrubs + #[structopt(name = "set-tranquility", version = garage_version())] + SetTranquility { + #[structopt()] tranquility: u32, }, } #[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +pub struct OfflineRepairOpt { + /// Confirm the launch of the repair operation + #[structopt(long = "yes")] + pub yes: bool, + + #[structopt(subcommand)] + pub what: OfflineRepairWhat, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum OfflineRepairWhat { + /// Repair K2V item counters + #[cfg(feature = "k2v")] + #[structopt(name = "k2v_item_counters", version = garage_version())] + K2VItemCounters, + /// Repair object counters + #[structopt(name = "object_counters", version = garage_version())] + ObjectCounters, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct StatsOpt { /// Gather statistics from all nodes #[structopt(short = "a", long = "all-nodes")] @@ -415,3 +486,48 @@ pub struct StatsOpt { #[structopt(short = "d", long = "detailed")] pub detailed: bool, } + +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +pub struct WorkerOpt { + #[structopt(subcommand)] + pub cmd: WorkerCmd, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum WorkerCmd { + /// List all workers on Garage node + #[structopt(name = "list", version = garage_version())] + List { + #[structopt(flatten)] + opt: WorkerListOpt, + }, + /// Set worker parameter + #[structopt(name = "set", version = garage_version())] + Set { + #[structopt(subcommand)] + opt: WorkerSetCmd, + }, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)] +pub struct WorkerListOpt { + /// Show only busy workers + #[structopt(short = "b", long = "busy")] + pub busy: bool, + /// Show only workers with errors + #[structopt(short = "e", long = "errors")] + pub errors: bool, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum WorkerSetCmd { + /// Set tranquility of scrub operations + #[structopt(name = "scrub-tranquility", version = garage_version())] + ScrubTranquility { tranquility: u32 }, + /// Set number of concurrent block resync workers + #[structopt(name = "resync-n-workers", version = garage_version())] + ResyncNWorkers { n_workers: usize }, + /// Set tranquility of block resync operations + #[structopt(name = "resync-tranquility", version = garage_version())] + ResyncTranquility { tranquility: u32 }, +} diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 7d496507..396938ae 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -1,11 +1,18 @@ use std::collections::HashMap; +use std::time::Duration; +use garage_util::background::*; use garage_util::crdt::*; use garage_util::data::Uuid; use garage_util::error::*; +use garage_util::formater::format_table; +use garage_util::time::*; use garage_model::bucket_table::*; use garage_model::key_table::*; +use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; + +use crate::cli::structs::WorkerListOpt; pub fn print_bucket_list(bl: Vec<Bucket>) { println!("List of buckets:"); @@ -28,11 +35,12 @@ pub fn print_bucket_list(bl: Vec<Bucket>) { [((k, n), _, _)] => format!("{}:{}", k, n), s => format!("[{} local aliases]", s.len()), }; + table.push(format!( "\t{}\t{}\t{}", aliases.join(","), local_aliases_n, - hex::encode(bucket.id) + hex::encode(bucket.id), )); } format_table(table); @@ -120,7 +128,11 @@ pub fn print_key_info(key: &Key, relevant_buckets: &HashMap<Uuid, Bucket>) { } } -pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>) { +pub fn print_bucket_info( + bucket: &Bucket, + relevant_keys: &HashMap<String, Key>, + counters: &HashMap<String, i64>, +) { let key_name = |k| { relevant_keys .get(k) @@ -132,7 +144,42 @@ pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>) match &bucket.state { Deletable::Deleted => println!("Bucket is deleted."), Deletable::Present(p) => { - println!("Website access: {}", p.website_config.get().is_some()); + let size = + bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64); + println!( + "\nSize: {} ({})", + size.to_string_as(true), + size.to_string_as(false) + ); + println!( + "Objects: {}", + counters.get(OBJECTS).cloned().unwrap_or_default() + ); + println!( + "Unfinished multipart uploads: {}", + counters + .get(UNFINISHED_UPLOADS) + .cloned() + .unwrap_or_default() + ); + + println!("\nWebsite access: {}", p.website_config.get().is_some()); + + let quotas = p.quotas.get(); + if quotas.max_size.is_some() || quotas.max_objects.is_some() { + println!("\nQuotas:"); + if let Some(ms) = quotas.max_size { + let ms = bytesize::ByteSize::b(ms); + println!( + " maximum size: {} ({})", + ms.to_string_as(true), + ms.to_string_as(false) + ); + } + if let Some(mo) = quotas.max_objects { + println!(" maximum number of objects: {}", mo); + } + } println!("\nGlobal aliases:"); for (alias, _, active) in p.aliases.items().iter() { @@ -173,42 +220,13 @@ pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>) }; } -pub fn format_table(data: Vec<String>) { - let data = data - .iter() - .map(|s| s.split('\t').collect::<Vec<_>>()) - .collect::<Vec<_>>(); - - let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max); - let mut column_size = vec![0; columns]; - - let mut out = String::new(); - - for row in data.iter() { - for (i, col) in row.iter().enumerate() { - column_size[i] = std::cmp::max(column_size[i], col.chars().count()); - } - } - - for row in data.iter() { - for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) { - out.push_str(col); - (0..col_len - col.chars().count() + 2).for_each(|_| out.push(' ')); - } - out.push_str(row[row.len() - 1]); - out.push('\n'); - } - - print!("{}", out); -} - pub fn find_matching_node( cand: impl std::iter::Iterator<Item = Uuid>, pattern: &str, ) -> Result<Uuid, Error> { let mut candidates = vec![]; for c in cand { - if hex::encode(&c).starts_with(&pattern) { + if hex::encode(&c).starts_with(&pattern) && !candidates.contains(&c) { candidates.push(c); } } @@ -222,3 +240,56 @@ pub fn find_matching_node( Ok(candidates[0]) } } + +pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { + let mut wi = wi.into_iter().collect::<Vec<_>>(); + wi.sort_by_key(|(tid, info)| { + ( + match info.state { + WorkerState::Busy | WorkerState::Throttled(_) => 0, + WorkerState::Idle => 1, + WorkerState::Done => 2, + }, + *tid, + ) + }); + + let mut table = vec![]; + for (tid, info) in wi.iter() { + if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) { + continue; + } + if wlo.errors && info.errors == 0 { + continue; + } + + table.push(format!("{}\t{}\t{}", tid, info.state, info.name)); + if let Some(i) = &info.info { + table.push(format!("\t\t {}", i)); + } + let tf = timeago::Formatter::new(); + let (err_ago, err_msg) = info + .last_error + .as_ref() + .map(|(m, t)| { + ( + tf.convert(Duration::from_millis(now_msec() - t)), + m.as_str(), + ) + }) + .unwrap_or(("(?) ago".into(), "(?)")); + if info.consecutive_errors > 0 { + table.push(format!( + "\t\t {} consecutive errors ({} total), last {}", + info.consecutive_errors, info.errors, err_ago, + )); + table.push(format!("\t\t {}", err_msg)); + } else if info.errors > 0 { + table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,)); + if wlo.errors { + table.push(format!("\t\t {}", err_msg)); + } + } + } + format_table(table); +} |