diff options
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r-- | src/rpc/system.rs | 24 |
1 files changed, 9 insertions, 15 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 2c6f14fd..e14adf2a 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -21,7 +21,7 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::util::parse_and_resolve_peer_addr_async; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; -use garage_util::background::BackgroundRunner; +use garage_util::background::{self}; use garage_util::config::Config; #[cfg(feature = "kubernetes-discovery")] use garage_util::config::KubernetesDiscoveryConfig; @@ -110,9 +110,6 @@ pub struct System { pub ring: watch::Receiver<Arc<Ring>>, update_ring: Mutex<watch::Sender<Arc<Ring>>>, - /// The job runner of this node - pub background: Arc<BackgroundRunner>, - /// Path to metadata directory pub metadata_dir: PathBuf, } @@ -232,7 +229,6 @@ impl System { /// Create this node's membership manager pub fn new( network_key: NetworkKey, - background: Arc<BackgroundRunner>, replication_mode: ReplicationMode, config: &Config, ) -> Result<Arc<Self>, Error> { @@ -354,7 +350,6 @@ impl System { rpc: RpcHelper::new( netapp.id.into(), fullmesh, - background.clone(), ring.clone(), config.rpc_timeout_msec.map(Duration::from_millis), ), @@ -372,7 +367,6 @@ impl System { ring, update_ring: Mutex::new(update_ring), - background, metadata_dir: config.metadata_dir.clone(), }); sys.system_endpoint.set_handler(sys.clone()); @@ -578,7 +572,7 @@ impl System { } /// Save network configuration to disc - async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> { + async fn save_cluster_layout(&self) -> Result<(), Error> { let ring: Arc<Ring> = self.ring.borrow().clone(); self.persist_cluster_layout .save_async(&ring.layout) @@ -631,7 +625,7 @@ impl System { || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash { let self2 = self.clone(); - self.background.spawn_cancellable(async move { + background::spawn(async move { self2.pull_cluster_layout(from).await; Ok(()) }); @@ -676,7 +670,7 @@ impl System { drop(update_ring); let self2 = self.clone(); - self.background.spawn_cancellable(async move { + background::spawn(async move { self2 .rpc .broadcast( @@ -687,7 +681,8 @@ impl System { .await?; Ok(()) }); - self.background.spawn(self.clone().save_cluster_layout()); + + self.save_cluster_layout().await?; } Ok(SystemRpc::Ok) @@ -773,7 +768,7 @@ impl System { } for (node_id, node_addr) in ping_list { - tokio::spawn( + background::spawn( self.netapp .clone() .try_connect(node_addr, node_id) @@ -787,11 +782,10 @@ impl System { } #[cfg(feature = "consul-discovery")] - self.background.spawn(self.clone().advertise_to_consul()); + background::spawn(self.clone().advertise_to_consul()); #[cfg(feature = "kubernetes-discovery")] - self.background - .spawn(self.clone().advertise_to_kubernetes()); + background::spawn(self.clone().advertise_to_kubernetes()); let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); select! { |