diff options
author | Alex <alex@adnab.me> | 2023-01-03 11:37:31 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2023-01-03 11:37:31 +0000 |
commit | 582b0761790b7958a3ba10c4b549b466997d2dcd (patch) | |
tree | b94c84bd21ef45e2480c653dc7ed2b37fd5907fb /src/util/background/mod.rs | |
parent | 76230f20282e73a5a5afa33af68152acaf732cf5 (diff) | |
parent | 939a6d67e8ace1aa38998281f52511a61f4b4d94 (diff) | |
download | garage-582b0761790b7958a3ba10c4b549b466997d2dcd.tar.gz garage-582b0761790b7958a3ba10c4b549b466997d2dcd.zip |
Merge pull request 'Some improvements to Garage internals' (#451) from internals-rework into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/451
Diffstat (limited to 'src/util/background/mod.rs')
-rw-r--r-- | src/util/background/mod.rs | 58 |
1 files changed, 2 insertions, 56 deletions
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index fd9258b8..41b48e93 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -1,27 +1,18 @@ //! Job runner for futures and async functions -pub mod job_worker; pub mod worker; -use core::future::Future; - use std::collections::HashMap; -use std::pin::Pin; use std::sync::Arc; use serde::{Deserialize, Serialize}; -use tokio::sync::{mpsc, watch, Mutex}; +use tokio::sync::{mpsc, watch}; -use crate::error::Error; use worker::WorkerProcessor; pub use worker::{Worker, WorkerState}; -pub(crate) type JobOutput = Result<(), Error>; -pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; - /// Job runner for futures and async functions pub struct BackgroundRunner { - send_job: mpsc::UnboundedSender<(Job, bool)>, send_worker: mpsc::UnboundedSender<Box<dyn Worker>>, worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, } @@ -49,10 +40,7 @@ pub struct WorkerStatus { impl BackgroundRunner { /// Create a new BackgroundRunner - pub fn new( - n_runners: usize, - stop_signal: watch::Receiver<bool>, - ) -> (Arc<Self>, tokio::task::JoinHandle<()>) { + pub fn new(stop_signal: watch::Receiver<bool>) -> (Arc<Self>, tokio::task::JoinHandle<()>) { let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>(); let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new())); @@ -63,24 +51,7 @@ impl BackgroundRunner { worker_processor.run().await; }); - let (send_job, queue_out) = mpsc::unbounded_channel(); - let queue_out = Arc::new(Mutex::new(queue_out)); - - for i in 0..n_runners { - let queue_out = queue_out.clone(); - - send_worker - .send(Box::new(job_worker::JobWorker { - index: i, - job_chan: queue_out.clone(), - next_job: None, - })) - .ok() - .unwrap(); - } - let bgrunner = Arc::new(Self { - send_job, send_worker, worker_info, }); @@ -91,31 +62,6 @@ impl BackgroundRunner { self.worker_info.lock().unwrap().clone() } - /// Spawn a task to be run in background - pub fn spawn<T>(&self, job: T) - where - T: Future<Output = JobOutput> + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.send_job - .send((boxed, false)) - .ok() - .expect("Could not put job in queue"); - } - - /// Spawn a task to be run in background. It may get discarded before running if spawned while - /// the runner is stopping - pub fn spawn_cancellable<T>(&self, job: T) - where - T: Future<Output = JobOutput> + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.send_job - .send((boxed, true)) - .ok() - .expect("Could not put job in queue"); - } - pub fn spawn_worker<W>(&self, worker: W) where W: Worker + 'static, |