diff options
-rw-r--r-- | src/rpc/system.rs | 77 |
1 files changed, 45 insertions, 32 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs index dc127afb..c7d41ee4 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -1,5 +1,5 @@ //! Module containing structs related to membership management -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::io::{Read, Write}; use std::net::{IpAddr, SocketAddr}; use std::path::{Path, PathBuf}; @@ -418,48 +418,61 @@ impl System { } pub fn health(&self) -> ClusterHealth { - // TODO: adapt this function to take into account layout history - // when estimating cluster health, and not just use current layout - let quorum = self.replication_mode.write_quorum(); - let replication_factor = self.replication_factor; + // Gather information about running nodes. + // Technically, `nodes` contains currently running nodes, as well + // as nodes that this Garage process has been connected to at least + // once since it started. let nodes = self .get_known_nodes() .into_iter() .map(|n| (n.id, n)) .collect::<HashMap<Uuid, _>>(); let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count(); + let node_up = |x: &Uuid| nodes.get(x).map(|n| n.is_up).unwrap_or(false); + + // Acquire a rwlock read-lock to the current cluster layout + let layout = self.cluster_layout(); + + // Obtain information about nodes that have a role as storage nodes + // in one of the active layout versions + let mut storage_nodes = HashSet::<Uuid>::with_capacity(16); + for ver in layout.versions.iter() { + storage_nodes.extend( + ver.roles + .items() + .iter() + .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some())) + .map(|(n, _, _)| *n), + ) + } + let storage_nodes_ok = storage_nodes.iter().filter(|x| node_up(x)).count(); - let layout = self.cluster_layout(); // acquires a rwlock - - let storage_nodes = layout - .current() - .roles - .items() - .iter() - .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some())) - .collect::<Vec<_>>(); - let storage_nodes_ok = storage_nodes - .iter() - .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) - .count(); - + // Determine the number of partitions that have: + // - a quorum of up nodes for all write sets (i.e. are available) + // - for which all nodes in all write sets are up (i.e. are fully healthy) let partitions = layout.current().partitions().collect::<Vec<_>>(); - let partitions_n_up = partitions - .iter() - .map(|(_, h)| { - let pn = layout.current().nodes_of(h, replication_factor); - pn.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) - .count() - }) - .collect::<Vec<usize>>(); - let partitions_all_ok = partitions_n_up - .iter() - .filter(|c| **c == replication_factor) - .count(); - let partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count(); + let mut partitions_quorum = 0; + let mut partitions_all_ok = 0; + for (_, hash) in partitions.iter() { + let write_sets = layout + .versions + .iter() + .map(|x| x.nodes_of(hash, x.replication_factor)); + let has_quorum = write_sets + .clone() + .all(|set| set.filter(|x| node_up(x)).count() >= quorum); + let all_ok = write_sets.clone().all(|mut set| set.all(|x| node_up(&x))); + if has_quorum { + partitions_quorum += 1; + } + if all_ok { + partitions_all_ok += 1; + } + } + // Determine overall cluster status let status = if partitions_quorum == partitions.len() && storage_nodes_ok == storage_nodes.len() { ClusterHealthStatus::Healthy |