diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-22 16:51:52 +0000 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-22 16:51:52 +0000 |
commit | e8214cb1807d3145907c7ed9e077fa45ada4aeea (patch) | |
tree | 00192416f1c2d2157988a1c07df4601475a30a73 /src/background.rs | |
parent | c0335ac6904598b9ac367e17651da477d4d970d7 (diff) | |
download | garage-e8214cb1807d3145907c7ed9e077fa45ada4aeea.tar.gz garage-e8214cb1807d3145907c7ed9e077fa45ada4aeea.zip |
Better concurrency:
Use Notify instead of stupid sleep in background worker
Use Semaphore to limit concurrent requests in rpc_client
Make more background tasks cancellable
Diffstat (limited to 'src/background.rs')
-rw-r--r-- | src/background.rs | 9 |
1 files changed, 6 insertions, 3 deletions
diff --git a/src/background.rs b/src/background.rs index 1f04e49b..f0dbdcb5 100644 --- a/src/background.rs +++ b/src/background.rs @@ -3,9 +3,8 @@ use std::pin::Pin; use futures::future::join_all; use std::sync::Arc; -use std::time::Duration; use tokio::sync::Mutex; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch, Notify}; use crate::error::Error; @@ -18,6 +17,7 @@ pub struct BackgroundRunner { queue_in: mpsc::UnboundedSender<(Job, bool)>, queue_out: Mutex<mpsc::UnboundedReceiver<(Job, bool)>>, + job_notify: Notify, workers: Mutex<Vec<tokio::task::JoinHandle<()>>>, } @@ -30,6 +30,7 @@ impl BackgroundRunner { stop_signal, queue_in, queue_out: Mutex::new(queue_out), + job_notify: Notify::new(), workers: Mutex::new(Vec::new()), }) } @@ -58,6 +59,7 @@ impl BackgroundRunner { { let boxed: Job = Box::pin(job); let _: Result<_, _> = self.queue_in.clone().send((boxed, false)); + self.job_notify.notify(); } pub fn spawn_cancellable<T>(&self, job: T) @@ -66,6 +68,7 @@ impl BackgroundRunner { { let boxed: Job = Box::pin(job); let _: Result<_, _> = self.queue_in.clone().send((boxed, true)); + self.job_notify.notify(); } pub async fn spawn_worker<F, T>(&self, name: String, worker: F) @@ -97,7 +100,7 @@ impl BackgroundRunner { info!("Background runner {} exiting", i); return; } - tokio::time::delay_for(Duration::from_secs(1)).await; + self.job_notify.notified().await; } } } |