diff options
Diffstat (limited to 'src/admin_rpc.rs')
-rw-r--r-- | src/admin_rpc.rs | 123 |
1 files changed, 79 insertions, 44 deletions
diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs index 412c44b7..8228eaf8 100644 --- a/src/admin_rpc.rs +++ b/src/admin_rpc.rs @@ -21,7 +21,7 @@ pub const ADMIN_RPC_PATH: &str = "_admin"; #[derive(Debug, Serialize, Deserialize)] pub enum AdminRPC { BucketOperation(BucketOperation), - LaunchRepair(bool), + LaunchRepair(RepairOpt), // Replies Ok(String), @@ -48,9 +48,7 @@ impl AdminRpcHandler { async move { match msg { AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, - AdminRPC::LaunchRepair(repair_all) => { - self2.handle_launch_repair(repair_all).await - } + AdminRPC::LaunchRepair(opt) => self2.handle_launch_repair(opt).await, _ => Err(Error::Message(format!("Invalid RPC"))), } } @@ -155,14 +153,26 @@ impl AdminRpcHandler { } } - async fn handle_launch_repair(self: &Arc<Self>, repair_all: bool) -> Result<AdminRPC, Error> { - if repair_all { + async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRPC, Error> { + if !opt.yes { + return Err(Error::Message(format!( + "Please provide the --yes flag to initiate repair operations." + ))); + } + if opt.all_nodes { + let mut opt_to_send = opt.clone(); + opt_to_send.all_nodes = false; + let mut failures = vec![]; let ring = self.garage.system.ring.borrow().clone(); for node in ring.config.members.keys() { if self .rpc_client - .call(node, AdminRPC::LaunchRepair(false), ADMIN_RPC_TIMEOUT) + .call( + node, + AdminRPC::LaunchRepair(opt_to_send.clone()), + ADMIN_RPC_TIMEOUT, + ) .await .is_err() { @@ -183,7 +193,7 @@ impl AdminRpcHandler { .system .background .spawn_worker("Repair worker".into(), move |must_exit| async move { - self2.repair_worker(must_exit).await + self2.repair_worker(opt, must_exit).await }) .await; Ok(AdminRPC::Ok(format!( @@ -193,45 +203,70 @@ impl AdminRpcHandler { } } - async fn repair_worker(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> { - self.garage - .bucket_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .object_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .version_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .block_ref_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; + async fn repair_worker( + self: Arc<Self>, + opt: RepairOpt, + must_exit: watch::Receiver<bool>, + ) -> Result<(), Error> { + let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true); + + if todo(RepairWhat::Tables) { + info!("Launching a full sync of tables"); + self.garage + .bucket_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + self.garage + .object_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + self.garage + .version_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + self.garage + .block_ref_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + } // TODO: wait for full sync to finish before proceeding to the rest? - self.repair_versions(&must_exit).await?; - self.repair_block_ref(&must_exit).await?; - self.repair_rc(&must_exit).await?; - self.garage - .block_manager - .repair_data_store(&must_exit) - .await?; + if todo(RepairWhat::Versions) { + info!("Repairing the versions table"); + self.repair_versions(&must_exit).await?; + } + + if todo(RepairWhat::BlockRefs) { + info!("Repairing the block refs table"); + self.repair_block_ref(&must_exit).await?; + } + + if opt.what.is_none() { + info!("Repairing the RC"); + self.repair_rc(&must_exit).await?; + } + + if todo(RepairWhat::Blocks) { + info!("Repairing the stored blocks"); + self.garage + .block_manager + .repair_data_store(&must_exit) + .await?; + } + Ok(()) } |