From 6b47c294f570b141a2349d5b6da537c0b64d165d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 27 Oct 2021 10:36:04 +0200 Subject: Refactoring on repair commands --- src/model/block.rs | 70 +++++++++++++++++++++++------------------------------- 1 file changed, 30 insertions(+), 40 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index c43c0b97..35d3871a 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -10,11 +10,11 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; -use tokio::time::Instant; use garage_util::data::*; use garage_util::error::Error; use garage_util::time::*; +use garage_util::token_bucket::TokenBucket; use garage_rpc::system::System; use garage_rpc::*; @@ -209,6 +209,35 @@ impl BlockManager { .await } + /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by + /// this function. + pub async fn scrub_data_store( + &self, + must_exit: &watch::Receiver, + speed_limit: Option, + ) -> Result<(), Error> { + let token_bucket = speed_limit.map(|rate| TokenBucket::new(rate as u64)); + self.for_each_file( + token_bucket, + move |mut token_bucket, hash| { + async move { + let len = match self.read_block(&hash).await { + Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(), + Ok(_) => unreachable!(), + Err(_) => 0, // resync and warn message made by read_block if necessary + }; + + if let Some(tb) = &mut token_bucket { + tb.take(len as u64).await; + } + Ok(token_bucket) + } + }, + must_exit, + ) + .await + } + /// Get lenght of resync queue pub fn resync_queue_len(&self) -> usize { self.resync_queue.len() @@ -553,45 +582,6 @@ impl BlockManager { } .boxed() } - - /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by - /// this function. - pub async fn verify_data_store_integrity( - &self, - must_exit: &watch::Receiver, - speed_limit: Option, - ) -> Result<(), Error> { - let last_refill = Instant::now(); - let token_left = speed_limit.unwrap_or(0); - self.for_each_file( - (last_refill, token_left), - move |(last_refill, token_left), hash| { - async move { - let len = match self.read_block(&hash).await { - Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(), - Ok(_) => unreachable!(), - Err(_) => 0, // resync and warn message made by read_block if necessary - }; - - if let Some(speed_limit) = speed_limit { - // throttling logic - if let Some(t) = token_left.checked_sub(len) { - // token bucket not empty yet - Ok((last_refill, t)) - } else { - // token bucket empty. Sleep and refill - tokio::time::sleep_until(last_refill + Duration::from_secs(1)).await; - Ok((Instant::now(), speed_limit)) - } - } else { - Ok((last_refill, token_left)) // actually not used - } - } - }, - must_exit, - ) - .await - } } #[async_trait] -- cgit v1.2.3