aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/layout/manager.rs')
-rw-r--r--src/rpc/layout/manager.rs140
1 files changed, 100 insertions, 40 deletions
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index c021039b..a2502f58 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -19,6 +19,7 @@ use crate::rpc_helper::*;
use crate::system::*;
pub struct LayoutManager {
+ node_id: Uuid,
replication_factor: usize,
persist_cluster_layout: Persister<LayoutHistory>,
@@ -34,7 +35,7 @@ pub struct LayoutStatus {
/// Cluster layout version
pub cluster_layout_version: u64,
/// Hash of cluster layout update trackers
- // (TODO) pub cluster_layout_trackers_hash: Hash,
+ pub cluster_layout_trackers_hash: Hash,
/// Hash of cluster layout staging data
pub cluster_layout_staging_hash: Hash,
}
@@ -81,6 +82,7 @@ impl LayoutManager {
);
Ok(Arc::new(Self {
+ node_id: node_id.into(),
replication_factor,
persist_cluster_layout,
layout,
@@ -92,10 +94,15 @@ impl LayoutManager {
// ---- PUBLIC INTERFACE ----
+ pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> {
+ self.layout.read().unwrap()
+ }
+
pub fn status(&self) -> LayoutStatus {
let layout = self.layout();
LayoutStatus {
cluster_layout_version: layout.current().version,
+ cluster_layout_trackers_hash: layout.trackers_hash,
cluster_layout_staging_hash: layout.staging_hash,
}
}
@@ -108,11 +115,35 @@ impl LayoutManager {
Ok(())
}
- pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> {
- self.layout.read().unwrap()
+ // ---- INTERNALS ---
+
+ fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
+ let mut layout = self.layout.write().unwrap();
+ let prev_layout_check = layout.check().is_ok();
+
+ if !prev_layout_check || adv.check().is_ok() {
+ if layout.merge(adv) {
+ if prev_layout_check && layout.check().is_err() {
+ panic!("Merged two correct layouts and got an incorrect layout.");
+ }
+
+ return Some(layout.clone());
+ }
+ }
+ None
}
- pub(crate) async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) {
+ fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option<UpdateTrackers> {
+ let mut layout = self.layout.write().unwrap();
+ if layout.update_trackers != *adv {
+ if layout.update_trackers.merge(adv) {
+ return Some(layout.update_trackers.clone());
+ }
+ }
+ None
+ }
+
+ async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) {
let resp = self
.rpc_helper
.call(
@@ -123,15 +154,35 @@ impl LayoutManager {
)
.await;
if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
- let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await;
+ if let Err(e) = self.handle_advertise_cluster_layout(&layout).await {
+ warn!("In pull_cluster_layout: {}", e);
+ }
}
}
- // ---- INTERNALS ---
+ async fn pull_cluster_layout_trackers(self: &Arc<Self>, peer: Uuid) {
+ let resp = self
+ .rpc_helper
+ .call(
+ &self.system_endpoint,
+ peer,
+ SystemRpc::PullClusterLayoutTrackers,
+ RequestStrategy::with_priority(PRIO_HIGH),
+ )
+ .await;
+ if let Ok(SystemRpc::AdvertiseClusterLayoutTrackers(trackers)) = resp {
+ if let Err(e) = self
+ .handle_advertise_cluster_layout_trackers(&trackers)
+ .await
+ {
+ warn!("In pull_cluster_layout_trackers: {}", e);
+ }
+ }
+ }
- /// Save network configuration to disc
+ /// Save cluster layout data to disk
async fn save_cluster_layout(&self) -> Result<(), Error> {
- let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning
+ let layout = self.layout.read().unwrap().clone();
self.persist_cluster_layout
.save_async(&layout)
.await
@@ -139,33 +190,41 @@ impl LayoutManager {
Ok(())
}
- fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
- let mut layout = self.layout.write().unwrap();
- let prev_layout_check = layout.check().is_ok();
-
- if !prev_layout_check || adv.check().is_ok() {
- if layout.merge(adv) {
- if prev_layout_check && layout.check().is_err() {
- panic!("Merged two correct layouts and got an incorrect layout.");
+ fn broadcast_update(self: &Arc<Self>, rpc: SystemRpc) {
+ tokio::spawn({
+ let this = self.clone();
+ async move {
+ if let Err(e) = this
+ .rpc_helper
+ .broadcast(
+ &this.system_endpoint,
+ rpc,
+ RequestStrategy::with_priority(PRIO_HIGH),
+ )
+ .await
+ {
+ warn!("Error while broadcasting new cluster layout: {}", e);
}
-
- return Some(layout.clone());
}
- }
- None
+ });
}
// ---- RPC HANDLERS ----
- pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, status: &LayoutStatus) {
- let local_status = self.status();
- if status.cluster_layout_version > local_status.cluster_layout_version
- || status.cluster_layout_staging_hash != local_status.cluster_layout_staging_hash
+ pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &LayoutStatus) {
+ let local = self.status();
+ if remote.cluster_layout_version > local.cluster_layout_version
+ || remote.cluster_layout_staging_hash != local.cluster_layout_staging_hash
{
tokio::spawn({
let this = self.clone();
async move { this.pull_cluster_layout(from).await }
});
+ } else if remote.cluster_layout_trackers_hash != local.cluster_layout_trackers_hash {
+ tokio::spawn({
+ let this = self.clone();
+ async move { this.pull_cluster_layout_trackers(from).await }
+ });
}
}
@@ -174,6 +233,11 @@ impl LayoutManager {
SystemRpc::AdvertiseClusterLayout(layout)
}
+ pub(crate) fn handle_pull_cluster_layout_trackers(&self) -> SystemRpc {
+ let layout = self.layout.read().unwrap();
+ SystemRpc::AdvertiseClusterLayoutTrackers(layout.update_trackers.clone())
+ }
+
pub(crate) async fn handle_advertise_cluster_layout(
self: &Arc<Self>,
adv: &LayoutHistory,
@@ -190,24 +254,20 @@ impl LayoutManager {
if let Some(new_layout) = self.merge_layout(adv) {
self.change_notify.notify_waiters();
+ self.broadcast_update(SystemRpc::AdvertiseClusterLayout(new_layout));
+ self.save_cluster_layout().await?;
+ }
- tokio::spawn({
- let this = self.clone();
- async move {
- if let Err(e) = this
- .rpc_helper
- .broadcast(
- &this.system_endpoint,
- SystemRpc::AdvertiseClusterLayout(new_layout),
- RequestStrategy::with_priority(PRIO_HIGH),
- )
- .await
- {
- warn!("Error while broadcasting new cluster layout: {}", e);
- }
- }
- });
+ Ok(SystemRpc::Ok)
+ }
+ pub(crate) async fn handle_advertise_cluster_layout_trackers(
+ self: &Arc<Self>,
+ trackers: &UpdateTrackers,
+ ) -> Result<SystemRpc, Error> {
+ if let Some(new_trackers) = self.merge_layout_trackers(trackers) {
+ self.change_notify.notify_waiters();
+ self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(new_trackers));
self.save_cluster_layout().await?;
}