diff options
author | Alex <alex@adnab.me> | 2022-07-08 13:30:26 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-07-08 13:30:26 +0200 |
commit | 4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7 (patch) | |
tree | 256ee885e93cf1c41dc2869fe13a648aa91ab9b5 /src/garage | |
parent | aab34bfe5415e9584432bf32e29a151dc5af9ebd (diff) | |
download | garage-4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7.tar.gz garage-4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7.zip |
Background task manager (#332)
- [x] New background worker trait
- [x] Adapt all current workers to use new API
- [x] Command to list currently running workers, and whether they are active, idle, or dead
- [x] Error reporting
- Optimizations
- [x] Merkle updater: several items per iteration
- [ ] Use `tokio::task::spawn_blocking` where appropriate so that CPU-intensive tasks don't block other things going on
- scrub:
- [x] have only one worker with a channel to start/pause/cancel
- [x] automatic scrub
- [x] ability to view and change tranquility from CLI
- [x] persistence of a few info
- [ ] Testing
Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/332
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/Cargo.toml | 2 | ||||
-rw-r--r-- | src/garage/admin.rs | 29 | ||||
-rw-r--r-- | src/garage/cli/cmd.rs | 8 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 55 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 58 | ||||
-rw-r--r-- | src/garage/repair/online.rs | 222 |
6 files changed, 276 insertions, 98 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 640e6975..8948e750 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -23,6 +23,7 @@ path = "tests/lib.rs" [dependencies] garage_db = { version = "0.8.0", path = "../db" } garage_api = { version = "0.7.0", path = "../api" } +garage_block = { version = "0.7.0", path = "../block" } garage_model = { version = "0.7.0", path = "../model" } garage_rpc = { version = "0.7.0", path = "../rpc" } garage_table = { version = "0.7.0", path = "../table" } @@ -31,6 +32,7 @@ garage_web = { version = "0.7.0", path = "../web" } bytes = "1.0" bytesize = "1.1" +timeago = "0.3" hex = "0.4" tracing = { version = "0.1.30", features = ["log-always"] } pretty_env_logger = "0.4" diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 48914655..71ee608c 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -24,7 +24,7 @@ use garage_model::migrate::Migrate; use garage_model::permission::*; use crate::cli::*; -use crate::repair::online::OnlineRepair; +use crate::repair::online::launch_online_repair; pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; @@ -36,6 +36,7 @@ pub enum AdminRpc { LaunchRepair(RepairOpt), Migrate(MigrateOpt), Stats(StatsOpt), + Worker(WorkerOpt), // Replies Ok(String), @@ -47,6 +48,10 @@ pub enum AdminRpc { }, KeyList(Vec<(String, String)>), KeyInfo(Key, HashMap<Uuid, Bucket>), + WorkerList( + HashMap<usize, garage_util::background::WorkerInfo>, + WorkerListOpt, + ), } impl Rpc for AdminRpc { @@ -693,15 +698,7 @@ impl AdminRpcHandler { ))) } } else { - let repair = OnlineRepair { - garage: self.garage.clone(), - }; - self.garage - .system - .background - .spawn_worker("Repair worker".into(), move |must_exit| async move { - repair.repair_worker(opt, must_exit).await - }); + launch_online_repair(self.garage.clone(), opt).await; Ok(AdminRpc::Ok(format!( "Repair launched on {:?}", self.garage.system.id @@ -830,6 +827,17 @@ impl AdminRpcHandler { Ok(()) } + + // ---- + + async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> { + match opt.cmd { + WorkerCmd::List { opt } => { + let workers = self.garage.background.get_worker_info(); + Ok(AdminRpc::WorkerList(workers, opt)) + } + } + } } #[async_trait] @@ -845,6 +853,7 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler { AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await, AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, + AdminRpc::Worker(opt) => self.handle_worker_cmd(opt.clone()).await, m => Err(GarageError::unexpected_rpc_message(m).into()), } } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 3a0bd956..1aa2c2ff 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; +use std::time::Duration; use garage_util::error::*; use garage_util::formater::format_table; @@ -39,6 +40,7 @@ pub async fn cli_command_dispatch( cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await } Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await, + Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await, _ => unreachable!(), } } @@ -100,6 +102,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()]; for adv in status.iter().filter(|adv| !adv.is_up) { if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) { + let tf = timeago::Formatter::new(); failed_nodes.push(format!( "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}", id = adv.id, @@ -110,7 +113,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> capacity = cfg.capacity_string(), last_seen = adv .last_seen_secs_ago - .map(|s| format!("{}s ago", s)) + .map(|s| tf.convert(Duration::from_secs(s))) .unwrap_or_else(|| "never seen".into()), )); } @@ -182,6 +185,9 @@ pub async fn cmd_admin( AdminRpc::KeyInfo(key, rb) => { print_key_info(&key, &rb); } + AdminRpc::WorkerList(wi, wlo) => { + print_worker_info(wi, wlo); + } r => { error!("Unexpected response: {:?}", r); } diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 4f2efe19..bc44b5ef 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -45,6 +45,10 @@ pub enum Command { /// Gather node statistics #[structopt(name = "stats")] Stats(StatsOpt), + + /// Manage background workers + #[structopt(name = "worker")] + Worker(WorkerOpt), } #[derive(StructOpt, Debug)] @@ -423,8 +427,29 @@ pub enum RepairWhat { /// Verify integrity of all blocks on disc (extremely slow, i/o intensive) #[structopt(name = "scrub")] Scrub { - /// Tranquility factor (see tranquilizer documentation) - #[structopt(name = "tranquility", default_value = "2")] + #[structopt(subcommand)] + cmd: ScrubCmd, + }, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum ScrubCmd { + /// Start scrub + #[structopt(name = "start")] + Start, + /// Pause scrub (it will resume automatically after 24 hours) + #[structopt(name = "pause")] + Pause, + /// Resume paused scrub + #[structopt(name = "resume")] + Resume, + /// Cancel scrub in progress + #[structopt(name = "cancel")] + Cancel, + /// Set tranquility level for in-progress and future scrubs + #[structopt(name = "set-tranquility")] + SetTranquility { + #[structopt()] tranquility: u32, }, } @@ -460,3 +485,29 @@ pub struct StatsOpt { #[structopt(short = "d", long = "detailed")] pub detailed: bool, } + +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +pub struct WorkerOpt { + #[structopt(subcommand)] + pub cmd: WorkerCmd, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum WorkerCmd { + /// List all workers on Garage node + #[structopt(name = "list")] + List { + #[structopt(flatten)] + opt: WorkerListOpt, + }, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)] +pub struct WorkerListOpt { + /// Show only busy workers + #[structopt(short = "b", long = "busy")] + pub busy: bool, + /// Show only workers with errors + #[structopt(short = "e", long = "errors")] + pub errors: bool, +} diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 329e8a3e..396938ae 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -1,14 +1,19 @@ use std::collections::HashMap; +use std::time::Duration; +use garage_util::background::*; use garage_util::crdt::*; use garage_util::data::Uuid; use garage_util::error::*; use garage_util::formater::format_table; +use garage_util::time::*; use garage_model::bucket_table::*; use garage_model::key_table::*; use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; +use crate::cli::structs::WorkerListOpt; + pub fn print_bucket_list(bl: Vec<Bucket>) { println!("List of buckets:"); @@ -235,3 +240,56 @@ pub fn find_matching_node( Ok(candidates[0]) } } + +pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { + let mut wi = wi.into_iter().collect::<Vec<_>>(); + wi.sort_by_key(|(tid, info)| { + ( + match info.state { + WorkerState::Busy | WorkerState::Throttled(_) => 0, + WorkerState::Idle => 1, + WorkerState::Done => 2, + }, + *tid, + ) + }); + + let mut table = vec![]; + for (tid, info) in wi.iter() { + if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) { + continue; + } + if wlo.errors && info.errors == 0 { + continue; + } + + table.push(format!("{}\t{}\t{}", tid, info.state, info.name)); + if let Some(i) = &info.info { + table.push(format!("\t\t {}", i)); + } + let tf = timeago::Formatter::new(); + let (err_ago, err_msg) = info + .last_error + .as_ref() + .map(|(m, t)| { + ( + tf.convert(Duration::from_millis(now_msec() - t)), + m.as_str(), + ) + }) + .unwrap_or(("(?) ago".into(), "(?)")); + if info.consecutive_errors > 0 { + table.push(format!( + "\t\t {} consecutive errors ({} total), last {}", + info.consecutive_errors, info.errors, err_ago, + )); + table.push(format!("\t\t {}", err_msg)); + } else if info.errors > 0 { + table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,)); + if wlo.errors { + table.push(format!("\t\t {}", err_msg)); + } + } + } + format_table(table); +} diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index d6a71742..e33cf097 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -1,89 +1,110 @@ use std::sync::Arc; +use std::time::Duration; +use async_trait::async_trait; use tokio::sync::watch; +use garage_block::repair::ScrubWorkerCommand; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_table::*; +use garage_util::background::*; use garage_util::error::Error; use crate::*; -pub struct OnlineRepair { - pub garage: Arc<Garage>, -} - -impl OnlineRepair { - pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) { - if let Err(e) = self.repair_worker_aux(opt, must_exit).await { - warn!("Repair worker failed with error: {}", e); +pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) { + match opt.what { + RepairWhat::Tables => { + info!("Launching a full sync of tables"); + garage.bucket_table.syncer.add_full_sync(); + garage.object_table.syncer.add_full_sync(); + garage.version_table.syncer.add_full_sync(); + garage.block_ref_table.syncer.add_full_sync(); + garage.key_table.syncer.add_full_sync(); + } + RepairWhat::Versions => { + info!("Repairing the versions table"); + garage + .background + .spawn_worker(RepairVersionsWorker::new(garage.clone())); + } + RepairWhat::BlockRefs => { + info!("Repairing the block refs table"); + garage + .background + .spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + } + RepairWhat::Blocks => { + info!("Repairing the stored blocks"); + garage + .background + .spawn_worker(garage_block::repair::RepairWorker::new( + garage.block_manager.clone(), + )); + } + RepairWhat::Scrub { cmd } => { + let cmd = match cmd { + ScrubCmd::Start => ScrubWorkerCommand::Start, + ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)), + ScrubCmd::Resume => ScrubWorkerCommand::Resume, + ScrubCmd::Cancel => ScrubWorkerCommand::Cancel, + ScrubCmd::SetTranquility { tranquility } => { + ScrubWorkerCommand::SetTranquility(tranquility) + } + }; + info!("Sending command to scrub worker: {:?}", cmd); + garage.block_manager.send_scrub_command(cmd).await; } } +} - async fn repair_worker_aux( - &self, - opt: RepairOpt, - must_exit: watch::Receiver<bool>, - ) -> Result<(), Error> { - match opt.what { - RepairWhat::Tables => { - info!("Launching a full sync of tables"); - self.garage.bucket_table.syncer.add_full_sync(); - self.garage.object_table.syncer.add_full_sync(); - self.garage.version_table.syncer.add_full_sync(); - self.garage.block_ref_table.syncer.add_full_sync(); - self.garage.key_table.syncer.add_full_sync(); - } - RepairWhat::Versions => { - info!("Repairing the versions table"); - self.repair_versions(&must_exit).await?; - } - RepairWhat::BlockRefs => { - info!("Repairing the block refs table"); - self.repair_block_ref(&must_exit).await?; - } - RepairWhat::Blocks => { - info!("Repairing the stored blocks"); - self.garage - .block_manager - .repair_data_store(&must_exit) - .await?; - } - RepairWhat::Scrub { tranquility } => { - info!("Verifying integrity of stored blocks"); - self.garage - .block_manager - .scrub_data_store(&must_exit, tranquility) - .await?; - } +// ---- + +struct RepairVersionsWorker { + garage: Arc<Garage>, + pos: Vec<u8>, + counter: usize, +} + +impl RepairVersionsWorker { + fn new(garage: Arc<Garage>) -> Self { + Self { + garage, + pos: vec![], + counter: 0, } - Ok(()) } +} - async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { - let mut pos = vec![]; - let mut i = 0; +#[async_trait] +impl Worker for RepairVersionsWorker { + fn name(&self) -> String { + "Version repair worker".into() + } - while !*must_exit.borrow() { - let item_bytes = match self.garage.version_table.data.store.get_gt(pos)? { - Some((k, v)) => { - pos = k; - v - } - None => break, - }; + fn info(&self) -> Option<String> { + Some(format!("{} items done", self.counter)) + } - i += 1; - if i % 1000 == 0 { - info!("repair_versions: {}", i); + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => { + self.pos = k; + v } - - let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; - if version.deleted.get() { - continue; + None => { + info!("repair_versions: finished, done {}", self.counter); + return Ok(WorkerState::Done); } + }; + + self.counter += 1; + + let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; + if !version.deleted.get() { let object = self .garage .object_table @@ -109,32 +130,59 @@ impl OnlineRepair { .await?; } } - info!("repair_versions: finished, done {}", i); - Ok(()) + + Ok(WorkerState::Busy) } - async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { - let mut pos = vec![]; - let mut i = 0; + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { + unreachable!() + } +} - while !*must_exit.borrow() { - let item_bytes = match self.garage.block_ref_table.data.store.get_gt(pos)? { - Some((k, v)) => { - pos = k; - v - } - None => break, - }; +// ---- - i += 1; - if i % 1000 == 0 { - info!("repair_block_ref: {}", i); - } +struct RepairBlockrefsWorker { + garage: Arc<Garage>, + pos: Vec<u8>, + counter: usize, +} - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; - if block_ref.deleted.get() { - continue; +impl RepairBlockrefsWorker { + fn new(garage: Arc<Garage>) -> Self { + Self { + garage, + pos: vec![], + counter: 0, + } + } +} + +#[async_trait] +impl Worker for RepairBlockrefsWorker { + fn name(&self) -> String { + "Block refs repair worker".into() + } + + fn info(&self) -> Option<String> { + Some(format!("{} items done", self.counter)) + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => { + self.pos = k; + v } + None => { + info!("repair_block_ref: finished, done {}", self.counter); + return Ok(WorkerState::Done); + } + }; + + self.counter += 1; + + let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; + if !block_ref.deleted.get() { let version = self .garage .version_table @@ -157,7 +205,11 @@ impl OnlineRepair { .await?; } } - info!("repair_block_ref: finished, done {}", i); - Ok(()) + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { + unreachable!() } } |