aboutsummaryrefslogtreecommitdiff
path: root/src/api
diff options
context:
space:
mode:
Diffstat (limited to 'src/api')
-rw-r--r--src/api/admin/api_server.rs5
-rw-r--r--src/api/admin/cluster.rs77
-rw-r--r--src/api/helpers.rs8
-rw-r--r--src/api/k2v/batch.rs13
4 files changed, 91 insertions, 12 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index dfaac015..d008f10a 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -126,10 +126,15 @@ impl ApiHandler for AdminApiServer {
Endpoint::Metrics => self.handle_metrics(),
Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await,
+ Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await,
+ Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await,
+ Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await,
+ /*
_ => Err(Error::NotImplemented(format!(
"Admin endpoint {} not implemented yet",
endpoint.name()
))),
+ */
}
}
}
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index 0eb754ac..b8e9d96c 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -2,19 +2,24 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
-use serde::Serialize;
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
-use hyper::{Body, Response, StatusCode};
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
use garage_rpc::layout::*;
-use garage_util::error::Error as GarageError;
use garage_model::garage::Garage;
use crate::error::*;
+use crate::helpers::*;
pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
let res = GetClusterStatusResponse {
+ node: hex::encode(garage.system.id),
+ garage_version: garage.system.garage_version(),
known_nodes: garage
.system
.get_known_nodes()
@@ -72,6 +77,8 @@ fn get_cluster_layout(garage: &Arc<Garage>) -> GetClusterLayoutResponse {
#[derive(Serialize)]
struct GetClusterStatusResponse {
+ node: String,
+ garage_version: &'static str,
#[serde(rename = "knownNodes")]
known_nodes: HashMap<String, KnownNodeResp>,
layout: GetClusterLayoutResponse,
@@ -92,3 +99,67 @@ struct KnownNodeResp {
last_seen_secs_ago: Option<u64>,
hostname: String,
}
+
+pub async fn handle_update_cluster_layout(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?;
+
+ let mut layout = garage.system.get_cluster_layout();
+
+ let mut roles = layout.roles.clone();
+ roles.merge(&layout.staging);
+
+ for (node, role) in updates {
+ let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?;
+ let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;
+
+ layout
+ .staging
+ .merge(&roles.update_mutator(node, NodeRoleV(role)));
+ }
+
+ garage.system.update_cluster_layout(&layout).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_apply_cluster_layout(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
+
+ let layout = garage.system.get_cluster_layout();
+ let layout = layout.apply_staged_changes(Some(param.version))?;
+ garage.system.update_cluster_layout(&layout).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_revert_cluster_layout(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
+
+ let layout = garage.system.get_cluster_layout();
+ let layout = layout.revert_staged_changes(Some(param.version))?;
+ garage.system.update_cluster_layout(&layout).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+type UpdateClusterLayoutRequest = HashMap<String, Option<NodeRole>>;
+
+#[derive(Deserialize)]
+struct ApplyRevertLayoutRequest {
+ version: u64,
+}
diff --git a/src/api/helpers.rs b/src/api/helpers.rs
index a994b82f..5e249dae 100644
--- a/src/api/helpers.rs
+++ b/src/api/helpers.rs
@@ -1,4 +1,6 @@
+use hyper::{Body, Request};
use idna::domain_to_unicode;
+use serde::Deserialize;
use garage_util::data::*;
@@ -163,6 +165,12 @@ pub fn key_after_prefix(pfx: &str) -> Option<String> {
None
}
+pub async fn parse_json_body<T: for<'de> Deserialize<'de>>(req: Request<Body>) -> Result<T, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+ let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ Ok(resp)
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index 4ecddeb9..a97bd7f2 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -13,6 +13,7 @@ use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
use crate::error::*;
+use crate::helpers::*;
use crate::k2v::range::read_range;
pub async fn handle_insert_batch(
@@ -20,9 +21,7 @@ pub async fn handle_insert_batch(
bucket_id: Uuid,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
- let items: Vec<InsertBatchItem> =
- serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?;
let mut items2 = vec![];
for it in items {
@@ -52,9 +51,7 @@ pub async fn handle_read_batch(
bucket_id: Uuid,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
- let queries: Vec<ReadBatchQuery> =
- serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?;
let resp_results = futures::future::join_all(
queries
@@ -149,9 +146,7 @@ pub async fn handle_delete_batch(
bucket_id: Uuid,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
- let queries: Vec<DeleteBatchQuery> =
- serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?;
let resp_results = futures::future::join_all(
queries