diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-21 16:40:17 +0000 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-21 16:40:17 +0000 |
commit | b1ddb933b09fa30e0e19e2a545e3000096a9466f (patch) | |
tree | 6a9d72d1deae75ca8d0b2450a76ec23bb508e265 /src | |
parent | a04218047ece73aff8ef0647ae55d3f496f709f3 (diff) | |
download | garage-b1ddb933b09fa30e0e19e2a545e3000096a9466f.tar.gz garage-b1ddb933b09fa30e0e19e2a545e3000096a9466f.zip |
Make the repair command accept subcommands to not do everything all the time
Diffstat (limited to 'src')
-rw-r--r-- | src/admin_rpc.rs | 123 | ||||
-rw-r--r-- | src/main.rs | 31 |
2 files changed, 106 insertions, 48 deletions
diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs index 412c44b7..8228eaf8 100644 --- a/src/admin_rpc.rs +++ b/src/admin_rpc.rs @@ -21,7 +21,7 @@ pub const ADMIN_RPC_PATH: &str = "_admin"; #[derive(Debug, Serialize, Deserialize)] pub enum AdminRPC { BucketOperation(BucketOperation), - LaunchRepair(bool), + LaunchRepair(RepairOpt), // Replies Ok(String), @@ -48,9 +48,7 @@ impl AdminRpcHandler { async move { match msg { AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, - AdminRPC::LaunchRepair(repair_all) => { - self2.handle_launch_repair(repair_all).await - } + AdminRPC::LaunchRepair(opt) => self2.handle_launch_repair(opt).await, _ => Err(Error::Message(format!("Invalid RPC"))), } } @@ -155,14 +153,26 @@ impl AdminRpcHandler { } } - async fn handle_launch_repair(self: &Arc<Self>, repair_all: bool) -> Result<AdminRPC, Error> { - if repair_all { + async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRPC, Error> { + if !opt.yes { + return Err(Error::Message(format!( + "Please provide the --yes flag to initiate repair operations." + ))); + } + if opt.all_nodes { + let mut opt_to_send = opt.clone(); + opt_to_send.all_nodes = false; + let mut failures = vec![]; let ring = self.garage.system.ring.borrow().clone(); for node in ring.config.members.keys() { if self .rpc_client - .call(node, AdminRPC::LaunchRepair(false), ADMIN_RPC_TIMEOUT) + .call( + node, + AdminRPC::LaunchRepair(opt_to_send.clone()), + ADMIN_RPC_TIMEOUT, + ) .await .is_err() { @@ -183,7 +193,7 @@ impl AdminRpcHandler { .system .background .spawn_worker("Repair worker".into(), move |must_exit| async move { - self2.repair_worker(must_exit).await + self2.repair_worker(opt, must_exit).await }) .await; Ok(AdminRPC::Ok(format!( @@ -193,45 +203,70 @@ impl AdminRpcHandler { } } - async fn repair_worker(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> { - self.garage - .bucket_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .object_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .version_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .block_ref_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; + async fn repair_worker( + self: Arc<Self>, + opt: RepairOpt, + must_exit: watch::Receiver<bool>, + ) -> Result<(), Error> { + let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true); + + if todo(RepairWhat::Tables) { + info!("Launching a full sync of tables"); + self.garage + .bucket_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + self.garage + .object_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + self.garage + .version_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + self.garage + .block_ref_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + } // TODO: wait for full sync to finish before proceeding to the rest? - self.repair_versions(&must_exit).await?; - self.repair_block_ref(&must_exit).await?; - self.repair_rc(&must_exit).await?; - self.garage - .block_manager - .repair_data_store(&must_exit) - .await?; + if todo(RepairWhat::Versions) { + info!("Repairing the versions table"); + self.repair_versions(&must_exit).await?; + } + + if todo(RepairWhat::BlockRefs) { + info!("Repairing the block refs table"); + self.repair_block_ref(&must_exit).await?; + } + + if opt.what.is_none() { + info!("Repairing the RC"); + self.repair_rc(&must_exit).await?; + } + + if todo(RepairWhat::Blocks) { + info!("Repairing the stored blocks"); + self.garage + .block_manager + .repair_data_store(&must_exit) + .await?; + } + Ok(()) } diff --git a/src/main.rs b/src/main.rs index 01972928..6ecf1024 100644 --- a/src/main.rs +++ b/src/main.rs @@ -188,11 +188,34 @@ pub struct PermBucketOpt { pub bucket: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct RepairOpt { /// Launch repair operation on all nodes - #[structopt(long = "all")] - pub all: bool, + #[structopt(short = "a", long = "all-nodes")] + pub all_nodes: bool, + + /// Confirm the launch of the repair operation + #[structopt(long = "yes")] + pub yes: bool, + + #[structopt(subcommand)] + pub what: Option<RepairWhat>, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum RepairWhat { + /// Only do a full sync of metadata tables + #[structopt(name = "tables")] + Tables, + /// Only repair (resync/rebalance) the set of stored blocks + #[structopt(name = "blocks")] + Blocks, + /// Only redo the propagation of object deletions to the version table (slow) + #[structopt(name = "versions")] + Versions, + /// Only redo the propagation of version deletions to the block ref table (extremely slow) + #[structopt(name = "block_refs")] + BlockRefs, } #[tokio::main] @@ -241,7 +264,7 @@ async fn main() { cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::BucketOperation(bo)).await } Command::Repair(ro) => { - cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::LaunchRepair(ro.all)).await + cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::LaunchRepair(ro)).await } }; |