diff options
Diffstat (limited to 'src/util/background/worker.rs')
-rw-r--r-- | src/util/background/worker.rs | 145 |
1 files changed, 145 insertions, 0 deletions
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs new file mode 100644 index 00000000..a173902c --- /dev/null +++ b/src/util/background/worker.rs @@ -0,0 +1,145 @@ +use std::time::{Duration, Instant}; + +use tracing::*; +use async_trait::async_trait; +use futures::future::*; +use tokio::select; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use tokio::sync::{mpsc, watch}; + +use crate::error::Error; + +#[derive(PartialEq, Copy, Clone)] +pub enum WorkerStatus { + Busy, + Idle, + Done, +} + +#[async_trait] +pub trait Worker: Send { + fn name(&self) -> String; + async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>; + async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus; +} + +pub(crate) struct WorkerProcessor { + stop_signal: watch::Receiver<bool>, + worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>, +} + +impl WorkerProcessor { + pub(crate) fn new( + worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>, + stop_signal: watch::Receiver<bool>, + ) -> Self { + Self { + stop_signal, + worker_chan, + } + } + + pub(crate) async fn run(&mut self) { + let mut workers = FuturesUnordered::new(); + let mut next_task_id = 1; + + while !*self.stop_signal.borrow() { + let await_next_worker = async { + if workers.is_empty() { + futures::future::pending().await + } else { + workers.next().await + } + }; + select! { + new_worker_opt = self.worker_chan.recv() => { + if let Some(new_worker) = new_worker_opt { + let task_id = next_task_id; + next_task_id += 1; + let stop_signal = self.stop_signal.clone(); + workers.push(async move { + let mut worker = WorkerHandler { + task_id, + stop_signal, + worker: new_worker, + status: WorkerStatus::Busy, + }; + worker.step().await; + worker + }.boxed()); + } + } + worker = await_next_worker => { + if let Some(mut worker) = worker { + // TODO save new worker status somewhere + if worker.status == WorkerStatus::Done { + info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); + } else { + workers.push(async move { + worker.step().await; + worker + }.boxed()); + } + } + } + _ = self.stop_signal.changed() => (), + } + } + + // We are exiting, drain everything + let drain_half_time = Instant::now() + Duration::from_secs(5); + let drain_everything = async move { + while let Some(mut worker) = workers.next().await { + if worker.status == WorkerStatus::Busy + || (worker.status == WorkerStatus::Idle && Instant::now() < drain_half_time) + { + workers.push(async move { + worker.step().await; + worker + }.boxed()); + } else { + info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); + } + } + }; + + select! { + _ = drain_everything => { + info!("All workers exited in time \\o/"); + } + _ = tokio::time::sleep(Duration::from_secs(9)) => { + warn!("Some workers could not exit in time, we are cancelling some things in the middle."); + } + } + } +} + +// TODO add tranquilizer +struct WorkerHandler { + task_id: usize, + stop_signal: watch::Receiver<bool>, + worker: Box<dyn Worker>, + status: WorkerStatus, +} + +impl WorkerHandler { + async fn step(&mut self) { + match self.status { + WorkerStatus::Busy => { + match self.worker.work(&mut self.stop_signal).await { + Ok(s) => { + self.status = s; + } + Err(e) => { + error!("Error in worker {}: {}", self.worker.name(), e); + } + } + } + WorkerStatus::Idle => { + self.status = self.worker.wait_for_work(&mut self.stop_signal).await; + } + WorkerStatus::Done => unreachable!() + } + } +} |