aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/util/background.rs37
1 files changed, 22 insertions, 15 deletions
diff --git a/src/util/background.rs b/src/util/background.rs
index bfdaaf1e..d35425f5 100644
--- a/src/util/background.rs
+++ b/src/util/background.rs
@@ -6,7 +6,9 @@ use std::time::Duration;
use futures::future::*;
use futures::select;
-use tokio::sync::{mpsc, watch, Mutex};
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex};
use crate::error::Error;
@@ -30,26 +32,31 @@ impl BackgroundRunner {
let stop_signal_2 = stop_signal.clone();
let await_all_done = tokio::spawn(async move {
+ let mut workers = FuturesUnordered::new();
+ let mut shutdown_timer = 0;
loop {
- let wkr = {
- select! {
- item = worker_out.recv().fuse() => {
- match item {
- Some(x) => x,
- None => break,
- }
+ let closed = match worker_out.try_recv() {
+ Ok(wkr) => {
+ workers.push(wkr);
+ false
+ }
+ Err(TryRecvError::Empty) => false,
+ Err(TryRecvError::Disconnected) => true,
+ };
+ select! {
+ res = workers.next() => {
+ if let Some(Err(e)) = res {
+ error!("Worker exited with error: {}", e);
}
- _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
- if *stop_signal_2.borrow() {
+ }
+ _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
+ if closed || *stop_signal_2.borrow() {
+ shutdown_timer += 1;
+ if shutdown_timer >= 10 {
break;
- } else {
- continue;
}
}
}
- };
- if let Err(e) = wkr.await {
- error!("Error while awaiting for worker: {}", e);
}
}
});