aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/layout/manager.rs93
-rw-r--r--src/rpc/rpc_helper.rs11
-rw-r--r--src/rpc/system.rs15
3 files changed, 60 insertions, 59 deletions
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index 351e0959..c021039b 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -1,10 +1,9 @@
-use std::sync::Arc;
+use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::Duration;
use serde::{Deserialize, Serialize};
-use tokio::sync::watch;
-use tokio::sync::Mutex;
+use tokio::sync::Notify;
use netapp::endpoint::Endpoint;
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
@@ -23,8 +22,8 @@ pub struct LayoutManager {
replication_factor: usize,
persist_cluster_layout: Persister<LayoutHistory>,
- pub layout_watch: watch::Receiver<Arc<LayoutHistory>>,
- update_layout: Mutex<watch::Sender<Arc<LayoutHistory>>>,
+ layout: Arc<RwLock<LayoutHistory>>,
+ pub(crate) change_notify: Arc<Notify>,
pub(crate) rpc_helper: RpcHelper,
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
@@ -71,20 +70,21 @@ impl LayoutManager {
}
};
- let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout));
+ let layout = Arc::new(RwLock::new(cluster_layout));
+ let change_notify = Arc::new(Notify::new());
let rpc_helper = RpcHelper::new(
node_id.into(),
fullmesh,
- layout_watch.clone(),
+ layout.clone(),
config.rpc_timeout_msec.map(Duration::from_millis),
);
Ok(Arc::new(Self {
replication_factor,
persist_cluster_layout,
- layout_watch,
- update_layout: Mutex::new(update_layout),
+ layout,
+ change_notify,
system_endpoint,
rpc_helper,
}))
@@ -108,8 +108,8 @@ impl LayoutManager {
Ok(())
}
- pub fn layout(&self) -> watch::Ref<Arc<LayoutHistory>> {
- self.layout_watch.borrow()
+ pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> {
+ self.layout.read().unwrap()
}
pub(crate) async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) {
@@ -131,7 +131,7 @@ impl LayoutManager {
/// Save network configuration to disc
async fn save_cluster_layout(&self) -> Result<(), Error> {
- let layout: Arc<LayoutHistory> = self.layout_watch.borrow().clone();
+ let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning
self.persist_cluster_layout
.save_async(&layout)
.await
@@ -139,6 +139,22 @@ impl LayoutManager {
Ok(())
}
+ fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
+ let mut layout = self.layout.write().unwrap();
+ let prev_layout_check = layout.check().is_ok();
+
+ if !prev_layout_check || adv.check().is_ok() {
+ if layout.merge(adv) {
+ if prev_layout_check && layout.check().is_err() {
+ panic!("Merged two correct layouts and got an incorrect layout.");
+ }
+
+ return Some(layout.clone());
+ }
+ }
+ None
+ }
+
// ---- RPC HANDLERS ----
pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, status: &LayoutStatus) {
@@ -154,7 +170,7 @@ impl LayoutManager {
}
pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc {
- let layout = self.layout_watch.borrow().clone();
+ let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning
SystemRpc::AdvertiseClusterLayout(layout)
}
@@ -172,42 +188,27 @@ impl LayoutManager {
return Err(Error::Message(msg));
}
- if *adv != **self.layout_watch.borrow() {
- let update_layout = self.update_layout.lock().await;
- let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone();
+ if let Some(new_layout) = self.merge_layout(adv) {
+ self.change_notify.notify_waiters();
- let prev_layout_check = layout.check().is_ok();
- if layout.merge(adv) {
- if prev_layout_check && layout.check().is_err() {
- error!("New cluster layout is invalid, discarding.");
- return Err(Error::Message(
- "New cluster layout is invalid, discarding.".into(),
- ));
- }
-
- let layout = Arc::new(layout);
- update_layout.send(layout.clone())?;
- drop(update_layout); // release mutex
-
- tokio::spawn({
- let this = self.clone();
- async move {
- if let Err(e) = this
- .rpc_helper
- .broadcast(
- &this.system_endpoint,
- SystemRpc::AdvertiseClusterLayout(layout),
- RequestStrategy::with_priority(PRIO_HIGH),
- )
- .await
- {
- warn!("Error while broadcasting new cluster layout: {}", e);
- }
+ tokio::spawn({
+ let this = self.clone();
+ async move {
+ if let Err(e) = this
+ .rpc_helper
+ .broadcast(
+ &this.system_endpoint,
+ SystemRpc::AdvertiseClusterLayout(new_layout),
+ RequestStrategy::with_priority(PRIO_HIGH),
+ )
+ .await
+ {
+ warn!("Error while broadcasting new cluster layout: {}", e);
}
- });
+ }
+ });
- self.save_cluster_layout().await?;
- }
+ self.save_cluster_layout().await?;
}
Ok(SystemRpc::Ok)
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 3fdb4acd..ce291068 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -1,12 +1,11 @@
//! Contain structs related to making RPCs
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
use std::time::Duration;
use futures::future::join_all;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use tokio::select;
-use tokio::sync::watch;
use opentelemetry::KeyValue;
use opentelemetry::{
@@ -91,7 +90,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
struct RpcHelperInner {
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- layout_watch: watch::Receiver<Arc<LayoutHistory>>,
+ layout: Arc<RwLock<LayoutHistory>>,
metrics: RpcMetrics,
rpc_timeout: Duration,
}
@@ -100,7 +99,7 @@ impl RpcHelper {
pub(crate) fn new(
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- layout_watch: watch::Receiver<Arc<LayoutHistory>>,
+ layout: Arc<RwLock<LayoutHistory>>,
rpc_timeout: Option<Duration>,
) -> Self {
let metrics = RpcMetrics::new();
@@ -108,7 +107,7 @@ impl RpcHelper {
Self(Arc::new(RpcHelperInner {
our_node_id,
fullmesh,
- layout_watch,
+ layout,
metrics,
rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
}))
@@ -392,7 +391,7 @@ impl RpcHelper {
pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
// Retrieve some status variables that we will use to sort requests
let peer_list = self.0.fullmesh.get_peer_list();
- let layout: Arc<LayoutHistory> = self.0.layout_watch.borrow().clone();
+ let layout = self.0.layout.read().unwrap();
let our_zone = match layout.current().node_role(&self.0.our_node_id) {
Some(pc) => &pc.zone,
None => "",
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 88c4d443..cb3af3fe 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -4,7 +4,7 @@ use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::{Duration, Instant};
use arc_swap::ArcSwap;
@@ -13,7 +13,7 @@ use futures::join;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::sign::ed25519;
use tokio::select;
-use tokio::sync::watch;
+use tokio::sync::{watch, Notify};
use netapp::endpoint::{Endpoint, EndpointHandler};
use netapp::message::*;
@@ -68,7 +68,7 @@ pub enum SystemRpc {
/// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
PullClusterLayout,
/// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
- AdvertiseClusterLayout(Arc<LayoutHistory>),
+ AdvertiseClusterLayout(LayoutHistory),
}
impl Rpc for SystemRpc {
@@ -345,12 +345,12 @@ impl System {
// ---- Public utilities / accessors ----
- pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> {
+ pub fn cluster_layout(&self) -> RwLockReadGuard<'_, LayoutHistory> {
self.layout_manager.layout()
}
- pub fn layout_watch(&self) -> watch::Receiver<Arc<LayoutHistory>> {
- self.layout_manager.layout_watch.clone()
+ pub fn layout_notify(&self) -> Arc<Notify> {
+ self.layout_manager.change_notify.clone()
}
pub fn rpc_helper(&self) -> &RpcHelper {
@@ -412,7 +412,6 @@ impl System {
}
pub fn health(&self) -> ClusterHealth {
- let layout: Arc<_> = self.cluster_layout().clone();
let quorum = self.replication_mode.write_quorum();
let replication_factor = self.replication_factor;
@@ -423,6 +422,8 @@ impl System {
.collect::<HashMap<Uuid, _>>();
let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
+ let layout = self.cluster_layout(); // acquires a rwlock
+
// TODO: not only layout.current()
let storage_nodes = layout
.current()