aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-05-06 17:14:09 +0200
committerAlex Auvolat <alex@adnab.me>2022-05-10 13:25:10 +0200
commitdd54d0b2b13ecf1f95e60b107de9af20632335f6 (patch)
treeb90ae9da1fa9c1c7cad20f98f1eb77888b3f8b1f /src/rpc
parent01c4876fb447b70106e934ad09cf9b921f33682a (diff)
downloadgarage-dd54d0b2b13ecf1f95e60b107de9af20632335f6.tar.gz
garage-dd54d0b2b13ecf1f95e60b107de9af20632335f6.zip
Refactor code for apply/revert, implement Update/Apply/RevertLayout
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/Cargo.toml1
-rw-r--r--src/rpc/layout.rs56
-rw-r--r--src/rpc/system.rs18
3 files changed, 74 insertions, 1 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 80a1975c..73328993 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -19,6 +19,7 @@ garage_util = { version = "0.7.0", path = "../util" }
arc-swap = "1.0"
bytes = "1.0"
gethostname = "0.2"
+git-version = "0.3.4"
hex = "0.4"
tracing = "0.1.30"
rand = "0.8"
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index b9c02c21..f517f36f 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
use garage_util::data::*;
+use garage_util::error::*;
use crate::ring::*;
@@ -100,6 +101,61 @@ impl ClusterLayout {
}
}
+ pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
+ match version {
+ None => {
+ let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+ "#;
+ return Err(Error::Message(error.into()));
+ }
+ Some(v) => {
+ if v != self.version + 1 {
+ return Err(Error::Message("Invalid new layout version".into()));
+ }
+ }
+ }
+
+ self.roles.merge(&self.staging);
+ self.roles.retain(|(_, _, v)| v.0.is_some());
+
+ if !self.calculate_partition_assignation() {
+ return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
+ }
+
+ self.staging.clear();
+ self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+
+ self.version += 1;
+
+ Ok(self)
+ }
+
+ pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
+ match version {
+ None => {
+ let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+ "#;
+ return Err(Error::Message(error.into()));
+ }
+ Some(v) => {
+ if v != self.version + 1 {
+ return Err(Error::Message("Invalid new layout version".into()));
+ }
+ }
+ }
+
+ self.staging.clear();
+ self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+
+ self.version += 1;
+
+ Ok(self)
+ }
+
/// Returns a list of IDs of nodes that currently have
/// a role in the cluster
pub fn node_ids(&self) -> &[Uuid] {
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 73c7b898..eb2f2e42 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -315,6 +315,14 @@ impl System {
// ---- Administrative operations (directly available and
// also available through RPC) ----
+ pub fn garage_version(&self) -> &'static str {
+ option_env!("GIT_VERSION").unwrap_or(git_version::git_version!(
+ prefix = "git:",
+ cargo_prefix = "cargo:",
+ fallback = "unknown"
+ ))
+ }
+
pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> {
let node_status = self.node_status.read().unwrap();
let known_nodes = self
@@ -345,6 +353,14 @@ impl System {
self.ring.borrow().layout.clone()
}
+ pub async fn update_cluster_layout(
+ self: &Arc<Self>,
+ layout: &ClusterLayout,
+ ) -> Result<(), Error> {
+ self.handle_advertise_cluster_layout(layout).await?;
+ Ok(())
+ }
+
pub async fn connect(&self, node: &str) -> Result<(), Error> {
let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| {
Error::Message(format!(
@@ -495,7 +511,7 @@ impl System {
}
async fn handle_advertise_cluster_layout(
- self: Arc<Self>,
+ self: &Arc<Self>,
adv: &ClusterLayout,
) -> Result<SystemRpc, Error> {
let update_ring = self.update_ring.lock().await;