aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/layout/history.rs51
-rw-r--r--src/rpc/layout/manager.rs140
-rw-r--r--src/rpc/layout/schema.rs23
-rw-r--r--src/rpc/system.rs15
4 files changed, 168 insertions, 61 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index 9ae28887..357b9d62 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -18,10 +18,11 @@ impl LayoutHistory {
let mut ret = LayoutHistory {
versions: vec![version].into_boxed_slice().into(),
update_trackers: Default::default(),
+ trackers_hash: [0u8; 32].into(),
staging: Lww::raw(0, staging),
staging_hash: [0u8; 32].into(),
};
- ret.staging_hash = ret.calculate_staging_hash();
+ ret.update_hashes();
ret
}
@@ -29,6 +30,15 @@ impl LayoutHistory {
self.versions.last().as_ref().unwrap()
}
+ pub(crate) fn update_hashes(&mut self) {
+ self.trackers_hash = self.calculate_trackers_hash();
+ self.staging_hash = self.calculate_staging_hash();
+ }
+
+ pub(crate) fn calculate_trackers_hash(&self) -> Hash {
+ blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..])
+ }
+
pub(crate) fn calculate_staging_hash(&self) -> Hash {
blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
}
@@ -38,12 +48,6 @@ impl LayoutHistory {
pub fn merge(&mut self, other: &LayoutHistory) -> bool {
let mut changed = false;
- // Merge staged layout changes
- if self.staging != other.staging {
- changed = true;
- }
- self.staging.merge(&other.staging);
-
// Add any new versions to history
for v2 in other.versions.iter() {
if let Some(v1) = self.versions.iter().find(|v| v.version == v2.version) {
@@ -63,7 +67,21 @@ impl LayoutHistory {
}
// Merge trackers
- self.update_trackers.merge(&other.update_trackers);
+ if self.update_trackers != other.update_trackers {
+ let c = self.update_trackers.merge(&other.update_trackers);
+ changed = changed || c;
+ }
+
+ // Merge staged layout changes
+ if self.staging != other.staging {
+ self.staging.merge(&other.staging);
+ changed = true;
+ }
+
+ // Update hashes if there are changes
+ if changed {
+ self.update_hashes();
+ }
changed
}
@@ -100,7 +118,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
parameters: self.staging.get().parameters.clone(),
roles: LwwMap::new(),
});
- self.staging_hash = self.calculate_staging_hash();
+ self.update_hashes();
Ok((self, msg))
}
@@ -110,20 +128,25 @@ To know the correct value of the new layout version, invoke `garage layout show`
parameters: Lww::new(self.current().parameters.clone()),
roles: LwwMap::new(),
});
- self.staging_hash = self.calculate_staging_hash();
+ self.update_hashes();
Ok(self)
}
pub fn check(&self) -> Result<(), String> {
// Check that the hash of the staging data is correct
- let staging_hash = self.calculate_staging_hash();
- if staging_hash != self.staging_hash {
+ if self.trackers_hash != self.calculate_trackers_hash() {
+ return Err("trackers_hash is incorrect".into());
+ }
+ if self.staging_hash != self.calculate_staging_hash() {
return Err("staging_hash is incorrect".into());
}
- // TODO: anythign more ?
+ for version in self.versions.iter() {
+ version.check()?;
+ }
- self.current().check()
+ // TODO: anythign more ?
+ Ok(())
}
}
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index c021039b..a2502f58 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -19,6 +19,7 @@ use crate::rpc_helper::*;
use crate::system::*;
pub struct LayoutManager {
+ node_id: Uuid,
replication_factor: usize,
persist_cluster_layout: Persister<LayoutHistory>,
@@ -34,7 +35,7 @@ pub struct LayoutStatus {
/// Cluster layout version
pub cluster_layout_version: u64,
/// Hash of cluster layout update trackers
- // (TODO) pub cluster_layout_trackers_hash: Hash,
+ pub cluster_layout_trackers_hash: Hash,
/// Hash of cluster layout staging data
pub cluster_layout_staging_hash: Hash,
}
@@ -81,6 +82,7 @@ impl LayoutManager {
);
Ok(Arc::new(Self {
+ node_id: node_id.into(),
replication_factor,
persist_cluster_layout,
layout,
@@ -92,10 +94,15 @@ impl LayoutManager {
// ---- PUBLIC INTERFACE ----
+ pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> {
+ self.layout.read().unwrap()
+ }
+
pub fn status(&self) -> LayoutStatus {
let layout = self.layout();
LayoutStatus {
cluster_layout_version: layout.current().version,
+ cluster_layout_trackers_hash: layout.trackers_hash,
cluster_layout_staging_hash: layout.staging_hash,
}
}
@@ -108,11 +115,35 @@ impl LayoutManager {
Ok(())
}
- pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> {
- self.layout.read().unwrap()
+ // ---- INTERNALS ---
+
+ fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
+ let mut layout = self.layout.write().unwrap();
+ let prev_layout_check = layout.check().is_ok();
+
+ if !prev_layout_check || adv.check().is_ok() {
+ if layout.merge(adv) {
+ if prev_layout_check && layout.check().is_err() {
+ panic!("Merged two correct layouts and got an incorrect layout.");
+ }
+
+ return Some(layout.clone());
+ }
+ }
+ None
}
- pub(crate) async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) {
+ fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option<UpdateTrackers> {
+ let mut layout = self.layout.write().unwrap();
+ if layout.update_trackers != *adv {
+ if layout.update_trackers.merge(adv) {
+ return Some(layout.update_trackers.clone());
+ }
+ }
+ None
+ }
+
+ async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) {
let resp = self
.rpc_helper
.call(
@@ -123,15 +154,35 @@ impl LayoutManager {
)
.await;
if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
- let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await;
+ if let Err(e) = self.handle_advertise_cluster_layout(&layout).await {
+ warn!("In pull_cluster_layout: {}", e);
+ }
}
}
- // ---- INTERNALS ---
+ async fn pull_cluster_layout_trackers(self: &Arc<Self>, peer: Uuid) {
+ let resp = self
+ .rpc_helper
+ .call(
+ &self.system_endpoint,
+ peer,
+ SystemRpc::PullClusterLayoutTrackers,
+ RequestStrategy::with_priority(PRIO_HIGH),
+ )
+ .await;
+ if let Ok(SystemRpc::AdvertiseClusterLayoutTrackers(trackers)) = resp {
+ if let Err(e) = self
+ .handle_advertise_cluster_layout_trackers(&trackers)
+ .await
+ {
+ warn!("In pull_cluster_layout_trackers: {}", e);
+ }
+ }
+ }
- /// Save network configuration to disc
+ /// Save cluster layout data to disk
async fn save_cluster_layout(&self) -> Result<(), Error> {
- let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning
+ let layout = self.layout.read().unwrap().clone();
self.persist_cluster_layout
.save_async(&layout)
.await
@@ -139,33 +190,41 @@ impl LayoutManager {
Ok(())
}
- fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
- let mut layout = self.layout.write().unwrap();
- let prev_layout_check = layout.check().is_ok();
-
- if !prev_layout_check || adv.check().is_ok() {
- if layout.merge(adv) {
- if prev_layout_check && layout.check().is_err() {
- panic!("Merged two correct layouts and got an incorrect layout.");
+ fn broadcast_update(self: &Arc<Self>, rpc: SystemRpc) {
+ tokio::spawn({
+ let this = self.clone();
+ async move {
+ if let Err(e) = this
+ .rpc_helper
+ .broadcast(
+ &this.system_endpoint,
+ rpc,
+ RequestStrategy::with_priority(PRIO_HIGH),
+ )
+ .await
+ {
+ warn!("Error while broadcasting new cluster layout: {}", e);
}
-
- return Some(layout.clone());
}
- }
- None
+ });
}
// ---- RPC HANDLERS ----
- pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, status: &LayoutStatus) {
- let local_status = self.status();
- if status.cluster_layout_version > local_status.cluster_layout_version
- || status.cluster_layout_staging_hash != local_status.cluster_layout_staging_hash
+ pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &LayoutStatus) {
+ let local = self.status();
+ if remote.cluster_layout_version > local.cluster_layout_version
+ || remote.cluster_layout_staging_hash != local.cluster_layout_staging_hash
{
tokio::spawn({
let this = self.clone();
async move { this.pull_cluster_layout(from).await }
});
+ } else if remote.cluster_layout_trackers_hash != local.cluster_layout_trackers_hash {
+ tokio::spawn({
+ let this = self.clone();
+ async move { this.pull_cluster_layout_trackers(from).await }
+ });
}
}
@@ -174,6 +233,11 @@ impl LayoutManager {
SystemRpc::AdvertiseClusterLayout(layout)
}
+ pub(crate) fn handle_pull_cluster_layout_trackers(&self) -> SystemRpc {
+ let layout = self.layout.read().unwrap();
+ SystemRpc::AdvertiseClusterLayoutTrackers(layout.update_trackers.clone())
+ }
+
pub(crate) async fn handle_advertise_cluster_layout(
self: &Arc<Self>,
adv: &LayoutHistory,
@@ -190,24 +254,20 @@ impl LayoutManager {
if let Some(new_layout) = self.merge_layout(adv) {
self.change_notify.notify_waiters();
+ self.broadcast_update(SystemRpc::AdvertiseClusterLayout(new_layout));
+ self.save_cluster_layout().await?;
+ }
- tokio::spawn({
- let this = self.clone();
- async move {
- if let Err(e) = this
- .rpc_helper
- .broadcast(
- &this.system_endpoint,
- SystemRpc::AdvertiseClusterLayout(new_layout),
- RequestStrategy::with_priority(PRIO_HIGH),
- )
- .await
- {
- warn!("Error while broadcasting new cluster layout: {}", e);
- }
- }
- });
+ Ok(SystemRpc::Ok)
+ }
+ pub(crate) async fn handle_advertise_cluster_layout_trackers(
+ self: &Arc<Self>,
+ trackers: &UpdateTrackers,
+ ) -> Result<SystemRpc, Error> {
+ if let Some(new_trackers) = self.merge_layout_trackers(trackers) {
+ self.change_notify.notify_waiters();
+ self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(new_trackers));
self.save_cluster_layout().await?;
}
diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs
index d587a6cb..abae5bd8 100644
--- a/src/rpc/layout/schema.rs
+++ b/src/rpc/layout/schema.rs
@@ -233,6 +233,8 @@ mod v010 {
/// Update trackers
pub update_trackers: UpdateTrackers,
+ /// Hash of the update trackers
+ pub trackers_hash: Hash,
/// Staged changes for the next version
pub staging: Lww<LayoutStaging>,
@@ -289,10 +291,12 @@ mod v010 {
sync_map: update_tracker.clone(),
sync_ack_map: update_tracker.clone(),
},
+ trackers_hash: [0u8; 32].into(),
staging: Lww::raw(previous.version, staging),
staging_hash: [0u8; 32].into(),
};
ret.staging_hash = ret.calculate_staging_hash();
+ ret.trackers_hash = ret.calculate_trackers_hash();
ret
}
}
@@ -355,14 +359,20 @@ impl core::str::FromStr for ZoneRedundancy {
}
impl UpdateTracker {
- fn merge(&mut self, other: &UpdateTracker) {
+ fn merge(&mut self, other: &UpdateTracker) -> bool {
+ let mut changed = false;
for (k, v) in other.0.iter() {
if let Some(v_mut) = self.0.get_mut(k) {
- *v_mut = std::cmp::max(*v_mut, *v);
+ if *v > *v_mut {
+ *v_mut = *v;
+ changed = true;
+ }
} else {
self.0.insert(*k, *v);
+ changed = true;
}
}
+ changed
}
pub(crate) fn min(&self) -> u64 {
@@ -371,9 +381,10 @@ impl UpdateTracker {
}
impl UpdateTrackers {
- pub(crate) fn merge(&mut self, other: &UpdateTrackers) {
- self.ack_map.merge(&other.ack_map);
- self.sync_map.merge(&other.sync_map);
- self.sync_ack_map.merge(&other.sync_ack_map);
+ pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool {
+ let c1 = self.ack_map.merge(&other.ack_map);
+ let c2 = self.sync_map.merge(&other.sync_map);
+ let c3 = self.sync_ack_map.merge(&other.sync_ack_map);
+ c1 || c2 || c3
}
}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index cb3af3fe..6ce13d0d 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -34,7 +34,7 @@ use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
use crate::layout::manager::{LayoutManager, LayoutStatus};
-use crate::layout::*;
+use crate::layout::{self, LayoutHistory, NodeRoleV};
use crate::replication_mode::*;
use crate::rpc_helper::*;
@@ -65,10 +65,15 @@ pub enum SystemRpc {
GetKnownNodes,
/// Return known nodes
ReturnKnownNodes(Vec<KnownNodeInfo>),
+
/// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
PullClusterLayout,
/// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
AdvertiseClusterLayout(LayoutHistory),
+ /// Ask other node its cluster layout update trackers.
+ PullClusterLayoutTrackers,
+ /// Advertisement of cluster layout update trackers.
+ AdvertiseClusterLayoutTrackers(layout::UpdateTrackers),
}
impl Rpc for SystemRpc {
@@ -727,6 +732,14 @@ impl EndpointHandler<SystemRpc> for System {
.handle_advertise_cluster_layout(adv)
.await
}
+ SystemRpc::PullClusterLayoutTrackers => {
+ Ok(self.layout_manager.handle_pull_cluster_layout_trackers())
+ }
+ SystemRpc::AdvertiseClusterLayoutTrackers(adv) => {
+ self.layout_manager
+ .handle_advertise_cluster_layout_trackers(adv)
+ .await
+ }
// ---- other -> Error ----
m => Err(Error::unexpected_rpc_message(m)),