aboutsummaryrefslogtreecommitdiff
path: root/src/membership.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/membership.rs')
-rw-r--r--src/membership.rs28
1 files changed, 21 insertions, 7 deletions
diff --git a/src/membership.rs b/src/membership.rs
index 8b067686..e468c9b0 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -70,9 +70,12 @@ impl Members {
nodes.sort_by_key(|(id, _status)| *id);
let mut hasher = Sha256::new();
+ eprintln!("Current set of pingable nodes: --");
for (id, status) in nodes {
+ eprintln!("{} {}", hex::encode(id), status.addr);
hasher.input(format!("{} {}\n", hex::encode(id), status.addr));
}
+ eprintln!("END --");
self.status_hash.copy_from_slice(&hasher.result()[..]);
}
}
@@ -85,18 +88,20 @@ pub struct NodeStatus {
impl System {
pub fn new(config: Config, id: UUID) -> Self {
- System{
- config,
- id,
- rpc_client: Client::new(),
- members: RwLock::new(Members{
+ let mut members = Members{
status: HashMap::new(),
status_hash: [0u8; 32],
config: NetworkConfig{
members: HashMap::new(),
version: 0,
},
- }),
+ };
+ members.recalculate_status_hash();
+ System{
+ config,
+ id,
+ rpc_client: Client::new(),
+ members: RwLock::new(members),
}
}
@@ -197,9 +202,9 @@ impl System {
propagate.push(node.clone());
}
}
- drop(members);
if propagate.len() > 0 {
+ members.recalculate_status_hash();
tokio::spawn(self.clone().broadcast(Message::AdvertiseNodesUp(propagate), PING_TIMEOUT));
}
@@ -242,9 +247,14 @@ impl System {
})).await;
let mut members = self.members.write().await;
+ let mut has_changes = false;
+
for (id, addr, ping_resp) in ping_resps {
if let Ok(Message::Ping(ping)) = ping_resp {
let is_new = members.handle_ping(addr.ip(), &ping);
+ if is_new {
+ has_changes = true;
+ }
if is_new || members.status_hash != ping.status_hash {
tokio::spawn(self.clone().pull_status(ping.id.clone()));
}
@@ -256,6 +266,7 @@ impl System {
if remaining_attempts == 0 {
eprintln!("Removing node {} after too many failed pings", hex::encode(id));
members.status.remove(id);
+ has_changes = true;
} else {
if let Some(st) = members.status.get_mut(id) {
st.remaining_ping_attempts = remaining_attempts - 1;
@@ -263,6 +274,9 @@ impl System {
}
}
}
+ if has_changes {
+ members.recalculate_status_hash();
+ }
drop(members);
restart_at.await