use std::net::SocketAddr; use std::sync::Arc; use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_util::crdt::*; use garage_util::data::*; use garage_rpc::layout; use garage_model::garage::Garage; use crate::admin::error::*; use crate::helpers::{json_ok_response, parse_json_body}; pub async fn handle_get_cluster_status(garage: &Arc) -> Result, Error> { let res = GetClusterStatusResponse { node: hex::encode(garage.system.id), garage_version: garage_util::version::garage_version(), garage_features: garage_util::version::garage_features(), rust_version: garage_util::version::rust_version(), db_engine: garage.db.engine(), known_nodes: garage .system .get_known_nodes() .into_iter() .map(|i| KnownNodeResp { id: hex::encode(i.id), addr: i.addr, is_up: i.is_up, last_seen_secs_ago: i.last_seen_secs_ago, hostname: i.status.hostname, }) .collect(), layout: get_cluster_layout(garage), }; Ok(json_ok_response(&res)?) } pub async fn handle_get_cluster_health(garage: &Arc) -> Result, Error> { use garage_rpc::system::ClusterHealthStatus; let health = garage.system.health(); let health = ClusterHealth { status: match health.status { ClusterHealthStatus::Healthy => "healthy", ClusterHealthStatus::Degraded => "degraded", ClusterHealthStatus::Unavailable => "unavailable", }, known_nodes: health.known_nodes, connected_nodes: health.connected_nodes, storage_nodes: health.storage_nodes, storage_nodes_ok: health.storage_nodes_ok, partitions: health.partitions, partitions_quorum: health.partitions_quorum, partitions_all_ok: health.partitions_all_ok, }; Ok(json_ok_response(&health)?) } pub async fn handle_connect_cluster_nodes( garage: &Arc, req: Request, ) -> Result, Error> { let req = parse_json_body::>(req).await?; let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node))) .await .into_iter() .map(|r| match r { Ok(()) => ConnectClusterNodesResponse { success: true, error: None, }, Err(e) => ConnectClusterNodesResponse { success: false, error: Some(format!("{}", e)), }, }) .collect::>(); Ok(json_ok_response(&res)?) } pub async fn handle_get_cluster_layout(garage: &Arc) -> Result, Error> { let res = get_cluster_layout(garage); Ok(json_ok_response(&res)?) } fn get_cluster_layout(garage: &Arc) -> GetClusterLayoutResponse { let layout = garage.system.get_cluster_layout(); let roles = layout .roles .items() .iter() .filter_map(|(k, _, v)| v.0.clone().map(|x| (k, x))) .map(|(k, v)| NodeRoleResp { id: hex::encode(k), zone: v.zone.clone(), capacity: v.capacity, tags: v.tags.clone(), }) .collect::>(); let staged_role_changes = layout .staging_roles .items() .iter() .filter(|(k, _, v)| layout.roles.get(k) != Some(v)) .map(|(k, _, v)| match &v.0 { None => NodeRoleChange { id: hex::encode(k), remove: true, ..Default::default() }, Some(r) => NodeRoleChange { id: hex::encode(k), remove: false, zone: Some(r.zone.clone()), capacity: r.capacity, tags: Some(r.tags.clone()), }, }) .collect::>(); GetClusterLayoutResponse { version: layout.version, roles, staged_role_changes, } } // ---- #[derive(Debug, Clone, Copy, Serialize)] #[serde(rename_all = "camelCase")] pub struct ClusterHealth { pub status: &'static str, pub known_nodes: usize, pub connected_nodes: usize, pub storage_nodes: usize, pub storage_nodes_ok: usize, pub partitions: usize, pub partitions_quorum: usize, pub partitions_all_ok: usize, } #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct GetClusterStatusResponse { node: String, garage_version: &'static str, garage_features: Option<&'static [&'static str]>, rust_version: &'static str, db_engine: String, known_nodes: Vec, layout: GetClusterLayoutResponse, } #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct ConnectClusterNodesResponse { success: bool, error: Option, } #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct GetClusterLayoutResponse { version: u64, roles: Vec, staged_role_changes: Vec, } #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct NodeRoleResp { id: String, zone: String, capacity: Option, tags: Vec, } #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct KnownNodeResp { id: String, addr: SocketAddr, is_up: bool, last_seen_secs_ago: Option, hostname: String, } // ---- update functions ---- pub async fn handle_update_cluster_layout( garage: &Arc, req: Request, ) -> Result, Error> { let updates = parse_json_body::(req).await?; let mut layout = garage.system.get_cluster_layout(); let mut roles = layout.roles.clone(); roles.merge(&layout.staging_roles); for change in updates { let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?; let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?; let new_role = match (change.remove, change.zone, change.capacity, change.tags) { (true, None, None, None) => None, (false, Some(zone), capacity, Some(tags)) => Some(layout::NodeRole { zone, capacity, tags, }), _ => return Err(Error::bad_request("Invalid layout change")), }; layout .staging_roles .merge(&roles.update_mutator(node, layout::NodeRoleV(new_role))); } garage.system.update_cluster_layout(&layout).await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) .body(Body::empty())?) } pub async fn handle_apply_cluster_layout( garage: &Arc, req: Request, ) -> Result, Error> { let param = parse_json_body::(req).await?; let layout = garage.system.get_cluster_layout(); let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; garage.system.update_cluster_layout(&layout).await?; Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "text/plain") .body(Body::from(msg.join("\n")))?) } pub async fn handle_revert_cluster_layout( garage: &Arc, req: Request, ) -> Result, Error> { let param = parse_json_body::(req).await?; let layout = garage.system.get_cluster_layout(); let layout = layout.revert_staged_changes(Some(param.version))?; garage.system.update_cluster_layout(&layout).await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) .body(Body::empty())?) } // ---- type UpdateClusterLayoutRequest = Vec; #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct ApplyRevertLayoutRequest { version: u64, } // ---- #[derive(Serialize, Deserialize, Default)] #[serde(rename_all = "camelCase")] struct NodeRoleChange { id: String, #[serde(default)] remove: bool, #[serde(default)] zone: Option, #[serde(default)] capacity: Option, #[serde(default)] tags: Option>, }