aboutsummaryrefslogtreecommitdiff
path: root/src/membership.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/membership.rs')
-rw-r--r--src/membership.rs56
1 files changed, 38 insertions, 18 deletions
diff --git a/src/membership.rs b/src/membership.rs
index b713c7a4..cb8dba99 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -8,10 +8,14 @@ use std::sync::Arc;
use std::time::Duration;
use futures::future::join_all;
+use futures::select;
+use futures_util::future::*;
use sha2::{Digest, Sha256};
use tokio::prelude::*;
+use tokio::sync::watch;
use tokio::sync::RwLock;
+use crate::background::BackgroundRunner;
use crate::data::*;
use crate::error::Error;
use crate::proto::*;
@@ -29,6 +33,8 @@ pub struct System {
pub rpc_client: RpcClient,
pub members: RwLock<Members>,
+
+ pub background: Arc<BackgroundRunner>,
}
pub struct Members {
@@ -181,7 +187,7 @@ fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
}
impl System {
- pub fn new(config: Config, id: UUID) -> Self {
+ pub fn new(config: Config, id: UUID, background: Arc<BackgroundRunner>) -> Self {
let net_config = match read_network_config(&config.metadata_dir) {
Ok(x) => x,
Err(e) => {
@@ -209,24 +215,24 @@ impl System {
id,
rpc_client: RpcClient::new(),
members: RwLock::new(members),
+ background,
}
}
- async fn save_network_config(self: Arc<Self>) {
+ async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let mut path = self.config.metadata_dir.clone();
path.push("network_config");
let members = self.members.read().await;
let data =
- rmp_to_vec_all_named(&members.config).expect("Error while encoding network config");
+ rmp_to_vec_all_named(&members.config)?;
drop(members);
let mut f = tokio::fs::File::create(path.as_path())
- .await
- .expect("Could not create network_config");
+ .await?;
f.write_all(&data[..])
- .await
- .expect("Could not write network_config");
+ .await?;
+ Ok(())
}
pub async fn make_ping(&self) -> Message {
@@ -260,7 +266,10 @@ impl System {
.collect::<Vec<_>>();
self.clone().ping_nodes(bootstrap_peers).await;
- tokio::spawn(self.ping_loop());
+ self.background
+ .clone()
+ .spawn_worker(|stop_signal| self.ping_loop(stop_signal).map(Ok))
+ .await;
}
pub async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
@@ -294,10 +303,12 @@ impl System {
});
}
if is_new || members.status_hash != info.status_hash {
- tokio::spawn(self.clone().pull_status(info.id.clone()));
+ self.background
+ .spawn_cancellable(self.clone().pull_status(info.id.clone()).map(Ok));
}
if is_new || members.config.version < info.config_version {
- tokio::spawn(self.clone().pull_config(info.id.clone()));
+ self.background
+ .spawn_cancellable(self.clone().pull_config(info.id.clone()).map(Ok));
}
} else if let Some(id) = id_option {
let remaining_attempts = members
@@ -345,10 +356,10 @@ impl System {
drop(members);
if is_new || status_hash != ping.status_hash {
- tokio::spawn(self.clone().pull_status(ping.id.clone()));
+ self.background.spawn_cancellable(self.clone().pull_status(ping.id.clone()).map(Ok));
}
if is_new || config_version < ping.config_version {
- tokio::spawn(self.clone().pull_config(ping.id.clone()));
+ self.background.spawn_cancellable(self.clone().pull_config(ping.id.clone()).map(Ok));
}
Ok(self.make_ping().await)
@@ -405,7 +416,7 @@ impl System {
drop(members);
if to_ping.len() > 0 {
- tokio::spawn(self.clone().ping_nodes(to_ping));
+ self.background.spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok));
}
Ok(Message::Ok)
@@ -420,17 +431,18 @@ impl System {
members.config = adv.clone();
members.rebuild_ring();
- tokio::spawn(
+ self.background.spawn_cancellable(
self.clone()
- .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT),
+ .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)
+ .map(Ok),
);
- tokio::spawn(self.clone().save_network_config());
+ self.background.spawn(self.clone().save_network_config());
}
Ok(Message::Ok)
}
- pub async fn ping_loop(self: Arc<Self>) {
+ pub async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
loop {
let restart_at = tokio::time::delay_for(PING_INTERVAL);
@@ -445,7 +457,15 @@ impl System {
self.clone().ping_nodes(ping_addrs).await;
- restart_at.await
+ select! {
+ _ = restart_at.fuse() => (),
+ must_exit = stop_signal.recv().fuse() => {
+ match must_exit {
+ None | Some(true) => return,
+ _ => (),
+ }
+ }
+ }
}
}