From 29dbcb82780dcdb6f2a01a9da5122e70abaf93bf Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 4 Jan 2023 13:25:57 +0100 Subject: bg var operation on all nodes at once --- src/garage/admin.rs | 112 ++++++++++++++++++++++++++++++++++++-------- src/garage/cli/structs.rs | 18 ++++++- src/garage/cli/util.rs | 4 +- src/util/background/vars.rs | 6 +++ 4 files changed, 117 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 13536c80..305c5c65 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -59,7 +59,7 @@ pub enum AdminRpc { HashMap, WorkerListOpt, ), - WorkerVars(Vec<(String, String)>), + WorkerVars(Vec<(Uuid, String, String)>), WorkerInfo(usize, garage_util::background::WorkerInfo), BlockErrorList(Vec), BlockInfo { @@ -943,27 +943,101 @@ impl AdminRpcHandler { .clone(); Ok(AdminRpc::WorkerInfo(*tid, info)) } - WorkerOperation::Get { variable } => { - if let Some(v) = variable { - Ok(AdminRpc::WorkerVars(vec![( - v.clone(), - self.garage.bg_vars.get(&v)?, - )])) - } else { - Ok(AdminRpc::WorkerVars( - self.garage - .bg_vars - .get_all() - .into_iter() - .map(|(k, v)| (k.to_string(), v)) - .collect(), - )) + WorkerOperation::Get { + all_nodes, + variable, + } => self.handle_get_var(*all_nodes, variable).await, + WorkerOperation::Set { + all_nodes, + variable, + value, + } => self.handle_set_var(*all_nodes, variable, value).await, + } + } + + async fn handle_get_var( + &self, + all_nodes: bool, + variable: &Option, + ) -> Result { + if all_nodes { + let mut ret = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.layout.node_ids().iter() { + let node = (*node).into(); + match self + .endpoint + .call( + &node, + AdminRpc::Worker(WorkerOperation::Get { + all_nodes: false, + variable: variable.clone(), + }), + PRIO_NORMAL, + ) + .await?? + { + AdminRpc::WorkerVars(v) => ret.extend(v), + m => return Err(GarageError::unexpected_rpc_message(m).into()), } } - WorkerOperation::Set { variable, value } => { - self.garage.bg_vars.set(&variable, &value)?; - Ok(AdminRpc::Ok(format!("{} was set to {}", variable, value))) + Ok(AdminRpc::WorkerVars(ret)) + } else { + #[allow(clippy::collapsible_else_if)] + if let Some(v) = variable { + Ok(AdminRpc::WorkerVars(vec![( + self.garage.system.id, + v.clone(), + self.garage.bg_vars.get(v)?, + )])) + } else { + let mut vars = self.garage.bg_vars.get_all(); + vars.sort(); + Ok(AdminRpc::WorkerVars( + vars.into_iter() + .map(|(k, v)| (self.garage.system.id, k.to_string(), v)) + .collect(), + )) + } + } + } + + async fn handle_set_var( + &self, + all_nodes: bool, + variable: &str, + value: &str, + ) -> Result { + if all_nodes { + let mut ret = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.layout.node_ids().iter() { + let node = (*node).into(); + match self + .endpoint + .call( + &node, + AdminRpc::Worker(WorkerOperation::Set { + all_nodes: false, + variable: variable.to_string(), + value: value.to_string(), + }), + PRIO_NORMAL, + ) + .await?? + { + AdminRpc::WorkerVars(v) => ret.extend(v), + m => return Err(GarageError::unexpected_rpc_message(m).into()), + } } + Ok(AdminRpc::WorkerVars(ret)) + } else { + self.garage.bg_vars.set(variable, value)?; + Ok(AdminRpc::WorkerVars(vec![( + self.garage.system.id, + variable.to_string(), + value.to_string(), + )])) } } diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 08cb7769..661a71f0 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -519,10 +519,24 @@ pub enum WorkerOperation { Info { tid: usize }, /// Get worker parameter #[structopt(name = "get", version = garage_version())] - Get { variable: Option }, + Get { + /// Gather variable values from all nodes + #[structopt(short = "a", long = "all-nodes")] + all_nodes: bool, + /// Variable name to get, or none to get all variables + variable: Option, + }, /// Set worker parameter #[structopt(name = "set", version = garage_version())] - Set { variable: String, value: String }, + Set { + /// Set variable values on all nodes + #[structopt(short = "a", long = "all-nodes")] + all_nodes: bool, + /// Variable node to set + variable: String, + /// Value to set the variable to + value: String, + }, } #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)] diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 17477bd3..230ce3de 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -357,10 +357,10 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) { format_table(table); } -pub fn print_worker_vars(wv: Vec<(String, String)>) { +pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) { let table = wv .into_iter() - .map(|(k, v)| format!("{}\t{}", k, v)) + .map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v)) .collect::>(); format_table(table); } diff --git a/src/util/background/vars.rs b/src/util/background/vars.rs index fe54268e..7a449c95 100644 --- a/src/util/background/vars.rs +++ b/src/util/background/vars.rs @@ -71,6 +71,12 @@ impl BgVars { } } +impl Default for BgVars { + fn default() -> Self { + Self::new() + } +} + // ---- trait BgVarTrait: Send + Sync + 'static { -- cgit v1.2.3