diff options
-rw-r--r-- | src/block/manager.rs | 15 | ||||
-rw-r--r-- | src/block/repair.rs | 3 | ||||
-rw-r--r-- | src/garage/admin.rs | 4 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 16 | ||||
-rw-r--r-- | src/model/index_counter.rs | 12 | ||||
-rw-r--r-- | src/util/background/mod.rs | 3 | ||||
-rw-r--r-- | src/util/background/worker.rs | 40 | ||||
-rw-r--r-- | src/util/tranquilizer.rs | 21 |
8 files changed, 88 insertions, 26 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 30d6e792..db73ecbc 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -734,12 +734,9 @@ impl Worker for ResyncWorker { ) -> Result<WorkerStatus, Error> { self.tranquilizer.reset(); match self.manager.resync_iter().await { - Ok(ResyncIterResult::BusyDidSomething) => { - self.tranquilizer - .tranquilize(self.manager.background_tranquility) - .await; - Ok(WorkerStatus::Busy) - } + Ok(ResyncIterResult::BusyDidSomething) => Ok(self + .tranquilizer + .tranquilize_worker(self.manager.background_tranquility)), Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerStatus::Busy), Ok(ResyncIterResult::IdleFor(delay)) => { self.next_delay = delay; @@ -750,10 +747,8 @@ impl Worker for ResyncWorker { // We don't really know how to handle them so just ¯\_(ツ)_/¯ // (there is kind of an assumption that Sled won't error on us, // if it does there is not much we can do -- TODO should we just panic?) - error!( - "Could not do a resync iteration: {} (this is a very bad error)", - e - ); + // Here we just give the error to the worker manager, + // it will print it to the logs and increment a counter Err(e.into()) } } diff --git a/src/block/repair.rs b/src/block/repair.rs index 0445527c..a5c01629 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -138,8 +138,7 @@ impl Worker for ScrubWorker { self.tranquilizer.reset(); if let Some(hash) = self.iterator.next().await? { let _ = self.manager.read_block(&hash).await; - self.tranquilizer.tranquilize(self.tranquility).await; - Ok(WorkerStatus::Busy) + Ok(self.tranquilizer.tranquilize_worker(self.tranquility)) } else { Ok(WorkerStatus::Done) } diff --git a/src/garage/admin.rs b/src/garage/admin.rs index f307bea1..c98f5142 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -835,7 +835,9 @@ impl AdminRpcHandler { let workers = if busy { workers .into_iter() - .filter(|(_, w)| w.status == WorkerStatus::Busy) + .filter(|(_, w)| { + matches!(w.status, WorkerStatus::Busy | WorkerStatus::Throttled(_)) + }) .collect() } else { workers diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 81361864..ddb353f9 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -242,7 +242,7 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>) { wi.sort_by_key(|(tid, info)| { ( match info.status { - WorkerStatus::Busy => 0, + WorkerStatus::Busy | WorkerStatus::Throttled(_) => 0, WorkerStatus::Idle => 1, WorkerStatus::Done => 2, }, @@ -256,6 +256,20 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>) { if let Some(i) = &info.info { table.push(format!("\t\t{}", i)); } + if info.consecutive_errors > 0 { + table.push(format!( + "\t\t{} CONSECUTIVE ERRORS ({} total), last: {}", + info.consecutive_errors, + info.errors, + info.last_error.as_deref().unwrap_or("(?)") + )); + } else if info.errors > 0 { + table.push(format!( + "\t\t{} errors, last: {}", + info.errors, + info.last_error.as_deref().unwrap_or("(?)") + )); + } } format_table(table); } diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 4318a064..9e29b421 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -2,7 +2,6 @@ use core::ops::Bound; use std::collections::{hash_map, BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; -use std::time::Duration; use async_trait::async_trait; use serde::{Deserialize, Serialize}; @@ -408,6 +407,10 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> { format!("{} index counter propagator", T::COUNTER_TABLE_NAME) } + fn info(&self) -> Option<String> { + Some(format!("{} items in queue", self.buf.len())) + } + async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> { // This loop batches updates to counters to be sent all at once. // They are sent once the propagate_rx channel has been emptied (or is closed). @@ -429,9 +432,10 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> { error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e); return Ok(WorkerStatus::Done); } - warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, self.buf.len(), e, self.errors); - tokio::time::sleep(Duration::from_secs(5)).await; - return Ok(WorkerStatus::Busy); + // Propagate error up to worker manager, it will log it, increment a counter, + // and sleep for a certain delay (with exponential backoff), waiting for + // things to go back to normal + return Err(e); } else { self.buf.clear(); self.errors = 0; diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 92090a1a..f7e15b80 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -31,6 +31,9 @@ pub struct WorkerInfo { pub name: String, pub info: Option<String>, pub status: WorkerStatus, + pub errors: usize, + pub consecutive_errors: usize, + pub last_error: Option<String>, } impl BackgroundRunner { diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index f933fc06..e4a04250 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -17,6 +17,7 @@ use crate::error::Error; #[derive(PartialEq, Copy, Clone, Debug, Serialize, Deserialize)] pub enum WorkerStatus { Busy, + Throttled(f32), Idle, Done, } @@ -82,14 +83,17 @@ impl WorkerProcessor { next_task_id += 1; let stop_signal = self.stop_signal.clone(); let stop_signal_worker = self.stop_signal.clone(); - workers.push(async move { - let mut worker = WorkerHandler { + let mut worker = WorkerHandler { task_id, stop_signal, stop_signal_worker, worker: new_worker, status: WorkerStatus::Busy, + errors: 0, + consecutive_errors: 0, + last_error: None, }; + workers.push(async move { worker.step().await; worker }.boxed()); @@ -98,21 +102,31 @@ impl WorkerProcessor { worker = await_next_worker => { if let Some(mut worker) = worker { trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status); + + // Save worker info let mut wi = self.worker_info.lock().unwrap(); match wi.get_mut(&worker.task_id) { Some(i) => { i.status = worker.status; i.info = worker.worker.info(); + i.errors = worker.errors; + i.consecutive_errors = worker.consecutive_errors; + if worker.last_error.is_some() { + i.last_error = worker.last_error.take(); + } } None => { wi.insert(worker.task_id, WorkerInfo { name: worker.worker.name(), status: worker.status, info: worker.worker.info(), + errors: worker.errors, + consecutive_errors: worker.consecutive_errors, + last_error: worker.last_error.take(), }); } } - // TODO save new worker status somewhere + if worker.status == WorkerStatus::Done { info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); } else { @@ -169,6 +183,9 @@ struct WorkerHandler { stop_signal_worker: watch::Receiver<bool>, worker: Box<dyn Worker>, status: WorkerStatus, + errors: usize, + consecutive_errors: usize, + last_error: Option<String>, } impl WorkerHandler { @@ -177,6 +194,7 @@ impl WorkerHandler { WorkerStatus::Busy => match self.worker.work(&mut self.stop_signal).await { Ok(s) => { self.status = s; + self.consecutive_errors = 0; } Err(e) => { error!( @@ -185,11 +203,21 @@ impl WorkerHandler { self.task_id, e ); - // Sleep a bit so that error won't repeat immediately - // (TODO good way to handle errors) - tokio::time::sleep(Duration::from_secs(10)).await; + self.errors += 1; + self.consecutive_errors += 1; + self.last_error = Some(format!("{}", e)); + // Sleep a bit so that error won't repeat immediately, exponential backoff + // strategy (min 1sec, max ~60sec) + self.status = WorkerStatus::Throttled( + (1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32), + ); } }, + WorkerStatus::Throttled(delay) => { + // Sleep for given delay and go back to busy state + tokio::time::sleep(Duration::from_secs_f32(delay)).await; + self.status = WorkerStatus::Busy; + } WorkerStatus::Idle => { if *self.stop_signal.borrow() { select! { diff --git a/src/util/tranquilizer.rs b/src/util/tranquilizer.rs index 28711387..f0c2b410 100644 --- a/src/util/tranquilizer.rs +++ b/src/util/tranquilizer.rs @@ -3,6 +3,8 @@ use std::time::{Duration, Instant}; use tokio::time::sleep; +use crate::background::WorkerStatus; + /// A tranquilizer is a helper object that is used to make /// background operations not take up too much time. /// @@ -33,7 +35,7 @@ impl Tranquilizer { } } - pub async fn tranquilize(&mut self, tranquility: u32) { + fn tranquilize_internal(&mut self, tranquility: u32) -> Option<Duration> { let observation = Instant::now() - self.last_step_begin; self.observations.push_back(observation); @@ -45,10 +47,25 @@ impl Tranquilizer { if !self.observations.is_empty() { let delay = (tranquility * self.sum_observations) / (self.observations.len() as u32); + Some(delay) + } else { + None + } + } + + pub async fn tranquilize(&mut self, tranquility: u32) { + if let Some(delay) = self.tranquilize_internal(tranquility) { sleep(delay).await; + self.reset(); } + } - self.reset(); + #[must_use] + pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerStatus { + match self.tranquilize_internal(tranquility) { + Some(delay) => WorkerStatus::Throttled(delay.as_secs_f32()), + None => WorkerStatus::Busy, + } } pub fn reset(&mut self) { |