aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-12-08 14:15:52 +0100
committerAlex Auvolat <alex@adnab.me>2023-12-08 14:15:52 +0100
commit4dbf254512327ef4e7abbd5525b89bfa5b7ecb6f (patch)
treef01ed9873593fb12d0bd8c83258d2771774c5bab /src/rpc/layout
parent64a6e557a4ff6aa1ad833a1b25ef8c85cf9ee3f3 (diff)
downloadgarage-4dbf254512327ef4e7abbd5525b89bfa5b7ecb6f.tar.gz
garage-4dbf254512327ef4e7abbd5525b89bfa5b7ecb6f.zip
layout: refactoring, merge two files
Diffstat (limited to 'src/rpc/layout')
-rw-r--r--src/rpc/layout/helper.rs2
-rw-r--r--src/rpc/layout/mod.rs441
-rw-r--r--src/rpc/layout/schema.rs431
-rw-r--r--src/rpc/layout/version.rs1
4 files changed, 440 insertions, 435 deletions
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
index 147c8b4f..2ba010b8 100644
--- a/src/rpc/layout/helper.rs
+++ b/src/rpc/layout/helper.rs
@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
-use super::schema::*;
+use super::*;
use crate::replication_mode::ReplicationMode;
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs
index eb127fda..facdb2ce 100644
--- a/src/rpc/layout/mod.rs
+++ b/src/rpc/layout/mod.rs
@@ -1,7 +1,13 @@
+use std::fmt;
+
+use bytesize::ByteSize;
+
+use garage_util::crdt::{AutoCrdt, Crdt};
+use garage_util::data::Uuid;
+
mod graph_algo;
mod helper;
mod history;
-mod schema;
mod version;
#[cfg(test)]
@@ -13,7 +19,6 @@ pub mod manager;
pub use helper::{LayoutDigest, LayoutHelper};
pub use manager::WriteLock;
-pub use schema::*;
pub use version::*;
// ---- defines: partitions ----
@@ -39,3 +44,435 @@ const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
// Change this to u16 the day we want to have more than 256 nodes in a cluster
pub type CompactNodeType = u8;
pub const MAX_NODE_NUMBER: usize = 256;
+
+// ======== actual data structures for the layout data ========
+// ======== that is persisted to disk ========
+// some small utility impls are at the end of this file,
+// but most of the code that actually computes stuff is in
+// version.rs, history.rs and helper.rs
+
+mod v08 {
+ use crate::layout::CompactNodeType;
+ use garage_util::crdt::LwwMap;
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
+
+ /// The layout of the cluster, i.e. the list of roles
+ /// which are assigned to each cluster node
+ #[derive(Clone, Debug, Serialize, Deserialize)]
+ pub struct ClusterLayout {
+ pub version: u64,
+
+ pub replication_factor: usize,
+ pub roles: LwwMap<Uuid, NodeRoleV>,
+
+ // see comments in v010::ClusterLayout
+ pub node_id_vec: Vec<Uuid>,
+ #[serde(with = "serde_bytes")]
+ pub ring_assignation_data: Vec<CompactNodeType>,
+
+ /// Role changes which are staged for the next version of the layout
+ pub staging: LwwMap<Uuid, NodeRoleV>,
+ pub staging_hash: Hash,
+ }
+
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct NodeRoleV(pub Option<NodeRole>);
+
+ /// The user-assigned roles of cluster nodes
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct NodeRole {
+ /// Datacenter at which this entry belong. This information is used to
+ /// perform a better geodistribution
+ pub zone: String,
+ /// The capacity of the node
+ /// If this is set to None, the node does not participate in storing data for the system
+ /// and is only active as an API gateway to other nodes
+ pub capacity: Option<u64>,
+ /// A set of tags to recognize the node
+ pub tags: Vec<String>,
+ }
+
+ impl garage_util::migrate::InitialFormat for ClusterLayout {}
+}
+
+mod v09 {
+ use super::v08;
+ use crate::layout::CompactNodeType;
+ use garage_util::crdt::{Lww, LwwMap};
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
+ pub use v08::{NodeRole, NodeRoleV};
+
+ /// The layout of the cluster, i.e. the list of roles
+ /// which are assigned to each cluster node
+ #[derive(Clone, Debug, Serialize, Deserialize)]
+ pub struct ClusterLayout {
+ pub version: u64,
+
+ pub replication_factor: usize,
+
+ /// This attribute is only used to retain the previously computed partition size,
+ /// to know to what extent does it change with the layout update.
+ pub partition_size: u64,
+ /// Parameters used to compute the assignment currently given by
+ /// ring_assignment_data
+ pub parameters: LayoutParameters,
+
+ pub roles: LwwMap<Uuid, NodeRoleV>,
+
+ // see comments in v010::ClusterLayout
+ pub node_id_vec: Vec<Uuid>,
+ #[serde(with = "serde_bytes")]
+ pub ring_assignment_data: Vec<CompactNodeType>,
+
+ /// Parameters to be used in the next partition assignment computation.
+ pub staging_parameters: Lww<LayoutParameters>,
+ /// Role changes which are staged for the next version of the layout
+ pub staging_roles: LwwMap<Uuid, NodeRoleV>,
+ pub staging_hash: Hash,
+ }
+
+ /// This struct is used to set the parameters to be used in the assignment computation
+ /// algorithm. It is stored as a Crdt.
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub struct LayoutParameters {
+ pub zone_redundancy: ZoneRedundancy,
+ }
+
+ /// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies
+ /// of each partition on at least that number of different zones.
+ /// Otherwise, copies will be stored on the maximum possible number of zones.
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub enum ZoneRedundancy {
+ AtLeast(usize),
+ Maximum,
+ }
+
+ impl garage_util::migrate::Migrate for ClusterLayout {
+ const VERSION_MARKER: &'static [u8] = b"G09layout";
+
+ type Previous = v08::ClusterLayout;
+
+ fn migrate(previous: Self::Previous) -> Self {
+ use itertools::Itertools;
+
+ // In the old layout, capacities are in an arbitrary unit,
+ // but in the new layout they are in bytes.
+ // Here we arbitrarily multiply everything by 1G,
+ // such that 1 old capacity unit = 1GB in the new units.
+ // This is totally arbitrary and won't work for most users.
+ let cap_mul = 1024 * 1024 * 1024;
+ let roles = multiply_all_capacities(previous.roles, cap_mul);
+ let staging_roles = multiply_all_capacities(previous.staging, cap_mul);
+ let node_id_vec = previous.node_id_vec;
+
+ // Determine partition size
+ let mut tmp = previous.ring_assignation_data.clone();
+ tmp.sort();
+ let partition_size = tmp
+ .into_iter()
+ .dedup_with_count()
+ .map(|(npart, node)| {
+ roles
+ .get(&node_id_vec[node as usize])
+ .and_then(|p| p.0.as_ref().and_then(|r| r.capacity))
+ .unwrap_or(0) / npart as u64
+ })
+ .min()
+ .unwrap_or(0);
+
+ // By default, zone_redundancy is maximum possible value
+ let parameters = LayoutParameters {
+ zone_redundancy: ZoneRedundancy::Maximum,
+ };
+
+ Self {
+ version: previous.version,
+ replication_factor: previous.replication_factor,
+ partition_size,
+ parameters,
+ roles,
+ node_id_vec,
+ ring_assignment_data: previous.ring_assignation_data,
+ staging_parameters: Lww::new(parameters),
+ staging_roles,
+ staging_hash: [0u8; 32].into(), // will be set in the next migration
+ }
+ }
+ }
+
+ fn multiply_all_capacities(
+ old_roles: LwwMap<Uuid, NodeRoleV>,
+ mul: u64,
+ ) -> LwwMap<Uuid, NodeRoleV> {
+ let mut new_roles = LwwMap::new();
+ for (node, ts, role) in old_roles.items() {
+ let mut role = role.clone();
+ if let NodeRoleV(Some(NodeRole {
+ capacity: Some(ref mut cap),
+ ..
+ })) = role
+ {
+ *cap *= mul;
+ }
+ new_roles.merge_raw(node, *ts, &role);
+ }
+ new_roles
+ }
+}
+
+mod v010 {
+ use super::v09;
+ use crate::layout::CompactNodeType;
+ use garage_util::crdt::{Lww, LwwMap};
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+ use std::collections::BTreeMap;
+ pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy};
+
+ /// Number of old (non-live) versions to keep, see LayoutHistory::old_versions
+ pub const OLD_VERSION_COUNT: usize = 5;
+
+ /// The history of cluster layouts, with trackers to keep a record
+ /// of which nodes are up-to-date to current cluster data
+ #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+ pub struct LayoutHistory {
+ /// The versions currently in use in the cluster
+ pub versions: Vec<LayoutVersion>,
+ /// At most 5 of the previous versions, not used by the garage_table
+ /// module, but usefull for the garage_block module to find data blocks
+ /// that have not yet been moved
+ pub old_versions: Vec<LayoutVersion>,
+
+ /// Update trackers
+ pub update_trackers: UpdateTrackers,
+
+ /// Staged changes for the next version
+ pub staging: Lww<LayoutStaging>,
+ }
+
+ /// A version of the layout of the cluster, i.e. the list of roles
+ /// which are assigned to each cluster node
+ #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+ pub struct LayoutVersion {
+ /// The number of this version
+ pub version: u64,
+
+ /// Roles assigned to nodes in this version
+ pub roles: LwwMap<Uuid, NodeRoleV>,
+ /// Parameters used to compute the assignment currently given by
+ /// ring_assignment_data
+ pub parameters: LayoutParameters,
+
+ /// The number of replicas for each data partition
+ pub replication_factor: usize,
+ /// This attribute is only used to retain the previously computed partition size,
+ /// to know to what extent does it change with the layout update.
+ pub partition_size: u64,
+
+ /// node_id_vec: a vector of node IDs with a role assigned
+ /// in the system (this includes gateway nodes).
+ /// The order here is different than the vec stored by `roles`, because:
+ /// 1. non-gateway nodes are first so that they have lower numbers
+ /// 2. nodes that don't have a role are excluded (but they need to
+ /// stay in the CRDT as tombstones)
+ pub node_id_vec: Vec<Uuid>,
+ /// number of non-gateway nodes, which are the first ids in node_id_vec
+ pub nongateway_node_count: usize,
+ /// The assignation of data partitions to nodes, the values
+ /// are indices in node_id_vec
+ #[serde(with = "serde_bytes")]
+ pub ring_assignment_data: Vec<CompactNodeType>,
+ }
+
+ /// The staged changes for the next layout version
+ #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+ pub struct LayoutStaging {
+ /// Parameters to be used in the next partition assignment computation.
+ pub parameters: Lww<LayoutParameters>,
+ /// Role changes which are staged for the next version of the layout
+ pub roles: LwwMap<Uuid, NodeRoleV>,
+ }
+
+ /// The tracker of acknowlegments and data syncs around the cluster
+ #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
+ pub struct UpdateTrackers {
+ /// The highest layout version number each node has ack'ed
+ pub ack_map: UpdateTracker,
+ /// The highest layout version number each node has synced data for
+ pub sync_map: UpdateTracker,
+ /// The highest layout version number each node has
+ /// ack'ed that all other nodes have synced data for
+ pub sync_ack_map: UpdateTracker,
+ }
+
+ /// Generic update tracker struct
+ #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
+ pub struct UpdateTracker(pub BTreeMap<Uuid, u64>);
+
+ impl garage_util::migrate::Migrate for LayoutHistory {
+ const VERSION_MARKER: &'static [u8] = b"G010lh";
+
+ type Previous = v09::ClusterLayout;
+
+ fn migrate(previous: Self::Previous) -> Self {
+ let nongateway_node_count = previous
+ .node_id_vec
+ .iter()
+ .enumerate()
+ .filter(|(_, uuid)| {
+ let role = previous.roles.get(uuid);
+ matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some())
+ })
+ .map(|(i, _)| i + 1)
+ .max()
+ .unwrap_or(0);
+
+ let version = LayoutVersion {
+ version: previous.version,
+ replication_factor: previous.replication_factor,
+ partition_size: previous.partition_size,
+ parameters: previous.parameters,
+ roles: previous.roles,
+ node_id_vec: previous.node_id_vec,
+ nongateway_node_count,
+ ring_assignment_data: previous.ring_assignment_data,
+ };
+ let update_tracker = UpdateTracker(
+ version
+ .nongateway_nodes()
+ .iter()
+ .copied()
+ .map(|x| (x, version.version))
+ .collect::<BTreeMap<Uuid, u64>>(),
+ );
+ let staging = LayoutStaging {
+ parameters: previous.staging_parameters,
+ roles: previous.staging_roles,
+ };
+ Self {
+ versions: vec![version],
+ old_versions: vec![],
+ update_trackers: UpdateTrackers {
+ ack_map: update_tracker.clone(),
+ sync_map: update_tracker.clone(),
+ sync_ack_map: update_tracker.clone(),
+ },
+ staging: Lww::raw(previous.version, staging),
+ }
+ }
+ }
+}
+
+pub use v010::*;
+
+// ---- utility functions ----
+
+impl AutoCrdt for LayoutParameters {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl AutoCrdt for NodeRoleV {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl Crdt for LayoutStaging {
+ fn merge(&mut self, other: &LayoutStaging) {
+ self.parameters.merge(&other.parameters);
+ self.roles.merge(&other.roles);
+ }
+}
+
+impl NodeRole {
+ pub fn capacity_string(&self) -> String {
+ match self.capacity {
+ Some(c) => ByteSize::b(c).to_string_as(false),
+ None => "gateway".to_string(),
+ }
+ }
+
+ pub fn tags_string(&self) -> String {
+ self.tags.join(",")
+ }
+}
+
+impl fmt::Display for ZoneRedundancy {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ ZoneRedundancy::Maximum => write!(f, "maximum"),
+ ZoneRedundancy::AtLeast(x) => write!(f, "{}", x),
+ }
+ }
+}
+
+impl core::str::FromStr for ZoneRedundancy {
+ type Err = &'static str;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
+ x => {
+ let v = x
+ .parse::<usize>()
+ .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?;
+ Ok(ZoneRedundancy::AtLeast(v))
+ }
+ }
+ }
+}
+
+impl UpdateTracker {
+ fn merge(&mut self, other: &UpdateTracker) -> bool {
+ let mut changed = false;
+ for (k, v) in other.0.iter() {
+ if let Some(v_mut) = self.0.get_mut(k) {
+ if *v > *v_mut {
+ *v_mut = *v;
+ changed = true;
+ }
+ } else {
+ self.0.insert(*k, *v);
+ changed = true;
+ }
+ }
+ changed
+ }
+
+ /// This bumps the update tracker for a given node up to the specified value.
+ /// This has potential impacts on the correctness of Garage and should only
+ /// be used in very specific circumstances.
+ pub fn set_max(&mut self, peer: Uuid, value: u64) -> bool {
+ match self.0.get_mut(&peer) {
+ Some(e) if *e < value => {
+ *e = value;
+ true
+ }
+ None => {
+ self.0.insert(peer, value);
+ true
+ }
+ _ => false,
+ }
+ }
+
+ pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 {
+ storage_nodes
+ .iter()
+ .map(|x| self.get(x, min_version))
+ .min()
+ .unwrap_or(min_version)
+ }
+
+ pub fn get(&self, node: &Uuid, min_version: u64) -> u64 {
+ self.0.get(node).copied().unwrap_or(min_version)
+ }
+}
+
+impl UpdateTrackers {
+ pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool {
+ let c1 = self.ack_map.merge(&other.ack_map);
+ let c2 = self.sync_map.merge(&other.sync_map);
+ let c3 = self.sync_ack_map.merge(&other.sync_ack_map);
+ c1 || c2 || c3
+ }
+}
diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs
deleted file mode 100644
index df949906..00000000
--- a/src/rpc/layout/schema.rs
+++ /dev/null
@@ -1,431 +0,0 @@
-use std::fmt;
-
-use bytesize::ByteSize;
-
-use garage_util::crdt::{AutoCrdt, Crdt};
-use garage_util::data::Uuid;
-
-mod v08 {
- use crate::layout::CompactNodeType;
- use garage_util::crdt::LwwMap;
- use garage_util::data::{Hash, Uuid};
- use serde::{Deserialize, Serialize};
-
- /// The layout of the cluster, i.e. the list of roles
- /// which are assigned to each cluster node
- #[derive(Clone, Debug, Serialize, Deserialize)]
- pub struct ClusterLayout {
- pub version: u64,
-
- pub replication_factor: usize,
- pub roles: LwwMap<Uuid, NodeRoleV>,
-
- /// node_id_vec: a vector of node IDs with a role assigned
- /// in the system (this includes gateway nodes).
- /// The order here is different than the vec stored by `roles`, because:
- /// 1. non-gateway nodes are first so that they have lower numbers
- /// 2. nodes that don't have a role are excluded (but they need to
- /// stay in the CRDT as tombstones)
- pub node_id_vec: Vec<Uuid>,
- /// the assignation of data partitions to node, the values
- /// are indices in node_id_vec
- #[serde(with = "serde_bytes")]
- pub ring_assignation_data: Vec<CompactNodeType>,
-
- /// Role changes which are staged for the next version of the layout
- pub staging: LwwMap<Uuid, NodeRoleV>,
- pub staging_hash: Hash,
- }
-
- #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
- pub struct NodeRoleV(pub Option<NodeRole>);
-
- /// The user-assigned roles of cluster nodes
- #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
- pub struct NodeRole {
- /// Datacenter at which this entry belong. This information is used to
- /// perform a better geodistribution
- pub zone: String,
- /// The capacity of the node
- /// If this is set to None, the node does not participate in storing data for the system
- /// and is only active as an API gateway to other nodes
- pub capacity: Option<u64>,
- /// A set of tags to recognize the node
- pub tags: Vec<String>,
- }
-
- impl garage_util::migrate::InitialFormat for ClusterLayout {}
-}
-
-mod v09 {
- use super::v08;
- use crate::layout::CompactNodeType;
- use garage_util::crdt::{Lww, LwwMap};
- use garage_util::data::{Hash, Uuid};
- use serde::{Deserialize, Serialize};
- pub use v08::{NodeRole, NodeRoleV};
-
- /// The layout of the cluster, i.e. the list of roles
- /// which are assigned to each cluster node
- #[derive(Clone, Debug, Serialize, Deserialize)]
- pub struct ClusterLayout {
- pub version: u64,
-
- pub replication_factor: usize,
-
- /// This attribute is only used to retain the previously computed partition size,
- /// to know to what extent does it change with the layout update.
- pub partition_size: u64,
- /// Parameters used to compute the assignment currently given by
- /// ring_assignment_data
- pub parameters: LayoutParameters,
-
- pub roles: LwwMap<Uuid, NodeRoleV>,
-
- /// see comment in v08::ClusterLayout
- pub node_id_vec: Vec<Uuid>,
- /// see comment in v08::ClusterLayout
- #[serde(with = "serde_bytes")]
- pub ring_assignment_data: Vec<CompactNodeType>,
-
- /// Parameters to be used in the next partition assignment computation.
- pub staging_parameters: Lww<LayoutParameters>,
- /// Role changes which are staged for the next version of the layout
- pub staging_roles: LwwMap<Uuid, NodeRoleV>,
- pub staging_hash: Hash,
- }
-
- /// This struct is used to set the parameters to be used in the assignment computation
- /// algorithm. It is stored as a Crdt.
- #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
- pub struct LayoutParameters {
- pub zone_redundancy: ZoneRedundancy,
- }
-
- /// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies
- /// of each partition on at least that number of different zones.
- /// Otherwise, copies will be stored on the maximum possible number of zones.
- #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
- pub enum ZoneRedundancy {
- AtLeast(usize),
- Maximum,
- }
-
- impl garage_util::migrate::Migrate for ClusterLayout {
- const VERSION_MARKER: &'static [u8] = b"G09layout";
-
- type Previous = v08::ClusterLayout;
-
- fn migrate(previous: Self::Previous) -> Self {
- use itertools::Itertools;
-
- // In the old layout, capacities are in an arbitrary unit,
- // but in the new layout they are in bytes.
- // Here we arbitrarily multiply everything by 1G,
- // such that 1 old capacity unit = 1GB in the new units.
- // This is totally arbitrary and won't work for most users.
- let cap_mul = 1024 * 1024 * 1024;
- let roles = multiply_all_capacities(previous.roles, cap_mul);
- let staging_roles = multiply_all_capacities(previous.staging, cap_mul);
- let node_id_vec = previous.node_id_vec;
-
- // Determine partition size
- let mut tmp = previous.ring_assignation_data.clone();
- tmp.sort();
- let partition_size = tmp
- .into_iter()
- .dedup_with_count()
- .map(|(npart, node)| {
- roles
- .get(&node_id_vec[node as usize])
- .and_then(|p| p.0.as_ref().and_then(|r| r.capacity))
- .unwrap_or(0) / npart as u64
- })
- .min()
- .unwrap_or(0);
-
- // By default, zone_redundancy is maximum possible value
- let parameters = LayoutParameters {
- zone_redundancy: ZoneRedundancy::Maximum,
- };
-
- Self {
- version: previous.version,
- replication_factor: previous.replication_factor,
- partition_size,
- parameters,
- roles,
- node_id_vec,
- ring_assignment_data: previous.ring_assignation_data,
- staging_parameters: Lww::new(parameters),
- staging_roles,
- staging_hash: [0u8; 32].into(), // will be set in the next migration
- }
- }
- }
-
- fn multiply_all_capacities(
- old_roles: LwwMap<Uuid, NodeRoleV>,
- mul: u64,
- ) -> LwwMap<Uuid, NodeRoleV> {
- let mut new_roles = LwwMap::new();
- for (node, ts, role) in old_roles.items() {
- let mut role = role.clone();
- if let NodeRoleV(Some(NodeRole {
- capacity: Some(ref mut cap),
- ..
- })) = role
- {
- *cap *= mul;
- }
- new_roles.merge_raw(node, *ts, &role);
- }
- new_roles
- }
-}
-
-mod v010 {
- use super::v09;
- use crate::layout::CompactNodeType;
- use garage_util::crdt::{Lww, LwwMap};
- use garage_util::data::Uuid;
- use serde::{Deserialize, Serialize};
- use std::collections::BTreeMap;
- pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy};
-
- pub const OLD_VERSION_COUNT: usize = 5;
-
- /// The history of cluster layouts, with trackers to keep a record
- /// of which nodes are up-to-date to current cluster data
- #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
- pub struct LayoutHistory {
- /// The versions currently in use in the cluster
- pub versions: Vec<LayoutVersion>,
- /// At most 5 of the previous versions, not used by the garage_table
- /// module, but usefull for the garage_block module to find data blocks
- /// that have not yet been moved
- pub old_versions: Vec<LayoutVersion>,
-
- /// Update trackers
- pub update_trackers: UpdateTrackers,
-
- /// Staged changes for the next version
- pub staging: Lww<LayoutStaging>,
- }
-
- /// A version of the layout of the cluster, i.e. the list of roles
- /// which are assigned to each cluster node
- #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
- pub struct LayoutVersion {
- pub version: u64,
-
- pub replication_factor: usize,
-
- /// This attribute is only used to retain the previously computed partition size,
- /// to know to what extent does it change with the layout update.
- pub partition_size: u64,
- /// Parameters used to compute the assignment currently given by
- /// ring_assignment_data
- pub parameters: LayoutParameters,
-
- pub roles: LwwMap<Uuid, NodeRoleV>,
-
- /// see comment in v08::ClusterLayout
- pub node_id_vec: Vec<Uuid>,
- /// number of non-gateway nodes, which are the first ids in node_id_vec
- pub nongateway_node_count: usize,
- /// see comment in v08::ClusterLayout
- #[serde(with = "serde_bytes")]
- pub ring_assignment_data: Vec<CompactNodeType>,
- }
-
- /// The staged changes for the next layout version
- #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
- pub struct LayoutStaging {
- /// Parameters to be used in the next partition assignment computation.
- pub parameters: Lww<LayoutParameters>,
- /// Role changes which are staged for the next version of the layout
- pub roles: LwwMap<Uuid, NodeRoleV>,
- }
-
- /// The tracker of acknowlegments and data syncs around the cluster
- #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
- pub struct UpdateTrackers {
- /// The highest layout version number each node has ack'ed
- pub ack_map: UpdateTracker,
- /// The highest layout version number each node has synced data for
- pub sync_map: UpdateTracker,
- /// The highest layout version number each node has
- /// ack'ed that all other nodes have synced data for
- pub sync_ack_map: UpdateTracker,
- }
-
- /// The history of cluster layouts
- #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
- pub struct UpdateTracker(pub BTreeMap<Uuid, u64>);
-
- impl garage_util::migrate::Migrate for LayoutHistory {
- const VERSION_MARKER: &'static [u8] = b"G010lh";
-
- type Previous = v09::ClusterLayout;
-
- fn migrate(previous: Self::Previous) -> Self {
- let nongateway_node_count = previous
- .node_id_vec
- .iter()
- .enumerate()
- .filter(|(_, uuid)| {
- let role = previous.roles.get(uuid);
- matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some())
- })
- .map(|(i, _)| i + 1)
- .max()
- .unwrap_or(0);
-
- let version = LayoutVersion {
- version: previous.version,
- replication_factor: previous.replication_factor,
- partition_size: previous.partition_size,
- parameters: previous.parameters,
- roles: previous.roles,
- node_id_vec: previous.node_id_vec,
- nongateway_node_count,
- ring_assignment_data: previous.ring_assignment_data,
- };
- let update_tracker = UpdateTracker(
- version
- .nongateway_nodes()
- .iter()
- .copied()
- .map(|x| (x, version.version))
- .collect::<BTreeMap<Uuid, u64>>(),
- );
- let staging = LayoutStaging {
- parameters: previous.staging_parameters,
- roles: previous.staging_roles,
- };
- Self {
- versions: vec![version],
- old_versions: vec![],
- update_trackers: UpdateTrackers {
- ack_map: update_tracker.clone(),
- sync_map: update_tracker.clone(),
- sync_ack_map: update_tracker.clone(),
- },
- staging: Lww::raw(previous.version, staging),
- }
- }
- }
-}
-
-pub use v010::*;
-
-// ---- utility functions ----
-
-impl AutoCrdt for LayoutParameters {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
-impl AutoCrdt for NodeRoleV {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
-impl Crdt for LayoutStaging {
- fn merge(&mut self, other: &LayoutStaging) {
- self.parameters.merge(&other.parameters);
- self.roles.merge(&other.roles);
- }
-}
-
-impl NodeRole {
- pub fn capacity_string(&self) -> String {
- match self.capacity {
- Some(c) => ByteSize::b(c).to_string_as(false),
- None => "gateway".to_string(),
- }
- }
-
- pub fn tags_string(&self) -> String {
- self.tags.join(",")
- }
-}
-
-impl fmt::Display for ZoneRedundancy {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- ZoneRedundancy::Maximum => write!(f, "maximum"),
- ZoneRedundancy::AtLeast(x) => write!(f, "{}", x),
- }
- }
-}
-
-impl core::str::FromStr for ZoneRedundancy {
- type Err = &'static str;
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- match s {
- "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
- x => {
- let v = x
- .parse::<usize>()
- .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?;
- Ok(ZoneRedundancy::AtLeast(v))
- }
- }
- }
-}
-
-impl UpdateTracker {
- fn merge(&mut self, other: &UpdateTracker) -> bool {
- let mut changed = false;
- for (k, v) in other.0.iter() {
- if let Some(v_mut) = self.0.get_mut(k) {
- if *v > *v_mut {
- *v_mut = *v;
- changed = true;
- }
- } else {
- self.0.insert(*k, *v);
- changed = true;
- }
- }
- changed
- }
-
- /// This bumps the update tracker for a given node up to the specified value.
- /// This has potential impacts on the correctness of Garage and should only
- /// be used in very specific circumstances.
- pub fn set_max(&mut self, peer: Uuid, value: u64) -> bool {
- match self.0.get_mut(&peer) {
- Some(e) if *e < value => {
- *e = value;
- true
- }
- None => {
- self.0.insert(peer, value);
- true
- }
- _ => false,
- }
- }
-
- pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 {
- storage_nodes
- .iter()
- .map(|x| self.get(x, min_version))
- .min()
- .unwrap_or(min_version)
- }
-
- pub fn get(&self, node: &Uuid, min_version: u64) -> u64 {
- self.0.get(node).copied().unwrap_or(min_version)
- }
-}
-
-impl UpdateTrackers {
- pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool {
- let c1 = self.ack_map.merge(&other.ack_map);
- let c2 = self.sync_map.merge(&other.sync_map);
- let c3 = self.sync_ack_map.merge(&other.sync_ack_map);
- c1 || c2 || c3
- }
-}
diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs
index cbfbee94..5b307156 100644
--- a/src/rpc/layout/version.rs
+++ b/src/rpc/layout/version.rs
@@ -10,7 +10,6 @@ use garage_util::data::*;
use garage_util::error::*;
use super::graph_algo::*;
-use super::schema::*;
use super::*;
// The Message type will be used to collect information on the algorithm.