diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-11 18:51:11 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-11 18:51:11 +0200 |
commit | 53289b69e5037700689665b4edf20f2382ff15f6 (patch) | |
tree | e9920e1dce29e94bfddc3812b44ee2519ba14bed /src/membership.rs | |
parent | 4a2624b76afff714a70ee7a9e4ffd97c54c7ecc4 (diff) | |
download | garage-53289b69e5037700689665b4edf20f2382ff15f6.tar.gz garage-53289b69e5037700689665b4edf20f2382ff15f6.zip |
Background task runner that replaces tokio::spawn
Diffstat (limited to 'src/membership.rs')
-rw-r--r-- | src/membership.rs | 56 |
1 files changed, 38 insertions, 18 deletions
diff --git a/src/membership.rs b/src/membership.rs index b713c7a4..cb8dba99 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -8,10 +8,14 @@ use std::sync::Arc; use std::time::Duration; use futures::future::join_all; +use futures::select; +use futures_util::future::*; use sha2::{Digest, Sha256}; use tokio::prelude::*; +use tokio::sync::watch; use tokio::sync::RwLock; +use crate::background::BackgroundRunner; use crate::data::*; use crate::error::Error; use crate::proto::*; @@ -29,6 +33,8 @@ pub struct System { pub rpc_client: RpcClient, pub members: RwLock<Members>, + + pub background: Arc<BackgroundRunner>, } pub struct Members { @@ -181,7 +187,7 @@ fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> { } impl System { - pub fn new(config: Config, id: UUID) -> Self { + pub fn new(config: Config, id: UUID, background: Arc<BackgroundRunner>) -> Self { let net_config = match read_network_config(&config.metadata_dir) { Ok(x) => x, Err(e) => { @@ -209,24 +215,24 @@ impl System { id, rpc_client: RpcClient::new(), members: RwLock::new(members), + background, } } - async fn save_network_config(self: Arc<Self>) { + async fn save_network_config(self: Arc<Self>) -> Result<(), Error> { let mut path = self.config.metadata_dir.clone(); path.push("network_config"); let members = self.members.read().await; let data = - rmp_to_vec_all_named(&members.config).expect("Error while encoding network config"); + rmp_to_vec_all_named(&members.config)?; drop(members); let mut f = tokio::fs::File::create(path.as_path()) - .await - .expect("Could not create network_config"); + .await?; f.write_all(&data[..]) - .await - .expect("Could not write network_config"); + .await?; + Ok(()) } pub async fn make_ping(&self) -> Message { @@ -260,7 +266,10 @@ impl System { .collect::<Vec<_>>(); self.clone().ping_nodes(bootstrap_peers).await; - tokio::spawn(self.ping_loop()); + self.background + .clone() + .spawn_worker(|stop_signal| self.ping_loop(stop_signal).map(Ok)) + .await; } pub async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) { @@ -294,10 +303,12 @@ impl System { }); } if is_new || members.status_hash != info.status_hash { - tokio::spawn(self.clone().pull_status(info.id.clone())); + self.background + .spawn_cancellable(self.clone().pull_status(info.id.clone()).map(Ok)); } if is_new || members.config.version < info.config_version { - tokio::spawn(self.clone().pull_config(info.id.clone())); + self.background + .spawn_cancellable(self.clone().pull_config(info.id.clone()).map(Ok)); } } else if let Some(id) = id_option { let remaining_attempts = members @@ -345,10 +356,10 @@ impl System { drop(members); if is_new || status_hash != ping.status_hash { - tokio::spawn(self.clone().pull_status(ping.id.clone())); + self.background.spawn_cancellable(self.clone().pull_status(ping.id.clone()).map(Ok)); } if is_new || config_version < ping.config_version { - tokio::spawn(self.clone().pull_config(ping.id.clone())); + self.background.spawn_cancellable(self.clone().pull_config(ping.id.clone()).map(Ok)); } Ok(self.make_ping().await) @@ -405,7 +416,7 @@ impl System { drop(members); if to_ping.len() > 0 { - tokio::spawn(self.clone().ping_nodes(to_ping)); + self.background.spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok)); } Ok(Message::Ok) @@ -420,17 +431,18 @@ impl System { members.config = adv.clone(); members.rebuild_ring(); - tokio::spawn( + self.background.spawn_cancellable( self.clone() - .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT), + .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT) + .map(Ok), ); - tokio::spawn(self.clone().save_network_config()); + self.background.spawn(self.clone().save_network_config()); } Ok(Message::Ok) } - pub async fn ping_loop(self: Arc<Self>) { + pub async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) { loop { let restart_at = tokio::time::delay_for(PING_INTERVAL); @@ -445,7 +457,15 @@ impl System { self.clone().ping_nodes(ping_addrs).await; - restart_at.await + select! { + _ = restart_at.fuse() => (), + must_exit = stop_signal.recv().fuse() => { + match must_exit { + None | Some(true) => return, + _ => (), + } + } + } } } |