diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/garage/server.rs | 27 | ||||
-rw-r--r-- | src/rpc/system.rs | 18 |
2 files changed, 26 insertions, 19 deletions
diff --git a/src/garage/server.rs b/src/garage/server.rs index 958089c6..472616c7 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -130,20 +130,27 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er warn!("This Garage version is built without the metrics feature"); } - // Stuff runs - - // When a cancel signal is sent, stuff stops - - // Collect stuff - for (desc, join_handle) in servers { - if let Err(e) = join_handle.await? { - error!("{} server exited with error: {}", desc, e); - } else { - info!("{} server exited without error.", desc); + if servers.is_empty() { + // Nothing runs except netapp (not in servers) + // Await shutdown signal before proceeding to shutting down netapp + wait_from(watch_cancel).await; + } else { + // Stuff runs + + // When a cancel signal is sent, stuff stops + + // Collect stuff + for (desc, join_handle) in servers { + if let Err(e) = join_handle.await? { + error!("{} server exited with error: {}", desc, e); + } else { + info!("{} server exited without error.", desc); + } } } // Remove RPC handlers for system to break reference cycles + info!("Deregistering RPC handlers for shutdown..."); garage.system.netapp.drop_all_handlers(); opentelemetry::global::shutdown_tracer_provider(); diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 7fc3c20c..78fcc74b 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -9,10 +9,10 @@ use std::time::{Duration, Instant}; use arc_swap::ArcSwap; use async_trait::async_trait; -use futures::{join, select}; -use futures_util::future::*; +use futures::join; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::sign::ed25519; +use tokio::select; use tokio::sync::watch; use tokio::sync::Mutex; @@ -702,7 +702,7 @@ impl System { async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) { while !*stop_signal.borrow() { - let restart_at = tokio::time::sleep(STATUS_EXCHANGE_INTERVAL); + let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL; self.update_local_status(); let local_status: NodeStatus = self.local_status.load().as_ref().clone(); @@ -711,13 +711,14 @@ impl System { .broadcast( &self.system_endpoint, SystemRpc::AdvertiseStatus(local_status), - RequestStrategy::with_priority(PRIO_HIGH), + RequestStrategy::with_priority(PRIO_HIGH) + .with_custom_timeout(STATUS_EXCHANGE_INTERVAL), ) .await; select! { - _ = restart_at.fuse() => {}, - _ = stop_signal.changed().fuse() => {}, + _ = tokio::time::sleep_until(restart_at.into()) => {}, + _ = stop_signal.changed() => {}, } } } @@ -799,10 +800,9 @@ impl System { #[cfg(feature = "kubernetes-discovery")] tokio::spawn(self.clone().advertise_to_kubernetes()); - let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); select! { - _ = restart_at.fuse() => {}, - _ = stop_signal.changed().fuse() => {}, + _ = tokio::time::sleep(DISCOVERY_INTERVAL) => {}, + _ = stop_signal.changed() => {}, } } } |