diff options
Diffstat (limited to 'src/rpc/layout/manager.rs')
-rw-r--r-- | src/rpc/layout/manager.rs | 36 |
1 files changed, 10 insertions, 26 deletions
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 85d94ffa..c65831a2 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -2,8 +2,6 @@ use std::collections::HashMap; use std::sync::{atomic::Ordering, Arc, Mutex, RwLock, RwLockReadGuard}; use std::time::Duration; -use serde::{Deserialize, Serialize}; - use tokio::sync::Notify; use netapp::endpoint::Endpoint; @@ -33,16 +31,6 @@ pub struct LayoutManager { system_endpoint: Arc<Endpoint<SystemRpc, System>>, } -#[derive(Debug, Clone, Serialize, Deserialize, Default)] -pub struct LayoutStatus { - /// Cluster layout version - pub cluster_layout_version: u64, - /// Hash of cluster layout update trackers - pub cluster_layout_trackers_hash: Hash, - /// Hash of cluster layout staging data - pub cluster_layout_staging_hash: Hash, -} - impl LayoutManager { pub fn new( config: &Config, @@ -105,15 +93,6 @@ impl LayoutManager { self.layout.read().unwrap() } - pub fn status(&self) -> LayoutStatus { - let layout = self.layout(); - LayoutStatus { - cluster_layout_version: layout.current().version, - cluster_layout_trackers_hash: layout.trackers_hash(), - cluster_layout_staging_hash: layout.staging_hash(), - } - } - pub async fn update_cluster_layout( self: &Arc<Self>, layout: &LayoutHistory, @@ -173,6 +152,7 @@ impl LayoutManager { fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> { let mut layout = self.layout.write().unwrap(); + let prev_digest = layout.digest(); let prev_layout_check = layout.check().is_ok(); if !prev_layout_check || adv.check().is_ok() { @@ -181,6 +161,7 @@ impl LayoutManager { if prev_layout_check && layout.check().is_err() { panic!("Merged two correct layouts and got an incorrect layout."); } + assert!(layout.digest() != prev_digest); return Some(layout.clone()); } } @@ -190,10 +171,12 @@ impl LayoutManager { fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option<UpdateTrackers> { let mut layout = self.layout.write().unwrap(); + let prev_digest = layout.digest(); if layout.update_trackers != *adv { if layout.update(|l| l.update_trackers.merge(adv)) { layout.update_trackers(self.node_id); + assert!(layout.digest() != prev_digest); return Some(layout.update_trackers.clone()); } } @@ -269,16 +252,17 @@ impl LayoutManager { // ---- RPC HANDLERS ---- - pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &LayoutStatus) { - let local = self.status(); - if remote.cluster_layout_version > local.cluster_layout_version - || remote.cluster_layout_staging_hash != local.cluster_layout_staging_hash + pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &LayoutDigest) { + let local = self.layout().digest(); + if remote.current_version > local.current_version + || remote.active_versions != local.active_versions + || remote.staging_hash != local.staging_hash { tokio::spawn({ let this = self.clone(); async move { this.pull_cluster_layout(from).await } }); - } else if remote.cluster_layout_trackers_hash != local.cluster_layout_trackers_hash { + } else if remote.trackers_hash != local.trackers_hash { tokio::spawn({ let this = self.clone(); async move { this.pull_cluster_layout_trackers(from).await } |