aboutsummaryrefslogtreecommitdiff
path: root/src/api/admin/cluster.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/admin/cluster.rs')
-rw-r--r--src/api/admin/cluster.rs207
1 files changed, 157 insertions, 50 deletions
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index 98bf2b5a..c8107b82 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -1,14 +1,13 @@
-use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{Body, Request, Response};
use serde::{Deserialize, Serialize};
use garage_util::crdt::*;
use garage_util::data::*;
-use garage_rpc::layout::*;
+use garage_rpc::layout;
use garage_model::garage::Garage;
@@ -26,26 +25,37 @@ pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<
.system
.get_known_nodes()
.into_iter()
- .map(|i| {
- (
- hex::encode(i.id),
- KnownNodeResp {
- addr: i.addr,
- is_up: i.is_up,
- last_seen_secs_ago: i.last_seen_secs_ago,
- hostname: i.status.hostname,
- },
- )
+ .map(|i| KnownNodeResp {
+ id: hex::encode(i.id),
+ addr: i.addr,
+ is_up: i.is_up,
+ last_seen_secs_ago: i.last_seen_secs_ago,
+ hostname: i.status.hostname,
})
.collect(),
- layout: get_cluster_layout(garage),
+ layout: format_cluster_layout(&garage.system.get_cluster_layout()),
};
Ok(json_ok_response(&res)?)
}
pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+ use garage_rpc::system::ClusterHealthStatus;
let health = garage.system.health();
+ let health = ClusterHealth {
+ status: match health.status {
+ ClusterHealthStatus::Healthy => "healthy",
+ ClusterHealthStatus::Degraded => "degraded",
+ ClusterHealthStatus::Unavailable => "unavailable",
+ },
+ known_nodes: health.known_nodes,
+ connected_nodes: health.connected_nodes,
+ storage_nodes: health.storage_nodes,
+ storage_nodes_ok: health.storage_nodes_ok,
+ partitions: health.partitions,
+ partitions_quorum: health.partitions_quorum,
+ partitions_all_ok: health.partitions_all_ok,
+ };
Ok(json_ok_response(&health)?)
}
@@ -74,33 +84,68 @@ pub async fn handle_connect_cluster_nodes(
}
pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
- let res = get_cluster_layout(garage);
+ let res = format_cluster_layout(&garage.system.get_cluster_layout());
Ok(json_ok_response(&res)?)
}
-fn get_cluster_layout(garage: &Arc<Garage>) -> GetClusterLayoutResponse {
- let layout = garage.system.get_cluster_layout();
+fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResponse {
+ let roles = layout
+ .roles
+ .items()
+ .iter()
+ .filter_map(|(k, _, v)| v.0.clone().map(|x| (k, x)))
+ .map(|(k, v)| NodeRoleResp {
+ id: hex::encode(k),
+ zone: v.zone.clone(),
+ capacity: v.capacity,
+ tags: v.tags.clone(),
+ })
+ .collect::<Vec<_>>();
+
+ let staged_role_changes = layout
+ .staging_roles
+ .items()
+ .iter()
+ .filter(|(k, _, v)| layout.roles.get(k) != Some(v))
+ .map(|(k, _, v)| match &v.0 {
+ None => NodeRoleChange {
+ id: hex::encode(k),
+ action: NodeRoleChangeEnum::Remove { remove: true },
+ },
+ Some(r) => NodeRoleChange {
+ id: hex::encode(k),
+ action: NodeRoleChangeEnum::Update {
+ zone: r.zone.clone(),
+ capacity: r.capacity,
+ tags: r.tags.clone(),
+ },
+ },
+ })
+ .collect::<Vec<_>>();
GetClusterLayoutResponse {
version: layout.version,
- roles: layout
- .roles
- .items()
- .iter()
- .filter(|(_, _, v)| v.0.is_some())
- .map(|(k, _, v)| (hex::encode(k), v.0.clone()))
- .collect(),
- staged_role_changes: layout
- .staging
- .items()
- .iter()
- .filter(|(k, _, v)| layout.roles.get(k) != Some(v))
- .map(|(k, _, v)| (hex::encode(k), v.0.clone()))
- .collect(),
+ roles,
+ staged_role_changes,
}
}
+// ----
+
+#[derive(Debug, Clone, Copy, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ClusterHealth {
+ status: &'static str,
+ known_nodes: usize,
+ connected_nodes: usize,
+ storage_nodes: usize,
+ storage_nodes_ok: usize,
+ partitions: usize,
+ partitions_quorum: usize,
+ partitions_all_ok: usize,
+}
+
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct GetClusterStatusResponse {
@@ -109,11 +154,19 @@ struct GetClusterStatusResponse {
garage_features: Option<&'static [&'static str]>,
rust_version: &'static str,
db_engine: String,
- known_nodes: HashMap<String, KnownNodeResp>,
+ known_nodes: Vec<KnownNodeResp>,
+ layout: GetClusterLayoutResponse,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct ApplyClusterLayoutResponse {
+ message: Vec<String>,
layout: GetClusterLayoutResponse,
}
#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
struct ConnectClusterNodesResponse {
success: bool,
error: Option<String>,
@@ -123,18 +176,31 @@ struct ConnectClusterNodesResponse {
#[serde(rename_all = "camelCase")]
struct GetClusterLayoutResponse {
version: u64,
- roles: HashMap<String, Option<NodeRole>>,
- staged_role_changes: HashMap<String, Option<NodeRole>>,
+ roles: Vec<NodeRoleResp>,
+ staged_role_changes: Vec<NodeRoleChange>,
}
#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct NodeRoleResp {
+ id: String,
+ zone: String,
+ capacity: Option<u64>,
+ tags: Vec<String>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
struct KnownNodeResp {
+ id: String,
addr: SocketAddr,
is_up: bool,
last_seen_secs_ago: Option<u64>,
hostname: String,
}
+// ---- update functions ----
+
pub async fn handle_update_cluster_layout(
garage: &Arc<Garage>,
req: Request<Body>,
@@ -144,22 +210,35 @@ pub async fn handle_update_cluster_layout(
let mut layout = garage.system.get_cluster_layout();
let mut roles = layout.roles.clone();
- roles.merge(&layout.staging);
+ roles.merge(&layout.staging_roles);
- for (node, role) in updates {
- let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?;
+ for change in updates {
+ let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?;
let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;
+ let new_role = match change.action {
+ NodeRoleChangeEnum::Remove { remove: true } => None,
+ NodeRoleChangeEnum::Update {
+ zone,
+ capacity,
+ tags,
+ } => Some(layout::NodeRole {
+ zone,
+ capacity,
+ tags,
+ }),
+ _ => return Err(Error::bad_request("Invalid layout change")),
+ };
+
layout
- .staging
- .merge(&roles.update_mutator(node, NodeRoleV(role)));
+ .staging_roles
+ .merge(&roles.update_mutator(node, layout::NodeRoleV(new_role)));
}
garage.system.update_cluster_layout(&layout).await?;
- Ok(Response::builder()
- .status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ let res = format_cluster_layout(&layout);
+ Ok(json_ok_response(&res)?)
}
pub async fn handle_apply_cluster_layout(
@@ -169,12 +248,15 @@ pub async fn handle_apply_cluster_layout(
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
let layout = garage.system.get_cluster_layout();
- let layout = layout.apply_staged_changes(Some(param.version))?;
+ let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
+
garage.system.update_cluster_layout(&layout).await?;
- Ok(Response::builder()
- .status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ let res = ApplyClusterLayoutResponse {
+ message: msg,
+ layout: format_cluster_layout(&layout),
+ };
+ Ok(json_ok_response(&res)?)
}
pub async fn handle_revert_cluster_layout(
@@ -187,14 +269,39 @@ pub async fn handle_revert_cluster_layout(
let layout = layout.revert_staged_changes(Some(param.version))?;
garage.system.update_cluster_layout(&layout).await?;
- Ok(Response::builder()
- .status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ let res = format_cluster_layout(&layout);
+ Ok(json_ok_response(&res)?)
}
-type UpdateClusterLayoutRequest = HashMap<String, Option<NodeRole>>;
+// ----
+
+type UpdateClusterLayoutRequest = Vec<NodeRoleChange>;
#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
struct ApplyRevertLayoutRequest {
version: u64,
}
+
+// ----
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct NodeRoleChange {
+ id: String,
+ #[serde(flatten)]
+ action: NodeRoleChangeEnum,
+}
+
+#[derive(Serialize, Deserialize)]
+#[serde(untagged)]
+enum NodeRoleChangeEnum {
+ #[serde(rename_all = "camelCase")]
+ Remove { remove: bool },
+ #[serde(rename_all = "camelCase")]
+ Update {
+ zone: String,
+ capacity: Option<u64>,
+ tags: Vec<String>,
+ },
+}