diff options
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/admin/api_server.rs | 5 | ||||
-rw-r--r-- | src/api/admin/cluster.rs | 77 | ||||
-rw-r--r-- | src/api/helpers.rs | 8 | ||||
-rw-r--r-- | src/api/k2v/batch.rs | 13 |
4 files changed, 91 insertions, 12 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index dfaac015..d008f10a 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -126,10 +126,15 @@ impl ApiHandler for AdminApiServer { Endpoint::Metrics => self.handle_metrics(), Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await, Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await, + Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await, + Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await, + Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await, + /* _ => Err(Error::NotImplemented(format!( "Admin endpoint {} not implemented yet", endpoint.name() ))), + */ } } } diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 0eb754ac..b8e9d96c 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -2,19 +2,24 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; -use serde::Serialize; +use hyper::{Body, Request, Response, StatusCode}; +use serde::{Deserialize, Serialize}; -use hyper::{Body, Response, StatusCode}; +use garage_util::crdt::*; +use garage_util::data::*; +use garage_util::error::Error as GarageError; use garage_rpc::layout::*; -use garage_util::error::Error as GarageError; use garage_model::garage::Garage; use crate::error::*; +use crate::helpers::*; pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { let res = GetClusterStatusResponse { + node: hex::encode(garage.system.id), + garage_version: garage.system.garage_version(), known_nodes: garage .system .get_known_nodes() @@ -72,6 +77,8 @@ fn get_cluster_layout(garage: &Arc<Garage>) -> GetClusterLayoutResponse { #[derive(Serialize)] struct GetClusterStatusResponse { + node: String, + garage_version: &'static str, #[serde(rename = "knownNodes")] known_nodes: HashMap<String, KnownNodeResp>, layout: GetClusterLayoutResponse, @@ -92,3 +99,67 @@ struct KnownNodeResp { last_seen_secs_ago: Option<u64>, hostname: String, } + +pub async fn handle_update_cluster_layout( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?; + + let mut layout = garage.system.get_cluster_layout(); + + let mut roles = layout.roles.clone(); + roles.merge(&layout.staging); + + for (node, role) in updates { + let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?; + let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?; + + layout + .staging + .merge(&roles.update_mutator(node, NodeRoleV(role))); + } + + garage.system.update_cluster_layout(&layout).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +pub async fn handle_apply_cluster_layout( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?; + + let layout = garage.system.get_cluster_layout(); + let layout = layout.apply_staged_changes(Some(param.version))?; + garage.system.update_cluster_layout(&layout).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +pub async fn handle_revert_cluster_layout( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?; + + let layout = garage.system.get_cluster_layout(); + let layout = layout.revert_staged_changes(Some(param.version))?; + garage.system.update_cluster_layout(&layout).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +type UpdateClusterLayoutRequest = HashMap<String, Option<NodeRole>>; + +#[derive(Deserialize)] +struct ApplyRevertLayoutRequest { + version: u64, +} diff --git a/src/api/helpers.rs b/src/api/helpers.rs index a994b82f..5e249dae 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,4 +1,6 @@ +use hyper::{Body, Request}; use idna::domain_to_unicode; +use serde::Deserialize; use garage_util::data::*; @@ -163,6 +165,12 @@ pub fn key_after_prefix(pfx: &str) -> Option<String> { None } +pub async fn parse_json_body<T: for<'de> Deserialize<'de>>(req: Request<Body>) -> Result<T, Error> { + let body = hyper::body::to_bytes(req.into_body()).await?; + let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + Ok(resp) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 4ecddeb9..a97bd7f2 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -13,6 +13,7 @@ use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; use crate::error::*; +use crate::helpers::*; use crate::k2v::range::read_range; pub async fn handle_insert_batch( @@ -20,9 +21,7 @@ pub async fn handle_insert_batch( bucket_id: Uuid, req: Request<Body>, ) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; - let items: Vec<InsertBatchItem> = - serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?; let mut items2 = vec![]; for it in items { @@ -52,9 +51,7 @@ pub async fn handle_read_batch( bucket_id: Uuid, req: Request<Body>, ) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; - let queries: Vec<ReadBatchQuery> = - serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?; let resp_results = futures::future::join_all( queries @@ -149,9 +146,7 @@ pub async fn handle_delete_batch( bucket_id: Uuid, req: Request<Body>, ) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; - let queries: Vec<DeleteBatchQuery> = - serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?; let resp_results = futures::future::join_all( queries |