aboutsummaryrefslogtreecommitdiff
path: root/src/util/background/worker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/background/worker.rs')
-rw-r--r--src/util/background/worker.rs145
1 files changed, 145 insertions, 0 deletions
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
new file mode 100644
index 00000000..a173902c
--- /dev/null
+++ b/src/util/background/worker.rs
@@ -0,0 +1,145 @@
+use std::time::{Duration, Instant};
+
+use tracing::*;
+use async_trait::async_trait;
+use futures::future::*;
+use tokio::select;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use tokio::sync::{mpsc, watch};
+
+use crate::error::Error;
+
+#[derive(PartialEq, Copy, Clone)]
+pub enum WorkerStatus {
+ Busy,
+ Idle,
+ Done,
+}
+
+#[async_trait]
+pub trait Worker: Send {
+ fn name(&self) -> String;
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>;
+ async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus;
+}
+
+pub(crate) struct WorkerProcessor {
+ stop_signal: watch::Receiver<bool>,
+ worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
+}
+
+impl WorkerProcessor {
+ pub(crate) fn new(
+ worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
+ stop_signal: watch::Receiver<bool>,
+ ) -> Self {
+ Self {
+ stop_signal,
+ worker_chan,
+ }
+ }
+
+ pub(crate) async fn run(&mut self) {
+ let mut workers = FuturesUnordered::new();
+ let mut next_task_id = 1;
+
+ while !*self.stop_signal.borrow() {
+ let await_next_worker = async {
+ if workers.is_empty() {
+ futures::future::pending().await
+ } else {
+ workers.next().await
+ }
+ };
+ select! {
+ new_worker_opt = self.worker_chan.recv() => {
+ if let Some(new_worker) = new_worker_opt {
+ let task_id = next_task_id;
+ next_task_id += 1;
+ let stop_signal = self.stop_signal.clone();
+ workers.push(async move {
+ let mut worker = WorkerHandler {
+ task_id,
+ stop_signal,
+ worker: new_worker,
+ status: WorkerStatus::Busy,
+ };
+ worker.step().await;
+ worker
+ }.boxed());
+ }
+ }
+ worker = await_next_worker => {
+ if let Some(mut worker) = worker {
+ // TODO save new worker status somewhere
+ if worker.status == WorkerStatus::Done {
+ info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
+ } else {
+ workers.push(async move {
+ worker.step().await;
+ worker
+ }.boxed());
+ }
+ }
+ }
+ _ = self.stop_signal.changed() => (),
+ }
+ }
+
+ // We are exiting, drain everything
+ let drain_half_time = Instant::now() + Duration::from_secs(5);
+ let drain_everything = async move {
+ while let Some(mut worker) = workers.next().await {
+ if worker.status == WorkerStatus::Busy
+ || (worker.status == WorkerStatus::Idle && Instant::now() < drain_half_time)
+ {
+ workers.push(async move {
+ worker.step().await;
+ worker
+ }.boxed());
+ } else {
+ info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
+ }
+ }
+ };
+
+ select! {
+ _ = drain_everything => {
+ info!("All workers exited in time \\o/");
+ }
+ _ = tokio::time::sleep(Duration::from_secs(9)) => {
+ warn!("Some workers could not exit in time, we are cancelling some things in the middle.");
+ }
+ }
+ }
+}
+
+// TODO add tranquilizer
+struct WorkerHandler {
+ task_id: usize,
+ stop_signal: watch::Receiver<bool>,
+ worker: Box<dyn Worker>,
+ status: WorkerStatus,
+}
+
+impl WorkerHandler {
+ async fn step(&mut self) {
+ match self.status {
+ WorkerStatus::Busy => {
+ match self.worker.work(&mut self.stop_signal).await {
+ Ok(s) => {
+ self.status = s;
+ }
+ Err(e) => {
+ error!("Error in worker {}: {}", self.worker.name(), e);
+ }
+ }
+ }
+ WorkerStatus::Idle => {
+ self.status = self.worker.wait_for_work(&mut self.stop_signal).await;
+ }
+ WorkerStatus::Done => unreachable!()
+ }
+ }
+}