aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-21 13:50:55 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-21 13:50:55 +0200
commit3119ea59b08e62ce14cddeb4809a397785b662bb (patch)
tree08e54b210ba73988ed1ac56db7045f39f3791bdb /src/util
parente12bc3b5959c0aa5ae3c8a746c62bab2e7343a62 (diff)
downloadgarage-3119ea59b08e62ce14cddeb4809a397785b662bb.tar.gz
garage-3119ea59b08e62ce14cddeb4809a397785b662bb.zip
New worker semantics applied to garage_table
Diffstat (limited to 'src/util')
-rw-r--r--src/util/background/job_worker.rs5
-rw-r--r--src/util/background/mod.rs20
-rw-r--r--src/util/background/worker.rs91
3 files changed, 80 insertions, 36 deletions
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs
index 8cc660f8..fcdac582 100644
--- a/src/util/background/job_worker.rs
+++ b/src/util/background/job_worker.rs
@@ -34,16 +34,15 @@ impl Worker for JobWorker {
}
}
- async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus {
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus {
loop {
match self.job_chan.lock().await.recv().await {
Some((job, cancellable)) => {
if cancellable && *must_exit.borrow() {
- // skip job
continue;
}
self.next_job = Some(job);
- return WorkerStatus::Busy
+ return WorkerStatus::Busy;
}
None => return WorkerStatus::Done,
}
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index 97d25784..c06e2225 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -10,7 +10,8 @@ use std::sync::Arc;
use tokio::sync::{mpsc, watch, Mutex};
use crate::error::Error;
-use worker::{Worker, WorkerProcessor};
+use worker::WorkerProcessor;
+pub use worker::{Worker, WorkerStatus};
pub(crate) type JobOutput = Result<(), Error>;
pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
@@ -30,9 +31,7 @@ impl BackgroundRunner {
let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
let await_all_done =
- tokio::spawn(
- async move { WorkerProcessor::new(worker_out, stop_signal).run().await },
- );
+ tokio::spawn(async move { WorkerProcessor::new(worker_out, stop_signal).run().await });
let (send_job, queue_out) = mpsc::unbounded_channel();
let queue_out = Arc::new(Mutex::new(queue_out));
@@ -40,11 +39,14 @@ impl BackgroundRunner {
for i in 0..n_runners {
let queue_out = queue_out.clone();
- send_worker.send(Box::new(job_worker::JobWorker {
- index: i,
- job_chan: queue_out.clone(),
- next_job: None,
- })).ok().unwrap();
+ send_worker
+ .send(Box::new(job_worker::JobWorker {
+ index: i,
+ job_chan: queue_out.clone(),
+ next_job: None,
+ }))
+ .ok()
+ .unwrap();
}
let bgrunner = Arc::new(Self {
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index a173902c..92f7990c 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -1,16 +1,16 @@
use std::time::{Duration, Instant};
-use tracing::*;
use async_trait::async_trait;
use futures::future::*;
-use tokio::select;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
+use tokio::select;
use tokio::sync::{mpsc, watch};
+use tracing::*;
use crate::error::Error;
-#[derive(PartialEq, Copy, Clone)]
+#[derive(PartialEq, Copy, Clone, Debug)]
pub enum WorkerStatus {
Busy,
Idle,
@@ -20,8 +20,20 @@ pub enum WorkerStatus {
#[async_trait]
pub trait Worker: Send {
fn name(&self) -> String;
+
+ /// Work: do a basic unit of work, if one is available (otherwise, should return
+ /// WorkerStatus::Idle immediately). We will do our best to not interrupt this future in the
+ /// middle of processing, it will only be interrupted at the last minute when Garage is trying
+ /// to exit and this hasn't returned yet. This function may return an error to indicate that
+ /// its unit of work could not be processed due to an error: the error will be logged and
+ /// .work() will be called again immediately.
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>;
- async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus;
+
+ /// Wait for work: await for some task to become available. This future can be interrupted in
+ /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we
+ /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows
+ /// it to check if we are exiting.
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus;
}
pub(crate) struct WorkerProcessor {
@@ -58,10 +70,12 @@ impl WorkerProcessor {
let task_id = next_task_id;
next_task_id += 1;
let stop_signal = self.stop_signal.clone();
+ let stop_signal_worker = self.stop_signal.clone();
workers.push(async move {
let mut worker = WorkerHandler {
task_id,
stop_signal,
+ stop_signal_worker,
worker: new_worker,
status: WorkerStatus::Busy,
};
@@ -91,15 +105,22 @@ impl WorkerProcessor {
let drain_half_time = Instant::now() + Duration::from_secs(5);
let drain_everything = async move {
while let Some(mut worker) = workers.next().await {
- if worker.status == WorkerStatus::Busy
- || (worker.status == WorkerStatus::Idle && Instant::now() < drain_half_time)
- {
- workers.push(async move {
- worker.step().await;
- worker
- }.boxed());
+ if worker.status == WorkerStatus::Done {
+ info!(
+ "Worker {} (TID {}) exited",
+ worker.worker.name(),
+ worker.task_id
+ );
+ } else if Instant::now() > drain_half_time {
+ warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.status);
} else {
- info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
+ workers.push(
+ async move {
+ worker.step().await;
+ worker
+ }
+ .boxed(),
+ );
}
}
};
@@ -109,7 +130,7 @@ impl WorkerProcessor {
info!("All workers exited in time \\o/");
}
_ = tokio::time::sleep(Duration::from_secs(9)) => {
- warn!("Some workers could not exit in time, we are cancelling some things in the middle.");
+ error!("Some workers could not exit in time, we are cancelling some things in the middle");
}
}
}
@@ -119,27 +140,49 @@ impl WorkerProcessor {
struct WorkerHandler {
task_id: usize,
stop_signal: watch::Receiver<bool>,
+ stop_signal_worker: watch::Receiver<bool>,
worker: Box<dyn Worker>,
status: WorkerStatus,
}
impl WorkerHandler {
- async fn step(&mut self) {
+ async fn step(&mut self) {
match self.status {
- WorkerStatus::Busy => {
- match self.worker.work(&mut self.stop_signal).await {
- Ok(s) => {
- self.status = s;
+ WorkerStatus::Busy => match self.worker.work(&mut self.stop_signal).await {
+ Ok(s) => {
+ self.status = s;
+ }
+ Err(e) => {
+ error!(
+ "Error in worker {} (TID {}): {}",
+ self.worker.name(),
+ self.task_id,
+ e
+ );
+ }
+ },
+ WorkerStatus::Idle => {
+ if *self.stop_signal.borrow() {
+ select! {
+ new_st = self.worker.wait_for_work(&mut self.stop_signal_worker) => {
+ self.status = new_st;
+ }
+ _ = tokio::time::sleep(Duration::from_secs(1)) => {
+ // stay in Idle state
+ }
}
- Err(e) => {
- error!("Error in worker {}: {}", self.worker.name(), e);
+ } else {
+ select! {
+ new_st = self.worker.wait_for_work(&mut self.stop_signal_worker) => {
+ self.status = new_st;
+ }
+ _ = self.stop_signal.changed() => {
+ // stay in Idle state
+ }
}
}
}
- WorkerStatus::Idle => {
- self.status = self.worker.wait_for_work(&mut self.stop_signal).await;
- }
- WorkerStatus::Done => unreachable!()
+ WorkerStatus::Done => unreachable!(),
}
}
}