diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/layout.rs | 72 | ||||
-rw-r--r-- | src/rpc/lib.rs | 1 | ||||
-rw-r--r-- | src/rpc/ring.rs | 164 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 14 | ||||
-rw-r--r-- | src/rpc/system.rs | 55 |
5 files changed, 100 insertions, 206 deletions
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index 368a9d2c..2b5b6606 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -13,17 +13,39 @@ use garage_util::error::*; use crate::graph_algo::*; -use crate::ring::*; - use std::convert::TryInto; +// ---- defines: partitions ---- + +/// A partition id, which is stored on 16 bits +/// i.e. we have up to 2**16 partitions. +/// (in practice we have exactly 2**PARTITION_BITS partitions) +pub type Partition = u16; + +// TODO: make this constant parametrizable in the config file +// For deployments with many nodes it might make sense to bump +// it up to 10. +// Maximum value : 16 +/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in +/// presence of numerous nodes, but exponentially bigger ring. Max 16 +pub const PARTITION_BITS: usize = 8; + const NB_PARTITIONS: usize = 1usize << PARTITION_BITS; +// ---- defines: nodes ---- + +// Type to store compactly the id of a node in the system +// Change this to u16 the day we want to have more than 256 nodes in a cluster +pub type CompactNodeType = u8; +pub const MAX_NODE_NUMBER: usize = 256; + +// ---- defines: other ---- + // The Message type will be used to collect information on the algorithm. -type Message = Vec<String>; +pub type Message = Vec<String>; mod v08 { - use crate::ring::CompactNodeType; + use super::CompactNodeType; use garage_util::crdt::LwwMap; use garage_util::data::{Hash, Uuid}; use serde::{Deserialize, Serialize}; @@ -76,7 +98,7 @@ mod v08 { mod v09 { use super::v08; - use crate::ring::CompactNodeType; + use super::CompactNodeType; use garage_util::crdt::{Lww, LwwMap}; use garage_util::data::{Hash, Uuid}; use serde::{Deserialize, Serialize}; @@ -334,6 +356,46 @@ impl ClusterLayout { )) } + /// Get the partition in which data would fall on + pub fn partition_of(&self, position: &Hash) -> Partition { + let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap()); + top >> (16 - PARTITION_BITS) + } + + /// Get the list of partitions and the first hash of a partition key that would fall in it + pub fn partitions(&self) -> Vec<(Partition, Hash)> { + (0..(1 << PARTITION_BITS)) + .map(|i| { + let top = (i as u16) << (16 - PARTITION_BITS); + let mut location = [0u8; 32]; + location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]); + (i as u16, Hash::from(location)) + }) + .collect::<Vec<_>>() + } + + /// Walk the ring to find the n servers in which data should be replicated + pub fn nodes_of(&self, position: &Hash, n: usize) -> Vec<Uuid> { + assert_eq!(n, self.replication_factor); + + let data = &self.ring_assignment_data; + + if data.len() != self.replication_factor * (1 << PARTITION_BITS) { + warn!("Ring not yet ready, read/writes will be lost!"); + return vec![]; + } + + let partition_idx = self.partition_of(position) as usize; + let partition_start = partition_idx * self.replication_factor; + let partition_end = (partition_idx + 1) * self.replication_factor; + let partition_nodes = &data[partition_start..partition_end]; + + partition_nodes + .iter() + .map(|i| self.node_id_vec[*i as usize]) + .collect::<Vec<_>>() + } + // ===================== internal information extractors ====================== /// Returns the uuids of the non_gateway nodes in self.node_id_vec. diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index a5f8fc6e..1af8b78e 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -14,7 +14,6 @@ mod kubernetes; pub mod graph_algo; pub mod layout; pub mod replication_mode; -pub mod ring; pub mod system; pub mod rpc_helper; diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs deleted file mode 100644 index 6a2e5c72..00000000 --- a/src/rpc/ring.rs +++ /dev/null @@ -1,164 +0,0 @@ -//! Module containing types related to computing nodes which should receive a copy of data blocks -//! and metadata -use std::convert::TryInto; - -use garage_util::data::*; - -use crate::layout::ClusterLayout; - -/// A partition id, which is stored on 16 bits -/// i.e. we have up to 2**16 partitions. -/// (in practice we have exactly 2**PARTITION_BITS partitions) -pub type Partition = u16; - -// TODO: make this constant parametrizable in the config file -// For deployments with many nodes it might make sense to bump -// it up to 10. -// Maximum value : 16 -/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in -/// presence of numerous nodes, but exponentially bigger ring. Max 16 -pub const PARTITION_BITS: usize = 8; - -const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS); - -/// A ring distributing fairly objects to nodes -#[derive(Clone)] -pub struct Ring { - /// The replication factor for this ring - pub replication_factor: usize, - - /// The network configuration used to generate this ring - pub layout: ClusterLayout, - - // Internal order of nodes used to make a more compact representation of the ring - nodes: Vec<Uuid>, - - // The list of entries in the ring - ring: Vec<RingEntry>, -} - -// Type to store compactly the id of a node in the system -// Change this to u16 the day we want to have more than 256 nodes in a cluster -pub type CompactNodeType = u8; -pub const MAX_NODE_NUMBER: usize = 256; - -// The maximum number of times an object might get replicated -// This must be at least 3 because Garage supports 3-way replication -// Here we use 6 so that the size of a ring entry is 8 bytes -// (2 bytes partition id, 6 bytes node numbers as u8s) -const MAX_REPLICATION: usize = 6; - -/// An entry in the ring -#[derive(Clone, Debug)] -struct RingEntry { - // The two first bytes of the first hash that goes in this partition - // (the next bytes are zeroes) - hash_prefix: u16, - // The nodes that store this partition, stored as a list of positions in the `nodes` - // field of the Ring structure - // Only items 0 up to ring.replication_factor - 1 are used, others are zeros - nodes_buf: [CompactNodeType; MAX_REPLICATION], -} - -impl Ring { - pub(crate) fn new(layout: ClusterLayout, replication_factor: usize) -> Self { - if replication_factor != layout.replication_factor { - warn!("Could not build ring: replication factor does not match between local configuration and network role assignment."); - return Self::empty(layout, replication_factor); - } - - if layout.ring_assignment_data.len() != replication_factor * (1 << PARTITION_BITS) { - warn!("Could not build ring: network role assignment data has invalid length"); - return Self::empty(layout, replication_factor); - } - - let nodes = layout.node_id_vec.clone(); - let ring = (0..(1 << PARTITION_BITS)) - .map(|i| { - let top = (i as u16) << (16 - PARTITION_BITS); - let mut nodes_buf = [0u8; MAX_REPLICATION]; - nodes_buf[..replication_factor].copy_from_slice( - &layout.ring_assignment_data - [replication_factor * i..replication_factor * (i + 1)], - ); - RingEntry { - hash_prefix: top, - nodes_buf, - } - }) - .collect::<Vec<_>>(); - - Self { - replication_factor, - layout, - nodes, - ring, - } - } - - fn empty(layout: ClusterLayout, replication_factor: usize) -> Self { - Self { - replication_factor, - layout, - nodes: vec![], - ring: vec![], - } - } - - /// Get the partition in which data would fall on - pub fn partition_of(&self, position: &Hash) -> Partition { - let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap()); - top >> (16 - PARTITION_BITS) - } - - /// Get the list of partitions and the first hash of a partition key that would fall in it - pub fn partitions(&self) -> Vec<(Partition, Hash)> { - let mut ret = vec![]; - - for (i, entry) in self.ring.iter().enumerate() { - let mut location = [0u8; 32]; - location[..2].copy_from_slice(&u16::to_be_bytes(entry.hash_prefix)[..]); - ret.push((i as u16, location.into())); - } - if !ret.is_empty() { - assert_eq!(ret[0].1, [0u8; 32].into()); - } - - ret - } - - /// Walk the ring to find the n servers in which data should be replicated - pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec<Uuid> { - if self.ring.len() != 1 << PARTITION_BITS { - warn!("Ring not yet ready, read/writes will be lost!"); - return vec![]; - } - - let partition_idx = self.partition_of(position) as usize; - let partition = &self.ring[partition_idx]; - - let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap()); - // Check that we haven't messed up our partition table, i.e. that this partition - // table entrey indeed corresponds to the item we are storing - assert_eq!( - partition.hash_prefix & PARTITION_MASK_U16, - top & PARTITION_MASK_U16 - ); - - assert!(n <= self.replication_factor); - partition.nodes_buf[..n] - .iter() - .map(|i| self.nodes[*i as usize]) - .collect::<Vec<_>>() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_ring_entry_size() { - assert_eq!(std::mem::size_of::<RingEntry>(), 8); - } -} diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index e59c372a..56bef2f3 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -26,8 +26,8 @@ use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; +use crate::layout::ClusterLayout; use crate::metrics::RpcMetrics; -use crate::ring::Ring; // Default RPC timeout = 5 minutes const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300); @@ -91,7 +91,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>); struct RpcHelperInner { our_node_id: Uuid, fullmesh: Arc<FullMeshPeeringStrategy>, - ring: watch::Receiver<Arc<Ring>>, + layout_watch: watch::Receiver<Arc<ClusterLayout>>, metrics: RpcMetrics, rpc_timeout: Duration, } @@ -100,7 +100,7 @@ impl RpcHelper { pub(crate) fn new( our_node_id: Uuid, fullmesh: Arc<FullMeshPeeringStrategy>, - ring: watch::Receiver<Arc<Ring>>, + layout_watch: watch::Receiver<Arc<ClusterLayout>>, rpc_timeout: Option<Duration>, ) -> Self { let metrics = RpcMetrics::new(); @@ -108,7 +108,7 @@ impl RpcHelper { Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, - ring, + layout_watch, metrics, rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), })) @@ -392,8 +392,8 @@ impl RpcHelper { pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> { // Retrieve some status variables that we will use to sort requests let peer_list = self.0.fullmesh.get_peer_list(); - let ring: Arc<Ring> = self.0.ring.borrow().clone(); - let our_zone = match ring.layout.node_role(&self.0.our_node_id) { + let layout: Arc<ClusterLayout> = self.0.layout_watch.borrow().clone(); + let our_zone = match layout.node_role(&self.0.our_node_id) { Some(pc) => &pc.zone, None => "", }; @@ -407,7 +407,7 @@ impl RpcHelper { let mut nodes = nodes .iter() .map(|to| { - let peer_zone = match ring.layout.node_role(to) { + let peer_zone = match layout.node_role(to) { Some(pc) => &pc.zone, None => "", }; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 4b40bec4..106e9f8c 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -36,7 +36,6 @@ use crate::consul::ConsulDiscovery; use crate::kubernetes::*; use crate::layout::*; use crate::replication_mode::*; -use crate::ring::*; use crate::rpc_helper::*; use crate::system_metrics::*; @@ -112,9 +111,9 @@ pub struct System { replication_mode: ReplicationMode, replication_factor: usize, - /// The ring - pub ring: watch::Receiver<Arc<Ring>>, - update_ring: Mutex<watch::Sender<Arc<Ring>>>, + /// The layout + pub layout_watch: watch::Receiver<Arc<ClusterLayout>>, + update_layout: Mutex<watch::Sender<Arc<ClusterLayout>>>, /// Path to metadata directory pub metadata_dir: PathBuf, @@ -286,8 +285,7 @@ impl System { let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout); local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics); - let ring = Ring::new(cluster_layout, replication_factor); - let (update_ring, ring) = watch::channel(Arc::new(ring)); + let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout)); let rpc_public_addr = match &config.rpc_public_addr { Some(a_str) => { @@ -362,7 +360,7 @@ impl System { rpc: RpcHelper::new( netapp.id.into(), fullmesh, - ring.clone(), + layout_watch.clone(), config.rpc_timeout_msec.map(Duration::from_millis), ), system_endpoint, @@ -378,8 +376,8 @@ impl System { kubernetes_discovery: config.kubernetes_discovery.clone(), metrics, - ring, - update_ring: Mutex::new(update_ring), + layout_watch, + update_layout: Mutex::new(update_layout), metadata_dir: config.metadata_dir.clone(), data_dir: config.data_dir.clone(), }); @@ -426,7 +424,7 @@ impl System { } pub fn get_cluster_layout(&self) -> ClusterLayout { - self.ring.borrow().layout.clone() + self.layout_watch.borrow().as_ref().clone() } pub async fn update_cluster_layout( @@ -466,7 +464,7 @@ impl System { } pub fn health(&self) -> ClusterHealth { - let ring: Arc<_> = self.ring.borrow().clone(); + let layout: Arc<_> = self.layout_watch.borrow().clone(); let quorum = self.replication_mode.write_quorum(); let replication_factor = self.replication_factor; @@ -477,8 +475,7 @@ impl System { .collect::<HashMap<Uuid, _>>(); let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count(); - let storage_nodes = ring - .layout + let storage_nodes = layout .roles .items() .iter() @@ -489,11 +486,11 @@ impl System { .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) .count(); - let partitions = ring.partitions(); + let partitions = layout.partitions(); let partitions_n_up = partitions .iter() .map(|(_, h)| { - let pn = ring.get_nodes(h, ring.replication_factor); + let pn = layout.nodes_of(h, layout.replication_factor); pn.iter() .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) .count() @@ -584,9 +581,9 @@ impl System { /// Save network configuration to disc async fn save_cluster_layout(&self) -> Result<(), Error> { - let ring: Arc<Ring> = self.ring.borrow().clone(); + let layout: Arc<ClusterLayout> = self.layout_watch.borrow().clone(); self.persist_cluster_layout - .save_async(&ring.layout) + .save_async(&layout) .await .expect("Cannot save current cluster layout"); Ok(()) @@ -595,9 +592,9 @@ impl System { fn update_local_status(&self) { let mut new_si: NodeStatus = self.local_status.load().as_ref().clone(); - let ring = self.ring.borrow(); - new_si.cluster_layout_version = ring.layout.version; - new_si.cluster_layout_staging_hash = ring.layout.staging_hash; + let layout = self.layout_watch.borrow(); + new_si.cluster_layout_version = layout.version; + new_si.cluster_layout_staging_hash = layout.staging_hash; new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics); @@ -612,8 +609,8 @@ impl System { } fn handle_pull_cluster_layout(&self) -> SystemRpc { - let ring = self.ring.borrow().clone(); - SystemRpc::AdvertiseClusterLayout(ring.layout.clone()) + let layout = self.layout_watch.borrow().as_ref().clone(); + SystemRpc::AdvertiseClusterLayout(layout) } fn handle_get_known_nodes(&self) -> SystemRpc { @@ -663,8 +660,9 @@ impl System { return Err(Error::Message(msg)); } - let update_ring = self.update_ring.lock().await; - let mut layout: ClusterLayout = self.ring.borrow().layout.clone(); + let update_layout = self.update_layout.lock().await; + // TODO: don't clone each time an AdvertiseClusterLayout is received + let mut layout: ClusterLayout = self.layout_watch.borrow().as_ref().clone(); let prev_layout_check = layout.check().is_ok(); if layout.merge(adv) { @@ -675,9 +673,8 @@ impl System { )); } - let ring = Ring::new(layout.clone(), self.replication_factor); - update_ring.send(Arc::new(ring))?; - drop(update_ring); + update_layout.send(Arc::new(layout.clone()))?; + drop(update_layout); let self2 = self.clone(); tokio::spawn(async move { @@ -725,9 +722,9 @@ impl System { async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) { while !*stop_signal.borrow() { - let not_configured = self.ring.borrow().layout.check().is_err(); + let not_configured = self.layout_watch.borrow().check().is_err(); let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; - let expected_n_nodes = self.ring.borrow().layout.num_nodes(); + let expected_n_nodes = self.layout_watch.borrow().num_nodes(); let bad_peers = self .fullmesh .get_peer_list() |