diff options
author | Alex <alex@adnab.me> | 2023-01-03 11:37:31 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2023-01-03 11:37:31 +0000 |
commit | 582b0761790b7958a3ba10c4b549b466997d2dcd (patch) | |
tree | b94c84bd21ef45e2480c653dc7ed2b37fd5907fb /src/rpc/rpc_helper.rs | |
parent | 76230f20282e73a5a5afa33af68152acaf732cf5 (diff) | |
parent | 939a6d67e8ace1aa38998281f52511a61f4b4d94 (diff) | |
download | garage-582b0761790b7958a3ba10c4b549b466997d2dcd.tar.gz garage-582b0761790b7958a3ba10c4b549b466997d2dcd.zip |
Merge pull request 'Some improvements to Garage internals' (#451) from internals-rework into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/451
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r-- | src/rpc/rpc_helper.rs | 18 |
1 files changed, 5 insertions, 13 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 949aced6..1ec250c3 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -5,7 +5,6 @@ use std::time::Duration; use futures::future::join_all; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; -use futures_util::future::FutureExt; use tokio::select; use tokio::sync::watch; @@ -24,7 +23,6 @@ pub use netapp::message::{ use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::{self, NetApp, NodeID}; -use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -94,7 +92,6 @@ pub struct RpcHelper(Arc<RpcHelperInner>); struct RpcHelperInner { our_node_id: Uuid, fullmesh: Arc<FullMeshPeeringStrategy>, - background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, metrics: RpcMetrics, rpc_timeout: Duration, @@ -104,7 +101,6 @@ impl RpcHelper { pub(crate) fn new( our_node_id: Uuid, fullmesh: Arc<FullMeshPeeringStrategy>, - background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, rpc_timeout: Option<Duration>, ) -> Self { @@ -113,7 +109,6 @@ impl RpcHelper { Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, - background, ring, metrics, rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), @@ -377,16 +372,13 @@ impl RpcHelper { if !resp_stream.is_empty() { // Continue remaining requests in background. - // Continue the remaining requests immediately using tokio::spawn - // but enqueue a task in the background runner - // to ensure that the process won't exit until the requests are done - // (if we had just enqueued the resp_stream.collect directly in the background runner, - // the requests might have been put on hold in the background runner's queue, - // in which case they might timeout or otherwise fail) - let wait_finished_fut = tokio::spawn(async move { + // Note: these requests can get interrupted on process shutdown, + // we must not count on them being executed for certain. + // For all background things that have to happen with certainty, + // they have to be put in a proper queue that is persisted to disk. + tokio::spawn(async move { resp_stream.collect::<Vec<Result<_, _>>>().await; }); - self.0.background.spawn(wait_finished_fut.map(|_| Ok(()))); } } |