aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/layout')
-rw-r--r--src/rpc/layout/helper.rs27
-rw-r--r--src/rpc/layout/history.rs1
-rw-r--r--src/rpc/layout/manager.rs36
-rw-r--r--src/rpc/layout/mod.rs2
4 files changed, 36 insertions, 30 deletions
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
index ed3da498..0d746ea3 100644
--- a/src/rpc/layout/helper.rs
+++ b/src/rpc/layout/helper.rs
@@ -2,10 +2,24 @@ use std::collections::HashMap;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
+use serde::{Deserialize, Serialize};
+
use garage_util::data::*;
use super::schema::*;
+#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
+pub struct LayoutDigest {
+ /// Cluster layout version
+ pub current_version: u64,
+ /// Number of active layout versions
+ pub active_versions: usize,
+ /// Hash of cluster layout update trackers
+ pub trackers_hash: Hash,
+ /// Hash of cluster layout staging data
+ pub staging_hash: Hash,
+}
+
pub struct LayoutHelper {
layout: Option<LayoutHistory>,
@@ -16,8 +30,8 @@ pub struct LayoutHelper {
all_nodes: Vec<Uuid>,
all_nongateway_nodes: Vec<Uuid>,
- pub(crate) trackers_hash: Hash,
- pub(crate) staging_hash: Hash,
+ trackers_hash: Hash,
+ staging_hash: Hash,
// ack lock: counts in-progress write operations for each
// layout version ; we don't increase the ack update tracker
@@ -152,6 +166,15 @@ impl LayoutHelper {
self.staging_hash
}
+ pub fn digest(&self) -> LayoutDigest {
+ LayoutDigest {
+ current_version: self.current().version,
+ active_versions: self.versions.len(),
+ trackers_hash: self.trackers_hash,
+ staging_hash: self.staging_hash,
+ }
+ }
+
// ------------------ helpers for update tracking ---------------
pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) {
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index 0a139549..653d2a48 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -5,7 +5,6 @@ use garage_util::data::*;
use garage_util::encode::nonversioned_encode;
use garage_util::error::*;
-use super::schema::*;
use super::*;
impl LayoutHistory {
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index 85d94ffa..c65831a2 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -2,8 +2,6 @@ use std::collections::HashMap;
use std::sync::{atomic::Ordering, Arc, Mutex, RwLock, RwLockReadGuard};
use std::time::Duration;
-use serde::{Deserialize, Serialize};
-
use tokio::sync::Notify;
use netapp::endpoint::Endpoint;
@@ -33,16 +31,6 @@ pub struct LayoutManager {
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
}
-#[derive(Debug, Clone, Serialize, Deserialize, Default)]
-pub struct LayoutStatus {
- /// Cluster layout version
- pub cluster_layout_version: u64,
- /// Hash of cluster layout update trackers
- pub cluster_layout_trackers_hash: Hash,
- /// Hash of cluster layout staging data
- pub cluster_layout_staging_hash: Hash,
-}
-
impl LayoutManager {
pub fn new(
config: &Config,
@@ -105,15 +93,6 @@ impl LayoutManager {
self.layout.read().unwrap()
}
- pub fn status(&self) -> LayoutStatus {
- let layout = self.layout();
- LayoutStatus {
- cluster_layout_version: layout.current().version,
- cluster_layout_trackers_hash: layout.trackers_hash(),
- cluster_layout_staging_hash: layout.staging_hash(),
- }
- }
-
pub async fn update_cluster_layout(
self: &Arc<Self>,
layout: &LayoutHistory,
@@ -173,6 +152,7 @@ impl LayoutManager {
fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
let mut layout = self.layout.write().unwrap();
+ let prev_digest = layout.digest();
let prev_layout_check = layout.check().is_ok();
if !prev_layout_check || adv.check().is_ok() {
@@ -181,6 +161,7 @@ impl LayoutManager {
if prev_layout_check && layout.check().is_err() {
panic!("Merged two correct layouts and got an incorrect layout.");
}
+ assert!(layout.digest() != prev_digest);
return Some(layout.clone());
}
}
@@ -190,10 +171,12 @@ impl LayoutManager {
fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option<UpdateTrackers> {
let mut layout = self.layout.write().unwrap();
+ let prev_digest = layout.digest();
if layout.update_trackers != *adv {
if layout.update(|l| l.update_trackers.merge(adv)) {
layout.update_trackers(self.node_id);
+ assert!(layout.digest() != prev_digest);
return Some(layout.update_trackers.clone());
}
}
@@ -269,16 +252,17 @@ impl LayoutManager {
// ---- RPC HANDLERS ----
- pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &LayoutStatus) {
- let local = self.status();
- if remote.cluster_layout_version > local.cluster_layout_version
- || remote.cluster_layout_staging_hash != local.cluster_layout_staging_hash
+ pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &LayoutDigest) {
+ let local = self.layout().digest();
+ if remote.current_version > local.current_version
+ || remote.active_versions != local.active_versions
+ || remote.staging_hash != local.staging_hash
{
tokio::spawn({
let this = self.clone();
async move { this.pull_cluster_layout(from).await }
});
- } else if remote.cluster_layout_trackers_hash != local.cluster_layout_trackers_hash {
+ } else if remote.trackers_hash != local.trackers_hash {
tokio::spawn({
let this = self.clone();
async move { this.pull_cluster_layout_trackers(from).await }
diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs
index 91151ab4..eb127fda 100644
--- a/src/rpc/layout/mod.rs
+++ b/src/rpc/layout/mod.rs
@@ -11,7 +11,7 @@ pub mod manager;
// ---- re-exports ----
-pub use helper::LayoutHelper;
+pub use helper::{LayoutDigest, LayoutHelper};
pub use manager::WriteLock;
pub use schema::*;
pub use version::*;