aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-11-08 17:49:06 +0100
committerAlex Auvolat <alex@adnab.me>2023-11-08 17:49:06 +0100
commitfe9af1dcaae31a117528a9cfa10c422c9a850201 (patch)
tree6e43dbb97d37d48f6af5398b4d067747e652108c
parent4a9c94514f49aa4e9880a8e0f5cf5a52d11ae993 (diff)
downloadgarage-fe9af1dcaae31a117528a9cfa10c422c9a850201.tar.gz
garage-fe9af1dcaae31a117528a9cfa10c422c9a850201.zip
WIP: garage_rpc: store layout version history
-rw-r--r--src/rpc/layout/history.rs170
-rw-r--r--src/rpc/layout/mod.rs32
-rw-r--r--src/rpc/layout/schema.rs286
-rw-r--r--src/rpc/layout/tracker.rs21
-rw-r--r--src/rpc/layout/version.rs (renamed from src/rpc/layout.rs)330
-rw-r--r--src/rpc/rpc_helper.rs12
-rw-r--r--src/rpc/system.rs44
7 files changed, 550 insertions, 345 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
new file mode 100644
index 00000000..b3019f58
--- /dev/null
+++ b/src/rpc/layout/history.rs
@@ -0,0 +1,170 @@
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use garage_util::crdt::{Crdt, Lww, LwwMap};
+use garage_util::data::*;
+use garage_util::encode::nonversioned_encode;
+use garage_util::error::*;
+
+use super::schema::*;
+use super::*;
+
+impl LayoutHistory {
+ pub fn new(replication_factor: usize) -> Self {
+ let version = LayoutVersion::new(replication_factor);
+
+ let staging_parameters = Lww::<LayoutParameters>::new(version.parameters);
+ let empty_lwwmap = LwwMap::new();
+
+ let mut ret = LayoutHistory {
+ versions: vec![version].into_boxed_slice().into(),
+ update_trackers: Default::default(),
+ staging_parameters,
+ staging_roles: empty_lwwmap,
+ staging_hash: [0u8; 32].into(),
+ };
+ ret.staging_hash = ret.calculate_staging_hash();
+ ret
+ }
+
+ pub fn current(&self) -> &LayoutVersion {
+ self.versions.last().as_ref().unwrap()
+ }
+
+ pub(crate) fn calculate_staging_hash(&self) -> Hash {
+ let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
+ blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..])
+ }
+
+ // ================== updates to layout, public interface ===================
+
+ pub fn merge(&mut self, other: &LayoutHistory) -> bool {
+ let mut changed = false;
+
+ // Merge staged layout changes
+ match other.current().version.cmp(&self.current().version) {
+ Ordering::Greater => {
+ self.staging_parameters = other.staging_parameters.clone();
+ self.staging_roles = other.staging_roles.clone();
+ self.staging_hash = other.staging_hash;
+ changed = true;
+ }
+ Ordering::Equal => {
+ self.staging_parameters.merge(&other.staging_parameters);
+ self.staging_roles.merge(&other.staging_roles);
+
+ let new_staging_hash = self.calculate_staging_hash();
+ if new_staging_hash != self.staging_hash {
+ changed = true;
+ }
+
+ self.staging_hash = new_staging_hash;
+ }
+ Ordering::Less => (),
+ }
+
+ // Add any new versions to history
+ let mut versions = self.versions.to_vec();
+ for v2 in other.versions.iter() {
+ if let Some(v1) = versions.iter().find(|v| v.version == v2.version) {
+ if v1 != v2 {
+ error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version);
+ }
+ } else if versions.iter().all(|v| v.version != v2.version - 1) {
+ error!(
+ "Cannot receive new layout version {}, version {} is missing",
+ v2.version,
+ v2.version - 1
+ );
+ } else {
+ versions.push(v2.clone());
+ changed = true;
+ }
+ }
+ self.versions = Arc::from(versions.into_boxed_slice());
+
+ // Merge trackers
+ self.update_trackers.merge(&other.update_trackers);
+
+ changed
+ }
+
+ pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> {
+ match version {
+ None => {
+ let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+ "#;
+ return Err(Error::Message(error.into()));
+ }
+ Some(v) => {
+ if v != self.current().version + 1 {
+ return Err(Error::Message("Invalid new layout version".into()));
+ }
+ }
+ }
+
+ let mut new_version = self.current().clone();
+ new_version.version += 1;
+
+ new_version.roles.merge(&self.staging_roles);
+ new_version.roles.retain(|(_, _, v)| v.0.is_some());
+ new_version.parameters = *self.staging_parameters.get();
+
+ self.staging_roles.clear();
+ self.staging_hash = self.calculate_staging_hash();
+
+ let msg = new_version.calculate_partition_assignment()?;
+
+ let mut versions = self.versions.to_vec();
+ versions.push(new_version);
+ self.versions = Arc::from(versions.into_boxed_slice());
+
+ Ok((self, msg))
+ }
+
+ pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
+ match version {
+ None => {
+ let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+ "#;
+ return Err(Error::Message(error.into()));
+ }
+ Some(v) => {
+ if v != self.current().version + 1 {
+ return Err(Error::Message("Invalid new layout version".into()));
+ }
+ }
+ }
+
+ self.staging_roles.clear();
+ self.staging_parameters.update(self.current().parameters);
+ self.staging_hash = self.calculate_staging_hash();
+
+ // TODO this is stupid, we should have a separate version counter/LWW
+ // for the staging params
+ let mut new_version = self.current().clone();
+ new_version.version += 1;
+
+ let mut versions = self.versions.to_vec();
+ versions.push(new_version);
+ self.versions = Arc::from(versions.into_boxed_slice());
+
+ Ok(self)
+ }
+
+ pub fn check(&self) -> Result<(), String> {
+ // Check that the hash of the staging data is correct
+ let staging_hash = self.calculate_staging_hash();
+ if staging_hash != self.staging_hash {
+ return Err("staging_hash is incorrect".into());
+ }
+
+ // TODO: anythign more ?
+
+ self.current().check()
+ }
+}
diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs
new file mode 100644
index 00000000..122d4b65
--- /dev/null
+++ b/src/rpc/layout/mod.rs
@@ -0,0 +1,32 @@
+mod history;
+mod schema;
+mod tracker;
+mod version;
+
+pub use history::*;
+pub use schema::*;
+pub use version::*;
+
+// ---- defines: partitions ----
+
+/// A partition id, which is stored on 16 bits
+/// i.e. we have up to 2**16 partitions.
+/// (in practice we have exactly 2**PARTITION_BITS partitions)
+pub type Partition = u16;
+
+// TODO: make this constant parametrizable in the config file
+// For deployments with many nodes it might make sense to bump
+// it up to 10.
+// Maximum value : 16
+/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in
+/// presence of numerous nodes, but exponentially bigger ring. Max 16
+pub const PARTITION_BITS: usize = 8;
+
+const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
+
+// ---- defines: nodes ----
+
+// Type to store compactly the id of a node in the system
+// Change this to u16 the day we want to have more than 256 nodes in a cluster
+pub type CompactNodeType = u8;
+pub const MAX_NODE_NUMBER: usize = 256;
diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs
new file mode 100644
index 00000000..fa0822fa
--- /dev/null
+++ b/src/rpc/layout/schema.rs
@@ -0,0 +1,286 @@
+mod v08 {
+ use crate::layout::CompactNodeType;
+ use garage_util::crdt::LwwMap;
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
+
+ /// The layout of the cluster, i.e. the list of roles
+ /// which are assigned to each cluster node
+ #[derive(Clone, Debug, Serialize, Deserialize)]
+ pub struct ClusterLayout {
+ pub version: u64,
+
+ pub replication_factor: usize,
+ pub roles: LwwMap<Uuid, NodeRoleV>,
+
+ /// node_id_vec: a vector of node IDs with a role assigned
+ /// in the system (this includes gateway nodes).
+ /// The order here is different than the vec stored by `roles`, because:
+ /// 1. non-gateway nodes are first so that they have lower numbers
+ /// 2. nodes that don't have a role are excluded (but they need to
+ /// stay in the CRDT as tombstones)
+ pub node_id_vec: Vec<Uuid>,
+ /// the assignation of data partitions to node, the values
+ /// are indices in node_id_vec
+ #[serde(with = "serde_bytes")]
+ pub ring_assignation_data: Vec<CompactNodeType>,
+
+ /// Role changes which are staged for the next version of the layout
+ pub staging: LwwMap<Uuid, NodeRoleV>,
+ pub staging_hash: Hash,
+ }
+
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct NodeRoleV(pub Option<NodeRole>);
+
+ /// The user-assigned roles of cluster nodes
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct NodeRole {
+ /// Datacenter at which this entry belong. This information is used to
+ /// perform a better geodistribution
+ pub zone: String,
+ /// The capacity of the node
+ /// If this is set to None, the node does not participate in storing data for the system
+ /// and is only active as an API gateway to other nodes
+ pub capacity: Option<u64>,
+ /// A set of tags to recognize the node
+ pub tags: Vec<String>,
+ }
+
+ impl garage_util::migrate::InitialFormat for ClusterLayout {}
+}
+
+mod v09 {
+ use super::v08;
+ use crate::layout::CompactNodeType;
+ use garage_util::crdt::{Lww, LwwMap};
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
+ pub use v08::{NodeRole, NodeRoleV};
+
+ /// The layout of the cluster, i.e. the list of roles
+ /// which are assigned to each cluster node
+ #[derive(Clone, Debug, Serialize, Deserialize)]
+ pub struct ClusterLayout {
+ pub version: u64,
+
+ pub replication_factor: usize,
+
+ /// This attribute is only used to retain the previously computed partition size,
+ /// to know to what extent does it change with the layout update.
+ pub partition_size: u64,
+ /// Parameters used to compute the assignment currently given by
+ /// ring_assignment_data
+ pub parameters: LayoutParameters,
+
+ pub roles: LwwMap<Uuid, NodeRoleV>,
+
+ /// see comment in v08::ClusterLayout
+ pub node_id_vec: Vec<Uuid>,
+ /// see comment in v08::ClusterLayout
+ #[serde(with = "serde_bytes")]
+ pub ring_assignment_data: Vec<CompactNodeType>,
+
+ /// Parameters to be used in the next partition assignment computation.
+ pub staging_parameters: Lww<LayoutParameters>,
+ /// Role changes which are staged for the next version of the layout
+ pub staging_roles: LwwMap<Uuid, NodeRoleV>,
+ pub staging_hash: Hash,
+ }
+
+ /// This struct is used to set the parameters to be used in the assignment computation
+ /// algorithm. It is stored as a Crdt.
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub struct LayoutParameters {
+ pub zone_redundancy: ZoneRedundancy,
+ }
+
+ /// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies
+ /// of each partition on at least that number of different zones.
+ /// Otherwise, copies will be stored on the maximum possible number of zones.
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub enum ZoneRedundancy {
+ AtLeast(usize),
+ Maximum,
+ }
+
+ impl garage_util::migrate::Migrate for ClusterLayout {
+ const VERSION_MARKER: &'static [u8] = b"G09layout";
+
+ type Previous = v08::ClusterLayout;
+
+ fn migrate(previous: Self::Previous) -> Self {
+ use itertools::Itertools;
+
+ // In the old layout, capacities are in an arbitrary unit,
+ // but in the new layout they are in bytes.
+ // Here we arbitrarily multiply everything by 1G,
+ // such that 1 old capacity unit = 1GB in the new units.
+ // This is totally arbitrary and won't work for most users.
+ let cap_mul = 1024 * 1024 * 1024;
+ let roles = multiply_all_capacities(previous.roles, cap_mul);
+ let staging_roles = multiply_all_capacities(previous.staging, cap_mul);
+ let node_id_vec = previous.node_id_vec;
+
+ // Determine partition size
+ let mut tmp = previous.ring_assignation_data.clone();
+ tmp.sort();
+ let partition_size = tmp
+ .into_iter()
+ .dedup_with_count()
+ .map(|(npart, node)| {
+ roles
+ .get(&node_id_vec[node as usize])
+ .and_then(|p| p.0.as_ref().and_then(|r| r.capacity))
+ .unwrap_or(0) / npart as u64
+ })
+ .min()
+ .unwrap_or(0);
+
+ // By default, zone_redundancy is maximum possible value
+ let parameters = LayoutParameters {
+ zone_redundancy: ZoneRedundancy::Maximum,
+ };
+
+ Self {
+ version: previous.version,
+ replication_factor: previous.replication_factor,
+ partition_size,
+ parameters,
+ roles,
+ node_id_vec,
+ ring_assignment_data: previous.ring_assignation_data,
+ staging_parameters: Lww::new(parameters),
+ staging_roles,
+ staging_hash: [0u8; 32].into(), // will be set in the next migration
+ }
+ }
+ }
+
+ fn multiply_all_capacities(
+ old_roles: LwwMap<Uuid, NodeRoleV>,
+ mul: u64,
+ ) -> LwwMap<Uuid, NodeRoleV> {
+ let mut new_roles = LwwMap::new();
+ for (node, ts, role) in old_roles.items() {
+ let mut role = role.clone();
+ if let NodeRoleV(Some(NodeRole {
+ capacity: Some(ref mut cap),
+ ..
+ })) = role
+ {
+ *cap *= mul;
+ }
+ new_roles.merge_raw(node, *ts, &role);
+ }
+ new_roles
+ }
+}
+
+mod v010 {
+ use super::v09;
+ use crate::layout::CompactNodeType;
+ use garage_util::crdt::{Lww, LwwMap};
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
+ use std::collections::HashMap;
+ use std::sync::Arc;
+ pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy};
+
+ /// The layout of the cluster, i.e. the list of roles
+ /// which are assigned to each cluster node
+ #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+ pub struct LayoutVersion {
+ pub version: u64,
+
+ pub replication_factor: usize,
+
+ /// This attribute is only used to retain the previously computed partition size,
+ /// to know to what extent does it change with the layout update.
+ pub partition_size: u64,
+ /// Parameters used to compute the assignment currently given by
+ /// ring_assignment_data
+ pub parameters: LayoutParameters,
+
+ pub roles: LwwMap<Uuid, NodeRoleV>,
+
+ /// see comment in v08::ClusterLayout
+ pub node_id_vec: Vec<Uuid>,
+ /// see comment in v08::ClusterLayout
+ #[serde(with = "serde_bytes")]
+ pub ring_assignment_data: Vec<CompactNodeType>,
+ }
+
+ /// The history of cluster layouts
+ #[derive(Clone, Debug, Serialize, Deserialize)]
+ pub struct LayoutHistory {
+ /// The versions currently in use in the cluster
+ pub versions: Arc<[LayoutVersion]>,
+
+ /// Update trackers
+ pub update_trackers: UpdateTrackers,
+
+ /// Parameters to be used in the next partition assignment computation.
+ pub staging_parameters: Lww<LayoutParameters>,
+ /// Role changes which are staged for the next version of the layout
+ pub staging_roles: LwwMap<Uuid, NodeRoleV>,
+ /// Hash of the serialized staging_parameters + staging_roles
+ pub staging_hash: Hash,
+ }
+
+ /// The tracker of acknowlegments and data syncs around the cluster
+ #[derive(Clone, Debug, Serialize, Deserialize, Default)]
+ pub struct UpdateTrackers {
+ /// The highest layout version number each node has ack'ed
+ pub ack_map: UpdateTracker,
+ /// The highest layout version number each node has synced data for
+ pub sync_map: UpdateTracker,
+ /// The highest layout version number each node has
+ /// ack'ed that all other nodes have synced data for
+ pub sync_ack_map: UpdateTracker,
+ }
+
+ /// The history of cluster layouts
+ #[derive(Clone, Debug, Serialize, Deserialize, Default)]
+ pub struct UpdateTracker(pub HashMap<Uuid, u64>);
+
+ impl garage_util::migrate::Migrate for LayoutHistory {
+ const VERSION_MARKER: &'static [u8] = b"G010lh";
+
+ type Previous = v09::ClusterLayout;
+
+ fn migrate(previous: Self::Previous) -> Self {
+ let version = LayoutVersion {
+ version: previous.version,
+ replication_factor: previous.replication_factor,
+ partition_size: previous.partition_size,
+ parameters: previous.parameters,
+ roles: previous.roles,
+ node_id_vec: previous.node_id_vec,
+ ring_assignment_data: previous.ring_assignment_data,
+ };
+ let update_tracker = UpdateTracker(
+ version
+ .nongateway_nodes()
+ .iter()
+ .map(|x| (*x, version.version))
+ .collect::<HashMap<Uuid, u64>>(),
+ );
+ let mut ret = Self {
+ versions: Arc::from(vec![version].into_boxed_slice()),
+ update_trackers: UpdateTrackers {
+ ack_map: update_tracker.clone(),
+ sync_map: update_tracker.clone(),
+ sync_ack_map: update_tracker.clone(),
+ },
+ staging_parameters: previous.staging_parameters,
+ staging_roles: previous.staging_roles,
+ staging_hash: [0u8; 32].into(),
+ };
+ ret.staging_hash = ret.calculate_staging_hash();
+ ret
+ }
+ }
+}
+
+pub use v010::*;
diff --git a/src/rpc/layout/tracker.rs b/src/rpc/layout/tracker.rs
new file mode 100644
index 00000000..778121e4
--- /dev/null
+++ b/src/rpc/layout/tracker.rs
@@ -0,0 +1,21 @@
+use super::*;
+
+impl UpdateTracker {
+ fn merge(&mut self, other: &UpdateTracker) {
+ for (k, v) in other.0.iter() {
+ if let Some(v_mut) = self.0.get_mut(k) {
+ *v_mut = std::cmp::max(*v_mut, *v);
+ } else {
+ self.0.insert(*k, *v);
+ }
+ }
+ }
+}
+
+impl UpdateTrackers {
+ pub(crate) fn merge(&mut self, other: &UpdateTrackers) {
+ self.ack_map.merge(&other.ack_map);
+ self.sync_map.merge(&other.sync_map);
+ self.sync_ack_map.merge(&other.sync_ack_map);
+ }
+}
diff --git a/src/rpc/layout.rs b/src/rpc/layout/version.rs
index 2b5b6606..363bc204 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout/version.rs
@@ -1,4 +1,3 @@
-use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt;
@@ -6,227 +5,20 @@ use std::fmt;
use bytesize::ByteSize;
use itertools::Itertools;
-use garage_util::crdt::{AutoCrdt, Crdt, Lww, LwwMap};
+use garage_util::crdt::{AutoCrdt, LwwMap};
use garage_util::data::*;
-use garage_util::encode::nonversioned_encode;
use garage_util::error::*;
use crate::graph_algo::*;
use std::convert::TryInto;
-// ---- defines: partitions ----
-
-/// A partition id, which is stored on 16 bits
-/// i.e. we have up to 2**16 partitions.
-/// (in practice we have exactly 2**PARTITION_BITS partitions)
-pub type Partition = u16;
-
-// TODO: make this constant parametrizable in the config file
-// For deployments with many nodes it might make sense to bump
-// it up to 10.
-// Maximum value : 16
-/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in
-/// presence of numerous nodes, but exponentially bigger ring. Max 16
-pub const PARTITION_BITS: usize = 8;
-
-const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
-
-// ---- defines: nodes ----
-
-// Type to store compactly the id of a node in the system
-// Change this to u16 the day we want to have more than 256 nodes in a cluster
-pub type CompactNodeType = u8;
-pub const MAX_NODE_NUMBER: usize = 256;
-
-// ---- defines: other ----
+use super::schema::*;
+use super::*;
// The Message type will be used to collect information on the algorithm.
pub type Message = Vec<String>;
-mod v08 {
- use super::CompactNodeType;
- use garage_util::crdt::LwwMap;
- use garage_util::data::{Hash, Uuid};
- use serde::{Deserialize, Serialize};
-
- /// The layout of the cluster, i.e. the list of roles
- /// which are assigned to each cluster node
- #[derive(Clone, Debug, Serialize, Deserialize)]
- pub struct ClusterLayout {
- pub version: u64,
-
- pub replication_factor: usize,
- pub roles: LwwMap<Uuid, NodeRoleV>,
-
- /// node_id_vec: a vector of node IDs with a role assigned
- /// in the system (this includes gateway nodes).
- /// The order here is different than the vec stored by `roles`, because:
- /// 1. non-gateway nodes are first so that they have lower numbers
- /// 2. nodes that don't have a role are excluded (but they need to
- /// stay in the CRDT as tombstones)
- pub node_id_vec: Vec<Uuid>,
- /// the assignation of data partitions to node, the values
- /// are indices in node_id_vec
- #[serde(with = "serde_bytes")]
- pub ring_assignation_data: Vec<CompactNodeType>,
-
- /// Role changes which are staged for the next version of the layout
- pub staging: LwwMap<Uuid, NodeRoleV>,
- pub staging_hash: Hash,
- }
-
- #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
- pub struct NodeRoleV(pub Option<NodeRole>);
-
- /// The user-assigned roles of cluster nodes
- #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
- pub struct NodeRole {
- /// Datacenter at which this entry belong. This information is used to
- /// perform a better geodistribution
- pub zone: String,
- /// The capacity of the node
- /// If this is set to None, the node does not participate in storing data for the system
- /// and is only active as an API gateway to other nodes
- pub capacity: Option<u64>,
- /// A set of tags to recognize the node
- pub tags: Vec<String>,
- }
-
- impl garage_util::migrate::InitialFormat for ClusterLayout {}
-}
-
-mod v09 {
- use super::v08;
- use super::CompactNodeType;
- use garage_util::crdt::{Lww, LwwMap};
- use garage_util::data::{Hash, Uuid};
- use serde::{Deserialize, Serialize};
- pub use v08::{NodeRole, NodeRoleV};
-
- /// The layout of the cluster, i.e. the list of roles
- /// which are assigned to each cluster node
- #[derive(Clone, Debug, Serialize, Deserialize)]
- pub struct ClusterLayout {
- pub version: u64,
-
- pub replication_factor: usize,
-
- /// This attribute is only used to retain the previously computed partition size,
- /// to know to what extent does it change with the layout update.
- pub partition_size: u64,
- /// Parameters used to compute the assignment currently given by
- /// ring_assignment_data
- pub parameters: LayoutParameters,
-
- pub roles: LwwMap<Uuid, NodeRoleV>,
-
- /// see comment in v08::ClusterLayout
- pub node_id_vec: Vec<Uuid>,
- /// see comment in v08::ClusterLayout
- #[serde(with = "serde_bytes")]
- pub ring_assignment_data: Vec<CompactNodeType>,
-
- /// Parameters to be used in the next partition assignment computation.
- pub staging_parameters: Lww<LayoutParameters>,
- /// Role changes which are staged for the next version of the layout
- pub staging_roles: LwwMap<Uuid, NodeRoleV>,
- pub staging_hash: Hash,
- }
-
- /// This struct is used to set the parameters to be used in the assignment computation
- /// algorithm. It is stored as a Crdt.
- #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
- pub struct LayoutParameters {
- pub zone_redundancy: ZoneRedundancy,
- }
-
- /// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies
- /// of each partition on at least that number of different zones.
- /// Otherwise, copies will be stored on the maximum possible number of zones.
- #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
- pub enum ZoneRedundancy {
- AtLeast(usize),
- Maximum,
- }
-
- impl garage_util::migrate::Migrate for ClusterLayout {
- const VERSION_MARKER: &'static [u8] = b"G09layout";
-
- type Previous = v08::ClusterLayout;
-
- fn migrate(previous: Self::Previous) -> Self {
- use itertools::Itertools;
-
- // In the old layout, capacities are in an arbitrary unit,
- // but in the new layout they are in bytes.
- // Here we arbitrarily multiply everything by 1G,
- // such that 1 old capacity unit = 1GB in the new units.
- // This is totally arbitrary and won't work for most users.
- let cap_mul = 1024 * 1024 * 1024;
- let roles = multiply_all_capacities(previous.roles, cap_mul);
- let staging_roles = multiply_all_capacities(previous.staging, cap_mul);
- let node_id_vec = previous.node_id_vec;
-
- // Determine partition size
- let mut tmp = previous.ring_assignation_data.clone();
- tmp.sort();
- let partition_size = tmp
- .into_iter()
- .dedup_with_count()
- .map(|(npart, node)| {
- roles
- .get(&node_id_vec[node as usize])
- .and_then(|p| p.0.as_ref().and_then(|r| r.capacity))
- .unwrap_or(0) / npart as u64
- })
- .min()
- .unwrap_or(0);
-
- // By default, zone_redundancy is maximum possible value
- let parameters = LayoutParameters {
- zone_redundancy: ZoneRedundancy::Maximum,
- };
-
- let mut res = Self {
- version: previous.version,
- replication_factor: previous.replication_factor,
- partition_size,
- parameters,
- roles,
- node_id_vec,
- ring_assignment_data: previous.ring_assignation_data,
- staging_parameters: Lww::new(parameters),
- staging_roles,
- staging_hash: [0u8; 32].into(),
- };
- res.staging_hash = res.calculate_staging_hash();
- res
- }
- }
-
- fn multiply_all_capacities(
- old_roles: LwwMap<Uuid, NodeRoleV>,
- mul: u64,
- ) -> LwwMap<Uuid, NodeRoleV> {
- let mut new_roles = LwwMap::new();
- for (node, ts, role) in old_roles.items() {
- let mut role = role.clone();
- if let NodeRoleV(Some(NodeRole {
- capacity: Some(ref mut cap),
- ..
- })) = role
- {
- *cap *= mul;
- }
- new_roles.merge_raw(node, *ts, &role);
- }
- new_roles
- }
-}
-
-pub use v09::*;
-
impl AutoCrdt for LayoutParameters {
const WARN_IF_DIFFERENT: bool = true;
}
@@ -272,19 +64,15 @@ impl core::str::FromStr for ZoneRedundancy {
}
}
-// Implementation of the ClusterLayout methods unrelated to the assignment algorithm.
-impl ClusterLayout {
+impl LayoutVersion {
pub fn new(replication_factor: usize) -> Self {
// We set the default zone redundancy to be Maximum, meaning that the maximum
// possible value will be used depending on the cluster topology
let parameters = LayoutParameters {
zone_redundancy: ZoneRedundancy::Maximum,
};
- let staging_parameters = Lww::<LayoutParameters>::new(parameters);
- let empty_lwwmap = LwwMap::new();
-
- let mut ret = ClusterLayout {
+ LayoutVersion {
version: 0,
replication_factor,
partition_size: 0,
@@ -292,12 +80,7 @@ impl ClusterLayout {
node_id_vec: Vec::new(),
ring_assignment_data: Vec::new(),
parameters,
- staging_parameters,
- staging_roles: empty_lwwmap,
- staging_hash: [0u8; 32].into(),
- };
- ret.staging_hash = ret.calculate_staging_hash();
- ret
+ }
}
// ===================== accessors ======================
@@ -399,7 +182,7 @@ impl ClusterLayout {
// ===================== internal information extractors ======================
/// Returns the uuids of the non_gateway nodes in self.node_id_vec.
- fn nongateway_nodes(&self) -> Vec<Uuid> {
+ pub(crate) fn nongateway_nodes(&self) -> Vec<Uuid> {
let mut result = Vec::<Uuid>::new();
for uuid in self.node_id_vec.iter() {
match self.node_role(uuid) {
@@ -446,99 +229,10 @@ impl ClusterLayout {
}
}
- fn calculate_staging_hash(&self) -> Hash {
- let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
- blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..])
- }
-
- // ================== updates to layout, public interface ===================
-
- pub fn merge(&mut self, other: &ClusterLayout) -> bool {
- match other.version.cmp(&self.version) {
- Ordering::Greater => {
- *self = other.clone();
- true
- }
- Ordering::Equal => {
- self.staging_parameters.merge(&other.staging_parameters);
- self.staging_roles.merge(&other.staging_roles);
-
- let new_staging_hash = self.calculate_staging_hash();
- let changed = new_staging_hash != self.staging_hash;
-
- self.staging_hash = new_staging_hash;
-
- changed
- }
- Ordering::Less => false,
- }
- }
-
- pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> {
- match version {
- None => {
- let error = r#"
-Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
-To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
- "#;
- return Err(Error::Message(error.into()));
- }
- Some(v) => {
- if v != self.version + 1 {
- return Err(Error::Message("Invalid new layout version".into()));
- }
- }
- }
-
- self.roles.merge(&self.staging_roles);
- self.roles.retain(|(_, _, v)| v.0.is_some());
- self.parameters = *self.staging_parameters.get();
-
- self.staging_roles.clear();
- self.staging_hash = self.calculate_staging_hash();
-
- let msg = self.calculate_partition_assignment()?;
-
- self.version += 1;
-
- Ok((self, msg))
- }
-
- pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
- match version {
- None => {
- let error = r#"
-Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
-To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
- "#;
- return Err(Error::Message(error.into()));
- }
- Some(v) => {
- if v != self.version + 1 {
- return Err(Error::Message("Invalid new layout version".into()));
- }
- }
- }
-
- self.staging_roles.clear();
- self.staging_parameters.update(self.parameters);
- self.staging_hash = self.calculate_staging_hash();
-
- self.version += 1;
-
- Ok(self)
- }
-
/// Check a cluster layout for internal consistency
/// (assignment, roles, parameters, partition size)
/// returns true if consistent, false if error
pub fn check(&self) -> Result<(), String> {
- // Check that the hash of the staging data is correct
- let staging_hash = self.calculate_staging_hash();
- if staging_hash != self.staging_hash {
- return Err("staging_hash is incorrect".into());
- }
-
// Check that node_id_vec contains the correct list of nodes
let mut expected_nodes = self
.roles
@@ -654,7 +348,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
/// data to be moved.
/// Staged role changes must be merged with nodes roles before calling this function,
/// hence it must only be called from apply_staged_changes() and hence is not public.
- fn calculate_partition_assignment(&mut self) -> Result<Message, Error> {
+ pub(crate) fn calculate_partition_assignment(&mut self) -> Result<Message, Error> {
// We update the node ids, since the node role list might have changed with the
// changes in the layout. We retrieve the old_assignment reframed with new ids
let old_assignment_opt = self.update_node_id_vec()?;
@@ -911,7 +605,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
zone_redundancy: usize,
) -> Result<Graph<FlowEdge>, Error> {
let vertices =
- ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
+ LayoutVersion::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
let mut g = Graph::<FlowEdge>::new(&vertices);
let nb_zones = zone_to_id.len();
for p in 0..NB_PARTITIONS {
@@ -1214,7 +908,7 @@ mod tests {
// number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2)
// With these parameters, the naive algo fails, whereas there is a solution:
// (A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E)
- fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> {
+ fn check_against_naive(cl: &LayoutVersion) -> Result<bool, Error> {
let over_size = cl.partition_size + 1;
let mut zone_token = HashMap::<String, usize>::new();
@@ -1280,7 +974,7 @@ mod tests {
}
fn update_layout(
- cl: &mut ClusterLayout,
+ cl: &mut LayoutVersion,
node_id_vec: &Vec<u8>,
node_capacity_vec: &Vec<u64>,
node_zone_vec: &Vec<String>,
@@ -1316,7 +1010,7 @@ mod tests {
.map(|x| x.to_string())
.collect();
- let mut cl = ClusterLayout::new(3);
+ let mut cl = LayoutVersion::new(3);
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 3);
let v = cl.version;
let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 56bef2f3..3fdb4acd 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -26,7 +26,7 @@ use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
-use crate::layout::ClusterLayout;
+use crate::layout::LayoutHistory;
use crate::metrics::RpcMetrics;
// Default RPC timeout = 5 minutes
@@ -91,7 +91,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
struct RpcHelperInner {
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- layout_watch: watch::Receiver<Arc<ClusterLayout>>,
+ layout_watch: watch::Receiver<Arc<LayoutHistory>>,
metrics: RpcMetrics,
rpc_timeout: Duration,
}
@@ -100,7 +100,7 @@ impl RpcHelper {
pub(crate) fn new(
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- layout_watch: watch::Receiver<Arc<ClusterLayout>>,
+ layout_watch: watch::Receiver<Arc<LayoutHistory>>,
rpc_timeout: Option<Duration>,
) -> Self {
let metrics = RpcMetrics::new();
@@ -392,8 +392,8 @@ impl RpcHelper {
pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
// Retrieve some status variables that we will use to sort requests
let peer_list = self.0.fullmesh.get_peer_list();
- let layout: Arc<ClusterLayout> = self.0.layout_watch.borrow().clone();
- let our_zone = match layout.node_role(&self.0.our_node_id) {
+ let layout: Arc<LayoutHistory> = self.0.layout_watch.borrow().clone();
+ let our_zone = match layout.current().node_role(&self.0.our_node_id) {
Some(pc) => &pc.zone,
None => "",
};
@@ -407,7 +407,7 @@ impl RpcHelper {
let mut nodes = nodes
.iter()
.map(|to| {
- let peer_zone = match layout.node_role(to) {
+ let peer_zone = match layout.current().node_role(to) {
Some(pc) => &pc.zone,
None => "",
};
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 93144e39..86d724f1 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -64,7 +64,7 @@ pub enum SystemRpc {
/// Exchanged with every node on a regular basis.
AdvertiseStatus(NodeStatus),
/// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
- AdvertiseClusterLayout(ClusterLayout),
+ AdvertiseClusterLayout(LayoutHistory),
/// Get known nodes states
GetKnownNodes,
/// Return known nodes
@@ -84,7 +84,7 @@ pub struct System {
/// The id of this node
pub id: Uuid,
- persist_cluster_layout: Persister<ClusterLayout>,
+ persist_cluster_layout: Persister<LayoutHistory>,
persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>,
@@ -112,8 +112,8 @@ pub struct System {
replication_factor: usize,
/// The layout
- pub layout_watch: watch::Receiver<Arc<ClusterLayout>>,
- update_layout: Mutex<watch::Sender<Arc<ClusterLayout>>>,
+ pub layout_watch: watch::Receiver<Arc<LayoutHistory>>,
+ update_layout: Mutex<watch::Sender<Arc<LayoutHistory>>>,
/// Path to metadata directory
pub metadata_dir: PathBuf,
@@ -256,16 +256,16 @@ impl System {
hex::encode(&node_key.public_key()[..8])
);
- let persist_cluster_layout: Persister<ClusterLayout> =
+ let persist_cluster_layout: Persister<LayoutHistory> =
Persister::new(&config.metadata_dir, "cluster_layout");
let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
let cluster_layout = match persist_cluster_layout.load() {
Ok(x) => {
- if x.replication_factor != replication_factor {
+ if x.current().replication_factor != replication_factor {
return Err(Error::Message(format!(
"Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
- x.replication_factor,
+ x.current().replication_factor,
replication_factor
)));
}
@@ -276,7 +276,7 @@ impl System {
"No valid previous cluster layout stored ({}), starting fresh.",
e
);
- ClusterLayout::new(replication_factor)
+ LayoutHistory::new(replication_factor)
}
};
@@ -423,13 +423,13 @@ impl System {
known_nodes
}
- pub fn cluster_layout(&self) -> watch::Ref<Arc<ClusterLayout>> {
+ pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> {
self.layout_watch.borrow()
}
pub async fn update_cluster_layout(
self: &Arc<Self>,
- layout: &ClusterLayout,
+ layout: &LayoutHistory,
) -> Result<(), Error> {
self.handle_advertise_cluster_layout(layout).await?;
Ok(())
@@ -475,7 +475,9 @@ impl System {
.collect::<HashMap<Uuid, _>>();
let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
+ // TODO: not only layout.current()
let storage_nodes = layout
+ .current()
.roles
.items()
.iter()
@@ -486,11 +488,11 @@ impl System {
.filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count();
- let partitions = layout.partitions();
+ let partitions = layout.current().partitions();
let partitions_n_up = partitions
.iter()
.map(|(_, h)| {
- let pn = layout.nodes_of(h, layout.replication_factor);
+ let pn = layout.current().nodes_of(h, replication_factor);
pn.iter()
.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count()
@@ -581,7 +583,7 @@ impl System {
/// Save network configuration to disc
async fn save_cluster_layout(&self) -> Result<(), Error> {
- let layout: Arc<ClusterLayout> = self.layout_watch.borrow().clone();
+ let layout: Arc<LayoutHistory> = self.layout_watch.borrow().clone();
self.persist_cluster_layout
.save_async(&layout)
.await
@@ -593,7 +595,7 @@ impl System {
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
let layout = self.layout_watch.borrow();
- new_si.cluster_layout_version = layout.version;
+ new_si.cluster_layout_version = layout.current().version;
new_si.cluster_layout_staging_hash = layout.staging_hash;
new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
@@ -648,12 +650,12 @@ impl System {
async fn handle_advertise_cluster_layout(
self: &Arc<Self>,
- adv: &ClusterLayout,
+ adv: &LayoutHistory,
) -> Result<SystemRpc, Error> {
- if adv.replication_factor != self.replication_factor {
+ if adv.current().replication_factor != self.replication_factor {
let msg = format!(
"Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
- adv.replication_factor,
+ adv.current().replication_factor,
self.replication_factor
);
error!("{}", msg);
@@ -662,7 +664,7 @@ impl System {
let update_layout = self.update_layout.lock().await;
// TODO: don't clone each time an AdvertiseClusterLayout is received
- let mut layout: ClusterLayout = self.layout_watch.borrow().as_ref().clone();
+ let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone();
let prev_layout_check = layout.check().is_ok();
if layout.merge(adv) {
@@ -724,7 +726,7 @@ impl System {
while !*stop_signal.borrow() {
let not_configured = self.layout_watch.borrow().check().is_err();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
- let expected_n_nodes = self.layout_watch.borrow().num_nodes();
+ let expected_n_nodes = self.layout_watch.borrow().current().num_nodes();
let bad_peers = self
.fullmesh
.get_peer_list()
@@ -863,13 +865,13 @@ impl EndpointHandler<SystemRpc> for System {
}
impl NodeStatus {
- fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self {
+ fn initial(replication_factor: usize, layout: &LayoutHistory) -> Self {
NodeStatus {
hostname: gethostname::gethostname()
.into_string()
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
replication_factor,
- cluster_layout_version: layout.version,
+ cluster_layout_version: layout.current().version,
cluster_layout_staging_hash: layout.staging_hash,
meta_disk_avail: None,
data_disk_avail: None,