aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/ring.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/ring.rs')
-rw-r--r--src/rpc/ring.rs153
1 files changed, 109 insertions, 44 deletions
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
index 0f94d0f6..daeb25d8 100644
--- a/src/rpc/ring.rs
+++ b/src/rpc/ring.rs
@@ -7,10 +7,9 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
-// A partition number is encoded on 16 bits,
-// i.e. we have up to 2**16 partitions.
-// (in practice we have exactly 2**PARTITION_BITS partitions)
-/// A partition id, stored on 16 bits
+/// A partition id, which is stored on 16 bits
+/// i.e. we have up to 2**16 partitions.
+/// (in practice we have exactly 2**PARTITION_BITS partitions)
pub type Partition = u16;
// TODO: make this constant parametrizable in the config file
@@ -23,11 +22,6 @@ pub const PARTITION_BITS: usize = 8;
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
-// TODO: make this constant paraetrizable in the config file
-// (most deployments use a replication factor of 3, so...)
-/// The maximum number of time an object might get replicated
-pub const MAX_REPLICATION: usize = 3;
-
/// The user-defined configuration of the cluster's nodes
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfig {
@@ -53,40 +47,72 @@ pub struct NetworkConfigEntry {
/// geodistribution
pub datacenter: String,
/// The (relative) capacity of the node
- pub capacity: u32,
+ /// If this is set to None, the node does not participate in storing data for the system
+ /// and is only active as an API gateway to other nodes
+ pub capacity: Option<u32>,
/// A tag to recognize the entry, not used for other things than display
pub tag: String,
}
+impl NetworkConfigEntry {
+ pub fn capacity_string(&self) -> String {
+ match self.capacity {
+ Some(c) => format!("{}", c),
+ None => "gateway".to_string(),
+ }
+ }
+}
+
/// A ring distributing fairly objects to nodes
#[derive(Clone)]
pub struct Ring {
+ /// The replication factor for this ring
+ pub replication_factor: usize,
+
/// The network configuration used to generate this ring
pub config: NetworkConfig,
- /// The list of entries in the ring
- pub ring: Vec<RingEntry>,
+
+ // Internal order of nodes used to make a more compact representation of the ring
+ nodes: Vec<Uuid>,
+
+ // The list of entries in the ring
+ ring: Vec<RingEntry>,
}
+// Type to store compactly the id of a node in the system
+// Change this to u16 the day we want to have more than 256 nodes in a cluster
+type CompactNodeType = u8;
+
+// The maximum number of times an object might get replicated
+// This must be at least 3 because Garage supports 3-way replication
+// Here we use 6 so that the size of a ring entry is 8 bytes
+// (2 bytes partition id, 6 bytes node numbers as u8s)
+const MAX_REPLICATION: usize = 6;
+
/// An entry in the ring
#[derive(Clone, Debug)]
-pub struct RingEntry {
- /// The prefix of the Hash of object which should use this entry
- pub location: Hash,
- /// The nodes in which a matching object should get stored
- pub nodes: [Uuid; MAX_REPLICATION],
+struct RingEntry {
+ // The two first bytes of the first hash that goes in this partition
+ // (the next bytes are zeroes)
+ hash_prefix: u16,
+ // The nodes that store this partition, stored as a list of positions in the `nodes`
+ // field of the Ring structure
+ // Only items 0 up to ring.replication_factor - 1 are used, others are zeros
+ nodes_buf: [CompactNodeType; MAX_REPLICATION],
}
impl Ring {
// TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6
// levels of imbrication. It is basically impossible to test, maintain, or understand for an
// outsider.
- pub(crate) fn new(config: NetworkConfig) -> Self {
+ pub(crate) fn new(config: NetworkConfig, replication_factor: usize) -> Self {
// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
let datacenters = config
.members
.iter()
+ .filter(|(_id, info)| info.capacity.is_some())
.map(|(_id, info)| info.datacenter.as_str())
.collect::<HashSet<&str>>();
let n_datacenters = datacenters.len();
@@ -101,6 +127,7 @@ impl Ring {
let mut queues = config
.members
.iter()
+ .filter(|(_id, info)| info.capacity.is_some())
.map(|(node_id, node_info)| {
let mut parts = partitions_idx
.iter()
@@ -119,11 +146,13 @@ impl Ring {
let max_capacity = config
.members
.iter()
- .map(|(_, node_info)| node_info.capacity)
+ .filter_map(|(_, node_info)| node_info.capacity)
.fold(0, std::cmp::max);
+ assert!(replication_factor <= MAX_REPLICATION);
+
// Fill up ring
- for rep in 0..MAX_REPLICATION {
+ for rep in 0..replication_factor {
queues.sort_by_key(|(ni, _np, _q, _p)| {
let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat();
fasthash(&queue_data[..])
@@ -138,7 +167,7 @@ impl Ring {
let remaining0 = remaining;
for i_round in 0..max_capacity {
for (node_id, node_info, q, pos) in queues.iter_mut() {
- if i_round >= node_info.capacity {
+ if i_round >= node_info.capacity.unwrap() {
continue;
}
for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
@@ -166,34 +195,58 @@ impl Ring {
// No progress made, exit
warn!("Could not build ring, not enough nodes configured.");
return Self {
+ replication_factor,
config,
+ nodes: vec![],
ring: vec![],
};
}
}
}
+ // Make a canonical order for nodes
+ let nodes = config
+ .members
+ .iter()
+ .filter(|(_id, info)| info.capacity.is_some())
+ .map(|(id, _)| *id)
+ .collect::<Vec<_>>();
+ let nodes_rev = nodes
+ .iter()
+ .enumerate()
+ .map(|(i, id)| (*id, i as CompactNodeType))
+ .collect::<HashMap<Uuid, CompactNodeType>>();
+
let ring = partitions
.iter()
.enumerate()
.map(|(i, nodes)| {
let top = (i as u16) << (16 - PARTITION_BITS);
- let mut hash = [0u8; 32];
- hash[0..2].copy_from_slice(&u16::to_be_bytes(top)[..]);
- let nodes = nodes.iter().map(|(id, _info)| **id).collect::<Vec<Uuid>>();
+ let nodes = nodes
+ .iter()
+ .map(|(id, _info)| *nodes_rev.get(id).unwrap())
+ .collect::<Vec<CompactNodeType>>();
+ assert!(nodes.len() == replication_factor);
+ let mut nodes_buf = [0u8; MAX_REPLICATION];
+ nodes_buf[..replication_factor].copy_from_slice(&nodes[..]);
RingEntry {
- location: hash.into(),
- nodes: nodes.try_into().unwrap(),
+ hash_prefix: top,
+ nodes_buf,
}
})
.collect::<Vec<_>>();
- Self { config, ring }
+ Self {
+ replication_factor,
+ config,
+ nodes,
+ ring,
+ }
}
/// Get the partition in which data would fall on
- pub fn partition_of(&self, from: &Hash) -> Partition {
- let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
+ pub fn partition_of(&self, position: &Hash) -> Partition {
+ let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
top >> (16 - PARTITION_BITS)
}
@@ -202,7 +255,9 @@ impl Ring {
let mut ret = vec![];
for (i, entry) in self.ring.iter().enumerate() {
- ret.push((i as u16, entry.location));
+ let mut location = [0u8; 32];
+ location[..2].copy_from_slice(&u16::to_be_bytes(entry.hash_prefix)[..]);
+ ret.push((i as u16, location.into()));
}
if !ret.is_empty() {
assert_eq!(ret[0].1, [0u8; 32].into());
@@ -211,28 +266,38 @@ impl Ring {
ret
}
- // TODO rename this function as it no longer walk the ring
/// Walk the ring to find the n servers in which data should be replicated
- pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<Uuid> {
+ pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec<Uuid> {
if self.ring.len() != 1 << PARTITION_BITS {
warn!("Ring not yet ready, read/writes will be lost!");
return vec![];
}
- let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
- let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
- // TODO why computing two time in the same way and asserting?
- assert_eq!(partition_idx, self.partition_of(from) as usize);
-
+ let partition_idx = self.partition_of(position) as usize;
let partition = &self.ring[partition_idx];
- let partition_top =
- u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
- // TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should
- // probably be a test more than a runtime assertion
- assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
+ let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
+ // Check that we haven't messed up our partition table, i.e. that this partition
+ // table entrey indeed corresponds to the item we are storing
+ assert_eq!(
+ partition.hash_prefix & PARTITION_MASK_U16,
+ top & PARTITION_MASK_U16
+ );
+
+ assert!(n <= self.replication_factor);
+ partition.nodes_buf[..n]
+ .iter()
+ .map(|i| self.nodes[*i as usize])
+ .collect::<Vec<_>>()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
- assert!(n <= partition.nodes.len());
- partition.nodes[..n].to_vec()
+ #[test]
+ fn test_ring_entry_size() {
+ assert_eq!(std::mem::size_of::<RingEntry>(), 8);
}
}