aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/block/manager.rs15
-rw-r--r--src/block/repair.rs3
-rw-r--r--src/garage/admin.rs4
-rw-r--r--src/garage/cli/util.rs16
-rw-r--r--src/model/index_counter.rs12
-rw-r--r--src/util/background/mod.rs3
-rw-r--r--src/util/background/worker.rs40
-rw-r--r--src/util/tranquilizer.rs21
8 files changed, 88 insertions, 26 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 30d6e792..db73ecbc 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -734,12 +734,9 @@ impl Worker for ResyncWorker {
) -> Result<WorkerStatus, Error> {
self.tranquilizer.reset();
match self.manager.resync_iter().await {
- Ok(ResyncIterResult::BusyDidSomething) => {
- self.tranquilizer
- .tranquilize(self.manager.background_tranquility)
- .await;
- Ok(WorkerStatus::Busy)
- }
+ Ok(ResyncIterResult::BusyDidSomething) => Ok(self
+ .tranquilizer
+ .tranquilize_worker(self.manager.background_tranquility)),
Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerStatus::Busy),
Ok(ResyncIterResult::IdleFor(delay)) => {
self.next_delay = delay;
@@ -750,10 +747,8 @@ impl Worker for ResyncWorker {
// We don't really know how to handle them so just ¯\_(ツ)_/¯
// (there is kind of an assumption that Sled won't error on us,
// if it does there is not much we can do -- TODO should we just panic?)
- error!(
- "Could not do a resync iteration: {} (this is a very bad error)",
- e
- );
+ // Here we just give the error to the worker manager,
+ // it will print it to the logs and increment a counter
Err(e.into())
}
}
diff --git a/src/block/repair.rs b/src/block/repair.rs
index 0445527c..a5c01629 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -138,8 +138,7 @@ impl Worker for ScrubWorker {
self.tranquilizer.reset();
if let Some(hash) = self.iterator.next().await? {
let _ = self.manager.read_block(&hash).await;
- self.tranquilizer.tranquilize(self.tranquility).await;
- Ok(WorkerStatus::Busy)
+ Ok(self.tranquilizer.tranquilize_worker(self.tranquility))
} else {
Ok(WorkerStatus::Done)
}
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index f307bea1..c98f5142 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -835,7 +835,9 @@ impl AdminRpcHandler {
let workers = if busy {
workers
.into_iter()
- .filter(|(_, w)| w.status == WorkerStatus::Busy)
+ .filter(|(_, w)| {
+ matches!(w.status, WorkerStatus::Busy | WorkerStatus::Throttled(_))
+ })
.collect()
} else {
workers
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 81361864..ddb353f9 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -242,7 +242,7 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>) {
wi.sort_by_key(|(tid, info)| {
(
match info.status {
- WorkerStatus::Busy => 0,
+ WorkerStatus::Busy | WorkerStatus::Throttled(_) => 0,
WorkerStatus::Idle => 1,
WorkerStatus::Done => 2,
},
@@ -256,6 +256,20 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>) {
if let Some(i) = &info.info {
table.push(format!("\t\t{}", i));
}
+ if info.consecutive_errors > 0 {
+ table.push(format!(
+ "\t\t{} CONSECUTIVE ERRORS ({} total), last: {}",
+ info.consecutive_errors,
+ info.errors,
+ info.last_error.as_deref().unwrap_or("(?)")
+ ));
+ } else if info.errors > 0 {
+ table.push(format!(
+ "\t\t{} errors, last: {}",
+ info.errors,
+ info.last_error.as_deref().unwrap_or("(?)")
+ ));
+ }
}
format_table(table);
}
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 4318a064..9e29b421 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -2,7 +2,6 @@ use core::ops::Bound;
use std::collections::{hash_map, BTreeMap, HashMap};
use std::marker::PhantomData;
use std::sync::Arc;
-use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
@@ -408,6 +407,10 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
format!("{} index counter propagator", T::COUNTER_TABLE_NAME)
}
+ fn info(&self) -> Option<String> {
+ Some(format!("{} items in queue", self.buf.len()))
+ }
+
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> {
// This loop batches updates to counters to be sent all at once.
// They are sent once the propagate_rx channel has been emptied (or is closed).
@@ -429,9 +432,10 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e);
return Ok(WorkerStatus::Done);
}
- warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, self.buf.len(), e, self.errors);
- tokio::time::sleep(Duration::from_secs(5)).await;
- return Ok(WorkerStatus::Busy);
+ // Propagate error up to worker manager, it will log it, increment a counter,
+ // and sleep for a certain delay (with exponential backoff), waiting for
+ // things to go back to normal
+ return Err(e);
} else {
self.buf.clear();
self.errors = 0;
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index 92090a1a..f7e15b80 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -31,6 +31,9 @@ pub struct WorkerInfo {
pub name: String,
pub info: Option<String>,
pub status: WorkerStatus,
+ pub errors: usize,
+ pub consecutive_errors: usize,
+ pub last_error: Option<String>,
}
impl BackgroundRunner {
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index f933fc06..e4a04250 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -17,6 +17,7 @@ use crate::error::Error;
#[derive(PartialEq, Copy, Clone, Debug, Serialize, Deserialize)]
pub enum WorkerStatus {
Busy,
+ Throttled(f32),
Idle,
Done,
}
@@ -82,14 +83,17 @@ impl WorkerProcessor {
next_task_id += 1;
let stop_signal = self.stop_signal.clone();
let stop_signal_worker = self.stop_signal.clone();
- workers.push(async move {
- let mut worker = WorkerHandler {
+ let mut worker = WorkerHandler {
task_id,
stop_signal,
stop_signal_worker,
worker: new_worker,
status: WorkerStatus::Busy,
+ errors: 0,
+ consecutive_errors: 0,
+ last_error: None,
};
+ workers.push(async move {
worker.step().await;
worker
}.boxed());
@@ -98,21 +102,31 @@ impl WorkerProcessor {
worker = await_next_worker => {
if let Some(mut worker) = worker {
trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status);
+
+ // Save worker info
let mut wi = self.worker_info.lock().unwrap();
match wi.get_mut(&worker.task_id) {
Some(i) => {
i.status = worker.status;
i.info = worker.worker.info();
+ i.errors = worker.errors;
+ i.consecutive_errors = worker.consecutive_errors;
+ if worker.last_error.is_some() {
+ i.last_error = worker.last_error.take();
+ }
}
None => {
wi.insert(worker.task_id, WorkerInfo {
name: worker.worker.name(),
status: worker.status,
info: worker.worker.info(),
+ errors: worker.errors,
+ consecutive_errors: worker.consecutive_errors,
+ last_error: worker.last_error.take(),
});
}
}
- // TODO save new worker status somewhere
+
if worker.status == WorkerStatus::Done {
info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
} else {
@@ -169,6 +183,9 @@ struct WorkerHandler {
stop_signal_worker: watch::Receiver<bool>,
worker: Box<dyn Worker>,
status: WorkerStatus,
+ errors: usize,
+ consecutive_errors: usize,
+ last_error: Option<String>,
}
impl WorkerHandler {
@@ -177,6 +194,7 @@ impl WorkerHandler {
WorkerStatus::Busy => match self.worker.work(&mut self.stop_signal).await {
Ok(s) => {
self.status = s;
+ self.consecutive_errors = 0;
}
Err(e) => {
error!(
@@ -185,11 +203,21 @@ impl WorkerHandler {
self.task_id,
e
);
- // Sleep a bit so that error won't repeat immediately
- // (TODO good way to handle errors)
- tokio::time::sleep(Duration::from_secs(10)).await;
+ self.errors += 1;
+ self.consecutive_errors += 1;
+ self.last_error = Some(format!("{}", e));
+ // Sleep a bit so that error won't repeat immediately, exponential backoff
+ // strategy (min 1sec, max ~60sec)
+ self.status = WorkerStatus::Throttled(
+ (1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32),
+ );
}
},
+ WorkerStatus::Throttled(delay) => {
+ // Sleep for given delay and go back to busy state
+ tokio::time::sleep(Duration::from_secs_f32(delay)).await;
+ self.status = WorkerStatus::Busy;
+ }
WorkerStatus::Idle => {
if *self.stop_signal.borrow() {
select! {
diff --git a/src/util/tranquilizer.rs b/src/util/tranquilizer.rs
index 28711387..f0c2b410 100644
--- a/src/util/tranquilizer.rs
+++ b/src/util/tranquilizer.rs
@@ -3,6 +3,8 @@ use std::time::{Duration, Instant};
use tokio::time::sleep;
+use crate::background::WorkerStatus;
+
/// A tranquilizer is a helper object that is used to make
/// background operations not take up too much time.
///
@@ -33,7 +35,7 @@ impl Tranquilizer {
}
}
- pub async fn tranquilize(&mut self, tranquility: u32) {
+ fn tranquilize_internal(&mut self, tranquility: u32) -> Option<Duration> {
let observation = Instant::now() - self.last_step_begin;
self.observations.push_back(observation);
@@ -45,10 +47,25 @@ impl Tranquilizer {
if !self.observations.is_empty() {
let delay = (tranquility * self.sum_observations) / (self.observations.len() as u32);
+ Some(delay)
+ } else {
+ None
+ }
+ }
+
+ pub async fn tranquilize(&mut self, tranquility: u32) {
+ if let Some(delay) = self.tranquilize_internal(tranquility) {
sleep(delay).await;
+ self.reset();
}
+ }
- self.reset();
+ #[must_use]
+ pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerStatus {
+ match self.tranquilize_internal(tranquility) {
+ Some(delay) => WorkerStatus::Throttled(delay.as_secs_f32()),
+ None => WorkerStatus::Busy,
+ }
}
pub fn reset(&mut self) {