aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/membership.rs42
-rw-r--r--src/rpc.rs23
2 files changed, 29 insertions, 36 deletions
diff --git a/src/membership.rs b/src/membership.rs
index b025c2bb..8b067686 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -4,7 +4,6 @@ use std::time::Duration;
use std::net::{IpAddr, SocketAddr};
use futures::future::join_all;
-use futures::stream::StreamExt;
use hyper::client::Client;
use tokio::sync::RwLock;
use sha2::{Sha256, Digest};
@@ -37,15 +36,24 @@ pub struct Members {
impl Members {
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
- self.status.insert(info.id.clone(),
+ let addr = SocketAddr::new(ip, info.rpc_port);
+ let old_status = self.status.insert(info.id.clone(),
NodeStatus{
- addr: SocketAddr::new(ip, info.rpc_port),
+ addr: addr.clone(),
remaining_ping_attempts: MAX_FAILED_PINGS,
- }).is_none()
+ });
+ match old_status {
+ None => {
+ eprintln!("Discovered new node (ping): {}", hex::encode(info.id));
+ true
+ }
+ Some(x) => x.addr != addr,
+ }
}
fn handle_advertise_node(&mut self, id: &UUID, addr: &SocketAddr) -> bool {
if !self.status.contains_key(id) {
+ eprintln!("Discovered new node (advertisment): {}", hex::encode(id));
self.status.insert(id.clone(),
NodeStatus{
addr: addr.clone(),
@@ -104,7 +112,7 @@ impl System {
pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
let members = self.members.read().await;
- let to = members.status.keys().cloned().collect::<Vec<_>>();
+ let to = members.status.keys().filter(|x| **x != self.id).cloned().collect::<Vec<_>>();
drop(members);
rpc_call_many(self.clone(), &to[..], &msg, None, timeout).await;
}
@@ -140,7 +148,9 @@ impl System {
{
let mut members = self.members.write().await;
let is_new = members.handle_ping(from.ip(), ping);
- members.recalculate_status_hash();
+ if is_new {
+ members.recalculate_status_hash();
+ }
let status_hash = members.status_hash.clone();
let config_version = members.config.version;
drop(members);
@@ -187,6 +197,7 @@ impl System {
propagate.push(node.clone());
}
}
+ drop(members);
if propagate.len() > 0 {
tokio::spawn(self.clone().broadcast(Message::AdvertiseNodesUp(propagate), PING_TIMEOUT));
@@ -214,6 +225,7 @@ impl System {
let members = self.members.read().await;
let ping_addrs = members.status.iter()
+ .filter(|(id, _)| **id != self.id)
.map(|(id, status)| (id.clone(), status.addr.clone()))
.collect::<Vec<_>>();
drop(members);
@@ -257,13 +269,15 @@ impl System {
}
}
- pub async fn pull_status(self: Arc<Self>, peer: UUID) {
- let resp = rpc_call(self.clone(),
- &peer,
- &Message::PullStatus,
- PING_TIMEOUT).await;
- if let Ok(Message::AdvertiseNodesUp(nodes)) = resp {
- self.handle_advertise_nodes_up(&nodes).await;
+ pub fn pull_status(self: Arc<Self>, peer: UUID) -> impl futures::future::Future<Output=()> + Send + 'static {
+ async move {
+ let resp = rpc_call(self.clone(),
+ &peer,
+ &Message::PullStatus,
+ PING_TIMEOUT).await;
+ if let Ok(Message::AdvertiseNodesUp(nodes)) = resp {
+ let _: Result<_, _> = self.handle_advertise_nodes_up(&nodes).await;
+ }
}
}
@@ -273,7 +287,7 @@ impl System {
&Message::PullConfig,
PING_TIMEOUT).await;
if let Ok(Message::AdvertiseConfig(config)) = resp {
- self.handle_advertise_config(&config).await;
+ let _: Result<_, _> = self.handle_advertise_config(&config).await;
}
}
}
diff --git a/src/rpc.rs b/src/rpc.rs
index d8bb08a8..5f25dafb 100644
--- a/src/rpc.rs
+++ b/src/rpc.rs
@@ -24,31 +24,10 @@ pub async fn rpc_call_many(sys: Arc<System>,
timeout: Duration)
-> Vec<Result<Message, Error>>
{
- let resp_stream = to.iter()
+ let mut resp_stream = to.iter()
.map(|to| rpc_call(sys.clone(), to, msg, timeout))
.collect::<FuturesUnordered<_>>();
- collect_rpc_results(resp_stream, stop_after).await
-}
-
-pub async fn rpc_call_many_addr(sys: Arc<System>,
- to: &[SocketAddr],
- msg: &Message,
- stop_after: Option<usize>,
- timeout: Duration)
- -> Vec<Result<Message, Error>>
-{
- let resp_stream = to.iter()
- .map(|to| rpc_call_addr(sys.clone(), to, msg, timeout))
- .collect::<FuturesUnordered<_>>();
-
- collect_rpc_results(resp_stream, stop_after).await
-}
-
-async fn collect_rpc_results(mut resp_stream: FuturesUnordered<impl Future<Output=Result<Message, Error>>>,
- stop_after: Option<usize>)
- -> Vec<Result<Message, Error>>
-{
let mut results = vec![];
let mut n_ok = 0;
while let Some(resp) = resp_stream.next().await {