diff options
author | Alex <alex@adnab.me> | 2022-07-08 13:30:26 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-07-08 13:30:26 +0200 |
commit | 4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7 (patch) | |
tree | 256ee885e93cf1c41dc2869fe13a648aa91ab9b5 /src/util/background/mod.rs | |
parent | aab34bfe5415e9584432bf32e29a151dc5af9ebd (diff) | |
download | garage-4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7.tar.gz garage-4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7.zip |
Background task manager (#332)
- [x] New background worker trait
- [x] Adapt all current workers to use new API
- [x] Command to list currently running workers, and whether they are active, idle, or dead
- [x] Error reporting
- Optimizations
- [x] Merkle updater: several items per iteration
- [ ] Use `tokio::task::spawn_blocking` where appropriate so that CPU-intensive tasks don't block other things going on
- scrub:
- [x] have only one worker with a channel to start/pause/cancel
- [x] automatic scrub
- [x] ability to view and change tranquility from CLI
- [x] persistence of a few info
- [ ] Testing
Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/332
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/util/background/mod.rs')
-rw-r--r-- | src/util/background/mod.rs | 117 |
1 files changed, 117 insertions, 0 deletions
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs new file mode 100644 index 00000000..619f5068 --- /dev/null +++ b/src/util/background/mod.rs @@ -0,0 +1,117 @@ +//! Job runner for futures and async functions + +pub mod job_worker; +pub mod worker; + +use core::future::Future; + +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc, watch, Mutex}; + +use crate::error::Error; +use worker::WorkerProcessor; +pub use worker::{Worker, WorkerState}; + +pub(crate) type JobOutput = Result<(), Error>; +pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; + +/// Job runner for futures and async functions +pub struct BackgroundRunner { + send_job: mpsc::UnboundedSender<(Job, bool)>, + send_worker: mpsc::UnboundedSender<Box<dyn Worker>>, + worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct WorkerInfo { + pub name: String, + pub info: Option<String>, + pub state: WorkerState, + pub errors: usize, + pub consecutive_errors: usize, + pub last_error: Option<(String, u64)>, +} + +impl BackgroundRunner { + /// Create a new BackgroundRunner + pub fn new( + n_runners: usize, + stop_signal: watch::Receiver<bool>, + ) -> (Arc<Self>, tokio::task::JoinHandle<()>) { + let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>(); + + let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new())); + let mut worker_processor = + WorkerProcessor::new(worker_out, stop_signal, worker_info.clone()); + + let await_all_done = tokio::spawn(async move { + worker_processor.run().await; + }); + + let (send_job, queue_out) = mpsc::unbounded_channel(); + let queue_out = Arc::new(Mutex::new(queue_out)); + + for i in 0..n_runners { + let queue_out = queue_out.clone(); + + send_worker + .send(Box::new(job_worker::JobWorker { + index: i, + job_chan: queue_out.clone(), + next_job: None, + })) + .ok() + .unwrap(); + } + + let bgrunner = Arc::new(Self { + send_job, + send_worker, + worker_info, + }); + (bgrunner, await_all_done) + } + + pub fn get_worker_info(&self) -> HashMap<usize, WorkerInfo> { + self.worker_info.lock().unwrap().clone() + } + + /// Spawn a task to be run in background + pub fn spawn<T>(&self, job: T) + where + T: Future<Output = JobOutput> + Send + 'static, + { + let boxed: Job = Box::pin(job); + self.send_job + .send((boxed, false)) + .ok() + .expect("Could not put job in queue"); + } + + /// Spawn a task to be run in background. It may get discarded before running if spawned while + /// the runner is stopping + pub fn spawn_cancellable<T>(&self, job: T) + where + T: Future<Output = JobOutput> + Send + 'static, + { + let boxed: Job = Box::pin(job); + self.send_job + .send((boxed, true)) + .ok() + .expect("Could not put job in queue"); + } + + pub fn spawn_worker<W>(&self, worker: W) + where + W: Worker + 'static, + { + self.send_worker + .send(Box::new(worker)) + .ok() + .expect("Could not put worker in queue"); + } +} |