aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2023-09-12 12:54:50 +0000
committerAlex <alex@adnab.me>2023-09-12 12:54:50 +0000
commitbe1a16b42b173779c2457d11e489ea13952a9eae (patch)
tree4ab75c134805d73c5b1cf8121b7dcb551303b452
parent143a349f55b0c652d407019c5850313b0c95ccbd (diff)
parent91e764a2bf7a479a068fdfdad020cd79b5e86b88 (diff)
downloadgarage-be1a16b42b173779c2457d11e489ea13952a9eae.tar.gz
garage-be1a16b42b173779c2457d11e489ea13952a9eae.zip
Merge pull request 'Fix multiple shutdown issues' (#633) from fix-shutdown into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/633
-rw-r--r--src/garage/server.rs27
-rw-r--r--src/rpc/system.rs18
2 files changed, 26 insertions, 19 deletions
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 958089c6..472616c7 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -130,20 +130,27 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
warn!("This Garage version is built without the metrics feature");
}
- // Stuff runs
-
- // When a cancel signal is sent, stuff stops
-
- // Collect stuff
- for (desc, join_handle) in servers {
- if let Err(e) = join_handle.await? {
- error!("{} server exited with error: {}", desc, e);
- } else {
- info!("{} server exited without error.", desc);
+ if servers.is_empty() {
+ // Nothing runs except netapp (not in servers)
+ // Await shutdown signal before proceeding to shutting down netapp
+ wait_from(watch_cancel).await;
+ } else {
+ // Stuff runs
+
+ // When a cancel signal is sent, stuff stops
+
+ // Collect stuff
+ for (desc, join_handle) in servers {
+ if let Err(e) = join_handle.await? {
+ error!("{} server exited with error: {}", desc, e);
+ } else {
+ info!("{} server exited without error.", desc);
+ }
}
}
// Remove RPC handlers for system to break reference cycles
+ info!("Deregistering RPC handlers for shutdown...");
garage.system.netapp.drop_all_handlers();
opentelemetry::global::shutdown_tracer_provider();
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 36b58311..4daa5ba9 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -9,10 +9,10 @@ use std::time::{Duration, Instant};
use arc_swap::ArcSwap;
use async_trait::async_trait;
-use futures::{join, select};
-use futures_util::future::*;
+use futures::join;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::sign::ed25519;
+use tokio::select;
use tokio::sync::watch;
use tokio::sync::Mutex;
@@ -702,7 +702,7 @@ impl System {
async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
- let restart_at = tokio::time::sleep(STATUS_EXCHANGE_INTERVAL);
+ let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL;
self.update_local_status();
let local_status: NodeStatus = self.local_status.load().as_ref().clone();
@@ -711,13 +711,14 @@ impl System {
.broadcast(
&self.system_endpoint,
SystemRpc::AdvertiseStatus(local_status),
- RequestStrategy::with_priority(PRIO_HIGH),
+ RequestStrategy::with_priority(PRIO_HIGH)
+ .with_custom_timeout(STATUS_EXCHANGE_INTERVAL),
)
.await;
select! {
- _ = restart_at.fuse() => {},
- _ = stop_signal.changed().fuse() => {},
+ _ = tokio::time::sleep_until(restart_at.into()) => {},
+ _ = stop_signal.changed() => {},
}
}
}
@@ -799,10 +800,9 @@ impl System {
#[cfg(feature = "kubernetes-discovery")]
tokio::spawn(self.clone().advertise_to_kubernetes());
- let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
- _ = restart_at.fuse() => {},
- _ = stop_signal.changed().fuse() => {},
+ _ = tokio::time::sleep(DISCOVERY_INTERVAL) => {},
+ _ = stop_signal.changed() => {},
}
}
}