aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-12-05 15:28:57 +0100
committerAlex Auvolat <alex@adnab.me>2022-12-05 15:28:57 +0100
commit280d1be7b1fde13d23e47f75aa8acd2f90efb81f (patch)
tree5a74e5bdef1cef54360b2b3ca57a53bf1ce61ba2
parent2065f011ca3f7c736feecffd108c89d3f8019e85 (diff)
downloadgarage-280d1be7b1fde13d23e47f75aa8acd2f90efb81f.tar.gz
garage-280d1be7b1fde13d23e47f75aa8acd2f90efb81f.zip
Refactor health check and add ability to return it in json
-rw-r--r--src/api/admin/api_server.rs131
-rw-r--r--src/api/admin/router.rs9
-rw-r--r--src/model/garage.rs9
-rw-r--r--src/rpc/lib.rs1
-rw-r--r--src/rpc/replication_mode.rs (renamed from src/table/replication/mode.rs)0
-rw-r--r--src/rpc/system.rs99
-rw-r--r--src/table/replication/mod.rs2
7 files changed, 156 insertions, 95 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 9beeda1f..f86ed599 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -1,4 +1,3 @@
-use std::collections::HashMap;
use std::fmt::Write;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -17,8 +16,7 @@ use opentelemetry_prometheus::PrometheusExporter;
use prometheus::{Encoder, TextEncoder};
use garage_model::garage::Garage;
-use garage_rpc::layout::NodeRoleV;
-use garage_util::data::Uuid;
+use garage_rpc::system::ClusterHealthStatus;
use garage_util::error::Error as GarageError;
use crate::generic_server::*;
@@ -80,92 +78,61 @@ impl AdminApiServer {
.body(Body::empty())?)
}
- fn handle_health(&self) -> Result<Response<Body>, Error> {
- let ring: Arc<_> = self.garage.system.ring.borrow().clone();
- let quorum = self.garage.replication_mode.write_quorum();
- let replication_factor = self.garage.replication_mode.replication_factor();
+ fn handle_health(&self, format: Option<&str>) -> Result<Response<Body>, Error> {
+ let health = self.garage.system.health();
- let nodes = self
- .garage
- .system
- .get_known_nodes()
- .into_iter()
- .map(|n| (n.id, n))
- .collect::<HashMap<Uuid, _>>();
- let n_nodes_connected = nodes.iter().filter(|(_, n)| n.is_up).count();
-
- let storage_nodes = ring
- .layout
- .roles
- .items()
- .iter()
- .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
- .collect::<Vec<_>>();
- let n_storage_nodes_ok = storage_nodes
- .iter()
- .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
- .count();
-
- let partitions = ring.partitions();
- let partitions_n_up = partitions
- .iter()
- .map(|(_, h)| {
- let pn = ring.get_nodes(h, ring.replication_factor);
- pn.iter()
- .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
- .count()
- })
- .collect::<Vec<usize>>();
- let n_partitions_full_ok = partitions_n_up
- .iter()
- .filter(|c| **c == replication_factor)
- .count();
- let n_partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count();
-
- let (status, status_str) = if n_partitions_quorum == partitions.len()
- && n_storage_nodes_ok == storage_nodes.len()
- {
- (StatusCode::OK, "Garage is fully operational")
- } else if n_partitions_quorum == partitions.len() {
- (
+ let (status, status_str) = match health.status {
+ ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"),
+ ClusterHealthStatus::Degraded => (
StatusCode::OK,
"Garage is operational but some storage nodes are unavailable",
- )
- } else {
- (
+ ),
+ ClusterHealthStatus::Unavailable => (
StatusCode::SERVICE_UNAVAILABLE,
"Quorum is not available for some/all partitions, reads and writes will fail",
- )
+ ),
};
- let mut buf = status_str.to_string();
- writeln!(
- &mut buf,
- "\nAll nodes: {} connected, {} known",
- n_nodes_connected,
- nodes.len()
- )
- .unwrap();
- writeln!(
- &mut buf,
- "Storage nodes: {} connected, {} in layout",
- n_storage_nodes_ok,
- storage_nodes.len()
- )
- .unwrap();
- writeln!(&mut buf, "Number of partitions: {}", partitions.len()).unwrap();
- writeln!(&mut buf, "Partitions with quorum: {}", n_partitions_quorum).unwrap();
- writeln!(
- &mut buf,
- "Partitions with all nodes available: {}",
- n_partitions_full_ok
- )
- .unwrap();
+ let resp = Response::builder().status(status);
- Ok(Response::builder()
- .status(status)
- .header(http::header::CONTENT_TYPE, "text/plain")
- .body(Body::from(buf))?)
+ if matches!(format, Some("json")) {
+ let resp_json =
+ serde_json::to_string_pretty(&health).map_err(garage_util::error::Error::from)?;
+ Ok(resp
+ .header(http::header::CONTENT_TYPE, "application/json")
+ .body(Body::from(resp_json))?)
+ } else {
+ let mut buf = status_str.to_string();
+ writeln!(
+ &mut buf,
+ "\nAll nodes: {} connected, {} known",
+ health.connected_nodes, health.known_nodes,
+ )
+ .unwrap();
+ writeln!(
+ &mut buf,
+ "Storage nodes: {} connected, {} in layout",
+ health.storage_nodes_ok, health.storage_nodes
+ )
+ .unwrap();
+ writeln!(&mut buf, "Number of partitions: {}", health.partitions).unwrap();
+ writeln!(
+ &mut buf,
+ "Partitions with quorum: {}",
+ health.partitions_quorum
+ )
+ .unwrap();
+ writeln!(
+ &mut buf,
+ "Partitions with all nodes available: {}",
+ health.partitions_all_ok
+ )
+ .unwrap();
+
+ Ok(resp
+ .header(http::header::CONTENT_TYPE, "text/plain")
+ .body(Body::from(buf))?)
+ }
}
fn handle_metrics(&self) -> Result<Response<Body>, Error> {
@@ -240,7 +207,7 @@ impl ApiHandler for AdminApiServer {
match endpoint {
Endpoint::Options => self.handle_options(&req),
- Endpoint::Health => self.handle_health(),
+ Endpoint::Health { format } => self.handle_health(format.as_deref()),
Endpoint::Metrics => self.handle_metrics(),
Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
Endpoint::ConnectClusterNodes => handle_connect_cluster_nodes(&self.garage, req).await,
diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs
index 14411f75..6ffcc131 100644
--- a/src/api/admin/router.rs
+++ b/src/api/admin/router.rs
@@ -17,7 +17,9 @@ router_match! {@func
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Endpoint {
Options,
- Health,
+ Health {
+ format: Option<String>,
+ },
Metrics,
GetClusterStatus,
ConnectClusterNodes,
@@ -90,7 +92,7 @@ impl Endpoint {
let res = router_match!(@gen_path_parser (req.method(), path, query) [
OPTIONS _ => Options,
- GET "/health" => Health,
+ GET "/health" => Health (query_opt::format),
GET "/metrics" => Metrics,
GET "/v0/status" => GetClusterStatus,
POST "/v0/connect" => ConnectClusterNodes,
@@ -133,7 +135,7 @@ impl Endpoint {
/// Get the kind of authorization which is required to perform the operation.
pub fn authorization_type(&self) -> Authorization {
match self {
- Self::Health => Authorization::None,
+ Self::Health { .. } => Authorization::None,
Self::Metrics => Authorization::MetricsToken,
_ => Authorization::AdminToken,
}
@@ -141,6 +143,7 @@ impl Endpoint {
}
generateQueryParameters! {
+ "format" => format,
"id" => id,
"search" => search,
"globalAlias" => global_alias,
diff --git a/src/model/garage.rs b/src/model/garage.rs
index c2aabea1..e34d034f 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -8,10 +8,10 @@ use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::*;
+use garage_rpc::replication_mode::ReplicationMode;
use garage_rpc::system::System;
use garage_block::manager::*;
-use garage_table::replication::ReplicationMode;
use garage_table::replication::TableFullReplication;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
@@ -167,12 +167,7 @@ impl Garage {
.expect("Invalid replication_mode in config file.");
info!("Initialize membership management system...");
- let system = System::new(
- network_key,
- background.clone(),
- replication_mode.replication_factor(),
- &config,
- )?;
+ let system = System::new(network_key, background.clone(), replication_mode, &config)?;
let data_rep_param = TableShardedReplication {
system: system.clone(),
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index 92caf75d..86f63568 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -9,6 +9,7 @@ mod consul;
mod kubernetes;
pub mod layout;
+pub mod replication_mode;
pub mod ring;
pub mod system;
diff --git a/src/table/replication/mode.rs b/src/rpc/replication_mode.rs
index e244e063..e244e063 100644
--- a/src/table/replication/mode.rs
+++ b/src/rpc/replication_mode.rs
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index d6576f20..2c6f14fd 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -35,6 +35,7 @@ use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
use crate::layout::*;
+use crate::replication_mode::*;
use crate::ring::*;
use crate::rpc_helper::*;
@@ -102,6 +103,7 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
+ replication_mode: ReplicationMode,
replication_factor: usize,
/// The ring
@@ -136,6 +138,37 @@ pub struct KnownNodeInfo {
pub status: NodeStatus,
}
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+pub struct ClusterHealth {
+ /// The current health status of the cluster (see below)
+ pub status: ClusterHealthStatus,
+ /// Number of nodes already seen once in the cluster
+ pub known_nodes: usize,
+ /// Number of nodes currently connected
+ pub connected_nodes: usize,
+ /// Number of storage nodes declared in the current layout
+ pub storage_nodes: usize,
+ /// Number of storage nodes currently connected
+ pub storage_nodes_ok: usize,
+ /// Number of partitions in the layout
+ pub partitions: usize,
+ /// Number of partitions for which we have a quorum of connected nodes
+ pub partitions_quorum: usize,
+ /// Number of partitions for which all storage nodes are connected
+ pub partitions_all_ok: usize,
+}
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+pub enum ClusterHealthStatus {
+ /// All nodes are available
+ Healthy,
+ /// Some storage nodes are unavailable, but quorum is stil
+ /// achieved for all partitions
+ Degraded,
+ /// Quorum is not available for some partitions
+ Unavailable,
+}
+
pub fn read_node_id(metadata_dir: &Path) -> Result<NodeID, Error> {
let mut pubkey_file = metadata_dir.to_path_buf();
pubkey_file.push("node_key.pub");
@@ -200,9 +233,11 @@ impl System {
pub fn new(
network_key: NetworkKey,
background: Arc<BackgroundRunner>,
- replication_factor: usize,
+ replication_mode: ReplicationMode,
config: &Config,
) -> Result<Arc<Self>, Error> {
+ let replication_factor = replication_mode.replication_factor();
+
let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!(
@@ -324,6 +359,7 @@ impl System {
config.rpc_timeout_msec.map(Duration::from_millis),
),
system_endpoint,
+ replication_mode,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
@@ -429,6 +465,67 @@ impl System {
}
}
+ pub fn health(&self) -> ClusterHealth {
+ let ring: Arc<_> = self.ring.borrow().clone();
+ let quorum = self.replication_mode.write_quorum();
+ let replication_factor = self.replication_factor;
+
+ let nodes = self
+ .get_known_nodes()
+ .into_iter()
+ .map(|n| (n.id, n))
+ .collect::<HashMap<Uuid, _>>();
+ let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
+
+ let storage_nodes = ring
+ .layout
+ .roles
+ .items()
+ .iter()
+ .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
+ .collect::<Vec<_>>();
+ let storage_nodes_ok = storage_nodes
+ .iter()
+ .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
+ .count();
+
+ let partitions = ring.partitions();
+ let partitions_n_up = partitions
+ .iter()
+ .map(|(_, h)| {
+ let pn = ring.get_nodes(h, ring.replication_factor);
+ pn.iter()
+ .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
+ .count()
+ })
+ .collect::<Vec<usize>>();
+ let partitions_all_ok = partitions_n_up
+ .iter()
+ .filter(|c| **c == replication_factor)
+ .count();
+ let partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count();
+
+ let status =
+ if partitions_quorum == partitions.len() && storage_nodes_ok == storage_nodes.len() {
+ ClusterHealthStatus::Healthy
+ } else if partitions_quorum == partitions.len() {
+ ClusterHealthStatus::Degraded
+ } else {
+ ClusterHealthStatus::Unavailable
+ };
+
+ ClusterHealth {
+ status,
+ known_nodes: nodes.len(),
+ connected_nodes,
+ storage_nodes: storage_nodes.len(),
+ storage_nodes_ok,
+ partitions: partitions.len(),
+ partitions_quorum,
+ partitions_all_ok,
+ }
+ }
+
// ---- INTERNALS ----
#[cfg(feature = "consul-discovery")]
diff --git a/src/table/replication/mod.rs b/src/table/replication/mod.rs
index 19e6772f..dfcb026a 100644
--- a/src/table/replication/mod.rs
+++ b/src/table/replication/mod.rs
@@ -1,10 +1,8 @@
mod parameters;
mod fullcopy;
-mod mode;
mod sharded;
pub use fullcopy::TableFullReplication;
-pub use mode::ReplicationMode;
pub use parameters::*;
pub use sharded::TableShardedReplication;