diff options
-rw-r--r-- | src/garage/server.rs | 1 | ||||
-rw-r--r-- | src/util/background.rs | 37 |
2 files changed, 23 insertions, 15 deletions
diff --git a/src/garage/server.rs b/src/garage/server.rs index 4c0f8653..ffbe97ec 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -110,6 +110,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { // Remove RPC handlers for system to break reference cycles garage.system.netapp.drop_all_handlers(); + opentelemetry::global::shutdown_tracer_provider(); // Await for netapp RPC system to end run_system.await?; diff --git a/src/util/background.rs b/src/util/background.rs index bfdaaf1e..d35425f5 100644 --- a/src/util/background.rs +++ b/src/util/background.rs @@ -6,7 +6,9 @@ use std::time::Duration; use futures::future::*; use futures::select; -use tokio::sync::{mpsc, watch, Mutex}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex}; use crate::error::Error; @@ -30,26 +32,31 @@ impl BackgroundRunner { let stop_signal_2 = stop_signal.clone(); let await_all_done = tokio::spawn(async move { + let mut workers = FuturesUnordered::new(); + let mut shutdown_timer = 0; loop { - let wkr = { - select! { - item = worker_out.recv().fuse() => { - match item { - Some(x) => x, - None => break, - } + let closed = match worker_out.try_recv() { + Ok(wkr) => { + workers.push(wkr); + false + } + Err(TryRecvError::Empty) => false, + Err(TryRecvError::Disconnected) => true, + }; + select! { + res = workers.next() => { + if let Some(Err(e)) = res { + error!("Worker exited with error: {}", e); } - _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => { - if *stop_signal_2.borrow() { + } + _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { + if closed || *stop_signal_2.borrow() { + shutdown_timer += 1; + if shutdown_timer >= 10 { break; - } else { - continue; } } } - }; - if let Err(e) = wkr.await { - error!("Error while awaiting for worker: {}", e); } } }); |