From f3f27293df83986ba29fb03f8af26a2177518e20 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 4 Jan 2023 13:07:13 +0100 Subject: Uniform framework for bg variable management --- src/garage/admin.rs | 47 ++++++++++++++++++++------------------------- src/garage/cli/cmd.rs | 3 +++ src/garage/cli/structs.rs | 21 ++++---------------- src/garage/cli/util.rs | 8 ++++++++ src/garage/repair/online.rs | 6 +++++- 5 files changed, 41 insertions(+), 44 deletions(-) (limited to 'src/garage') diff --git a/src/garage/admin.rs b/src/garage/admin.rs index c669b5e6..13536c80 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -18,7 +18,6 @@ use garage_table::*; use garage_rpc::*; use garage_block::manager::BlockResyncErrorInfo; -use garage_block::repair::ScrubWorkerCommand; use garage_model::bucket_alias_table::*; use garage_model::bucket_table::*; @@ -60,6 +59,7 @@ pub enum AdminRpc { HashMap, WorkerListOpt, ), + WorkerVars(Vec<(String, String)>), WorkerInfo(usize, garage_util::background::WorkerInfo), BlockErrorList(Vec), BlockInfo { @@ -943,32 +943,27 @@ impl AdminRpcHandler { .clone(); Ok(AdminRpc::WorkerInfo(*tid, info)) } - WorkerOperation::Set { opt } => match opt { - WorkerSetCmd::ScrubTranquility { tranquility } => { - let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility); - self.garage - .block_manager - .send_scrub_command(scrub_command) - .await?; - Ok(AdminRpc::Ok("Scrub tranquility updated".into())) - } - WorkerSetCmd::ResyncWorkerCount { worker_count } => { - self.garage - .block_manager - .resync - .set_n_workers(*worker_count) - .await?; - Ok(AdminRpc::Ok("Number of resync workers updated".into())) - } - WorkerSetCmd::ResyncTranquility { tranquility } => { - self.garage - .block_manager - .resync - .set_tranquility(*tranquility) - .await?; - Ok(AdminRpc::Ok("Resync tranquility updated".into())) + WorkerOperation::Get { variable } => { + if let Some(v) = variable { + Ok(AdminRpc::WorkerVars(vec![( + v.clone(), + self.garage.bg_vars.get(&v)?, + )])) + } else { + Ok(AdminRpc::WorkerVars( + self.garage + .bg_vars + .get_all() + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect(), + )) } - }, + } + WorkerOperation::Set { variable, value } => { + self.garage.bg_vars.set(&variable, &value)?; + Ok(AdminRpc::Ok(format!("{} was set to {}", variable, value))) + } } } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 6c5598b1..46e9113c 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -191,6 +191,9 @@ pub async fn cmd_admin( AdminRpc::WorkerList(wi, wlo) => { print_worker_list(wi, wlo); } + AdminRpc::WorkerVars(wv) => { + print_worker_vars(wv); + } AdminRpc::WorkerInfo(tid, wi) => { print_worker_info(tid, wi); } diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index e2f632f3..08cb7769 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -517,12 +517,12 @@ pub enum WorkerOperation { /// Get detailed information about a worker #[structopt(name = "info", version = garage_version())] Info { tid: usize }, + /// Get worker parameter + #[structopt(name = "get", version = garage_version())] + Get { variable: Option }, /// Set worker parameter #[structopt(name = "set", version = garage_version())] - Set { - #[structopt(subcommand)] - opt: WorkerSetCmd, - }, + Set { variable: String, value: String }, } #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)] @@ -535,19 +535,6 @@ pub struct WorkerListOpt { pub errors: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] -pub enum WorkerSetCmd { - /// Set tranquility of scrub operations - #[structopt(name = "scrub-tranquility", version = garage_version())] - ScrubTranquility { tranquility: u32 }, - /// Set number of concurrent block resync workers - #[structopt(name = "resync-worker-count", version = garage_version())] - ResyncWorkerCount { worker_count: usize }, - /// Set tranquility of block resync operations - #[structopt(name = "resync-tranquility", version = garage_version())] - ResyncTranquility { tranquility: u32 }, -} - #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] pub enum BlockOperation { /// List all blocks that currently have a resync error diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 63fd9eba..17477bd3 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -357,6 +357,14 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) { format_table(table); } +pub fn print_worker_vars(wv: Vec<(String, String)>) { + let table = wv + .into_iter() + .map(|(k, v)| format!("{}\t{}", k, v)) + .collect::>(); + format_table(table); +} + pub fn print_block_error_list(el: Vec) { let now = now_msec(); let tf = timeago::Formatter::new(); diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 627e3bf3..0e14ed51 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -51,7 +51,11 @@ pub async fn launch_online_repair( ScrubCmd::Resume => ScrubWorkerCommand::Resume, ScrubCmd::Cancel => ScrubWorkerCommand::Cancel, ScrubCmd::SetTranquility { tranquility } => { - ScrubWorkerCommand::SetTranquility(tranquility) + garage + .block_manager + .scrub_persister + .set_with(|x| x.tranquility = tranquility)?; + return Ok(()); } }; info!("Sending command to scrub worker: {:?}", cmd); -- cgit v1.2.3 From 29dbcb82780dcdb6f2a01a9da5122e70abaf93bf Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 4 Jan 2023 13:25:57 +0100 Subject: bg var operation on all nodes at once --- src/garage/admin.rs | 112 ++++++++++++++++++++++++++++++++++++++-------- src/garage/cli/structs.rs | 18 +++++++- src/garage/cli/util.rs | 4 +- 3 files changed, 111 insertions(+), 23 deletions(-) (limited to 'src/garage') diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 13536c80..305c5c65 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -59,7 +59,7 @@ pub enum AdminRpc { HashMap, WorkerListOpt, ), - WorkerVars(Vec<(String, String)>), + WorkerVars(Vec<(Uuid, String, String)>), WorkerInfo(usize, garage_util::background::WorkerInfo), BlockErrorList(Vec), BlockInfo { @@ -943,27 +943,101 @@ impl AdminRpcHandler { .clone(); Ok(AdminRpc::WorkerInfo(*tid, info)) } - WorkerOperation::Get { variable } => { - if let Some(v) = variable { - Ok(AdminRpc::WorkerVars(vec![( - v.clone(), - self.garage.bg_vars.get(&v)?, - )])) - } else { - Ok(AdminRpc::WorkerVars( - self.garage - .bg_vars - .get_all() - .into_iter() - .map(|(k, v)| (k.to_string(), v)) - .collect(), - )) + WorkerOperation::Get { + all_nodes, + variable, + } => self.handle_get_var(*all_nodes, variable).await, + WorkerOperation::Set { + all_nodes, + variable, + value, + } => self.handle_set_var(*all_nodes, variable, value).await, + } + } + + async fn handle_get_var( + &self, + all_nodes: bool, + variable: &Option, + ) -> Result { + if all_nodes { + let mut ret = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.layout.node_ids().iter() { + let node = (*node).into(); + match self + .endpoint + .call( + &node, + AdminRpc::Worker(WorkerOperation::Get { + all_nodes: false, + variable: variable.clone(), + }), + PRIO_NORMAL, + ) + .await?? + { + AdminRpc::WorkerVars(v) => ret.extend(v), + m => return Err(GarageError::unexpected_rpc_message(m).into()), } } - WorkerOperation::Set { variable, value } => { - self.garage.bg_vars.set(&variable, &value)?; - Ok(AdminRpc::Ok(format!("{} was set to {}", variable, value))) + Ok(AdminRpc::WorkerVars(ret)) + } else { + #[allow(clippy::collapsible_else_if)] + if let Some(v) = variable { + Ok(AdminRpc::WorkerVars(vec![( + self.garage.system.id, + v.clone(), + self.garage.bg_vars.get(v)?, + )])) + } else { + let mut vars = self.garage.bg_vars.get_all(); + vars.sort(); + Ok(AdminRpc::WorkerVars( + vars.into_iter() + .map(|(k, v)| (self.garage.system.id, k.to_string(), v)) + .collect(), + )) + } + } + } + + async fn handle_set_var( + &self, + all_nodes: bool, + variable: &str, + value: &str, + ) -> Result { + if all_nodes { + let mut ret = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.layout.node_ids().iter() { + let node = (*node).into(); + match self + .endpoint + .call( + &node, + AdminRpc::Worker(WorkerOperation::Set { + all_nodes: false, + variable: variable.to_string(), + value: value.to_string(), + }), + PRIO_NORMAL, + ) + .await?? + { + AdminRpc::WorkerVars(v) => ret.extend(v), + m => return Err(GarageError::unexpected_rpc_message(m).into()), + } } + Ok(AdminRpc::WorkerVars(ret)) + } else { + self.garage.bg_vars.set(variable, value)?; + Ok(AdminRpc::WorkerVars(vec![( + self.garage.system.id, + variable.to_string(), + value.to_string(), + )])) } } diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 08cb7769..661a71f0 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -519,10 +519,24 @@ pub enum WorkerOperation { Info { tid: usize }, /// Get worker parameter #[structopt(name = "get", version = garage_version())] - Get { variable: Option }, + Get { + /// Gather variable values from all nodes + #[structopt(short = "a", long = "all-nodes")] + all_nodes: bool, + /// Variable name to get, or none to get all variables + variable: Option, + }, /// Set worker parameter #[structopt(name = "set", version = garage_version())] - Set { variable: String, value: String }, + Set { + /// Set variable values on all nodes + #[structopt(short = "a", long = "all-nodes")] + all_nodes: bool, + /// Variable node to set + variable: String, + /// Value to set the variable to + value: String, + }, } #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)] diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 17477bd3..230ce3de 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -357,10 +357,10 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) { format_table(table); } -pub fn print_worker_vars(wv: Vec<(String, String)>) { +pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) { let table = wv .into_iter() - .map(|(k, v)| format!("{}\t{}", k, v)) + .map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v)) .collect::>(); format_table(table); } -- cgit v1.2.3