From 4a9c94514f49aa4e9880a8e0f5cf5a52d11ae993 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 8 Nov 2023 16:41:00 +0100
Subject: avoid using layout_watch in System directly

---
 src/api/admin/bucket.rs           |  4 ++--
 src/api/admin/cluster.rs          | 10 +++++-----
 src/api/k2v/index.rs              |  2 +-
 src/api/s3/put.rs                 |  2 +-
 src/garage/admin/bucket.rs        |  4 ++--
 src/garage/admin/mod.rs           | 10 +++++-----
 src/model/helper/bucket.rs        |  2 +-
 src/rpc/system.rs                 |  4 ++--
 src/table/replication/fullcopy.rs |  4 ++--
 src/table/replication/sharded.rs  |  8 ++++----
 src/table/sync.rs                 |  2 +-
 11 files changed, 26 insertions(+), 26 deletions(-)

diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
index 6bff7e9f..65929d61 100644
--- a/src/api/admin/bucket.rs
+++ b/src/api/admin/bucket.rs
@@ -122,7 +122,7 @@ async fn bucket_info_results(
 		.table
 		.get(&bucket_id, &EmptyKey)
 		.await?
-		.map(|x| x.filtered_values(&garage.system.layout_watch.borrow()))
+		.map(|x| x.filtered_values(&garage.system.cluster_layout()))
 		.unwrap_or_default();
 
 	let mpu_counters = garage
@@ -130,7 +130,7 @@ async fn bucket_info_results(
 		.table
 		.get(&bucket_id, &EmptyKey)
 		.await?
-		.map(|x| x.filtered_values(&garage.system.layout_watch.borrow()))
+		.map(|x| x.filtered_values(&garage.system.cluster_layout()))
 		.unwrap_or_default();
 
 	let mut relevant_keys = HashMap::new();
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index c8107b82..01ff9885 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -33,7 +33,7 @@ pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response
-	let res = format_cluster_layout(&garage.system.get_cluster_layout());
+	let res = format_cluster_layout(&garage.system.cluster_layout());
 
 	Ok(json_ok_response(&res)?)
 }
@@ -207,7 +207,7 @@ pub async fn handle_update_cluster_layout(
 ) -> Result<Response<Body>, Error> {
 	let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?;
 
-	let mut layout = garage.system.get_cluster_layout();
+	let mut layout = garage.system.cluster_layout().as_ref().clone();
 
 	let mut roles = layout.roles.clone();
 	roles.merge(&layout.staging_roles);
@@ -247,7 +247,7 @@ pub async fn handle_apply_cluster_layout(
 ) -> Result<Response<Body>, Error> {
 	let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
 
-	let layout = garage.system.get_cluster_layout();
+	let layout = garage.system.cluster_layout().as_ref().clone();
 	let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
 
 	garage.system.update_cluster_layout(&layout).await?;
@@ -265,7 +265,7 @@ pub async fn handle_revert_cluster_layout(
 ) -> Result<Response<Body>, Error> {
 	let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
 
-	let layout = garage.system.get_cluster_layout();
+	let layout = garage.system.cluster_layout().as_ref().clone();
 	let layout = layout.revert_staged_changes(Some(param.version))?;
 
 	garage.system.update_cluster_layout(&layout).await?;
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index ff8beda3..3fc39de6 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -26,7 +26,7 @@ pub async fn handle_read_index(
 ) -> Result<Response<Body>, Error> {
 	let reverse = reverse.unwrap_or(false);
 
-	let layout: Arc<ClusterLayout> = garage.system.layout_watch.borrow().clone();
+	let layout: Arc<ClusterLayout> = garage.system.cluster_layout().clone();
 
 	let (partition_keys, more, next_start) = read_range(
 		&garage.k2v.counter_table.table,
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index fc17ed03..d1c88a76 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -253,7 +253,7 @@ pub(crate) async fn check_quotas(
 		.await?;
 
 	let counters = counters
-		.map(|x| x.filtered_values(&garage.system.layout_watch.borrow()))
+		.map(|x| x.filtered_values(&garage.system.cluster_layout()))
 		.unwrap_or_default();
 
 	let (prev_cnt_obj, prev_cnt_size) = match prev_object {
diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs
index 34e48292..9e642f57 100644
--- a/src/garage/admin/bucket.rs
+++ b/src/garage/admin/bucket.rs
@@ -70,7 +70,7 @@ impl AdminRpcHandler {
 			.table
 			.get(&bucket_id, &EmptyKey)
 			.await?
-			.map(|x| x.filtered_values(&self.garage.system.layout_watch.borrow()))
+			.map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
 			.unwrap_or_default();
 
 		let mpu_counters = self
@@ -79,7 +79,7 @@ impl AdminRpcHandler {
 			.table
 			.get(&bucket_id, &EmptyKey)
 			.await?
-			.map(|x| x.filtered_values(&self.garage.system.layout_watch.borrow()))
+			.map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
 			.unwrap_or_default();
 
 		let mut relevant_keys = HashMap::new();
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index 006f71cd..c3fa801a 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -126,7 +126,7 @@ impl AdminRpcHandler {
 			opt_to_send.all_nodes = false;
 
 			let mut failures = vec![];
-			let layout = self.garage.system.layout_watch.borrow().clone();
+			let layout = self.garage.system.cluster_layout().clone();
 			for node in layout.node_ids().iter() {
 				let node = (*node).into();
 				let resp = self
@@ -163,7 +163,7 @@ impl AdminRpcHandler {
 	async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
 		if opt.all_nodes {
 			let mut ret = String::new();
-			let layout = self.garage.system.layout_watch.borrow().clone();
+			let layout = self.garage.system.cluster_layout().clone();
 
 			for node in layout.node_ids().iter() {
 				let mut opt = opt.clone();
@@ -275,7 +275,7 @@ impl AdminRpcHandler {
 		let mut ret = String::new();
 
 		// Gather storage node and free space statistics
-		let layout = &self.garage.system.layout_watch.borrow();
+		let layout = &self.garage.system.cluster_layout();
 		let mut node_partition_count = HashMap::<Uuid, u64>::new();
 		for short_id in layout.ring_assignment_data.iter() {
 			let id = layout.node_id_vec[*short_id as usize];
@@ -440,7 +440,7 @@ impl AdminRpcHandler {
 	) -> Result<AdminRpc, Error> {
 		if all_nodes {
 			let mut ret = vec![];
-			let layout = self.garage.system.layout_watch.borrow().clone();
+			let layout = self.garage.system.cluster_layout().clone();
 			for node in layout.node_ids().iter() {
 				let node = (*node).into();
 				match self
@@ -488,7 +488,7 @@ impl AdminRpcHandler {
 	) -> Result<AdminRpc, Error> {
 		if all_nodes {
 			let mut ret = vec![];
-			let layout = self.garage.system.layout_watch.borrow().clone();
+			let layout = self.garage.system.cluster_layout().clone();
 			for node in layout.node_ids().iter() {
 				let node = (*node).into();
 				match self
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index d43d7e96..8cd5b27b 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -453,7 +453,7 @@ impl<'a> BucketHelper<'a> {
 		use garage_rpc::layout::ClusterLayout;
 		use std::sync::Arc;
 
-		let layout: Arc<ClusterLayout> = self.0.system.layout_watch.borrow().clone();
+		let layout: Arc<ClusterLayout> = self.0.system.cluster_layout().clone();
 		let k2vindexes = self
 			.0
 			.k2v
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 106e9f8c..93144e39 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -423,8 +423,8 @@ impl System {
 		known_nodes
 	}
 
-	pub fn get_cluster_layout(&self) -> ClusterLayout {
-		self.layout_watch.borrow().as_ref().clone()
+	pub fn cluster_layout(&self) -> watch::Ref<Arc<ClusterLayout>> {
+		self.layout_watch.borrow()
 	}
 
 	pub async fn update_cluster_layout(
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index f8b7cacc..34807e3d 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -27,11 +27,11 @@ impl TableReplication for TableFullReplication {
 	}
 
 	fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
-		let layout = self.system.layout_watch.borrow();
+		let layout = self.system.cluster_layout();
 		layout.node_ids().to_vec()
 	}
 	fn write_quorum(&self) -> usize {
-		let nmembers = self.system.layout_watch.borrow().node_ids().len();
+		let nmembers = self.system.cluster_layout().node_ids().len();
 		if nmembers > self.max_faults {
 			nmembers - self.max_faults
 		} else {
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 95901a5a..60c95cb4 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -26,7 +26,7 @@ pub struct TableShardedReplication {
 
 impl TableReplication for TableShardedReplication {
 	fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-		let layout = self.system.layout_watch.borrow();
+		let layout = self.system.cluster_layout();
 		layout.nodes_of(hash, self.replication_factor)
 	}
 	fn read_quorum(&self) -> usize {
@@ -34,7 +34,7 @@ impl TableReplication for TableShardedReplication {
 	}
 
 	fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-		let layout = self.system.layout_watch.borrow();
+		let layout = self.system.cluster_layout();
 		layout.nodes_of(hash, self.replication_factor)
 	}
 	fn write_quorum(&self) -> usize {
@@ -45,9 +45,9 @@ impl TableReplication for TableShardedReplication {
 	}
 
 	fn partition_of(&self, hash: &Hash) -> Partition {
-		self.system.layout_watch.borrow().partition_of(hash)
+		self.system.cluster_layout().partition_of(hash)
 	}
 	fn partitions(&self) -> Vec<(Partition, Hash)> {
-		self.system.layout_watch.borrow().partitions()
+		self.system.cluster_layout().partitions()
 	}
 }
diff --git a/src/table/sync.rs b/src/table/sync.rs
index b2600013..65eff7cd 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -92,7 +92,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 		bg.spawn_worker(SyncWorker {
 			syncer: self.clone(),
 			layout_watch: self.system.layout_watch.clone(),
-			layout: self.system.layout_watch.borrow().clone(),
+			layout: self.system.cluster_layout().clone(),
 			add_full_sync_rx,
 			todo: vec![],
 			next_full_sync: Instant::now() + Duration::from_secs(20),
-- 
cgit v1.2.3