From b490ebc7f6058719bd22c86fd0db95b09dc027d6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 28 May 2021 12:36:22 +0200 Subject: Many improvements on ring/replication and its configuration: - Explicit "replication_mode" configuration parameters that takes either "none", "2" or "3" as values, instead of letting user configure replication factor themselves. These are presets whose corresponding replication/quorum values can be found in replication/mode.rs - Explicit support for single-node and two-node deployments (number of nodes must be at least "replication_mode", with "none" we can have only one node) - Ring is now stored much more compactly with 256*8 + n*32 bytes, instead of 256*32 bytes - Support for gateway-only nodes that do not store data (these nodes still need a metadata_directory to store the list of bucket and keys since those are stored on all nodes; it also technically needs a data_directory to start but it will stay empty unless we have bugs) --- src/rpc/membership.rs | 7 ++- src/rpc/ring.rs | 153 +++++++++++++++++++++++++++++++++++--------------- 2 files changed, 114 insertions(+), 46 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index da7dcf8f..37cf8105 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -95,6 +95,7 @@ pub struct System { rpc_http_client: Arc, rpc_client: Arc>, + replication_factor: usize, pub(crate) status: watch::Receiver>, /// The ring pub ring: watch::Receiver>, @@ -228,6 +229,7 @@ impl System { rpc_http_client: Arc, background: Arc, rpc_server: &mut RpcServer, + replication_factor: usize, ) -> Arc { let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID"); info!("Node ID: {}", hex::encode(&id)); @@ -259,7 +261,7 @@ impl System { .unwrap_or_else(|_| "".to_string()), }; - let ring = Ring::new(net_config); + let ring = Ring::new(net_config, replication_factor); let (update_ring, ring) = watch::channel(Arc::new(ring)); let rpc_path = MEMBERSHIP_RPC_PATH.to_string(); @@ -277,6 +279,7 @@ impl System { state_info, rpc_http_client, rpc_client, + replication_factor, status, ring, update_lock: Mutex::new(Updaters { @@ -543,7 +546,7 @@ impl System { let ring: Arc = self.ring.borrow().clone(); if adv.version > ring.config.version { - let ring = Ring::new(adv.clone()); + let ring = Ring::new(adv.clone(), self.replication_factor); update_lock.update_ring.send(Arc::new(ring))?; drop(update_lock); diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 0f94d0f6..daeb25d8 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -7,10 +7,9 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; -// A partition number is encoded on 16 bits, -// i.e. we have up to 2**16 partitions. -// (in practice we have exactly 2**PARTITION_BITS partitions) -/// A partition id, stored on 16 bits +/// A partition id, which is stored on 16 bits +/// i.e. we have up to 2**16 partitions. +/// (in practice we have exactly 2**PARTITION_BITS partitions) pub type Partition = u16; // TODO: make this constant parametrizable in the config file @@ -23,11 +22,6 @@ pub const PARTITION_BITS: usize = 8; const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS); -// TODO: make this constant paraetrizable in the config file -// (most deployments use a replication factor of 3, so...) -/// The maximum number of time an object might get replicated -pub const MAX_REPLICATION: usize = 3; - /// The user-defined configuration of the cluster's nodes #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkConfig { @@ -53,40 +47,72 @@ pub struct NetworkConfigEntry { /// geodistribution pub datacenter: String, /// The (relative) capacity of the node - pub capacity: u32, + /// If this is set to None, the node does not participate in storing data for the system + /// and is only active as an API gateway to other nodes + pub capacity: Option, /// A tag to recognize the entry, not used for other things than display pub tag: String, } +impl NetworkConfigEntry { + pub fn capacity_string(&self) -> String { + match self.capacity { + Some(c) => format!("{}", c), + None => "gateway".to_string(), + } + } +} + /// A ring distributing fairly objects to nodes #[derive(Clone)] pub struct Ring { + /// The replication factor for this ring + pub replication_factor: usize, + /// The network configuration used to generate this ring pub config: NetworkConfig, - /// The list of entries in the ring - pub ring: Vec, + + // Internal order of nodes used to make a more compact representation of the ring + nodes: Vec, + + // The list of entries in the ring + ring: Vec, } +// Type to store compactly the id of a node in the system +// Change this to u16 the day we want to have more than 256 nodes in a cluster +type CompactNodeType = u8; + +// The maximum number of times an object might get replicated +// This must be at least 3 because Garage supports 3-way replication +// Here we use 6 so that the size of a ring entry is 8 bytes +// (2 bytes partition id, 6 bytes node numbers as u8s) +const MAX_REPLICATION: usize = 6; + /// An entry in the ring #[derive(Clone, Debug)] -pub struct RingEntry { - /// The prefix of the Hash of object which should use this entry - pub location: Hash, - /// The nodes in which a matching object should get stored - pub nodes: [Uuid; MAX_REPLICATION], +struct RingEntry { + // The two first bytes of the first hash that goes in this partition + // (the next bytes are zeroes) + hash_prefix: u16, + // The nodes that store this partition, stored as a list of positions in the `nodes` + // field of the Ring structure + // Only items 0 up to ring.replication_factor - 1 are used, others are zeros + nodes_buf: [CompactNodeType; MAX_REPLICATION], } impl Ring { // TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6 // levels of imbrication. It is basically impossible to test, maintain, or understand for an // outsider. - pub(crate) fn new(config: NetworkConfig) -> Self { + pub(crate) fn new(config: NetworkConfig, replication_factor: usize) -> Self { // Create a vector of partition indices (0 to 2**PARTITION_BITS-1) let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::>(); let datacenters = config .members .iter() + .filter(|(_id, info)| info.capacity.is_some()) .map(|(_id, info)| info.datacenter.as_str()) .collect::>(); let n_datacenters = datacenters.len(); @@ -101,6 +127,7 @@ impl Ring { let mut queues = config .members .iter() + .filter(|(_id, info)| info.capacity.is_some()) .map(|(node_id, node_info)| { let mut parts = partitions_idx .iter() @@ -119,11 +146,13 @@ impl Ring { let max_capacity = config .members .iter() - .map(|(_, node_info)| node_info.capacity) + .filter_map(|(_, node_info)| node_info.capacity) .fold(0, std::cmp::max); + assert!(replication_factor <= MAX_REPLICATION); + // Fill up ring - for rep in 0..MAX_REPLICATION { + for rep in 0..replication_factor { queues.sort_by_key(|(ni, _np, _q, _p)| { let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat(); fasthash(&queue_data[..]) @@ -138,7 +167,7 @@ impl Ring { let remaining0 = remaining; for i_round in 0..max_capacity { for (node_id, node_info, q, pos) in queues.iter_mut() { - if i_round >= node_info.capacity { + if i_round >= node_info.capacity.unwrap() { continue; } for (pos2, &qv) in q.iter().enumerate().skip(*pos) { @@ -166,34 +195,58 @@ impl Ring { // No progress made, exit warn!("Could not build ring, not enough nodes configured."); return Self { + replication_factor, config, + nodes: vec![], ring: vec![], }; } } } + // Make a canonical order for nodes + let nodes = config + .members + .iter() + .filter(|(_id, info)| info.capacity.is_some()) + .map(|(id, _)| *id) + .collect::>(); + let nodes_rev = nodes + .iter() + .enumerate() + .map(|(i, id)| (*id, i as CompactNodeType)) + .collect::>(); + let ring = partitions .iter() .enumerate() .map(|(i, nodes)| { let top = (i as u16) << (16 - PARTITION_BITS); - let mut hash = [0u8; 32]; - hash[0..2].copy_from_slice(&u16::to_be_bytes(top)[..]); - let nodes = nodes.iter().map(|(id, _info)| **id).collect::>(); + let nodes = nodes + .iter() + .map(|(id, _info)| *nodes_rev.get(id).unwrap()) + .collect::>(); + assert!(nodes.len() == replication_factor); + let mut nodes_buf = [0u8; MAX_REPLICATION]; + nodes_buf[..replication_factor].copy_from_slice(&nodes[..]); RingEntry { - location: hash.into(), - nodes: nodes.try_into().unwrap(), + hash_prefix: top, + nodes_buf, } }) .collect::>(); - Self { config, ring } + Self { + replication_factor, + config, + nodes, + ring, + } } /// Get the partition in which data would fall on - pub fn partition_of(&self, from: &Hash) -> Partition { - let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); + pub fn partition_of(&self, position: &Hash) -> Partition { + let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap()); top >> (16 - PARTITION_BITS) } @@ -202,7 +255,9 @@ impl Ring { let mut ret = vec![]; for (i, entry) in self.ring.iter().enumerate() { - ret.push((i as u16, entry.location)); + let mut location = [0u8; 32]; + location[..2].copy_from_slice(&u16::to_be_bytes(entry.hash_prefix)[..]); + ret.push((i as u16, location.into())); } if !ret.is_empty() { assert_eq!(ret[0].1, [0u8; 32].into()); @@ -211,28 +266,38 @@ impl Ring { ret } - // TODO rename this function as it no longer walk the ring /// Walk the ring to find the n servers in which data should be replicated - pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec { + pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec { if self.ring.len() != 1 << PARTITION_BITS { warn!("Ring not yet ready, read/writes will be lost!"); return vec![]; } - let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); - let partition_idx = (top >> (16 - PARTITION_BITS)) as usize; - // TODO why computing two time in the same way and asserting? - assert_eq!(partition_idx, self.partition_of(from) as usize); - + let partition_idx = self.partition_of(position) as usize; let partition = &self.ring[partition_idx]; - let partition_top = - u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap()); - // TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should - // probably be a test more than a runtime assertion - assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16); + let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap()); + // Check that we haven't messed up our partition table, i.e. that this partition + // table entrey indeed corresponds to the item we are storing + assert_eq!( + partition.hash_prefix & PARTITION_MASK_U16, + top & PARTITION_MASK_U16 + ); + + assert!(n <= self.replication_factor); + partition.nodes_buf[..n] + .iter() + .map(|i| self.nodes[*i as usize]) + .collect::>() + } +} + +#[cfg(test)] +mod tests { + use super::*; - assert!(n <= partition.nodes.len()); - partition.nodes[..n].to_vec() + #[test] + fn test_ring_entry_size() { + assert_eq!(std::mem::size_of::(), 8); } } -- cgit v1.2.3