aboutsummaryrefslogtreecommitdiff
path: root/src/model/block.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/block.rs')
-rw-r--r--src/model/block.rs70
1 files changed, 30 insertions, 40 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index c43c0b97..35d3871a 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -10,11 +10,11 @@ use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify};
-use tokio::time::Instant;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::time::*;
+use garage_util::token_bucket::TokenBucket;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -209,6 +209,35 @@ impl BlockManager {
.await
}
+ /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
+ /// this function.
+ pub async fn scrub_data_store(
+ &self,
+ must_exit: &watch::Receiver<bool>,
+ speed_limit: Option<usize>,
+ ) -> Result<(), Error> {
+ let token_bucket = speed_limit.map(|rate| TokenBucket::new(rate as u64));
+ self.for_each_file(
+ token_bucket,
+ move |mut token_bucket, hash| {
+ async move {
+ let len = match self.read_block(&hash).await {
+ Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(),
+ Ok(_) => unreachable!(),
+ Err(_) => 0, // resync and warn message made by read_block if necessary
+ };
+
+ if let Some(tb) = &mut token_bucket {
+ tb.take(len as u64).await;
+ }
+ Ok(token_bucket)
+ }
+ },
+ must_exit,
+ )
+ .await
+ }
+
/// Get lenght of resync queue
pub fn resync_queue_len(&self) -> usize {
self.resync_queue.len()
@@ -553,45 +582,6 @@ impl BlockManager {
}
.boxed()
}
-
- /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
- /// this function.
- pub async fn verify_data_store_integrity(
- &self,
- must_exit: &watch::Receiver<bool>,
- speed_limit: Option<usize>,
- ) -> Result<(), Error> {
- let last_refill = Instant::now();
- let token_left = speed_limit.unwrap_or(0);
- self.for_each_file(
- (last_refill, token_left),
- move |(last_refill, token_left), hash| {
- async move {
- let len = match self.read_block(&hash).await {
- Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(),
- Ok(_) => unreachable!(),
- Err(_) => 0, // resync and warn message made by read_block if necessary
- };
-
- if let Some(speed_limit) = speed_limit {
- // throttling logic
- if let Some(t) = token_left.checked_sub(len) {
- // token bucket not empty yet
- Ok((last_refill, t))
- } else {
- // token bucket empty. Sleep and refill
- tokio::time::sleep_until(last_refill + Duration::from_secs(1)).await;
- Ok((Instant::now(), speed_limit))
- }
- } else {
- Ok((last_refill, token_left)) // actually not used
- }
- }
- },
- must_exit,
- )
- .await
- }
}
#[async_trait]