Diffstat (limited to 'src/block/repair.rs')
-rw-r--r-- | src/block/repair.rs | 204 |
1 files changed, 204 insertions, 0 deletions
diff --git a/src/block/repair.rs b/src/block/repair.rs
new file mode 100644
index 00000000..0445527c
--- /dev/null
+++ b/src/block/repair.rs
@@ -0,0 +1,204 @@
+use core::ops::Bound;
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use tokio::fs;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::data::*;
+use garage_util::error::*;
+use garage_util::tranquilizer::Tranquilizer;
+
+use crate::manager::*;
+
+pub struct RepairWorker {
+	manager: Arc<BlockManager>,
+	next_start: Option<Hash>,
+	block_iter: Option<BlockStoreIterator>,
+}
+
+impl RepairWorker {
+	pub fn new(manager: Arc<BlockManager>) -> Self {
+		Self {
+			manager,
+			next_start: None,
+			block_iter: None,
+		}
+	}
+}
+
+#[async_trait]
+impl Worker for RepairWorker {
+	fn name(&self) -> String {
+		"Block repair worker".into()
+	}
+
+	async fn work(
+		&mut self,
+		_must_exit: &mut watch::Receiver<bool>,
+	) -> Result<WorkerStatus, Error> {
+		match self.block_iter.as_mut() {
+			None => {
+				// Phase 1: Repair blocks from RC table.
+
+				// We have to do this complicated two-step process where we first read a bunch
+				// of hashes from the RC table, and then insert them in the to-resync queue,
+				// because of SQLite. Basically, as long as we have an iterator on a DB table,
+				// we can't do anything else on the DB. The naive approach (which we had previously)
+				// of just iterating on the RC table and inserting items one to one in the resync
+				// queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
+				// This is mostly because the Rust bindings for SQLite assume a worst-case scenario
+				// where SQLite is not compiled in thread-safe mode, so we have to wrap everything
+				// in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
+				let mut batch_of_hashes = vec![];
+				let start_bound = match self.next_start.as_ref() {
+					None => Bound::Unbounded,
+					Some(x) => Bound::Excluded(x.as_slice()),
+				};
+				for entry in self
+					.manager
+					.rc
+					.rc
+					.range::<&[u8], _>((start_bound, Bound::Unbounded))?
+				{
+					let (hash, _) = entry?;
+					let hash = Hash::try_from(&hash[..]).unwrap();
+					batch_of_hashes.push(hash);
+					if batch_of_hashes.len() >= 1000 {
+						break;
+					}
+				}
+				if batch_of_hashes.is_empty() {
+					// move on to phase 2
+					self.block_iter = Some(BlockStoreIterator::new(&self.manager).await?);
+					return Ok(WorkerStatus::Busy);
+				}
+
+				for hash in batch_of_hashes.into_iter() {
+					self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
+					self.next_start = Some(hash)
+				}
+
+				Ok(WorkerStatus::Busy)
+			}
+			Some(bi) => {
+				// Phase 2: Repair blocks actually on disk
+				// Lists all blocks on disk and adds them to the resync queue.
+				// This allows us to find blocks we are storing but don't actually need,
+				// so that we can offload them if necessary and then delete them locally.
+				if let Some(hash) = bi.next().await? {
+					self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
+					Ok(WorkerStatus::Busy)
+				} else {
+					Ok(WorkerStatus::Done)
+				}
+			}
+		}
+	}
+
+	async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+		unreachable!()
+	}
+}
+
+// ----
+
+pub struct ScrubWorker {
+	manager: Arc<BlockManager>,
+	iterator: BlockStoreIterator,
+	tranquilizer: Tranquilizer,
+	tranquility: u32,
+}
+
+impl ScrubWorker {
+	pub async fn new(manager: Arc<BlockManager>, tranquility: u32) -> Result<Self, Error> {
+		let iterator = BlockStoreIterator::new(&manager).await?;
+		Ok(Self {
+			manager,
+			iterator,
+			tranquilizer: Tranquilizer::new(30),
+			tranquility,
+		})
+	}
+}
+
+#[async_trait]
+impl Worker for ScrubWorker {
+	fn name(&self) -> String {
+		"Block scrub worker".into()
+	}
+
+	async fn work(
+		&mut self,
+		_must_exit: &mut watch::Receiver<bool>,
+	) -> Result<WorkerStatus, Error> {
+		self.tranquilizer.reset();
+		if let Some(hash) = self.iterator.next().await? {
+			let _ = self.manager.read_block(&hash).await;
+			self.tranquilizer.tranquilize(self.tranquility).await;
+			Ok(WorkerStatus::Busy)
+		} else {
+			Ok(WorkerStatus::Done)
+		}
+	}
+
+	async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+		unreachable!()
+	}
+}
+
+// ----
+
+struct BlockStoreIterator {
+	path: Vec<fs::ReadDir>,
+}
+
+impl BlockStoreIterator {
+	async fn new(manager: &BlockManager) -> Result<Self, Error> {
+		let root_dir = manager.data_dir.clone();
+		let read_root_dir = fs::read_dir(&root_dir).await?;
+		Ok(Self {
+			path: vec![read_root_dir],
+		})
+	}
+
+	async fn next(&mut self) -> Result<Option<Hash>, Error> {
+		loop {
+			if let Some(reader) = self.path.last_mut() {
+				if let Some(data_dir_ent) = reader.next_entry().await? {
+					let name = data_dir_ent.file_name();
+					let name = if let Ok(n) = name.into_string() {
+						n
+					} else {
+						continue;
+					};
+					let ent_type = data_dir_ent.file_type().await?;
+
+					let name = name.strip_suffix(".zst").unwrap_or(&name);
+					if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
+						let read_child_dir = fs::read_dir(&data_dir_ent.path()).await?;
+						self.path.push(read_child_dir);
+						continue;
+					} else if name.len() == 64 {
+						let hash_bytes = if let Ok(h) = hex::decode(&name) {
+							h
+						} else {
+							continue;
+						};
+						let mut hash = [0u8; 32];
+						hash.copy_from_slice(&hash_bytes[..]);
+						return Ok(Some(hash.into()));
+					}
+				} else {
+					self.path.pop();
+					continue;
+				}
+			} else {
+				return Ok(None);
+			}
+		}
+	}
+}
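Note on the phase-1 batching in this patch: the worker never holds a database iterator while writing to the resync queue. It reads at most 1000 hashes per pass, remembers the last hash it enqueued in next_start, and resumes the next pass with an exclusive lower bound on that key. Below is a minimal sketch of that resume pattern, using a plain BTreeMap as a stand-in for the RC table; RcTable, next_batch and the batch size are illustrative names, not part of this patch.

use std::collections::BTreeMap;
use std::ops::Bound;

/// Illustrative stand-in for the RC table: keys are block hashes.
type RcTable = BTreeMap<Vec<u8>, u64>;

/// Read up to `batch_size` keys strictly after `next_start`, without
/// doing anything else with the table while the iterator is alive.
fn next_batch(rc: &RcTable, next_start: &Option<Vec<u8>>, batch_size: usize) -> Vec<Vec<u8>> {
	let start_bound = match next_start {
		None => Bound::Unbounded,
		Some(x) => Bound::Excluded(x.clone()),
	};
	rc.range((start_bound, Bound::Unbounded))
		.take(batch_size)
		.map(|(k, _)| k.clone())
		.collect()
}

fn main() {
	let rc: RcTable = (0u8..10).map(|i| (vec![i], 1)).collect();
	let mut next_start: Option<Vec<u8>> = None;
	loop {
		let batch = next_batch(&rc, &next_start, 3);
		if batch.is_empty() {
			// equivalent of moving on to phase 2 in the worker
			break;
		}
		for key in batch {
			// the real worker calls put_to_resync(&hash, ...) here
			println!("would enqueue {:?}", key);
			next_start = Some(key);
		}
	}
}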
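BlockStoreIterator walks the data directory by recursing into any subdirectory whose name is two hex characters, and yields a hash for any entry whose name, after stripping an optional .zst suffix used for compressed blocks, is 64 hex characters. The sketch below isolates that classification rule to make the on-disk naming explicit; EntryKind and classify are hypothetical names, it relies only on the hex crate already used by the patch, and it assumes nothing beyond the checks visible in the iterator (in particular, not the nesting depth of the prefix directories).

/// What the directory walker does with a single entry name.
/// Illustrative only; mirrors the checks in BlockStoreIterator::next.
enum EntryKind {
	/// Two hex characters and a directory: a prefix directory to recurse into.
	PrefixDir,
	/// 64 hex characters (a 32-byte hash), optionally with a .zst suffix.
	Block([u8; 32]),
	/// Anything else is skipped.
	Other,
}

fn classify(name: &str, is_dir: bool) -> EntryKind {
	// Compressed blocks carry a .zst extension; strip it before matching.
	let name = name.strip_suffix(".zst").unwrap_or(name);
	if name.len() == 2 && hex::decode(name).is_ok() && is_dir {
		EntryKind::PrefixDir
	} else if name.len() == 64 {
		match hex::decode(name) {
			Ok(bytes) if bytes.len() == 32 => {
				let mut hash = [0u8; 32];
				hash.copy_from_slice(&bytes);
				EntryKind::Block(hash)
			}
			_ => EntryKind::Other,
		}
	} else {
		EntryKind::Other
	}
}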