diff options
author | Alex Auvolat <alex@adnab.me> | 2022-05-05 10:56:44 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-05-17 11:38:31 +0200 |
commit | 7b474855e3a8491fcdde69d12d3fbae27f520383 (patch) | |
tree | e6643b5967b33d3f4504e74fef3b4f8bdeed9f1f | |
parent | 176715c5b27ea62e3b1bf77356360b5086d671e2 (diff) | |
download | garage-7b474855e3a8491fcdde69d12d3fbae27f520383.tar.gz garage-7b474855e3a8491fcdde69d12d3fbae27f520383.zip |
Make background runner terminate correctly
-rw-r--r-- | src/util/background.rs | 37 |
1 files changed, 22 insertions, 15 deletions
diff --git a/src/util/background.rs b/src/util/background.rs index bfdaaf1e..d35425f5 100644 --- a/src/util/background.rs +++ b/src/util/background.rs @@ -6,7 +6,9 @@ use std::time::Duration; use futures::future::*; use futures::select; -use tokio::sync::{mpsc, watch, Mutex}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex}; use crate::error::Error; @@ -30,26 +32,31 @@ impl BackgroundRunner { let stop_signal_2 = stop_signal.clone(); let await_all_done = tokio::spawn(async move { + let mut workers = FuturesUnordered::new(); + let mut shutdown_timer = 0; loop { - let wkr = { - select! { - item = worker_out.recv().fuse() => { - match item { - Some(x) => x, - None => break, - } + let closed = match worker_out.try_recv() { + Ok(wkr) => { + workers.push(wkr); + false + } + Err(TryRecvError::Empty) => false, + Err(TryRecvError::Disconnected) => true, + }; + select! { + res = workers.next() => { + if let Some(Err(e)) = res { + error!("Worker exited with error: {}", e); } - _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => { - if *stop_signal_2.borrow() { + } + _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { + if closed || *stop_signal_2.borrow() { + shutdown_timer += 1; + if shutdown_timer >= 10 { break; - } else { - continue; } } } - }; - if let Err(e) = wkr.await { - error!("Error while awaiting for worker: {}", e); } } }); |