diff options
author | Alex Auvolat <alex@adnab.me> | 2022-07-08 10:39:41 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-07-08 10:39:41 +0200 |
commit | d1cf1a0fa6952e84874f36f1dc66e4a978959d8f (patch) | |
tree | f65135f725b9f22ac2eac54e5023ab6a548549ec /src/util | |
parent | 0f660b086c23d13c91b0c55fd4d43017a09c1f4b (diff) | |
download | garage-d1cf1a0fa6952e84874f36f1dc66e4a978959d8f.tar.gz garage-d1cf1a0fa6952e84874f36f1dc66e4a978959d8f.zip |
Rename WorkerStatus to WorkerState
because it's a state in a state machine
Diffstat (limited to 'src/util')
-rw-r--r-- | src/util/background/job_worker.rs | 12 | ||||
-rw-r--r-- | src/util/background/mod.rs | 4 | ||||
-rw-r--r-- | src/util/background/worker.rs | 54 | ||||
-rw-r--r-- | src/util/tranquilizer.rs | 8 |
4 files changed, 39 insertions, 39 deletions
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs index fcdac582..6754382a 100644 --- a/src/util/background/job_worker.rs +++ b/src/util/background/job_worker.rs @@ -24,17 +24,17 @@ impl Worker for JobWorker { async fn work( &mut self, _must_exit: &mut watch::Receiver<bool>, - ) -> Result<WorkerStatus, Error> { + ) -> Result<WorkerState, Error> { match self.next_job.take() { - None => return Ok(WorkerStatus::Idle), + None => return Ok(WorkerState::Idle), Some(job) => { job.await?; - Ok(WorkerStatus::Busy) + Ok(WorkerState::Busy) } } } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus { + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { loop { match self.job_chan.lock().await.recv().await { Some((job, cancellable)) => { @@ -42,9 +42,9 @@ impl Worker for JobWorker { continue; } self.next_job = Some(job); - return WorkerStatus::Busy; + return WorkerState::Busy; } - None => return WorkerStatus::Done, + None => return WorkerState::Done, } } } diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 636b9c13..619f5068 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -14,7 +14,7 @@ use tokio::sync::{mpsc, watch, Mutex}; use crate::error::Error; use worker::WorkerProcessor; -pub use worker::{Worker, WorkerStatus}; +pub use worker::{Worker, WorkerState}; pub(crate) type JobOutput = Result<(), Error>; pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; @@ -30,7 +30,7 @@ pub struct BackgroundRunner { pub struct WorkerInfo { pub name: String, pub info: Option<String>, - pub status: WorkerStatus, + pub state: WorkerState, pub errors: usize, pub consecutive_errors: usize, pub last_error: Option<(String, u64)>, diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index aadc677f..7f573a07 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -16,20 +16,20 @@ use crate::error::Error; use crate::time::now_msec; #[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)] -pub enum WorkerStatus { +pub enum WorkerState { Busy, Throttled(f32), Idle, Done, } -impl std::fmt::Display for WorkerStatus { +impl std::fmt::Display for WorkerState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - WorkerStatus::Busy => write!(f, "Busy"), - WorkerStatus::Throttled(t) => write!(f, "Thr:{:.3}", t), - WorkerStatus::Idle => write!(f, "Idle"), - WorkerStatus::Done => write!(f, "Done"), + WorkerState::Busy => write!(f, "Busy"), + WorkerState::Throttled(t) => write!(f, "Thr:{:.3}", t), + WorkerState::Idle => write!(f, "Idle"), + WorkerState::Done => write!(f, "Done"), } } } @@ -43,18 +43,18 @@ pub trait Worker: Send { } /// Work: do a basic unit of work, if one is available (otherwise, should return - /// WorkerStatus::Idle immediately). We will do our best to not interrupt this future in the + /// WorkerState::Idle immediately). We will do our best to not interrupt this future in the /// middle of processing, it will only be interrupted at the last minute when Garage is trying /// to exit and this hasn't returned yet. This function may return an error to indicate that /// its unit of work could not be processed due to an error: the error will be logged and /// .work() will be called again after a short delay. - async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>; + async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>; /// Wait for work: await for some task to become available. This future can be interrupted in /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows /// it to check if we are exiting. - async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus; + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState; } pub(crate) struct WorkerProcessor { @@ -100,7 +100,7 @@ impl WorkerProcessor { stop_signal, stop_signal_worker, worker: new_worker, - status: WorkerStatus::Busy, + state: WorkerState::Busy, errors: 0, consecutive_errors: 0, last_error: None, @@ -113,13 +113,13 @@ impl WorkerProcessor { } worker = await_next_worker => { if let Some(mut worker) = worker { - trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status); + trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.state); // Save worker info let mut wi = self.worker_info.lock().unwrap(); match wi.get_mut(&worker.task_id) { Some(i) => { - i.status = worker.status; + i.state = worker.state; i.info = worker.worker.info(); i.errors = worker.errors; i.consecutive_errors = worker.consecutive_errors; @@ -130,7 +130,7 @@ impl WorkerProcessor { None => { wi.insert(worker.task_id, WorkerInfo { name: worker.worker.name(), - status: worker.status, + state: worker.state, info: worker.worker.info(), errors: worker.errors, consecutive_errors: worker.consecutive_errors, @@ -139,7 +139,7 @@ impl WorkerProcessor { } } - if worker.status == WorkerStatus::Done { + if worker.state == WorkerState::Done { info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); } else { workers.push(async move { @@ -157,14 +157,14 @@ impl WorkerProcessor { let drain_half_time = Instant::now() + Duration::from_secs(5); let drain_everything = async move { while let Some(mut worker) = workers.next().await { - if worker.status == WorkerStatus::Done { + if worker.state == WorkerState::Done { info!( "Worker {} (TID {}) exited", worker.worker.name(), worker.task_id ); } else if Instant::now() > drain_half_time { - warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.status); + warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state); } else { workers.push( async move { @@ -193,7 +193,7 @@ struct WorkerHandler { stop_signal: watch::Receiver<bool>, stop_signal_worker: watch::Receiver<bool>, worker: Box<dyn Worker>, - status: WorkerStatus, + state: WorkerState, errors: usize, consecutive_errors: usize, last_error: Option<(String, u64)>, @@ -201,10 +201,10 @@ struct WorkerHandler { impl WorkerHandler { async fn step(&mut self) { - match self.status { - WorkerStatus::Busy => match self.worker.work(&mut self.stop_signal).await { + match self.state { + WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await { Ok(s) => { - self.status = s; + self.state = s; self.consecutive_errors = 0; } Err(e) => { @@ -219,12 +219,12 @@ impl WorkerHandler { self.last_error = Some((format!("{}", e), now_msec())); // Sleep a bit so that error won't repeat immediately, exponential backoff // strategy (min 1sec, max ~60sec) - self.status = WorkerStatus::Throttled( + self.state = WorkerState::Throttled( (1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32), ); } }, - WorkerStatus::Throttled(delay) => { + WorkerState::Throttled(delay) => { // Sleep for given delay and go back to busy state if !*self.stop_signal.borrow() { select! { @@ -232,13 +232,13 @@ impl WorkerHandler { _ = self.stop_signal.changed() => (), } } - self.status = WorkerStatus::Busy; + self.state = WorkerState::Busy; } - WorkerStatus::Idle => { + WorkerState::Idle => { if *self.stop_signal.borrow() { select! { new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { - self.status = new_st; + self.state = new_st; } _ = tokio::time::sleep(Duration::from_secs(1)) => { // stay in Idle state @@ -247,7 +247,7 @@ impl WorkerHandler { } else { select! { new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { - self.status = new_st; + self.state = new_st; } _ = self.stop_signal.changed() => { // stay in Idle state @@ -255,7 +255,7 @@ impl WorkerHandler { } } } - WorkerStatus::Done => unreachable!(), + WorkerState::Done => unreachable!(), } } } diff --git a/src/util/tranquilizer.rs b/src/util/tranquilizer.rs index 9c796f8b..fdb2918b 100644 --- a/src/util/tranquilizer.rs +++ b/src/util/tranquilizer.rs @@ -3,7 +3,7 @@ use std::time::{Duration, Instant}; use tokio::time::sleep; -use crate::background::WorkerStatus; +use crate::background::WorkerState; /// A tranquilizer is a helper object that is used to make /// background operations not take up too much time. @@ -61,10 +61,10 @@ impl Tranquilizer { } #[must_use] - pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerStatus { + pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerState { match self.tranquilize_internal(tranquility) { - Some(delay) => WorkerStatus::Throttled(delay.as_secs_f32()), - None => WorkerStatus::Busy, + Some(delay) => WorkerState::Throttled(delay.as_secs_f32()), + None => WorkerState::Busy, } } |