aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-08 10:39:41 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-08 10:39:41 +0200
commitd1cf1a0fa6952e84874f36f1dc66e4a978959d8f (patch)
treef65135f725b9f22ac2eac54e5023ab6a548549ec /src/util
parent0f660b086c23d13c91b0c55fd4d43017a09c1f4b (diff)
downloadgarage-d1cf1a0fa6952e84874f36f1dc66e4a978959d8f.tar.gz
garage-d1cf1a0fa6952e84874f36f1dc66e4a978959d8f.zip
Rename WorkerStatus to WorkerState
because it's a state in a state machine
Diffstat (limited to 'src/util')
-rw-r--r--src/util/background/job_worker.rs12
-rw-r--r--src/util/background/mod.rs4
-rw-r--r--src/util/background/worker.rs54
-rw-r--r--src/util/tranquilizer.rs8
4 files changed, 39 insertions, 39 deletions
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs
index fcdac582..6754382a 100644
--- a/src/util/background/job_worker.rs
+++ b/src/util/background/job_worker.rs
@@ -24,17 +24,17 @@ impl Worker for JobWorker {
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
- ) -> Result<WorkerStatus, Error> {
+ ) -> Result<WorkerState, Error> {
match self.next_job.take() {
- None => return Ok(WorkerStatus::Idle),
+ None => return Ok(WorkerState::Idle),
Some(job) => {
job.await?;
- Ok(WorkerStatus::Busy)
+ Ok(WorkerState::Busy)
}
}
}
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
loop {
match self.job_chan.lock().await.recv().await {
Some((job, cancellable)) => {
@@ -42,9 +42,9 @@ impl Worker for JobWorker {
continue;
}
self.next_job = Some(job);
- return WorkerStatus::Busy;
+ return WorkerState::Busy;
}
- None => return WorkerStatus::Done,
+ None => return WorkerState::Done,
}
}
}
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index 636b9c13..619f5068 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -14,7 +14,7 @@ use tokio::sync::{mpsc, watch, Mutex};
use crate::error::Error;
use worker::WorkerProcessor;
-pub use worker::{Worker, WorkerStatus};
+pub use worker::{Worker, WorkerState};
pub(crate) type JobOutput = Result<(), Error>;
pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
@@ -30,7 +30,7 @@ pub struct BackgroundRunner {
pub struct WorkerInfo {
pub name: String,
pub info: Option<String>,
- pub status: WorkerStatus,
+ pub state: WorkerState,
pub errors: usize,
pub consecutive_errors: usize,
pub last_error: Option<(String, u64)>,
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index aadc677f..7f573a07 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -16,20 +16,20 @@ use crate::error::Error;
use crate::time::now_msec;
#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
-pub enum WorkerStatus {
+pub enum WorkerState {
Busy,
Throttled(f32),
Idle,
Done,
}
-impl std::fmt::Display for WorkerStatus {
+impl std::fmt::Display for WorkerState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
- WorkerStatus::Busy => write!(f, "Busy"),
- WorkerStatus::Throttled(t) => write!(f, "Thr:{:.3}", t),
- WorkerStatus::Idle => write!(f, "Idle"),
- WorkerStatus::Done => write!(f, "Done"),
+ WorkerState::Busy => write!(f, "Busy"),
+ WorkerState::Throttled(t) => write!(f, "Thr:{:.3}", t),
+ WorkerState::Idle => write!(f, "Idle"),
+ WorkerState::Done => write!(f, "Done"),
}
}
}
@@ -43,18 +43,18 @@ pub trait Worker: Send {
}
/// Work: do a basic unit of work, if one is available (otherwise, should return
- /// WorkerStatus::Idle immediately). We will do our best to not interrupt this future in the
+ /// WorkerState::Idle immediately). We will do our best to not interrupt this future in the
/// middle of processing, it will only be interrupted at the last minute when Garage is trying
/// to exit and this hasn't returned yet. This function may return an error to indicate that
/// its unit of work could not be processed due to an error: the error will be logged and
/// .work() will be called again after a short delay.
- async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>;
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>;
/// Wait for work: await for some task to become available. This future can be interrupted in
/// the middle for any reason. This future doesn't have to await on must_exit.changed(), we
/// are doing it for you. Therefore it only receives a read refernce to must_exit which allows
/// it to check if we are exiting.
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus;
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState;
}
pub(crate) struct WorkerProcessor {
@@ -100,7 +100,7 @@ impl WorkerProcessor {
stop_signal,
stop_signal_worker,
worker: new_worker,
- status: WorkerStatus::Busy,
+ state: WorkerState::Busy,
errors: 0,
consecutive_errors: 0,
last_error: None,
@@ -113,13 +113,13 @@ impl WorkerProcessor {
}
worker = await_next_worker => {
if let Some(mut worker) = worker {
- trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status);
+ trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.state);
// Save worker info
let mut wi = self.worker_info.lock().unwrap();
match wi.get_mut(&worker.task_id) {
Some(i) => {
- i.status = worker.status;
+ i.state = worker.state;
i.info = worker.worker.info();
i.errors = worker.errors;
i.consecutive_errors = worker.consecutive_errors;
@@ -130,7 +130,7 @@ impl WorkerProcessor {
None => {
wi.insert(worker.task_id, WorkerInfo {
name: worker.worker.name(),
- status: worker.status,
+ state: worker.state,
info: worker.worker.info(),
errors: worker.errors,
consecutive_errors: worker.consecutive_errors,
@@ -139,7 +139,7 @@ impl WorkerProcessor {
}
}
- if worker.status == WorkerStatus::Done {
+ if worker.state == WorkerState::Done {
info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
} else {
workers.push(async move {
@@ -157,14 +157,14 @@ impl WorkerProcessor {
let drain_half_time = Instant::now() + Duration::from_secs(5);
let drain_everything = async move {
while let Some(mut worker) = workers.next().await {
- if worker.status == WorkerStatus::Done {
+ if worker.state == WorkerState::Done {
info!(
"Worker {} (TID {}) exited",
worker.worker.name(),
worker.task_id
);
} else if Instant::now() > drain_half_time {
- warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.status);
+ warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state);
} else {
workers.push(
async move {
@@ -193,7 +193,7 @@ struct WorkerHandler {
stop_signal: watch::Receiver<bool>,
stop_signal_worker: watch::Receiver<bool>,
worker: Box<dyn Worker>,
- status: WorkerStatus,
+ state: WorkerState,
errors: usize,
consecutive_errors: usize,
last_error: Option<(String, u64)>,
@@ -201,10 +201,10 @@ struct WorkerHandler {
impl WorkerHandler {
async fn step(&mut self) {
- match self.status {
- WorkerStatus::Busy => match self.worker.work(&mut self.stop_signal).await {
+ match self.state {
+ WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await {
Ok(s) => {
- self.status = s;
+ self.state = s;
self.consecutive_errors = 0;
}
Err(e) => {
@@ -219,12 +219,12 @@ impl WorkerHandler {
self.last_error = Some((format!("{}", e), now_msec()));
// Sleep a bit so that error won't repeat immediately, exponential backoff
// strategy (min 1sec, max ~60sec)
- self.status = WorkerStatus::Throttled(
+ self.state = WorkerState::Throttled(
(1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32),
);
}
},
- WorkerStatus::Throttled(delay) => {
+ WorkerState::Throttled(delay) => {
// Sleep for given delay and go back to busy state
if !*self.stop_signal.borrow() {
select! {
@@ -232,13 +232,13 @@ impl WorkerHandler {
_ = self.stop_signal.changed() => (),
}
}
- self.status = WorkerStatus::Busy;
+ self.state = WorkerState::Busy;
}
- WorkerStatus::Idle => {
+ WorkerState::Idle => {
if *self.stop_signal.borrow() {
select! {
new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
- self.status = new_st;
+ self.state = new_st;
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
// stay in Idle state
@@ -247,7 +247,7 @@ impl WorkerHandler {
} else {
select! {
new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
- self.status = new_st;
+ self.state = new_st;
}
_ = self.stop_signal.changed() => {
// stay in Idle state
@@ -255,7 +255,7 @@ impl WorkerHandler {
}
}
}
- WorkerStatus::Done => unreachable!(),
+ WorkerState::Done => unreachable!(),
}
}
}
diff --git a/src/util/tranquilizer.rs b/src/util/tranquilizer.rs
index 9c796f8b..fdb2918b 100644
--- a/src/util/tranquilizer.rs
+++ b/src/util/tranquilizer.rs
@@ -3,7 +3,7 @@ use std::time::{Duration, Instant};
use tokio::time::sleep;
-use crate::background::WorkerStatus;
+use crate::background::WorkerState;
/// A tranquilizer is a helper object that is used to make
/// background operations not take up too much time.
@@ -61,10 +61,10 @@ impl Tranquilizer {
}
#[must_use]
- pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerStatus {
+ pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerState {
match self.tranquilize_internal(tranquility) {
- Some(delay) => WorkerStatus::Throttled(delay.as_secs_f32()),
- None => WorkerStatus::Busy,
+ Some(delay) => WorkerState::Throttled(delay.as_secs_f32()),
+ None => WorkerState::Busy,
}
}