Diffstat (limited to 'src/rpc')
-rw-r--r--    src/rpc/rpc_helper.rs    18
-rw-r--r--    src/rpc/system.rs        24
2 files changed, 14 insertions, 28 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 949aced6..1ec250c3 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -5,7 +5,6 @@ use std::time::Duration;
 
 use futures::future::join_all;
 use futures::stream::futures_unordered::FuturesUnordered;
 use futures::stream::StreamExt;
-use futures_util::future::FutureExt;
 use tokio::select;
 use tokio::sync::watch;
@@ -24,7 +23,6 @@ pub use netapp::message::{
 use netapp::peering::fullmesh::FullMeshPeeringStrategy;
 pub use netapp::{self, NetApp, NodeID};
 
-use garage_util::background::BackgroundRunner;
 use garage_util::data::*;
 use garage_util::error::Error;
 use garage_util::metrics::RecordDuration;
@@ -94,7 +92,6 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
 struct RpcHelperInner {
     our_node_id: Uuid,
     fullmesh: Arc<FullMeshPeeringStrategy>,
-    background: Arc<BackgroundRunner>,
     ring: watch::Receiver<Arc<Ring>>,
     metrics: RpcMetrics,
     rpc_timeout: Duration,
@@ -104,7 +101,6 @@ impl RpcHelper {
     pub(crate) fn new(
         our_node_id: Uuid,
         fullmesh: Arc<FullMeshPeeringStrategy>,
-        background: Arc<BackgroundRunner>,
         ring: watch::Receiver<Arc<Ring>>,
         rpc_timeout: Option<Duration>,
     ) -> Self {
@@ -113,7 +109,6 @@ impl RpcHelper {
         Self(Arc::new(RpcHelperInner {
             our_node_id,
             fullmesh,
-            background,
             ring,
             metrics,
             rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
@@ -377,16 +372,13 @@ impl RpcHelper {
         if !resp_stream.is_empty() {
             // Continue remaining requests in background.
-            // Continue the remaining requests immediately using tokio::spawn
-            // but enqueue a task in the background runner
-            // to ensure that the process won't exit until the requests are done
-            // (if we had just enqueued the resp_stream.collect directly in the background runner,
-            // the requests might have been put on hold in the background runner's queue,
-            // in which case they might timeout or otherwise fail)
-            let wait_finished_fut = tokio::spawn(async move {
+            // Note: these requests can get interrupted on process shutdown,
+            // we must not count on them being executed for certain.
+            // For all background things that have to happen with certainty,
+            // they have to be put in a proper queue that is persisted to disk.
+            tokio::spawn(async move {
                 resp_stream.collect::<Vec<Result<_, _>>>().await;
             });
-            self.0.background.spawn(wait_finished_fut.map(|_| Ok(())));
         }
     }
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 2c6f14fd..e14adf2a 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -21,7 +21,7 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy;
 use netapp::util::parse_and_resolve_peer_addr_async;
 use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
 
-use garage_util::background::BackgroundRunner;
+use garage_util::background::{self};
 use garage_util::config::Config;
 #[cfg(feature = "kubernetes-discovery")]
 use garage_util::config::KubernetesDiscoveryConfig;
@@ -110,9 +110,6 @@ pub struct System {
     pub ring: watch::Receiver<Arc<Ring>>,
     update_ring: Mutex<watch::Sender<Arc<Ring>>>,
 
-    /// The job runner of this node
-    pub background: Arc<BackgroundRunner>,
-
     /// Path to metadata directory
     pub metadata_dir: PathBuf,
 }
@@ -232,7 +229,6 @@ impl System {
     /// Create this node's membership manager
     pub fn new(
         network_key: NetworkKey,
-        background: Arc<BackgroundRunner>,
         replication_mode: ReplicationMode,
         config: &Config,
     ) -> Result<Arc<Self>, Error> {
@@ -354,7 +350,6 @@ impl System {
             rpc: RpcHelper::new(
                 netapp.id.into(),
                 fullmesh,
-                background.clone(),
                 ring.clone(),
                 config.rpc_timeout_msec.map(Duration::from_millis),
             ),
@@ -372,7 +367,6 @@ impl System {
             ring,
             update_ring: Mutex::new(update_ring),
-            background,
             metadata_dir: config.metadata_dir.clone(),
         });
         sys.system_endpoint.set_handler(sys.clone());
@@ -578,7 +572,7 @@ impl System {
     }
 
     /// Save network configuration to disc
-    async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
+    async fn save_cluster_layout(&self) -> Result<(), Error> {
         let ring: Arc<Ring> = self.ring.borrow().clone();
         self.persist_cluster_layout
             .save_async(&ring.layout)
@@ -631,7 +625,7 @@ impl System {
             || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
         {
             let self2 = self.clone();
-            self.background.spawn_cancellable(async move {
+            background::spawn(async move {
                 self2.pull_cluster_layout(from).await;
                 Ok(())
             });
@@ -676,7 +670,7 @@ impl System {
         drop(update_ring);
 
         let self2 = self.clone();
-        self.background.spawn_cancellable(async move {
+        background::spawn(async move {
             self2
                 .rpc
                 .broadcast(
@@ -687,7 +681,8 @@ impl System {
                 .await?;
             Ok(())
         });
-        self.background.spawn(self.clone().save_cluster_layout());
+
+        self.save_cluster_layout().await?;
         }
 
         Ok(SystemRpc::Ok)
@@ -773,7 +768,7 @@ impl System {
         }
 
         for (node_id, node_addr) in ping_list {
-            tokio::spawn(
+            background::spawn(
                 self.netapp
                     .clone()
                     .try_connect(node_addr, node_id)
@@ -787,11 +782,10 @@ impl System {
         }
 
         #[cfg(feature = "consul-discovery")]
-        self.background.spawn(self.clone().advertise_to_consul());
+        background::spawn(self.clone().advertise_to_consul());
 
         #[cfg(feature = "kubernetes-discovery")]
-        self.background
-            .spawn(self.clone().advertise_to_kubernetes());
+        background::spawn(self.clone().advertise_to_kubernetes());
 
         let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
         select! {
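Note: the diff only shows the call sites of the new background::spawn helper, not the garage_util::background module itself. The following is a minimal sketch of what a helper of that shape could look like, assuming it is just a thin wrapper around tokio::spawn that logs failures of fire-and-forget tasks; the names and the error type are illustrative and not taken from the Garage source.

use std::future::Future;

use tokio::task::JoinHandle;

// Illustrative error type; the real call sites use garage_util::error::Error.
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

// Hypothetical sketch of a background::spawn helper: hand the future to the
// Tokio runtime and log its result, since nothing awaits the JoinHandle.
// Tasks spawned this way are best-effort and die with the process, which is
// the semantics the new comment in rpc_helper.rs describes.
pub fn spawn<F>(fut: F) -> JoinHandle<()>
where
    F: Future<Output = Result<(), Error>> + Send + 'static,
{
    tokio::spawn(async move {
        if let Err(e) = fut.await {
            // A real implementation would likely go through a proper logger.
            eprintln!("Error in background task: {}", e);
        }
    })
}

This matches the call sites above, where every future passed to background::spawn resolves to Result<(), Error>, but it is only a guess at the helper's shape; the actual implementation in garage_util is not part of this diff.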