diff options
author | Alex Auvolat <alex@adnab.me> | 2023-05-04 10:36:48 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-06-09 16:23:37 +0200 |
commit | 058518c22b701d5d2dc3e838518d88ce9a4cc875 (patch) | |
tree | 45f3ac653884ab65893d90da43ddc8444cf9f127 | |
parent | 8644376ac2dd8015e9212c19f30df63811426e1c (diff) | |
download | garage-058518c22b701d5d2dc3e838518d88ce9a4cc875.tar.gz garage-058518c22b701d5d2dc3e838518d88ce9a4cc875.zip |
refactor repair workers with a trait
-rw-r--r-- | src/garage/repair/online.rs | 149 |
1 files changed, 81 insertions, 68 deletions
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 9b6b3cad..6d8a91fe 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -5,11 +5,15 @@ use async_trait::async_trait; use tokio::sync::watch; use garage_block::repair::ScrubWorkerCommand; + use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; + +use garage_table::replication::*; use garage_table::*; + use garage_util::background::*; use garage_util::error::Error; use garage_util::migrate::Migrate; @@ -32,11 +36,11 @@ pub async fn launch_online_repair( } RepairWhat::Versions => { info!("Repairing the versions table"); - bg.spawn_worker(RepairVersionsWorker::new(garage.clone())); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); - bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); } RepairWhat::Blocks => { info!("Repairing the stored blocks"); @@ -67,49 +71,100 @@ pub async fn launch_online_repair( // ---- -struct RepairVersionsWorker { +#[async_trait] +trait TableRepair: Send + Sync + 'static { + type T: TableSchema; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>; + + async fn process( + &mut self, + garage: &Garage, + entry: <<Self as TableRepair>::T as TableSchema>::E, + ) -> Result<bool, Error>; +} + +struct TableRepairWorker<T: TableRepair> { garage: Arc<Garage>, pos: Vec<u8>, counter: usize, + repairs: usize, + inner: T, } -impl RepairVersionsWorker { - fn new(garage: Arc<Garage>) -> Self { +impl<R: TableRepair> TableRepairWorker<R> { + fn new(garage: Arc<Garage>, inner: R) -> Self { Self { garage, + inner, pos: vec![], counter: 0, + repairs: 0, } } } #[async_trait] -impl Worker for RepairVersionsWorker { +impl<R: TableRepair> Worker for TableRepairWorker<R> { fn name(&self) -> String { - "Version repair worker".into() + format!("{} repair worker", R::T::TABLE_NAME) } fn status(&self) -> WorkerStatus { WorkerStatus { - progress: Some(self.counter.to_string()), + progress: Some(format!("{} ({})", self.counter, self.repairs)), ..Default::default() } } async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - let (item_bytes, next_pos) = match self.garage.version_table.data.store.get_gt(&self.pos)? { + let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? { Some((k, v)) => (v, k), None => { - info!("repair_versions: finished, done {}", self.counter); + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); return Ok(WorkerState::Done); } }; - let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?; + let entry = <R::T as TableSchema>::E::decode(&item_bytes) + .ok_or_message("Cannot decode table entry")?; + if self.inner.process(&self.garage, entry).await? { + self.repairs += 1; + } + + self.counter += 1; + self.pos = next_pos; + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self) -> WorkerState { + unreachable!() + } +} + +// ---- + +struct RepairVersions; + +#[async_trait] +impl TableRepair for RepairVersions { + type T = VersionTable; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> { + &garage.version_table + } + + async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, Error> { if !version.deleted.get() { let version_exists = match &version.backlink { VersionBacklink::Object { bucket_id, key } => { - let object = self.garage.object_table.get(&bucket_id, &key).await?; + let object = garage.object_table.get(&bucket_id, &key).await?; match object { Some(o) => o.versions().iter().any(|x| { x.uuid == version.uuid && x.state != ObjectVersionState::Aborted @@ -118,7 +173,7 @@ impl Worker for RepairVersionsWorker { } } VersionBacklink::MultipartUpload { upload_id } => { - let mpu = self.garage.mpu_table.get(&upload_id, &EmptyKey).await?; + let mpu = garage.mpu_table.get(&upload_id, &EmptyKey).await?; match mpu { Some(u) => !u.deleted.get(), None => false, @@ -127,69 +182,33 @@ impl Worker for RepairVersionsWorker { }; if !version_exists { info!("Repair versions: marking version as deleted: {:?}", version); - self.garage + garage .version_table .insert(&Version::new(version.uuid, version.backlink, true)) .await?; + return Ok(true); } } - self.counter += 1; - self.pos = next_pos; - - Ok(WorkerState::Busy) - } - - async fn wait_for_work(&mut self) -> WorkerState { - unreachable!() + Ok(false) } } // ---- -struct RepairBlockrefsWorker { - garage: Arc<Garage>, - pos: Vec<u8>, - counter: usize, -} - -impl RepairBlockrefsWorker { - fn new(garage: Arc<Garage>) -> Self { - Self { - garage, - pos: vec![], - counter: 0, - } - } -} +struct RepairBlockRefs; #[async_trait] -impl Worker for RepairBlockrefsWorker { - fn name(&self) -> String { - "Block refs repair worker".into() - } +impl TableRepair for RepairBlockRefs { + type T = BlockRefTable; - fn status(&self) -> WorkerStatus { - WorkerStatus { - progress: Some(self.counter.to_string()), - ..Default::default() - } + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> { + &garage.block_ref_table } - async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - let (item_bytes, next_pos) = - match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { - Some((k, v)) => (v, k), - None => { - info!("repair_block_ref: finished, done {}", self.counter); - return Ok(WorkerState::Done); - } - }; - - let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?; + async fn process(&mut self, garage: &Garage, block_ref: BlockRef) -> Result<bool, Error> { if !block_ref.deleted.get() { - let version = self - .garage + let version = garage .version_table .get(&block_ref.version, &EmptyKey) .await?; @@ -200,7 +219,7 @@ impl Worker for RepairBlockrefsWorker { "Repair block ref: marking block_ref as deleted: {:?}", block_ref ); - self.garage + garage .block_ref_table .insert(&BlockRef { block: block_ref.block, @@ -208,16 +227,10 @@ impl Worker for RepairBlockrefsWorker { deleted: true.into(), }) .await?; + return Ok(true); } } - self.counter += 1; - self.pos = next_pos; - - Ok(WorkerState::Busy) - } - - async fn wait_for_work(&mut self) -> WorkerState { - unreachable!() + Ok(false) } } |