aboutsummaryrefslogtreecommitdiff
path: root/src/garage/repair.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage/repair.rs')
-rw-r--r--src/garage/repair.rs83
1 files changed, 24 insertions, 59 deletions
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index 297ae9cd..8200f1f0 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -16,7 +16,13 @@ pub struct Repair {
}
impl Repair {
- pub async fn repair_worker(
+ pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
+ if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
+ warn!("Repair worker failed with error: {}", e);
+ }
+ }
+
+ async fn repair_worker_aux(
&self,
opt: RepairOpt,
must_exit: watch::Receiver<bool>,
@@ -25,41 +31,11 @@ impl Repair {
if todo(RepairWhat::Tables) {
info!("Launching a full sync of tables");
- self.garage
- .bucket_table
- .syncer
- .load_full()
- .unwrap()
- .add_full_scan()
- .await;
- self.garage
- .object_table
- .syncer
- .load_full()
- .unwrap()
- .add_full_scan()
- .await;
- self.garage
- .version_table
- .syncer
- .load_full()
- .unwrap()
- .add_full_scan()
- .await;
- self.garage
- .block_ref_table
- .syncer
- .load_full()
- .unwrap()
- .add_full_scan()
- .await;
- self.garage
- .key_table
- .syncer
- .load_full()
- .unwrap()
- .add_full_scan()
- .await;
+ self.garage.bucket_table.syncer.add_full_sync();
+ self.garage.object_table.syncer.add_full_sync();
+ self.garage.version_table.syncer.add_full_sync();
+ self.garage.block_ref_table.syncer.add_full_sync();
+ self.garage.key_table.syncer.add_full_sync();
}
// TODO: wait for full sync to finish before proceeding to the rest?
@@ -93,11 +69,13 @@ impl Repair {
async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
- while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? {
+ while let Some((item_key, item_bytes)) =
+ self.garage.version_table.data.store.get_gt(&pos)?
+ {
pos = item_key.to_vec();
let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
- if version.deleted {
+ if version.deleted.get() {
continue;
}
let object = self
@@ -110,13 +88,7 @@ impl Repair {
.versions()
.iter()
.any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted),
- None => {
- warn!(
- "Repair versions: object for version {:?} not found, skipping.",
- version
- );
- continue;
- }
+ None => false,
};
if !version_exists {
info!("Repair versions: marking version as deleted: {:?}", version);
@@ -127,7 +99,6 @@ impl Repair {
version.bucket,
version.key,
true,
- vec![],
))
.await?;
}
@@ -142,11 +113,13 @@ impl Repair {
async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
- while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? {
+ while let Some((item_key, item_bytes)) =
+ self.garage.block_ref_table.data.store.get_gt(&pos)?
+ {
pos = item_key.to_vec();
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
- if block_ref.deleted {
+ if block_ref.deleted.get() {
continue;
}
let version = self
@@ -154,16 +127,8 @@ impl Repair {
.version_table
.get(&block_ref.version, &EmptyKey)
.await?;
- let ref_exists = match version {
- Some(v) => !v.deleted,
- None => {
- warn!(
- "Block ref repair: version for block ref {:?} not found, skipping.",
- block_ref
- );
- continue;
- }
- };
+ // The version might not exist if it has been GC'ed
+ let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false);
if !ref_exists {
info!(
"Repair block ref: marking block_ref as deleted: {:?}",
@@ -174,7 +139,7 @@ impl Repair {
.insert(&BlockRef {
block: block_ref.block,
version: block_ref.version,
- deleted: true,
+ deleted: true.into(),
})
.await?;
}