aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/system.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r--src/rpc/system.rs55
1 files changed, 26 insertions, 29 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 4b40bec4..106e9f8c 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -36,7 +36,6 @@ use crate::consul::ConsulDiscovery;
use crate::kubernetes::*;
use crate::layout::*;
use crate::replication_mode::*;
-use crate::ring::*;
use crate::rpc_helper::*;
use crate::system_metrics::*;
@@ -112,9 +111,9 @@ pub struct System {
replication_mode: ReplicationMode,
replication_factor: usize,
- /// The ring
- pub ring: watch::Receiver<Arc<Ring>>,
- update_ring: Mutex<watch::Sender<Arc<Ring>>>,
+ /// The layout
+ pub layout_watch: watch::Receiver<Arc<ClusterLayout>>,
+ update_layout: Mutex<watch::Sender<Arc<ClusterLayout>>>,
/// Path to metadata directory
pub metadata_dir: PathBuf,
@@ -286,8 +285,7 @@ impl System {
let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout);
local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
- let ring = Ring::new(cluster_layout, replication_factor);
- let (update_ring, ring) = watch::channel(Arc::new(ring));
+ let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout));
let rpc_public_addr = match &config.rpc_public_addr {
Some(a_str) => {
@@ -362,7 +360,7 @@ impl System {
rpc: RpcHelper::new(
netapp.id.into(),
fullmesh,
- ring.clone(),
+ layout_watch.clone(),
config.rpc_timeout_msec.map(Duration::from_millis),
),
system_endpoint,
@@ -378,8 +376,8 @@ impl System {
kubernetes_discovery: config.kubernetes_discovery.clone(),
metrics,
- ring,
- update_ring: Mutex::new(update_ring),
+ layout_watch,
+ update_layout: Mutex::new(update_layout),
metadata_dir: config.metadata_dir.clone(),
data_dir: config.data_dir.clone(),
});
@@ -426,7 +424,7 @@ impl System {
}
pub fn get_cluster_layout(&self) -> ClusterLayout {
- self.ring.borrow().layout.clone()
+ self.layout_watch.borrow().as_ref().clone()
}
pub async fn update_cluster_layout(
@@ -466,7 +464,7 @@ impl System {
}
pub fn health(&self) -> ClusterHealth {
- let ring: Arc<_> = self.ring.borrow().clone();
+ let layout: Arc<_> = self.layout_watch.borrow().clone();
let quorum = self.replication_mode.write_quorum();
let replication_factor = self.replication_factor;
@@ -477,8 +475,7 @@ impl System {
.collect::<HashMap<Uuid, _>>();
let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
- let storage_nodes = ring
- .layout
+ let storage_nodes = layout
.roles
.items()
.iter()
@@ -489,11 +486,11 @@ impl System {
.filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count();
- let partitions = ring.partitions();
+ let partitions = layout.partitions();
let partitions_n_up = partitions
.iter()
.map(|(_, h)| {
- let pn = ring.get_nodes(h, ring.replication_factor);
+ let pn = layout.nodes_of(h, layout.replication_factor);
pn.iter()
.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count()
@@ -584,9 +581,9 @@ impl System {
/// Save network configuration to disc
async fn save_cluster_layout(&self) -> Result<(), Error> {
- let ring: Arc<Ring> = self.ring.borrow().clone();
+ let layout: Arc<ClusterLayout> = self.layout_watch.borrow().clone();
self.persist_cluster_layout
- .save_async(&ring.layout)
+ .save_async(&layout)
.await
.expect("Cannot save current cluster layout");
Ok(())
@@ -595,9 +592,9 @@ impl System {
fn update_local_status(&self) {
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
- let ring = self.ring.borrow();
- new_si.cluster_layout_version = ring.layout.version;
- new_si.cluster_layout_staging_hash = ring.layout.staging_hash;
+ let layout = self.layout_watch.borrow();
+ new_si.cluster_layout_version = layout.version;
+ new_si.cluster_layout_staging_hash = layout.staging_hash;
new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
@@ -612,8 +609,8 @@ impl System {
}
fn handle_pull_cluster_layout(&self) -> SystemRpc {
- let ring = self.ring.borrow().clone();
- SystemRpc::AdvertiseClusterLayout(ring.layout.clone())
+ let layout = self.layout_watch.borrow().as_ref().clone();
+ SystemRpc::AdvertiseClusterLayout(layout)
}
fn handle_get_known_nodes(&self) -> SystemRpc {
@@ -663,8 +660,9 @@ impl System {
return Err(Error::Message(msg));
}
- let update_ring = self.update_ring.lock().await;
- let mut layout: ClusterLayout = self.ring.borrow().layout.clone();
+ let update_layout = self.update_layout.lock().await;
+ // TODO: don't clone each time an AdvertiseClusterLayout is received
+ let mut layout: ClusterLayout = self.layout_watch.borrow().as_ref().clone();
let prev_layout_check = layout.check().is_ok();
if layout.merge(adv) {
@@ -675,9 +673,8 @@ impl System {
));
}
- let ring = Ring::new(layout.clone(), self.replication_factor);
- update_ring.send(Arc::new(ring))?;
- drop(update_ring);
+ update_layout.send(Arc::new(layout.clone()))?;
+ drop(update_layout);
let self2 = self.clone();
tokio::spawn(async move {
@@ -725,9 +722,9 @@ impl System {
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
- let not_configured = self.ring.borrow().layout.check().is_err();
+ let not_configured = self.layout_watch.borrow().check().is_err();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
- let expected_n_nodes = self.ring.borrow().layout.num_nodes();
+ let expected_n_nodes = self.layout_watch.borrow().num_nodes();
let bad_peers = self
.fullmesh
.get_peer_list()