aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/block/repair.rs156
-rw-r--r--src/garage/admin.rs2
-rw-r--r--src/garage/repair/online.rs13
3 files changed, 126 insertions, 45 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs
index 97989780..a2a8443e 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -1,4 +1,5 @@
use core::ops::Bound;
+use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -36,6 +37,25 @@ impl Worker for RepairWorker {
"Block repair worker".into()
}
+ fn info(&self) -> Option<String> {
+ match self.block_iter.as_ref() {
+ None => {
+ let idx_bytes = self
+ .next_start
+ .as_ref()
+ .map(|x| x.as_slice())
+ .unwrap_or(&[]);
+ let idx_bytes = if idx_bytes.len() > 4 {
+ &idx_bytes[..4]
+ } else {
+ idx_bytes
+ };
+ Some(format!("Phase 1: {}", hex::encode(idx_bytes)))
+ }
+ Some(bi) => Some(format!("Phase 2: {:.2}% done", bi.progress() * 100.)),
+ }
+ }
+
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
@@ -74,7 +94,7 @@ impl Worker for RepairWorker {
}
if batch_of_hashes.is_empty() {
// move on to phase 2
- self.block_iter = Some(BlockStoreIterator::new(&self.manager).await?);
+ self.block_iter = Some(BlockStoreIterator::new(&self.manager));
return Ok(WorkerStatus::Busy);
}
@@ -115,14 +135,14 @@ pub struct ScrubWorker {
}
impl ScrubWorker {
- pub async fn new(manager: Arc<BlockManager>, tranquility: u32) -> Result<Self, Error> {
- let iterator = BlockStoreIterator::new(&manager).await?;
- Ok(Self {
+ pub fn new(manager: Arc<BlockManager>, tranquility: u32) -> Self {
+ let iterator = BlockStoreIterator::new(&manager);
+ Self {
manager,
iterator,
tranquilizer: Tranquilizer::new(30),
tranquility,
- })
+ }
}
}
@@ -132,6 +152,10 @@ impl Worker for ScrubWorker {
"Block scrub worker".into()
}
+ fn info(&self) -> Option<String> {
+ Some(format!("{:.2}% done", self.iterator.progress() * 100.))
+ }
+
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
@@ -153,51 +177,107 @@ impl Worker for ScrubWorker {
// ----
struct BlockStoreIterator {
- path: Vec<fs::ReadDir>,
+ path: Vec<ReadingDir>,
+}
+
+enum ReadingDir {
+ Pending(PathBuf),
+ Read {
+ subpaths: Vec<fs::DirEntry>,
+ pos: usize,
+ },
}
impl BlockStoreIterator {
- async fn new(manager: &BlockManager) -> Result<Self, Error> {
+ fn new(manager: &BlockManager) -> Self {
let root_dir = manager.data_dir.clone();
- let read_root_dir = fs::read_dir(&root_dir).await?;
- Ok(Self {
- path: vec![read_root_dir],
- })
+ Self {
+ path: vec![ReadingDir::Pending(root_dir)],
+ }
+ }
+
+ /// Returns progress done, between 0% and 1%
+ fn progress(&self) -> f32 {
+ if self.path.is_empty() {
+ 1.0
+ } else {
+ let mut ret = 0.0;
+ let mut next_div = 1;
+ for p in self.path.iter() {
+ match p {
+ ReadingDir::Pending(_) => break,
+ ReadingDir::Read { subpaths, pos } => {
+ next_div *= subpaths.len();
+ ret += ((*pos - 1) as f32) / (next_div as f32);
+ }
+ }
+ }
+ ret
+ }
}
async fn next(&mut self) -> Result<Option<Hash>, Error> {
loop {
- if let Some(reader) = self.path.last_mut() {
- if let Some(data_dir_ent) = reader.next_entry().await? {
- let name = data_dir_ent.file_name();
- let name = if let Ok(n) = name.into_string() {
- n
- } else {
- continue;
- };
- let ent_type = data_dir_ent.file_type().await?;
-
- let name = name.strip_suffix(".zst").unwrap_or(&name);
- if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
- let read_child_dir = fs::read_dir(&data_dir_ent.path()).await?;
- self.path.push(read_child_dir);
- continue;
- } else if name.len() == 64 {
- let hash_bytes = if let Ok(h) = hex::decode(&name) {
- h
- } else {
- continue;
- };
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&hash_bytes[..]);
- return Ok(Some(hash.into()));
- }
- } else {
+ let last_path = match self.path.last_mut() {
+ None => return Ok(None),
+ Some(lp) => lp,
+ };
+
+ if let ReadingDir::Pending(path) = last_path {
+ let mut reader = fs::read_dir(&path).await?;
+ let mut subpaths = vec![];
+ while let Some(ent) = reader.next_entry().await? {
+ subpaths.push(ent);
+ }
+ *last_path = ReadingDir::Read { subpaths, pos: 0 };
+ }
+
+ let (subpaths, pos) = match *last_path {
+ ReadingDir::Read {
+ ref subpaths,
+ ref mut pos,
+ } => (subpaths, pos),
+ ReadingDir::Pending(_) => unreachable!(),
+ };
+
+ if *pos >= subpaths.len() {
+ self.path.pop();
+ continue;
+ }
+
+ let data_dir_ent = match subpaths.get(*pos) {
+ None => {
self.path.pop();
continue;
}
+ Some(ent) => {
+ *pos += 1;
+ ent
+ }
+ };
+
+ let name = data_dir_ent.file_name();
+ let name = if let Ok(n) = name.into_string() {
+ n
} else {
- return Ok(None);
+ continue;
+ };
+ let ent_type = data_dir_ent.file_type().await?;
+
+ let name = name.strip_suffix(".zst").unwrap_or(&name);
+ if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
+ let path = data_dir_ent.path();
+ self.path.push(ReadingDir::Pending(path));
+ continue;
+ } else if name.len() == 64 {
+ let hash_bytes = if let Ok(h) = hex::decode(&name) {
+ h
+ } else {
+ continue;
+ };
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&hash_bytes[..]);
+ return Ok(Some(hash.into()));
}
}
}
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index 9c6a0c57..de49331e 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -698,7 +698,7 @@ impl AdminRpcHandler {
)))
}
} else {
- launch_online_repair(self.garage.clone(), opt).await?;
+ launch_online_repair(self.garage.clone(), opt);
Ok(AdminRpc::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index a5ccfa02..b0437c5e 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -13,7 +13,7 @@ use garage_util::error::Error;
use crate::*;
-pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) -> Result<(), Error> {
+pub fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
match opt.what {
RepairWhat::Tables => {
info!("Launching a full sync of tables");
@@ -45,13 +45,14 @@ pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) -> Result
}
RepairWhat::Scrub { tranquility } => {
info!("Verifying integrity of stored blocks");
- garage.background.spawn_worker(
- garage_block::repair::ScrubWorker::new(garage.block_manager.clone(), tranquility)
- .await?,
- );
+ garage
+ .background
+ .spawn_worker(garage_block::repair::ScrubWorker::new(
+ garage.block_manager.clone(),
+ tranquility,
+ ));
}
}
- Ok(())
}
// ----