author    Alex Auvolat <alex@adnab.me>  2022-12-14 12:51:16 +0100
committer Alex Auvolat <alex@adnab.me>  2022-12-14 12:51:42 +0100
commit    d56c472712df7c064387429a5af73d3bc0eb438d
tree      d3d73490ae3167a228ee91f69a583242c572cd9f
parent    2183518edccadef47cdeaf6476033b52d8832d6e
Refactor background runner and get rid of job worker
Diffstat (limited to 'src/rpc')
-rw-r--r--  src/rpc/rpc_helper.rs  | 18
-rw-r--r--  src/rpc/system.rs      | 24
2 files changed, 14 insertions(+), 28 deletions(-)
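
This commit removes the BackgroundRunner handle that was threaded through RpcHelper and System, replacing it with a free function, garage_util::background::spawn, for fire-and-forget tasks. Judging only from the call sites in the diff below, such a helper could look roughly like the following sketch; this is an assumption for illustration, not the actual garage_util code.

    use std::fmt;
    use std::future::Future;

    // Hypothetical sketch of a fire-and-forget spawn helper; the real
    // garage_util::background::spawn may differ in signature and logging.
    pub fn spawn<F, E>(fut: F)
    where
        F: Future<Output = Result<(), E>> + Send + 'static,
        E: fmt::Display + Send + 'static,
    {
        tokio::spawn(async move {
            if let Err(e) = fut.await {
                // The real code presumably logs via the log/tracing macros;
                // eprintln! keeps this sketch dependency-free.
                eprintln!("error in background task: {}", e);
            }
        });
    }

The key property is that such a task is owned by the tokio runtime alone: nothing waits for it, and it is silently dropped if the process exits first.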
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 949aced6..1ec250c3 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -5,7 +5,6 @@ use std::time::Duration;
use futures::future::join_all;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
-use futures_util::future::FutureExt;
use tokio::select;
use tokio::sync::watch;
@@ -24,7 +23,6 @@ pub use netapp::message::{
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::{self, NetApp, NodeID};
-use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
@@ -94,7 +92,6 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
struct RpcHelperInner {
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
metrics: RpcMetrics,
rpc_timeout: Duration,
@@ -104,7 +101,6 @@ impl RpcHelper {
pub(crate) fn new(
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
rpc_timeout: Option<Duration>,
) -> Self {
@@ -113,7 +109,6 @@ impl RpcHelper {
Self(Arc::new(RpcHelperInner {
our_node_id,
fullmesh,
- background,
ring,
metrics,
rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
@@ -377,16 +372,13 @@ impl RpcHelper {
if !resp_stream.is_empty() {
// Continue remaining requests in background.
- // Continue the remaining requests immediately using tokio::spawn
- // but enqueue a task in the background runner
- // to ensure that the process won't exit until the requests are done
- // (if we had just enqueued the resp_stream.collect directly in the background runner,
- // the requests might have been put on hold in the background runner's queue,
- // in which case they might timeout or otherwise fail)
- let wait_finished_fut = tokio::spawn(async move {
+ // Note: these requests can get interrupted on process shutdown,
+ // we must not count on them being executed for certain.
+ // For all background things that have to happen with certainty,
+ // they have to be put in a proper queue that is persisted to disk.
+ tokio::spawn(async move {
resp_stream.collect::<Vec<Result<_, _>>>().await;
});
- self.0.background.spawn(wait_finished_fut.map(|_| Ok(())));
}
}
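
The rewritten comment above states the new contract: once a quorum of responses has been collected, the remaining in-flight requests are driven to completion on a best-effort basis only. A condensed sketch of that pattern, with illustrative names rather than the actual RpcHelper internals:

    use std::future::Future;

    use futures::stream::{FuturesUnordered, StreamExt};

    // Illustrative sketch: drain the requests still in flight after quorum
    // in a detached task. If the process shuts down first, they are simply
    // dropped; work that must happen with certainty belongs in a queue
    // that is persisted to disk, as the comment above notes.
    fn finish_remaining<F>(remaining: FuturesUnordered<F>)
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        tokio::spawn(async move {
            let mut remaining = remaining;
            while remaining.next().await.is_some() {}
        });
    }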
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 2c6f14fd..e14adf2a 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -21,7 +21,7 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::util::parse_and_resolve_peer_addr_async;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
-use garage_util::background::BackgroundRunner;
+use garage_util::background::{self};
use garage_util::config::Config;
#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
@@ -110,9 +110,6 @@ pub struct System {
pub ring: watch::Receiver<Arc<Ring>>,
update_ring: Mutex<watch::Sender<Arc<Ring>>>,
- /// The job runner of this node
- pub background: Arc<BackgroundRunner>,
-
/// Path to metadata directory
pub metadata_dir: PathBuf,
}
@@ -232,7 +229,6 @@ impl System {
/// Create this node's membership manager
pub fn new(
network_key: NetworkKey,
- background: Arc<BackgroundRunner>,
replication_mode: ReplicationMode,
config: &Config,
) -> Result<Arc<Self>, Error> {
@@ -354,7 +350,6 @@ impl System {
rpc: RpcHelper::new(
netapp.id.into(),
fullmesh,
- background.clone(),
ring.clone(),
config.rpc_timeout_msec.map(Duration::from_millis),
),
@@ -372,7 +367,6 @@ impl System {
ring,
update_ring: Mutex::new(update_ring),
- background,
metadata_dir: config.metadata_dir.clone(),
});
sys.system_endpoint.set_handler(sys.clone());
@@ -578,7 +572,7 @@ impl System {
}
/// Save network configuration to disc
- async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
+ async fn save_cluster_layout(&self) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone();
self.persist_cluster_layout
.save_async(&ring.layout)
@@ -631,7 +625,7 @@ impl System {
|| info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
{
let self2 = self.clone();
- self.background.spawn_cancellable(async move {
+ background::spawn(async move {
self2.pull_cluster_layout(from).await;
Ok(())
});
@@ -676,7 +670,7 @@ impl System {
drop(update_ring);
let self2 = self.clone();
- self.background.spawn_cancellable(async move {
+ background::spawn(async move {
self2
.rpc
.broadcast(
@@ -687,7 +681,8 @@ impl System {
.await?;
Ok(())
});
- self.background.spawn(self.clone().save_cluster_layout());
+
+ self.save_cluster_layout().await?;
}
Ok(SystemRpc::Ok)
@@ -773,7 +768,7 @@ impl System {
}
for (node_id, node_addr) in ping_list {
- tokio::spawn(
+ background::spawn(
self.netapp
.clone()
.try_connect(node_addr, node_id)
@@ -787,11 +782,10 @@ impl System {
}
#[cfg(feature = "consul-discovery")]
- self.background.spawn(self.clone().advertise_to_consul());
+ background::spawn(self.clone().advertise_to_consul());
#[cfg(feature = "kubernetes-discovery")]
- self.background
- .spawn(self.clone().advertise_to_kubernetes());
+ background::spawn(self.clone().advertise_to_kubernetes());
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
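
Taken together, the system.rs hunks apply one rule consistently: best-effort work (layout pulls, broadcasts, discovery advertisements) is detached via background::spawn, while save_cluster_layout, which must succeed before the advertisement is acknowledged, is now awaited inline so that its error propagates to the caller. A simplified, hypothetical rendering of the two call shapes (all types and signatures below are stand-ins, not the real garage code):

    use std::sync::Arc;

    struct System;

    #[derive(Debug)]
    struct Error;

    impl System {
        async fn pull_cluster_layout(self: Arc<Self>) {}

        async fn save_cluster_layout(&self) -> Result<(), Error> {
            Ok(())
        }

        async fn handle_advertisement(self: Arc<Self>) -> Result<(), Error> {
            // Best-effort: may be cut off at shutdown, so fire and forget.
            let self2 = self.clone();
            tokio::spawn(async move {
                self2.pull_cluster_layout().await;
            });

            // Must complete before we reply: await inline, propagate errors.
            self.save_cluster_layout().await?;
            Ok(())
        }
    }

    #[tokio::main]
    async fn main() -> Result<(), Error> {
        Arc::new(System).handle_advertisement().await
    }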