aboutsummaryrefslogtreecommitdiff
path: root/src/block/repair.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/repair.rs')
-rw-r--r--src/block/repair.rs37
1 files changed, 20 insertions, 17 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs
index e18eeaeb..bd14085f 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -558,17 +558,21 @@ impl Worker for RebalanceWorker {
fn status(&self) -> WorkerStatus {
let t_cur = self.t_finished.unwrap_or_else(|| now_msec());
let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000);
+ let mut freeform = vec![
+ format!("Blocks moved: {}", self.moved),
+ format!(
+ "Bytes moved: {} ({}/s)",
+ bytesize::ByteSize::b(self.moved_bytes),
+ bytesize::ByteSize::b(rate)
+ ),
+ format!("Started: {}", msec_to_rfc3339(self.t_started)),
+ ];
+ if let Some(t_fin) = self.t_finished {
+ freeform.push(format!("Finished: {}", msec_to_rfc3339(t_fin)))
+ }
WorkerStatus {
progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)),
- freeform: vec![
- format!("Blocks moved: {}", self.moved),
- format!(
- "Bytes moved: {} ({}/s)",
- bytesize::ByteSize::b(self.moved_bytes),
- bytesize::ByteSize::b(rate)
- ),
- format!("Started: {}", msec_to_rfc3339(self.t_started)),
- ],
+ freeform,
..Default::default()
}
}
@@ -576,10 +580,10 @@ impl Worker for RebalanceWorker {
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
if let Some((path, hash)) = self.block_iter.next().await? {
let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
- if path.parent().expect("no parent?") != prim_loc {
- let path = match path.extension() {
- None => DataBlockPath::Plain(path),
- Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path),
+ if path.ancestors().all(|x| x != prim_loc) {
+ let block_path = match path.extension() {
+ None => DataBlockPath::Plain(path.clone()),
+ Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()),
_ => {
warn!("not rebalancing file: {}", path.to_string_lossy());
return Ok(WorkerState::Busy);
@@ -587,11 +591,10 @@ impl Worker for RebalanceWorker {
};
// block is not in its primary location,
// move it there (reading and re-writing does the trick)
- debug!("rebalance: moving block {:?}", hash);
- let data = self.manager.read_block_from(&hash, &path).await?;
- self.manager.write_block(&hash, &data).await?;
+ debug!("rebalance: moving block {:?} => {:?}", block_path, prim_loc);
+ let block_len = self.manager.fix_block_location(&hash, block_path).await?;
self.moved += 1;
- self.moved_bytes += data.inner_buffer().len() as u64;
+ self.moved_bytes += block_len as u64;
}
Ok(WorkerState::Busy)
} else {