aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/admin.rs119
-rw-r--r--src/garage/cli/cmd.rs3
-rw-r--r--src/garage/cli/structs.rs31
-rw-r--r--src/garage/cli/util.rs8
-rw-r--r--src/garage/repair/online.rs6
5 files changed, 126 insertions, 41 deletions
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index c669b5e6..305c5c65 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -18,7 +18,6 @@ use garage_table::*;
use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo;
-use garage_block::repair::ScrubWorkerCommand;
use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*;
@@ -60,6 +59,7 @@ pub enum AdminRpc {
HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt,
),
+ WorkerVars(Vec<(Uuid, String, String)>),
WorkerInfo(usize, garage_util::background::WorkerInfo),
BlockErrorList(Vec<BlockResyncErrorInfo>),
BlockInfo {
@@ -943,32 +943,101 @@ impl AdminRpcHandler {
.clone();
Ok(AdminRpc::WorkerInfo(*tid, info))
}
- WorkerOperation::Set { opt } => match opt {
- WorkerSetCmd::ScrubTranquility { tranquility } => {
- let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility);
- self.garage
- .block_manager
- .send_scrub_command(scrub_command)
- .await?;
- Ok(AdminRpc::Ok("Scrub tranquility updated".into()))
- }
- WorkerSetCmd::ResyncWorkerCount { worker_count } => {
- self.garage
- .block_manager
- .resync
- .set_n_workers(*worker_count)
- .await?;
- Ok(AdminRpc::Ok("Number of resync workers updated".into()))
+ WorkerOperation::Get {
+ all_nodes,
+ variable,
+ } => self.handle_get_var(*all_nodes, variable).await,
+ WorkerOperation::Set {
+ all_nodes,
+ variable,
+ value,
+ } => self.handle_set_var(*all_nodes, variable, value).await,
+ }
+ }
+
+ async fn handle_get_var(
+ &self,
+ all_nodes: bool,
+ variable: &Option<String>,
+ ) -> Result<AdminRpc, Error> {
+ if all_nodes {
+ let mut ret = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.layout.node_ids().iter() {
+ let node = (*node).into();
+ match self
+ .endpoint
+ .call(
+ &node,
+ AdminRpc::Worker(WorkerOperation::Get {
+ all_nodes: false,
+ variable: variable.clone(),
+ }),
+ PRIO_NORMAL,
+ )
+ .await??
+ {
+ AdminRpc::WorkerVars(v) => ret.extend(v),
+ m => return Err(GarageError::unexpected_rpc_message(m).into()),
}
- WorkerSetCmd::ResyncTranquility { tranquility } => {
- self.garage
- .block_manager
- .resync
- .set_tranquility(*tranquility)
- .await?;
- Ok(AdminRpc::Ok("Resync tranquility updated".into()))
+ }
+ Ok(AdminRpc::WorkerVars(ret))
+ } else {
+ #[allow(clippy::collapsible_else_if)]
+ if let Some(v) = variable {
+ Ok(AdminRpc::WorkerVars(vec![(
+ self.garage.system.id,
+ v.clone(),
+ self.garage.bg_vars.get(v)?,
+ )]))
+ } else {
+ let mut vars = self.garage.bg_vars.get_all();
+ vars.sort();
+ Ok(AdminRpc::WorkerVars(
+ vars.into_iter()
+ .map(|(k, v)| (self.garage.system.id, k.to_string(), v))
+ .collect(),
+ ))
+ }
+ }
+ }
+
+ async fn handle_set_var(
+ &self,
+ all_nodes: bool,
+ variable: &str,
+ value: &str,
+ ) -> Result<AdminRpc, Error> {
+ if all_nodes {
+ let mut ret = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.layout.node_ids().iter() {
+ let node = (*node).into();
+ match self
+ .endpoint
+ .call(
+ &node,
+ AdminRpc::Worker(WorkerOperation::Set {
+ all_nodes: false,
+ variable: variable.to_string(),
+ value: value.to_string(),
+ }),
+ PRIO_NORMAL,
+ )
+ .await??
+ {
+ AdminRpc::WorkerVars(v) => ret.extend(v),
+ m => return Err(GarageError::unexpected_rpc_message(m).into()),
}
- },
+ }
+ Ok(AdminRpc::WorkerVars(ret))
+ } else {
+ self.garage.bg_vars.set(variable, value)?;
+ Ok(AdminRpc::WorkerVars(vec![(
+ self.garage.system.id,
+ variable.to_string(),
+ value.to_string(),
+ )]))
}
}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 6c5598b1..46e9113c 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -191,6 +191,9 @@ pub async fn cmd_admin(
AdminRpc::WorkerList(wi, wlo) => {
print_worker_list(wi, wlo);
}
+ AdminRpc::WorkerVars(wv) => {
+ print_worker_vars(wv);
+ }
AdminRpc::WorkerInfo(tid, wi) => {
print_worker_info(tid, wi);
}
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index e2f632f3..661a71f0 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -517,11 +517,25 @@ pub enum WorkerOperation {
/// Get detailed information about a worker
#[structopt(name = "info", version = garage_version())]
Info { tid: usize },
+ /// Get worker parameter
+ #[structopt(name = "get", version = garage_version())]
+ Get {
+ /// Gather variable values from all nodes
+ #[structopt(short = "a", long = "all-nodes")]
+ all_nodes: bool,
+ /// Variable name to get, or none to get all variables
+ variable: Option<String>,
+ },
/// Set worker parameter
#[structopt(name = "set", version = garage_version())]
Set {
- #[structopt(subcommand)]
- opt: WorkerSetCmd,
+ /// Set variable values on all nodes
+ #[structopt(short = "a", long = "all-nodes")]
+ all_nodes: bool,
+ /// Variable node to set
+ variable: String,
+ /// Value to set the variable to
+ value: String,
},
}
@@ -536,19 +550,6 @@ pub struct WorkerListOpt {
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
-pub enum WorkerSetCmd {
- /// Set tranquility of scrub operations
- #[structopt(name = "scrub-tranquility", version = garage_version())]
- ScrubTranquility { tranquility: u32 },
- /// Set number of concurrent block resync workers
- #[structopt(name = "resync-worker-count", version = garage_version())]
- ResyncWorkerCount { worker_count: usize },
- /// Set tranquility of block resync operations
- #[structopt(name = "resync-tranquility", version = garage_version())]
- ResyncTranquility { tranquility: u32 },
-}
-
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum BlockOperation {
/// List all blocks that currently have a resync error
#[structopt(name = "list-errors", version = garage_version())]
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 63fd9eba..230ce3de 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -357,6 +357,14 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) {
format_table(table);
}
+pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) {
+ let table = wv
+ .into_iter()
+ .map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v))
+ .collect::<Vec<_>>();
+ format_table(table);
+}
+
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let now = now_msec();
let tf = timeago::Formatter::new();
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index 627e3bf3..0e14ed51 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -51,7 +51,11 @@ pub async fn launch_online_repair(
ScrubCmd::Resume => ScrubWorkerCommand::Resume,
ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
ScrubCmd::SetTranquility { tranquility } => {
- ScrubWorkerCommand::SetTranquility(tranquility)
+ garage
+ .block_manager
+ .scrub_persister
+ .set_with(|x| x.tranquility = tranquility)?;
+ return Ok(());
}
};
info!("Sending command to scrub worker: {:?}", cmd);