From 12d1dbfc6b884be488e2d79c0b9e3c47490f5442 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Nov 2023 15:41:24 +0100 Subject: remove Ring and use ClusterLayout everywhere --- src/api/admin/bucket.rs | 4 +- src/api/k2v/index.rs | 8 +- src/api/s3/put.rs | 2 +- src/garage/admin/bucket.rs | 4 +- src/garage/admin/mod.rs | 20 ++--- src/model/helper/bucket.rs | 6 +- src/model/index_counter.rs | 6 +- src/rpc/layout.rs | 72 ++++++++++++++-- src/rpc/lib.rs | 1 - src/rpc/ring.rs | 164 ------------------------------------ src/rpc/rpc_helper.rs | 14 +-- src/rpc/system.rs | 55 ++++++------ src/table/merkle.rs | 2 +- src/table/replication/fullcopy.rs | 8 +- src/table/replication/parameters.rs | 2 +- src/table/replication/sharded.rs | 14 +-- src/table/sync.rs | 20 ++--- 17 files changed, 148 insertions(+), 254 deletions(-) delete mode 100644 src/rpc/ring.rs (limited to 'src') diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index 17f46c30..6bff7e9f 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -122,7 +122,7 @@ async fn bucket_info_results( .table .get(&bucket_id, &EmptyKey) .await? - .map(|x| x.filtered_values(&garage.system.ring.borrow())) + .map(|x| x.filtered_values(&garage.system.layout_watch.borrow())) .unwrap_or_default(); let mpu_counters = garage @@ -130,7 +130,7 @@ async fn bucket_info_results( .table .get(&bucket_id, &EmptyKey) .await? - .map(|x| x.filtered_values(&garage.system.ring.borrow())) + .map(|x| x.filtered_values(&garage.system.layout_watch.borrow())) .unwrap_or_default(); let mut relevant_keys = HashMap::new(); diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index 6c1d4a91..ff8beda3 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -5,7 +5,7 @@ use serde::Serialize; use garage_util::data::*; -use garage_rpc::ring::Ring; +use garage_rpc::layout::ClusterLayout; use garage_table::util::*; use garage_model::garage::Garage; @@ -26,7 +26,7 @@ pub async fn handle_read_index( ) -> Result, Error> { let reverse = reverse.unwrap_or(false); - let ring: Arc = garage.system.ring.borrow().clone(); + let layout: Arc = garage.system.layout_watch.borrow().clone(); let (partition_keys, more, next_start) = read_range( &garage.k2v.counter_table.table, @@ -35,7 +35,7 @@ pub async fn handle_read_index( &start, &end, limit, - Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())), + Some((DeletedFilter::NotDeleted, layout.node_id_vec.clone())), EnumerationOrder::from_reverse(reverse), ) .await?; @@ -54,7 +54,7 @@ pub async fn handle_read_index( partition_keys: partition_keys .into_iter() .map(|part| { - let vals = part.filtered_values(&ring); + let vals = part.filtered_values(&layout); ReadIndexResponseEntry { pk: part.sk, entries: *vals.get(&s_entries).unwrap_or(&0), diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 606facc4..fc17ed03 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -253,7 +253,7 @@ pub(crate) async fn check_quotas( .await?; let counters = counters - .map(|x| x.filtered_values(&garage.system.ring.borrow())) + .map(|x| x.filtered_values(&garage.system.layout_watch.borrow())) .unwrap_or_default(); let (prev_cnt_obj, prev_cnt_size) = match prev_object { diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs index 0781cb8b..34e48292 100644 --- a/src/garage/admin/bucket.rs +++ b/src/garage/admin/bucket.rs @@ -70,7 +70,7 @@ impl AdminRpcHandler { .table .get(&bucket_id, &EmptyKey) .await? - .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) + .map(|x| x.filtered_values(&self.garage.system.layout_watch.borrow())) .unwrap_or_default(); let mpu_counters = self @@ -79,7 +79,7 @@ impl AdminRpcHandler { .table .get(&bucket_id, &EmptyKey) .await? - .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) + .map(|x| x.filtered_values(&self.garage.system.layout_watch.borrow())) .unwrap_or_default(); let mut relevant_keys = HashMap::new(); diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index b6f9c426..006f71cd 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -18,7 +18,7 @@ use garage_util::error::Error as GarageError; use garage_table::replication::*; use garage_table::*; -use garage_rpc::ring::PARTITION_BITS; +use garage_rpc::layout::PARTITION_BITS; use garage_rpc::*; use garage_block::manager::BlockResyncErrorInfo; @@ -126,8 +126,8 @@ impl AdminRpcHandler { opt_to_send.all_nodes = false; let mut failures = vec![]; - let ring = self.garage.system.ring.borrow().clone(); - for node in ring.layout.node_ids().iter() { + let layout = self.garage.system.layout_watch.borrow().clone(); + for node in layout.node_ids().iter() { let node = (*node).into(); let resp = self .endpoint @@ -163,9 +163,9 @@ impl AdminRpcHandler { async fn handle_stats(&self, opt: StatsOpt) -> Result { if opt.all_nodes { let mut ret = String::new(); - let ring = self.garage.system.ring.borrow().clone(); + let layout = self.garage.system.layout_watch.borrow().clone(); - for node in ring.layout.node_ids().iter() { + for node in layout.node_ids().iter() { let mut opt = opt.clone(); opt.all_nodes = false; opt.skip_global = true; @@ -275,7 +275,7 @@ impl AdminRpcHandler { let mut ret = String::new(); // Gather storage node and free space statistics - let layout = &self.garage.system.ring.borrow().layout; + let layout = &self.garage.system.layout_watch.borrow(); let mut node_partition_count = HashMap::::new(); for short_id in layout.ring_assignment_data.iter() { let id = layout.node_id_vec[*short_id as usize]; @@ -440,8 +440,8 @@ impl AdminRpcHandler { ) -> Result { if all_nodes { let mut ret = vec![]; - let ring = self.garage.system.ring.borrow().clone(); - for node in ring.layout.node_ids().iter() { + let layout = self.garage.system.layout_watch.borrow().clone(); + for node in layout.node_ids().iter() { let node = (*node).into(); match self .endpoint @@ -488,8 +488,8 @@ impl AdminRpcHandler { ) -> Result { if all_nodes { let mut ret = vec![]; - let ring = self.garage.system.ring.borrow().clone(); - for node in ring.layout.node_ids().iter() { + let layout = self.garage.system.layout_watch.borrow().clone(); + for node in layout.node_ids().iter() { let node = (*node).into(); match self .endpoint diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 576d03f3..d43d7e96 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -450,10 +450,10 @@ impl<'a> BucketHelper<'a> { #[cfg(feature = "k2v")] { - use garage_rpc::ring::Ring; + use garage_rpc::layout::ClusterLayout; use std::sync::Arc; - let ring: Arc = self.0.system.ring.borrow().clone(); + let layout: Arc = self.0.system.layout_watch.borrow().clone(); let k2vindexes = self .0 .k2v @@ -462,7 +462,7 @@ impl<'a> BucketHelper<'a> { .get_range( &bucket_id, None, - Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())), + Some((DeletedFilter::NotDeleted, layout.node_id_vec.clone())), 10, EnumerationOrder::Forward, ) diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index a46c165f..d514cb06 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_db as db; -use garage_rpc::ring::Ring; +use garage_rpc::layout::ClusterLayout; use garage_rpc::system::System; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -83,8 +83,8 @@ impl Entry for CounterEntry { } impl CounterEntry { - pub fn filtered_values(&self, ring: &Ring) -> HashMap { - let nodes = &ring.layout.node_id_vec[..]; + pub fn filtered_values(&self, layout: &ClusterLayout) -> HashMap { + let nodes = &layout.node_id_vec[..]; self.filtered_values_with_nodes(nodes) } diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index 368a9d2c..2b5b6606 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -13,17 +13,39 @@ use garage_util::error::*; use crate::graph_algo::*; -use crate::ring::*; - use std::convert::TryInto; +// ---- defines: partitions ---- + +/// A partition id, which is stored on 16 bits +/// i.e. we have up to 2**16 partitions. +/// (in practice we have exactly 2**PARTITION_BITS partitions) +pub type Partition = u16; + +// TODO: make this constant parametrizable in the config file +// For deployments with many nodes it might make sense to bump +// it up to 10. +// Maximum value : 16 +/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in +/// presence of numerous nodes, but exponentially bigger ring. Max 16 +pub const PARTITION_BITS: usize = 8; + const NB_PARTITIONS: usize = 1usize << PARTITION_BITS; +// ---- defines: nodes ---- + +// Type to store compactly the id of a node in the system +// Change this to u16 the day we want to have more than 256 nodes in a cluster +pub type CompactNodeType = u8; +pub const MAX_NODE_NUMBER: usize = 256; + +// ---- defines: other ---- + // The Message type will be used to collect information on the algorithm. -type Message = Vec; +pub type Message = Vec; mod v08 { - use crate::ring::CompactNodeType; + use super::CompactNodeType; use garage_util::crdt::LwwMap; use garage_util::data::{Hash, Uuid}; use serde::{Deserialize, Serialize}; @@ -76,7 +98,7 @@ mod v08 { mod v09 { use super::v08; - use crate::ring::CompactNodeType; + use super::CompactNodeType; use garage_util::crdt::{Lww, LwwMap}; use garage_util::data::{Hash, Uuid}; use serde::{Deserialize, Serialize}; @@ -334,6 +356,46 @@ impl ClusterLayout { )) } + /// Get the partition in which data would fall on + pub fn partition_of(&self, position: &Hash) -> Partition { + let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap()); + top >> (16 - PARTITION_BITS) + } + + /// Get the list of partitions and the first hash of a partition key that would fall in it + pub fn partitions(&self) -> Vec<(Partition, Hash)> { + (0..(1 << PARTITION_BITS)) + .map(|i| { + let top = (i as u16) << (16 - PARTITION_BITS); + let mut location = [0u8; 32]; + location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]); + (i as u16, Hash::from(location)) + }) + .collect::>() + } + + /// Walk the ring to find the n servers in which data should be replicated + pub fn nodes_of(&self, position: &Hash, n: usize) -> Vec { + assert_eq!(n, self.replication_factor); + + let data = &self.ring_assignment_data; + + if data.len() != self.replication_factor * (1 << PARTITION_BITS) { + warn!("Ring not yet ready, read/writes will be lost!"); + return vec![]; + } + + let partition_idx = self.partition_of(position) as usize; + let partition_start = partition_idx * self.replication_factor; + let partition_end = (partition_idx + 1) * self.replication_factor; + let partition_nodes = &data[partition_start..partition_end]; + + partition_nodes + .iter() + .map(|i| self.node_id_vec[*i as usize]) + .collect::>() + } + // ===================== internal information extractors ====================== /// Returns the uuids of the non_gateway nodes in self.node_id_vec. diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index a5f8fc6e..1af8b78e 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -14,7 +14,6 @@ mod kubernetes; pub mod graph_algo; pub mod layout; pub mod replication_mode; -pub mod ring; pub mod system; pub mod rpc_helper; diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs deleted file mode 100644 index 6a2e5c72..00000000 --- a/src/rpc/ring.rs +++ /dev/null @@ -1,164 +0,0 @@ -//! Module containing types related to computing nodes which should receive a copy of data blocks -//! and metadata -use std::convert::TryInto; - -use garage_util::data::*; - -use crate::layout::ClusterLayout; - -/// A partition id, which is stored on 16 bits -/// i.e. we have up to 2**16 partitions. -/// (in practice we have exactly 2**PARTITION_BITS partitions) -pub type Partition = u16; - -// TODO: make this constant parametrizable in the config file -// For deployments with many nodes it might make sense to bump -// it up to 10. -// Maximum value : 16 -/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in -/// presence of numerous nodes, but exponentially bigger ring. Max 16 -pub const PARTITION_BITS: usize = 8; - -const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS); - -/// A ring distributing fairly objects to nodes -#[derive(Clone)] -pub struct Ring { - /// The replication factor for this ring - pub replication_factor: usize, - - /// The network configuration used to generate this ring - pub layout: ClusterLayout, - - // Internal order of nodes used to make a more compact representation of the ring - nodes: Vec, - - // The list of entries in the ring - ring: Vec, -} - -// Type to store compactly the id of a node in the system -// Change this to u16 the day we want to have more than 256 nodes in a cluster -pub type CompactNodeType = u8; -pub const MAX_NODE_NUMBER: usize = 256; - -// The maximum number of times an object might get replicated -// This must be at least 3 because Garage supports 3-way replication -// Here we use 6 so that the size of a ring entry is 8 bytes -// (2 bytes partition id, 6 bytes node numbers as u8s) -const MAX_REPLICATION: usize = 6; - -/// An entry in the ring -#[derive(Clone, Debug)] -struct RingEntry { - // The two first bytes of the first hash that goes in this partition - // (the next bytes are zeroes) - hash_prefix: u16, - // The nodes that store this partition, stored as a list of positions in the `nodes` - // field of the Ring structure - // Only items 0 up to ring.replication_factor - 1 are used, others are zeros - nodes_buf: [CompactNodeType; MAX_REPLICATION], -} - -impl Ring { - pub(crate) fn new(layout: ClusterLayout, replication_factor: usize) -> Self { - if replication_factor != layout.replication_factor { - warn!("Could not build ring: replication factor does not match between local configuration and network role assignment."); - return Self::empty(layout, replication_factor); - } - - if layout.ring_assignment_data.len() != replication_factor * (1 << PARTITION_BITS) { - warn!("Could not build ring: network role assignment data has invalid length"); - return Self::empty(layout, replication_factor); - } - - let nodes = layout.node_id_vec.clone(); - let ring = (0..(1 << PARTITION_BITS)) - .map(|i| { - let top = (i as u16) << (16 - PARTITION_BITS); - let mut nodes_buf = [0u8; MAX_REPLICATION]; - nodes_buf[..replication_factor].copy_from_slice( - &layout.ring_assignment_data - [replication_factor * i..replication_factor * (i + 1)], - ); - RingEntry { - hash_prefix: top, - nodes_buf, - } - }) - .collect::>(); - - Self { - replication_factor, - layout, - nodes, - ring, - } - } - - fn empty(layout: ClusterLayout, replication_factor: usize) -> Self { - Self { - replication_factor, - layout, - nodes: vec![], - ring: vec![], - } - } - - /// Get the partition in which data would fall on - pub fn partition_of(&self, position: &Hash) -> Partition { - let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap()); - top >> (16 - PARTITION_BITS) - } - - /// Get the list of partitions and the first hash of a partition key that would fall in it - pub fn partitions(&self) -> Vec<(Partition, Hash)> { - let mut ret = vec![]; - - for (i, entry) in self.ring.iter().enumerate() { - let mut location = [0u8; 32]; - location[..2].copy_from_slice(&u16::to_be_bytes(entry.hash_prefix)[..]); - ret.push((i as u16, location.into())); - } - if !ret.is_empty() { - assert_eq!(ret[0].1, [0u8; 32].into()); - } - - ret - } - - /// Walk the ring to find the n servers in which data should be replicated - pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec { - if self.ring.len() != 1 << PARTITION_BITS { - warn!("Ring not yet ready, read/writes will be lost!"); - return vec![]; - } - - let partition_idx = self.partition_of(position) as usize; - let partition = &self.ring[partition_idx]; - - let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap()); - // Check that we haven't messed up our partition table, i.e. that this partition - // table entrey indeed corresponds to the item we are storing - assert_eq!( - partition.hash_prefix & PARTITION_MASK_U16, - top & PARTITION_MASK_U16 - ); - - assert!(n <= self.replication_factor); - partition.nodes_buf[..n] - .iter() - .map(|i| self.nodes[*i as usize]) - .collect::>() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_ring_entry_size() { - assert_eq!(std::mem::size_of::(), 8); - } -} diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index e59c372a..56bef2f3 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -26,8 +26,8 @@ use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; +use crate::layout::ClusterLayout; use crate::metrics::RpcMetrics; -use crate::ring::Ring; // Default RPC timeout = 5 minutes const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300); @@ -91,7 +91,7 @@ pub struct RpcHelper(Arc); struct RpcHelperInner { our_node_id: Uuid, fullmesh: Arc, - ring: watch::Receiver>, + layout_watch: watch::Receiver>, metrics: RpcMetrics, rpc_timeout: Duration, } @@ -100,7 +100,7 @@ impl RpcHelper { pub(crate) fn new( our_node_id: Uuid, fullmesh: Arc, - ring: watch::Receiver>, + layout_watch: watch::Receiver>, rpc_timeout: Option, ) -> Self { let metrics = RpcMetrics::new(); @@ -108,7 +108,7 @@ impl RpcHelper { Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, - ring, + layout_watch, metrics, rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), })) @@ -392,8 +392,8 @@ impl RpcHelper { pub fn request_order(&self, nodes: &[Uuid]) -> Vec { // Retrieve some status variables that we will use to sort requests let peer_list = self.0.fullmesh.get_peer_list(); - let ring: Arc = self.0.ring.borrow().clone(); - let our_zone = match ring.layout.node_role(&self.0.our_node_id) { + let layout: Arc = self.0.layout_watch.borrow().clone(); + let our_zone = match layout.node_role(&self.0.our_node_id) { Some(pc) => &pc.zone, None => "", }; @@ -407,7 +407,7 @@ impl RpcHelper { let mut nodes = nodes .iter() .map(|to| { - let peer_zone = match ring.layout.node_role(to) { + let peer_zone = match layout.node_role(to) { Some(pc) => &pc.zone, None => "", }; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 4b40bec4..106e9f8c 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -36,7 +36,6 @@ use crate::consul::ConsulDiscovery; use crate::kubernetes::*; use crate::layout::*; use crate::replication_mode::*; -use crate::ring::*; use crate::rpc_helper::*; use crate::system_metrics::*; @@ -112,9 +111,9 @@ pub struct System { replication_mode: ReplicationMode, replication_factor: usize, - /// The ring - pub ring: watch::Receiver>, - update_ring: Mutex>>, + /// The layout + pub layout_watch: watch::Receiver>, + update_layout: Mutex>>, /// Path to metadata directory pub metadata_dir: PathBuf, @@ -286,8 +285,7 @@ impl System { let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout); local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics); - let ring = Ring::new(cluster_layout, replication_factor); - let (update_ring, ring) = watch::channel(Arc::new(ring)); + let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout)); let rpc_public_addr = match &config.rpc_public_addr { Some(a_str) => { @@ -362,7 +360,7 @@ impl System { rpc: RpcHelper::new( netapp.id.into(), fullmesh, - ring.clone(), + layout_watch.clone(), config.rpc_timeout_msec.map(Duration::from_millis), ), system_endpoint, @@ -378,8 +376,8 @@ impl System { kubernetes_discovery: config.kubernetes_discovery.clone(), metrics, - ring, - update_ring: Mutex::new(update_ring), + layout_watch, + update_layout: Mutex::new(update_layout), metadata_dir: config.metadata_dir.clone(), data_dir: config.data_dir.clone(), }); @@ -426,7 +424,7 @@ impl System { } pub fn get_cluster_layout(&self) -> ClusterLayout { - self.ring.borrow().layout.clone() + self.layout_watch.borrow().as_ref().clone() } pub async fn update_cluster_layout( @@ -466,7 +464,7 @@ impl System { } pub fn health(&self) -> ClusterHealth { - let ring: Arc<_> = self.ring.borrow().clone(); + let layout: Arc<_> = self.layout_watch.borrow().clone(); let quorum = self.replication_mode.write_quorum(); let replication_factor = self.replication_factor; @@ -477,8 +475,7 @@ impl System { .collect::>(); let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count(); - let storage_nodes = ring - .layout + let storage_nodes = layout .roles .items() .iter() @@ -489,11 +486,11 @@ impl System { .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) .count(); - let partitions = ring.partitions(); + let partitions = layout.partitions(); let partitions_n_up = partitions .iter() .map(|(_, h)| { - let pn = ring.get_nodes(h, ring.replication_factor); + let pn = layout.nodes_of(h, layout.replication_factor); pn.iter() .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) .count() @@ -584,9 +581,9 @@ impl System { /// Save network configuration to disc async fn save_cluster_layout(&self) -> Result<(), Error> { - let ring: Arc = self.ring.borrow().clone(); + let layout: Arc = self.layout_watch.borrow().clone(); self.persist_cluster_layout - .save_async(&ring.layout) + .save_async(&layout) .await .expect("Cannot save current cluster layout"); Ok(()) @@ -595,9 +592,9 @@ impl System { fn update_local_status(&self) { let mut new_si: NodeStatus = self.local_status.load().as_ref().clone(); - let ring = self.ring.borrow(); - new_si.cluster_layout_version = ring.layout.version; - new_si.cluster_layout_staging_hash = ring.layout.staging_hash; + let layout = self.layout_watch.borrow(); + new_si.cluster_layout_version = layout.version; + new_si.cluster_layout_staging_hash = layout.staging_hash; new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics); @@ -612,8 +609,8 @@ impl System { } fn handle_pull_cluster_layout(&self) -> SystemRpc { - let ring = self.ring.borrow().clone(); - SystemRpc::AdvertiseClusterLayout(ring.layout.clone()) + let layout = self.layout_watch.borrow().as_ref().clone(); + SystemRpc::AdvertiseClusterLayout(layout) } fn handle_get_known_nodes(&self) -> SystemRpc { @@ -663,8 +660,9 @@ impl System { return Err(Error::Message(msg)); } - let update_ring = self.update_ring.lock().await; - let mut layout: ClusterLayout = self.ring.borrow().layout.clone(); + let update_layout = self.update_layout.lock().await; + // TODO: don't clone each time an AdvertiseClusterLayout is received + let mut layout: ClusterLayout = self.layout_watch.borrow().as_ref().clone(); let prev_layout_check = layout.check().is_ok(); if layout.merge(adv) { @@ -675,9 +673,8 @@ impl System { )); } - let ring = Ring::new(layout.clone(), self.replication_factor); - update_ring.send(Arc::new(ring))?; - drop(update_ring); + update_layout.send(Arc::new(layout.clone()))?; + drop(update_layout); let self2 = self.clone(); tokio::spawn(async move { @@ -725,9 +722,9 @@ impl System { async fn discovery_loop(self: &Arc, mut stop_signal: watch::Receiver) { while !*stop_signal.borrow() { - let not_configured = self.ring.borrow().layout.check().is_err(); + let not_configured = self.layout_watch.borrow().check().is_err(); let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; - let expected_n_nodes = self.ring.borrow().layout.num_nodes(); + let expected_n_nodes = self.layout_watch.borrow().num_nodes(); let bad_peers = self .fullmesh .get_peer_list() diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 4577f872..01271c58 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -13,7 +13,7 @@ use garage_util::data::*; use garage_util::encode::{nonversioned_decode, nonversioned_encode}; use garage_util::error::Error; -use garage_rpc::ring::*; +use garage_rpc::layout::*; use crate::data::*; use crate::replication::*; diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index 18682ace..f8b7cacc 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use garage_rpc::ring::*; +use garage_rpc::layout::*; use garage_rpc::system::System; use garage_util::data::*; @@ -27,11 +27,11 @@ impl TableReplication for TableFullReplication { } fn write_nodes(&self, _hash: &Hash) -> Vec { - let ring = self.system.ring.borrow(); - ring.layout.node_ids().to_vec() + let layout = self.system.layout_watch.borrow(); + layout.node_ids().to_vec() } fn write_quorum(&self) -> usize { - let nmembers = self.system.ring.borrow().layout.node_ids().len(); + let nmembers = self.system.layout_watch.borrow().node_ids().len(); if nmembers > self.max_faults { nmembers - self.max_faults } else { diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index f00815a2..19b306f2 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -1,4 +1,4 @@ -use garage_rpc::ring::*; +use garage_rpc::layout::*; use garage_util::data::*; /// Trait to describe how a table shall be replicated diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 1cf964af..95901a5a 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use garage_rpc::ring::*; +use garage_rpc::layout::*; use garage_rpc::system::System; use garage_util::data::*; @@ -26,16 +26,16 @@ pub struct TableShardedReplication { impl TableReplication for TableShardedReplication { fn read_nodes(&self, hash: &Hash) -> Vec { - let ring = self.system.ring.borrow(); - ring.get_nodes(hash, self.replication_factor) + let layout = self.system.layout_watch.borrow(); + layout.nodes_of(hash, self.replication_factor) } fn read_quorum(&self) -> usize { self.read_quorum } fn write_nodes(&self, hash: &Hash) -> Vec { - let ring = self.system.ring.borrow(); - ring.get_nodes(hash, self.replication_factor) + let layout = self.system.layout_watch.borrow(); + layout.nodes_of(hash, self.replication_factor) } fn write_quorum(&self) -> usize { self.write_quorum @@ -45,9 +45,9 @@ impl TableReplication for TableShardedReplication { } fn partition_of(&self, hash: &Hash) -> Partition { - self.system.ring.borrow().partition_of(hash) + self.system.layout_watch.borrow().partition_of(hash) } fn partitions(&self) -> Vec<(Partition, Hash)> { - self.system.ring.borrow().partitions() + self.system.layout_watch.borrow().partitions() } } diff --git a/src/table/sync.rs b/src/table/sync.rs index 92a353c6..b2600013 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -17,7 +17,7 @@ use garage_util::data::*; use garage_util::encode::{debug_serialize, nonversioned_encode}; use garage_util::error::{Error, OkOrMessage}; -use garage_rpc::ring::*; +use garage_rpc::layout::*; use garage_rpc::system::System; use garage_rpc::*; @@ -91,8 +91,8 @@ impl TableSyncer { bg.spawn_worker(SyncWorker { syncer: self.clone(), - ring_recv: self.system.ring.clone(), - ring: self.system.ring.borrow().clone(), + layout_watch: self.system.layout_watch.clone(), + layout: self.system.layout_watch.borrow().clone(), add_full_sync_rx, todo: vec![], next_full_sync: Instant::now() + Duration::from_secs(20), @@ -492,8 +492,8 @@ impl EndpointHandler for TableSync struct SyncWorker { syncer: Arc>, - ring_recv: watch::Receiver>, - ring: Arc, + layout_watch: watch::Receiver>, + layout: Arc, add_full_sync_rx: mpsc::UnboundedReceiver<()>, todo: Vec, next_full_sync: Instant, @@ -593,11 +593,11 @@ impl Worker for SyncWorker { self.add_full_sync(); } }, - _ = self.ring_recv.changed() => { - let new_ring = self.ring_recv.borrow(); - if !Arc::ptr_eq(&new_ring, &self.ring) { - self.ring = new_ring.clone(); - drop(new_ring); + _ = self.layout_watch.changed() => { + let new_layout = self.layout_watch.borrow(); + if !Arc::ptr_eq(&new_layout, &self.layout) { + self.layout = new_layout.clone(); + drop(new_layout); debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); self.add_full_sync(); } -- cgit v1.2.3