From 280d1be7b1fde13d23e47f75aa8acd2f90efb81f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Dec 2022 15:28:57 +0100 Subject: Refactor health check and add ability to return it in json --- src/rpc/lib.rs | 1 + src/rpc/replication_mode.rs | 57 ++++++++++++++++++++++++++ src/rpc/system.rs | 99 ++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 156 insertions(+), 1 deletion(-) create mode 100644 src/rpc/replication_mode.rs (limited to 'src/rpc') diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 92caf75d..86f63568 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -9,6 +9,7 @@ mod consul; mod kubernetes; pub mod layout; +pub mod replication_mode; pub mod ring; pub mod system; diff --git a/src/rpc/replication_mode.rs b/src/rpc/replication_mode.rs new file mode 100644 index 00000000..e244e063 --- /dev/null +++ b/src/rpc/replication_mode.rs @@ -0,0 +1,57 @@ +#[derive(Clone, Copy)] +pub enum ReplicationMode { + None, + TwoWay, + TwoWayDangerous, + ThreeWay, + ThreeWayDegraded, + ThreeWayDangerous, +} + +impl ReplicationMode { + pub fn parse(v: &str) -> Option { + match v { + "none" | "1" => Some(Self::None), + "2" => Some(Self::TwoWay), + "2-dangerous" => Some(Self::TwoWayDangerous), + "3" => Some(Self::ThreeWay), + "3-degraded" => Some(Self::ThreeWayDegraded), + "3-dangerous" => Some(Self::ThreeWayDangerous), + _ => None, + } + } + + pub fn control_write_max_faults(&self) -> usize { + match self { + Self::None => 0, + _ => 1, + } + } + + pub fn replication_factor(&self) -> usize { + match self { + Self::None => 1, + Self::TwoWay | Self::TwoWayDangerous => 2, + Self::ThreeWay | Self::ThreeWayDegraded | Self::ThreeWayDangerous => 3, + } + } + + pub fn read_quorum(&self) -> usize { + match self { + Self::None => 1, + Self::TwoWay | Self::TwoWayDangerous => 1, + Self::ThreeWay => 2, + Self::ThreeWayDegraded | Self::ThreeWayDangerous => 1, + } + } + + pub fn write_quorum(&self) -> usize { + match self { + Self::None => 1, + Self::TwoWay => 2, + Self::TwoWayDangerous => 1, + Self::ThreeWay | Self::ThreeWayDegraded => 2, + Self::ThreeWayDangerous => 1, + } + } +} diff --git a/src/rpc/system.rs b/src/rpc/system.rs index d6576f20..2c6f14fd 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -35,6 +35,7 @@ use crate::consul::ConsulDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; use crate::layout::*; +use crate::replication_mode::*; use crate::ring::*; use crate::rpc_helper::*; @@ -102,6 +103,7 @@ pub struct System { #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: Option, + replication_mode: ReplicationMode, replication_factor: usize, /// The ring @@ -136,6 +138,37 @@ pub struct KnownNodeInfo { pub status: NodeStatus, } +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct ClusterHealth { + /// The current health status of the cluster (see below) + pub status: ClusterHealthStatus, + /// Number of nodes already seen once in the cluster + pub known_nodes: usize, + /// Number of nodes currently connected + pub connected_nodes: usize, + /// Number of storage nodes declared in the current layout + pub storage_nodes: usize, + /// Number of storage nodes currently connected + pub storage_nodes_ok: usize, + /// Number of partitions in the layout + pub partitions: usize, + /// Number of partitions for which we have a quorum of connected nodes + pub partitions_quorum: usize, + /// Number of partitions for which all storage nodes are connected + pub partitions_all_ok: usize, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum ClusterHealthStatus { + /// All nodes are available + Healthy, + /// Some storage nodes are unavailable, but quorum is stil + /// achieved for all partitions + Degraded, + /// Quorum is not available for some partitions + Unavailable, +} + pub fn read_node_id(metadata_dir: &Path) -> Result { let mut pubkey_file = metadata_dir.to_path_buf(); pubkey_file.push("node_key.pub"); @@ -200,9 +233,11 @@ impl System { pub fn new( network_key: NetworkKey, background: Arc, - replication_factor: usize, + replication_mode: ReplicationMode, config: &Config, ) -> Result, Error> { + let replication_factor = replication_mode.replication_factor(); + let node_key = gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); info!( @@ -324,6 +359,7 @@ impl System { config.rpc_timeout_msec.map(Duration::from_millis), ), system_endpoint, + replication_mode, replication_factor, rpc_listen_addr: config.rpc_bind_addr, #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] @@ -429,6 +465,67 @@ impl System { } } + pub fn health(&self) -> ClusterHealth { + let ring: Arc<_> = self.ring.borrow().clone(); + let quorum = self.replication_mode.write_quorum(); + let replication_factor = self.replication_factor; + + let nodes = self + .get_known_nodes() + .into_iter() + .map(|n| (n.id, n)) + .collect::>(); + let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count(); + + let storage_nodes = ring + .layout + .roles + .items() + .iter() + .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some())) + .collect::>(); + let storage_nodes_ok = storage_nodes + .iter() + .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) + .count(); + + let partitions = ring.partitions(); + let partitions_n_up = partitions + .iter() + .map(|(_, h)| { + let pn = ring.get_nodes(h, ring.replication_factor); + pn.iter() + .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) + .count() + }) + .collect::>(); + let partitions_all_ok = partitions_n_up + .iter() + .filter(|c| **c == replication_factor) + .count(); + let partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count(); + + let status = + if partitions_quorum == partitions.len() && storage_nodes_ok == storage_nodes.len() { + ClusterHealthStatus::Healthy + } else if partitions_quorum == partitions.len() { + ClusterHealthStatus::Degraded + } else { + ClusterHealthStatus::Unavailable + }; + + ClusterHealth { + status, + known_nodes: nodes.len(), + connected_nodes, + storage_nodes: storage_nodes.len(), + storage_nodes_ok, + partitions: partitions.len(), + partitions_quorum, + partitions_all_ok, + } + } + // ---- INTERNALS ---- #[cfg(feature = "consul-discovery")] -- cgit v1.2.3