aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-05-06 17:14:09 +0200
committerAlex Auvolat <alex@adnab.me>2022-05-10 13:25:10 +0200
commitdd54d0b2b13ecf1f95e60b107de9af20632335f6 (patch)
treeb90ae9da1fa9c1c7cad20f98f1eb77888b3f8b1f /src
parent01c4876fb447b70106e934ad09cf9b921f33682a (diff)
downloadgarage-dd54d0b2b13ecf1f95e60b107de9af20632335f6.tar.gz
garage-dd54d0b2b13ecf1f95e60b107de9af20632335f6.zip
Refactor code for apply/revert, implement Update/Apply/RevertLayout
Diffstat (limited to 'src')
-rw-r--r--src/api/admin/api_server.rs5
-rw-r--r--src/api/admin/cluster.rs77
-rw-r--r--src/api/helpers.rs8
-rw-r--r--src/api/k2v/batch.rs13
-rw-r--r--src/garage/Cargo.toml1
-rw-r--r--src/garage/admin.rs6
-rw-r--r--src/garage/cli/layout.rs47
-rw-r--r--src/rpc/Cargo.toml1
-rw-r--r--src/rpc/layout.rs56
-rw-r--r--src/rpc/system.rs18
-rw-r--r--src/util/crdt/lww_map.rs5
11 files changed, 175 insertions, 62 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index dfaac015..d008f10a 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -126,10 +126,15 @@ impl ApiHandler for AdminApiServer {
Endpoint::Metrics => self.handle_metrics(),
Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await,
+ Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await,
+ Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await,
+ Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await,
+ /*
_ => Err(Error::NotImplemented(format!(
"Admin endpoint {} not implemented yet",
endpoint.name()
))),
+ */
}
}
}
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index 0eb754ac..b8e9d96c 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -2,19 +2,24 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
-use serde::Serialize;
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
-use hyper::{Body, Response, StatusCode};
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
use garage_rpc::layout::*;
-use garage_util::error::Error as GarageError;
use garage_model::garage::Garage;
use crate::error::*;
+use crate::helpers::*;
pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
let res = GetClusterStatusResponse {
+ node: hex::encode(garage.system.id),
+ garage_version: garage.system.garage_version(),
known_nodes: garage
.system
.get_known_nodes()
@@ -72,6 +77,8 @@ fn get_cluster_layout(garage: &Arc<Garage>) -> GetClusterLayoutResponse {
#[derive(Serialize)]
struct GetClusterStatusResponse {
+ node: String,
+ garage_version: &'static str,
#[serde(rename = "knownNodes")]
known_nodes: HashMap<String, KnownNodeResp>,
layout: GetClusterLayoutResponse,
@@ -92,3 +99,67 @@ struct KnownNodeResp {
last_seen_secs_ago: Option<u64>,
hostname: String,
}
+
+pub async fn handle_update_cluster_layout(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?;
+
+ let mut layout = garage.system.get_cluster_layout();
+
+ let mut roles = layout.roles.clone();
+ roles.merge(&layout.staging);
+
+ for (node, role) in updates {
+ let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?;
+ let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;
+
+ layout
+ .staging
+ .merge(&roles.update_mutator(node, NodeRoleV(role)));
+ }
+
+ garage.system.update_cluster_layout(&layout).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_apply_cluster_layout(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
+
+ let layout = garage.system.get_cluster_layout();
+ let layout = layout.apply_staged_changes(Some(param.version))?;
+ garage.system.update_cluster_layout(&layout).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_revert_cluster_layout(
+ garage: &Arc<Garage>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
+
+ let layout = garage.system.get_cluster_layout();
+ let layout = layout.revert_staged_changes(Some(param.version))?;
+ garage.system.update_cluster_layout(&layout).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+type UpdateClusterLayoutRequest = HashMap<String, Option<NodeRole>>;
+
+#[derive(Deserialize)]
+struct ApplyRevertLayoutRequest {
+ version: u64,
+}
diff --git a/src/api/helpers.rs b/src/api/helpers.rs
index a994b82f..5e249dae 100644
--- a/src/api/helpers.rs
+++ b/src/api/helpers.rs
@@ -1,4 +1,6 @@
+use hyper::{Body, Request};
use idna::domain_to_unicode;
+use serde::Deserialize;
use garage_util::data::*;
@@ -163,6 +165,12 @@ pub fn key_after_prefix(pfx: &str) -> Option<String> {
None
}
+pub async fn parse_json_body<T: for<'de> Deserialize<'de>>(req: Request<Body>) -> Result<T, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+ let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ Ok(resp)
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index 4ecddeb9..a97bd7f2 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -13,6 +13,7 @@ use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
use crate::error::*;
+use crate::helpers::*;
use crate::k2v::range::read_range;
pub async fn handle_insert_batch(
@@ -20,9 +21,7 @@ pub async fn handle_insert_batch(
bucket_id: Uuid,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
- let items: Vec<InsertBatchItem> =
- serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?;
let mut items2 = vec![];
for it in items {
@@ -52,9 +51,7 @@ pub async fn handle_read_batch(
bucket_id: Uuid,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
- let queries: Vec<ReadBatchQuery> =
- serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?;
let resp_results = futures::future::join_all(
queries
@@ -149,9 +146,7 @@ pub async fn handle_delete_batch(
bucket_id: Uuid,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
- let queries: Vec<DeleteBatchQuery> =
- serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?;
let resp_results = futures::future::join_all(
queries
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 59566358..902f67f8 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -29,7 +29,6 @@ garage_util = { version = "0.7.0", path = "../util" }
garage_web = { version = "0.7.0", path = "../web" }
bytes = "1.0"
-git-version = "0.3.4"
hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] }
pretty_env_logger = "0.4"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index af0c3f22..1a58a613 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -696,11 +696,7 @@ impl AdminRpcHandler {
writeln!(
&mut ret,
"\nGarage version: {}",
- option_env!("GIT_VERSION").unwrap_or(git_version::git_version!(
- prefix = "git:",
- cargo_prefix = "cargo:",
- fallback = "unknown"
- ))
+ self.garage.system.garage_version(),
)
.unwrap();
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 88941d78..cdd3869b 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -1,5 +1,4 @@
use garage_util::crdt::Crdt;
-use garage_util::data::*;
use garage_util::error::*;
use garage_rpc::layout::*;
@@ -211,31 +210,9 @@ pub async fn cmd_apply_layout(
rpc_host: NodeID,
apply_opt: ApplyLayoutOpt,
) -> Result<(), Error> {
- let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
-
- match apply_opt.version {
- None => {
- println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
- println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
- return Err(Error::Message("--version flag is missing".into()));
- }
- Some(v) => {
- if v != layout.version + 1 {
- return Err(Error::Message("Invalid value of --version flag".into()));
- }
- }
- }
-
- layout.roles.merge(&layout.staging);
-
- if !layout.calculate_partition_assignation() {
- return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
- }
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
- layout.staging.clear();
- layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
-
- layout.version += 1;
+ let layout = layout.apply_staged_changes(apply_opt.version)?;
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -250,25 +227,9 @@ pub async fn cmd_revert_layout(
rpc_host: NodeID,
revert_opt: RevertLayoutOpt,
) -> Result<(), Error> {
- let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
-
- match revert_opt.version {
- None => {
- println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
- println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
- return Err(Error::Message("--version flag is missing".into()));
- }
- Some(v) => {
- if v != layout.version + 1 {
- return Err(Error::Message("Invalid value of --version flag".into()));
- }
- }
- }
-
- layout.staging.clear();
- layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
- layout.version += 1;
+ let layout = layout.revert_staged_changes(revert_opt.version)?;
send_layout(rpc_cli, rpc_host, layout).await?;
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 80a1975c..73328993 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -19,6 +19,7 @@ garage_util = { version = "0.7.0", path = "../util" }
arc-swap = "1.0"
bytes = "1.0"
gethostname = "0.2"
+git-version = "0.3.4"
hex = "0.4"
tracing = "0.1.30"
rand = "0.8"
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index b9c02c21..f517f36f 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
use garage_util::data::*;
+use garage_util::error::*;
use crate::ring::*;
@@ -100,6 +101,61 @@ impl ClusterLayout {
}
}
+ pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
+ match version {
+ None => {
+ let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+ "#;
+ return Err(Error::Message(error.into()));
+ }
+ Some(v) => {
+ if v != self.version + 1 {
+ return Err(Error::Message("Invalid new layout version".into()));
+ }
+ }
+ }
+
+ self.roles.merge(&self.staging);
+ self.roles.retain(|(_, _, v)| v.0.is_some());
+
+ if !self.calculate_partition_assignation() {
+ return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
+ }
+
+ self.staging.clear();
+ self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+
+ self.version += 1;
+
+ Ok(self)
+ }
+
+ pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
+ match version {
+ None => {
+ let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+ "#;
+ return Err(Error::Message(error.into()));
+ }
+ Some(v) => {
+ if v != self.version + 1 {
+ return Err(Error::Message("Invalid new layout version".into()));
+ }
+ }
+ }
+
+ self.staging.clear();
+ self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+
+ self.version += 1;
+
+ Ok(self)
+ }
+
/// Returns a list of IDs of nodes that currently have
/// a role in the cluster
pub fn node_ids(&self) -> &[Uuid] {
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 73c7b898..eb2f2e42 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -315,6 +315,14 @@ impl System {
// ---- Administrative operations (directly available and
// also available through RPC) ----
+ pub fn garage_version(&self) -> &'static str {
+ option_env!("GIT_VERSION").unwrap_or(git_version::git_version!(
+ prefix = "git:",
+ cargo_prefix = "cargo:",
+ fallback = "unknown"
+ ))
+ }
+
pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> {
let node_status = self.node_status.read().unwrap();
let known_nodes = self
@@ -345,6 +353,14 @@ impl System {
self.ring.borrow().layout.clone()
}
+ pub async fn update_cluster_layout(
+ self: &Arc<Self>,
+ layout: &ClusterLayout,
+ ) -> Result<(), Error> {
+ self.handle_advertise_cluster_layout(layout).await?;
+ Ok(())
+ }
+
pub async fn connect(&self, node: &str) -> Result<(), Error> {
let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| {
Error::Message(format!(
@@ -495,7 +511,7 @@ impl System {
}
async fn handle_advertise_cluster_layout(
- self: Arc<Self>,
+ self: &Arc<Self>,
adv: &ClusterLayout,
) -> Result<SystemRpc, Error> {
let update_ring = self.update_ring.lock().await;
diff --git a/src/util/crdt/lww_map.rs b/src/util/crdt/lww_map.rs
index c155c3a8..91d24c7f 100644
--- a/src/util/crdt/lww_map.rs
+++ b/src/util/crdt/lww_map.rs
@@ -140,6 +140,11 @@ where
self.vals.clear();
}
+ /// Retain only values that match a certain predicate
+ pub fn retain(&mut self, pred: impl FnMut(&(K, u64, V)) -> bool) {
+ self.vals.retain(pred);
+ }
+
/// Get a reference to the value assigned to a key
pub fn get(&self, k: &K) -> Option<&V> {
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {