diff options
author | Alex Auvolat <alex@adnab.me> | 2023-01-04 11:34:43 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-01-04 11:34:43 +0100 |
commit | 570e5e5bbb7a3eac41350db9433e28ed289b97f4 (patch) | |
tree | a7fc299ba180098be5a3bef28a39256870ce697b /src/rpc/rpc_helper.rs | |
parent | 6e44369cbc810b8912ca0f7f5fd293e87f10c851 (diff) | |
parent | 4eb8ca3a528dae2848141f5cc3eb607eb7d40114 (diff) | |
download | garage-570e5e5bbb7a3eac41350db9433e28ed289b97f4.tar.gz garage-570e5e5bbb7a3eac41350db9433e28ed289b97f4.zip |
Merge branch 'main' into next
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r-- | src/rpc/rpc_helper.rs | 18 |
1 files changed, 5 insertions, 13 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 949aced6..1ec250c3 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -5,7 +5,6 @@ use std::time::Duration; use futures::future::join_all; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; -use futures_util::future::FutureExt; use tokio::select; use tokio::sync::watch; @@ -24,7 +23,6 @@ pub use netapp::message::{ use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::{self, NetApp, NodeID}; -use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -94,7 +92,6 @@ pub struct RpcHelper(Arc<RpcHelperInner>); struct RpcHelperInner { our_node_id: Uuid, fullmesh: Arc<FullMeshPeeringStrategy>, - background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, metrics: RpcMetrics, rpc_timeout: Duration, @@ -104,7 +101,6 @@ impl RpcHelper { pub(crate) fn new( our_node_id: Uuid, fullmesh: Arc<FullMeshPeeringStrategy>, - background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, rpc_timeout: Option<Duration>, ) -> Self { @@ -113,7 +109,6 @@ impl RpcHelper { Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, - background, ring, metrics, rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), @@ -377,16 +372,13 @@ impl RpcHelper { if !resp_stream.is_empty() { // Continue remaining requests in background. - // Continue the remaining requests immediately using tokio::spawn - // but enqueue a task in the background runner - // to ensure that the process won't exit until the requests are done - // (if we had just enqueued the resp_stream.collect directly in the background runner, - // the requests might have been put on hold in the background runner's queue, - // in which case they might timeout or otherwise fail) - let wait_finished_fut = tokio::spawn(async move { + // Note: these requests can get interrupted on process shutdown, + // we must not count on them being executed for certain. + // For all background things that have to happen with certainty, + // they have to be put in a proper queue that is persisted to disk. + tokio::spawn(async move { resp_stream.collect::<Vec<Result<_, _>>>().await; }); - self.0.background.spawn(wait_finished_fut.map(|_| Ok(()))); } } |