diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/garage/admin.rs | 12 | ||||
-rw-r--r-- | src/garage/repair/online.rs | 228 |
2 files changed, 143 insertions, 97 deletions
diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 48914655..c9783e54 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -24,7 +24,7 @@ use garage_model::migrate::Migrate; use garage_model::permission::*; use crate::cli::*; -use crate::repair::online::OnlineRepair; +use crate::repair::online::launch_online_repair; pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; @@ -693,15 +693,7 @@ impl AdminRpcHandler { ))) } } else { - let repair = OnlineRepair { - garage: self.garage.clone(), - }; - self.garage - .system - .background - .spawn_worker("Repair worker".into(), move |must_exit| async move { - repair.repair_worker(opt, must_exit).await - }); + launch_online_repair(self.garage.clone(), opt)?; Ok(AdminRpc::Ok(format!( "Repair launched on {:?}", self.garage.system.id diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index d6a71742..e6fcd705 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use async_trait::async_trait; use tokio::sync::watch; use garage_model::garage::Garage; @@ -7,83 +8,103 @@ use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_table::*; +use garage_util::background::*; use garage_util::error::Error; use crate::*; -pub struct OnlineRepair { - pub garage: Arc<Garage>, -} - -impl OnlineRepair { - pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) { - if let Err(e) = self.repair_worker_aux(opt, must_exit).await { - warn!("Repair worker failed with error: {}", e); +pub fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) -> Result<(), Error> { + match opt.what { + RepairWhat::Tables => { + info!("Launching a full sync of tables"); + garage.bucket_table.syncer.add_full_sync(); + garage.object_table.syncer.add_full_sync(); + garage.version_table.syncer.add_full_sync(); + garage.block_ref_table.syncer.add_full_sync(); + garage.key_table.syncer.add_full_sync(); + } + RepairWhat::Versions => { + info!("Repairing the versions table"); + garage + .background + .spawn_worker(RepairVersionsWorker::new(garage.clone())); + } + RepairWhat::BlockRefs => { + info!("Repairing the block refs table"); + garage + .background + .spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + } + RepairWhat::Blocks => { + unimplemented!() + /* + info!("Repairing the stored blocks"); + self.garage + .block_manager + .repair_data_store(&must_exit) + .await?; + */ + } + RepairWhat::Scrub { tranquility } => { + unimplemented!() + /* + info!("Verifying integrity of stored blocks"); + self.garage + .block_manager + .scrub_data_store(&must_exit, tranquility) + .await?; + */ } } + Ok(()) +} - async fn repair_worker_aux( - &self, - opt: RepairOpt, - must_exit: watch::Receiver<bool>, - ) -> Result<(), Error> { - match opt.what { - RepairWhat::Tables => { - info!("Launching a full sync of tables"); - self.garage.bucket_table.syncer.add_full_sync(); - self.garage.object_table.syncer.add_full_sync(); - self.garage.version_table.syncer.add_full_sync(); - self.garage.block_ref_table.syncer.add_full_sync(); - self.garage.key_table.syncer.add_full_sync(); - } - RepairWhat::Versions => { - info!("Repairing the versions table"); - self.repair_versions(&must_exit).await?; - } - RepairWhat::BlockRefs => { - info!("Repairing the block refs table"); - self.repair_block_ref(&must_exit).await?; - } - RepairWhat::Blocks => { - info!("Repairing the stored blocks"); - self.garage - .block_manager - .repair_data_store(&must_exit) - .await?; - } - RepairWhat::Scrub { tranquility } => { - info!("Verifying integrity of stored blocks"); - self.garage - .block_manager - .scrub_data_store(&must_exit, tranquility) - .await?; - } +// ---- + +struct RepairVersionsWorker { + garage: Arc<Garage>, + pos: Vec<u8>, + iter: usize, +} + +impl RepairVersionsWorker { + fn new(garage: Arc<Garage>) -> Self { + Self { + garage, + pos: vec![], + iter: 0, } - Ok(()) } +} - async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { - let mut pos = vec![]; - let mut i = 0; - - while !*must_exit.borrow() { - let item_bytes = match self.garage.version_table.data.store.get_gt(pos)? { - Some((k, v)) => { - pos = k; - v - } - None => break, - }; +#[async_trait] +impl Worker for RepairVersionsWorker { + fn name(&self) -> String { + "Version repair worker".into() + } - i += 1; - if i % 1000 == 0 { - info!("repair_versions: {}", i); + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerStatus, Error> { + let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => { + self.pos = k; + v } - - let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; - if version.deleted.get() { - continue; + None => { + info!("repair_versions: finished, done {}", self.iter); + return Ok(WorkerStatus::Done); } + }; + + self.iter += 1; + if self.iter % 1000 == 0 { + info!("repair_versions: {}", self.iter); + } + + let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; + if !version.deleted.get() { let object = self .garage .object_table @@ -109,32 +130,61 @@ impl OnlineRepair { .await?; } } - info!("repair_versions: finished, done {}", i); - Ok(()) + + Ok(WorkerStatus::Busy) } - async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { - let mut pos = vec![]; - let mut i = 0; - - while !*must_exit.borrow() { - let item_bytes = match self.garage.block_ref_table.data.store.get_gt(pos)? { - Some((k, v)) => { - pos = k; - v - } - None => break, - }; + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + unreachable!() + } +} - i += 1; - if i % 1000 == 0 { - info!("repair_block_ref: {}", i); - } +// ---- + +struct RepairBlockrefsWorker { + garage: Arc<Garage>, + pos: Vec<u8>, + iter: usize, +} + +impl RepairBlockrefsWorker { + fn new(garage: Arc<Garage>) -> Self { + Self { + garage, + pos: vec![], + iter: 0, + } + } +} - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; - if block_ref.deleted.get() { - continue; +#[async_trait] +impl Worker for RepairBlockrefsWorker { + fn name(&self) -> String { + "Block refs repair worker".into() + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerStatus, Error> { + let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => { + self.pos = k; + v + } + None => { + info!("repair_block_ref: finished, done {}", self.iter); + return Ok(WorkerStatus::Done); } + }; + + self.iter += 1; + if self.iter % 1000 == 0 { + info!("repair_block_ref: {}", self.iter); + } + + let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; + if !block_ref.deleted.get() { let version = self .garage .version_table @@ -157,7 +207,11 @@ impl OnlineRepair { .await?; } } - info!("repair_block_ref: finished, done {}", i); - Ok(()) + + Ok(WorkerStatus::Busy) + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + unreachable!() } } |