diff options
Diffstat (limited to 'src/block')
-rw-r--r-- | src/block/lib.rs | 1 | ||||
-rw-r--r-- | src/block/manager.rs | 160 | ||||
-rw-r--r-- | src/block/repair.rs | 204 |
3 files changed, 209 insertions, 156 deletions
diff --git a/src/block/lib.rs b/src/block/lib.rs index dc685657..ebdb95d8 100644 --- a/src/block/lib.rs +++ b/src/block/lib.rs @@ -2,6 +2,7 @@ extern crate tracing; pub mod manager; +pub mod repair; mod block; mod metrics; diff --git a/src/block/manager.rs b/src/block/manager.rs index 8a131270..54368faf 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,7 +1,5 @@ -use core::ops::Bound; - use std::convert::TryInto; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -94,7 +92,7 @@ pub struct BlockManager { mutation_lock: Mutex<BlockManagerLocked>, - rc: BlockRc, + pub(crate) rc: BlockRc, resync_queue: CountedTree, resync_notify: Notify, @@ -225,90 +223,6 @@ impl BlockManager { Ok(()) } - /// Launch the repair procedure on the data store - /// - /// This will list all blocks locally present, as well as those - /// that are required because of refcount > 0, and will try - /// to fix any mismatch between the two. - pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { - // 1. Repair blocks from RC table. - let mut next_start: Option<Hash> = None; - loop { - // We have to do this complicated two-step process where we first read a bunch - // of hashes from the RC table, and then insert them in the to-resync queue, - // because of SQLite. Basically, as long as we have an iterator on a DB table, - // we can't do anything else on the DB. The naive approach (which we had previously) - // of just iterating on the RC table and inserting items one to one in the resync - // queue can't work here, it would just provoke a deadlock in the SQLite adapter code. - // This is mostly because the Rust bindings for SQLite assume a worst-case scenario - // where SQLite is not compiled in thread-safe mode, so we have to wrap everything - // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322). - let mut batch_of_hashes = vec![]; - let start_bound = match next_start.as_ref() { - None => Bound::Unbounded, - Some(x) => Bound::Excluded(x.as_slice()), - }; - for entry in self - .rc - .rc - .range::<&[u8], _>((start_bound, Bound::Unbounded))? - { - let (hash, _) = entry?; - let hash = Hash::try_from(&hash[..]).unwrap(); - batch_of_hashes.push(hash); - if batch_of_hashes.len() >= 1000 { - break; - } - } - if batch_of_hashes.is_empty() { - break; - } - - for hash in batch_of_hashes.into_iter() { - self.put_to_resync(&hash, Duration::from_secs(0))?; - next_start = Some(hash) - } - - if *must_exit.borrow() { - return Ok(()); - } - } - - // 2. Repair blocks actually on disk - // Lists all blocks on disk and adds them to the resync queue. - // This allows us to find blocks we are storing but don't actually need, - // so that we can offload them if necessary and then delete them locally. - self.for_each_file( - (), - move |_, hash| async move { - self.put_to_resync(&hash, Duration::from_secs(0)) - .map_err(Into::into) - }, - must_exit, - ) - .await - } - - /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by - /// this function. - pub async fn scrub_data_store( - &self, - must_exit: &watch::Receiver<bool>, - tranquility: u32, - ) -> Result<(), Error> { - let tranquilizer = Tranquilizer::new(30); - self.for_each_file( - tranquilizer, - move |mut tranquilizer, hash| async move { - let _ = self.read_block(&hash).await; - tranquilizer.tranquilize(tranquility).await; - Ok(tranquilizer) - }, - must_exit, - ) - .await - } - /// Get lenght of resync queue pub fn resync_queue_len(&self) -> Result<usize, Error> { // This currently can't return an error because the CountedTree hack @@ -397,7 +311,7 @@ impl BlockManager { } /// Read block from disk, verifying it's integrity - async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> { + pub(crate) async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> { let data = self .read_block_internal(hash) .bound_record_duration(&self.metrics.block_read_duration) @@ -575,7 +489,7 @@ impl BlockManager { }); } - fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> { + pub(crate) fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> { let when = now_msec() + delay.as_millis() as u64; self.put_to_resync_at(hash, when) } @@ -784,72 +698,6 @@ impl BlockManager { Ok(()) } - - // ---- Utility: iteration on files in the data directory ---- - - async fn for_each_file<F, Fut, State>( - &self, - state: State, - mut f: F, - must_exit: &watch::Receiver<bool>, - ) -> Result<(), Error> - where - F: FnMut(State, Hash) -> Fut + Send, - Fut: Future<Output = Result<State, Error>> + Send, - State: Send, - { - self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit) - .await - .map(|_| ()) - } - - fn for_each_file_rec<'a, F, Fut, State>( - &'a self, - path: &'a Path, - mut state: State, - f: &'a mut F, - must_exit: &'a watch::Receiver<bool>, - ) -> BoxFuture<'a, Result<State, Error>> - where - F: FnMut(State, Hash) -> Fut + Send, - Fut: Future<Output = Result<State, Error>> + Send, - State: Send + 'a, - { - async move { - let mut ls_data_dir = fs::read_dir(path).await?; - while let Some(data_dir_ent) = ls_data_dir.next_entry().await? { - if *must_exit.borrow() { - break; - } - - let name = data_dir_ent.file_name(); - let name = if let Ok(n) = name.into_string() { - n - } else { - continue; - }; - let ent_type = data_dir_ent.file_type().await?; - - let name = name.strip_suffix(".zst").unwrap_or(&name); - if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { - state = self - .for_each_file_rec(&data_dir_ent.path(), state, f, must_exit) - .await?; - } else if name.len() == 64 { - let hash_bytes = if let Ok(h) = hex::decode(&name) { - h - } else { - continue; - }; - let mut hash = [0u8; 32]; - hash.copy_from_slice(&hash_bytes[..]); - state = f(state, hash.into()).await?; - } - } - Ok(state) - } - .boxed() - } } #[async_trait] diff --git a/src/block/repair.rs b/src/block/repair.rs new file mode 100644 index 00000000..0445527c --- /dev/null +++ b/src/block/repair.rs @@ -0,0 +1,204 @@ +use core::ops::Bound; + +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use tokio::fs; +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::data::*; +use garage_util::error::*; +use garage_util::tranquilizer::Tranquilizer; + +use crate::manager::*; + +pub struct RepairWorker { + manager: Arc<BlockManager>, + next_start: Option<Hash>, + block_iter: Option<BlockStoreIterator>, +} + +impl RepairWorker { + pub fn new(manager: Arc<BlockManager>) -> Self { + Self { + manager, + next_start: None, + block_iter: None, + } + } +} + +#[async_trait] +impl Worker for RepairWorker { + fn name(&self) -> String { + "Block repair worker".into() + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerStatus, Error> { + match self.block_iter.as_mut() { + None => { + // Phase 1: Repair blocks from RC table. + + // We have to do this complicated two-step process where we first read a bunch + // of hashes from the RC table, and then insert them in the to-resync queue, + // because of SQLite. Basically, as long as we have an iterator on a DB table, + // we can't do anything else on the DB. The naive approach (which we had previously) + // of just iterating on the RC table and inserting items one to one in the resync + // queue can't work here, it would just provoke a deadlock in the SQLite adapter code. + // This is mostly because the Rust bindings for SQLite assume a worst-case scenario + // where SQLite is not compiled in thread-safe mode, so we have to wrap everything + // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322). + let mut batch_of_hashes = vec![]; + let start_bound = match self.next_start.as_ref() { + None => Bound::Unbounded, + Some(x) => Bound::Excluded(x.as_slice()), + }; + for entry in self + .manager + .rc + .rc + .range::<&[u8], _>((start_bound, Bound::Unbounded))? + { + let (hash, _) = entry?; + let hash = Hash::try_from(&hash[..]).unwrap(); + batch_of_hashes.push(hash); + if batch_of_hashes.len() >= 1000 { + break; + } + } + if batch_of_hashes.is_empty() { + // move on to phase 2 + self.block_iter = Some(BlockStoreIterator::new(&self.manager).await?); + return Ok(WorkerStatus::Busy); + } + + for hash in batch_of_hashes.into_iter() { + self.manager.put_to_resync(&hash, Duration::from_secs(0))?; + self.next_start = Some(hash) + } + + Ok(WorkerStatus::Busy) + } + Some(bi) => { + // Phase 2: Repair blocks actually on disk + // Lists all blocks on disk and adds them to the resync queue. + // This allows us to find blocks we are storing but don't actually need, + // so that we can offload them if necessary and then delete them locally. + if let Some(hash) = bi.next().await? { + self.manager.put_to_resync(&hash, Duration::from_secs(0))?; + Ok(WorkerStatus::Busy) + } else { + Ok(WorkerStatus::Done) + } + } + } + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + unreachable!() + } +} + +// ---- + +pub struct ScrubWorker { + manager: Arc<BlockManager>, + iterator: BlockStoreIterator, + tranquilizer: Tranquilizer, + tranquility: u32, +} + +impl ScrubWorker { + pub async fn new(manager: Arc<BlockManager>, tranquility: u32) -> Result<Self, Error> { + let iterator = BlockStoreIterator::new(&manager).await?; + Ok(Self { + manager, + iterator, + tranquilizer: Tranquilizer::new(30), + tranquility, + }) + } +} + +#[async_trait] +impl Worker for ScrubWorker { + fn name(&self) -> String { + "Block scrub worker".into() + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerStatus, Error> { + self.tranquilizer.reset(); + if let Some(hash) = self.iterator.next().await? { + let _ = self.manager.read_block(&hash).await; + self.tranquilizer.tranquilize(self.tranquility).await; + Ok(WorkerStatus::Busy) + } else { + Ok(WorkerStatus::Done) + } + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + unreachable!() + } +} + +// ---- + +struct BlockStoreIterator { + path: Vec<fs::ReadDir>, +} + +impl BlockStoreIterator { + async fn new(manager: &BlockManager) -> Result<Self, Error> { + let root_dir = manager.data_dir.clone(); + let read_root_dir = fs::read_dir(&root_dir).await?; + Ok(Self { + path: vec![read_root_dir], + }) + } + + async fn next(&mut self) -> Result<Option<Hash>, Error> { + loop { + if let Some(reader) = self.path.last_mut() { + if let Some(data_dir_ent) = reader.next_entry().await? { + let name = data_dir_ent.file_name(); + let name = if let Ok(n) = name.into_string() { + n + } else { + continue; + }; + let ent_type = data_dir_ent.file_type().await?; + + let name = name.strip_suffix(".zst").unwrap_or(&name); + if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { + let read_child_dir = fs::read_dir(&data_dir_ent.path()).await?; + self.path.push(read_child_dir); + continue; + } else if name.len() == 64 { + let hash_bytes = if let Ok(h) = hex::decode(&name) { + h + } else { + continue; + }; + let mut hash = [0u8; 32]; + hash.copy_from_slice(&hash_bytes[..]); + return Ok(Some(hash.into())); + } + } else { + self.path.pop(); + continue; + } + } else { + return Ok(None); + } + } + } +} |