diff options
author | Alex Auvolat <alex@adnab.me> | 2022-06-21 16:00:08 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-06-21 16:00:08 +0200 |
commit | ba1ace6cf6edcea58aa904ccf6190155a6ac7c5e (patch) | |
tree | e67082848c333fe73117bf7ebcf652817d25e444 /src/block/manager.rs | |
parent | a855c54bdb1a6912e99a6d64ee97bc63c700f29f (diff) | |
download | garage-ba1ace6cf6edcea58aa904ccf6190155a6ac7c5e.tar.gz garage-ba1ace6cf6edcea58aa904ccf6190155a6ac7c5e.zip |
Block repair with new worker semantics
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r-- | src/block/manager.rs | 160 |
1 files changed, 4 insertions, 156 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 8a131270..54368faf 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,7 +1,5 @@ -use core::ops::Bound; - use std::convert::TryInto; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -94,7 +92,7 @@ pub struct BlockManager { mutation_lock: Mutex<BlockManagerLocked>, - rc: BlockRc, + pub(crate) rc: BlockRc, resync_queue: CountedTree, resync_notify: Notify, @@ -225,90 +223,6 @@ impl BlockManager { Ok(()) } - /// Launch the repair procedure on the data store - /// - /// This will list all blocks locally present, as well as those - /// that are required because of refcount > 0, and will try - /// to fix any mismatch between the two. - pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { - // 1. Repair blocks from RC table. - let mut next_start: Option<Hash> = None; - loop { - // We have to do this complicated two-step process where we first read a bunch - // of hashes from the RC table, and then insert them in the to-resync queue, - // because of SQLite. Basically, as long as we have an iterator on a DB table, - // we can't do anything else on the DB. The naive approach (which we had previously) - // of just iterating on the RC table and inserting items one to one in the resync - // queue can't work here, it would just provoke a deadlock in the SQLite adapter code. - // This is mostly because the Rust bindings for SQLite assume a worst-case scenario - // where SQLite is not compiled in thread-safe mode, so we have to wrap everything - // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322). - let mut batch_of_hashes = vec![]; - let start_bound = match next_start.as_ref() { - None => Bound::Unbounded, - Some(x) => Bound::Excluded(x.as_slice()), - }; - for entry in self - .rc - .rc - .range::<&[u8], _>((start_bound, Bound::Unbounded))? - { - let (hash, _) = entry?; - let hash = Hash::try_from(&hash[..]).unwrap(); - batch_of_hashes.push(hash); - if batch_of_hashes.len() >= 1000 { - break; - } - } - if batch_of_hashes.is_empty() { - break; - } - - for hash in batch_of_hashes.into_iter() { - self.put_to_resync(&hash, Duration::from_secs(0))?; - next_start = Some(hash) - } - - if *must_exit.borrow() { - return Ok(()); - } - } - - // 2. Repair blocks actually on disk - // Lists all blocks on disk and adds them to the resync queue. - // This allows us to find blocks we are storing but don't actually need, - // so that we can offload them if necessary and then delete them locally. - self.for_each_file( - (), - move |_, hash| async move { - self.put_to_resync(&hash, Duration::from_secs(0)) - .map_err(Into::into) - }, - must_exit, - ) - .await - } - - /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by - /// this function. - pub async fn scrub_data_store( - &self, - must_exit: &watch::Receiver<bool>, - tranquility: u32, - ) -> Result<(), Error> { - let tranquilizer = Tranquilizer::new(30); - self.for_each_file( - tranquilizer, - move |mut tranquilizer, hash| async move { - let _ = self.read_block(&hash).await; - tranquilizer.tranquilize(tranquility).await; - Ok(tranquilizer) - }, - must_exit, - ) - .await - } - /// Get lenght of resync queue pub fn resync_queue_len(&self) -> Result<usize, Error> { // This currently can't return an error because the CountedTree hack @@ -397,7 +311,7 @@ impl BlockManager { } /// Read block from disk, verifying it's integrity - async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> { + pub(crate) async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> { let data = self .read_block_internal(hash) .bound_record_duration(&self.metrics.block_read_duration) @@ -575,7 +489,7 @@ impl BlockManager { }); } - fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> { + pub(crate) fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> { let when = now_msec() + delay.as_millis() as u64; self.put_to_resync_at(hash, when) } @@ -784,72 +698,6 @@ impl BlockManager { Ok(()) } - - // ---- Utility: iteration on files in the data directory ---- - - async fn for_each_file<F, Fut, State>( - &self, - state: State, - mut f: F, - must_exit: &watch::Receiver<bool>, - ) -> Result<(), Error> - where - F: FnMut(State, Hash) -> Fut + Send, - Fut: Future<Output = Result<State, Error>> + Send, - State: Send, - { - self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit) - .await - .map(|_| ()) - } - - fn for_each_file_rec<'a, F, Fut, State>( - &'a self, - path: &'a Path, - mut state: State, - f: &'a mut F, - must_exit: &'a watch::Receiver<bool>, - ) -> BoxFuture<'a, Result<State, Error>> - where - F: FnMut(State, Hash) -> Fut + Send, - Fut: Future<Output = Result<State, Error>> + Send, - State: Send + 'a, - { - async move { - let mut ls_data_dir = fs::read_dir(path).await?; - while let Some(data_dir_ent) = ls_data_dir.next_entry().await? { - if *must_exit.borrow() { - break; - } - - let name = data_dir_ent.file_name(); - let name = if let Ok(n) = name.into_string() { - n - } else { - continue; - }; - let ent_type = data_dir_ent.file_type().await?; - - let name = name.strip_suffix(".zst").unwrap_or(&name); - if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { - state = self - .for_each_file_rec(&data_dir_ent.path(), state, f, must_exit) - .await?; - } else if name.len() == 64 { - let hash_bytes = if let Ok(h) = hex::decode(&name) { - h - } else { - continue; - }; - let mut hash = [0u8; 32]; - hash.copy_from_slice(&hash_bytes[..]); - state = f(state, hash.into()).await?; - } - } - Ok(state) - } - .boxed() - } } #[async_trait] |