diff options
Diffstat (limited to 'src/block/repair.rs')
-rw-r--r-- | src/block/repair.rs | 156 |
1 files changed, 118 insertions, 38 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs index 97989780..a2a8443e 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -1,4 +1,5 @@ use core::ops::Bound; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -36,6 +37,25 @@ impl Worker for RepairWorker { "Block repair worker".into() } + fn info(&self) -> Option<String> { + match self.block_iter.as_ref() { + None => { + let idx_bytes = self + .next_start + .as_ref() + .map(|x| x.as_slice()) + .unwrap_or(&[]); + let idx_bytes = if idx_bytes.len() > 4 { + &idx_bytes[..4] + } else { + idx_bytes + }; + Some(format!("Phase 1: {}", hex::encode(idx_bytes))) + } + Some(bi) => Some(format!("Phase 2: {:.2}% done", bi.progress() * 100.)), + } + } + async fn work( &mut self, _must_exit: &mut watch::Receiver<bool>, @@ -74,7 +94,7 @@ impl Worker for RepairWorker { } if batch_of_hashes.is_empty() { // move on to phase 2 - self.block_iter = Some(BlockStoreIterator::new(&self.manager).await?); + self.block_iter = Some(BlockStoreIterator::new(&self.manager)); return Ok(WorkerStatus::Busy); } @@ -115,14 +135,14 @@ pub struct ScrubWorker { } impl ScrubWorker { - pub async fn new(manager: Arc<BlockManager>, tranquility: u32) -> Result<Self, Error> { - let iterator = BlockStoreIterator::new(&manager).await?; - Ok(Self { + pub fn new(manager: Arc<BlockManager>, tranquility: u32) -> Self { + let iterator = BlockStoreIterator::new(&manager); + Self { manager, iterator, tranquilizer: Tranquilizer::new(30), tranquility, - }) + } } } @@ -132,6 +152,10 @@ impl Worker for ScrubWorker { "Block scrub worker".into() } + fn info(&self) -> Option<String> { + Some(format!("{:.2}% done", self.iterator.progress() * 100.)) + } + async fn work( &mut self, _must_exit: &mut watch::Receiver<bool>, @@ -153,51 +177,107 @@ impl Worker for ScrubWorker { // ---- struct BlockStoreIterator { - path: Vec<fs::ReadDir>, + path: Vec<ReadingDir>, +} + +enum ReadingDir { + Pending(PathBuf), + Read { + subpaths: Vec<fs::DirEntry>, + pos: usize, + }, } impl BlockStoreIterator { - async fn new(manager: &BlockManager) -> Result<Self, Error> { + fn new(manager: &BlockManager) -> Self { let root_dir = manager.data_dir.clone(); - let read_root_dir = fs::read_dir(&root_dir).await?; - Ok(Self { - path: vec![read_root_dir], - }) + Self { + path: vec![ReadingDir::Pending(root_dir)], + } + } + + /// Returns progress done, between 0% and 1% + fn progress(&self) -> f32 { + if self.path.is_empty() { + 1.0 + } else { + let mut ret = 0.0; + let mut next_div = 1; + for p in self.path.iter() { + match p { + ReadingDir::Pending(_) => break, + ReadingDir::Read { subpaths, pos } => { + next_div *= subpaths.len(); + ret += ((*pos - 1) as f32) / (next_div as f32); + } + } + } + ret + } } async fn next(&mut self) -> Result<Option<Hash>, Error> { loop { - if let Some(reader) = self.path.last_mut() { - if let Some(data_dir_ent) = reader.next_entry().await? { - let name = data_dir_ent.file_name(); - let name = if let Ok(n) = name.into_string() { - n - } else { - continue; - }; - let ent_type = data_dir_ent.file_type().await?; - - let name = name.strip_suffix(".zst").unwrap_or(&name); - if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { - let read_child_dir = fs::read_dir(&data_dir_ent.path()).await?; - self.path.push(read_child_dir); - continue; - } else if name.len() == 64 { - let hash_bytes = if let Ok(h) = hex::decode(&name) { - h - } else { - continue; - }; - let mut hash = [0u8; 32]; - hash.copy_from_slice(&hash_bytes[..]); - return Ok(Some(hash.into())); - } - } else { + let last_path = match self.path.last_mut() { + None => return Ok(None), + Some(lp) => lp, + }; + + if let ReadingDir::Pending(path) = last_path { + let mut reader = fs::read_dir(&path).await?; + let mut subpaths = vec![]; + while let Some(ent) = reader.next_entry().await? { + subpaths.push(ent); + } + *last_path = ReadingDir::Read { subpaths, pos: 0 }; + } + + let (subpaths, pos) = match *last_path { + ReadingDir::Read { + ref subpaths, + ref mut pos, + } => (subpaths, pos), + ReadingDir::Pending(_) => unreachable!(), + }; + + if *pos >= subpaths.len() { + self.path.pop(); + continue; + } + + let data_dir_ent = match subpaths.get(*pos) { + None => { self.path.pop(); continue; } + Some(ent) => { + *pos += 1; + ent + } + }; + + let name = data_dir_ent.file_name(); + let name = if let Ok(n) = name.into_string() { + n } else { - return Ok(None); + continue; + }; + let ent_type = data_dir_ent.file_type().await?; + + let name = name.strip_suffix(".zst").unwrap_or(&name); + if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { + let path = data_dir_ent.path(); + self.path.push(ReadingDir::Pending(path)); + continue; + } else if name.len() == 64 { + let hash_bytes = if let Ok(h) = hex::decode(&name) { + h + } else { + continue; + }; + let mut hash = [0u8; 32]; + hash.copy_from_slice(&hash_bytes[..]); + return Ok(Some(hash.into())); } } } |