aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/Cargo.toml2
-rw-r--r--src/rpc/layout.rs56
-rw-r--r--src/rpc/system.rs131
3 files changed, 140 insertions, 49 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index bed7f44a..73328993 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -15,11 +15,11 @@ path = "lib.rs"
[dependencies]
garage_util = { version = "0.7.0", path = "../util" }
-garage_admin = { version = "0.7.0", path = "../admin" }
arc-swap = "1.0"
bytes = "1.0"
gethostname = "0.2"
+git-version = "0.3.4"
hex = "0.4"
tracing = "0.1.30"
rand = "0.8"
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index b9c02c21..f517f36f 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
use garage_util::data::*;
+use garage_util::error::*;
use crate::ring::*;
@@ -100,6 +101,61 @@ impl ClusterLayout {
}
}
+ pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
+ match version {
+ None => {
+ let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+ "#;
+ return Err(Error::Message(error.into()));
+ }
+ Some(v) => {
+ if v != self.version + 1 {
+ return Err(Error::Message("Invalid new layout version".into()));
+ }
+ }
+ }
+
+ self.roles.merge(&self.staging);
+ self.roles.retain(|(_, _, v)| v.0.is_some());
+
+ if !self.calculate_partition_assignation() {
+ return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
+ }
+
+ self.staging.clear();
+ self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+
+ self.version += 1;
+
+ Ok(self)
+ }
+
+ pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
+ match version {
+ None => {
+ let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+ "#;
+ return Err(Error::Message(error.into()));
+ }
+ Some(v) => {
+ if v != self.version + 1 {
+ return Err(Error::Message("Invalid new layout version".into()));
+ }
+ }
+ }
+
+ self.staging.clear();
+ self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+
+ self.version += 1;
+
+ Ok(self)
+ }
+
/// Returns a list of IDs of nodes that currently have
/// a role in the cluster
pub fn node_ids(&self) -> &[Uuid] {
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 68d94ea5..eb2f2e42 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -312,6 +312,83 @@ impl System {
);
}
+ // ---- Administrative operations (directly available and
+ // also available through RPC) ----
+
+ pub fn garage_version(&self) -> &'static str {
+ option_env!("GIT_VERSION").unwrap_or(git_version::git_version!(
+ prefix = "git:",
+ cargo_prefix = "cargo:",
+ fallback = "unknown"
+ ))
+ }
+
+ pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> {
+ let node_status = self.node_status.read().unwrap();
+ let known_nodes = self
+ .fullmesh
+ .get_peer_list()
+ .iter()
+ .map(|n| KnownNodeInfo {
+ id: n.id.into(),
+ addr: n.addr,
+ is_up: n.is_up(),
+ last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()),
+ status: node_status
+ .get(&n.id.into())
+ .cloned()
+ .map(|(_, st)| st)
+ .unwrap_or(NodeStatus {
+ hostname: "?".to_string(),
+ replication_factor: 0,
+ cluster_layout_version: 0,
+ cluster_layout_staging_hash: Hash::from([0u8; 32]),
+ }),
+ })
+ .collect::<Vec<_>>();
+ known_nodes
+ }
+
+ pub fn get_cluster_layout(&self) -> ClusterLayout {
+ self.ring.borrow().layout.clone()
+ }
+
+ pub async fn update_cluster_layout(
+ self: &Arc<Self>,
+ layout: &ClusterLayout,
+ ) -> Result<(), Error> {
+ self.handle_advertise_cluster_layout(layout).await?;
+ Ok(())
+ }
+
+ pub async fn connect(&self, node: &str) -> Result<(), Error> {
+ let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| {
+ Error::Message(format!(
+ "Unable to parse or resolve node specification: {}",
+ node
+ ))
+ })?;
+ let mut errors = vec![];
+ for ip in addrs.iter() {
+ match self
+ .netapp
+ .clone()
+ .try_connect(*ip, pubkey)
+ .await
+ .err_context(CONNECT_ERROR_MESSAGE)
+ {
+ Ok(()) => return Ok(()),
+ Err(e) => {
+ errors.push((*ip, e));
+ }
+ }
+ }
+ return Err(Error::Message(format!(
+ "Could not connect to specified peers. Errors: {:?}",
+ errors
+ )));
+ }
+
// ---- INTERNALS ----
async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
@@ -384,32 +461,11 @@ impl System {
self.local_status.swap(Arc::new(new_si));
}
+ // --- RPC HANDLERS ---
+
async fn handle_connect(&self, node: &str) -> Result<SystemRpc, Error> {
- let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| {
- Error::Message(format!(
- "Unable to parse or resolve node specification: {}",
- node
- ))
- })?;
- let mut errors = vec![];
- for ip in addrs.iter() {
- match self
- .netapp
- .clone()
- .try_connect(*ip, pubkey)
- .await
- .err_context(CONNECT_ERROR_MESSAGE)
- {
- Ok(()) => return Ok(SystemRpc::Ok),
- Err(e) => {
- errors.push((*ip, e));
- }
- }
- }
- return Err(Error::Message(format!(
- "Could not connect to specified peers. Errors: {:?}",
- errors
- )));
+ self.connect(node).await?;
+ Ok(SystemRpc::Ok)
}
fn handle_pull_cluster_layout(&self) -> SystemRpc {
@@ -418,28 +474,7 @@ impl System {
}
fn handle_get_known_nodes(&self) -> SystemRpc {
- let node_status = self.node_status.read().unwrap();
- let known_nodes = self
- .fullmesh
- .get_peer_list()
- .iter()
- .map(|n| KnownNodeInfo {
- id: n.id.into(),
- addr: n.addr,
- is_up: n.is_up(),
- last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()),
- status: node_status
- .get(&n.id.into())
- .cloned()
- .map(|(_, st)| st)
- .unwrap_or(NodeStatus {
- hostname: "?".to_string(),
- replication_factor: 0,
- cluster_layout_version: 0,
- cluster_layout_staging_hash: Hash::from([0u8; 32]),
- }),
- })
- .collect::<Vec<_>>();
+ let known_nodes = self.get_known_nodes();
SystemRpc::ReturnKnownNodes(known_nodes)
}
@@ -476,7 +511,7 @@ impl System {
}
async fn handle_advertise_cluster_layout(
- self: Arc<Self>,
+ self: &Arc<Self>,
adv: &ClusterLayout,
) -> Result<SystemRpc, Error> {
let update_ring = self.update_ring.lock().await;