aboutsummaryrefslogtreecommitdiff
path: root/src/garage/repair/online.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-03-19 11:04:20 +0100
committerAlex Auvolat <alex@adnab.me>2024-03-19 13:00:24 +0100
commitfa92046a5d22401b518d3604673190ffa34c2728 (patch)
tree124216480e2e22b782fb8318d997ac77bd79c17d /src/garage/repair/online.rs
parent0038ca8a78f147b9c0ec07ef0121773aaf110dc9 (diff)
downloadgarage-fa92046a5d22401b518d3604673190ffa34c2728.tar.gz
garage-fa92046a5d22401b518d3604673190ffa34c2728.zip
[block-ref-repair] Block refcount recalculation and repairblock-ref-repair
- We always recalculate the reference count of a block before deleting it locally, to make sure that it is indeed zero. - If we had to fetch a remote block but we were not able to get it, check that refcount is indeed > 0. - Repair procedure that checks everything
Diffstat (limited to 'src/garage/repair/online.rs')
-rw-r--r--src/garage/repair/online.rs145
1 files changed, 127 insertions, 18 deletions
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index 9e4de873..9999717d 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -4,6 +4,7 @@ use std::time::Duration;
use async_trait::async_trait;
use tokio::sync::watch;
+use garage_block::manager::BlockManager;
use garage_block::repair::ScrubWorkerCommand;
use garage_model::garage::Garage;
@@ -16,11 +17,14 @@ use garage_table::replication::*;
use garage_table::*;
use garage_util::background::*;
+use garage_util::data::*;
use garage_util::error::Error;
use garage_util::migrate::Migrate;
use crate::*;
+const ITER_COUNT: usize = 64;
+
pub async fn launch_online_repair(
garage: &Arc<Garage>,
bg: &BackgroundRunner,
@@ -47,6 +51,13 @@ pub async fn launch_online_repair(
info!("Repairing the block refs table");
bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
}
+ RepairWhat::BlockRc => {
+ info!("Repairing the block reference counters");
+ bg.spawn_worker(BlockRcRepair::new(
+ garage.block_manager.clone(),
+ garage.block_ref_table.clone(),
+ ));
+ }
RepairWhat::Blocks => {
info!("Repairing the stored blocks");
bg.spawn_worker(garage_block::repair::RepairWorker::new(
@@ -129,28 +140,31 @@ impl<R: TableRepair> Worker for TableRepairWorker<R> {
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? {
- Some((k, v)) => (v, k),
- None => {
- info!(
- "{}: finished, done {}, fixed {}",
- self.name(),
- self.counter,
- self.repairs
- );
- return Ok(WorkerState::Done);
+ for _i in 0..ITER_COUNT {
+ let (item_bytes, next_pos) =
+ match R::table(&self.garage).data.store.get_gt(&self.pos)? {
+ Some((k, v)) => (v, k),
+ None => {
+ info!(
+ "{}: finished, done {}, fixed {}",
+ self.name(),
+ self.counter,
+ self.repairs
+ );
+ return Ok(WorkerState::Done);
+ }
+ };
+
+ let entry = <R::T as TableSchema>::E::decode(&item_bytes)
+ .ok_or_message("Cannot decode table entry")?;
+ if self.inner.process(&self.garage, entry).await? {
+ self.repairs += 1;
}
- };
- let entry = <R::T as TableSchema>::E::decode(&item_bytes)
- .ok_or_message("Cannot decode table entry")?;
- if self.inner.process(&self.garage, entry).await? {
- self.repairs += 1;
+ self.counter += 1;
+ self.pos = next_pos;
}
- self.counter += 1;
- self.pos = next_pos;
-
Ok(WorkerState::Busy)
}
@@ -282,3 +296,98 @@ impl TableRepair for RepairMpu {
Ok(false)
}
}
+
+// ===== block reference counter repair =====
+
+pub struct BlockRcRepair {
+ block_manager: Arc<BlockManager>,
+ block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+ cursor: Hash,
+ counter: u64,
+ repairs: u64,
+}
+
+impl BlockRcRepair {
+ fn new(
+ block_manager: Arc<BlockManager>,
+ block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+ ) -> Self {
+ Self {
+ block_manager,
+ block_ref_table,
+ cursor: [0u8; 32].into(),
+ counter: 0,
+ repairs: 0,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for BlockRcRepair {
+ fn name(&self) -> String {
+ format!("Block refcount repair worker")
+ }
+
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ progress: Some(format!("{} ({})", self.counter, self.repairs)),
+ ..Default::default()
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ for _i in 0..ITER_COUNT {
+ let next1 = self
+ .block_manager
+ .rc
+ .rc
+ .range(self.cursor.as_slice()..)?
+ .next()
+ .transpose()?
+ .map(|(k, _)| Hash::try_from(k.as_slice()).unwrap());
+ let next2 = self
+ .block_ref_table
+ .data
+ .store
+ .range(self.cursor.as_slice()..)?
+ .next()
+ .transpose()?
+ .map(|(k, _)| Hash::try_from(&k[..32]).unwrap());
+ let next = match (next1, next2) {
+ (Some(k1), Some(k2)) => std::cmp::min(k1, k2),
+ (Some(k), None) | (None, Some(k)) => k,
+ (None, None) => {
+ info!(
+ "{}: finished, done {}, fixed {}",
+ self.name(),
+ self.counter,
+ self.repairs
+ );
+ return Ok(WorkerState::Done);
+ }
+ };
+
+ if self.block_manager.rc.recalculate_rc(&next)?.1 {
+ self.repairs += 1;
+ }
+ self.counter += 1;
+ if let Some(next_incr) = next.increment() {
+ self.cursor = next_incr;
+ } else {
+ info!(
+ "{}: finished, done {}, fixed {}",
+ self.name(),
+ self.counter,
+ self.repairs
+ );
+ return Ok(WorkerState::Done);
+ }
+ }
+
+ Ok(WorkerState::Busy)
+ }
+
+ async fn wait_for_work(&mut self) -> WorkerState {
+ unreachable!()
+ }
+}