Diffstat (limited to 'src/block/repair.rs')
-rw-r--r--  src/block/repair.rs  90
1 file changed, 85 insertions, 5 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs
index a7c90d4f..1bea9f09 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -519,6 +519,86 @@ impl Worker for ScrubWorker {
}
// ---- ---- ----
+// THIRD KIND OF REPAIR: REBALANCING DATA BLOCKS
+// between multiple storage locations.
+// This is a one-shot repair operation: once launched, it checks
+// every block's location and then exits.
+// ---- ---- ----
+
+pub struct RebalanceWorker {
+ manager: Arc<BlockManager>,
+ block_iter: BlockStoreIterator,
+ moved: usize,
+ moved_bytes: usize,
+}
+
+impl RebalanceWorker {
+ pub fn new(manager: Arc<BlockManager>) -> Self {
+ let block_iter = BlockStoreIterator::new(&manager);
+ Self {
+ manager,
+ block_iter,
+ moved: 0,
+ moved_bytes: 0,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for RebalanceWorker {
+ fn name(&self) -> String {
+ "Block rebalance worker".into()
+ }
+
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)),
+ freeform: vec![
+ format!("Blocks moved: {}", self.moved),
+ format!("Bytes moved: {}", self.moved_bytes),
+ ],
+ ..Default::default()
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ if let Some((path, hash)) = self.block_iter.next().await? {
+ let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
+ if path.parent().expect("no parent?") != prim_loc {
+ // block is not in its primary location,
+ // move it there (reading and re-writing does the trick)
+ let data = self.manager.read_block(&hash).await?;
+ self.manager.write_block(&hash, &data).await?;
+ self.moved += 1;
+ self.moved_bytes += data.inner_buffer().len();
+ }
+ Ok(WorkerState::Busy)
+ } else {
+ // all blocks are in their primary location:
+ // - the ones we moved now are
+ // - the ones written in the meantime always were, because we only
+ // write to primary locations
+ // so we can safely remove all secondary locations from the data layout
+ let new_layout = self
+ .manager
+ .data_layout
+ .load_full()
+ .without_secondary_locations();
+ self.manager
+ .data_layout_persister
+ .save_async(&new_layout)
+ .await?;
+ self.manager.data_layout.store(Arc::new(new_layout));
+ Ok(WorkerState::Done)
+ }
+ }
+
+ async fn wait_for_work(&mut self) -> WorkerState {
+ unreachable!()
+ }
+}
+
+// ---- ---- ----
// UTILITY FOR ENUMERATING THE BLOCK STORE
// ---- ---- ----
@@ -526,16 +606,16 @@ const PROGRESS_FP: u64 = 1_000_000_000;
impl BlockStoreIterator {
fn new(manager: &BlockManager) -> Self {
- let min_cap = manager
- .data_layout
+ let data_layout = manager.data_layout.load_full();
+
+ let min_cap = data_layout
.data_dirs
.iter()
.filter_map(|x| x.capacity())
.min()
.unwrap_or(0);
- let sum_cap = manager
- .data_layout
+ let sum_cap = data_layout
.data_dirs
.iter()
.map(|x| x.capacity().unwrap_or(min_cap /* approximation */))
@@ -543,7 +623,7 @@ impl BlockStoreIterator {
let mut cum_cap = 0;
let mut todo = vec![];
- for dir in manager.data_layout.data_dirs.iter() {
+ for dir in data_layout.data_dirs.iter() {
let cap = match dir.state {
DataDirState::Active { capacity } => capacity,
_ => min_cap,
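The last two hunks replace direct accesses to manager.data_layout with a single snapshot taken at the top of BlockStoreIterator::new() via load_full(). Together with the load() and store(Arc::new(...)) calls in the RebalanceWorker above, this matches the usual arc_swap pattern for shared, atomically replaceable state. The sketch below is only a reminder of that pattern, under the assumption that data_layout is an arc_swap::ArcSwap; the Layout struct and its field are made up for the example and are not code from this patch.

use std::sync::Arc;
use arc_swap::ArcSwap;

struct Layout {
    data_dirs: Vec<String>,
}

fn arc_swap_pattern() {
    let layout = ArcSwap::from_pointee(Layout {
        data_dirs: vec!["/data1".into()],
    });

    // load(): cheap, lock-free guard, suited to a short read such as the
    // primary_block_dir() lookup in the worker above
    let n_dirs = layout.load().data_dirs.len();

    // load_full(): clones the inner Arc, so the snapshot stays coherent for
    // the whole iterator construction even if the layout is replaced
    // concurrently
    let snapshot: Arc<Layout> = layout.load_full();

    // store(): atomically publishes a new layout to all future readers, as
    // done after without_secondary_locations() in the worker above
    layout.store(Arc::new(Layout {
        data_dirs: snapshot.data_dirs.clone(),
    }));

    assert_eq!(n_dirs, 1);
}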