diff options
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r-- | src/rpc/system.rs | 55 |
1 files changed, 26 insertions, 29 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 4b40bec4..106e9f8c 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -36,7 +36,6 @@ use crate::consul::ConsulDiscovery; use crate::kubernetes::*; use crate::layout::*; use crate::replication_mode::*; -use crate::ring::*; use crate::rpc_helper::*; use crate::system_metrics::*; @@ -112,9 +111,9 @@ pub struct System { replication_mode: ReplicationMode, replication_factor: usize, - /// The ring - pub ring: watch::Receiver<Arc<Ring>>, - update_ring: Mutex<watch::Sender<Arc<Ring>>>, + /// The layout + pub layout_watch: watch::Receiver<Arc<ClusterLayout>>, + update_layout: Mutex<watch::Sender<Arc<ClusterLayout>>>, /// Path to metadata directory pub metadata_dir: PathBuf, @@ -286,8 +285,7 @@ impl System { let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout); local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics); - let ring = Ring::new(cluster_layout, replication_factor); - let (update_ring, ring) = watch::channel(Arc::new(ring)); + let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout)); let rpc_public_addr = match &config.rpc_public_addr { Some(a_str) => { @@ -362,7 +360,7 @@ impl System { rpc: RpcHelper::new( netapp.id.into(), fullmesh, - ring.clone(), + layout_watch.clone(), config.rpc_timeout_msec.map(Duration::from_millis), ), system_endpoint, @@ -378,8 +376,8 @@ impl System { kubernetes_discovery: config.kubernetes_discovery.clone(), metrics, - ring, - update_ring: Mutex::new(update_ring), + layout_watch, + update_layout: Mutex::new(update_layout), metadata_dir: config.metadata_dir.clone(), data_dir: config.data_dir.clone(), }); @@ -426,7 +424,7 @@ impl System { } pub fn get_cluster_layout(&self) -> ClusterLayout { - self.ring.borrow().layout.clone() + self.layout_watch.borrow().as_ref().clone() } pub async fn update_cluster_layout( @@ -466,7 +464,7 @@ impl System { } pub fn health(&self) -> ClusterHealth { - let ring: Arc<_> = self.ring.borrow().clone(); + let layout: Arc<_> = self.layout_watch.borrow().clone(); let quorum = self.replication_mode.write_quorum(); let replication_factor = self.replication_factor; @@ -477,8 +475,7 @@ impl System { .collect::<HashMap<Uuid, _>>(); let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count(); - let storage_nodes = ring - .layout + let storage_nodes = layout .roles .items() .iter() @@ -489,11 +486,11 @@ impl System { .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) .count(); - let partitions = ring.partitions(); + let partitions = layout.partitions(); let partitions_n_up = partitions .iter() .map(|(_, h)| { - let pn = ring.get_nodes(h, ring.replication_factor); + let pn = layout.nodes_of(h, layout.replication_factor); pn.iter() .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) .count() @@ -584,9 +581,9 @@ impl System { /// Save network configuration to disc async fn save_cluster_layout(&self) -> Result<(), Error> { - let ring: Arc<Ring> = self.ring.borrow().clone(); + let layout: Arc<ClusterLayout> = self.layout_watch.borrow().clone(); self.persist_cluster_layout - .save_async(&ring.layout) + .save_async(&layout) .await .expect("Cannot save current cluster layout"); Ok(()) @@ -595,9 +592,9 @@ impl System { fn update_local_status(&self) { let mut new_si: NodeStatus = self.local_status.load().as_ref().clone(); - let ring = self.ring.borrow(); - new_si.cluster_layout_version = ring.layout.version; - new_si.cluster_layout_staging_hash = ring.layout.staging_hash; + let layout = self.layout_watch.borrow(); + new_si.cluster_layout_version = layout.version; + new_si.cluster_layout_staging_hash = layout.staging_hash; new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics); @@ -612,8 +609,8 @@ impl System { } fn handle_pull_cluster_layout(&self) -> SystemRpc { - let ring = self.ring.borrow().clone(); - SystemRpc::AdvertiseClusterLayout(ring.layout.clone()) + let layout = self.layout_watch.borrow().as_ref().clone(); + SystemRpc::AdvertiseClusterLayout(layout) } fn handle_get_known_nodes(&self) -> SystemRpc { @@ -663,8 +660,9 @@ impl System { return Err(Error::Message(msg)); } - let update_ring = self.update_ring.lock().await; - let mut layout: ClusterLayout = self.ring.borrow().layout.clone(); + let update_layout = self.update_layout.lock().await; + // TODO: don't clone each time an AdvertiseClusterLayout is received + let mut layout: ClusterLayout = self.layout_watch.borrow().as_ref().clone(); let prev_layout_check = layout.check().is_ok(); if layout.merge(adv) { @@ -675,9 +673,8 @@ impl System { )); } - let ring = Ring::new(layout.clone(), self.replication_factor); - update_ring.send(Arc::new(ring))?; - drop(update_ring); + update_layout.send(Arc::new(layout.clone()))?; + drop(update_layout); let self2 = self.clone(); tokio::spawn(async move { @@ -725,9 +722,9 @@ impl System { async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) { while !*stop_signal.borrow() { - let not_configured = self.ring.borrow().layout.check().is_err(); + let not_configured = self.layout_watch.borrow().check().is_err(); let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; - let expected_n_nodes = self.ring.borrow().layout.num_nodes(); + let expected_n_nodes = self.layout_watch.borrow().num_nodes(); let bad_peers = self .fullmesh .get_peer_list() |