use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_rpc::layout::*; use garage_model::garage::Garage; use crate::error::*; use crate::helpers::*; pub async fn handle_get_cluster_status(garage: &Arc) -> Result, Error> { let res = GetClusterStatusResponse { node: hex::encode(garage.system.id), garage_version: garage.system.garage_version(), known_nodes: garage .system .get_known_nodes() .into_iter() .map(|i| { ( hex::encode(i.id), KnownNodeResp { addr: i.addr, is_up: i.is_up, last_seen_secs_ago: i.last_seen_secs_ago, hostname: i.status.hostname, }, ) }) .collect(), layout: get_cluster_layout(garage), }; let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?; Ok(Response::builder() .status(StatusCode::OK) .body(Body::from(resp_json))?) } pub async fn handle_get_cluster_layout(garage: &Arc) -> Result, Error> { let res = get_cluster_layout(garage); let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?; Ok(Response::builder() .status(StatusCode::OK) .body(Body::from(resp_json))?) } fn get_cluster_layout(garage: &Arc) -> GetClusterLayoutResponse { let layout = garage.system.get_cluster_layout(); GetClusterLayoutResponse { version: layout.version, roles: layout .roles .items() .iter() .filter(|(_, _, v)| v.0.is_some()) .map(|(k, _, v)| (hex::encode(k), v.0.clone())) .collect(), staged_role_changes: layout .staging .items() .iter() .filter(|(k, _, v)| layout.roles.get(k) != Some(v)) .map(|(k, _, v)| (hex::encode(k), v.0.clone())) .collect(), } } #[derive(Serialize)] struct GetClusterStatusResponse { node: String, garage_version: &'static str, #[serde(rename = "knownNodes")] known_nodes: HashMap, layout: GetClusterLayoutResponse, } #[derive(Serialize)] struct GetClusterLayoutResponse { version: u64, roles: HashMap>, #[serde(rename = "stagedRoleChanges")] staged_role_changes: HashMap>, } #[derive(Serialize)] struct KnownNodeResp { addr: SocketAddr, is_up: bool, last_seen_secs_ago: Option, hostname: String, } pub async fn handle_update_cluster_layout( garage: &Arc, req: Request, ) -> Result, Error> { let updates = parse_json_body::(req).await?; let mut layout = garage.system.get_cluster_layout(); let mut roles = layout.roles.clone(); roles.merge(&layout.staging); for (node, role) in updates { let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?; let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?; layout .staging .merge(&roles.update_mutator(node, NodeRoleV(role))); } garage.system.update_cluster_layout(&layout).await?; Ok(Response::builder() .status(StatusCode::OK) .body(Body::empty())?) } pub async fn handle_apply_cluster_layout( garage: &Arc, req: Request, ) -> Result, Error> { let param = parse_json_body::(req).await?; let layout = garage.system.get_cluster_layout(); let layout = layout.apply_staged_changes(Some(param.version))?; garage.system.update_cluster_layout(&layout).await?; Ok(Response::builder() .status(StatusCode::OK) .body(Body::empty())?) } pub async fn handle_revert_cluster_layout( garage: &Arc, req: Request, ) -> Result, Error> { let param = parse_json_body::(req).await?; let layout = garage.system.get_cluster_layout(); let layout = layout.revert_staged_changes(Some(param.version))?; garage.system.update_cluster_layout(&layout).await?; Ok(Response::builder() .status(StatusCode::OK) .body(Body::empty())?) } type UpdateClusterLayoutRequest = HashMap>; #[derive(Deserialize)] struct ApplyRevertLayoutRequest { version: u64, }