Diffstat (limited to 'src/rpc')
-rw-r--r--  src/rpc/layout.rs       72
-rw-r--r--  src/rpc/lib.rs           1
-rw-r--r--  src/rpc/ring.rs        164
-rw-r--r--  src/rpc/rpc_helper.rs   14
-rw-r--r--  src/rpc/system.rs       55
5 files changed, 100 insertions, 206 deletions
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index 368a9d2c..2b5b6606 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -13,17 +13,39 @@ use garage_util::error::*;
use crate::graph_algo::*;
-use crate::ring::*;
-
use std::convert::TryInto;
+// ---- defines: partitions ----
+
+/// A partition id, stored as a 16-bit integer,
+/// so we can have up to 2**16 partitions
+/// (in practice, exactly 2**PARTITION_BITS partitions).
+pub type Partition = u16;
+
+// TODO: make this constant parametrizable in the config file
+// For deployments with many nodes it might make sense to bump
+// it up to 10.
+// Maximum value: 16
+/// How many bits of the hash are used to select a partition. A higher value means more fairness
+/// in the presence of numerous nodes, but an exponentially bigger ring. Max 16.
+pub const PARTITION_BITS: usize = 8;
+
const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
+// ---- defines: nodes ----
+
+// Type used to compactly store the id of a node in the system.
+// Change this to u16 the day we want more than 256 nodes in a cluster.
+pub type CompactNodeType = u8;
+pub const MAX_NODE_NUMBER: usize = 256;
+
+// ---- defines: other ----
+
// The Message type will be used to collect information on the algorithm.
-type Message = Vec<String>;
+pub type Message = Vec<String>;
mod v08 {
- use crate::ring::CompactNodeType;
+ use super::CompactNodeType;
use garage_util::crdt::LwwMap;
use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize};
@@ -76,7 +98,7 @@ mod v08 {
mod v09 {
use super::v08;
- use crate::ring::CompactNodeType;
+ use super::CompactNodeType;
use garage_util::crdt::{Lww, LwwMap};
use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize};
@@ -334,6 +356,46 @@ impl ClusterLayout {
))
}
+ /// Get the partition in which the given hash would fall
+ pub fn partition_of(&self, position: &Hash) -> Partition {
+ let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
+ top >> (16 - PARTITION_BITS)
+ }
+
+ /// Get the list of partitions, along with the first hash that falls in each of them
+ pub fn partitions(&self) -> Vec<(Partition, Hash)> {
+ (0..(1 << PARTITION_BITS))
+ .map(|i| {
+ let top = (i as u16) << (16 - PARTITION_BITS);
+ let mut location = [0u8; 32];
+ location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]);
+ (i as u16, Hash::from(location))
+ })
+ .collect::<Vec<_>>()
+ }
+
+ /// Return the n servers on which data at this position should be replicated
+ pub fn nodes_of(&self, position: &Hash, n: usize) -> Vec<Uuid> {
+ assert_eq!(n, self.replication_factor);
+
+ let data = &self.ring_assignment_data;
+
+ if data.len() != self.replication_factor * (1 << PARTITION_BITS) {
+ warn!("Ring not yet ready, read/writes will be lost!");
+ return vec![];
+ }
+
+ let partition_idx = self.partition_of(position) as usize;
+ let partition_start = partition_idx * self.replication_factor;
+ let partition_end = (partition_idx + 1) * self.replication_factor;
+ let partition_nodes = &data[partition_start..partition_end];
+
+ partition_nodes
+ .iter()
+ .map(|i| self.node_id_vec[*i as usize])
+ .collect::<Vec<_>>()
+ }
+
// ===================== internal information extractors ======================
/// Returns the uuids of the non_gateway nodes in self.node_id_vec.
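
With these helpers moved into ClusterLayout, the partition arithmetic is plain bit
manipulation on the first two bytes of a hash. What follows is a minimal standalone
sketch of that arithmetic, assuming PARTITION_BITS = 8 as in this patch; first_hash_of
is a hypothetical helper mirroring what partitions() computes for each index:

const PARTITION_BITS: usize = 8;

fn partition_of(hash: &[u8; 32]) -> u16 {
    // Big-endian u16 from the first two bytes, keeping only the top PARTITION_BITS bits.
    let top = u16::from_be_bytes([hash[0], hash[1]]);
    top >> (16 - PARTITION_BITS)
}

fn first_hash_of(partition: u16) -> [u8; 32] {
    // Inverse mapping: the smallest hash that falls in this partition.
    let mut h = [0u8; 32];
    h[..2].copy_from_slice(&(partition << (16 - PARTITION_BITS)).to_be_bytes());
    h
}

fn main() {
    let mut h = [0u8; 32];
    h[0] = 0xAB;
    h[1] = 0xCD;
    // With 8 partition bits, the partition id is simply the first byte of the hash.
    assert_eq!(partition_of(&h), 0xAB);
    assert_eq!(partition_of(&first_hash_of(0xAB)), 0xAB);
}
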
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index a5f8fc6e..1af8b78e 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -14,7 +14,6 @@ mod kubernetes;
pub mod graph_algo;
pub mod layout;
pub mod replication_mode;
-pub mod ring;
pub mod system;
pub mod rpc_helper;
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
deleted file mode 100644
index 6a2e5c72..00000000
--- a/src/rpc/ring.rs
+++ /dev/null
@@ -1,164 +0,0 @@
-//! Module containing types related to computing nodes which should receive a copy of data blocks
-//! and metadata
-use std::convert::TryInto;
-
-use garage_util::data::*;
-
-use crate::layout::ClusterLayout;
-
-/// A partition id, stored as a 16-bit integer,
-/// so we can have up to 2**16 partitions
-/// (in practice, exactly 2**PARTITION_BITS partitions).
-pub type Partition = u16;
-
-// TODO: make this constant parametrizable in the config file
-// For deployments with many nodes it might make sense to bump
-// it up to 10.
-// Maximum value : 16
-/// How many bits of the hash are used to select a partition. A higher value means more fairness
-/// in the presence of numerous nodes, but an exponentially bigger ring. Max 16.
-pub const PARTITION_BITS: usize = 8;
-
-const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
-
-/// A ring that fairly distributes objects to nodes
-#[derive(Clone)]
-pub struct Ring {
- /// The replication factor for this ring
- pub replication_factor: usize,
-
- /// The network configuration used to generate this ring
- pub layout: ClusterLayout,
-
- // Internal order of nodes used to make a more compact representation of the ring
- nodes: Vec<Uuid>,
-
- // The list of entries in the ring
- ring: Vec<RingEntry>,
-}
-
-// Type used to compactly store the id of a node in the system.
-// Change this to u16 the day we want more than 256 nodes in a cluster.
-pub type CompactNodeType = u8;
-pub const MAX_NODE_NUMBER: usize = 256;
-
-// The maximum number of times an object might get replicated
-// This must be at least 3 because Garage supports 3-way replication
-// Here we use 6 so that the size of a ring entry is 8 bytes
-// (2 bytes partition id, 6 bytes node numbers as u8s)
-const MAX_REPLICATION: usize = 6;
-
-/// An entry in the ring
-#[derive(Clone, Debug)]
-struct RingEntry {
- // The first two bytes of the first hash that goes in this partition
- // (the next bytes are zeroes)
- hash_prefix: u16,
- // The nodes that store this partition, stored as a list of positions in the `nodes`
- // field of the Ring structure
- // Only items 0 up to ring.replication_factor - 1 are used, others are zeros
- nodes_buf: [CompactNodeType; MAX_REPLICATION],
-}
-
-impl Ring {
- pub(crate) fn new(layout: ClusterLayout, replication_factor: usize) -> Self {
- if replication_factor != layout.replication_factor {
- warn!("Could not build ring: replication factor does not match between local configuration and network role assignment.");
- return Self::empty(layout, replication_factor);
- }
-
- if layout.ring_assignment_data.len() != replication_factor * (1 << PARTITION_BITS) {
- warn!("Could not build ring: network role assignment data has invalid length");
- return Self::empty(layout, replication_factor);
- }
-
- let nodes = layout.node_id_vec.clone();
- let ring = (0..(1 << PARTITION_BITS))
- .map(|i| {
- let top = (i as u16) << (16 - PARTITION_BITS);
- let mut nodes_buf = [0u8; MAX_REPLICATION];
- nodes_buf[..replication_factor].copy_from_slice(
- &layout.ring_assignment_data
- [replication_factor * i..replication_factor * (i + 1)],
- );
- RingEntry {
- hash_prefix: top,
- nodes_buf,
- }
- })
- .collect::<Vec<_>>();
-
- Self {
- replication_factor,
- layout,
- nodes,
- ring,
- }
- }
-
- fn empty(layout: ClusterLayout, replication_factor: usize) -> Self {
- Self {
- replication_factor,
- layout,
- nodes: vec![],
- ring: vec![],
- }
- }
-
- /// Get the partition in which the given hash would fall
- pub fn partition_of(&self, position: &Hash) -> Partition {
- let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
- top >> (16 - PARTITION_BITS)
- }
-
- /// Get the list of partitions, along with the first hash that falls in each of them
- pub fn partitions(&self) -> Vec<(Partition, Hash)> {
- let mut ret = vec![];
-
- for (i, entry) in self.ring.iter().enumerate() {
- let mut location = [0u8; 32];
- location[..2].copy_from_slice(&u16::to_be_bytes(entry.hash_prefix)[..]);
- ret.push((i as u16, location.into()));
- }
- if !ret.is_empty() {
- assert_eq!(ret[0].1, [0u8; 32].into());
- }
-
- ret
- }
-
- /// Walk the ring to find the n servers on which data should be replicated
- pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec<Uuid> {
- if self.ring.len() != 1 << PARTITION_BITS {
- warn!("Ring not yet ready, read/writes will be lost!");
- return vec![];
- }
-
- let partition_idx = self.partition_of(position) as usize;
- let partition = &self.ring[partition_idx];
-
- let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
- // Check that we haven't messed up our partition table, i.e. that this partition
- // table entry indeed corresponds to the item we are storing
- assert_eq!(
- partition.hash_prefix & PARTITION_MASK_U16,
- top & PARTITION_MASK_U16
- );
-
- assert!(n <= self.replication_factor);
- partition.nodes_buf[..n]
- .iter()
- .map(|i| self.nodes[*i as usize])
- .collect::<Vec<_>>()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_ring_entry_size() {
- assert_eq!(std::mem::size_of::<RingEntry>(), 8);
- }
-}
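
The deleted Ring precomputed one 8-byte RingEntry per partition, but the same
information already lives in the layout's flat ring_assignment_data array, which
stores replication_factor compact node indices per partition, so the padding to
MAX_REPLICATION slots is no longer needed. A hedged sketch of that flat-indexing
idea, under simplified types (partition_nodes is a hypothetical helper; the real
code does this slicing inside nodes_of):

fn partition_nodes(assignment: &[u8], replication_factor: usize, partition: usize) -> &[u8] {
    // Each partition owns a contiguous run of replication_factor node indices.
    let start = partition * replication_factor;
    &assignment[start..start + replication_factor]
}

fn main() {
    // 4 partitions, replication factor 3: 12 compact node indices in a row.
    let assignment: Vec<u8> = vec![0, 1, 2, 1, 2, 0, 2, 0, 1, 0, 2, 1];
    assert_eq!(partition_nodes(&assignment, 3, 2), &[2, 0, 1]);
}
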
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index e59c372a..56bef2f3 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -26,8 +26,8 @@ use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
+use crate::layout::ClusterLayout;
use crate::metrics::RpcMetrics;
-use crate::ring::Ring;
// Default RPC timeout = 5 minutes
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
@@ -91,7 +91,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
struct RpcHelperInner {
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- ring: watch::Receiver<Arc<Ring>>,
+ layout_watch: watch::Receiver<Arc<ClusterLayout>>,
metrics: RpcMetrics,
rpc_timeout: Duration,
}
@@ -100,7 +100,7 @@ impl RpcHelper {
pub(crate) fn new(
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- ring: watch::Receiver<Arc<Ring>>,
+ layout_watch: watch::Receiver<Arc<ClusterLayout>>,
rpc_timeout: Option<Duration>,
) -> Self {
let metrics = RpcMetrics::new();
@@ -108,7 +108,7 @@ impl RpcHelper {
Self(Arc::new(RpcHelperInner {
our_node_id,
fullmesh,
- ring,
+ layout_watch,
metrics,
rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
}))
@@ -392,8 +392,8 @@ impl RpcHelper {
pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
// Retrieve some status variables that we will use to sort requests
let peer_list = self.0.fullmesh.get_peer_list();
- let ring: Arc<Ring> = self.0.ring.borrow().clone();
- let our_zone = match ring.layout.node_role(&self.0.our_node_id) {
+ let layout: Arc<ClusterLayout> = self.0.layout_watch.borrow().clone();
+ let our_zone = match layout.node_role(&self.0.our_node_id) {
Some(pc) => &pc.zone,
None => "",
};
@@ -407,7 +407,7 @@ impl RpcHelper {
let mut nodes = nodes
.iter()
.map(|to| {
- let peer_zone = match ring.layout.node_role(to) {
+ let peer_zone = match layout.node_role(to) {
Some(pc) => &pc.zone,
None => "",
};
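
Beyond the type rename, the logic of request_order is untouched: it still prefers
nodes in the same zone as the local node. A hedged sketch of just that
zone-preference step, under simplified assumptions (order_by_zone is a hypothetical
name; the real function also ranks by measured ping and puts the local node first):

fn order_by_zone<'a>(our_zone: &str, nodes: &[(&'a str, &'a str)]) -> Vec<&'a str> {
    let mut nodes = nodes.to_vec();
    // Stable sort on a boolean key: nodes in our own zone first, other zones after.
    nodes.sort_by_key(|(_, zone)| *zone != our_zone);
    nodes.iter().map(|(id, _)| *id).collect()
}

fn main() {
    let nodes = [("n1", "dc2"), ("n2", "dc1"), ("n3", "dc2"), ("n4", "dc1")];
    // Running in dc1, we contact dc1 nodes before dc2 nodes.
    assert_eq!(order_by_zone("dc1", &nodes), vec!["n2", "n4", "n1", "n3"]);
}
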
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 4b40bec4..106e9f8c 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -36,7 +36,6 @@ use crate::consul::ConsulDiscovery;
use crate::kubernetes::*;
use crate::layout::*;
use crate::replication_mode::*;
-use crate::ring::*;
use crate::rpc_helper::*;
use crate::system_metrics::*;
@@ -112,9 +111,9 @@ pub struct System {
replication_mode: ReplicationMode,
replication_factor: usize,
- /// The ring
- pub ring: watch::Receiver<Arc<Ring>>,
- update_ring: Mutex<watch::Sender<Arc<Ring>>>,
+ /// The layout
+ pub layout_watch: watch::Receiver<Arc<ClusterLayout>>,
+ update_layout: Mutex<watch::Sender<Arc<ClusterLayout>>>,
/// Path to metadata directory
pub metadata_dir: PathBuf,
@@ -286,8 +285,7 @@ impl System {
let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout);
local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
- let ring = Ring::new(cluster_layout, replication_factor);
- let (update_ring, ring) = watch::channel(Arc::new(ring));
+ let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout));
let rpc_public_addr = match &config.rpc_public_addr {
Some(a_str) => {
@@ -362,7 +360,7 @@ impl System {
rpc: RpcHelper::new(
netapp.id.into(),
fullmesh,
- ring.clone(),
+ layout_watch.clone(),
config.rpc_timeout_msec.map(Duration::from_millis),
),
system_endpoint,
@@ -378,8 +376,8 @@ impl System {
kubernetes_discovery: config.kubernetes_discovery.clone(),
metrics,
- ring,
- update_ring: Mutex::new(update_ring),
+ layout_watch,
+ update_layout: Mutex::new(update_layout),
metadata_dir: config.metadata_dir.clone(),
data_dir: config.data_dir.clone(),
});
@@ -426,7 +424,7 @@ impl System {
}
pub fn get_cluster_layout(&self) -> ClusterLayout {
- self.ring.borrow().layout.clone()
+ self.layout_watch.borrow().as_ref().clone()
}
pub async fn update_cluster_layout(
@@ -466,7 +464,7 @@ impl System {
}
pub fn health(&self) -> ClusterHealth {
- let ring: Arc<_> = self.ring.borrow().clone();
+ let layout: Arc<_> = self.layout_watch.borrow().clone();
let quorum = self.replication_mode.write_quorum();
let replication_factor = self.replication_factor;
@@ -477,8 +475,7 @@ impl System {
.collect::<HashMap<Uuid, _>>();
let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
- let storage_nodes = ring
- .layout
+ let storage_nodes = layout
.roles
.items()
.iter()
@@ -489,11 +486,11 @@ impl System {
.filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count();
- let partitions = ring.partitions();
+ let partitions = layout.partitions();
let partitions_n_up = partitions
.iter()
.map(|(_, h)| {
- let pn = ring.get_nodes(h, ring.replication_factor);
+ let pn = layout.nodes_of(h, layout.replication_factor);
pn.iter()
.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count()
@@ -584,9 +581,9 @@ impl System {
/// Save network configuration to disc
async fn save_cluster_layout(&self) -> Result<(), Error> {
- let ring: Arc<Ring> = self.ring.borrow().clone();
+ let layout: Arc<ClusterLayout> = self.layout_watch.borrow().clone();
self.persist_cluster_layout
- .save_async(&ring.layout)
+ .save_async(&layout)
.await
.expect("Cannot save current cluster layout");
Ok(())
@@ -595,9 +592,9 @@ impl System {
fn update_local_status(&self) {
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
- let ring = self.ring.borrow();
- new_si.cluster_layout_version = ring.layout.version;
- new_si.cluster_layout_staging_hash = ring.layout.staging_hash;
+ let layout = self.layout_watch.borrow();
+ new_si.cluster_layout_version = layout.version;
+ new_si.cluster_layout_staging_hash = layout.staging_hash;
new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
@@ -612,8 +609,8 @@ impl System {
}
fn handle_pull_cluster_layout(&self) -> SystemRpc {
- let ring = self.ring.borrow().clone();
- SystemRpc::AdvertiseClusterLayout(ring.layout.clone())
+ let layout = self.layout_watch.borrow().as_ref().clone();
+ SystemRpc::AdvertiseClusterLayout(layout)
}
fn handle_get_known_nodes(&self) -> SystemRpc {
@@ -663,8 +660,9 @@ impl System {
return Err(Error::Message(msg));
}
- let update_ring = self.update_ring.lock().await;
- let mut layout: ClusterLayout = self.ring.borrow().layout.clone();
+ let update_layout = self.update_layout.lock().await;
+ // TODO: don't clone each time an AdvertiseClusterLayout is received
+ let mut layout: ClusterLayout = self.layout_watch.borrow().as_ref().clone();
let prev_layout_check = layout.check().is_ok();
if layout.merge(adv) {
@@ -675,9 +673,8 @@ impl System {
));
}
- let ring = Ring::new(layout.clone(), self.replication_factor);
- update_ring.send(Arc::new(ring))?;
- drop(update_ring);
+ update_layout.send(Arc::new(layout.clone()))?;
+ drop(update_layout);
let self2 = self.clone();
tokio::spawn(async move {
@@ -725,9 +722,9 @@ impl System {
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
- let not_configured = self.ring.borrow().layout.check().is_err();
+ let not_configured = self.layout_watch.borrow().check().is_err();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
- let expected_n_nodes = self.ring.borrow().layout.num_nodes();
+ let expected_n_nodes = self.layout_watch.borrow().num_nodes();
let bad_peers = self
.fullmesh
.get_peer_list()
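
Taken together, system.rs now publishes the ClusterLayout itself through the tokio
watch channel instead of a derived Ring. A minimal sketch of that pattern, with
Layout standing in for ClusterLayout (hypothetical names; tokio with the rt, macros,
and sync features assumed):

use std::sync::Arc;
use tokio::sync::watch;

struct Layout {
    version: u64,
}

#[tokio::main]
async fn main() {
    let (update_layout, layout_watch) = watch::channel(Arc::new(Layout { version: 1 }));

    // Reader side: snapshot the current layout by cloning the Arc, no lock held.
    let layout: Arc<Layout> = layout_watch.borrow().clone();
    assert_eq!(layout.version, 1);

    // Writer side: publish a new snapshot; every receiver sees it on its next borrow.
    update_layout.send(Arc::new(Layout { version: 2 })).unwrap();
    assert_eq!(layout_watch.borrow().version, 2);
}

In the real code the Sender is additionally wrapped in a Mutex (update_layout) so
that concurrent layout merges are serialized; readers never need that lock.
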