diff options
author | Alex <alex@adnab.me> | 2022-07-08 13:30:26 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-07-08 13:30:26 +0200 |
commit | 4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7 (patch) | |
tree | 256ee885e93cf1c41dc2869fe13a648aa91ab9b5 /src/util/background.rs | |
parent | aab34bfe5415e9584432bf32e29a151dc5af9ebd (diff) | |
download | garage-4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7.tar.gz garage-4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7.zip |
Background task manager (#332)
- [x] New background worker trait
- [x] Adapt all current workers to use new API
- [x] Command to list currently running workers, and whether they are active, idle, or dead
- [x] Error reporting
- Optimizations
- [x] Merkle updater: several items per iteration
- [ ] Use `tokio::task::spawn_blocking` where appropriate so that CPU-intensive tasks don't block other things going on
- scrub:
- [x] have only one worker with a channel to start/pause/cancel
- [x] automatic scrub
- [x] ability to view and change tranquility from CLI
- [x] persistence of a few info
- [ ] Testing
Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/332
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/util/background.rs')
-rw-r--r-- | src/util/background.rs | 160 |
1 files changed, 0 insertions, 160 deletions
diff --git a/src/util/background.rs b/src/util/background.rs deleted file mode 100644 index d35425f5..00000000 --- a/src/util/background.rs +++ /dev/null @@ -1,160 +0,0 @@ -//! Job runner for futures and async functions -use core::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::time::Duration; - -use futures::future::*; -use futures::select; -use futures::stream::FuturesUnordered; -use futures::StreamExt; -use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex}; - -use crate::error::Error; - -type JobOutput = Result<(), Error>; -type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; - -/// Job runner for futures and async functions -pub struct BackgroundRunner { - stop_signal: watch::Receiver<bool>, - queue_in: mpsc::UnboundedSender<(Job, bool)>, - worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>, -} - -impl BackgroundRunner { - /// Create a new BackgroundRunner - pub fn new( - n_runners: usize, - stop_signal: watch::Receiver<bool>, - ) -> (Arc<Self>, tokio::task::JoinHandle<()>) { - let (worker_in, mut worker_out) = mpsc::unbounded_channel(); - - let stop_signal_2 = stop_signal.clone(); - let await_all_done = tokio::spawn(async move { - let mut workers = FuturesUnordered::new(); - let mut shutdown_timer = 0; - loop { - let closed = match worker_out.try_recv() { - Ok(wkr) => { - workers.push(wkr); - false - } - Err(TryRecvError::Empty) => false, - Err(TryRecvError::Disconnected) => true, - }; - select! { - res = workers.next() => { - if let Some(Err(e)) = res { - error!("Worker exited with error: {}", e); - } - } - _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { - if closed || *stop_signal_2.borrow() { - shutdown_timer += 1; - if shutdown_timer >= 10 { - break; - } - } - } - } - } - }); - - let (queue_in, queue_out) = mpsc::unbounded_channel(); - let queue_out = Arc::new(Mutex::new(queue_out)); - - for i in 0..n_runners { - let queue_out = queue_out.clone(); - let stop_signal = stop_signal.clone(); - - worker_in - .send(tokio::spawn(async move { - loop { - let (job, cancellable) = { - select! { - item = wait_job(&queue_out).fuse() => match item { - // We received a task, process it - Some(x) => x, - // We received a signal that no more tasks will ever be sent - // because the sending side was dropped. Exit now. - None => break, - }, - _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => { - if *stop_signal.borrow() { - // Nothing has been going on for 5 secs, and we are shutting - // down. Exit now. - break; - } else { - // Nothing is going on but we don't want to exit. - continue; - } - } - } - }; - if cancellable && *stop_signal.borrow() { - continue; - } - if let Err(e) = job.await { - error!("Job failed: {}", e) - } - } - info!("Background worker {} exiting", i); - })) - .unwrap(); - } - - let bgrunner = Arc::new(Self { - stop_signal, - queue_in, - worker_in, - }); - (bgrunner, await_all_done) - } - - /// Spawn a task to be run in background - pub fn spawn<T>(&self, job: T) - where - T: Future<Output = JobOutput> + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.queue_in - .send((boxed, false)) - .map_err(|_| "could not put job in queue") - .unwrap(); - } - - /// Spawn a task to be run in background. It may get discarded before running if spawned while - /// the runner is stopping - pub fn spawn_cancellable<T>(&self, job: T) - where - T: Future<Output = JobOutput> + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.queue_in - .send((boxed, true)) - .map_err(|_| "could not put job in queue") - .unwrap(); - } - - pub fn spawn_worker<F, T>(&self, name: String, worker: F) - where - F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static, - T: Future<Output = ()> + Send + 'static, - { - let stop_signal = self.stop_signal.clone(); - let task = tokio::spawn(async move { - info!("Worker started: {}", name); - worker(stop_signal).await; - info!("Worker exited: {}", name); - }); - self.worker_in - .send(task) - .map_err(|_| "could not put job in queue") - .unwrap(); - } -} - -async fn wait_job(q: &Mutex<mpsc::UnboundedReceiver<(Job, bool)>>) -> Option<(Job, bool)> { - q.lock().await.recv().await -} |