aboutsummaryrefslogtreecommitdiff
path: root/src/util/background
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/background')
-rw-r--r--src/util/background/job_worker.rs52
-rw-r--r--src/util/background/mod.rs91
-rw-r--r--src/util/background/worker.rs145
3 files changed, 288 insertions, 0 deletions
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs
new file mode 100644
index 00000000..8cc660f8
--- /dev/null
+++ b/src/util/background/job_worker.rs
@@ -0,0 +1,52 @@
+//! Job worker: a generic worker that just processes incoming
+//! jobs one by one
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use tokio::sync::{mpsc, Mutex};
+
+use crate::background::worker::*;
+use crate::background::*;
+
+pub(crate) struct JobWorker {
+ pub(crate) index: usize,
+ pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>,
+ pub(crate) next_job: Option<Job>,
+}
+
+#[async_trait]
+impl Worker for JobWorker {
+ fn name(&self) -> String {
+ format!("Job worker #{}", self.index)
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ match self.next_job.take() {
+ None => return Ok(WorkerStatus::Idle),
+ Some(job) => {
+ job.await?;
+ Ok(WorkerStatus::Busy)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus {
+ loop {
+ match self.job_chan.lock().await.recv().await {
+ Some((job, cancellable)) => {
+ if cancellable && *must_exit.borrow() {
+ // skip job
+ continue;
+ }
+ self.next_job = Some(job);
+ return WorkerStatus::Busy
+ }
+ None => return WorkerStatus::Done,
+ }
+ }
+ }
+}
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
new file mode 100644
index 00000000..97d25784
--- /dev/null
+++ b/src/util/background/mod.rs
@@ -0,0 +1,91 @@
+//! Job runner for futures and async functions
+
+pub mod job_worker;
+pub mod worker;
+
+use core::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use tokio::sync::{mpsc, watch, Mutex};
+
+use crate::error::Error;
+use worker::{Worker, WorkerProcessor};
+
+pub(crate) type JobOutput = Result<(), Error>;
+pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
+
+/// Job runner for futures and async functions
+pub struct BackgroundRunner {
+ send_job: mpsc::UnboundedSender<(Job, bool)>,
+ send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
+}
+
+impl BackgroundRunner {
+ /// Create a new BackgroundRunner
+ pub fn new(
+ n_runners: usize,
+ stop_signal: watch::Receiver<bool>,
+ ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
+ let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
+
+ let await_all_done =
+ tokio::spawn(
+ async move { WorkerProcessor::new(worker_out, stop_signal).run().await },
+ );
+
+ let (send_job, queue_out) = mpsc::unbounded_channel();
+ let queue_out = Arc::new(Mutex::new(queue_out));
+
+ for i in 0..n_runners {
+ let queue_out = queue_out.clone();
+
+ send_worker.send(Box::new(job_worker::JobWorker {
+ index: i,
+ job_chan: queue_out.clone(),
+ next_job: None,
+ })).ok().unwrap();
+ }
+
+ let bgrunner = Arc::new(Self {
+ send_job,
+ send_worker,
+ });
+ (bgrunner, await_all_done)
+ }
+
+ /// Spawn a task to be run in background
+ pub fn spawn<T>(&self, job: T)
+ where
+ T: Future<Output = JobOutput> + Send + 'static,
+ {
+ let boxed: Job = Box::pin(job);
+ self.send_job
+ .send((boxed, false))
+ .ok()
+ .expect("Could not put job in queue");
+ }
+
+ /// Spawn a task to be run in background. It may get discarded before running if spawned while
+ /// the runner is stopping
+ pub fn spawn_cancellable<T>(&self, job: T)
+ where
+ T: Future<Output = JobOutput> + Send + 'static,
+ {
+ let boxed: Job = Box::pin(job);
+ self.send_job
+ .send((boxed, true))
+ .ok()
+ .expect("Could not put job in queue");
+ }
+
+ pub fn spawn_worker<W>(&self, worker: W)
+ where
+ W: Worker + 'static,
+ {
+ self.send_worker
+ .send(Box::new(worker))
+ .ok()
+ .expect("Could not put worker in queue");
+ }
+}
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
new file mode 100644
index 00000000..a173902c
--- /dev/null
+++ b/src/util/background/worker.rs
@@ -0,0 +1,145 @@
+use std::time::{Duration, Instant};
+
+use tracing::*;
+use async_trait::async_trait;
+use futures::future::*;
+use tokio::select;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use tokio::sync::{mpsc, watch};
+
+use crate::error::Error;
+
+#[derive(PartialEq, Copy, Clone)]
+pub enum WorkerStatus {
+ Busy,
+ Idle,
+ Done,
+}
+
+#[async_trait]
+pub trait Worker: Send {
+ fn name(&self) -> String;
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>;
+ async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus;
+}
+
+pub(crate) struct WorkerProcessor {
+ stop_signal: watch::Receiver<bool>,
+ worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
+}
+
+impl WorkerProcessor {
+ pub(crate) fn new(
+ worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
+ stop_signal: watch::Receiver<bool>,
+ ) -> Self {
+ Self {
+ stop_signal,
+ worker_chan,
+ }
+ }
+
+ pub(crate) async fn run(&mut self) {
+ let mut workers = FuturesUnordered::new();
+ let mut next_task_id = 1;
+
+ while !*self.stop_signal.borrow() {
+ let await_next_worker = async {
+ if workers.is_empty() {
+ futures::future::pending().await
+ } else {
+ workers.next().await
+ }
+ };
+ select! {
+ new_worker_opt = self.worker_chan.recv() => {
+ if let Some(new_worker) = new_worker_opt {
+ let task_id = next_task_id;
+ next_task_id += 1;
+ let stop_signal = self.stop_signal.clone();
+ workers.push(async move {
+ let mut worker = WorkerHandler {
+ task_id,
+ stop_signal,
+ worker: new_worker,
+ status: WorkerStatus::Busy,
+ };
+ worker.step().await;
+ worker
+ }.boxed());
+ }
+ }
+ worker = await_next_worker => {
+ if let Some(mut worker) = worker {
+ // TODO save new worker status somewhere
+ if worker.status == WorkerStatus::Done {
+ info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
+ } else {
+ workers.push(async move {
+ worker.step().await;
+ worker
+ }.boxed());
+ }
+ }
+ }
+ _ = self.stop_signal.changed() => (),
+ }
+ }
+
+ // We are exiting, drain everything
+ let drain_half_time = Instant::now() + Duration::from_secs(5);
+ let drain_everything = async move {
+ while let Some(mut worker) = workers.next().await {
+ if worker.status == WorkerStatus::Busy
+ || (worker.status == WorkerStatus::Idle && Instant::now() < drain_half_time)
+ {
+ workers.push(async move {
+ worker.step().await;
+ worker
+ }.boxed());
+ } else {
+ info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
+ }
+ }
+ };
+
+ select! {
+ _ = drain_everything => {
+ info!("All workers exited in time \\o/");
+ }
+ _ = tokio::time::sleep(Duration::from_secs(9)) => {
+ warn!("Some workers could not exit in time, we are cancelling some things in the middle.");
+ }
+ }
+ }
+}
+
+// TODO add tranquilizer
+struct WorkerHandler {
+ task_id: usize,
+ stop_signal: watch::Receiver<bool>,
+ worker: Box<dyn Worker>,
+ status: WorkerStatus,
+}
+
+impl WorkerHandler {
+ async fn step(&mut self) {
+ match self.status {
+ WorkerStatus::Busy => {
+ match self.worker.work(&mut self.stop_signal).await {
+ Ok(s) => {
+ self.status = s;
+ }
+ Err(e) => {
+ error!("Error in worker {}: {}", self.worker.name(), e);
+ }
+ }
+ }
+ WorkerStatus::Idle => {
+ self.status = self.worker.wait_for_work(&mut self.stop_signal).await;
+ }
+ WorkerStatus::Done => unreachable!()
+ }
+ }
+}