aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-05-05 10:56:44 +0200
committerAlex Auvolat <alex@adnab.me>2022-05-17 11:38:31 +0200
commit7b474855e3a8491fcdde69d12d3fbae27f520383 (patch)
treee6643b5967b33d3f4504e74fef3b4f8bdeed9f1f
parent176715c5b27ea62e3b1bf77356360b5086d671e2 (diff)
downloadgarage-7b474855e3a8491fcdde69d12d3fbae27f520383.tar.gz
garage-7b474855e3a8491fcdde69d12d3fbae27f520383.zip
Make background runner terminate correctly
-rw-r--r--src/util/background.rs37
1 files changed, 22 insertions, 15 deletions
diff --git a/src/util/background.rs b/src/util/background.rs
index bfdaaf1e..d35425f5 100644
--- a/src/util/background.rs
+++ b/src/util/background.rs
@@ -6,7 +6,9 @@ use std::time::Duration;
use futures::future::*;
use futures::select;
-use tokio::sync::{mpsc, watch, Mutex};
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex};
use crate::error::Error;
@@ -30,26 +32,31 @@ impl BackgroundRunner {
let stop_signal_2 = stop_signal.clone();
let await_all_done = tokio::spawn(async move {
+ let mut workers = FuturesUnordered::new();
+ let mut shutdown_timer = 0;
loop {
- let wkr = {
- select! {
- item = worker_out.recv().fuse() => {
- match item {
- Some(x) => x,
- None => break,
- }
+ let closed = match worker_out.try_recv() {
+ Ok(wkr) => {
+ workers.push(wkr);
+ false
+ }
+ Err(TryRecvError::Empty) => false,
+ Err(TryRecvError::Disconnected) => true,
+ };
+ select! {
+ res = workers.next() => {
+ if let Some(Err(e)) = res {
+ error!("Worker exited with error: {}", e);
}
- _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
- if *stop_signal_2.borrow() {
+ }
+ _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
+ if closed || *stop_signal_2.borrow() {
+ shutdown_timer += 1;
+ if shutdown_timer >= 10 {
break;
- } else {
- continue;
}
}
}
- };
- if let Err(e) = wkr.await {
- error!("Error while awaiting for worker: {}", e);
}
}
});