diff options
Diffstat (limited to 'src/model/block.rs')
-rw-r--r-- | src/model/block.rs | 57 |
1 files changed, 32 insertions, 25 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index 08002911..406abf7b 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -14,7 +14,7 @@ use tokio::sync::{watch, Mutex, Notify}; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; -use garage_util::token_bucket::TokenBucket; +use garage_util::tranquilizer::Tranquilizer; use garage_rpc::system::System; use garage_rpc::*; @@ -29,6 +29,7 @@ use crate::garage::Garage; pub const INLINE_THRESHOLD: usize = 3072; pub const BACKGROUND_WORKERS: u64 = 1; +pub const BACKGROUND_TRANQUILITY: u32 = 3; const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42); const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60); @@ -214,24 +215,15 @@ impl BlockManager { pub async fn scrub_data_store( &self, must_exit: &watch::Receiver<bool>, - speed_limit: Option<usize>, + tranquility: u32, ) -> Result<(), Error> { - let token_bucket = speed_limit.map(|rate| TokenBucket::new(rate as u64)); + let tranquilizer = Tranquilizer::new(30); self.for_each_file( - token_bucket, - move |mut token_bucket, hash| { - async move { - let len = match self.read_block(&hash).await { - Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(), - Ok(_) => unreachable!(), - Err(_) => 0, // resync and warn message made by read_block if necessary - }; - - if let Some(tb) = &mut token_bucket { - tb.take(len as u64).await; - } - Ok(token_bucket) - } + tranquilizer, + move |mut tranquilizer, hash| async move { + let _ = self.read_block(&hash).await; + tranquilizer.tranquilize(tranquility).await; + Ok(tranquilizer) }, must_exit, ) @@ -381,18 +373,32 @@ impl BlockManager { } async fn resync_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { + let mut tranquilizer = Tranquilizer::new(30); + while !*must_exit.borrow() { - if let Err(e) = self.resync_iter(&mut must_exit).await { - warn!("Error in block resync loop: {}", e); - select! { - _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {}, - _ = must_exit.changed().fuse() => {}, + match self.resync_iter(&mut must_exit).await { + Ok(true) => { + tranquilizer.tranquilize(BACKGROUND_TRANQUILITY).await; + } + Ok(false) => { + tranquilizer.reset(); + } + Err(e) => { + // The errors that we have here are only Sled errors + // We don't really know how to handle them so just ¯\_(ツ)_/¯ + // (there is kind of an assumption that Sled won't error on us, + // if it does there is not much we can do -- TODO should we just panic?) + error!( + "Could not do a resync iteration: {} (this is a very bad error)", + e + ); + tranquilizer.reset(); } } } } - async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> { + async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> { if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { let time_msec = u64_from_be_bytes(&time_bytes[0..8]); let now = now_msec(); @@ -403,7 +409,7 @@ impl BlockManager { warn!("Error when resyncing {:?}: {}", hash, e); self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?; } - res?; // propagate error to delay main loop + Ok(true) } else { self.resync_queue.insert(time_bytes, hash_bytes)?; let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); @@ -412,14 +418,15 @@ impl BlockManager { _ = self.resync_notify.notified().fuse() => {}, _ = must_exit.changed().fuse() => {}, } + Ok(false) } } else { select! { _ = self.resync_notify.notified().fuse() => {}, _ = must_exit.changed().fuse() => {}, } + Ok(false) } - Ok(()) } async fn resync_block(&self, hash: &Hash) -> Result<(), Error> { |