aboutsummaryrefslogtreecommitdiff
path: root/src/background.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-22 16:51:52 +0000
committerAlex Auvolat <alex@adnab.me>2020-04-22 16:51:52 +0000
commite8214cb1807d3145907c7ed9e077fa45ada4aeea (patch)
tree00192416f1c2d2157988a1c07df4601475a30a73 /src/background.rs
parentc0335ac6904598b9ac367e17651da477d4d970d7 (diff)
downloadgarage-e8214cb1807d3145907c7ed9e077fa45ada4aeea.tar.gz
garage-e8214cb1807d3145907c7ed9e077fa45ada4aeea.zip
Better concurrency:
Use Notify instead of stupid sleep in background worker Use Semaphore to limit concurrent requests in rpc_client Make more background tasks cancellable
Diffstat (limited to 'src/background.rs')
-rw-r--r--src/background.rs9
1 files changed, 6 insertions, 3 deletions
diff --git a/src/background.rs b/src/background.rs
index 1f04e49b..f0dbdcb5 100644
--- a/src/background.rs
+++ b/src/background.rs
@@ -3,9 +3,8 @@ use std::pin::Pin;
use futures::future::join_all;
use std::sync::Arc;
-use std::time::Duration;
use tokio::sync::Mutex;
-use tokio::sync::{mpsc, watch};
+use tokio::sync::{mpsc, watch, Notify};
use crate::error::Error;
@@ -18,6 +17,7 @@ pub struct BackgroundRunner {
queue_in: mpsc::UnboundedSender<(Job, bool)>,
queue_out: Mutex<mpsc::UnboundedReceiver<(Job, bool)>>,
+ job_notify: Notify,
workers: Mutex<Vec<tokio::task::JoinHandle<()>>>,
}
@@ -30,6 +30,7 @@ impl BackgroundRunner {
stop_signal,
queue_in,
queue_out: Mutex::new(queue_out),
+ job_notify: Notify::new(),
workers: Mutex::new(Vec::new()),
})
}
@@ -58,6 +59,7 @@ impl BackgroundRunner {
{
let boxed: Job = Box::pin(job);
let _: Result<_, _> = self.queue_in.clone().send((boxed, false));
+ self.job_notify.notify();
}
pub fn spawn_cancellable<T>(&self, job: T)
@@ -66,6 +68,7 @@ impl BackgroundRunner {
{
let boxed: Job = Box::pin(job);
let _: Result<_, _> = self.queue_in.clone().send((boxed, true));
+ self.job_notify.notify();
}
pub async fn spawn_worker<F, T>(&self, name: String, worker: F)
@@ -97,7 +100,7 @@ impl BackgroundRunner {
info!("Background runner {} exiting", i);
return;
}
- tokio::time::delay_for(Duration::from_secs(1)).await;
+ self.job_notify.notified().await;
}
}
}