diff options
Diffstat (limited to 'src/rpc/layout/manager.rs')
-rw-r--r-- | src/rpc/layout/manager.rs | 140 |
1 files changed, 100 insertions, 40 deletions
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index c021039b..a2502f58 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -19,6 +19,7 @@ use crate::rpc_helper::*; use crate::system::*; pub struct LayoutManager { + node_id: Uuid, replication_factor: usize, persist_cluster_layout: Persister<LayoutHistory>, @@ -34,7 +35,7 @@ pub struct LayoutStatus { /// Cluster layout version pub cluster_layout_version: u64, /// Hash of cluster layout update trackers - // (TODO) pub cluster_layout_trackers_hash: Hash, + pub cluster_layout_trackers_hash: Hash, /// Hash of cluster layout staging data pub cluster_layout_staging_hash: Hash, } @@ -81,6 +82,7 @@ impl LayoutManager { ); Ok(Arc::new(Self { + node_id: node_id.into(), replication_factor, persist_cluster_layout, layout, @@ -92,10 +94,15 @@ impl LayoutManager { // ---- PUBLIC INTERFACE ---- + pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> { + self.layout.read().unwrap() + } + pub fn status(&self) -> LayoutStatus { let layout = self.layout(); LayoutStatus { cluster_layout_version: layout.current().version, + cluster_layout_trackers_hash: layout.trackers_hash, cluster_layout_staging_hash: layout.staging_hash, } } @@ -108,11 +115,35 @@ impl LayoutManager { Ok(()) } - pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> { - self.layout.read().unwrap() + // ---- INTERNALS --- + + fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> { + let mut layout = self.layout.write().unwrap(); + let prev_layout_check = layout.check().is_ok(); + + if !prev_layout_check || adv.check().is_ok() { + if layout.merge(adv) { + if prev_layout_check && layout.check().is_err() { + panic!("Merged two correct layouts and got an incorrect layout."); + } + + return Some(layout.clone()); + } + } + None } - pub(crate) async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) { + fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option<UpdateTrackers> { + let mut layout = self.layout.write().unwrap(); + if layout.update_trackers != *adv { + if layout.update_trackers.merge(adv) { + return Some(layout.update_trackers.clone()); + } + } + None + } + + async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) { let resp = self .rpc_helper .call( @@ -123,15 +154,35 @@ impl LayoutManager { ) .await; if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { - let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await; + if let Err(e) = self.handle_advertise_cluster_layout(&layout).await { + warn!("In pull_cluster_layout: {}", e); + } } } - // ---- INTERNALS --- + async fn pull_cluster_layout_trackers(self: &Arc<Self>, peer: Uuid) { + let resp = self + .rpc_helper + .call( + &self.system_endpoint, + peer, + SystemRpc::PullClusterLayoutTrackers, + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await; + if let Ok(SystemRpc::AdvertiseClusterLayoutTrackers(trackers)) = resp { + if let Err(e) = self + .handle_advertise_cluster_layout_trackers(&trackers) + .await + { + warn!("In pull_cluster_layout_trackers: {}", e); + } + } + } - /// Save network configuration to disc + /// Save cluster layout data to disk async fn save_cluster_layout(&self) -> Result<(), Error> { - let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning + let layout = self.layout.read().unwrap().clone(); self.persist_cluster_layout .save_async(&layout) .await @@ -139,33 +190,41 @@ impl LayoutManager { Ok(()) } - fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> { - let mut layout = self.layout.write().unwrap(); - let prev_layout_check = layout.check().is_ok(); - - if !prev_layout_check || adv.check().is_ok() { - if layout.merge(adv) { - if prev_layout_check && layout.check().is_err() { - panic!("Merged two correct layouts and got an incorrect layout."); + fn broadcast_update(self: &Arc<Self>, rpc: SystemRpc) { + tokio::spawn({ + let this = self.clone(); + async move { + if let Err(e) = this + .rpc_helper + .broadcast( + &this.system_endpoint, + rpc, + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await + { + warn!("Error while broadcasting new cluster layout: {}", e); } - - return Some(layout.clone()); } - } - None + }); } // ---- RPC HANDLERS ---- - pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, status: &LayoutStatus) { - let local_status = self.status(); - if status.cluster_layout_version > local_status.cluster_layout_version - || status.cluster_layout_staging_hash != local_status.cluster_layout_staging_hash + pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &LayoutStatus) { + let local = self.status(); + if remote.cluster_layout_version > local.cluster_layout_version + || remote.cluster_layout_staging_hash != local.cluster_layout_staging_hash { tokio::spawn({ let this = self.clone(); async move { this.pull_cluster_layout(from).await } }); + } else if remote.cluster_layout_trackers_hash != local.cluster_layout_trackers_hash { + tokio::spawn({ + let this = self.clone(); + async move { this.pull_cluster_layout_trackers(from).await } + }); } } @@ -174,6 +233,11 @@ impl LayoutManager { SystemRpc::AdvertiseClusterLayout(layout) } + pub(crate) fn handle_pull_cluster_layout_trackers(&self) -> SystemRpc { + let layout = self.layout.read().unwrap(); + SystemRpc::AdvertiseClusterLayoutTrackers(layout.update_trackers.clone()) + } + pub(crate) async fn handle_advertise_cluster_layout( self: &Arc<Self>, adv: &LayoutHistory, @@ -190,24 +254,20 @@ impl LayoutManager { if let Some(new_layout) = self.merge_layout(adv) { self.change_notify.notify_waiters(); + self.broadcast_update(SystemRpc::AdvertiseClusterLayout(new_layout)); + self.save_cluster_layout().await?; + } - tokio::spawn({ - let this = self.clone(); - async move { - if let Err(e) = this - .rpc_helper - .broadcast( - &this.system_endpoint, - SystemRpc::AdvertiseClusterLayout(new_layout), - RequestStrategy::with_priority(PRIO_HIGH), - ) - .await - { - warn!("Error while broadcasting new cluster layout: {}", e); - } - } - }); + Ok(SystemRpc::Ok) + } + pub(crate) async fn handle_advertise_cluster_layout_trackers( + self: &Arc<Self>, + trackers: &UpdateTrackers, + ) -> Result<SystemRpc, Error> { + if let Some(new_trackers) = self.merge_layout_trackers(trackers) { + self.change_notify.notify_waiters(); + self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(new_trackers)); self.save_cluster_layout().await?; } |