aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rpc/system.rs77
1 files changed, 45 insertions, 32 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index dc127afb..c7d41ee4 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -1,5 +1,5 @@
//! Module containing structs related to membership management
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf};
@@ -418,48 +418,61 @@ impl System {
}
pub fn health(&self) -> ClusterHealth {
- // TODO: adapt this function to take into account layout history
- // when estimating cluster health, and not just use current layout
-
let quorum = self.replication_mode.write_quorum();
- let replication_factor = self.replication_factor;
+ // Gather information about running nodes.
+ // Technically, `nodes` contains currently running nodes, as well
+ // as nodes that this Garage process has been connected to at least
+ // once since it started.
let nodes = self
.get_known_nodes()
.into_iter()
.map(|n| (n.id, n))
.collect::<HashMap<Uuid, _>>();
let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
+ let node_up = |x: &Uuid| nodes.get(x).map(|n| n.is_up).unwrap_or(false);
+
+ // Acquire a rwlock read-lock to the current cluster layout
+ let layout = self.cluster_layout();
+
+ // Obtain information about nodes that have a role as storage nodes
+ // in one of the active layout versions
+ let mut storage_nodes = HashSet::<Uuid>::with_capacity(16);
+ for ver in layout.versions.iter() {
+ storage_nodes.extend(
+ ver.roles
+ .items()
+ .iter()
+ .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
+ .map(|(n, _, _)| *n),
+ )
+ }
+ let storage_nodes_ok = storage_nodes.iter().filter(|x| node_up(x)).count();
- let layout = self.cluster_layout(); // acquires a rwlock
-
- let storage_nodes = layout
- .current()
- .roles
- .items()
- .iter()
- .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
- .collect::<Vec<_>>();
- let storage_nodes_ok = storage_nodes
- .iter()
- .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
- .count();
-
+ // Determine the number of partitions that have:
+ // - a quorum of up nodes for all write sets (i.e. are available)
+ // - for which all nodes in all write sets are up (i.e. are fully healthy)
let partitions = layout.current().partitions().collect::<Vec<_>>();
- let partitions_n_up = partitions
- .iter()
- .map(|(_, h)| {
- let pn = layout.current().nodes_of(h, replication_factor);
- pn.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
- .count()
- })
- .collect::<Vec<usize>>();
- let partitions_all_ok = partitions_n_up
- .iter()
- .filter(|c| **c == replication_factor)
- .count();
- let partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count();
+ let mut partitions_quorum = 0;
+ let mut partitions_all_ok = 0;
+ for (_, hash) in partitions.iter() {
+ let write_sets = layout
+ .versions
+ .iter()
+ .map(|x| x.nodes_of(hash, x.replication_factor));
+ let has_quorum = write_sets
+ .clone()
+ .all(|set| set.filter(|x| node_up(x)).count() >= quorum);
+ let all_ok = write_sets.clone().all(|mut set| set.all(|x| node_up(&x)));
+ if has_quorum {
+ partitions_quorum += 1;
+ }
+ if all_ok {
+ partitions_all_ok += 1;
+ }
+ }
+ // Determine overall cluster status
let status =
if partitions_quorum == partitions.len() && storage_nodes_ok == storage_nodes.len() {
ClusterHealthStatus::Healthy