diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api/admin/api_server.rs | 5 | ||||
-rw-r--r-- | src/api/admin/cluster.rs | 77 | ||||
-rw-r--r-- | src/api/helpers.rs | 8 | ||||
-rw-r--r-- | src/api/k2v/batch.rs | 13 | ||||
-rw-r--r-- | src/garage/Cargo.toml | 1 | ||||
-rw-r--r-- | src/garage/admin.rs | 6 | ||||
-rw-r--r-- | src/garage/cli/layout.rs | 47 | ||||
-rw-r--r-- | src/rpc/Cargo.toml | 1 | ||||
-rw-r--r-- | src/rpc/layout.rs | 56 | ||||
-rw-r--r-- | src/rpc/system.rs | 18 | ||||
-rw-r--r-- | src/util/crdt/lww_map.rs | 5 |
11 files changed, 175 insertions, 62 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index dfaac015..d008f10a 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -126,10 +126,15 @@ impl ApiHandler for AdminApiServer { Endpoint::Metrics => self.handle_metrics(), Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await, Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await, + Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await, + Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await, + Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await, + /* _ => Err(Error::NotImplemented(format!( "Admin endpoint {} not implemented yet", endpoint.name() ))), + */ } } } diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 0eb754ac..b8e9d96c 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -2,19 +2,24 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; -use serde::Serialize; +use hyper::{Body, Request, Response, StatusCode}; +use serde::{Deserialize, Serialize}; -use hyper::{Body, Response, StatusCode}; +use garage_util::crdt::*; +use garage_util::data::*; +use garage_util::error::Error as GarageError; use garage_rpc::layout::*; -use garage_util::error::Error as GarageError; use garage_model::garage::Garage; use crate::error::*; +use crate::helpers::*; pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { let res = GetClusterStatusResponse { + node: hex::encode(garage.system.id), + garage_version: garage.system.garage_version(), known_nodes: garage .system .get_known_nodes() @@ -72,6 +77,8 @@ fn get_cluster_layout(garage: &Arc<Garage>) -> GetClusterLayoutResponse { #[derive(Serialize)] struct GetClusterStatusResponse { + node: String, + garage_version: &'static str, #[serde(rename = "knownNodes")] known_nodes: HashMap<String, KnownNodeResp>, layout: GetClusterLayoutResponse, @@ -92,3 +99,67 @@ struct KnownNodeResp { last_seen_secs_ago: Option<u64>, hostname: String, } + +pub async fn handle_update_cluster_layout( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?; + + let mut layout = garage.system.get_cluster_layout(); + + let mut roles = layout.roles.clone(); + roles.merge(&layout.staging); + + for (node, role) in updates { + let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?; + let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?; + + layout + .staging + .merge(&roles.update_mutator(node, NodeRoleV(role))); + } + + garage.system.update_cluster_layout(&layout).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +pub async fn handle_apply_cluster_layout( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?; + + let layout = garage.system.get_cluster_layout(); + let layout = layout.apply_staged_changes(Some(param.version))?; + garage.system.update_cluster_layout(&layout).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +pub async fn handle_revert_cluster_layout( + garage: &Arc<Garage>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?; + + let layout = garage.system.get_cluster_layout(); + let layout = layout.revert_staged_changes(Some(param.version))?; + garage.system.update_cluster_layout(&layout).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +type UpdateClusterLayoutRequest = HashMap<String, Option<NodeRole>>; + +#[derive(Deserialize)] +struct ApplyRevertLayoutRequest { + version: u64, +} diff --git a/src/api/helpers.rs b/src/api/helpers.rs index a994b82f..5e249dae 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,4 +1,6 @@ +use hyper::{Body, Request}; use idna::domain_to_unicode; +use serde::Deserialize; use garage_util::data::*; @@ -163,6 +165,12 @@ pub fn key_after_prefix(pfx: &str) -> Option<String> { None } +pub async fn parse_json_body<T: for<'de> Deserialize<'de>>(req: Request<Body>) -> Result<T, Error> { + let body = hyper::body::to_bytes(req.into_body()).await?; + let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + Ok(resp) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 4ecddeb9..a97bd7f2 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -13,6 +13,7 @@ use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; use crate::error::*; +use crate::helpers::*; use crate::k2v::range::read_range; pub async fn handle_insert_batch( @@ -20,9 +21,7 @@ pub async fn handle_insert_batch( bucket_id: Uuid, req: Request<Body>, ) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; - let items: Vec<InsertBatchItem> = - serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?; let mut items2 = vec![]; for it in items { @@ -52,9 +51,7 @@ pub async fn handle_read_batch( bucket_id: Uuid, req: Request<Body>, ) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; - let queries: Vec<ReadBatchQuery> = - serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?; let resp_results = futures::future::join_all( queries @@ -149,9 +146,7 @@ pub async fn handle_delete_batch( bucket_id: Uuid, req: Request<Body>, ) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; - let queries: Vec<DeleteBatchQuery> = - serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?; let resp_results = futures::future::join_all( queries diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 59566358..902f67f8 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -29,7 +29,6 @@ garage_util = { version = "0.7.0", path = "../util" } garage_web = { version = "0.7.0", path = "../web" } bytes = "1.0" -git-version = "0.3.4" hex = "0.4" tracing = { version = "0.1.30", features = ["log-always"] } pretty_env_logger = "0.4" diff --git a/src/garage/admin.rs b/src/garage/admin.rs index af0c3f22..1a58a613 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -696,11 +696,7 @@ impl AdminRpcHandler { writeln!( &mut ret, "\nGarage version: {}", - option_env!("GIT_VERSION").unwrap_or(git_version::git_version!( - prefix = "git:", - cargo_prefix = "cargo:", - fallback = "unknown" - )) + self.garage.system.garage_version(), ) .unwrap(); diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 88941d78..cdd3869b 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -1,5 +1,4 @@ use garage_util::crdt::Crdt; -use garage_util::data::*; use garage_util::error::*; use garage_rpc::layout::*; @@ -211,31 +210,9 @@ pub async fn cmd_apply_layout( rpc_host: NodeID, apply_opt: ApplyLayoutOpt, ) -> Result<(), Error> { - let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - - match apply_opt.version { - None => { - println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout."); - println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes."); - return Err(Error::Message("--version flag is missing".into())); - } - Some(v) => { - if v != layout.version + 1 { - return Err(Error::Message("Invalid value of --version flag".into())); - } - } - } - - layout.roles.merge(&layout.staging); - - if !layout.calculate_partition_assignation() { - return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into())); - } + let layout = fetch_layout(rpc_cli, rpc_host).await?; - layout.staging.clear(); - layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]); - - layout.version += 1; + let layout = layout.apply_staged_changes(apply_opt.version)?; send_layout(rpc_cli, rpc_host, layout).await?; @@ -250,25 +227,9 @@ pub async fn cmd_revert_layout( rpc_host: NodeID, revert_opt: RevertLayoutOpt, ) -> Result<(), Error> { - let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - - match revert_opt.version { - None => { - println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout."); - println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes."); - return Err(Error::Message("--version flag is missing".into())); - } - Some(v) => { - if v != layout.version + 1 { - return Err(Error::Message("Invalid value of --version flag".into())); - } - } - } - - layout.staging.clear(); - layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]); + let layout = fetch_layout(rpc_cli, rpc_host).await?; - layout.version += 1; + let layout = layout.revert_staged_changes(revert_opt.version)?; send_layout(rpc_cli, rpc_host, layout).await?; diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 80a1975c..73328993 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -19,6 +19,7 @@ garage_util = { version = "0.7.0", path = "../util" } arc-swap = "1.0" bytes = "1.0" gethostname = "0.2" +git-version = "0.3.4" hex = "0.4" tracing = "0.1.30" rand = "0.8" diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index b9c02c21..f517f36f 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; use garage_util::data::*; +use garage_util::error::*; use crate::ring::*; @@ -100,6 +101,61 @@ impl ClusterLayout { } } + pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { + match version { + None => { + let error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. + "#; + return Err(Error::Message(error.into())); + } + Some(v) => { + if v != self.version + 1 { + return Err(Error::Message("Invalid new layout version".into())); + } + } + } + + self.roles.merge(&self.staging); + self.roles.retain(|(_, _, v)| v.0.is_some()); + + if !self.calculate_partition_assignation() { + return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into())); + } + + self.staging.clear(); + self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + + self.version += 1; + + Ok(self) + } + + pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { + match version { + None => { + let error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. + "#; + return Err(Error::Message(error.into())); + } + Some(v) => { + if v != self.version + 1 { + return Err(Error::Message("Invalid new layout version".into())); + } + } + } + + self.staging.clear(); + self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + + self.version += 1; + + Ok(self) + } + /// Returns a list of IDs of nodes that currently have /// a role in the cluster pub fn node_ids(&self) -> &[Uuid] { diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 73c7b898..eb2f2e42 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -315,6 +315,14 @@ impl System { // ---- Administrative operations (directly available and // also available through RPC) ---- + pub fn garage_version(&self) -> &'static str { + option_env!("GIT_VERSION").unwrap_or(git_version::git_version!( + prefix = "git:", + cargo_prefix = "cargo:", + fallback = "unknown" + )) + } + pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> { let node_status = self.node_status.read().unwrap(); let known_nodes = self @@ -345,6 +353,14 @@ impl System { self.ring.borrow().layout.clone() } + pub async fn update_cluster_layout( + self: &Arc<Self>, + layout: &ClusterLayout, + ) -> Result<(), Error> { + self.handle_advertise_cluster_layout(layout).await?; + Ok(()) + } + pub async fn connect(&self, node: &str) -> Result<(), Error> { let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| { Error::Message(format!( @@ -495,7 +511,7 @@ impl System { } async fn handle_advertise_cluster_layout( - self: Arc<Self>, + self: &Arc<Self>, adv: &ClusterLayout, ) -> Result<SystemRpc, Error> { let update_ring = self.update_ring.lock().await; diff --git a/src/util/crdt/lww_map.rs b/src/util/crdt/lww_map.rs index c155c3a8..91d24c7f 100644 --- a/src/util/crdt/lww_map.rs +++ b/src/util/crdt/lww_map.rs @@ -140,6 +140,11 @@ where self.vals.clear(); } + /// Retain only values that match a certain predicate + pub fn retain(&mut self, pred: impl FnMut(&(K, u64, V)) -> bool) { + self.vals.retain(pred); + } + /// Get a reference to the value assigned to a key pub fn get(&self, k: &K) -> Option<&V> { match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) { |