From 4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7 Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 8 Jul 2022 13:30:26 +0200 Subject: Background task manager (#332) - [x] New background worker trait - [x] Adapt all current workers to use new API - [x] Command to list currently running workers, and whether they are active, idle, or dead - [x] Error reporting - Optimizations - [x] Merkle updater: several items per iteration - [ ] Use `tokio::task::spawn_blocking` where appropriate so that CPU-intensive tasks don't block other things going on - scrub: - [x] have only one worker with a channel to start/pause/cancel - [x] automatic scrub - [x] ability to view and change tranquility from CLI - [x] persistence of a few info - [ ] Testing Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/332 Co-authored-by: Alex Co-committed-by: Alex --- src/util/background/mod.rs | 117 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 src/util/background/mod.rs (limited to 'src/util/background/mod.rs') diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs new file mode 100644 index 00000000..619f5068 --- /dev/null +++ b/src/util/background/mod.rs @@ -0,0 +1,117 @@ +//! Job runner for futures and async functions + +pub mod job_worker; +pub mod worker; + +use core::future::Future; + +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc, watch, Mutex}; + +use crate::error::Error; +use worker::WorkerProcessor; +pub use worker::{Worker, WorkerState}; + +pub(crate) type JobOutput = Result<(), Error>; +pub(crate) type Job = Pin + Send>>; + +/// Job runner for futures and async functions +pub struct BackgroundRunner { + send_job: mpsc::UnboundedSender<(Job, bool)>, + send_worker: mpsc::UnboundedSender>, + worker_info: Arc>>, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct WorkerInfo { + pub name: String, + pub info: Option, + pub state: WorkerState, + pub errors: usize, + pub consecutive_errors: usize, + pub last_error: Option<(String, u64)>, +} + +impl BackgroundRunner { + /// Create a new BackgroundRunner + pub fn new( + n_runners: usize, + stop_signal: watch::Receiver, + ) -> (Arc, tokio::task::JoinHandle<()>) { + let (send_worker, worker_out) = mpsc::unbounded_channel::>(); + + let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new())); + let mut worker_processor = + WorkerProcessor::new(worker_out, stop_signal, worker_info.clone()); + + let await_all_done = tokio::spawn(async move { + worker_processor.run().await; + }); + + let (send_job, queue_out) = mpsc::unbounded_channel(); + let queue_out = Arc::new(Mutex::new(queue_out)); + + for i in 0..n_runners { + let queue_out = queue_out.clone(); + + send_worker + .send(Box::new(job_worker::JobWorker { + index: i, + job_chan: queue_out.clone(), + next_job: None, + })) + .ok() + .unwrap(); + } + + let bgrunner = Arc::new(Self { + send_job, + send_worker, + worker_info, + }); + (bgrunner, await_all_done) + } + + pub fn get_worker_info(&self) -> HashMap { + self.worker_info.lock().unwrap().clone() + } + + /// Spawn a task to be run in background + pub fn spawn(&self, job: T) + where + T: Future + Send + 'static, + { + let boxed: Job = Box::pin(job); + self.send_job + .send((boxed, false)) + .ok() + .expect("Could not put job in queue"); + } + + /// Spawn a task to be run in background. It may get discarded before running if spawned while + /// the runner is stopping + pub fn spawn_cancellable(&self, job: T) + where + T: Future + Send + 'static, + { + let boxed: Job = Box::pin(job); + self.send_job + .send((boxed, true)) + .ok() + .expect("Could not put job in queue"); + } + + pub fn spawn_worker(&self, worker: W) + where + W: Worker + 'static, + { + self.send_worker + .send(Box::new(worker)) + .ok() + .expect("Could not put worker in queue"); + } +} -- cgit v1.2.3