aboutsummaryrefslogtreecommitdiff
path: root/src/membership.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/membership.rs')
-rw-r--r--src/membership.rs79
1 files changed, 50 insertions, 29 deletions
diff --git a/src/membership.rs b/src/membership.rs
index c0c88a43..87b065a7 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -4,6 +4,7 @@ use std::hash::Hasher;
use std::io::Read;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
@@ -25,7 +26,7 @@ use crate::server::Config;
const PING_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
-const MAX_FAILED_PINGS: usize = 3;
+const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
@@ -56,6 +57,10 @@ pub struct PingMessage {
pub struct AdvertisedNode {
pub id: UUID,
pub addr: SocketAddr,
+
+ pub is_up: bool,
+ pub last_seen: u64,
+
pub state_info: StateInfo,
}
@@ -91,17 +96,24 @@ pub struct System {
#[derive(Debug, Clone)]
pub struct Status {
- pub nodes: HashMap<UUID, StatusEntry>,
+ pub nodes: HashMap<UUID, Arc<StatusEntry>>,
pub hash: Hash,
}
-#[derive(Debug, Clone)]
+#[derive(Debug)]
pub struct StatusEntry {
pub addr: SocketAddr,
- pub remaining_ping_attempts: usize,
+ pub last_seen: u64,
+ pub num_failures: AtomicUsize,
pub state_info: StateInfo,
}
+impl StatusEntry {
+ pub fn is_up(&self) -> bool {
+ self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
+ }
+}
+
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateInfo {
pub hostname: String,
@@ -126,11 +138,12 @@ impl Status {
let addr = SocketAddr::new(ip, info.rpc_port);
let old_status = self.nodes.insert(
info.id,
- StatusEntry {
+ Arc::new(StatusEntry {
addr,
- remaining_ping_attempts: MAX_FAILED_PINGS,
+ last_seen: now_msec(),
+ num_failures: AtomicUsize::from(0),
state_info: info.state_info.clone(),
- },
+ }),
);
match old_status {
None => {
@@ -427,13 +440,15 @@ impl System {
let mut to_advertise = vec![];
for (id_option, addr, ping_resp) in ping_resps {
- if let Ok(Message::Ping(info)) = ping_resp {
+ if let Ok(Ok(Message::Ping(info))) = ping_resp {
let is_new = status.handle_ping(addr.ip(), &info);
if is_new {
has_changes = true;
to_advertise.push(AdvertisedNode {
id: info.id,
addr: *addr,
+ is_up: true,
+ last_seen: now_msec(),
state_info: info.state_info.clone(),
});
}
@@ -446,21 +461,16 @@ impl System {
.spawn_cancellable(self.clone().pull_config(info.id).map(Ok));
}
} else if let Some(id) = id_option {
- let remaining_attempts = status
- .nodes
- .get(id)
- .map(|x| x.remaining_ping_attempts)
- .unwrap_or(0);
- if remaining_attempts == 0 {
- warn!(
- "Removing node {} after too many failed pings",
- hex::encode(&id)
- );
- status.nodes.remove(&id);
- has_changes = true;
- } else {
- if let Some(st) = status.nodes.get_mut(id) {
- st.remaining_ping_attempts = remaining_attempts - 1;
+ if let Some(st) = status.nodes.get_mut(id) {
+ st.num_failures.fetch_add(1, Ordering::SeqCst);
+ if !st.is_up() {
+ warn!("Node {:?} seems to be down.", id);
+ if !ring.config.members.contains_key(id) {
+ info!("Removing node {:?} from status (not in config and not responding to pings anymore)", id);
+ drop(st);
+ status.nodes.remove(&id);
+ has_changes = true;
+ }
}
}
}
@@ -521,6 +531,8 @@ impl System {
mem.push(AdvertisedNode {
id: *node,
addr: status.addr,
+ is_up: status.is_up(),
+ last_seen: status.last_seen,
state_info,
});
}
@@ -548,18 +560,27 @@ impl System {
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_bind_addr.port());
let old_self = status.nodes.insert(
node.id,
- StatusEntry {
+ Arc::new(StatusEntry {
addr: self_addr,
- remaining_ping_attempts: MAX_FAILED_PINGS,
+ last_seen: now_msec(),
+ num_failures: AtomicUsize::from(0),
state_info: self.state_info.clone(),
- },
+ }),
);
has_changed = match old_self {
None => true,
Some(x) => x.addr != self_addr,
};
- } else if !status.nodes.contains_key(&node.id) {
- to_ping.push((node.addr, Some(node.id)));
+ } else {
+ let ping_them = match status.nodes.get(&node.id) {
+ // Case 1: new node
+ None => true,
+ // Case 2: the node might have changed address
+ Some(our_node) => node.is_up && !our_node.is_up() && our_node.addr != node.addr,
+ };
+ if ping_them {
+ to_ping.push((node.addr, Some(node.id)));
+ }
}
}
if has_changed {
@@ -580,8 +601,8 @@ impl System {
self: Arc<Self>,
adv: &NetworkConfig,
) -> Result<Message, Error> {
- let mut ring: Ring = self.ring.borrow().as_ref().clone();
let update_lock = self.update_lock.lock().await;
+ let mut ring: Ring = self.ring.borrow().as_ref().clone();
if adv.version > ring.config.version {
ring.config = adv.clone();