diff options
Diffstat (limited to 'src/garage/repair/online.rs')
-rw-r--r-- | src/garage/repair/online.rs | 145 |
1 files changed, 127 insertions, 18 deletions
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 9e4de873..9999717d 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -4,6 +4,7 @@ use std::time::Duration; use async_trait::async_trait; use tokio::sync::watch; +use garage_block::manager::BlockManager; use garage_block::repair::ScrubWorkerCommand; use garage_model::garage::Garage; @@ -16,11 +17,14 @@ use garage_table::replication::*; use garage_table::*; use garage_util::background::*; +use garage_util::data::*; use garage_util::error::Error; use garage_util::migrate::Migrate; use crate::*; +const ITER_COUNT: usize = 64; + pub async fn launch_online_repair( garage: &Arc<Garage>, bg: &BackgroundRunner, @@ -47,6 +51,13 @@ pub async fn launch_online_repair( info!("Repairing the block refs table"); bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); } + RepairWhat::BlockRc => { + info!("Repairing the block reference counters"); + bg.spawn_worker(BlockRcRepair::new( + garage.block_manager.clone(), + garage.block_ref_table.clone(), + )); + } RepairWhat::Blocks => { info!("Repairing the stored blocks"); bg.spawn_worker(garage_block::repair::RepairWorker::new( @@ -129,28 +140,31 @@ impl<R: TableRepair> Worker for TableRepairWorker<R> { } async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? { - Some((k, v)) => (v, k), - None => { - info!( - "{}: finished, done {}, fixed {}", - self.name(), - self.counter, - self.repairs - ); - return Ok(WorkerState::Done); + for _i in 0..ITER_COUNT { + let (item_bytes, next_pos) = + match R::table(&self.garage).data.store.get_gt(&self.pos)? { + Some((k, v)) => (v, k), + None => { + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); + return Ok(WorkerState::Done); + } + }; + + let entry = <R::T as TableSchema>::E::decode(&item_bytes) + .ok_or_message("Cannot decode table entry")?; + if self.inner.process(&self.garage, entry).await? { + self.repairs += 1; } - }; - let entry = <R::T as TableSchema>::E::decode(&item_bytes) - .ok_or_message("Cannot decode table entry")?; - if self.inner.process(&self.garage, entry).await? { - self.repairs += 1; + self.counter += 1; + self.pos = next_pos; } - self.counter += 1; - self.pos = next_pos; - Ok(WorkerState::Busy) } @@ -282,3 +296,98 @@ impl TableRepair for RepairMpu { Ok(false) } } + +// ===== block reference counter repair ===== + +pub struct BlockRcRepair { + block_manager: Arc<BlockManager>, + block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>, + cursor: Hash, + counter: u64, + repairs: u64, +} + +impl BlockRcRepair { + fn new( + block_manager: Arc<BlockManager>, + block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>, + ) -> Self { + Self { + block_manager, + block_ref_table, + cursor: [0u8; 32].into(), + counter: 0, + repairs: 0, + } + } +} + +#[async_trait] +impl Worker for BlockRcRepair { + fn name(&self) -> String { + format!("Block refcount repair worker") + } + + fn status(&self) -> WorkerStatus { + WorkerStatus { + progress: Some(format!("{} ({})", self.counter, self.repairs)), + ..Default::default() + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + for _i in 0..ITER_COUNT { + let next1 = self + .block_manager + .rc + .rc + .range(self.cursor.as_slice()..)? + .next() + .transpose()? + .map(|(k, _)| Hash::try_from(k.as_slice()).unwrap()); + let next2 = self + .block_ref_table + .data + .store + .range(self.cursor.as_slice()..)? + .next() + .transpose()? + .map(|(k, _)| Hash::try_from(&k[..32]).unwrap()); + let next = match (next1, next2) { + (Some(k1), Some(k2)) => std::cmp::min(k1, k2), + (Some(k), None) | (None, Some(k)) => k, + (None, None) => { + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); + return Ok(WorkerState::Done); + } + }; + + if self.block_manager.rc.recalculate_rc(&next)?.1 { + self.repairs += 1; + } + self.counter += 1; + if let Some(next_incr) = next.increment() { + self.cursor = next_incr; + } else { + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); + return Ok(WorkerState::Done); + } + } + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self) -> WorkerState { + unreachable!() + } +} |