aboutsummaryrefslogtreecommitdiff
path: root/src/membership.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/membership.rs')
-rw-r--r--src/membership.rs156
1 files changed, 80 insertions, 76 deletions
diff --git a/src/membership.rs b/src/membership.rs
index e468c9b0..7aaa0759 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -41,30 +41,17 @@ impl Members {
NodeStatus{
addr: addr.clone(),
remaining_ping_attempts: MAX_FAILED_PINGS,
+ datacenter: info.datacenter.clone(),
});
match old_status {
None => {
- eprintln!("Discovered new node (ping): {}", hex::encode(info.id));
+ eprintln!("Newly pingable node: {}", hex::encode(info.id));
true
}
Some(x) => x.addr != addr,
}
}
- fn handle_advertise_node(&mut self, id: &UUID, addr: &SocketAddr) -> bool {
- if !self.status.contains_key(id) {
- eprintln!("Discovered new node (advertisment): {}", hex::encode(id));
- self.status.insert(id.clone(),
- NodeStatus{
- addr: addr.clone(),
- remaining_ping_attempts: MAX_FAILED_PINGS,
- });
- true
- } else {
- false
- }
- }
-
fn recalculate_status_hash(&mut self) {
let mut nodes = self.status.iter().collect::<Vec<_>>();
nodes.sort_by_key(|(id, _status)| *id);
@@ -72,7 +59,7 @@ impl Members {
let mut hasher = Sha256::new();
eprintln!("Current set of pingable nodes: --");
for (id, status) in nodes {
- eprintln!("{} {}", hex::encode(id), status.addr);
+ eprintln!("{} {} ({})", hex::encode(id), status.addr, status.datacenter);
hasher.input(format!("{} {}\n", hex::encode(id), status.addr));
}
eprintln!("END --");
@@ -82,6 +69,7 @@ impl Members {
pub struct NodeStatus {
pub addr: SocketAddr,
+ pub datacenter: String,
pub remaining_ping_attempts: usize,
}
@@ -109,6 +97,7 @@ impl System {
let members = self.members.read().await;
Message::Ping(PingMessage{
id: self.id,
+ datacenter: self.config.datacenter.clone(),
rpc_port: self.config.rpc_port,
status_hash: members.status_hash.clone(),
config_version: members.config.version,
@@ -123,27 +112,69 @@ impl System {
}
pub async fn bootstrap(self: Arc<Self>) {
+ let bootstrap_peers = self.config.bootstrap_peers
+ .iter()
+ .map(|ip| (ip.clone(), None))
+ .collect::<Vec<_>>();
+ self.clone().ping_nodes(bootstrap_peers).await;
+
+ tokio::spawn(self.ping_loop());
+ }
+
+ pub async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
let ping_msg = self.make_ping().await;
let ping_resps = join_all(
- self.config.bootstrap_peers.iter().cloned()
- .map(|to| {
+ peers.iter()
+ .map(|(addr, id_option)| {
let sys = self.clone();
let ping_msg_ref = &ping_msg;
async move {
- (to.clone(), rpc_call_addr(sys, &to, ping_msg_ref, PING_TIMEOUT).await)
+ (id_option, addr.clone(), rpc_call_addr(sys, &addr, ping_msg_ref, PING_TIMEOUT).await)
}
})).await;
let mut members = self.members.write().await;
- for (addr, ping_resp) in ping_resps {
+
+ let mut has_changes = false;
+ let mut to_advertise = vec![];
+
+ for (id_option, addr, ping_resp) in ping_resps {
if let Ok(Message::Ping(info)) = ping_resp {
- members.handle_ping(addr.ip(), &info);
+ let is_new = members.handle_ping(addr.ip(), &info);
+ if is_new {
+ has_changes = true;
+ to_advertise.push(AdvertisedNode{
+ id: info.id.clone(),
+ addr: addr.clone(),
+ });
+ }
+ if is_new || members.status_hash != info.status_hash {
+ tokio::spawn(self.clone().pull_status(info.id.clone()));
+ }
+ if is_new || members.config.version < info.config_version {
+ tokio::spawn(self.clone().pull_config(info.id.clone()));
+ }
+ } else if let Some(id) = id_option {
+ let remaining_attempts = members.status.get(id).map(|x| x.remaining_ping_attempts).unwrap_or(0);
+ if remaining_attempts == 0 {
+ eprintln!("Removing node {} after too many failed pings", hex::encode(id));
+ members.status.remove(id);
+ has_changes = true;
+ } else {
+ if let Some(st) = members.status.get_mut(id) {
+ st.remaining_ping_attempts = remaining_attempts - 1;
+ }
+ }
}
}
- members.recalculate_status_hash();
+ if has_changes {
+ members.recalculate_status_hash();
+ }
drop(members);
- tokio::spawn(self.ping_loop());
+ if to_advertise.len() > 0 {
+ self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT).await;
+ }
}
pub async fn handle_ping(self: Arc<Self>,
@@ -191,21 +222,36 @@ impl System {
adv: &[AdvertisedNode])
-> Result<Message, Error>
{
- let mut propagate = vec![];
+ let mut to_ping = vec![];
let mut members = self.members.write().await;
+ let mut has_changed = false;
+
for node in adv.iter() {
- let is_new = members.handle_advertise_node(&node.id, &node.addr);
- if is_new {
- tokio::spawn(self.clone().pull_status(node.id.clone()));
- tokio::spawn(self.clone().pull_config(node.id.clone()));
- propagate.push(node.clone());
+ if node.id == self.id {
+ // learn our own ip address
+ let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port);
+ let old_self = members.status.insert(node.id.clone(),
+ NodeStatus{
+ addr: self_addr,
+ datacenter: self.config.datacenter.clone(),
+ remaining_ping_attempts: MAX_FAILED_PINGS,
+ });
+ has_changed = match old_self {
+ None => true,
+ Some(x) => x.addr != self_addr,
+ };
+ } else if !members.status.contains_key(&node.id) {
+ to_ping.push((node.addr.clone(), Some(node.id.clone())));
}
}
-
- if propagate.len() > 0 {
+ if has_changed {
members.recalculate_status_hash();
- tokio::spawn(self.clone().broadcast(Message::AdvertiseNodesUp(propagate), PING_TIMEOUT));
+ }
+ drop(members);
+
+ if to_ping.len() > 0 {
+ tokio::spawn(self.clone().ping_nodes(to_ping));
}
Ok(Message::Ok)
@@ -231,53 +277,11 @@ impl System {
let members = self.members.read().await;
let ping_addrs = members.status.iter()
.filter(|(id, _)| **id != self.id)
- .map(|(id, status)| (id.clone(), status.addr.clone()))
+ .map(|(id, status)| (status.addr.clone(), Some(id.clone())))
.collect::<Vec<_>>();
drop(members);
- let ping_msg = self.make_ping().await;
- let ping_resps = join_all(
- ping_addrs.iter()
- .map(|(id, addr)| {
- let sys = self.clone();
- let ping_msg_ref = &ping_msg;
- async move {
- (id, addr.clone(), rpc_call_addr(sys, &addr, ping_msg_ref, PING_TIMEOUT).await)
- }
- })).await;
-
- let mut members = self.members.write().await;
- let mut has_changes = false;
-
- for (id, addr, ping_resp) in ping_resps {
- if let Ok(Message::Ping(ping)) = ping_resp {
- let is_new = members.handle_ping(addr.ip(), &ping);
- if is_new {
- has_changes = true;
- }
- if is_new || members.status_hash != ping.status_hash {
- tokio::spawn(self.clone().pull_status(ping.id.clone()));
- }
- if is_new || members.config.version < ping.config_version {
- tokio::spawn(self.clone().pull_config(ping.id.clone()));
- }
- } else {
- let remaining_attempts = members.status.get(id).map(|x| x.remaining_ping_attempts).unwrap_or(0);
- if remaining_attempts == 0 {
- eprintln!("Removing node {} after too many failed pings", hex::encode(id));
- members.status.remove(id);
- has_changes = true;
- } else {
- if let Some(st) = members.status.get_mut(id) {
- st.remaining_ping_attempts = remaining_attempts - 1;
- }
- }
- }
- }
- if has_changes {
- members.recalculate_status_hash();
- }
- drop(members);
+ self.clone().ping_nodes(ping_addrs).await;
restart_at.await
}