diff options
Diffstat (limited to 'src/membership.rs')
-rw-r--r-- | src/membership.rs | 42 |
1 files changed, 28 insertions, 14 deletions
diff --git a/src/membership.rs b/src/membership.rs index b025c2bb..8b067686 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -4,7 +4,6 @@ use std::time::Duration; use std::net::{IpAddr, SocketAddr}; use futures::future::join_all; -use futures::stream::StreamExt; use hyper::client::Client; use tokio::sync::RwLock; use sha2::{Sha256, Digest}; @@ -37,15 +36,24 @@ pub struct Members { impl Members { fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool { - self.status.insert(info.id.clone(), + let addr = SocketAddr::new(ip, info.rpc_port); + let old_status = self.status.insert(info.id.clone(), NodeStatus{ - addr: SocketAddr::new(ip, info.rpc_port), + addr: addr.clone(), remaining_ping_attempts: MAX_FAILED_PINGS, - }).is_none() + }); + match old_status { + None => { + eprintln!("Discovered new node (ping): {}", hex::encode(info.id)); + true + } + Some(x) => x.addr != addr, + } } fn handle_advertise_node(&mut self, id: &UUID, addr: &SocketAddr) -> bool { if !self.status.contains_key(id) { + eprintln!("Discovered new node (advertisment): {}", hex::encode(id)); self.status.insert(id.clone(), NodeStatus{ addr: addr.clone(), @@ -104,7 +112,7 @@ impl System { pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) { let members = self.members.read().await; - let to = members.status.keys().cloned().collect::<Vec<_>>(); + let to = members.status.keys().filter(|x| **x != self.id).cloned().collect::<Vec<_>>(); drop(members); rpc_call_many(self.clone(), &to[..], &msg, None, timeout).await; } @@ -140,7 +148,9 @@ impl System { { let mut members = self.members.write().await; let is_new = members.handle_ping(from.ip(), ping); - members.recalculate_status_hash(); + if is_new { + members.recalculate_status_hash(); + } let status_hash = members.status_hash.clone(); let config_version = members.config.version; drop(members); @@ -187,6 +197,7 @@ impl System { propagate.push(node.clone()); } } + drop(members); if propagate.len() > 0 { tokio::spawn(self.clone().broadcast(Message::AdvertiseNodesUp(propagate), PING_TIMEOUT)); @@ -214,6 +225,7 @@ impl System { let members = self.members.read().await; let ping_addrs = members.status.iter() + .filter(|(id, _)| **id != self.id) .map(|(id, status)| (id.clone(), status.addr.clone())) .collect::<Vec<_>>(); drop(members); @@ -257,13 +269,15 @@ impl System { } } - pub async fn pull_status(self: Arc<Self>, peer: UUID) { - let resp = rpc_call(self.clone(), - &peer, - &Message::PullStatus, - PING_TIMEOUT).await; - if let Ok(Message::AdvertiseNodesUp(nodes)) = resp { - self.handle_advertise_nodes_up(&nodes).await; + pub fn pull_status(self: Arc<Self>, peer: UUID) -> impl futures::future::Future<Output=()> + Send + 'static { + async move { + let resp = rpc_call(self.clone(), + &peer, + &Message::PullStatus, + PING_TIMEOUT).await; + if let Ok(Message::AdvertiseNodesUp(nodes)) = resp { + let _: Result<_, _> = self.handle_advertise_nodes_up(&nodes).await; + } } } @@ -273,7 +287,7 @@ impl System { &Message::PullConfig, PING_TIMEOUT).await; if let Ok(Message::AdvertiseConfig(config)) = resp { - self.handle_advertise_config(&config).await; + let _: Result<_, _> = self.handle_advertise_config(&config).await; } } } |