From 53289b69e5037700689665b4edf20f2382ff15f6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 11 Apr 2020 18:51:11 +0200 Subject: Background task runner that replaces tokio::spawn --- src/membership.rs | 56 +++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 18 deletions(-) (limited to 'src/membership.rs') diff --git a/src/membership.rs b/src/membership.rs index b713c7a4..cb8dba99 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -8,10 +8,14 @@ use std::sync::Arc; use std::time::Duration; use futures::future::join_all; +use futures::select; +use futures_util::future::*; use sha2::{Digest, Sha256}; use tokio::prelude::*; +use tokio::sync::watch; use tokio::sync::RwLock; +use crate::background::BackgroundRunner; use crate::data::*; use crate::error::Error; use crate::proto::*; @@ -29,6 +33,8 @@ pub struct System { pub rpc_client: RpcClient, pub members: RwLock, + + pub background: Arc, } pub struct Members { @@ -181,7 +187,7 @@ fn read_network_config(metadata_dir: &PathBuf) -> Result { } impl System { - pub fn new(config: Config, id: UUID) -> Self { + pub fn new(config: Config, id: UUID, background: Arc) -> Self { let net_config = match read_network_config(&config.metadata_dir) { Ok(x) => x, Err(e) => { @@ -209,24 +215,24 @@ impl System { id, rpc_client: RpcClient::new(), members: RwLock::new(members), + background, } } - async fn save_network_config(self: Arc) { + async fn save_network_config(self: Arc) -> Result<(), Error> { let mut path = self.config.metadata_dir.clone(); path.push("network_config"); let members = self.members.read().await; let data = - rmp_to_vec_all_named(&members.config).expect("Error while encoding network config"); + rmp_to_vec_all_named(&members.config)?; drop(members); let mut f = tokio::fs::File::create(path.as_path()) - .await - .expect("Could not create network_config"); + .await?; f.write_all(&data[..]) - .await - .expect("Could not write network_config"); + .await?; + Ok(()) } pub async fn make_ping(&self) -> Message { @@ -260,7 +266,10 @@ impl System { .collect::>(); self.clone().ping_nodes(bootstrap_peers).await; - tokio::spawn(self.ping_loop()); + self.background + .clone() + .spawn_worker(|stop_signal| self.ping_loop(stop_signal).map(Ok)) + .await; } pub async fn ping_nodes(self: Arc, peers: Vec<(SocketAddr, Option)>) { @@ -294,10 +303,12 @@ impl System { }); } if is_new || members.status_hash != info.status_hash { - tokio::spawn(self.clone().pull_status(info.id.clone())); + self.background + .spawn_cancellable(self.clone().pull_status(info.id.clone()).map(Ok)); } if is_new || members.config.version < info.config_version { - tokio::spawn(self.clone().pull_config(info.id.clone())); + self.background + .spawn_cancellable(self.clone().pull_config(info.id.clone()).map(Ok)); } } else if let Some(id) = id_option { let remaining_attempts = members @@ -345,10 +356,10 @@ impl System { drop(members); if is_new || status_hash != ping.status_hash { - tokio::spawn(self.clone().pull_status(ping.id.clone())); + self.background.spawn_cancellable(self.clone().pull_status(ping.id.clone()).map(Ok)); } if is_new || config_version < ping.config_version { - tokio::spawn(self.clone().pull_config(ping.id.clone())); + self.background.spawn_cancellable(self.clone().pull_config(ping.id.clone()).map(Ok)); } Ok(self.make_ping().await) @@ -405,7 +416,7 @@ impl System { drop(members); if to_ping.len() > 0 { - tokio::spawn(self.clone().ping_nodes(to_ping)); + self.background.spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok)); } Ok(Message::Ok) @@ -420,17 +431,18 @@ impl System { members.config = adv.clone(); members.rebuild_ring(); - tokio::spawn( + self.background.spawn_cancellable( self.clone() - .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT), + .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT) + .map(Ok), ); - tokio::spawn(self.clone().save_network_config()); + self.background.spawn(self.clone().save_network_config()); } Ok(Message::Ok) } - pub async fn ping_loop(self: Arc) { + pub async fn ping_loop(self: Arc, mut stop_signal: watch::Receiver) { loop { let restart_at = tokio::time::delay_for(PING_INTERVAL); @@ -445,7 +457,15 @@ impl System { self.clone().ping_nodes(ping_addrs).await; - restart_at.await + select! { + _ = restart_at.fuse() => (), + must_exit = stop_signal.recv().fuse() => { + match must_exit { + None | Some(true) => return, + _ => (), + } + } + } } } -- cgit v1.2.3