aboutsummaryrefslogtreecommitdiff
path: root/src/util/background/worker.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2023-01-03 11:37:31 +0000
committerAlex <alex@adnab.me>2023-01-03 11:37:31 +0000
commit582b0761790b7958a3ba10c4b549b466997d2dcd (patch)
treeb94c84bd21ef45e2480c653dc7ed2b37fd5907fb /src/util/background/worker.rs
parent76230f20282e73a5a5afa33af68152acaf732cf5 (diff)
parent939a6d67e8ace1aa38998281f52511a61f4b4d94 (diff)
downloadgarage-582b0761790b7958a3ba10c4b549b466997d2dcd.tar.gz
garage-582b0761790b7958a3ba10c4b549b466997d2dcd.zip
Merge pull request 'Some improvements to Garage internals' (#451) from internals-rework into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/451
Diffstat (limited to 'src/util/background/worker.rs')
-rw-r--r--src/util/background/worker.rs73
1 files changed, 23 insertions, 50 deletions
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index 7e9da7f8..8165e2cb 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
-use std::time::{Duration, Instant};
+use std::time::Duration;
use async_trait::async_trait;
use futures::future::*;
@@ -14,6 +14,10 @@ use crate::background::{WorkerInfo, WorkerStatus};
use crate::error::Error;
use crate::time::now_msec;
+// All workers that haven't exited for this time after an exit signal was recieved
+// will be interrupted in the middle of whatever they are doing.
+const EXIT_DEADLINE: Duration = Duration::from_secs(8);
+
#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
pub enum WorkerState {
Busy,
@@ -50,10 +54,8 @@ pub trait Worker: Send {
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>;
/// Wait for work: await for some task to become available. This future can be interrupted in
- /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we
- /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows
- /// it to check if we are exiting.
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState;
+ /// the middle for any reason, for example if an interrupt signal was recieved.
+ async fn wait_for_work(&mut self) -> WorkerState;
}
pub(crate) struct WorkerProcessor {
@@ -93,11 +95,9 @@ impl WorkerProcessor {
let task_id = next_task_id;
next_task_id += 1;
let stop_signal = self.stop_signal.clone();
- let stop_signal_worker = self.stop_signal.clone();
let mut worker = WorkerHandler {
task_id,
stop_signal,
- stop_signal_worker,
worker: new_worker,
state: WorkerState::Busy,
errors: 0,
@@ -153,26 +153,14 @@ impl WorkerProcessor {
}
// We are exiting, drain everything
- let drain_half_time = Instant::now() + Duration::from_secs(5);
let drain_everything = async move {
- while let Some(mut worker) = workers.next().await {
- if worker.state == WorkerState::Done {
- info!(
- "Worker {} (TID {}) exited",
- worker.worker.name(),
- worker.task_id
- );
- } else if Instant::now() > drain_half_time {
- warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state);
- } else {
- workers.push(
- async move {
- worker.step().await;
- worker
- }
- .boxed(),
- );
- }
+ while let Some(worker) = workers.next().await {
+ info!(
+ "Worker {} (TID {}) exited (last state: {:?})",
+ worker.worker.name(),
+ worker.task_id,
+ worker.state
+ );
}
};
@@ -180,7 +168,7 @@ impl WorkerProcessor {
_ = drain_everything => {
info!("All workers exited peacefully \\o/");
}
- _ = tokio::time::sleep(Duration::from_secs(9)) => {
+ _ = tokio::time::sleep(EXIT_DEADLINE) => {
error!("Some workers could not exit in time, we are cancelling some things in the middle");
}
}
@@ -190,7 +178,6 @@ impl WorkerProcessor {
struct WorkerHandler {
task_id: usize,
stop_signal: watch::Receiver<bool>,
- stop_signal_worker: watch::Receiver<bool>,
worker: Box<dyn Worker>,
state: WorkerState,
errors: usize,
@@ -225,33 +212,19 @@ impl WorkerHandler {
},
WorkerState::Throttled(delay) => {
// Sleep for given delay and go back to busy state
- if !*self.stop_signal.borrow() {
- select! {
- _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (),
- _ = self.stop_signal.changed() => (),
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => {
+ self.state = WorkerState::Busy;
}
+ _ = self.stop_signal.changed() => (),
}
- self.state = WorkerState::Busy;
}
WorkerState::Idle => {
- if *self.stop_signal.borrow() {
- select! {
- new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
- self.state = new_st;
- }
- _ = tokio::time::sleep(Duration::from_secs(1)) => {
- // stay in Idle state
- }
- }
- } else {
- select! {
- new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
- self.state = new_st;
- }
- _ = self.stop_signal.changed() => {
- // stay in Idle state
- }
+ select! {
+ new_st = self.worker.wait_for_work() => {
+ self.state = new_st;
}
+ _ = self.stop_signal.changed() => (),
}
}
WorkerState::Done => unreachable!(),