aboutsummaryrefslogtreecommitdiff
path: root/src/util/background
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-23 17:05:11 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-23 17:05:11 +0200
commit59b43914d4a9ae9a50ae79fee61b1a46bff941f9 (patch)
tree5ca8cdeb754b47a855d142891e41214b77e88c52 /src/util/background
parentf82cf164f5ab525560415355503c04315901a739 (diff)
downloadgarage-59b43914d4a9ae9a50ae79fee61b1a46bff941f9.tar.gz
garage-59b43914d4a9ae9a50ae79fee61b1a46bff941f9.zip
(to test) error reporting and throttling at higher layer
Diffstat (limited to 'src/util/background')
-rw-r--r--src/util/background/mod.rs3
-rw-r--r--src/util/background/worker.rs40
2 files changed, 37 insertions, 6 deletions
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index 92090a1a..f7e15b80 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -31,6 +31,9 @@ pub struct WorkerInfo {
pub name: String,
pub info: Option<String>,
pub status: WorkerStatus,
+ pub errors: usize,
+ pub consecutive_errors: usize,
+ pub last_error: Option<String>,
}
impl BackgroundRunner {
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index f933fc06..e4a04250 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -17,6 +17,7 @@ use crate::error::Error;
#[derive(PartialEq, Copy, Clone, Debug, Serialize, Deserialize)]
pub enum WorkerStatus {
Busy,
+ Throttled(f32),
Idle,
Done,
}
@@ -82,14 +83,17 @@ impl WorkerProcessor {
next_task_id += 1;
let stop_signal = self.stop_signal.clone();
let stop_signal_worker = self.stop_signal.clone();
- workers.push(async move {
- let mut worker = WorkerHandler {
+ let mut worker = WorkerHandler {
task_id,
stop_signal,
stop_signal_worker,
worker: new_worker,
status: WorkerStatus::Busy,
+ errors: 0,
+ consecutive_errors: 0,
+ last_error: None,
};
+ workers.push(async move {
worker.step().await;
worker
}.boxed());
@@ -98,21 +102,31 @@ impl WorkerProcessor {
worker = await_next_worker => {
if let Some(mut worker) = worker {
trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status);
+
+ // Save worker info
let mut wi = self.worker_info.lock().unwrap();
match wi.get_mut(&worker.task_id) {
Some(i) => {
i.status = worker.status;
i.info = worker.worker.info();
+ i.errors = worker.errors;
+ i.consecutive_errors = worker.consecutive_errors;
+ if worker.last_error.is_some() {
+ i.last_error = worker.last_error.take();
+ }
}
None => {
wi.insert(worker.task_id, WorkerInfo {
name: worker.worker.name(),
status: worker.status,
info: worker.worker.info(),
+ errors: worker.errors,
+ consecutive_errors: worker.consecutive_errors,
+ last_error: worker.last_error.take(),
});
}
}
- // TODO save new worker status somewhere
+
if worker.status == WorkerStatus::Done {
info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
} else {
@@ -169,6 +183,9 @@ struct WorkerHandler {
stop_signal_worker: watch::Receiver<bool>,
worker: Box<dyn Worker>,
status: WorkerStatus,
+ errors: usize,
+ consecutive_errors: usize,
+ last_error: Option<String>,
}
impl WorkerHandler {
@@ -177,6 +194,7 @@ impl WorkerHandler {
WorkerStatus::Busy => match self.worker.work(&mut self.stop_signal).await {
Ok(s) => {
self.status = s;
+ self.consecutive_errors = 0;
}
Err(e) => {
error!(
@@ -185,11 +203,21 @@ impl WorkerHandler {
self.task_id,
e
);
- // Sleep a bit so that error won't repeat immediately
- // (TODO good way to handle errors)
- tokio::time::sleep(Duration::from_secs(10)).await;
+ self.errors += 1;
+ self.consecutive_errors += 1;
+ self.last_error = Some(format!("{}", e));
+ // Sleep a bit so that error won't repeat immediately, exponential backoff
+ // strategy (min 1sec, max ~60sec)
+ self.status = WorkerStatus::Throttled(
+ (1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32),
+ );
}
},
+ WorkerStatus::Throttled(delay) => {
+ // Sleep for given delay and go back to busy state
+ tokio::time::sleep(Duration::from_secs_f32(delay)).await;
+ self.status = WorkerStatus::Busy;
+ }
WorkerStatus::Idle => {
if *self.stop_signal.borrow() {
select! {