diff options
Diffstat (limited to 'src/model/block.rs')
-rw-r--r-- | src/model/block.rs | 121 |
1 files changed, 92 insertions, 29 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index d1ea1512..c43c0b97 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; +use tokio::time::Instant; use garage_util::data::*; use garage_util::error::Error; @@ -197,10 +198,15 @@ impl BlockManager { } // 2. Repair blocks actually on disk - self.repair_aux_read_dir_rec(&self.data_dir, must_exit) - .await?; - - Ok(()) + // Lists all blocks on disk and adds them to the resync queue. + // This allows us to find blocks we are storing but don't actually need, + // so that we can offload them if necessary and then delete them locally. + self.for_each_file( + (), + move |_, hash| async move { self.put_to_resync(&hash, Duration::from_secs(0)) }, + must_exit, + ) + .await } /// Get lenght of resync queue @@ -485,50 +491,107 @@ impl BlockManager { Ok(()) } - fn repair_aux_read_dir_rec<'a>( + async fn for_each_file<F, Fut, State>( + &self, + state: State, + mut f: F, + must_exit: &watch::Receiver<bool>, + ) -> Result<(), Error> + where + F: FnMut(State, Hash) -> Fut + Send, + Fut: Future<Output = Result<State, Error>> + Send, + State: Send, + { + self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit) + .await + .map(|_| ()) + } + + fn for_each_file_rec<'a, F, Fut, State>( &'a self, path: &'a Path, + mut state: State, + f: &'a mut F, must_exit: &'a watch::Receiver<bool>, - ) -> BoxFuture<'a, Result<(), Error>> { - // Lists all blocks on disk and adds them to the resync queue. - // This allows us to find blocks we are storing but don't actually need, - // so that we can offload them if necessary and then delete them locally. + ) -> BoxFuture<'a, Result<State, Error>> + where + F: FnMut(State, Hash) -> Fut + Send, + Fut: Future<Output = Result<State, Error>> + Send, + State: Send + 'a, + { async move { let mut ls_data_dir = fs::read_dir(path).await?; - loop { - let data_dir_ent = ls_data_dir.next_entry().await?; - let data_dir_ent = match data_dir_ent { - Some(x) => x, - None => break, - }; + while let Some(data_dir_ent) = ls_data_dir.next_entry().await? { + if *must_exit.borrow() { + break; + } + let name = data_dir_ent.file_name(); - let name = match name.into_string() { - Ok(x) => x, - Err(_) => continue, + let name = if let Ok(n) = name.into_string() { + n + } else { + continue; }; let ent_type = data_dir_ent.file_type().await?; if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { - self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit) + state = self + .for_each_file_rec(&data_dir_ent.path(), state, f, must_exit) .await?; } else if name.len() == 64 { - let hash_bytes = match hex::decode(&name) { - Ok(h) => h, - Err(_) => continue, + let hash_bytes = if let Ok(h) = hex::decode(&name) { + h + } else { + continue; }; let mut hash = [0u8; 32]; hash.copy_from_slice(&hash_bytes[..]); - self.put_to_resync(&hash.into(), Duration::from_secs(0))?; - } - - if *must_exit.borrow() { - break; + state = f(state, hash.into()).await?; } } - Ok(()) + Ok(state) } .boxed() } + + /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by + /// this function. + pub async fn verify_data_store_integrity( + &self, + must_exit: &watch::Receiver<bool>, + speed_limit: Option<usize>, + ) -> Result<(), Error> { + let last_refill = Instant::now(); + let token_left = speed_limit.unwrap_or(0); + self.for_each_file( + (last_refill, token_left), + move |(last_refill, token_left), hash| { + async move { + let len = match self.read_block(&hash).await { + Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(), + Ok(_) => unreachable!(), + Err(_) => 0, // resync and warn message made by read_block if necessary + }; + + if let Some(speed_limit) = speed_limit { + // throttling logic + if let Some(t) = token_left.checked_sub(len) { + // token bucket not empty yet + Ok((last_refill, t)) + } else { + // token bucket empty. Sleep and refill + tokio::time::sleep_until(last_refill + Duration::from_secs(1)).await; + Ok((Instant::now(), speed_limit)) + } + } else { + Ok((last_refill, token_left)) // actually not used + } + } + }, + must_exit, + ) + .await + } } #[async_trait] @@ -598,7 +661,7 @@ impl BlockManagerLocked { ); let path = mgr.block_path(hash); let mut path2 = path.clone(); - path2.set_extension(".corrupted"); + path2.set_extension("corrupted"); fs::rename(path, path2).await?; mgr.put_to_resync(hash, Duration::from_millis(0))?; Ok(()) |