diff options
author | Alex Auvolat <alex@adnab.me> | 2022-06-23 17:05:11 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-06-23 17:05:11 +0200 |
commit | 59b43914d4a9ae9a50ae79fee61b1a46bff941f9 (patch) | |
tree | 5ca8cdeb754b47a855d142891e41214b77e88c52 /src/util/background | |
parent | f82cf164f5ab525560415355503c04315901a739 (diff) | |
download | garage-59b43914d4a9ae9a50ae79fee61b1a46bff941f9.tar.gz garage-59b43914d4a9ae9a50ae79fee61b1a46bff941f9.zip |
(to test) error reporting and throttling at higher layer
Diffstat (limited to 'src/util/background')
-rw-r--r-- | src/util/background/mod.rs | 3 | ||||
-rw-r--r-- | src/util/background/worker.rs | 40 |
2 files changed, 37 insertions, 6 deletions
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 92090a1a..f7e15b80 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -31,6 +31,9 @@ pub struct WorkerInfo { pub name: String, pub info: Option<String>, pub status: WorkerStatus, + pub errors: usize, + pub consecutive_errors: usize, + pub last_error: Option<String>, } impl BackgroundRunner { diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index f933fc06..e4a04250 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -17,6 +17,7 @@ use crate::error::Error; #[derive(PartialEq, Copy, Clone, Debug, Serialize, Deserialize)] pub enum WorkerStatus { Busy, + Throttled(f32), Idle, Done, } @@ -82,14 +83,17 @@ impl WorkerProcessor { next_task_id += 1; let stop_signal = self.stop_signal.clone(); let stop_signal_worker = self.stop_signal.clone(); - workers.push(async move { - let mut worker = WorkerHandler { + let mut worker = WorkerHandler { task_id, stop_signal, stop_signal_worker, worker: new_worker, status: WorkerStatus::Busy, + errors: 0, + consecutive_errors: 0, + last_error: None, }; + workers.push(async move { worker.step().await; worker }.boxed()); @@ -98,21 +102,31 @@ impl WorkerProcessor { worker = await_next_worker => { if let Some(mut worker) = worker { trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status); + + // Save worker info let mut wi = self.worker_info.lock().unwrap(); match wi.get_mut(&worker.task_id) { Some(i) => { i.status = worker.status; i.info = worker.worker.info(); + i.errors = worker.errors; + i.consecutive_errors = worker.consecutive_errors; + if worker.last_error.is_some() { + i.last_error = worker.last_error.take(); + } } None => { wi.insert(worker.task_id, WorkerInfo { name: worker.worker.name(), status: worker.status, info: worker.worker.info(), + errors: worker.errors, + consecutive_errors: worker.consecutive_errors, + last_error: worker.last_error.take(), }); } } - // TODO save new worker status somewhere + if worker.status == WorkerStatus::Done { info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); } else { @@ -169,6 +183,9 @@ struct WorkerHandler { stop_signal_worker: watch::Receiver<bool>, worker: Box<dyn Worker>, status: WorkerStatus, + errors: usize, + consecutive_errors: usize, + last_error: Option<String>, } impl WorkerHandler { @@ -177,6 +194,7 @@ impl WorkerHandler { WorkerStatus::Busy => match self.worker.work(&mut self.stop_signal).await { Ok(s) => { self.status = s; + self.consecutive_errors = 0; } Err(e) => { error!( @@ -185,11 +203,21 @@ impl WorkerHandler { self.task_id, e ); - // Sleep a bit so that error won't repeat immediately - // (TODO good way to handle errors) - tokio::time::sleep(Duration::from_secs(10)).await; + self.errors += 1; + self.consecutive_errors += 1; + self.last_error = Some(format!("{}", e)); + // Sleep a bit so that error won't repeat immediately, exponential backoff + // strategy (min 1sec, max ~60sec) + self.status = WorkerStatus::Throttled( + (1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32), + ); } }, + WorkerStatus::Throttled(delay) => { + // Sleep for given delay and go back to busy state + tokio::time::sleep(Duration::from_secs_f32(delay)).await; + self.status = WorkerStatus::Busy; + } WorkerStatus::Idle => { if *self.stop_signal.borrow() { select! { |