aboutsummaryrefslogtreecommitdiff
path: root/src/block/repair.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/repair.rs')
-rw-r--r--src/block/repair.rs217
1 files changed, 142 insertions, 75 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs
index 71093d69..d5e2e168 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -17,6 +17,7 @@ use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
+use crate::layout::*;
use crate::manager::*;
// Full scrub every 25 days with a random element of 10 days mixed in below
@@ -136,7 +137,7 @@ impl Worker for RepairWorker {
// Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally.
- if let Some(hash) = bi.next().await? {
+ if let Some((_path, hash)) = bi.next().await? {
self.manager
.resync
.put_to_resync(&hash, Duration::from_secs(0))?;
@@ -376,7 +377,7 @@ impl Worker for ScrubWorker {
match &mut self.work {
ScrubWorkerState::Running(bsi) => {
self.tranquilizer.reset();
- if let Some(hash) = bsi.next().await? {
+ if let Some((_path, hash)) = bsi.next().await? {
match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash);
@@ -447,100 +448,166 @@ impl Worker for ScrubWorker {
// UTILITY FOR ENUMERATING THE BLOCK STORE
// ---- ---- ----
+const PROGRESS_FP: u64 = 1_000_000_000;
+
struct BlockStoreIterator {
- path: Vec<ReadingDir>,
+ todo: Vec<BsiTodo>,
}
-enum ReadingDir {
- Pending(PathBuf),
- Read {
- subpaths: Vec<fs::DirEntry>,
- pos: usize,
+enum BsiTodo {
+ Directory {
+ path: PathBuf,
+ progress_min: u64,
+ progress_max: u64,
+ },
+ File {
+ path: PathBuf,
+ filename: String,
+ progress: u64,
},
}
impl BlockStoreIterator {
fn new(manager: &BlockManager) -> Self {
- let root_dir = manager.data_dir.clone();
- Self {
- path: vec![ReadingDir::Pending(root_dir)],
+ let min_cap = manager
+ .data_layout
+ .data_dirs
+ .iter()
+ .filter_map(|x| match x.state {
+ DataDirState::Active { capacity } => Some(capacity),
+ _ => None,
+ })
+ .min()
+ .unwrap_or(0);
+
+ let sum_cap = manager
+ .data_layout
+ .data_dirs
+ .iter()
+ .map(|x| match x.state {
+ DataDirState::Active { capacity } => capacity,
+ _ => min_cap, // approximation
+ })
+ .sum::<u64>() as u128;
+
+ let mut cum_cap = 0;
+ let mut todo = vec![];
+ for dir in manager.data_layout.data_dirs.iter() {
+ let cap = match dir.state {
+ DataDirState::Active { capacity } => capacity,
+ _ => min_cap,
+ };
+
+ let progress_min = ((cum_cap as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64;
+ let progress_max =
+ (((cum_cap + cap) as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64;
+ cum_cap += cap;
+
+ todo.push(BsiTodo::Directory {
+ path: dir.path.clone(),
+ progress_min,
+ progress_max,
+ });
}
+ // entries are processed back-to-front (because of .pop()),
+ // so reverse entries to process them in increasing progress bounds
+ todo.reverse();
+
+ let ret = Self { todo };
+ debug_assert!(ret.progress_invariant());
+
+ ret
}
/// Returns progress done, between 0 and 1
fn progress(&self) -> f32 {
- if self.path.is_empty() {
- 1.0
- } else {
- let mut ret = 0.0;
- let mut next_div = 1;
- for p in self.path.iter() {
- match p {
- ReadingDir::Pending(_) => break,
- ReadingDir::Read { subpaths, pos } => {
- next_div *= subpaths.len();
- ret += ((*pos - 1) as f32) / (next_div as f32);
- }
- }
- }
- ret
- }
+ self.todo
+ .last()
+ .map(|x| match x {
+ BsiTodo::Directory { progress_min, .. } => *progress_min,
+ BsiTodo::File { progress, .. } => *progress,
+ })
+ .map(|x| x as f32 / PROGRESS_FP as f32)
+ .unwrap_or(1.0)
}
- async fn next(&mut self) -> Result<Option<Hash>, Error> {
+ async fn next(&mut self) -> Result<Option<(PathBuf, Hash)>, Error> {
loop {
- let last_path = match self.path.last_mut() {
+ match self.todo.pop() {
None => return Ok(None),
- Some(lp) => lp,
- };
-
- if let ReadingDir::Pending(path) = last_path {
- let mut reader = fs::read_dir(&path).await?;
- let mut subpaths = vec![];
- while let Some(ent) = reader.next_entry().await? {
- subpaths.push(ent);
- }
- *last_path = ReadingDir::Read { subpaths, pos: 0 };
- }
-
- let (subpaths, pos) = match *last_path {
- ReadingDir::Read {
- ref subpaths,
- ref mut pos,
- } => (subpaths, pos),
- ReadingDir::Pending(_) => unreachable!(),
- };
+ Some(BsiTodo::Directory {
+ path,
+ progress_min,
+ progress_max,
+ }) => {
+ let istart = self.todo.len();
+
+ let mut reader = fs::read_dir(&path).await?;
+ while let Some(ent) = reader.next_entry().await? {
+ let name = if let Ok(n) = ent.file_name().into_string() {
+ n
+ } else {
+ continue;
+ };
+ let ft = ent.file_type().await?;
+ if ft.is_dir() && hex::decode(&name).is_ok() {
+ self.todo.push(BsiTodo::Directory {
+ path: ent.path(),
+ progress_min: 0,
+ progress_max: 0,
+ });
+ } else if ft.is_file() {
+ self.todo.push(BsiTodo::File {
+ path: ent.path(),
+ filename: name,
+ progress: 0,
+ });
+ }
+ }
- let data_dir_ent = match subpaths.get(*pos) {
- None => {
- self.path.pop();
- continue;
- }
- Some(ent) => {
- *pos += 1;
- ent
+ let count = self.todo.len() - istart;
+ for (i, ent) in self.todo[istart..].iter_mut().enumerate() {
+ let p1 = progress_min
+ + ((progress_max - progress_min) * i as u64) / count as u64;
+ let p2 = progress_min
+ + ((progress_max - progress_min) * (i + 1) as u64) / count as u64;
+ match ent {
+ BsiTodo::Directory {
+ progress_min,
+ progress_max,
+ ..
+ } => {
+ *progress_min = p1;
+ *progress_max = p2;
+ }
+ BsiTodo::File { progress, .. } => {
+ *progress = p1;
+ }
+ }
+ }
+ self.todo[istart..].reverse();
+ debug_assert!(self.progress_invariant());
}
- };
-
- let name = data_dir_ent.file_name();
- let name = if let Ok(n) = name.into_string() {
- n
- } else {
- continue;
- };
- let ent_type = data_dir_ent.file_type().await?;
-
- let name = name.strip_suffix(".zst").unwrap_or(&name);
- if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() {
- let path = data_dir_ent.path();
- self.path.push(ReadingDir::Pending(path));
- } else if name.len() == 64 {
- if let Ok(h) = hex::decode(name) {
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&h);
- return Ok(Some(hash.into()));
+ Some(BsiTodo::File { path, filename, .. }) => {
+ let filename = filename.strip_suffix(".zst").unwrap_or(&filename);
+ if filename.len() == 64 {
+ if let Ok(h) = hex::decode(filename) {
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&h);
+ return Ok(Some((path, hash.into())));
+ }
+ }
}
}
}
}
+
+ fn progress_invariant(&self) -> bool {
+ let iter = self.todo.iter().map(|x| match x {
+ BsiTodo::Directory { progress_min, .. } => progress_min,
+ BsiTodo::File { progress, .. } => progress,
+ });
+ let iter_1 = iter.clone().skip(1);
+ iter.zip(iter_1).all(|(prev, next)| prev >= next)
+ }
}