From 01a0bd54106941156ca998be1a44b8ac2c3aa74a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 27 Mar 2024 13:32:13 +0100 Subject: [next-0.10] remove impl Deref for LayoutHelper --- src/api/admin/cluster.rs | 10 ++++----- src/rpc/layout/helper.rs | 54 +++++++++++++++++++++++------------------------ src/rpc/layout/manager.rs | 22 +++++++++---------- src/rpc/rpc_helper.rs | 20 ++++++++---------- src/rpc/system.rs | 6 +++--- 5 files changed, 55 insertions(+), 57 deletions(-) diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 8c9cb1e5..e5877fcd 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -78,7 +78,7 @@ pub async fn handle_get_cluster_status(garage: &Arc) -> Result) -> Result, Error> { - let res = format_cluster_layout(&garage.system.cluster_layout()); + let res = format_cluster_layout(garage.system.cluster_layout().inner()); Ok(json_ok_response(&res)?) } @@ -295,7 +295,7 @@ pub async fn handle_update_cluster_layout( ) -> Result, Error> { let updates = parse_json_body::(req).await?; - let mut layout = garage.system.cluster_layout().clone(); + let mut layout = garage.system.cluster_layout().inner().clone(); let mut roles = layout.current().roles.clone(); roles.merge(&layout.staging.get().roles); @@ -341,7 +341,7 @@ pub async fn handle_apply_cluster_layout( ) -> Result, Error> { let param = parse_json_body::(req).await?; - let layout = garage.system.cluster_layout().clone(); + let layout = garage.system.cluster_layout().inner().clone(); let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; garage @@ -360,7 +360,7 @@ pub async fn handle_apply_cluster_layout( pub async fn handle_revert_cluster_layout( garage: &Arc, ) -> Result, Error> { - let layout = garage.system.cluster_layout().clone(); + let layout = garage.system.cluster_layout().inner().clone(); let layout = layout.revert_staged_changes()?; garage .system diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index e3096945..ddf8fd44 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::ops::Deref; use std::sync::atomic::{AtomicUsize, Ordering}; use serde::{Deserialize, Serialize}; @@ -49,13 +48,6 @@ pub struct LayoutHelper { pub(crate) ack_lock: HashMap, } -impl Deref for LayoutHelper { - type Target = LayoutHistory; - fn deref(&self) -> &LayoutHistory { - self.layout() - } -} - impl LayoutHelper { pub fn new( replication_factor: ReplicationFactor, @@ -131,10 +123,6 @@ impl LayoutHelper { // ------------------ single updating function -------------- - fn layout(&self) -> &LayoutHistory { - self.layout.as_ref().unwrap() - } - pub(crate) fn update(&mut self, f: F) -> bool where F: FnOnce(&mut LayoutHistory) -> bool, @@ -153,6 +141,18 @@ impl LayoutHelper { // ------------------ read helpers --------------- + pub fn inner(&self) -> &LayoutHistory { + self.layout.as_ref().unwrap() + } + + pub fn current(&self) -> &LayoutVersion { + self.inner().current() + } + + pub fn versions(&self) -> &[LayoutVersion] { + &self.inner().versions + } + /// Return all nodes that have a role (gateway or storage) /// in one of the currently active layout versions pub fn all_nodes(&self) -> &[Uuid] { @@ -175,20 +175,19 @@ impl LayoutHelper { pub fn sync_digest(&self) -> SyncLayoutDigest { SyncLayoutDigest { - current: self.layout().current().version, + current: self.current().version, ack_map_min: self.ack_map_min(), - min_stored: self.layout().min_stored(), + min_stored: self.inner().min_stored(), } } pub fn read_nodes_of(&self, position: &Hash) -> Vec { let sync_min = self.sync_map_min; let version = self - .layout() - .versions + .versions() .iter() .find(|x| x.version == sync_min) - .or(self.layout().versions.last()) + .or(self.versions().last()) .unwrap(); version .nodes_of(position, version.replication_factor) @@ -196,8 +195,7 @@ impl LayoutHelper { } pub fn storage_sets_of(&self, position: &Hash) -> Vec> { - self.layout() - .versions + self.versions() .iter() .map(|x| x.nodes_of(position, x.replication_factor).collect()) .collect() @@ -205,7 +203,7 @@ impl LayoutHelper { pub fn storage_nodes_of(&self, position: &Hash) -> Vec { let mut ret = vec![]; - for version in self.layout().versions.iter() { + for version in self.versions().iter() { ret.extend(version.nodes_of(position, version.replication_factor)); } ret.sort(); @@ -224,7 +222,7 @@ impl LayoutHelper { pub fn digest(&self) -> RpcLayoutDigest { RpcLayoutDigest { current_version: self.current().version, - active_versions: self.versions.len(), + active_versions: self.versions().len(), trackers_hash: self.trackers_hash, staging_hash: self.staging_hash, } @@ -246,13 +244,16 @@ impl LayoutHelper { // 3. Acknowledge everyone has synced up to min(self.sync_map) self.sync_ack(local_node_id); - debug!("ack_map: {:?}", self.update_trackers.ack_map); - debug!("sync_map: {:?}", self.update_trackers.sync_map); - debug!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + debug!("ack_map: {:?}", self.inner().update_trackers.ack_map); + debug!("sync_map: {:?}", self.inner().update_trackers.sync_map); + debug!( + "sync_ack_map: {:?}", + self.inner().update_trackers.sync_ack_map + ); } fn sync_first(&mut self, local_node_id: Uuid) { - let first_version = self.min_stored(); + let first_version = self.inner().min_stored(); self.update(|layout| { layout .update_trackers @@ -286,8 +287,7 @@ impl LayoutHelper { } pub(crate) fn max_free_ack(&self) -> u64 { - self.layout() - .versions + self.versions() .iter() .map(|x| x.version) .skip_while(|v| { diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 8a6eb1c3..3866f867 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -109,7 +109,7 @@ impl LayoutManager { } pub fn add_table(&self, table_name: &'static str) { - let first_version = self.layout().versions.first().unwrap().version; + let first_version = self.layout().versions().first().unwrap().version; self.table_sync_version .lock() @@ -127,7 +127,7 @@ impl LayoutManager { if layout.update(|l| l.update_trackers.sync_map.set_max(self.node_id, sync_until)) { info!("sync_until updated to {}", sync_until); self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( - layout.update_trackers.clone(), + layout.inner().update_trackers.clone(), )); } } @@ -136,7 +136,7 @@ impl LayoutManager { let mut layout = self.layout.write().unwrap(); if layout.ack_max_free(self.node_id) { self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( - layout.update_trackers.clone(), + layout.inner().update_trackers.clone(), )); } } @@ -160,16 +160,16 @@ impl LayoutManager { fn merge_layout(&self, adv: &LayoutHistory) -> Option { let mut layout = self.layout.write().unwrap(); let prev_digest = layout.digest(); - let prev_layout_check = layout.check().is_ok(); + let prev_layout_check = layout.inner().check().is_ok(); if !prev_layout_check || adv.check().is_ok() { if layout.update(|l| l.merge(adv)) { layout.update_trackers(self.node_id); - if prev_layout_check && layout.check().is_err() { + if prev_layout_check && layout.inner().check().is_err() { panic!("Merged two correct layouts and got an incorrect layout."); } assert!(layout.digest() != prev_digest); - return Some(layout.clone()); + return Some(layout.inner().clone()); } } @@ -180,11 +180,11 @@ impl LayoutManager { let mut layout = self.layout.write().unwrap(); let prev_digest = layout.digest(); - if layout.update_trackers != *adv { + if layout.inner().update_trackers != *adv { if layout.update(|l| l.update_trackers.merge(adv)) { layout.update_trackers(self.node_id); assert!(layout.digest() != prev_digest); - return Some(layout.update_trackers.clone()); + return Some(layout.inner().update_trackers.clone()); } } @@ -230,7 +230,7 @@ impl LayoutManager { /// Save cluster layout data to disk async fn save_cluster_layout(&self) -> Result<(), Error> { - let layout = self.layout.read().unwrap().clone(); + let layout = self.layout.read().unwrap().inner().clone(); self.persist_cluster_layout .save_async(&layout) .await @@ -278,13 +278,13 @@ impl LayoutManager { } pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc { - let layout = self.layout.read().unwrap().clone(); + let layout = self.layout.read().unwrap().inner().clone(); SystemRpc::AdvertiseClusterLayout(layout) } pub(crate) fn handle_pull_cluster_layout_trackers(&self) -> SystemRpc { let layout = self.layout.read().unwrap(); - SystemRpc::AdvertiseClusterLayoutTrackers(layout.update_trackers.clone()) + SystemRpc::AdvertiseClusterLayoutTrackers(layout.inner().update_trackers.clone()) } pub(crate) async fn handle_advertise_cluster_layout( diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 977c6ed8..05fdcce4 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -26,7 +26,7 @@ use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; -use crate::layout::{LayoutHelper, LayoutHistory}; +use crate::layout::{LayoutHelper, LayoutVersion}; use crate::metrics::RpcMetrics; // Default RPC timeout = 5 minutes @@ -304,7 +304,8 @@ impl RpcHelper { // preemptively send an additional request to any remaining nodes. // Reorder requests to priorize closeness / low latency - let request_order = self.request_order(&self.0.layout.read().unwrap(), to.iter().copied()); + let request_order = + self.request_order(&self.0.layout.read().unwrap().current(), to.iter().copied()); let send_all_at_once = strategy.rs_send_all_at_once.unwrap_or(false); // Build future for each request @@ -497,16 +498,16 @@ impl RpcHelper { let mut ret = Vec::with_capacity(12); let ver_iter = layout - .versions + .versions() .iter() .rev() - .chain(layout.old_versions.iter().rev()); + .chain(layout.inner().old_versions.iter().rev()); for ver in ver_iter { if ver.version > layout.sync_map_min() { continue; } let nodes = ver.nodes_of(position, ver.replication_factor); - for node in rpc_helper.request_order(&layout, nodes) { + for node in rpc_helper.request_order(layout.current(), nodes) { if !ret.contains(&node) { ret.push(node); } @@ -517,15 +518,12 @@ impl RpcHelper { fn request_order( &self, - layout: &LayoutHistory, + layout: &LayoutVersion, nodes: impl Iterator, ) -> Vec { // Retrieve some status variables that we will use to sort requests let peer_list = self.0.peering.get_peer_list(); - let our_zone = layout - .current() - .get_node_zone(&self.0.our_node_id) - .unwrap_or(""); + let our_zone = layout.get_node_zone(&self.0.our_node_id).unwrap_or(""); // Augment requests with some information used to sort them. // The tuples are as follows: @@ -535,7 +533,7 @@ impl RpcHelper { // and within a same zone we priorize nodes with the lowest latency. let mut nodes = nodes .map(|to| { - let peer_zone = layout.current().get_node_zone(&to).unwrap_or(""); + let peer_zone = layout.get_node_zone(&to).unwrap_or(""); let peer_avg_ping = peer_list .iter() .find(|x| x.id.as_ref() == to.as_slice()) diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 9da1b176..b38e2e01 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -451,7 +451,7 @@ impl System { // Obtain information about nodes that have a role as storage nodes // in one of the active layout versions let mut storage_nodes = HashSet::::with_capacity(16); - for ver in layout.versions.iter() { + for ver in layout.versions().iter() { storage_nodes.extend( ver.roles .items() @@ -470,7 +470,7 @@ impl System { let mut partitions_all_ok = 0; for (_, hash) in partitions.iter() { let mut write_sets = layout - .versions + .versions() .iter() .map(|x| x.nodes_of(hash, x.replication_factor)); let has_quorum = write_sets @@ -634,7 +634,7 @@ impl System { .filter(|p| p.is_up()) .count(); - let not_configured = self.cluster_layout().check().is_err(); + let not_configured = self.cluster_layout().inner().check().is_err(); let no_peers = n_connected < self.replication_factor.into(); let expected_n_nodes = self.cluster_layout().all_nodes().len(); let bad_peers = n_connected != expected_n_nodes; -- cgit v1.2.3