diff options
author | Alex Auvolat <alex@adnab.me> | 2023-12-08 14:15:52 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-12-08 14:15:52 +0100 |
commit | 4dbf254512327ef4e7abbd5525b89bfa5b7ecb6f (patch) | |
tree | f01ed9873593fb12d0bd8c83258d2771774c5bab /src/rpc/layout | |
parent | 64a6e557a4ff6aa1ad833a1b25ef8c85cf9ee3f3 (diff) | |
download | garage-4dbf254512327ef4e7abbd5525b89bfa5b7ecb6f.tar.gz garage-4dbf254512327ef4e7abbd5525b89bfa5b7ecb6f.zip |
layout: refactoring, merge two files
Diffstat (limited to 'src/rpc/layout')
-rw-r--r-- | src/rpc/layout/helper.rs | 2 | ||||
-rw-r--r-- | src/rpc/layout/mod.rs | 441 | ||||
-rw-r--r-- | src/rpc/layout/schema.rs | 431 | ||||
-rw-r--r-- | src/rpc/layout/version.rs | 1 |
4 files changed, 440 insertions, 435 deletions
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 147c8b4f..2ba010b8 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; -use super::schema::*; +use super::*; use crate::replication_mode::ReplicationMode; #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index eb127fda..facdb2ce 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -1,7 +1,13 @@ +use std::fmt; + +use bytesize::ByteSize; + +use garage_util::crdt::{AutoCrdt, Crdt}; +use garage_util::data::Uuid; + mod graph_algo; mod helper; mod history; -mod schema; mod version; #[cfg(test)] @@ -13,7 +19,6 @@ pub mod manager; pub use helper::{LayoutDigest, LayoutHelper}; pub use manager::WriteLock; -pub use schema::*; pub use version::*; // ---- defines: partitions ---- @@ -39,3 +44,435 @@ const NB_PARTITIONS: usize = 1usize << PARTITION_BITS; // Change this to u16 the day we want to have more than 256 nodes in a cluster pub type CompactNodeType = u8; pub const MAX_NODE_NUMBER: usize = 256; + +// ======== actual data structures for the layout data ======== +// ======== that is persisted to disk ======== +// some small utility impls are at the end of this file, +// but most of the code that actually computes stuff is in +// version.rs, history.rs and helper.rs + +mod v08 { + use crate::layout::CompactNodeType; + use garage_util::crdt::LwwMap; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + + /// The layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct ClusterLayout { + pub version: u64, + + pub replication_factor: usize, + pub roles: LwwMap<Uuid, NodeRoleV>, + + // see comments in v010::ClusterLayout + pub node_id_vec: Vec<Uuid>, + #[serde(with = "serde_bytes")] + pub ring_assignation_data: Vec<CompactNodeType>, + + /// Role changes which are staged for the next version of the layout + pub staging: LwwMap<Uuid, NodeRoleV>, + pub staging_hash: Hash, + } + + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct NodeRoleV(pub Option<NodeRole>); + + /// The user-assigned roles of cluster nodes + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct NodeRole { + /// Datacenter at which this entry belong. This information is used to + /// perform a better geodistribution + pub zone: String, + /// The capacity of the node + /// If this is set to None, the node does not participate in storing data for the system + /// and is only active as an API gateway to other nodes + pub capacity: Option<u64>, + /// A set of tags to recognize the node + pub tags: Vec<String>, + } + + impl garage_util::migrate::InitialFormat for ClusterLayout {} +} + +mod v09 { + use super::v08; + use crate::layout::CompactNodeType; + use garage_util::crdt::{Lww, LwwMap}; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + pub use v08::{NodeRole, NodeRoleV}; + + /// The layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct ClusterLayout { + pub version: u64, + + pub replication_factor: usize, + + /// This attribute is only used to retain the previously computed partition size, + /// to know to what extent does it change with the layout update. + pub partition_size: u64, + /// Parameters used to compute the assignment currently given by + /// ring_assignment_data + pub parameters: LayoutParameters, + + pub roles: LwwMap<Uuid, NodeRoleV>, + + // see comments in v010::ClusterLayout + pub node_id_vec: Vec<Uuid>, + #[serde(with = "serde_bytes")] + pub ring_assignment_data: Vec<CompactNodeType>, + + /// Parameters to be used in the next partition assignment computation. + pub staging_parameters: Lww<LayoutParameters>, + /// Role changes which are staged for the next version of the layout + pub staging_roles: LwwMap<Uuid, NodeRoleV>, + pub staging_hash: Hash, + } + + /// This struct is used to set the parameters to be used in the assignment computation + /// algorithm. It is stored as a Crdt. + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct LayoutParameters { + pub zone_redundancy: ZoneRedundancy, + } + + /// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies + /// of each partition on at least that number of different zones. + /// Otherwise, copies will be stored on the maximum possible number of zones. + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] + pub enum ZoneRedundancy { + AtLeast(usize), + Maximum, + } + + impl garage_util::migrate::Migrate for ClusterLayout { + const VERSION_MARKER: &'static [u8] = b"G09layout"; + + type Previous = v08::ClusterLayout; + + fn migrate(previous: Self::Previous) -> Self { + use itertools::Itertools; + + // In the old layout, capacities are in an arbitrary unit, + // but in the new layout they are in bytes. + // Here we arbitrarily multiply everything by 1G, + // such that 1 old capacity unit = 1GB in the new units. + // This is totally arbitrary and won't work for most users. + let cap_mul = 1024 * 1024 * 1024; + let roles = multiply_all_capacities(previous.roles, cap_mul); + let staging_roles = multiply_all_capacities(previous.staging, cap_mul); + let node_id_vec = previous.node_id_vec; + + // Determine partition size + let mut tmp = previous.ring_assignation_data.clone(); + tmp.sort(); + let partition_size = tmp + .into_iter() + .dedup_with_count() + .map(|(npart, node)| { + roles + .get(&node_id_vec[node as usize]) + .and_then(|p| p.0.as_ref().and_then(|r| r.capacity)) + .unwrap_or(0) / npart as u64 + }) + .min() + .unwrap_or(0); + + // By default, zone_redundancy is maximum possible value + let parameters = LayoutParameters { + zone_redundancy: ZoneRedundancy::Maximum, + }; + + Self { + version: previous.version, + replication_factor: previous.replication_factor, + partition_size, + parameters, + roles, + node_id_vec, + ring_assignment_data: previous.ring_assignation_data, + staging_parameters: Lww::new(parameters), + staging_roles, + staging_hash: [0u8; 32].into(), // will be set in the next migration + } + } + } + + fn multiply_all_capacities( + old_roles: LwwMap<Uuid, NodeRoleV>, + mul: u64, + ) -> LwwMap<Uuid, NodeRoleV> { + let mut new_roles = LwwMap::new(); + for (node, ts, role) in old_roles.items() { + let mut role = role.clone(); + if let NodeRoleV(Some(NodeRole { + capacity: Some(ref mut cap), + .. + })) = role + { + *cap *= mul; + } + new_roles.merge_raw(node, *ts, &role); + } + new_roles + } +} + +mod v010 { + use super::v09; + use crate::layout::CompactNodeType; + use garage_util::crdt::{Lww, LwwMap}; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; + pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; + + /// Number of old (non-live) versions to keep, see LayoutHistory::old_versions + pub const OLD_VERSION_COUNT: usize = 5; + + /// The history of cluster layouts, with trackers to keep a record + /// of which nodes are up-to-date to current cluster data + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutHistory { + /// The versions currently in use in the cluster + pub versions: Vec<LayoutVersion>, + /// At most 5 of the previous versions, not used by the garage_table + /// module, but usefull for the garage_block module to find data blocks + /// that have not yet been moved + pub old_versions: Vec<LayoutVersion>, + + /// Update trackers + pub update_trackers: UpdateTrackers, + + /// Staged changes for the next version + pub staging: Lww<LayoutStaging>, + } + + /// A version of the layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutVersion { + /// The number of this version + pub version: u64, + + /// Roles assigned to nodes in this version + pub roles: LwwMap<Uuid, NodeRoleV>, + /// Parameters used to compute the assignment currently given by + /// ring_assignment_data + pub parameters: LayoutParameters, + + /// The number of replicas for each data partition + pub replication_factor: usize, + /// This attribute is only used to retain the previously computed partition size, + /// to know to what extent does it change with the layout update. + pub partition_size: u64, + + /// node_id_vec: a vector of node IDs with a role assigned + /// in the system (this includes gateway nodes). + /// The order here is different than the vec stored by `roles`, because: + /// 1. non-gateway nodes are first so that they have lower numbers + /// 2. nodes that don't have a role are excluded (but they need to + /// stay in the CRDT as tombstones) + pub node_id_vec: Vec<Uuid>, + /// number of non-gateway nodes, which are the first ids in node_id_vec + pub nongateway_node_count: usize, + /// The assignation of data partitions to nodes, the values + /// are indices in node_id_vec + #[serde(with = "serde_bytes")] + pub ring_assignment_data: Vec<CompactNodeType>, + } + + /// The staged changes for the next layout version + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutStaging { + /// Parameters to be used in the next partition assignment computation. + pub parameters: Lww<LayoutParameters>, + /// Role changes which are staged for the next version of the layout + pub roles: LwwMap<Uuid, NodeRoleV>, + } + + /// The tracker of acknowlegments and data syncs around the cluster + #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] + pub struct UpdateTrackers { + /// The highest layout version number each node has ack'ed + pub ack_map: UpdateTracker, + /// The highest layout version number each node has synced data for + pub sync_map: UpdateTracker, + /// The highest layout version number each node has + /// ack'ed that all other nodes have synced data for + pub sync_ack_map: UpdateTracker, + } + + /// Generic update tracker struct + #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] + pub struct UpdateTracker(pub BTreeMap<Uuid, u64>); + + impl garage_util::migrate::Migrate for LayoutHistory { + const VERSION_MARKER: &'static [u8] = b"G010lh"; + + type Previous = v09::ClusterLayout; + + fn migrate(previous: Self::Previous) -> Self { + let nongateway_node_count = previous + .node_id_vec + .iter() + .enumerate() + .filter(|(_, uuid)| { + let role = previous.roles.get(uuid); + matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some()) + }) + .map(|(i, _)| i + 1) + .max() + .unwrap_or(0); + + let version = LayoutVersion { + version: previous.version, + replication_factor: previous.replication_factor, + partition_size: previous.partition_size, + parameters: previous.parameters, + roles: previous.roles, + node_id_vec: previous.node_id_vec, + nongateway_node_count, + ring_assignment_data: previous.ring_assignment_data, + }; + let update_tracker = UpdateTracker( + version + .nongateway_nodes() + .iter() + .copied() + .map(|x| (x, version.version)) + .collect::<BTreeMap<Uuid, u64>>(), + ); + let staging = LayoutStaging { + parameters: previous.staging_parameters, + roles: previous.staging_roles, + }; + Self { + versions: vec![version], + old_versions: vec![], + update_trackers: UpdateTrackers { + ack_map: update_tracker.clone(), + sync_map: update_tracker.clone(), + sync_ack_map: update_tracker.clone(), + }, + staging: Lww::raw(previous.version, staging), + } + } + } +} + +pub use v010::*; + +// ---- utility functions ---- + +impl AutoCrdt for LayoutParameters { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCrdt for NodeRoleV { + const WARN_IF_DIFFERENT: bool = true; +} + +impl Crdt for LayoutStaging { + fn merge(&mut self, other: &LayoutStaging) { + self.parameters.merge(&other.parameters); + self.roles.merge(&other.roles); + } +} + +impl NodeRole { + pub fn capacity_string(&self) -> String { + match self.capacity { + Some(c) => ByteSize::b(c).to_string_as(false), + None => "gateway".to_string(), + } + } + + pub fn tags_string(&self) -> String { + self.tags.join(",") + } +} + +impl fmt::Display for ZoneRedundancy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ZoneRedundancy::Maximum => write!(f, "maximum"), + ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), + } + } +} + +impl core::str::FromStr for ZoneRedundancy { + type Err = &'static str; + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), + x => { + let v = x + .parse::<usize>() + .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; + Ok(ZoneRedundancy::AtLeast(v)) + } + } + } +} + +impl UpdateTracker { + fn merge(&mut self, other: &UpdateTracker) -> bool { + let mut changed = false; + for (k, v) in other.0.iter() { + if let Some(v_mut) = self.0.get_mut(k) { + if *v > *v_mut { + *v_mut = *v; + changed = true; + } + } else { + self.0.insert(*k, *v); + changed = true; + } + } + changed + } + + /// This bumps the update tracker for a given node up to the specified value. + /// This has potential impacts on the correctness of Garage and should only + /// be used in very specific circumstances. + pub fn set_max(&mut self, peer: Uuid, value: u64) -> bool { + match self.0.get_mut(&peer) { + Some(e) if *e < value => { + *e = value; + true + } + None => { + self.0.insert(peer, value); + true + } + _ => false, + } + } + + pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 { + storage_nodes + .iter() + .map(|x| self.get(x, min_version)) + .min() + .unwrap_or(min_version) + } + + pub fn get(&self, node: &Uuid, min_version: u64) -> u64 { + self.0.get(node).copied().unwrap_or(min_version) + } +} + +impl UpdateTrackers { + pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool { + let c1 = self.ack_map.merge(&other.ack_map); + let c2 = self.sync_map.merge(&other.sync_map); + let c3 = self.sync_ack_map.merge(&other.sync_ack_map); + c1 || c2 || c3 + } +} diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs deleted file mode 100644 index df949906..00000000 --- a/src/rpc/layout/schema.rs +++ /dev/null @@ -1,431 +0,0 @@ -use std::fmt; - -use bytesize::ByteSize; - -use garage_util::crdt::{AutoCrdt, Crdt}; -use garage_util::data::Uuid; - -mod v08 { - use crate::layout::CompactNodeType; - use garage_util::crdt::LwwMap; - use garage_util::data::{Hash, Uuid}; - use serde::{Deserialize, Serialize}; - - /// The layout of the cluster, i.e. the list of roles - /// which are assigned to each cluster node - #[derive(Clone, Debug, Serialize, Deserialize)] - pub struct ClusterLayout { - pub version: u64, - - pub replication_factor: usize, - pub roles: LwwMap<Uuid, NodeRoleV>, - - /// node_id_vec: a vector of node IDs with a role assigned - /// in the system (this includes gateway nodes). - /// The order here is different than the vec stored by `roles`, because: - /// 1. non-gateway nodes are first so that they have lower numbers - /// 2. nodes that don't have a role are excluded (but they need to - /// stay in the CRDT as tombstones) - pub node_id_vec: Vec<Uuid>, - /// the assignation of data partitions to node, the values - /// are indices in node_id_vec - #[serde(with = "serde_bytes")] - pub ring_assignation_data: Vec<CompactNodeType>, - - /// Role changes which are staged for the next version of the layout - pub staging: LwwMap<Uuid, NodeRoleV>, - pub staging_hash: Hash, - } - - #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] - pub struct NodeRoleV(pub Option<NodeRole>); - - /// The user-assigned roles of cluster nodes - #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] - pub struct NodeRole { - /// Datacenter at which this entry belong. This information is used to - /// perform a better geodistribution - pub zone: String, - /// The capacity of the node - /// If this is set to None, the node does not participate in storing data for the system - /// and is only active as an API gateway to other nodes - pub capacity: Option<u64>, - /// A set of tags to recognize the node - pub tags: Vec<String>, - } - - impl garage_util::migrate::InitialFormat for ClusterLayout {} -} - -mod v09 { - use super::v08; - use crate::layout::CompactNodeType; - use garage_util::crdt::{Lww, LwwMap}; - use garage_util::data::{Hash, Uuid}; - use serde::{Deserialize, Serialize}; - pub use v08::{NodeRole, NodeRoleV}; - - /// The layout of the cluster, i.e. the list of roles - /// which are assigned to each cluster node - #[derive(Clone, Debug, Serialize, Deserialize)] - pub struct ClusterLayout { - pub version: u64, - - pub replication_factor: usize, - - /// This attribute is only used to retain the previously computed partition size, - /// to know to what extent does it change with the layout update. - pub partition_size: u64, - /// Parameters used to compute the assignment currently given by - /// ring_assignment_data - pub parameters: LayoutParameters, - - pub roles: LwwMap<Uuid, NodeRoleV>, - - /// see comment in v08::ClusterLayout - pub node_id_vec: Vec<Uuid>, - /// see comment in v08::ClusterLayout - #[serde(with = "serde_bytes")] - pub ring_assignment_data: Vec<CompactNodeType>, - - /// Parameters to be used in the next partition assignment computation. - pub staging_parameters: Lww<LayoutParameters>, - /// Role changes which are staged for the next version of the layout - pub staging_roles: LwwMap<Uuid, NodeRoleV>, - pub staging_hash: Hash, - } - - /// This struct is used to set the parameters to be used in the assignment computation - /// algorithm. It is stored as a Crdt. - #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] - pub struct LayoutParameters { - pub zone_redundancy: ZoneRedundancy, - } - - /// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies - /// of each partition on at least that number of different zones. - /// Otherwise, copies will be stored on the maximum possible number of zones. - #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] - pub enum ZoneRedundancy { - AtLeast(usize), - Maximum, - } - - impl garage_util::migrate::Migrate for ClusterLayout { - const VERSION_MARKER: &'static [u8] = b"G09layout"; - - type Previous = v08::ClusterLayout; - - fn migrate(previous: Self::Previous) -> Self { - use itertools::Itertools; - - // In the old layout, capacities are in an arbitrary unit, - // but in the new layout they are in bytes. - // Here we arbitrarily multiply everything by 1G, - // such that 1 old capacity unit = 1GB in the new units. - // This is totally arbitrary and won't work for most users. - let cap_mul = 1024 * 1024 * 1024; - let roles = multiply_all_capacities(previous.roles, cap_mul); - let staging_roles = multiply_all_capacities(previous.staging, cap_mul); - let node_id_vec = previous.node_id_vec; - - // Determine partition size - let mut tmp = previous.ring_assignation_data.clone(); - tmp.sort(); - let partition_size = tmp - .into_iter() - .dedup_with_count() - .map(|(npart, node)| { - roles - .get(&node_id_vec[node as usize]) - .and_then(|p| p.0.as_ref().and_then(|r| r.capacity)) - .unwrap_or(0) / npart as u64 - }) - .min() - .unwrap_or(0); - - // By default, zone_redundancy is maximum possible value - let parameters = LayoutParameters { - zone_redundancy: ZoneRedundancy::Maximum, - }; - - Self { - version: previous.version, - replication_factor: previous.replication_factor, - partition_size, - parameters, - roles, - node_id_vec, - ring_assignment_data: previous.ring_assignation_data, - staging_parameters: Lww::new(parameters), - staging_roles, - staging_hash: [0u8; 32].into(), // will be set in the next migration - } - } - } - - fn multiply_all_capacities( - old_roles: LwwMap<Uuid, NodeRoleV>, - mul: u64, - ) -> LwwMap<Uuid, NodeRoleV> { - let mut new_roles = LwwMap::new(); - for (node, ts, role) in old_roles.items() { - let mut role = role.clone(); - if let NodeRoleV(Some(NodeRole { - capacity: Some(ref mut cap), - .. - })) = role - { - *cap *= mul; - } - new_roles.merge_raw(node, *ts, &role); - } - new_roles - } -} - -mod v010 { - use super::v09; - use crate::layout::CompactNodeType; - use garage_util::crdt::{Lww, LwwMap}; - use garage_util::data::Uuid; - use serde::{Deserialize, Serialize}; - use std::collections::BTreeMap; - pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; - - pub const OLD_VERSION_COUNT: usize = 5; - - /// The history of cluster layouts, with trackers to keep a record - /// of which nodes are up-to-date to current cluster data - #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] - pub struct LayoutHistory { - /// The versions currently in use in the cluster - pub versions: Vec<LayoutVersion>, - /// At most 5 of the previous versions, not used by the garage_table - /// module, but usefull for the garage_block module to find data blocks - /// that have not yet been moved - pub old_versions: Vec<LayoutVersion>, - - /// Update trackers - pub update_trackers: UpdateTrackers, - - /// Staged changes for the next version - pub staging: Lww<LayoutStaging>, - } - - /// A version of the layout of the cluster, i.e. the list of roles - /// which are assigned to each cluster node - #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] - pub struct LayoutVersion { - pub version: u64, - - pub replication_factor: usize, - - /// This attribute is only used to retain the previously computed partition size, - /// to know to what extent does it change with the layout update. - pub partition_size: u64, - /// Parameters used to compute the assignment currently given by - /// ring_assignment_data - pub parameters: LayoutParameters, - - pub roles: LwwMap<Uuid, NodeRoleV>, - - /// see comment in v08::ClusterLayout - pub node_id_vec: Vec<Uuid>, - /// number of non-gateway nodes, which are the first ids in node_id_vec - pub nongateway_node_count: usize, - /// see comment in v08::ClusterLayout - #[serde(with = "serde_bytes")] - pub ring_assignment_data: Vec<CompactNodeType>, - } - - /// The staged changes for the next layout version - #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] - pub struct LayoutStaging { - /// Parameters to be used in the next partition assignment computation. - pub parameters: Lww<LayoutParameters>, - /// Role changes which are staged for the next version of the layout - pub roles: LwwMap<Uuid, NodeRoleV>, - } - - /// The tracker of acknowlegments and data syncs around the cluster - #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] - pub struct UpdateTrackers { - /// The highest layout version number each node has ack'ed - pub ack_map: UpdateTracker, - /// The highest layout version number each node has synced data for - pub sync_map: UpdateTracker, - /// The highest layout version number each node has - /// ack'ed that all other nodes have synced data for - pub sync_ack_map: UpdateTracker, - } - - /// The history of cluster layouts - #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] - pub struct UpdateTracker(pub BTreeMap<Uuid, u64>); - - impl garage_util::migrate::Migrate for LayoutHistory { - const VERSION_MARKER: &'static [u8] = b"G010lh"; - - type Previous = v09::ClusterLayout; - - fn migrate(previous: Self::Previous) -> Self { - let nongateway_node_count = previous - .node_id_vec - .iter() - .enumerate() - .filter(|(_, uuid)| { - let role = previous.roles.get(uuid); - matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some()) - }) - .map(|(i, _)| i + 1) - .max() - .unwrap_or(0); - - let version = LayoutVersion { - version: previous.version, - replication_factor: previous.replication_factor, - partition_size: previous.partition_size, - parameters: previous.parameters, - roles: previous.roles, - node_id_vec: previous.node_id_vec, - nongateway_node_count, - ring_assignment_data: previous.ring_assignment_data, - }; - let update_tracker = UpdateTracker( - version - .nongateway_nodes() - .iter() - .copied() - .map(|x| (x, version.version)) - .collect::<BTreeMap<Uuid, u64>>(), - ); - let staging = LayoutStaging { - parameters: previous.staging_parameters, - roles: previous.staging_roles, - }; - Self { - versions: vec![version], - old_versions: vec![], - update_trackers: UpdateTrackers { - ack_map: update_tracker.clone(), - sync_map: update_tracker.clone(), - sync_ack_map: update_tracker.clone(), - }, - staging: Lww::raw(previous.version, staging), - } - } - } -} - -pub use v010::*; - -// ---- utility functions ---- - -impl AutoCrdt for LayoutParameters { - const WARN_IF_DIFFERENT: bool = true; -} - -impl AutoCrdt for NodeRoleV { - const WARN_IF_DIFFERENT: bool = true; -} - -impl Crdt for LayoutStaging { - fn merge(&mut self, other: &LayoutStaging) { - self.parameters.merge(&other.parameters); - self.roles.merge(&other.roles); - } -} - -impl NodeRole { - pub fn capacity_string(&self) -> String { - match self.capacity { - Some(c) => ByteSize::b(c).to_string_as(false), - None => "gateway".to_string(), - } - } - - pub fn tags_string(&self) -> String { - self.tags.join(",") - } -} - -impl fmt::Display for ZoneRedundancy { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ZoneRedundancy::Maximum => write!(f, "maximum"), - ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), - } - } -} - -impl core::str::FromStr for ZoneRedundancy { - type Err = &'static str; - fn from_str(s: &str) -> Result<Self, Self::Err> { - match s { - "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), - x => { - let v = x - .parse::<usize>() - .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; - Ok(ZoneRedundancy::AtLeast(v)) - } - } - } -} - -impl UpdateTracker { - fn merge(&mut self, other: &UpdateTracker) -> bool { - let mut changed = false; - for (k, v) in other.0.iter() { - if let Some(v_mut) = self.0.get_mut(k) { - if *v > *v_mut { - *v_mut = *v; - changed = true; - } - } else { - self.0.insert(*k, *v); - changed = true; - } - } - changed - } - - /// This bumps the update tracker for a given node up to the specified value. - /// This has potential impacts on the correctness of Garage and should only - /// be used in very specific circumstances. - pub fn set_max(&mut self, peer: Uuid, value: u64) -> bool { - match self.0.get_mut(&peer) { - Some(e) if *e < value => { - *e = value; - true - } - None => { - self.0.insert(peer, value); - true - } - _ => false, - } - } - - pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 { - storage_nodes - .iter() - .map(|x| self.get(x, min_version)) - .min() - .unwrap_or(min_version) - } - - pub fn get(&self, node: &Uuid, min_version: u64) -> u64 { - self.0.get(node).copied().unwrap_or(min_version) - } -} - -impl UpdateTrackers { - pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool { - let c1 = self.ack_map.merge(&other.ack_map); - let c2 = self.sync_map.merge(&other.sync_map); - let c3 = self.sync_ack_map.merge(&other.sync_ack_map); - c1 || c2 || c3 - } -} diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index cbfbee94..5b307156 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -10,7 +10,6 @@ use garage_util::data::*; use garage_util::error::*; use super::graph_algo::*; -use super::schema::*; use super::*; // The Message type will be used to collect information on the algorithm. |