diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 2 | ||||
-rw-r--r-- | src/rpc/layout.rs | 56 | ||||
-rw-r--r-- | src/rpc/system.rs | 131 |
3 files changed, 140 insertions, 49 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index bed7f44a..73328993 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -15,11 +15,11 @@ path = "lib.rs" [dependencies] garage_util = { version = "0.7.0", path = "../util" } -garage_admin = { version = "0.7.0", path = "../admin" } arc-swap = "1.0" bytes = "1.0" gethostname = "0.2" +git-version = "0.3.4" hex = "0.4" tracing = "0.1.30" rand = "0.8" diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index b9c02c21..f517f36f 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; use garage_util::data::*; +use garage_util::error::*; use crate::ring::*; @@ -100,6 +101,61 @@ impl ClusterLayout { } } + pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { + match version { + None => { + let error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. + "#; + return Err(Error::Message(error.into())); + } + Some(v) => { + if v != self.version + 1 { + return Err(Error::Message("Invalid new layout version".into())); + } + } + } + + self.roles.merge(&self.staging); + self.roles.retain(|(_, _, v)| v.0.is_some()); + + if !self.calculate_partition_assignation() { + return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into())); + } + + self.staging.clear(); + self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + + self.version += 1; + + Ok(self) + } + + pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { + match version { + None => { + let error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. + "#; + return Err(Error::Message(error.into())); + } + Some(v) => { + if v != self.version + 1 { + return Err(Error::Message("Invalid new layout version".into())); + } + } + } + + self.staging.clear(); + self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + + self.version += 1; + + Ok(self) + } + /// Returns a list of IDs of nodes that currently have /// a role in the cluster pub fn node_ids(&self) -> &[Uuid] { diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 68d94ea5..eb2f2e42 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -312,6 +312,83 @@ impl System { ); } + // ---- Administrative operations (directly available and + // also available through RPC) ---- + + pub fn garage_version(&self) -> &'static str { + option_env!("GIT_VERSION").unwrap_or(git_version::git_version!( + prefix = "git:", + cargo_prefix = "cargo:", + fallback = "unknown" + )) + } + + pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> { + let node_status = self.node_status.read().unwrap(); + let known_nodes = self + .fullmesh + .get_peer_list() + .iter() + .map(|n| KnownNodeInfo { + id: n.id.into(), + addr: n.addr, + is_up: n.is_up(), + last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()), + status: node_status + .get(&n.id.into()) + .cloned() + .map(|(_, st)| st) + .unwrap_or(NodeStatus { + hostname: "?".to_string(), + replication_factor: 0, + cluster_layout_version: 0, + cluster_layout_staging_hash: Hash::from([0u8; 32]), + }), + }) + .collect::<Vec<_>>(); + known_nodes + } + + pub fn get_cluster_layout(&self) -> ClusterLayout { + self.ring.borrow().layout.clone() + } + + pub async fn update_cluster_layout( + self: &Arc<Self>, + layout: &ClusterLayout, + ) -> Result<(), Error> { + self.handle_advertise_cluster_layout(layout).await?; + Ok(()) + } + + pub async fn connect(&self, node: &str) -> Result<(), Error> { + let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| { + Error::Message(format!( + "Unable to parse or resolve node specification: {}", + node + )) + })?; + let mut errors = vec![]; + for ip in addrs.iter() { + match self + .netapp + .clone() + .try_connect(*ip, pubkey) + .await + .err_context(CONNECT_ERROR_MESSAGE) + { + Ok(()) => return Ok(()), + Err(e) => { + errors.push((*ip, e)); + } + } + } + return Err(Error::Message(format!( + "Could not connect to specified peers. Errors: {:?}", + errors + ))); + } + // ---- INTERNALS ---- async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> { @@ -384,32 +461,11 @@ impl System { self.local_status.swap(Arc::new(new_si)); } + // --- RPC HANDLERS --- + async fn handle_connect(&self, node: &str) -> Result<SystemRpc, Error> { - let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| { - Error::Message(format!( - "Unable to parse or resolve node specification: {}", - node - )) - })?; - let mut errors = vec![]; - for ip in addrs.iter() { - match self - .netapp - .clone() - .try_connect(*ip, pubkey) - .await - .err_context(CONNECT_ERROR_MESSAGE) - { - Ok(()) => return Ok(SystemRpc::Ok), - Err(e) => { - errors.push((*ip, e)); - } - } - } - return Err(Error::Message(format!( - "Could not connect to specified peers. Errors: {:?}", - errors - ))); + self.connect(node).await?; + Ok(SystemRpc::Ok) } fn handle_pull_cluster_layout(&self) -> SystemRpc { @@ -418,28 +474,7 @@ impl System { } fn handle_get_known_nodes(&self) -> SystemRpc { - let node_status = self.node_status.read().unwrap(); - let known_nodes = self - .fullmesh - .get_peer_list() - .iter() - .map(|n| KnownNodeInfo { - id: n.id.into(), - addr: n.addr, - is_up: n.is_up(), - last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()), - status: node_status - .get(&n.id.into()) - .cloned() - .map(|(_, st)| st) - .unwrap_or(NodeStatus { - hostname: "?".to_string(), - replication_factor: 0, - cluster_layout_version: 0, - cluster_layout_staging_hash: Hash::from([0u8; 32]), - }), - }) - .collect::<Vec<_>>(); + let known_nodes = self.get_known_nodes(); SystemRpc::ReturnKnownNodes(known_nodes) } @@ -476,7 +511,7 @@ impl System { } async fn handle_advertise_cluster_layout( - self: Arc<Self>, + self: &Arc<Self>, adv: &ClusterLayout, ) -> Result<SystemRpc, Error> { let update_ring = self.update_ring.lock().await; |