aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-05-05 10:56:44 +0200
committerAlex Auvolat <alex@adnab.me>2022-05-10 13:25:10 +0200
commit99fcfa3844a346463e5739fec19ac2a6b560adfc (patch)
treecb82b76999f81f0c5e351d062618c89f64ac3e99
parent633958c7b1ce9c83df5159051fd299b484d0d797 (diff)
downloadgarage-99fcfa3844a346463e5739fec19ac2a6b560adfc.tar.gz
garage-99fcfa3844a346463e5739fec19ac2a6b560adfc.zip
Make background runner terminate correctly
-rw-r--r--src/garage/server.rs1
-rw-r--r--src/util/background.rs37
2 files changed, 23 insertions, 15 deletions
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 4c0f8653..ffbe97ec 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -110,6 +110,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
// Remove RPC handlers for system to break reference cycles
garage.system.netapp.drop_all_handlers();
+ opentelemetry::global::shutdown_tracer_provider();
// Await for netapp RPC system to end
run_system.await?;
diff --git a/src/util/background.rs b/src/util/background.rs
index bfdaaf1e..d35425f5 100644
--- a/src/util/background.rs
+++ b/src/util/background.rs
@@ -6,7 +6,9 @@ use std::time::Duration;
use futures::future::*;
use futures::select;
-use tokio::sync::{mpsc, watch, Mutex};
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex};
use crate::error::Error;
@@ -30,26 +32,31 @@ impl BackgroundRunner {
let stop_signal_2 = stop_signal.clone();
let await_all_done = tokio::spawn(async move {
+ let mut workers = FuturesUnordered::new();
+ let mut shutdown_timer = 0;
loop {
- let wkr = {
- select! {
- item = worker_out.recv().fuse() => {
- match item {
- Some(x) => x,
- None => break,
- }
+ let closed = match worker_out.try_recv() {
+ Ok(wkr) => {
+ workers.push(wkr);
+ false
+ }
+ Err(TryRecvError::Empty) => false,
+ Err(TryRecvError::Disconnected) => true,
+ };
+ select! {
+ res = workers.next() => {
+ if let Some(Err(e)) = res {
+ error!("Worker exited with error: {}", e);
}
- _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
- if *stop_signal_2.borrow() {
+ }
+ _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
+ if closed || *stop_signal_2.borrow() {
+ shutdown_timer += 1;
+ if shutdown_timer >= 10 {
break;
- } else {
- continue;
}
}
}
- };
- if let Err(e) = wkr.await {
- error!("Error while awaiting for worker: {}", e);
}
}
});