diff options
author | Alex <alex@adnab.me> | 2022-07-08 13:30:26 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-07-08 13:30:26 +0200 |
commit | 4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7 (patch) | |
tree | 256ee885e93cf1c41dc2869fe13a648aa91ab9b5 /src/garage/cli | |
parent | aab34bfe5415e9584432bf32e29a151dc5af9ebd (diff) | |
download | garage-4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7.tar.gz garage-4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7.zip |
Background task manager (#332)
- [x] New background worker trait
- [x] Adapt all current workers to use new API
- [x] Command to list currently running workers, and whether they are active, idle, or dead
- [x] Error reporting
- Optimizations
- [x] Merkle updater: several items per iteration
- [ ] Use `tokio::task::spawn_blocking` where appropriate so that CPU-intensive tasks don't block other things going on
- scrub:
- [x] have only one worker with a channel to start/pause/cancel
- [x] automatic scrub
- [x] ability to view and change tranquility from CLI
- [x] persistence of a few info
- [ ] Testing
Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/332
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/garage/cli')
-rw-r--r-- | src/garage/cli/cmd.rs | 8 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 55 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 58 |
3 files changed, 118 insertions, 3 deletions
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 3a0bd956..1aa2c2ff 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; +use std::time::Duration; use garage_util::error::*; use garage_util::formater::format_table; @@ -39,6 +40,7 @@ pub async fn cli_command_dispatch( cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await } Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await, + Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await, _ => unreachable!(), } } @@ -100,6 +102,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()]; for adv in status.iter().filter(|adv| !adv.is_up) { if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) { + let tf = timeago::Formatter::new(); failed_nodes.push(format!( "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}", id = adv.id, @@ -110,7 +113,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> capacity = cfg.capacity_string(), last_seen = adv .last_seen_secs_ago - .map(|s| format!("{}s ago", s)) + .map(|s| tf.convert(Duration::from_secs(s))) .unwrap_or_else(|| "never seen".into()), )); } @@ -182,6 +185,9 @@ pub async fn cmd_admin( AdminRpc::KeyInfo(key, rb) => { print_key_info(&key, &rb); } + AdminRpc::WorkerList(wi, wlo) => { + print_worker_info(wi, wlo); + } r => { error!("Unexpected response: {:?}", r); } diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 4f2efe19..bc44b5ef 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -45,6 +45,10 @@ pub enum Command { /// Gather node statistics #[structopt(name = "stats")] Stats(StatsOpt), + + /// Manage background workers + #[structopt(name = "worker")] + Worker(WorkerOpt), } #[derive(StructOpt, Debug)] @@ -423,8 +427,29 @@ pub enum RepairWhat { /// Verify integrity of all blocks on disc (extremely slow, i/o intensive) #[structopt(name = "scrub")] Scrub { - /// Tranquility factor (see tranquilizer documentation) - #[structopt(name = "tranquility", default_value = "2")] + #[structopt(subcommand)] + cmd: ScrubCmd, + }, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum ScrubCmd { + /// Start scrub + #[structopt(name = "start")] + Start, + /// Pause scrub (it will resume automatically after 24 hours) + #[structopt(name = "pause")] + Pause, + /// Resume paused scrub + #[structopt(name = "resume")] + Resume, + /// Cancel scrub in progress + #[structopt(name = "cancel")] + Cancel, + /// Set tranquility level for in-progress and future scrubs + #[structopt(name = "set-tranquility")] + SetTranquility { + #[structopt()] tranquility: u32, }, } @@ -460,3 +485,29 @@ pub struct StatsOpt { #[structopt(short = "d", long = "detailed")] pub detailed: bool, } + +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +pub struct WorkerOpt { + #[structopt(subcommand)] + pub cmd: WorkerCmd, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum WorkerCmd { + /// List all workers on Garage node + #[structopt(name = "list")] + List { + #[structopt(flatten)] + opt: WorkerListOpt, + }, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)] +pub struct WorkerListOpt { + /// Show only busy workers + #[structopt(short = "b", long = "busy")] + pub busy: bool, + /// Show only workers with errors + #[structopt(short = "e", long = "errors")] + pub errors: bool, +} diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 329e8a3e..396938ae 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -1,14 +1,19 @@ use std::collections::HashMap; +use std::time::Duration; +use garage_util::background::*; use garage_util::crdt::*; use garage_util::data::Uuid; use garage_util::error::*; use garage_util::formater::format_table; +use garage_util::time::*; use garage_model::bucket_table::*; use garage_model::key_table::*; use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; +use crate::cli::structs::WorkerListOpt; + pub fn print_bucket_list(bl: Vec<Bucket>) { println!("List of buckets:"); @@ -235,3 +240,56 @@ pub fn find_matching_node( Ok(candidates[0]) } } + +pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { + let mut wi = wi.into_iter().collect::<Vec<_>>(); + wi.sort_by_key(|(tid, info)| { + ( + match info.state { + WorkerState::Busy | WorkerState::Throttled(_) => 0, + WorkerState::Idle => 1, + WorkerState::Done => 2, + }, + *tid, + ) + }); + + let mut table = vec![]; + for (tid, info) in wi.iter() { + if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) { + continue; + } + if wlo.errors && info.errors == 0 { + continue; + } + + table.push(format!("{}\t{}\t{}", tid, info.state, info.name)); + if let Some(i) = &info.info { + table.push(format!("\t\t {}", i)); + } + let tf = timeago::Formatter::new(); + let (err_ago, err_msg) = info + .last_error + .as_ref() + .map(|(m, t)| { + ( + tf.convert(Duration::from_millis(now_msec() - t)), + m.as_str(), + ) + }) + .unwrap_or(("(?) ago".into(), "(?)")); + if info.consecutive_errors > 0 { + table.push(format!( + "\t\t {} consecutive errors ({} total), last {}", + info.consecutive_errors, info.errors, err_ago, + )); + table.push(format!("\t\t {}", err_msg)); + } else if info.errors > 0 { + table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,)); + if wlo.errors { + table.push(format!("\t\t {}", err_msg)); + } + } + } + format_table(table); +} |