diff options
Diffstat (limited to 'src/block/repair.rs')
-rw-r--r-- | src/block/repair.rs | 30 |
1 files changed, 26 insertions, 4 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs index 1bea9f09..e18eeaeb 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -17,6 +17,7 @@ use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; +use crate::block::*; use crate::layout::*; use crate::manager::*; @@ -528,8 +529,10 @@ impl Worker for ScrubWorker { pub struct RebalanceWorker { manager: Arc<BlockManager>, block_iter: BlockStoreIterator, + t_started: u64, + t_finished: Option<u64>, moved: usize, - moved_bytes: usize, + moved_bytes: u64, } impl RebalanceWorker { @@ -538,6 +541,8 @@ impl RebalanceWorker { Self { manager, block_iter, + t_started: now_msec(), + t_finished: None, moved: 0, moved_bytes: 0, } @@ -551,11 +556,18 @@ impl Worker for RebalanceWorker { } fn status(&self) -> WorkerStatus { + let t_cur = self.t_finished.unwrap_or_else(|| now_msec()); + let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000); WorkerStatus { progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)), freeform: vec![ format!("Blocks moved: {}", self.moved), - format!("Bytes moved: {}", self.moved_bytes), + format!( + "Bytes moved: {} ({}/s)", + bytesize::ByteSize::b(self.moved_bytes), + bytesize::ByteSize::b(rate) + ), + format!("Started: {}", msec_to_rfc3339(self.t_started)), ], ..Default::default() } @@ -565,12 +577,21 @@ impl Worker for RebalanceWorker { if let Some((path, hash)) = self.block_iter.next().await? { let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash); if path.parent().expect("no parent?") != prim_loc { + let path = match path.extension() { + None => DataBlockPath::Plain(path), + Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path), + _ => { + warn!("not rebalancing file: {}", path.to_string_lossy()); + return Ok(WorkerState::Busy); + } + }; // block is not in its primary location, // move it there (reading and re-writing does the trick) - let data = self.manager.read_block(&hash).await?; + debug!("rebalance: moving block {:?}", hash); + let data = self.manager.read_block_from(&hash, &path).await?; self.manager.write_block(&hash, &data).await?; self.moved += 1; - self.moved_bytes += data.inner_buffer().len(); + self.moved_bytes += data.inner_buffer().len() as u64; } Ok(WorkerState::Busy) } else { @@ -589,6 +610,7 @@ impl Worker for RebalanceWorker { .save_async(&new_layout) .await?; self.manager.data_layout.store(Arc::new(new_layout)); + self.t_finished = Some(now_msec()); Ok(WorkerState::Done) } } |