diff options
Diffstat (limited to 'src/util/background')
-rw-r--r-- | src/util/background/job_worker.rs | 52 | ||||
-rw-r--r-- | src/util/background/mod.rs | 91 | ||||
-rw-r--r-- | src/util/background/worker.rs | 145 |
3 files changed, 288 insertions, 0 deletions
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs new file mode 100644 index 00000000..8cc660f8 --- /dev/null +++ b/src/util/background/job_worker.rs @@ -0,0 +1,52 @@ +//! Job worker: a generic worker that just processes incoming +//! jobs one by one + +use std::sync::Arc; + +use async_trait::async_trait; +use tokio::sync::{mpsc, Mutex}; + +use crate::background::worker::*; +use crate::background::*; + +pub(crate) struct JobWorker { + pub(crate) index: usize, + pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>, + pub(crate) next_job: Option<Job>, +} + +#[async_trait] +impl Worker for JobWorker { + fn name(&self) -> String { + format!("Job worker #{}", self.index) + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerStatus, Error> { + match self.next_job.take() { + None => return Ok(WorkerStatus::Idle), + Some(job) => { + job.await?; + Ok(WorkerStatus::Busy) + } + } + } + + async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus { + loop { + match self.job_chan.lock().await.recv().await { + Some((job, cancellable)) => { + if cancellable && *must_exit.borrow() { + // skip job + continue; + } + self.next_job = Some(job); + return WorkerStatus::Busy + } + None => return WorkerStatus::Done, + } + } + } +} diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs new file mode 100644 index 00000000..97d25784 --- /dev/null +++ b/src/util/background/mod.rs @@ -0,0 +1,91 @@ +//! Job runner for futures and async functions + +pub mod job_worker; +pub mod worker; + +use core::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +use tokio::sync::{mpsc, watch, Mutex}; + +use crate::error::Error; +use worker::{Worker, WorkerProcessor}; + +pub(crate) type JobOutput = Result<(), Error>; +pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; + +/// Job runner for futures and async functions +pub struct BackgroundRunner { + send_job: mpsc::UnboundedSender<(Job, bool)>, + send_worker: mpsc::UnboundedSender<Box<dyn Worker>>, +} + +impl BackgroundRunner { + /// Create a new BackgroundRunner + pub fn new( + n_runners: usize, + stop_signal: watch::Receiver<bool>, + ) -> (Arc<Self>, tokio::task::JoinHandle<()>) { + let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>(); + + let await_all_done = + tokio::spawn( + async move { WorkerProcessor::new(worker_out, stop_signal).run().await }, + ); + + let (send_job, queue_out) = mpsc::unbounded_channel(); + let queue_out = Arc::new(Mutex::new(queue_out)); + + for i in 0..n_runners { + let queue_out = queue_out.clone(); + + send_worker.send(Box::new(job_worker::JobWorker { + index: i, + job_chan: queue_out.clone(), + next_job: None, + })).ok().unwrap(); + } + + let bgrunner = Arc::new(Self { + send_job, + send_worker, + }); + (bgrunner, await_all_done) + } + + /// Spawn a task to be run in background + pub fn spawn<T>(&self, job: T) + where + T: Future<Output = JobOutput> + Send + 'static, + { + let boxed: Job = Box::pin(job); + self.send_job + .send((boxed, false)) + .ok() + .expect("Could not put job in queue"); + } + + /// Spawn a task to be run in background. It may get discarded before running if spawned while + /// the runner is stopping + pub fn spawn_cancellable<T>(&self, job: T) + where + T: Future<Output = JobOutput> + Send + 'static, + { + let boxed: Job = Box::pin(job); + self.send_job + .send((boxed, true)) + .ok() + .expect("Could not put job in queue"); + } + + pub fn spawn_worker<W>(&self, worker: W) + where + W: Worker + 'static, + { + self.send_worker + .send(Box::new(worker)) + .ok() + .expect("Could not put worker in queue"); + } +} diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs new file mode 100644 index 00000000..a173902c --- /dev/null +++ b/src/util/background/worker.rs @@ -0,0 +1,145 @@ +use std::time::{Duration, Instant}; + +use tracing::*; +use async_trait::async_trait; +use futures::future::*; +use tokio::select; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use tokio::sync::{mpsc, watch}; + +use crate::error::Error; + +#[derive(PartialEq, Copy, Clone)] +pub enum WorkerStatus { + Busy, + Idle, + Done, +} + +#[async_trait] +pub trait Worker: Send { + fn name(&self) -> String; + async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>; + async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus; +} + +pub(crate) struct WorkerProcessor { + stop_signal: watch::Receiver<bool>, + worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>, +} + +impl WorkerProcessor { + pub(crate) fn new( + worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>, + stop_signal: watch::Receiver<bool>, + ) -> Self { + Self { + stop_signal, + worker_chan, + } + } + + pub(crate) async fn run(&mut self) { + let mut workers = FuturesUnordered::new(); + let mut next_task_id = 1; + + while !*self.stop_signal.borrow() { + let await_next_worker = async { + if workers.is_empty() { + futures::future::pending().await + } else { + workers.next().await + } + }; + select! { + new_worker_opt = self.worker_chan.recv() => { + if let Some(new_worker) = new_worker_opt { + let task_id = next_task_id; + next_task_id += 1; + let stop_signal = self.stop_signal.clone(); + workers.push(async move { + let mut worker = WorkerHandler { + task_id, + stop_signal, + worker: new_worker, + status: WorkerStatus::Busy, + }; + worker.step().await; + worker + }.boxed()); + } + } + worker = await_next_worker => { + if let Some(mut worker) = worker { + // TODO save new worker status somewhere + if worker.status == WorkerStatus::Done { + info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); + } else { + workers.push(async move { + worker.step().await; + worker + }.boxed()); + } + } + } + _ = self.stop_signal.changed() => (), + } + } + + // We are exiting, drain everything + let drain_half_time = Instant::now() + Duration::from_secs(5); + let drain_everything = async move { + while let Some(mut worker) = workers.next().await { + if worker.status == WorkerStatus::Busy + || (worker.status == WorkerStatus::Idle && Instant::now() < drain_half_time) + { + workers.push(async move { + worker.step().await; + worker + }.boxed()); + } else { + info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); + } + } + }; + + select! { + _ = drain_everything => { + info!("All workers exited in time \\o/"); + } + _ = tokio::time::sleep(Duration::from_secs(9)) => { + warn!("Some workers could not exit in time, we are cancelling some things in the middle."); + } + } + } +} + +// TODO add tranquilizer +struct WorkerHandler { + task_id: usize, + stop_signal: watch::Receiver<bool>, + worker: Box<dyn Worker>, + status: WorkerStatus, +} + +impl WorkerHandler { + async fn step(&mut self) { + match self.status { + WorkerStatus::Busy => { + match self.worker.work(&mut self.stop_signal).await { + Ok(s) => { + self.status = s; + } + Err(e) => { + error!("Error in worker {}: {}", self.worker.name(), e); + } + } + } + WorkerStatus::Idle => { + self.status = self.worker.wait_for_work(&mut self.stop_signal).await; + } + WorkerStatus::Done => unreachable!() + } + } +} |