diff options
Diffstat (limited to 'src/garage/repair.rs')
-rw-r--r-- | src/garage/repair.rs | 83 |
1 files changed, 24 insertions, 59 deletions
diff --git a/src/garage/repair.rs b/src/garage/repair.rs index 297ae9cd..8200f1f0 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -16,7 +16,13 @@ pub struct Repair { } impl Repair { - pub async fn repair_worker( + pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) { + if let Err(e) = self.repair_worker_aux(opt, must_exit).await { + warn!("Repair worker failed with error: {}", e); + } + } + + async fn repair_worker_aux( &self, opt: RepairOpt, must_exit: watch::Receiver<bool>, @@ -25,41 +31,11 @@ impl Repair { if todo(RepairWhat::Tables) { info!("Launching a full sync of tables"); - self.garage - .bucket_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .object_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .version_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .block_ref_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .key_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; + self.garage.bucket_table.syncer.add_full_sync(); + self.garage.object_table.syncer.add_full_sync(); + self.garage.version_table.syncer.add_full_sync(); + self.garage.block_ref_table.syncer.add_full_sync(); + self.garage.key_table.syncer.add_full_sync(); } // TODO: wait for full sync to finish before proceeding to the rest? @@ -93,11 +69,13 @@ impl Repair { async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { let mut pos = vec![]; - while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? { + while let Some((item_key, item_bytes)) = + self.garage.version_table.data.store.get_gt(&pos)? + { pos = item_key.to_vec(); let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?; - if version.deleted { + if version.deleted.get() { continue; } let object = self @@ -110,13 +88,7 @@ impl Repair { .versions() .iter() .any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted), - None => { - warn!( - "Repair versions: object for version {:?} not found, skipping.", - version - ); - continue; - } + None => false, }; if !version_exists { info!("Repair versions: marking version as deleted: {:?}", version); @@ -127,7 +99,6 @@ impl Repair { version.bucket, version.key, true, - vec![], )) .await?; } @@ -142,11 +113,13 @@ impl Repair { async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { let mut pos = vec![]; - while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? { + while let Some((item_key, item_bytes)) = + self.garage.block_ref_table.data.store.get_gt(&pos)? + { pos = item_key.to_vec(); let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?; - if block_ref.deleted { + if block_ref.deleted.get() { continue; } let version = self @@ -154,16 +127,8 @@ impl Repair { .version_table .get(&block_ref.version, &EmptyKey) .await?; - let ref_exists = match version { - Some(v) => !v.deleted, - None => { - warn!( - "Block ref repair: version for block ref {:?} not found, skipping.", - block_ref - ); - continue; - } - }; + // The version might not exist if it has been GC'ed + let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false); if !ref_exists { info!( "Repair block ref: marking block_ref as deleted: {:?}", @@ -174,7 +139,7 @@ impl Repair { .insert(&BlockRef { block: block_ref.block, version: block_ref.version, - deleted: true, + deleted: true.into(), }) .await?; } |