aboutsummaryrefslogtreecommitdiff
path: root/src/model/block.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-11-03 18:28:43 +0100
committerAlex Auvolat <alex@adnab.me>2021-11-04 13:26:59 +0100
commit2090a6187f7d106e0641bed4cac145ad5184995d (patch)
tree822dd5ff1adaaf4b1c4384b2b3604054f378770b /src/model/block.rs
parent6f13d083ab188060d2a2dc5f619070a445fe61ba (diff)
downloadgarage-2090a6187f7d106e0641bed4cac145ad5184995d.tar.gz
garage-2090a6187f7d106e0641bed4cac145ad5184995d.zip
Add tranquilizer mechanism to improve on token bucket mechanismtranquility
Diffstat (limited to 'src/model/block.rs')
-rw-r--r--src/model/block.rs57
1 files changed, 32 insertions, 25 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 08002911..406abf7b 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -14,7 +14,7 @@ use tokio::sync::{watch, Mutex, Notify};
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
-use garage_util::token_bucket::TokenBucket;
+use garage_util::tranquilizer::Tranquilizer;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -29,6 +29,7 @@ use crate::garage::Garage;
pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1;
+pub const BACKGROUND_TRANQUILITY: u32 = 3;
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42);
const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
@@ -214,24 +215,15 @@ impl BlockManager {
pub async fn scrub_data_store(
&self,
must_exit: &watch::Receiver<bool>,
- speed_limit: Option<usize>,
+ tranquility: u32,
) -> Result<(), Error> {
- let token_bucket = speed_limit.map(|rate| TokenBucket::new(rate as u64));
+ let tranquilizer = Tranquilizer::new(30);
self.for_each_file(
- token_bucket,
- move |mut token_bucket, hash| {
- async move {
- let len = match self.read_block(&hash).await {
- Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(),
- Ok(_) => unreachable!(),
- Err(_) => 0, // resync and warn message made by read_block if necessary
- };
-
- if let Some(tb) = &mut token_bucket {
- tb.take(len as u64).await;
- }
- Ok(token_bucket)
- }
+ tranquilizer,
+ move |mut tranquilizer, hash| async move {
+ let _ = self.read_block(&hash).await;
+ tranquilizer.tranquilize(tranquility).await;
+ Ok(tranquilizer)
},
must_exit,
)
@@ -381,18 +373,32 @@ impl BlockManager {
}
async fn resync_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
+ let mut tranquilizer = Tranquilizer::new(30);
+
while !*must_exit.borrow() {
- if let Err(e) = self.resync_iter(&mut must_exit).await {
- warn!("Error in block resync loop: {}", e);
- select! {
- _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {},
- _ = must_exit.changed().fuse() => {},
+ match self.resync_iter(&mut must_exit).await {
+ Ok(true) => {
+ tranquilizer.tranquilize(BACKGROUND_TRANQUILITY).await;
+ }
+ Ok(false) => {
+ tranquilizer.reset();
+ }
+ Err(e) => {
+ // The errors that we have here are only Sled errors
+ // We don't really know how to handle them so just ¯\_(ツ)_/¯
+ // (there is kind of an assumption that Sled won't error on us,
+ // if it does there is not much we can do -- TODO should we just panic?)
+ error!(
+ "Could not do a resync iteration: {} (this is a very bad error)",
+ e
+ );
+ tranquilizer.reset();
}
}
}
}
- async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
+ async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
let now = now_msec();
@@ -403,7 +409,7 @@ impl BlockManager {
warn!("Error when resyncing {:?}: {}", hash, e);
self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
}
- res?; // propagate error to delay main loop
+ Ok(true)
} else {
self.resync_queue.insert(time_bytes, hash_bytes)?;
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
@@ -412,14 +418,15 @@ impl BlockManager {
_ = self.resync_notify.notified().fuse() => {},
_ = must_exit.changed().fuse() => {},
}
+ Ok(false)
}
} else {
select! {
_ = self.resync_notify.notified().fuse() => {},
_ = must_exit.changed().fuse() => {},
}
+ Ok(false)
}
- Ok(())
}
async fn resync_block(&self, hash: &Hash) -> Result<(), Error> {