diff options
Diffstat (limited to 'src/rpc/layout')
-rw-r--r-- | src/rpc/layout/manager.rs | 93 |
1 files changed, 47 insertions, 46 deletions
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 351e0959..c021039b 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -1,10 +1,9 @@ -use std::sync::Arc; +use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::Duration; use serde::{Deserialize, Serialize}; -use tokio::sync::watch; -use tokio::sync::Mutex; +use tokio::sync::Notify; use netapp::endpoint::Endpoint; use netapp::peering::fullmesh::FullMeshPeeringStrategy; @@ -23,8 +22,8 @@ pub struct LayoutManager { replication_factor: usize, persist_cluster_layout: Persister<LayoutHistory>, - pub layout_watch: watch::Receiver<Arc<LayoutHistory>>, - update_layout: Mutex<watch::Sender<Arc<LayoutHistory>>>, + layout: Arc<RwLock<LayoutHistory>>, + pub(crate) change_notify: Arc<Notify>, pub(crate) rpc_helper: RpcHelper, system_endpoint: Arc<Endpoint<SystemRpc, System>>, @@ -71,20 +70,21 @@ impl LayoutManager { } }; - let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout)); + let layout = Arc::new(RwLock::new(cluster_layout)); + let change_notify = Arc::new(Notify::new()); let rpc_helper = RpcHelper::new( node_id.into(), fullmesh, - layout_watch.clone(), + layout.clone(), config.rpc_timeout_msec.map(Duration::from_millis), ); Ok(Arc::new(Self { replication_factor, persist_cluster_layout, - layout_watch, - update_layout: Mutex::new(update_layout), + layout, + change_notify, system_endpoint, rpc_helper, })) @@ -108,8 +108,8 @@ impl LayoutManager { Ok(()) } - pub fn layout(&self) -> watch::Ref<Arc<LayoutHistory>> { - self.layout_watch.borrow() + pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> { + self.layout.read().unwrap() } pub(crate) async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) { @@ -131,7 +131,7 @@ impl LayoutManager { /// Save network configuration to disc async fn save_cluster_layout(&self) -> Result<(), Error> { - let layout: Arc<LayoutHistory> = self.layout_watch.borrow().clone(); + let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning self.persist_cluster_layout .save_async(&layout) .await @@ -139,6 +139,22 @@ impl LayoutManager { Ok(()) } + fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> { + let mut layout = self.layout.write().unwrap(); + let prev_layout_check = layout.check().is_ok(); + + if !prev_layout_check || adv.check().is_ok() { + if layout.merge(adv) { + if prev_layout_check && layout.check().is_err() { + panic!("Merged two correct layouts and got an incorrect layout."); + } + + return Some(layout.clone()); + } + } + None + } + // ---- RPC HANDLERS ---- pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, status: &LayoutStatus) { @@ -154,7 +170,7 @@ impl LayoutManager { } pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc { - let layout = self.layout_watch.borrow().clone(); + let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning SystemRpc::AdvertiseClusterLayout(layout) } @@ -172,42 +188,27 @@ impl LayoutManager { return Err(Error::Message(msg)); } - if *adv != **self.layout_watch.borrow() { - let update_layout = self.update_layout.lock().await; - let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone(); + if let Some(new_layout) = self.merge_layout(adv) { + self.change_notify.notify_waiters(); - let prev_layout_check = layout.check().is_ok(); - if layout.merge(adv) { - if prev_layout_check && layout.check().is_err() { - error!("New cluster layout is invalid, discarding."); - return Err(Error::Message( - "New cluster layout is invalid, discarding.".into(), - )); - } - - let layout = Arc::new(layout); - update_layout.send(layout.clone())?; - drop(update_layout); // release mutex - - tokio::spawn({ - let this = self.clone(); - async move { - if let Err(e) = this - .rpc_helper - .broadcast( - &this.system_endpoint, - SystemRpc::AdvertiseClusterLayout(layout), - RequestStrategy::with_priority(PRIO_HIGH), - ) - .await - { - warn!("Error while broadcasting new cluster layout: {}", e); - } + tokio::spawn({ + let this = self.clone(); + async move { + if let Err(e) = this + .rpc_helper + .broadcast( + &this.system_endpoint, + SystemRpc::AdvertiseClusterLayout(new_layout), + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await + { + warn!("Error while broadcasting new cluster layout: {}", e); } - }); + } + }); - self.save_cluster_layout().await?; - } + self.save_cluster_layout().await?; } Ok(SystemRpc::Ok) |