aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/garage/cli/structs.rs6
-rw-r--r--src/garage/repair.rs4
-rw-r--r--src/model/block.rs57
-rw-r--r--src/util/lib.rs1
-rw-r--r--src/util/tranquilizer.rs57
5 files changed, 95 insertions, 30 deletions
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 620be9ef..0df6ef87 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -285,9 +285,9 @@ pub enum RepairWhat {
/// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
#[structopt(name = "scrub")]
Scrub {
- /// Limit on i/o speed, in B/s
- #[structopt(name = "limit")]
- limit: Option<usize>,
+ /// Tranquility factor (see tranquilizer documentation)
+ #[structopt(name = "tranquility", default_value = "2")]
+ tranquility: u32,
},
}
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index bfe7bf84..a786f1f1 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -51,11 +51,11 @@ impl Repair {
.repair_data_store(&must_exit)
.await?;
}
- RepairWhat::Scrub { limit } => {
+ RepairWhat::Scrub { tranquility } => {
info!("Verifying integrity of stored blocks");
self.garage
.block_manager
- .scrub_data_store(&must_exit, limit)
+ .scrub_data_store(&must_exit, tranquility)
.await?;
}
}
diff --git a/src/model/block.rs b/src/model/block.rs
index 08002911..406abf7b 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -14,7 +14,7 @@ use tokio::sync::{watch, Mutex, Notify};
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
-use garage_util::token_bucket::TokenBucket;
+use garage_util::tranquilizer::Tranquilizer;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -29,6 +29,7 @@ use crate::garage::Garage;
pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1;
+pub const BACKGROUND_TRANQUILITY: u32 = 3;
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42);
const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
@@ -214,24 +215,15 @@ impl BlockManager {
pub async fn scrub_data_store(
&self,
must_exit: &watch::Receiver<bool>,
- speed_limit: Option<usize>,
+ tranquility: u32,
) -> Result<(), Error> {
- let token_bucket = speed_limit.map(|rate| TokenBucket::new(rate as u64));
+ let tranquilizer = Tranquilizer::new(30);
self.for_each_file(
- token_bucket,
- move |mut token_bucket, hash| {
- async move {
- let len = match self.read_block(&hash).await {
- Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(),
- Ok(_) => unreachable!(),
- Err(_) => 0, // resync and warn message made by read_block if necessary
- };
-
- if let Some(tb) = &mut token_bucket {
- tb.take(len as u64).await;
- }
- Ok(token_bucket)
- }
+ tranquilizer,
+ move |mut tranquilizer, hash| async move {
+ let _ = self.read_block(&hash).await;
+ tranquilizer.tranquilize(tranquility).await;
+ Ok(tranquilizer)
},
must_exit,
)
@@ -381,18 +373,32 @@ impl BlockManager {
}
async fn resync_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
+ let mut tranquilizer = Tranquilizer::new(30);
+
while !*must_exit.borrow() {
- if let Err(e) = self.resync_iter(&mut must_exit).await {
- warn!("Error in block resync loop: {}", e);
- select! {
- _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {},
- _ = must_exit.changed().fuse() => {},
+ match self.resync_iter(&mut must_exit).await {
+ Ok(true) => {
+ tranquilizer.tranquilize(BACKGROUND_TRANQUILITY).await;
+ }
+ Ok(false) => {
+ tranquilizer.reset();
+ }
+ Err(e) => {
+ // The errors that we have here are only Sled errors
+ // We don't really know how to handle them so just ¯\_(ツ)_/¯
+ // (there is kind of an assumption that Sled won't error on us,
+ // if it does there is not much we can do -- TODO should we just panic?)
+ error!(
+ "Could not do a resync iteration: {} (this is a very bad error)",
+ e
+ );
+ tranquilizer.reset();
}
}
}
}
- async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
+ async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
let now = now_msec();
@@ -403,7 +409,7 @@ impl BlockManager {
warn!("Error when resyncing {:?}: {}", hash, e);
self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
}
- res?; // propagate error to delay main loop
+ Ok(true)
} else {
self.resync_queue.insert(time_bytes, hash_bytes)?;
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
@@ -412,14 +418,15 @@ impl BlockManager {
_ = self.resync_notify.notified().fuse() => {},
_ = must_exit.changed().fuse() => {},
}
+ Ok(false)
}
} else {
select! {
_ = self.resync_notify.notified().fuse() => {},
_ = must_exit.changed().fuse() => {},
}
+ Ok(false)
}
- Ok(())
}
async fn resync_block(&self, hash: &Hash) -> Result<(), Error> {
diff --git a/src/util/lib.rs b/src/util/lib.rs
index e2e01785..478b9ea4 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -10,3 +10,4 @@ pub mod error;
pub mod persister;
pub mod time;
pub mod token_bucket;
+pub mod tranquilizer;
diff --git a/src/util/tranquilizer.rs b/src/util/tranquilizer.rs
new file mode 100644
index 00000000..28711387
--- /dev/null
+++ b/src/util/tranquilizer.rs
@@ -0,0 +1,57 @@
+use std::collections::VecDeque;
+use std::time::{Duration, Instant};
+
+use tokio::time::sleep;
+
+/// A tranquilizer is a helper object that is used to make
+/// background operations not take up too much time.
+///
+/// Background operations are done in a loop that does the following:
+/// - do one step of the background process
+/// - tranquilize, i.e. wait some time to not overload the system
+///
+/// The tranquilizer observes how long the steps take, and keeps
+/// in memory a number of observations. The tranquilize operation
+/// simply sleeps k * avg(observed step times), where k is
+/// the tranquility factor. For instance with a tranquility of 2,
+/// the tranquilizer will sleep on average 2 units of time for every
+/// 1 unit of time spent doing the background task.
+pub struct Tranquilizer {
+ n_observations: usize,
+ observations: VecDeque<Duration>,
+ sum_observations: Duration,
+ last_step_begin: Instant,
+}
+
+impl Tranquilizer {
+ pub fn new(n_observations: usize) -> Self {
+ Self {
+ n_observations,
+ observations: VecDeque::with_capacity(n_observations + 1),
+ sum_observations: Duration::ZERO,
+ last_step_begin: Instant::now(),
+ }
+ }
+
+ pub async fn tranquilize(&mut self, tranquility: u32) {
+ let observation = Instant::now() - self.last_step_begin;
+
+ self.observations.push_back(observation);
+ self.sum_observations += observation;
+
+ while self.observations.len() > self.n_observations {
+ self.sum_observations -= self.observations.pop_front().unwrap();
+ }
+
+ if !self.observations.is_empty() {
+ let delay = (tranquility * self.sum_observations) / (self.observations.len() as u32);
+ sleep(delay).await;
+ }
+
+ self.reset();
+ }
+
+ pub fn reset(&mut self) {
+ self.last_step_begin = Instant::now();
+ }
+}