From f3f27293df83986ba29fb03f8af26a2177518e20 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 4 Jan 2023 13:07:13 +0100 Subject: Uniform framework for bg variable management --- src/garage/admin.rs | 47 +++++++++++++++++++++-------------------------- 1 file changed, 21 insertions(+), 26 deletions(-) (limited to 'src/garage/admin.rs') diff --git a/src/garage/admin.rs b/src/garage/admin.rs index c669b5e6..13536c80 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -18,7 +18,6 @@ use garage_table::*; use garage_rpc::*; use garage_block::manager::BlockResyncErrorInfo; -use garage_block::repair::ScrubWorkerCommand; use garage_model::bucket_alias_table::*; use garage_model::bucket_table::*; @@ -60,6 +59,7 @@ pub enum AdminRpc { HashMap, WorkerListOpt, ), + WorkerVars(Vec<(String, String)>), WorkerInfo(usize, garage_util::background::WorkerInfo), BlockErrorList(Vec), BlockInfo { @@ -943,32 +943,27 @@ impl AdminRpcHandler { .clone(); Ok(AdminRpc::WorkerInfo(*tid, info)) } - WorkerOperation::Set { opt } => match opt { - WorkerSetCmd::ScrubTranquility { tranquility } => { - let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility); - self.garage - .block_manager - .send_scrub_command(scrub_command) - .await?; - Ok(AdminRpc::Ok("Scrub tranquility updated".into())) - } - WorkerSetCmd::ResyncWorkerCount { worker_count } => { - self.garage - .block_manager - .resync - .set_n_workers(*worker_count) - .await?; - Ok(AdminRpc::Ok("Number of resync workers updated".into())) - } - WorkerSetCmd::ResyncTranquility { tranquility } => { - self.garage - .block_manager - .resync - .set_tranquility(*tranquility) - .await?; - Ok(AdminRpc::Ok("Resync tranquility updated".into())) + WorkerOperation::Get { variable } => { + if let Some(v) = variable { + Ok(AdminRpc::WorkerVars(vec![( + v.clone(), + self.garage.bg_vars.get(&v)?, + )])) + } else { + Ok(AdminRpc::WorkerVars( + self.garage + .bg_vars + .get_all() + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect(), + )) } - }, + } + WorkerOperation::Set { variable, value } => { + self.garage.bg_vars.set(&variable, &value)?; + Ok(AdminRpc::Ok(format!("{} was set to {}", variable, value))) + } } } -- cgit v1.2.3 From 29dbcb82780dcdb6f2a01a9da5122e70abaf93bf Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 4 Jan 2023 13:25:57 +0100 Subject: bg var operation on all nodes at once --- src/garage/admin.rs | 112 +++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 93 insertions(+), 19 deletions(-) (limited to 'src/garage/admin.rs') diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 13536c80..305c5c65 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -59,7 +59,7 @@ pub enum AdminRpc { HashMap, WorkerListOpt, ), - WorkerVars(Vec<(String, String)>), + WorkerVars(Vec<(Uuid, String, String)>), WorkerInfo(usize, garage_util::background::WorkerInfo), BlockErrorList(Vec), BlockInfo { @@ -943,27 +943,101 @@ impl AdminRpcHandler { .clone(); Ok(AdminRpc::WorkerInfo(*tid, info)) } - WorkerOperation::Get { variable } => { - if let Some(v) = variable { - Ok(AdminRpc::WorkerVars(vec![( - v.clone(), - self.garage.bg_vars.get(&v)?, - )])) - } else { - Ok(AdminRpc::WorkerVars( - self.garage - .bg_vars - .get_all() - .into_iter() - .map(|(k, v)| (k.to_string(), v)) - .collect(), - )) + WorkerOperation::Get { + all_nodes, + variable, + } => self.handle_get_var(*all_nodes, variable).await, + WorkerOperation::Set { + all_nodes, + variable, + value, + } => self.handle_set_var(*all_nodes, variable, value).await, + } + } + + async fn handle_get_var( + &self, + all_nodes: bool, + variable: &Option, + ) -> Result { + if all_nodes { + let mut ret = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.layout.node_ids().iter() { + let node = (*node).into(); + match self + .endpoint + .call( + &node, + AdminRpc::Worker(WorkerOperation::Get { + all_nodes: false, + variable: variable.clone(), + }), + PRIO_NORMAL, + ) + .await?? + { + AdminRpc::WorkerVars(v) => ret.extend(v), + m => return Err(GarageError::unexpected_rpc_message(m).into()), } } - WorkerOperation::Set { variable, value } => { - self.garage.bg_vars.set(&variable, &value)?; - Ok(AdminRpc::Ok(format!("{} was set to {}", variable, value))) + Ok(AdminRpc::WorkerVars(ret)) + } else { + #[allow(clippy::collapsible_else_if)] + if let Some(v) = variable { + Ok(AdminRpc::WorkerVars(vec![( + self.garage.system.id, + v.clone(), + self.garage.bg_vars.get(v)?, + )])) + } else { + let mut vars = self.garage.bg_vars.get_all(); + vars.sort(); + Ok(AdminRpc::WorkerVars( + vars.into_iter() + .map(|(k, v)| (self.garage.system.id, k.to_string(), v)) + .collect(), + )) + } + } + } + + async fn handle_set_var( + &self, + all_nodes: bool, + variable: &str, + value: &str, + ) -> Result { + if all_nodes { + let mut ret = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.layout.node_ids().iter() { + let node = (*node).into(); + match self + .endpoint + .call( + &node, + AdminRpc::Worker(WorkerOperation::Set { + all_nodes: false, + variable: variable.to_string(), + value: value.to_string(), + }), + PRIO_NORMAL, + ) + .await?? + { + AdminRpc::WorkerVars(v) => ret.extend(v), + m => return Err(GarageError::unexpected_rpc_message(m).into()), + } } + Ok(AdminRpc::WorkerVars(ret)) + } else { + self.garage.bg_vars.set(variable, value)?; + Ok(AdminRpc::WorkerVars(vec![( + self.garage.system.id, + variable.to_string(), + value.to_string(), + )])) } } -- cgit v1.2.3