diff options
author | Alex Auvolat <alex@adnab.me> | 2022-06-21 13:50:55 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-06-21 13:50:55 +0200 |
commit | 3119ea59b08e62ce14cddeb4809a397785b662bb (patch) | |
tree | 08e54b210ba73988ed1ac56db7045f39f3791bdb /src/util | |
parent | e12bc3b5959c0aa5ae3c8a746c62bab2e7343a62 (diff) | |
download | garage-3119ea59b08e62ce14cddeb4809a397785b662bb.tar.gz garage-3119ea59b08e62ce14cddeb4809a397785b662bb.zip |
New worker semantics applied to garage_table
Diffstat (limited to 'src/util')
-rw-r--r-- | src/util/background/job_worker.rs | 5 | ||||
-rw-r--r-- | src/util/background/mod.rs | 20 | ||||
-rw-r--r-- | src/util/background/worker.rs | 91 |
3 files changed, 80 insertions, 36 deletions
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs index 8cc660f8..fcdac582 100644 --- a/src/util/background/job_worker.rs +++ b/src/util/background/job_worker.rs @@ -34,16 +34,15 @@ impl Worker for JobWorker { } } - async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus { + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus { loop { match self.job_chan.lock().await.recv().await { Some((job, cancellable)) => { if cancellable && *must_exit.borrow() { - // skip job continue; } self.next_job = Some(job); - return WorkerStatus::Busy + return WorkerStatus::Busy; } None => return WorkerStatus::Done, } diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 97d25784..c06e2225 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -10,7 +10,8 @@ use std::sync::Arc; use tokio::sync::{mpsc, watch, Mutex}; use crate::error::Error; -use worker::{Worker, WorkerProcessor}; +use worker::WorkerProcessor; +pub use worker::{Worker, WorkerStatus}; pub(crate) type JobOutput = Result<(), Error>; pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; @@ -30,9 +31,7 @@ impl BackgroundRunner { let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>(); let await_all_done = - tokio::spawn( - async move { WorkerProcessor::new(worker_out, stop_signal).run().await }, - ); + tokio::spawn(async move { WorkerProcessor::new(worker_out, stop_signal).run().await }); let (send_job, queue_out) = mpsc::unbounded_channel(); let queue_out = Arc::new(Mutex::new(queue_out)); @@ -40,11 +39,14 @@ impl BackgroundRunner { for i in 0..n_runners { let queue_out = queue_out.clone(); - send_worker.send(Box::new(job_worker::JobWorker { - index: i, - job_chan: queue_out.clone(), - next_job: None, - })).ok().unwrap(); + send_worker + .send(Box::new(job_worker::JobWorker { + index: i, + job_chan: queue_out.clone(), + next_job: None, + })) + .ok() + .unwrap(); } let bgrunner = Arc::new(Self { diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index a173902c..92f7990c 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -1,16 +1,16 @@ use std::time::{Duration, Instant}; -use tracing::*; use async_trait::async_trait; use futures::future::*; -use tokio::select; use futures::stream::FuturesUnordered; use futures::StreamExt; +use tokio::select; use tokio::sync::{mpsc, watch}; +use tracing::*; use crate::error::Error; -#[derive(PartialEq, Copy, Clone)] +#[derive(PartialEq, Copy, Clone, Debug)] pub enum WorkerStatus { Busy, Idle, @@ -20,8 +20,20 @@ pub enum WorkerStatus { #[async_trait] pub trait Worker: Send { fn name(&self) -> String; + + /// Work: do a basic unit of work, if one is available (otherwise, should return + /// WorkerStatus::Idle immediately). We will do our best to not interrupt this future in the + /// middle of processing, it will only be interrupted at the last minute when Garage is trying + /// to exit and this hasn't returned yet. This function may return an error to indicate that + /// its unit of work could not be processed due to an error: the error will be logged and + /// .work() will be called again immediately. async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>; - async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus; + + /// Wait for work: await for some task to become available. This future can be interrupted in + /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we + /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows + /// it to check if we are exiting. + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus; } pub(crate) struct WorkerProcessor { @@ -58,10 +70,12 @@ impl WorkerProcessor { let task_id = next_task_id; next_task_id += 1; let stop_signal = self.stop_signal.clone(); + let stop_signal_worker = self.stop_signal.clone(); workers.push(async move { let mut worker = WorkerHandler { task_id, stop_signal, + stop_signal_worker, worker: new_worker, status: WorkerStatus::Busy, }; @@ -91,15 +105,22 @@ impl WorkerProcessor { let drain_half_time = Instant::now() + Duration::from_secs(5); let drain_everything = async move { while let Some(mut worker) = workers.next().await { - if worker.status == WorkerStatus::Busy - || (worker.status == WorkerStatus::Idle && Instant::now() < drain_half_time) - { - workers.push(async move { - worker.step().await; - worker - }.boxed()); + if worker.status == WorkerStatus::Done { + info!( + "Worker {} (TID {}) exited", + worker.worker.name(), + worker.task_id + ); + } else if Instant::now() > drain_half_time { + warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.status); } else { - info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); + workers.push( + async move { + worker.step().await; + worker + } + .boxed(), + ); } } }; @@ -109,7 +130,7 @@ impl WorkerProcessor { info!("All workers exited in time \\o/"); } _ = tokio::time::sleep(Duration::from_secs(9)) => { - warn!("Some workers could not exit in time, we are cancelling some things in the middle."); + error!("Some workers could not exit in time, we are cancelling some things in the middle"); } } } @@ -119,27 +140,49 @@ impl WorkerProcessor { struct WorkerHandler { task_id: usize, stop_signal: watch::Receiver<bool>, + stop_signal_worker: watch::Receiver<bool>, worker: Box<dyn Worker>, status: WorkerStatus, } impl WorkerHandler { - async fn step(&mut self) { + async fn step(&mut self) { match self.status { - WorkerStatus::Busy => { - match self.worker.work(&mut self.stop_signal).await { - Ok(s) => { - self.status = s; + WorkerStatus::Busy => match self.worker.work(&mut self.stop_signal).await { + Ok(s) => { + self.status = s; + } + Err(e) => { + error!( + "Error in worker {} (TID {}): {}", + self.worker.name(), + self.task_id, + e + ); + } + }, + WorkerStatus::Idle => { + if *self.stop_signal.borrow() { + select! { + new_st = self.worker.wait_for_work(&mut self.stop_signal_worker) => { + self.status = new_st; + } + _ = tokio::time::sleep(Duration::from_secs(1)) => { + // stay in Idle state + } } - Err(e) => { - error!("Error in worker {}: {}", self.worker.name(), e); + } else { + select! { + new_st = self.worker.wait_for_work(&mut self.stop_signal_worker) => { + self.status = new_st; + } + _ = self.stop_signal.changed() => { + // stay in Idle state + } } } } - WorkerStatus::Idle => { - self.status = self.worker.wait_for_work(&mut self.stop_signal).await; - } - WorkerStatus::Done => unreachable!() + WorkerStatus::Done => unreachable!(), } } } |