diff options
Diffstat (limited to 'src/block/repair.rs')
-rw-r--r-- | src/block/repair.rs | 90 |
1 files changed, 85 insertions, 5 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs index a7c90d4f..1bea9f09 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -519,6 +519,86 @@ impl Worker for ScrubWorker { } // ---- ---- ---- +// THIRD KIND OF REPAIR: REBALANCING DATA BLOCKS +// between multiple storage locations. +// This is a one-shot repair operation that can be launched, +// checks everything, and then exits. +// ---- ---- ---- + +pub struct RebalanceWorker { + manager: Arc<BlockManager>, + block_iter: BlockStoreIterator, + moved: usize, + moved_bytes: usize, +} + +impl RebalanceWorker { + pub fn new(manager: Arc<BlockManager>) -> Self { + let block_iter = BlockStoreIterator::new(&manager); + Self { + manager, + block_iter, + moved: 0, + moved_bytes: 0, + } + } +} + +#[async_trait] +impl Worker for RebalanceWorker { + fn name(&self) -> String { + "Block rebalance worker".into() + } + + fn status(&self) -> WorkerStatus { + WorkerStatus { + progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)), + freeform: vec![ + format!("Blocks moved: {}", self.moved), + format!("Bytes moved: {}", self.moved_bytes), + ], + ..Default::default() + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + if let Some((path, hash)) = self.block_iter.next().await? { + let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash); + if path.parent().expect("no parent?") != prim_loc { + // block is not in its primary location, + // move it there (reading and re-writing does the trick) + let data = self.manager.read_block(&hash).await?; + self.manager.write_block(&hash, &data).await?; + self.moved += 1; + self.moved_bytes += data.inner_buffer().len(); + } + Ok(WorkerState::Busy) + } else { + // all blocks are in their primary location: + // - the ones we moved now are + // - the ones written in the meantime always were, because we only + // write to primary locations + // so we can safely remove all secondary locations from the data layout + let new_layout = self + .manager + .data_layout + .load_full() + .without_secondary_locations(); + self.manager + .data_layout_persister + .save_async(&new_layout) + .await?; + self.manager.data_layout.store(Arc::new(new_layout)); + Ok(WorkerState::Done) + } + } + + async fn wait_for_work(&mut self) -> WorkerState { + unreachable!() + } +} + +// ---- ---- ---- // UTILITY FOR ENUMERATING THE BLOCK STORE // ---- ---- ---- @@ -526,16 +606,16 @@ const PROGRESS_FP: u64 = 1_000_000_000; impl BlockStoreIterator { fn new(manager: &BlockManager) -> Self { - let min_cap = manager - .data_layout + let data_layout = manager.data_layout.load_full(); + + let min_cap = data_layout .data_dirs .iter() .filter_map(|x| x.capacity()) .min() .unwrap_or(0); - let sum_cap = manager - .data_layout + let sum_cap = data_layout .data_dirs .iter() .map(|x| x.capacity().unwrap_or(min_cap /* approximation */)) @@ -543,7 +623,7 @@ impl BlockStoreIterator { let mut cum_cap = 0; let mut todo = vec![]; - for dir in manager.data_layout.data_dirs.iter() { + for dir in data_layout.data_dirs.iter() { let cap = match dir.state { DataDirState::Active { capacity } => capacity, _ => min_cap, |