aboutsummaryrefslogtreecommitdiff
path: root/src/util/background/mod.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-21 12:37:52 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-21 12:37:52 +0200
commite12bc3b5959c0aa5ae3c8a746c62bab2e7343a62 (patch)
tree1a748d618917cbcf4e45b3756a0fcb62bb6ae42c /src/util/background/mod.rs
parent77e3fd6db2c9cd3a10889bd071e95ef839cfbefc (diff)
downloadgarage-e12bc3b5959c0aa5ae3c8a746c62bab2e7343a62.tar.gz
garage-e12bc3b5959c0aa5ae3c8a746c62bab2e7343a62.zip
First try on background worker manager
Diffstat (limited to 'src/util/background/mod.rs')
-rw-r--r--src/util/background/mod.rs91
1 files changed, 91 insertions, 0 deletions
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
new file mode 100644
index 00000000..97d25784
--- /dev/null
+++ b/src/util/background/mod.rs
@@ -0,0 +1,91 @@
+//! Job runner for futures and async functions
+
+pub mod job_worker;
+pub mod worker;
+
+use core::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use tokio::sync::{mpsc, watch, Mutex};
+
+use crate::error::Error;
+use worker::{Worker, WorkerProcessor};
+
+pub(crate) type JobOutput = Result<(), Error>;
+pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
+
+/// Job runner for futures and async functions
+pub struct BackgroundRunner {
+ send_job: mpsc::UnboundedSender<(Job, bool)>,
+ send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
+}
+
+impl BackgroundRunner {
+ /// Create a new BackgroundRunner
+ pub fn new(
+ n_runners: usize,
+ stop_signal: watch::Receiver<bool>,
+ ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
+ let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
+
+ let await_all_done =
+ tokio::spawn(
+ async move { WorkerProcessor::new(worker_out, stop_signal).run().await },
+ );
+
+ let (send_job, queue_out) = mpsc::unbounded_channel();
+ let queue_out = Arc::new(Mutex::new(queue_out));
+
+ for i in 0..n_runners {
+ let queue_out = queue_out.clone();
+
+ send_worker.send(Box::new(job_worker::JobWorker {
+ index: i,
+ job_chan: queue_out.clone(),
+ next_job: None,
+ })).ok().unwrap();
+ }
+
+ let bgrunner = Arc::new(Self {
+ send_job,
+ send_worker,
+ });
+ (bgrunner, await_all_done)
+ }
+
+ /// Spawn a task to be run in background
+ pub fn spawn<T>(&self, job: T)
+ where
+ T: Future<Output = JobOutput> + Send + 'static,
+ {
+ let boxed: Job = Box::pin(job);
+ self.send_job
+ .send((boxed, false))
+ .ok()
+ .expect("Could not put job in queue");
+ }
+
+ /// Spawn a task to be run in background. It may get discarded before running if spawned while
+ /// the runner is stopping
+ pub fn spawn_cancellable<T>(&self, job: T)
+ where
+ T: Future<Output = JobOutput> + Send + 'static,
+ {
+ let boxed: Job = Box::pin(job);
+ self.send_job
+ .send((boxed, true))
+ .ok()
+ .expect("Could not put job in queue");
+ }
+
+ pub fn spawn_worker<W>(&self, worker: W)
+ where
+ W: Worker + 'static,
+ {
+ self.send_worker
+ .send(Box::new(worker))
+ .ok()
+ .expect("Could not put worker in queue");
+ }
+}