Diffstat (limited to 'src')
-rw-r--r--  src/api/admin/api_server.rs                                            29
-rw-r--r--  src/api/admin/cluster.rs                                                5
-rw-r--r--  src/api/admin/router.rs                                                 7
-rw-r--r--  src/model/garage.rs                                                    13
-rw-r--r--  src/rpc/lib.rs                                                          1
-rw-r--r--  src/rpc/replication_mode.rs (renamed from src/table/replication/mode.rs) 1
-rw-r--r--  src/rpc/system.rs                                                      99
-rw-r--r--  src/table/replication/mod.rs                                            2
8 files changed, 147 insertions, 10 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 2896d058..2d325fb1 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -15,6 +15,7 @@ use opentelemetry_prometheus::PrometheusExporter;
use prometheus::{Encoder, TextEncoder};
use garage_model::garage::Garage;
+use garage_rpc::system::ClusterHealthStatus;
use garage_util::error::Error as GarageError;
use crate::generic_server::*;
@@ -76,6 +77,31 @@ impl AdminApiServer {
.body(Body::empty())?)
}
+ fn handle_health(&self) -> Result<Response<Body>, Error> {
+ let health = self.garage.system.health();
+
+ let (status, status_str) = match health.status {
+ ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"),
+ ClusterHealthStatus::Degraded => (
+ StatusCode::OK,
+ "Garage is operational but some storage nodes are unavailable",
+ ),
+ ClusterHealthStatus::Unavailable => (
+ StatusCode::SERVICE_UNAVAILABLE,
+ "Quorum is not available for some/all partitions, reads and writes will fail",
+ ),
+ };
+ let status_str = format!(
+ "{}\nConsult the full health check API endpoint at /v0/health for more details\n",
+ status_str
+ );
+
+ Ok(Response::builder()
+ .status(status)
+ .header(http::header::CONTENT_TYPE, "text/plain")
+ .body(Body::from(status_str))?)
+ }
+
fn handle_metrics(&self) -> Result<Response<Body>, Error> {
#[cfg(feature = "metrics")]
{
@@ -124,6 +150,7 @@ impl ApiHandler for AdminApiServer {
) -> Result<Response<Body>, Error> {
let expected_auth_header =
match endpoint.authorization_type() {
+ Authorization::None => None,
Authorization::MetricsToken => self.metrics_token.as_ref(),
Authorization::AdminToken => match &self.admin_token {
None => return Err(Error::forbidden(
@@ -147,8 +174,10 @@ impl ApiHandler for AdminApiServer {
match endpoint {
Endpoint::Options => self.handle_options(&req),
+ Endpoint::Health => self.handle_health(),
Endpoint::Metrics => self.handle_metrics(),
Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
+ Endpoint::GetClusterHealth => handle_get_cluster_health(&self.garage).await,
Endpoint::ConnectClusterNodes => handle_connect_cluster_nodes(&self.garage, req).await,
// Layout
Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await,
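
Note: the new `/health` endpoint above is deliberately unauthenticated and plain-text so that external probes can poll it without an admin token. A minimal probe sketch follows, assuming the `reqwest` crate (with its `blocking` feature) and an admin API reachable on port 3903; both are assumptions for illustration, not part of this patch.

```rust
// Hypothetical external probe; not part of this patch.
// Assumes `reqwest` (blocking feature) and that the admin API listens on
// localhost:3903 (adjust to your admin api_bind_addr).
fn garage_is_ready(base_url: &str) -> Result<bool, reqwest::Error> {
    // `/health` needs no bearer token: it maps to Authorization::None above.
    let resp = reqwest::blocking::get(format!("{}/health", base_url))?;
    // 200 OK covers both Healthy and Degraded; 503 means Unavailable.
    Ok(resp.status().is_success())
}

fn main() -> Result<(), reqwest::Error> {
    match garage_is_ready("http://127.0.0.1:3903")? {
        true => println!("cluster can serve reads and writes"),
        false => println!("quorum lost for some partitions; reads and writes may fail"),
    }
    Ok(())
}
```

Because both `Healthy` and `Degraded` map to 200, a probe that looks only at the HTTP status keeps routing traffic to a cluster that is degraded but still has quorum.
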
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index 706db727..182a4f6f 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -43,6 +43,11 @@ pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<
Ok(json_ok_response(&res)?)
}
+pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+ let health = garage.system.health();
+ Ok(json_ok_response(&health)?)
+}
+
pub async fn handle_connect_cluster_nodes(
garage: &Arc<Garage>,
req: Request<Body>,
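
`/v0/health` returns the `ClusterHealth` struct (defined in src/rpc/system.rs below) serialized as JSON. A hedged client-side sketch of reading that response, assuming serde's default field naming since the struct carries no rename attribute in this patch; the sample values are illustrative only.

```rust
// Hypothetical client-side mirror of ClusterHealth; not part of this patch.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct HealthReport {
    status: String, // unit enum variants serialize as plain strings, e.g. "Degraded"
    known_nodes: usize,
    connected_nodes: usize,
    storage_nodes: usize,
    storage_nodes_ok: usize,
    partitions: usize,
    partitions_quorum: usize,
    partitions_all_ok: usize,
}

fn parse_health(body: &str) -> serde_json::Result<HealthReport> {
    serde_json::from_str(body)
}

fn main() {
    // Illustrative sample body, not real output.
    let sample = r#"{"status":"Degraded","known_nodes":3,"connected_nodes":2,
        "storage_nodes":3,"storage_nodes_ok":2,"partitions":256,
        "partitions_quorum":256,"partitions_all_ok":170}"#;
    let report = parse_health(sample).expect("sample should parse");
    println!("{:?}", report);
}
```
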
diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs
index 3eee8b67..3fa07b3c 100644
--- a/src/api/admin/router.rs
+++ b/src/api/admin/router.rs
@@ -6,6 +6,7 @@ use crate::admin::error::*;
use crate::router_macros::*;
pub enum Authorization {
+ None,
MetricsToken,
AdminToken,
}
@@ -16,8 +17,10 @@ router_match! {@func
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Endpoint {
Options,
+ Health,
Metrics,
GetClusterStatus,
+ GetClusterHealth,
ConnectClusterNodes,
// Layout
GetClusterLayout,
@@ -88,8 +91,10 @@ impl Endpoint {
let res = router_match!(@gen_path_parser (req.method(), path, query) [
OPTIONS _ => Options,
+ GET "/health" => Health,
GET "/metrics" => Metrics,
GET "/v0/status" => GetClusterStatus,
+ GET "/v0/health" => GetClusterHealth,
POST "/v0/connect" => ConnectClusterNodes,
// Layout endpoints
GET "/v0/layout" => GetClusterLayout,
@@ -130,6 +135,7 @@ impl Endpoint {
/// Get the kind of authorization which is required to perform the operation.
pub fn authorization_type(&self) -> Authorization {
match self {
+ Self::Health => Authorization::None,
Self::Metrics => Authorization::MetricsToken,
_ => Authorization::AdminToken,
}
@@ -137,6 +143,7 @@ impl Endpoint {
}
generateQueryParameters! {
+ "format" => format,
"id" => id,
"search" => search,
"globalAlias" => global_alias,
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 75012952..e34d034f 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -8,10 +8,10 @@ use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::*;
+use garage_rpc::replication_mode::ReplicationMode;
use garage_rpc::system::System;
use garage_block::manager::*;
-use garage_table::replication::ReplicationMode;
use garage_table::replication::TableFullReplication;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
@@ -34,6 +34,9 @@ pub struct Garage {
/// The parsed configuration Garage is running
pub config: Config,
+ /// The replication mode of this cluster
+ pub replication_mode: ReplicationMode,
+
/// The local database
pub db: db::Db,
/// A background job runner
@@ -164,12 +167,7 @@ impl Garage {
.expect("Invalid replication_mode in config file.");
info!("Initialize membership management system...");
- let system = System::new(
- network_key,
- background.clone(),
- replication_mode.replication_factor(),
- &config,
- )?;
+ let system = System::new(network_key, background.clone(), replication_mode, &config)?;
let data_rep_param = TableShardedReplication {
system: system.clone(),
@@ -258,6 +256,7 @@ impl Garage {
// -- done --
Ok(Arc::new(Self {
config,
+ replication_mode,
db,
background,
system,
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index 92caf75d..86f63568 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -9,6 +9,7 @@ mod consul;
mod kubernetes;
pub mod layout;
+pub mod replication_mode;
pub mod ring;
pub mod system;
diff --git a/src/table/replication/mode.rs b/src/rpc/replication_mode.rs
index c6f84c45..e244e063 100644
--- a/src/table/replication/mode.rs
+++ b/src/rpc/replication_mode.rs
@@ -1,3 +1,4 @@
+#[derive(Clone, Copy)]
pub enum ReplicationMode {
None,
TwoWay,
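
`ReplicationMode` gains `Clone` and `Copy` because, after this patch, the same value is passed by value to `System::new` and also stored in both `Garage` and `System`. A self-contained sketch of that pattern, using stand-in names rather than the real Garage types; the factor mapping is illustrative, since `replication_factor()` is not shown in this diff.

```rust
// Stand-in types showing why the Copy derive matters here.
#[derive(Clone, Copy)]
#[allow(dead_code)]
enum Mode {
    None,
    TwoWay,
    ThreeWay,
}

fn factor_of(mode: Mode) -> usize {
    // Illustrative mapping; the real replication_factor() lives in
    // src/rpc/replication_mode.rs and is not reproduced in this diff.
    match mode {
        Mode::None => 1,
        Mode::TwoWay => 2,
        Mode::ThreeWay => 3,
    }
}

fn main() {
    let mode = Mode::TwoWay;
    let factor = factor_of(mode); // passed by value, not moved: Mode is Copy
    assert_eq!(factor, 2);
    let _still_usable = mode; // would not compile without the Copy derive
}
```
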
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index d6576f20..2c6f14fd 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -35,6 +35,7 @@ use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
use crate::layout::*;
+use crate::replication_mode::*;
use crate::ring::*;
use crate::rpc_helper::*;
@@ -102,6 +103,7 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
+ replication_mode: ReplicationMode,
replication_factor: usize,
/// The ring
@@ -136,6 +138,37 @@ pub struct KnownNodeInfo {
pub status: NodeStatus,
}
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+pub struct ClusterHealth {
+ /// The current health status of the cluster (see below)
+ pub status: ClusterHealthStatus,
+ /// Number of nodes already seen once in the cluster
+ pub known_nodes: usize,
+ /// Number of nodes currently connected
+ pub connected_nodes: usize,
+ /// Number of storage nodes declared in the current layout
+ pub storage_nodes: usize,
+ /// Number of storage nodes currently connected
+ pub storage_nodes_ok: usize,
+ /// Number of partitions in the layout
+ pub partitions: usize,
+ /// Number of partitions for which we have a quorum of connected nodes
+ pub partitions_quorum: usize,
+ /// Number of partitions for which all storage nodes are connected
+ pub partitions_all_ok: usize,
+}
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+pub enum ClusterHealthStatus {
+ /// All nodes are available
+ Healthy,
+ /// Some storage nodes are unavailable, but quorum is still
+ /// achieved for all partitions
+ Degraded,
+ /// Quorum is not available for some partitions
+ Unavailable,
+}
+
pub fn read_node_id(metadata_dir: &Path) -> Result<NodeID, Error> {
let mut pubkey_file = metadata_dir.to_path_buf();
pubkey_file.push("node_key.pub");
@@ -200,9 +233,11 @@ impl System {
pub fn new(
network_key: NetworkKey,
background: Arc<BackgroundRunner>,
- replication_factor: usize,
+ replication_mode: ReplicationMode,
config: &Config,
) -> Result<Arc<Self>, Error> {
+ let replication_factor = replication_mode.replication_factor();
+
let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!(
@@ -324,6 +359,7 @@ impl System {
config.rpc_timeout_msec.map(Duration::from_millis),
),
system_endpoint,
+ replication_mode,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
@@ -429,6 +465,67 @@ impl System {
}
}
+ pub fn health(&self) -> ClusterHealth {
+ let ring: Arc<_> = self.ring.borrow().clone();
+ let quorum = self.replication_mode.write_quorum();
+ let replication_factor = self.replication_factor;
+
+ let nodes = self
+ .get_known_nodes()
+ .into_iter()
+ .map(|n| (n.id, n))
+ .collect::<HashMap<Uuid, _>>();
+ let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
+
+ let storage_nodes = ring
+ .layout
+ .roles
+ .items()
+ .iter()
+ .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
+ .collect::<Vec<_>>();
+ let storage_nodes_ok = storage_nodes
+ .iter()
+ .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
+ .count();
+
+ let partitions = ring.partitions();
+ let partitions_n_up = partitions
+ .iter()
+ .map(|(_, h)| {
+ let pn = ring.get_nodes(h, ring.replication_factor);
+ pn.iter()
+ .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
+ .count()
+ })
+ .collect::<Vec<usize>>();
+ let partitions_all_ok = partitions_n_up
+ .iter()
+ .filter(|c| **c == replication_factor)
+ .count();
+ let partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count();
+
+ let status =
+ if partitions_quorum == partitions.len() && storage_nodes_ok == storage_nodes.len() {
+ ClusterHealthStatus::Healthy
+ } else if partitions_quorum == partitions.len() {
+ ClusterHealthStatus::Degraded
+ } else {
+ ClusterHealthStatus::Unavailable
+ };
+
+ ClusterHealth {
+ status,
+ known_nodes: nodes.len(),
+ connected_nodes,
+ storage_nodes: storage_nodes.len(),
+ storage_nodes_ok,
+ partitions: partitions.len(),
+ partitions_quorum,
+ partitions_all_ok,
+ }
+ }
+
// ---- INTERNALS ----
#[cfg(feature = "consul-discovery")]
diff --git a/src/table/replication/mod.rs b/src/table/replication/mod.rs
index 19e6772f..dfcb026a 100644
--- a/src/table/replication/mod.rs
+++ b/src/table/replication/mod.rs
@@ -1,10 +1,8 @@
mod parameters;
mod fullcopy;
-mod mode;
mod sharded;
pub use fullcopy::TableFullReplication;
-pub use mode::ReplicationMode;
pub use parameters::*;
pub use sharded::TableShardedReplication;