aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-05 16:22:29 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-05 16:22:29 +0100
commitd7e005251dad73eb7d6f2154c6082f61d16554d0 (patch)
treea160bf1dfff3555bcf8a3c52a40863cde2cc4a51 /src
parent3882d5ba36f48751fdf6e5b82eae0dd990238655 (diff)
downloadgarage-d7e005251dad73eb7d6f2154c6082f61d16554d0.tar.gz
garage-d7e005251dad73eb7d6f2154c6082f61d16554d0.zip
Not fully tested: new multi-dc MagLev
Diffstat (limited to 'src')
-rw-r--r--src/rpc/membership.rs12
-rw-r--r--src/rpc/ring.rs207
-rw-r--r--src/table/table_sharded.rs1
3 files changed, 139 insertions, 81 deletions
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index f9047b35..44d7122a 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -218,12 +218,7 @@ impl System {
.unwrap_or("<invalid utf-8>".to_string()),
};
- let mut ring = Ring {
- config: net_config,
- ring: Vec::new(),
- n_datacenters: 0,
- };
- ring.rebuild_ring();
+ let ring = Ring::new(net_config);
let (update_ring, ring) = watch::channel(Arc::new(ring));
let rpc_path = MEMBERSHIP_RPC_PATH.to_string();
@@ -531,10 +526,7 @@ impl System {
let ring: Arc<Ring> = self.ring.borrow().clone();
if adv.version > ring.config.version {
- let mut ring = ring.as_ref().clone();
-
- ring.config = adv.clone();
- ring.rebuild_ring();
+ let ring = Ring::new(adv.clone());
update_lock.1.broadcast(Arc::new(ring))?;
drop(update_lock);
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
index 5ca43ac9..906ab9f8 100644
--- a/src/rpc/ring.rs
+++ b/src/rpc/ring.rs
@@ -1,9 +1,22 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
+use std::convert::TryInto;
use serde::{Deserialize, Serialize};
use garage_util::data::*;
+// TODO: make this constant parametrizable in the config file
+// For deployments with many nodes it might make sense to bump
+// it up to 10.
+// Maximum value : 16
+pub const PARTITION_BITS: usize = 8;
+
+const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
+
+// TODO: make this constant paraetrizable in the config file
+// (most deployments use a replication factor of 3, so...)
+pub const MAX_REPLICATION: usize = 3;
+
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfig {
pub members: HashMap<UUID, NetworkConfigEntry>,
@@ -30,96 +43,150 @@ pub struct NetworkConfigEntry {
pub struct Ring {
pub config: NetworkConfig,
pub ring: Vec<RingEntry>,
- pub n_datacenters: usize,
}
#[derive(Clone, Debug)]
pub struct RingEntry {
pub location: Hash,
- pub node: UUID,
- datacenter: usize,
+ pub nodes: [UUID; MAX_REPLICATION],
}
impl Ring {
- pub(crate) fn rebuild_ring(&mut self) {
- let mut new_ring = vec![];
- let mut datacenters = vec![];
-
- for (id, config) in self.config.members.iter() {
- let datacenter = &config.datacenter;
-
- if !datacenters.contains(datacenter) {
- datacenters.push(datacenter.to_string());
+ pub(crate) fn new(config: NetworkConfig) -> Self {
+ // Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
+ let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
+
+ let datacenters = config
+ .members
+ .iter()
+ .map(|(_id, info)| info.datacenter.as_str())
+ .collect::<HashSet<&str>>();
+ let n_datacenters = datacenters.len();
+
+ // Prepare ring
+ let mut partitions: Vec<Vec<(&UUID, &NetworkConfigEntry)>> = partitions_idx
+ .iter()
+ .map(|_i| Vec::new())
+ .collect::<Vec<_>>();
+
+ // Create MagLev priority queues for each node
+ let mut queues = config
+ .members
+ .iter()
+ .map(|(node_id, node_info)| {
+ let mut parts = partitions_idx
+ .iter()
+ .map(|i| {
+ let part_data =
+ [&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat();
+ (*i, fasthash(&part_data[..]))
+ })
+ .collect::<Vec<_>>();
+ parts.sort_by_key(|(_i, h)| *h);
+ let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>();
+ (node_id, node_info, parts_i, 0)
+ })
+ .collect::<Vec<_>>();
+
+ let max_toktok = config
+ .members
+ .iter()
+ .map(|(_, node_info)| node_info.n_tokens)
+ .fold(0, std::cmp::max);
+
+ // Fill up ring
+ for rep in 0..MAX_REPLICATION {
+ queues.sort_by_key(|(ni, _np, _q, _p)| {
+ let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat();
+ fasthash(&queue_data[..])
+ });
+
+ for (_, _, _, pos) in queues.iter_mut() {
+ *pos = 0;
}
- let datacenter_idx = datacenters
- .iter()
- .enumerate()
- .find(|(_, dc)| *dc == datacenter)
- .unwrap()
- .0;
-
- for i in 0..config.n_tokens {
- let location = sha256sum(format!("{} {}", hex::encode(&id), i).as_bytes());
-
- new_ring.push(RingEntry {
- location: location.into(),
- node: *id,
- datacenter: datacenter_idx,
- })
+
+ let mut remaining = partitions_idx.len();
+ while remaining > 0 {
+ let remaining0 = remaining;
+ for toktok in 0..max_toktok {
+ for (node_id, node_info, q, pos) in queues.iter_mut() {
+ if toktok >= node_info.n_tokens {
+ continue;
+ }
+ for pos2 in *pos..q.len() {
+ let qv = q[pos2];
+ if partitions[qv].len() != rep {
+ continue;
+ }
+ let p_dcs = partitions[qv]
+ .iter()
+ .map(|(_id, info)| info.datacenter.as_str())
+ .collect::<HashSet<&str>>();
+ if !partitions[qv]
+ .iter()
+ .any(|(_id, i)| *i.datacenter == node_info.datacenter)
+ || (p_dcs.len() == n_datacenters
+ && !partitions[qv].iter().any(|(id, _i)| id == node_id))
+ {
+ partitions[qv].push((node_id, node_info));
+ remaining -= 1;
+ *pos = pos2 + 1;
+ break;
+ }
+ }
+ }
+ }
+ if remaining == remaining0 {
+ // No progress made, exit
+ warn!("Could not build ring, not enough nodes configured.");
+ return Self {
+ config,
+ ring: vec![],
+ };
+ }
}
}
- new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location));
- self.ring = new_ring;
- self.n_datacenters = datacenters.len();
-
- // eprintln!("RING: --");
- // for e in self.ring.iter() {
- // eprintln!("{:?}", e);
- // }
- // eprintln!("END --");
- }
+ let ring = partitions
+ .iter()
+ .enumerate()
+ .map(|(i, nodes)| {
+ let top = (i as u16) << (16 - PARTITION_BITS);
+ let mut hash = [0u8; 32];
+ hash[0..2].copy_from_slice(&u16::to_be_bytes(top)[..]);
+ let nodes = nodes.iter().map(|(id, _info)| **id).collect::<Vec<UUID>>();
+ RingEntry {
+ location: hash.into(),
+ nodes: nodes.try_into().unwrap(),
+ }
+ })
+ .collect::<Vec<_>>();
- pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
- if n >= self.config.members.len() {
- return self.config.members.keys().cloned().collect::<Vec<_>>();
+ eprintln!("RING: --");
+ for e in ring.iter() {
+ eprintln!("{:?}", e);
}
+ eprintln!("END --");
- let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) {
- Ok(i) => i,
- Err(i) => {
- if i == 0 {
- self.ring.len() - 1
- } else {
- i - 1
- }
- }
- };
-
- self.walk_ring_from_pos(start, n)
+ Self { config, ring }
}
- fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> {
- if n >= self.config.members.len() {
- return self.config.members.keys().cloned().collect::<Vec<_>>();
+ pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
+ if self.ring.len() != 1 << PARTITION_BITS {
+ warn!("Ring not yet ready, read/writes will be lost");
+ return vec![];
}
- let mut ret = vec![];
- let mut datacenters = vec![];
+ let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
- let mut delta = 0;
- while ret.len() < n {
- let i = (start + delta) % self.ring.len();
- delta += 1;
+ let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
+ let partition = &self.ring[partition_idx];
- if !datacenters.contains(&self.ring[i].datacenter) {
- ret.push(self.ring[i].node);
- datacenters.push(self.ring[i].datacenter);
- } else if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) {
- ret.push(self.ring[i].node);
- }
- }
+ let partition_top =
+ u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
+ assert!(partition_top & PARTITION_MASK_U16 == top & PARTITION_MASK_U16);
- ret
+ assert!(n <= partition.nodes.len());
+ partition.nodes[..n].iter().cloned().collect::<Vec<_>>()
}
}
diff --git a/src/table/table_sharded.rs b/src/table/table_sharded.rs
index 47bdfeaf..098637dd 100644
--- a/src/table/table_sharded.rs
+++ b/src/table/table_sharded.rs
@@ -44,7 +44,6 @@ impl TableReplication for TableShardedReplication {
fn split_points(&self, ring: &Ring) -> Vec<Hash> {
let mut ret = vec![];
- ret.push([0u8; 32].into());
for entry in ring.ring.iter() {
ret.push(entry.location);
}