diff options
Diffstat (limited to 'src/membership.rs')
-rw-r--r-- | src/membership.rs | 79 |
1 files changed, 50 insertions, 29 deletions
diff --git a/src/membership.rs b/src/membership.rs index c0c88a43..87b065a7 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -4,6 +4,7 @@ use std::hash::Hasher; use std::io::Read; use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -25,7 +26,7 @@ use crate::server::Config; const PING_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); -const MAX_FAILED_PINGS: usize = 3; +const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5; pub const MEMBERSHIP_RPC_PATH: &str = "_membership"; @@ -56,6 +57,10 @@ pub struct PingMessage { pub struct AdvertisedNode { pub id: UUID, pub addr: SocketAddr, + + pub is_up: bool, + pub last_seen: u64, + pub state_info: StateInfo, } @@ -91,17 +96,24 @@ pub struct System { #[derive(Debug, Clone)] pub struct Status { - pub nodes: HashMap<UUID, StatusEntry>, + pub nodes: HashMap<UUID, Arc<StatusEntry>>, pub hash: Hash, } -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct StatusEntry { pub addr: SocketAddr, - pub remaining_ping_attempts: usize, + pub last_seen: u64, + pub num_failures: AtomicUsize, pub state_info: StateInfo, } +impl StatusEntry { + pub fn is_up(&self) -> bool { + self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StateInfo { pub hostname: String, @@ -126,11 +138,12 @@ impl Status { let addr = SocketAddr::new(ip, info.rpc_port); let old_status = self.nodes.insert( info.id, - StatusEntry { + Arc::new(StatusEntry { addr, - remaining_ping_attempts: MAX_FAILED_PINGS, + last_seen: now_msec(), + num_failures: AtomicUsize::from(0), state_info: info.state_info.clone(), - }, + }), ); match old_status { None => { @@ -427,13 +440,15 @@ impl System { let mut to_advertise = vec![]; for (id_option, addr, ping_resp) in ping_resps { - if let Ok(Message::Ping(info)) = ping_resp { + if let Ok(Ok(Message::Ping(info))) = ping_resp { let is_new = status.handle_ping(addr.ip(), &info); if is_new { has_changes = true; to_advertise.push(AdvertisedNode { id: info.id, addr: *addr, + is_up: true, + last_seen: now_msec(), state_info: info.state_info.clone(), }); } @@ -446,21 +461,16 @@ impl System { .spawn_cancellable(self.clone().pull_config(info.id).map(Ok)); } } else if let Some(id) = id_option { - let remaining_attempts = status - .nodes - .get(id) - .map(|x| x.remaining_ping_attempts) - .unwrap_or(0); - if remaining_attempts == 0 { - warn!( - "Removing node {} after too many failed pings", - hex::encode(&id) - ); - status.nodes.remove(&id); - has_changes = true; - } else { - if let Some(st) = status.nodes.get_mut(id) { - st.remaining_ping_attempts = remaining_attempts - 1; + if let Some(st) = status.nodes.get_mut(id) { + st.num_failures.fetch_add(1, Ordering::SeqCst); + if !st.is_up() { + warn!("Node {:?} seems to be down.", id); + if !ring.config.members.contains_key(id) { + info!("Removing node {:?} from status (not in config and not responding to pings anymore)", id); + drop(st); + status.nodes.remove(&id); + has_changes = true; + } } } } @@ -521,6 +531,8 @@ impl System { mem.push(AdvertisedNode { id: *node, addr: status.addr, + is_up: status.is_up(), + last_seen: status.last_seen, state_info, }); } @@ -548,18 +560,27 @@ impl System { let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_bind_addr.port()); let old_self = status.nodes.insert( node.id, - StatusEntry { + Arc::new(StatusEntry { addr: self_addr, - remaining_ping_attempts: MAX_FAILED_PINGS, + last_seen: now_msec(), + num_failures: AtomicUsize::from(0), state_info: self.state_info.clone(), - }, + }), ); has_changed = match old_self { None => true, Some(x) => x.addr != self_addr, }; - } else if !status.nodes.contains_key(&node.id) { - to_ping.push((node.addr, Some(node.id))); + } else { + let ping_them = match status.nodes.get(&node.id) { + // Case 1: new node + None => true, + // Case 2: the node might have changed address + Some(our_node) => node.is_up && !our_node.is_up() && our_node.addr != node.addr, + }; + if ping_them { + to_ping.push((node.addr, Some(node.id))); + } } } if has_changed { @@ -580,8 +601,8 @@ impl System { self: Arc<Self>, adv: &NetworkConfig, ) -> Result<Message, Error> { - let mut ring: Ring = self.ring.borrow().as_ref().clone(); let update_lock = self.update_lock.lock().await; + let mut ring: Ring = self.ring.borrow().as_ref().clone(); if adv.version > ring.config.version { ring.config = adv.clone(); |