aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
Diffstat (limited to 'src/util')
-rw-r--r--src/util/background.rs175
1 files changed, 100 insertions, 75 deletions
diff --git a/src/util/background.rs b/src/util/background.rs
index 0ec9779a..35d41d9f 100644
--- a/src/util/background.rs
+++ b/src/util/background.rs
@@ -1,10 +1,11 @@
use core::future::Future;
use std::pin::Pin;
-use std::sync::Mutex;
-
-use arc_swap::ArcSwapOption;
use std::sync::Arc;
-use tokio::sync::{mpsc, watch};
+use std::time::Duration;
+
+use futures::future::*;
+use futures::select;
+use tokio::sync::{mpsc, watch, Mutex};
use crate::error::Error;
@@ -14,99 +15,115 @@ type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
pub struct BackgroundRunner {
pub stop_signal: watch::Receiver<bool>,
- queue_in: ArcSwapOption<mpsc::UnboundedSender<(Job, bool)>>,
-
- workers: Mutex<Vec<tokio::task::JoinHandle<()>>>,
+ queue_in: mpsc::UnboundedSender<(Job, bool)>,
+ worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
}
impl BackgroundRunner {
- pub fn new(n_runners: usize, stop_signal: watch::Receiver<bool>) -> Arc<Self> {
- let (queue_in, queue_out) = mpsc::unbounded_channel();
+ pub fn new(
+ n_runners: usize,
+ stop_signal: watch::Receiver<bool>,
+ ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
+ let (worker_in, mut worker_out) = mpsc::unbounded_channel();
+
+ let stop_signal_2 = stop_signal.clone();
+ let await_all_done = tokio::spawn(async move {
+ loop {
+ let wkr = {
+ select! {
+ item = worker_out.recv().fuse() => {
+ match item {
+ Some(x) => x,
+ None => break,
+ }
+ }
+ _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {
+ if *stop_signal_2.borrow() {
+ break;
+ } else {
+ continue;
+ }
+ }
+ }
+ };
+ if let Err(e) = wkr.await {
+ error!("Error while awaiting for worker: {}", e);
+ }
+ }
+ });
- let mut workers = vec![];
- let queue_out = Arc::new(tokio::sync::Mutex::new(queue_out));
+ let (queue_in, queue_out) = mpsc::unbounded_channel();
+ let queue_out = Arc::new(Mutex::new(queue_out));
for i in 0..n_runners {
let queue_out = queue_out.clone();
let stop_signal = stop_signal.clone();
- workers.push(tokio::spawn(async move {
- while let Some((job, cancellable)) = queue_out.lock().await.recv().await {
- if cancellable && *stop_signal.borrow() {
- continue;
- }
- if let Err(e) = job.await {
- error!("Job failed: {}", e)
+ worker_in
+ .send(tokio::spawn(async move {
+ loop {
+ let (job, cancellable) = {
+ select! {
+ item = wait_job(&queue_out).fuse() => match item {
+ // We received a task, process it
+ Some(x) => x,
+ // We received a signal that no more tasks will ever be sent
+ // because the sending side was dropped. Exit now.
+ None => break,
+ },
+ _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {
+ if *stop_signal.borrow() {
+ // Nothing has been going on for 10 secs, and we are shutting
+ // down. Exit now.
+ break;
+ } else {
+ // Nothing is going on but we don't want to exit.
+ continue;
+ }
+ }
+ }
+ };
+ if cancellable && *stop_signal.borrow() {
+ continue;
+ }
+ if let Err(e) = job.await {
+ error!("Job failed: {}", e)
+ }
}
- }
- info!("Worker {} exiting", i);
- }));
+ info!("Background worker {} exiting", i);
+ }))
+ .unwrap();
}
- Arc::new(Self {
+ let bgrunner = Arc::new(Self {
stop_signal,
- queue_in: ArcSwapOption::new(Some(Arc::new(queue_in))),
- workers: Mutex::new(workers),
- })
- }
-
- pub async fn run(self: Arc<Self>) {
- let mut stop_signal = self.stop_signal.clone();
-
- loop {
- let exit_now = match stop_signal.changed().await {
- Ok(()) => *stop_signal.borrow(),
- Err(e) => {
- error!("Watch .changed() error: {}", e);
- true
- }
- };
- if exit_now {
- break;
- }
- }
-
- info!("Closing background job queue_in...");
- drop(self.queue_in.swap(None));
-
- info!("Waiting for all workers to terminate...");
- while let Some(task) = self.workers.lock().unwrap().pop() {
- if let Err(e) = task.await {
- warn!("Error awaiting task: {}", e);
- }
- }
+ queue_in,
+ worker_in,
+ });
+ (bgrunner, await_all_done)
}
// Spawn a task to be run in background
- pub async fn spawn<T>(&self, job: T)
+ pub fn spawn<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
{
- match self.queue_in.load().as_ref() {
- Some(chan) => {
- let boxed: Job = Box::pin(job);
- chan.send((boxed, false)).map_err(|_| "send error").unwrap();
- }
- None => {
- warn!("Doing background job now because we are exiting...");
- if let Err(e) = job.await {
- warn!("Task failed: {}", e);
- }
- }
- }
+ let boxed: Job = Box::pin(job);
+ self.queue_in
+ .send((boxed, false))
+ .map_err(|_| "could not put job in queue")
+ .unwrap();
}
pub fn spawn_cancellable<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
{
- match self.queue_in.load().as_ref() {
- Some(chan) => {
- let boxed: Job = Box::pin(job);
- chan.send((boxed, false)).map_err(|_| "send error").unwrap();
- }
- None => (), // drop job if we are exiting
- }
+ let boxed: Job = Box::pin(job);
+ self.queue_in
+ .send((boxed, true))
+ .map_err(|_| "could not put job in queue")
+ .unwrap();
}
pub fn spawn_worker<F, T>(&self, name: String, worker: F)
@@ -114,11 +131,19 @@ impl BackgroundRunner {
F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
T: Future<Output = ()> + Send + 'static,
{
- let mut workers = self.workers.lock().unwrap();
let stop_signal = self.stop_signal.clone();
- workers.push(tokio::spawn(async move {
+ let task = tokio::spawn(async move {
+ info!("Worker started: {}", name);
worker(stop_signal).await;
info!("Worker exited: {}", name);
- }));
+ });
+ self.worker_in
+ .send(task)
+ .map_err(|_| "could not put job in queue")
+ .unwrap();
}
}
+
+async fn wait_job(q: &Mutex<mpsc::UnboundedReceiver<(Job, bool)>>) -> Option<(Job, bool)> {
+ q.lock().await.recv().await
+}