aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2023-01-03 15:28:24 +0000
committerAlex <alex@adnab.me>2023-01-03 15:28:24 +0000
commit73ed9c74039448c69ebe382e361acf3ecbfef70b (patch)
tree7fb21a559e53557d5dea5efd2b7dafe9f9751367 /src/rpc
parent582b0761790b7958a3ba10c4b549b466997d2dcd (diff)
parent1d5bdc17a46648eb3494ff629d0d360d0217c1e2 (diff)
downloadgarage-73ed9c74039448c69ebe382e361acf3ecbfef70b.tar.gz
garage-73ed9c74039448c69ebe382e361acf3ecbfef70b.zip
Merge pull request 'Refactor how things are migrated' (#461) from format-migration into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/461
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/Cargo.toml1
-rw-r--r--src/rpc/layout.rs13
-rw-r--r--src/rpc/system.rs18
3 files changed, 21 insertions, 11 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index b87374ad..e9a0929a 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -25,7 +25,6 @@ rand = "0.8"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
async-trait = "0.1.7"
-rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
serde_json = "1.0"
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index 2fd5acfc..1030e3a6 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
use garage_util::data::*;
+use garage_util::encode::nonversioned_encode;
use garage_util::error::*;
use crate::ring::*;
@@ -35,6 +36,8 @@ pub struct ClusterLayout {
pub staging_hash: Hash,
}
+impl garage_util::migrate::InitialFormat for ClusterLayout {}
+
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct NodeRoleV(pub Option<NodeRole>);
@@ -68,7 +71,7 @@ impl NodeRole {
impl ClusterLayout {
pub fn new(replication_factor: usize) -> Self {
let empty_lwwmap = LwwMap::new();
- let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]);
+ let empty_lwwmap_hash = blake2sum(&nonversioned_encode(&empty_lwwmap).unwrap()[..]);
ClusterLayout {
version: 0,
@@ -90,7 +93,7 @@ impl ClusterLayout {
Ordering::Equal => {
self.staging.merge(&other.staging);
- let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ let new_staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
let changed = new_staging_hash != self.staging_hash;
self.staging_hash = new_staging_hash;
@@ -125,7 +128,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
self.staging.clear();
- self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ self.staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
self.version += 1;
@@ -149,7 +152,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
self.staging.clear();
- self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ self.staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
self.version += 1;
@@ -178,7 +181,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
/// returns true if consistent, false if error
pub fn check(&self) -> bool {
// Check that the hash of the staging data is correct
- let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ let staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
if staging_hash != self.staging_hash {
return false;
}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 8f753b7f..22e23e55 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -73,13 +73,17 @@ impl Rpc for SystemRpc {
type Response = Result<SystemRpc, Error>;
}
+#[derive(Serialize, Deserialize)]
+pub struct PeerList(Vec<(Uuid, SocketAddr)>);
+impl garage_util::migrate::InitialFormat for PeerList {}
+
/// This node's membership manager
pub struct System {
/// The id of this node
pub id: Uuid,
persist_cluster_layout: Persister<ClusterLayout>,
- persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>,
+ persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>,
node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>,
@@ -721,7 +725,7 @@ impl System {
// Add peer list from list stored on disk
if let Ok(peers) = self.persist_peer_list.load_async().await {
- ping_list.extend(peers.iter().map(|(id, addr)| ((*id).into(), *addr)))
+ ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr)))
}
// Fetch peer list from Consul
@@ -801,12 +805,16 @@ impl System {
// and append it to the list we are about to save,
// so that no peer ID gets lost in the process.
if let Ok(mut prev_peer_list) = self.persist_peer_list.load_async().await {
- prev_peer_list.retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id));
- peer_list.extend(prev_peer_list);
+ prev_peer_list
+ .0
+ .retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id));
+ peer_list.extend(prev_peer_list.0);
}
// Save new peer list to file
- self.persist_peer_list.save_async(&peer_list).await
+ self.persist_peer_list
+ .save_async(&PeerList(peer_list))
+ .await
}
async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {