| author | Alex Auvolat <alex@adnab.me> | 2023-11-08 17:49:06 +0100 |
|---|---|---|
| committer | Alex Auvolat <alex@adnab.me> | 2023-11-08 17:49:06 +0100 |
| commit | fe9af1dcaae31a117528a9cfa10c422c9a850201 (patch) | |
| tree | 6e43dbb97d37d48f6af5398b4d067747e652108c /src | |
| parent | 4a9c94514f49aa4e9880a8e0f5cf5a52d11ae993 (diff) | |
| download | garage-fe9af1dcaae31a117528a9cfa10c422c9a850201.tar.gz garage-fe9af1dcaae31a117528a9cfa10c422c9a850201.zip | |
WIP: garage_rpc: store layout version history
Diffstat (limited to 'src')
-rw-r--r-- | src/rpc/layout/history.rs | 170
-rw-r--r-- | src/rpc/layout/mod.rs | 32
-rw-r--r-- | src/rpc/layout/schema.rs | 286
-rw-r--r-- | src/rpc/layout/tracker.rs | 21
-rw-r--r-- | src/rpc/layout/version.rs (renamed from src/rpc/layout.rs) | 330
-rw-r--r-- | src/rpc/rpc_helper.rs | 12
-rw-r--r-- | src/rpc/system.rs | 44
7 files changed, 550 insertions, 345 deletions
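The diff that follows is long, so here is a quick orientation first. The commit's central move is to replace the single mutable `ClusterLayout` with a `LayoutHistory` that keeps an ordered list of `LayoutVersion`s plus per-node update trackers. The sketch below illustrates that shape with hypothetical, heavily simplified stand-ins built only on std types; the real definitions live in `src/rpc/layout/schema.rs` and `src/rpc/layout/tracker.rs` further down and additionally carry roles, ring assignment data and CRDT staging fields that are omitted here.

```rust
use std::collections::HashMap;

// Hypothetical, heavily simplified stand-ins for the types added in
// src/rpc/layout/schema.rs (v010 module). Illustration only, not Garage code.
#[derive(Clone, Debug, PartialEq)]
struct LayoutVersion {
    version: u64,
}

// node name -> highest layout version that node has acknowledged
#[derive(Clone, Debug, Default)]
struct UpdateTracker(HashMap<String, u64>);

#[derive(Debug, Default)]
struct LayoutHistory {
    versions: Vec<LayoutVersion>, // oldest first; the last entry is "current"
    ack_map: UpdateTracker,       // one of the three trackers in UpdateTrackers
}

impl LayoutHistory {
    fn current(&self) -> &LayoutVersion {
        // mirrors LayoutHistory::current() in history.rs: the newest version
        self.versions
            .last()
            .expect("history always holds at least one version")
    }

    // Merging trackers keeps the per-node maximum, as in tracker.rs.
    fn merge_ack(&mut self, other: &UpdateTracker) {
        for (node, v) in other.0.iter() {
            let entry = self.ack_map.0.entry(node.clone()).or_insert(*v);
            *entry = (*entry).max(*v);
        }
    }
}

fn main() {
    let mut history = LayoutHistory {
        versions: vec![LayoutVersion { version: 1 }, LayoutVersion { version: 2 }],
        ack_map: UpdateTracker(HashMap::from([("node1".to_string(), 1)])),
    };
    let remote = UpdateTracker(HashMap::from([
        ("node1".to_string(), 2),
        ("node2".to_string(), 1),
    ]));
    history.merge_ack(&remote);

    assert_eq!(history.current().version, 2);
    assert_eq!(history.ack_map.0["node1"], 2); // per-node maximum wins
    assert_eq!(history.ack_map.0["node2"], 1);
    println!("{:?}", history);
}
```

The actual `UpdateTrackers` struct keeps three such maps (`ack_map`, `sync_map`, `sync_ack_map`); they all merge the same way, only the meaning of the counter differs.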
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
new file mode 100644
index 00000000..b3019f58
--- /dev/null
+++ b/src/rpc/layout/history.rs
@@ -0,0 +1,170 @@
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use garage_util::crdt::{Crdt, Lww, LwwMap};
+use garage_util::data::*;
+use garage_util::encode::nonversioned_encode;
+use garage_util::error::*;
+
+use super::schema::*;
+use super::*;
+
+impl LayoutHistory {
+	pub fn new(replication_factor: usize) -> Self {
+		let version = LayoutVersion::new(replication_factor);
+
+		let staging_parameters = Lww::<LayoutParameters>::new(version.parameters);
+		let empty_lwwmap = LwwMap::new();
+
+		let mut ret = LayoutHistory {
+			versions: vec![version].into_boxed_slice().into(),
+			update_trackers: Default::default(),
+			staging_parameters,
+			staging_roles: empty_lwwmap,
+			staging_hash: [0u8; 32].into(),
+		};
+		ret.staging_hash = ret.calculate_staging_hash();
+		ret
+	}
+
+	pub fn current(&self) -> &LayoutVersion {
+		self.versions.last().as_ref().unwrap()
+	}
+
+	pub(crate) fn calculate_staging_hash(&self) -> Hash {
+		let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
+		blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..])
+	}
+
+	// ================== updates to layout, public interface ===================
+
+	pub fn merge(&mut self, other: &LayoutHistory) -> bool {
+		let mut changed = false;
+
+		// Merge staged layout changes
+		match other.current().version.cmp(&self.current().version) {
+			Ordering::Greater => {
+				self.staging_parameters = other.staging_parameters.clone();
+				self.staging_roles = other.staging_roles.clone();
+				self.staging_hash = other.staging_hash;
+				changed = true;
+			}
+			Ordering::Equal => {
+				self.staging_parameters.merge(&other.staging_parameters);
+				self.staging_roles.merge(&other.staging_roles);
+
+				let new_staging_hash = self.calculate_staging_hash();
+				if new_staging_hash != self.staging_hash {
+					changed = true;
+				}
+
+				self.staging_hash = new_staging_hash;
+			}
+			Ordering::Less => (),
+		}
+
+		// Add any new versions to history
+		let mut versions = self.versions.to_vec();
+		for v2 in other.versions.iter() {
+			if let Some(v1) = versions.iter().find(|v| v.version == v2.version) {
+				if v1 != v2 {
+					error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version);
+				}
+			} else if versions.iter().all(|v| v.version != v2.version - 1) {
+				error!(
+					"Cannot receive new layout version {}, version {} is missing",
+					v2.version,
+					v2.version - 1
+				);
+			} else {
+				versions.push(v2.clone());
+				changed = true;
+			}
+		}
+		self.versions = Arc::from(versions.into_boxed_slice());
+
+		// Merge trackers
+		self.update_trackers.merge(&other.update_trackers);
+
+		changed
+	}
+
+	pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> {
+		match version {
+			None => {
+				let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+	"#;
+				return Err(Error::Message(error.into()));
+			}
+			Some(v) => {
+				if v != self.current().version + 1 {
+					return Err(Error::Message("Invalid new layout version".into()));
+				}
+			}
+		}
+
+		let mut new_version = self.current().clone();
+		new_version.version += 1;
+
+		new_version.roles.merge(&self.staging_roles);
+		new_version.roles.retain(|(_, _, v)| v.0.is_some());
+		new_version.parameters = *self.staging_parameters.get();
+
+		self.staging_roles.clear();
+		self.staging_hash = self.calculate_staging_hash();
+
+		let msg = new_version.calculate_partition_assignment()?;
+
+		let mut versions = self.versions.to_vec();
+		versions.push(new_version);
+		self.versions = Arc::from(versions.into_boxed_slice());
+
+		Ok((self, msg))
+	}
+
+	pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
+		match version {
+			None => {
+				let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+	"#;
+				return Err(Error::Message(error.into()));
+			}
+			Some(v) => {
+				if v != self.current().version + 1 {
+					return Err(Error::Message("Invalid new layout version".into()));
+				}
+			}
+		}
+
+		self.staging_roles.clear();
+		self.staging_parameters.update(self.current().parameters);
+		self.staging_hash = self.calculate_staging_hash();
+
+		// TODO this is stupid, we should have a separate version counter/LWW
+		// for the staging params
+		let mut new_version = self.current().clone();
+		new_version.version += 1;
+
+		let mut versions = self.versions.to_vec();
+		versions.push(new_version);
+		self.versions = Arc::from(versions.into_boxed_slice());
+
+		Ok(self)
+	}
+
+	pub fn check(&self) -> Result<(), String> {
+		// Check that the hash of the staging data is correct
+		let staging_hash = self.calculate_staging_hash();
+		if staging_hash != self.staging_hash {
+			return Err("staging_hash is incorrect".into());
+		}
+
+		// TODO: anythign more ?
+
+		self.current().check()
+	}
+}
diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs
new file mode 100644
index 00000000..122d4b65
--- /dev/null
+++ b/src/rpc/layout/mod.rs
@@ -0,0 +1,32 @@
+mod history;
+mod schema;
+mod tracker;
+mod version;
+
+pub use history::*;
+pub use schema::*;
+pub use version::*;
+
+// ---- defines: partitions ----
+
+/// A partition id, which is stored on 16 bits
+/// i.e. we have up to 2**16 partitions.
+/// (in practice we have exactly 2**PARTITION_BITS partitions)
+pub type Partition = u16;
+
+// TODO: make this constant parametrizable in the config file
+// For deployments with many nodes it might make sense to bump
+// it up to 10.
+// Maximum value : 16
+/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in
+/// presence of numerous nodes, but exponentially bigger ring. Max 16
+pub const PARTITION_BITS: usize = 8;
+
+const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
+
+// ---- defines: nodes ----
+
+// Type to store compactly the id of a node in the system
+// Change this to u16 the day we want to have more than 256 nodes in a cluster
+pub type CompactNodeType = u8;
+pub const MAX_NODE_NUMBER: usize = 256;
diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs
new file mode 100644
index 00000000..fa0822fa
--- /dev/null
+++ b/src/rpc/layout/schema.rs
@@ -0,0 +1,286 @@
+mod v08 {
+	use crate::layout::CompactNodeType;
+	use garage_util::crdt::LwwMap;
+	use garage_util::data::{Hash, Uuid};
+	use serde::{Deserialize, Serialize};
+
+	/// The layout of the cluster, i.e. the list of roles
+	/// which are assigned to each cluster node
+	#[derive(Clone, Debug, Serialize, Deserialize)]
+	pub struct ClusterLayout {
+		pub version: u64,
+
+		pub replication_factor: usize,
+		pub roles: LwwMap<Uuid, NodeRoleV>,
+
+		/// node_id_vec: a vector of node IDs with a role assigned
+		/// in the system (this includes gateway nodes).
+		/// The order here is different than the vec stored by `roles`, because:
+		/// 1. non-gateway nodes are first so that they have lower numbers
+		/// 2. nodes that don't have a role are excluded (but they need to
+		/// stay in the CRDT as tombstones)
+		pub node_id_vec: Vec<Uuid>,
+		/// the assignation of data partitions to node, the values
+		/// are indices in node_id_vec
+		#[serde(with = "serde_bytes")]
+		pub ring_assignation_data: Vec<CompactNodeType>,
+
+		/// Role changes which are staged for the next version of the layout
+		pub staging: LwwMap<Uuid, NodeRoleV>,
+		pub staging_hash: Hash,
+	}
+
+	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+	pub struct NodeRoleV(pub Option<NodeRole>);
+
+	/// The user-assigned roles of cluster nodes
+	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+	pub struct NodeRole {
+		/// Datacenter at which this entry belong. This information is used to
+		/// perform a better geodistribution
+		pub zone: String,
+		/// The capacity of the node
+		/// If this is set to None, the node does not participate in storing data for the system
+		/// and is only active as an API gateway to other nodes
+		pub capacity: Option<u64>,
+		/// A set of tags to recognize the node
+		pub tags: Vec<String>,
+	}
+
+	impl garage_util::migrate::InitialFormat for ClusterLayout {}
+}
+
+mod v09 {
+	use super::v08;
+	use crate::layout::CompactNodeType;
+	use garage_util::crdt::{Lww, LwwMap};
+	use garage_util::data::{Hash, Uuid};
+	use serde::{Deserialize, Serialize};
+	pub use v08::{NodeRole, NodeRoleV};
+
+	/// The layout of the cluster, i.e. the list of roles
+	/// which are assigned to each cluster node
+	#[derive(Clone, Debug, Serialize, Deserialize)]
+	pub struct ClusterLayout {
+		pub version: u64,
+
+		pub replication_factor: usize,
+
+		/// This attribute is only used to retain the previously computed partition size,
+		/// to know to what extent does it change with the layout update.
+		pub partition_size: u64,
+		/// Parameters used to compute the assignment currently given by
+		/// ring_assignment_data
+		pub parameters: LayoutParameters,
+
+		pub roles: LwwMap<Uuid, NodeRoleV>,
+
+		/// see comment in v08::ClusterLayout
+		pub node_id_vec: Vec<Uuid>,
+		/// see comment in v08::ClusterLayout
+		#[serde(with = "serde_bytes")]
+		pub ring_assignment_data: Vec<CompactNodeType>,
+
+		/// Parameters to be used in the next partition assignment computation.
+		pub staging_parameters: Lww<LayoutParameters>,
+		/// Role changes which are staged for the next version of the layout
+		pub staging_roles: LwwMap<Uuid, NodeRoleV>,
+		pub staging_hash: Hash,
+	}
+
+	/// This struct is used to set the parameters to be used in the assignment computation
+	/// algorithm. It is stored as a Crdt.
+	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
+	pub struct LayoutParameters {
+		pub zone_redundancy: ZoneRedundancy,
+	}
+
+	/// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies
+	/// of each partition on at least that number of different zones.
+	/// Otherwise, copies will be stored on the maximum possible number of zones.
+	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
+	pub enum ZoneRedundancy {
+		AtLeast(usize),
+		Maximum,
+	}
+
+	impl garage_util::migrate::Migrate for ClusterLayout {
+		const VERSION_MARKER: &'static [u8] = b"G09layout";
+
+		type Previous = v08::ClusterLayout;
+
+		fn migrate(previous: Self::Previous) -> Self {
+			use itertools::Itertools;
+
+			// In the old layout, capacities are in an arbitrary unit,
+			// but in the new layout they are in bytes.
+			// Here we arbitrarily multiply everything by 1G,
+			// such that 1 old capacity unit = 1GB in the new units.
+			// This is totally arbitrary and won't work for most users.
+			let cap_mul = 1024 * 1024 * 1024;
+			let roles = multiply_all_capacities(previous.roles, cap_mul);
+			let staging_roles = multiply_all_capacities(previous.staging, cap_mul);
+			let node_id_vec = previous.node_id_vec;
+
+			// Determine partition size
+			let mut tmp = previous.ring_assignation_data.clone();
+			tmp.sort();
+			let partition_size = tmp
+				.into_iter()
+				.dedup_with_count()
+				.map(|(npart, node)| {
+					roles
+						.get(&node_id_vec[node as usize])
+						.and_then(|p| p.0.as_ref().and_then(|r| r.capacity))
+						.unwrap_or(0) / npart as u64
+				})
+				.min()
+				.unwrap_or(0);
+
+			// By default, zone_redundancy is maximum possible value
+			let parameters = LayoutParameters {
+				zone_redundancy: ZoneRedundancy::Maximum,
+			};
+
+			Self {
+				version: previous.version,
+				replication_factor: previous.replication_factor,
+				partition_size,
+				parameters,
+				roles,
+				node_id_vec,
+				ring_assignment_data: previous.ring_assignation_data,
+				staging_parameters: Lww::new(parameters),
+				staging_roles,
+				staging_hash: [0u8; 32].into(), // will be set in the next migration
+			}
+		}
+	}
+
+	fn multiply_all_capacities(
+		old_roles: LwwMap<Uuid, NodeRoleV>,
+		mul: u64,
+	) -> LwwMap<Uuid, NodeRoleV> {
+		let mut new_roles = LwwMap::new();
+		for (node, ts, role) in old_roles.items() {
+			let mut role = role.clone();
+			if let NodeRoleV(Some(NodeRole {
+				capacity: Some(ref mut cap),
+				..
+			})) = role
+			{
+				*cap *= mul;
+			}
+			new_roles.merge_raw(node, *ts, &role);
+		}
+		new_roles
+	}
+}
+
+mod v010 {
+	use super::v09;
+	use crate::layout::CompactNodeType;
+	use garage_util::crdt::{Lww, LwwMap};
+	use garage_util::data::{Hash, Uuid};
+	use serde::{Deserialize, Serialize};
+	use std::collections::HashMap;
+	use std::sync::Arc;
+	pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy};
+
+	/// The layout of the cluster, i.e. the list of roles
+	/// which are assigned to each cluster node
+	#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+	pub struct LayoutVersion {
+		pub version: u64,
+
+		pub replication_factor: usize,
+
+		/// This attribute is only used to retain the previously computed partition size,
+		/// to know to what extent does it change with the layout update.
+		pub partition_size: u64,
+		/// Parameters used to compute the assignment currently given by
+		/// ring_assignment_data
+		pub parameters: LayoutParameters,
+
+		pub roles: LwwMap<Uuid, NodeRoleV>,
+
+		/// see comment in v08::ClusterLayout
+		pub node_id_vec: Vec<Uuid>,
+		/// see comment in v08::ClusterLayout
+		#[serde(with = "serde_bytes")]
+		pub ring_assignment_data: Vec<CompactNodeType>,
+	}
+
+	/// The history of cluster layouts
+	#[derive(Clone, Debug, Serialize, Deserialize)]
+	pub struct LayoutHistory {
+		/// The versions currently in use in the cluster
+		pub versions: Arc<[LayoutVersion]>,
+
+		/// Update trackers
+		pub update_trackers: UpdateTrackers,
+
+		/// Parameters to be used in the next partition assignment computation.
+		pub staging_parameters: Lww<LayoutParameters>,
+		/// Role changes which are staged for the next version of the layout
+		pub staging_roles: LwwMap<Uuid, NodeRoleV>,
+		/// Hash of the serialized staging_parameters + staging_roles
+		pub staging_hash: Hash,
+	}
+
+	/// The tracker of acknowlegments and data syncs around the cluster
+	#[derive(Clone, Debug, Serialize, Deserialize, Default)]
+	pub struct UpdateTrackers {
+		/// The highest layout version number each node has ack'ed
+		pub ack_map: UpdateTracker,
+		/// The highest layout version number each node has synced data for
+		pub sync_map: UpdateTracker,
+		/// The highest layout version number each node has
+		/// ack'ed that all other nodes have synced data for
+		pub sync_ack_map: UpdateTracker,
+	}
+
+	/// The history of cluster layouts
+	#[derive(Clone, Debug, Serialize, Deserialize, Default)]
+	pub struct UpdateTracker(pub HashMap<Uuid, u64>);
+
+	impl garage_util::migrate::Migrate for LayoutHistory {
+		const VERSION_MARKER: &'static [u8] = b"G010lh";
+
+		type Previous = v09::ClusterLayout;
+
+		fn migrate(previous: Self::Previous) -> Self {
+			let version = LayoutVersion {
+				version: previous.version,
+				replication_factor: previous.replication_factor,
+				partition_size: previous.partition_size,
+				parameters: previous.parameters,
+				roles: previous.roles,
+				node_id_vec: previous.node_id_vec,
+				ring_assignment_data: previous.ring_assignment_data,
+			};
+			let update_tracker = UpdateTracker(
+				version
+					.nongateway_nodes()
+					.iter()
+					.map(|x| (*x, version.version))
+					.collect::<HashMap<Uuid, u64>>(),
+			);
+			let mut ret = Self {
+				versions: Arc::from(vec![version].into_boxed_slice()),
+				update_trackers: UpdateTrackers {
+					ack_map: update_tracker.clone(),
+					sync_map: update_tracker.clone(),
+					sync_ack_map: update_tracker.clone(),
+				},
+				staging_parameters: previous.staging_parameters,
+				staging_roles: previous.staging_roles,
+				staging_hash: [0u8; 32].into(),
+			};
+			ret.staging_hash = ret.calculate_staging_hash();
+			ret
+		}
+	}
+}
+
+pub use v010::*;
diff --git a/src/rpc/layout/tracker.rs b/src/rpc/layout/tracker.rs
new file mode 100644
index 00000000..778121e4
--- /dev/null
+++ b/src/rpc/layout/tracker.rs
@@ -0,0 +1,21 @@
+use super::*;
+
+impl UpdateTracker {
+	fn merge(&mut self, other: &UpdateTracker) {
+		for (k, v) in other.0.iter() {
+			if let Some(v_mut) = self.0.get_mut(k) {
+				*v_mut = std::cmp::max(*v_mut, *v);
+			} else {
+				self.0.insert(*k, *v);
+			}
+		}
+	}
+}
+
+impl UpdateTrackers {
+	pub(crate) fn merge(&mut self, other: &UpdateTrackers) {
+		self.ack_map.merge(&other.ack_map);
+		self.sync_map.merge(&other.sync_map);
+		self.sync_ack_map.merge(&other.sync_ack_map);
+	}
+}
diff --git a/src/rpc/layout.rs b/src/rpc/layout/version.rs
index 2b5b6606..363bc204 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout/version.rs
@@ -1,4 +1,3 @@
-use std::cmp::Ordering;
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::fmt;
@@ -6,227 +5,20 @@ use std::fmt;
 use bytesize::ByteSize;
 use itertools::Itertools;
-use garage_util::crdt::{AutoCrdt, Crdt, Lww, LwwMap};
+use garage_util::crdt::{AutoCrdt, LwwMap};
 use garage_util::data::*;
-use garage_util::encode::nonversioned_encode;
 use garage_util::error::*;
 use crate::graph_algo::*;
 use std::convert::TryInto;
-// ---- defines: partitions ----
-
-/// A partition id, which is stored on 16 bits
-/// i.e. we have up to 2**16 partitions.
-/// (in practice we have exactly 2**PARTITION_BITS partitions)
-pub type Partition = u16;
-
-// TODO: make this constant parametrizable in the config file
-// For deployments with many nodes it might make sense to bump
-// it up to 10.
-// Maximum value : 16
-/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in
-/// presence of numerous nodes, but exponentially bigger ring. Max 16
-pub const PARTITION_BITS: usize = 8;
-
-const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
-
-// ---- defines: nodes ----
-
-// Type to store compactly the id of a node in the system
-// Change this to u16 the day we want to have more than 256 nodes in a cluster
-pub type CompactNodeType = u8;
-pub const MAX_NODE_NUMBER: usize = 256;
-
-// ---- defines: other ----
+use super::schema::*;
+use super::*;
 // The Message type will be used to collect information on the algorithm.
 pub type Message = Vec<String>;
-mod v08 {
-	use super::CompactNodeType;
-	use garage_util::crdt::LwwMap;
-	use garage_util::data::{Hash, Uuid};
-	use serde::{Deserialize, Serialize};
-
-	/// The layout of the cluster, i.e. the list of roles
-	/// which are assigned to each cluster node
-	#[derive(Clone, Debug, Serialize, Deserialize)]
-	pub struct ClusterLayout {
-		pub version: u64,
-
-		pub replication_factor: usize,
-		pub roles: LwwMap<Uuid, NodeRoleV>,
-
-		/// node_id_vec: a vector of node IDs with a role assigned
-		/// in the system (this includes gateway nodes).
-		/// The order here is different than the vec stored by `roles`, because:
-		/// 1. non-gateway nodes are first so that they have lower numbers
-		/// 2. nodes that don't have a role are excluded (but they need to
-		/// stay in the CRDT as tombstones)
-		pub node_id_vec: Vec<Uuid>,
-		/// the assignation of data partitions to node, the values
-		/// are indices in node_id_vec
-		#[serde(with = "serde_bytes")]
-		pub ring_assignation_data: Vec<CompactNodeType>,
-
-		/// Role changes which are staged for the next version of the layout
-		pub staging: LwwMap<Uuid, NodeRoleV>,
-		pub staging_hash: Hash,
-	}
-
-	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-	pub struct NodeRoleV(pub Option<NodeRole>);
-
-	/// The user-assigned roles of cluster nodes
-	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-	pub struct NodeRole {
-		/// Datacenter at which this entry belong. This information is used to
-		/// perform a better geodistribution
-		pub zone: String,
-		/// The capacity of the node
-		/// If this is set to None, the node does not participate in storing data for the system
-		/// and is only active as an API gateway to other nodes
-		pub capacity: Option<u64>,
-		/// A set of tags to recognize the node
-		pub tags: Vec<String>,
-	}
-
-	impl garage_util::migrate::InitialFormat for ClusterLayout {}
-}
-
-mod v09 {
-	use super::v08;
-	use super::CompactNodeType;
-	use garage_util::crdt::{Lww, LwwMap};
-	use garage_util::data::{Hash, Uuid};
-	use serde::{Deserialize, Serialize};
-	pub use v08::{NodeRole, NodeRoleV};
-
-	/// The layout of the cluster, i.e. the list of roles
-	/// which are assigned to each cluster node
-	#[derive(Clone, Debug, Serialize, Deserialize)]
-	pub struct ClusterLayout {
-		pub version: u64,
-
-		pub replication_factor: usize,
-
-		/// This attribute is only used to retain the previously computed partition size,
-		/// to know to what extent does it change with the layout update.
-		pub partition_size: u64,
-		/// Parameters used to compute the assignment currently given by
-		/// ring_assignment_data
-		pub parameters: LayoutParameters,
-
-		pub roles: LwwMap<Uuid, NodeRoleV>,
-
-		/// see comment in v08::ClusterLayout
-		pub node_id_vec: Vec<Uuid>,
-		/// see comment in v08::ClusterLayout
-		#[serde(with = "serde_bytes")]
-		pub ring_assignment_data: Vec<CompactNodeType>,
-
-		/// Parameters to be used in the next partition assignment computation.
-		pub staging_parameters: Lww<LayoutParameters>,
-		/// Role changes which are staged for the next version of the layout
-		pub staging_roles: LwwMap<Uuid, NodeRoleV>,
-		pub staging_hash: Hash,
-	}
-
-	/// This struct is used to set the parameters to be used in the assignment computation
-	/// algorithm. It is stored as a Crdt.
-	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
-	pub struct LayoutParameters {
-		pub zone_redundancy: ZoneRedundancy,
-	}
-
-	/// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies
-	/// of each partition on at least that number of different zones.
-	/// Otherwise, copies will be stored on the maximum possible number of zones.
-	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
-	pub enum ZoneRedundancy {
-		AtLeast(usize),
-		Maximum,
-	}
-
-	impl garage_util::migrate::Migrate for ClusterLayout {
-		const VERSION_MARKER: &'static [u8] = b"G09layout";
-
-		type Previous = v08::ClusterLayout;
-
-		fn migrate(previous: Self::Previous) -> Self {
-			use itertools::Itertools;
-
-			// In the old layout, capacities are in an arbitrary unit,
-			// but in the new layout they are in bytes.
-			// Here we arbitrarily multiply everything by 1G,
-			// such that 1 old capacity unit = 1GB in the new units.
-			// This is totally arbitrary and won't work for most users.
-			let cap_mul = 1024 * 1024 * 1024;
-			let roles = multiply_all_capacities(previous.roles, cap_mul);
-			let staging_roles = multiply_all_capacities(previous.staging, cap_mul);
-			let node_id_vec = previous.node_id_vec;
-
-			// Determine partition size
-			let mut tmp = previous.ring_assignation_data.clone();
-			tmp.sort();
-			let partition_size = tmp
-				.into_iter()
-				.dedup_with_count()
-				.map(|(npart, node)| {
-					roles
-						.get(&node_id_vec[node as usize])
-						.and_then(|p| p.0.as_ref().and_then(|r| r.capacity))
-						.unwrap_or(0) / npart as u64
-				})
-				.min()
-				.unwrap_or(0);
-
-			// By default, zone_redundancy is maximum possible value
-			let parameters = LayoutParameters {
-				zone_redundancy: ZoneRedundancy::Maximum,
-			};
-
-			let mut res = Self {
-				version: previous.version,
-				replication_factor: previous.replication_factor,
-				partition_size,
-				parameters,
-				roles,
-				node_id_vec,
-				ring_assignment_data: previous.ring_assignation_data,
-				staging_parameters: Lww::new(parameters),
-				staging_roles,
-				staging_hash: [0u8; 32].into(),
-			};
-			res.staging_hash = res.calculate_staging_hash();
-			res
-		}
-	}
-
-	fn multiply_all_capacities(
-		old_roles: LwwMap<Uuid, NodeRoleV>,
-		mul: u64,
-	) -> LwwMap<Uuid, NodeRoleV> {
-		let mut new_roles = LwwMap::new();
-		for (node, ts, role) in old_roles.items() {
-			let mut role = role.clone();
-			if let NodeRoleV(Some(NodeRole {
-				capacity: Some(ref mut cap),
-				..
-			})) = role
-			{
-				*cap *= mul;
-			}
-			new_roles.merge_raw(node, *ts, &role);
-		}
-		new_roles
-	}
-}
-
-pub use v09::*;
-
 impl AutoCrdt for LayoutParameters {
 	const WARN_IF_DIFFERENT: bool = true;
 }
@@ -272,19 +64,15 @@ impl core::str::FromStr for ZoneRedundancy {
 	}
 }
-// Implementation of the ClusterLayout methods unrelated to the assignment algorithm.
-impl ClusterLayout {
+impl LayoutVersion {
 	pub fn new(replication_factor: usize) -> Self {
 		// We set the default zone redundancy to be Maximum, meaning that the maximum
 		// possible value will be used depending on the cluster topology
 		let parameters = LayoutParameters {
 			zone_redundancy: ZoneRedundancy::Maximum,
 		};
-		let staging_parameters = Lww::<LayoutParameters>::new(parameters);
-		let empty_lwwmap = LwwMap::new();
-
-		let mut ret = ClusterLayout {
+		LayoutVersion {
 			version: 0,
 			replication_factor,
 			partition_size: 0,
@@ -292,12 +80,7 @@ impl ClusterLayout {
 			node_id_vec: Vec::new(),
 			ring_assignment_data: Vec::new(),
 			parameters,
-			staging_parameters,
-			staging_roles: empty_lwwmap,
-			staging_hash: [0u8; 32].into(),
-		};
-		ret.staging_hash = ret.calculate_staging_hash();
-		ret
+		}
 	}
 	// ===================== accessors ======================
@@ -399,7 +182,7 @@ impl ClusterLayout {
 	// ===================== internal information extractors ======================
 	/// Returns the uuids of the non_gateway nodes in self.node_id_vec.
-	fn nongateway_nodes(&self) -> Vec<Uuid> {
+	pub(crate) fn nongateway_nodes(&self) -> Vec<Uuid> {
 		let mut result = Vec::<Uuid>::new();
 		for uuid in self.node_id_vec.iter() {
 			match self.node_role(uuid) {
@@ -446,99 +229,10 @@ impl ClusterLayout {
 		}
 	}
-	fn calculate_staging_hash(&self) -> Hash {
-		let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
-		blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..])
-	}
-
-	// ================== updates to layout, public interface ===================
-
-	pub fn merge(&mut self, other: &ClusterLayout) -> bool {
-		match other.version.cmp(&self.version) {
-			Ordering::Greater => {
-				*self = other.clone();
-				true
-			}
-			Ordering::Equal => {
-				self.staging_parameters.merge(&other.staging_parameters);
-				self.staging_roles.merge(&other.staging_roles);
-
-				let new_staging_hash = self.calculate_staging_hash();
-				let changed = new_staging_hash != self.staging_hash;
-
-				self.staging_hash = new_staging_hash;
-
-				changed
-			}
-			Ordering::Less => false,
-		}
-	}
-
-	pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> {
-		match version {
-			None => {
-				let error = r#"
-Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
-To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
-	"#;
-				return Err(Error::Message(error.into()));
-			}
-			Some(v) => {
-				if v != self.version + 1 {
-					return Err(Error::Message("Invalid new layout version".into()));
-				}
-			}
-		}
-
-		self.roles.merge(&self.staging_roles);
-		self.roles.retain(|(_, _, v)| v.0.is_some());
-		self.parameters = *self.staging_parameters.get();
-
-		self.staging_roles.clear();
-		self.staging_hash = self.calculate_staging_hash();
-
-		let msg = self.calculate_partition_assignment()?;
-
-		self.version += 1;
-
-		Ok((self, msg))
-	}
-
-	pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
-		match version {
-			None => {
-				let error = r#"
-Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
-To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
-	"#;
-				return Err(Error::Message(error.into()));
-			}
-			Some(v) => {
-				if v != self.version + 1 {
-					return Err(Error::Message("Invalid new layout version".into()));
-				}
-			}
-		}
-
-		self.staging_roles.clear();
-		self.staging_parameters.update(self.parameters);
-		self.staging_hash = self.calculate_staging_hash();
-
-		self.version += 1;
-
-		Ok(self)
-	}
-
 	/// Check a cluster layout for internal consistency
 	/// (assignment, roles, parameters, partition size)
 	/// returns true if consistent, false if error
 	pub fn check(&self) -> Result<(), String> {
-		// Check that the hash of the staging data is correct
-		let staging_hash = self.calculate_staging_hash();
-		if staging_hash != self.staging_hash {
-			return Err("staging_hash is incorrect".into());
-		}
-
 		// Check that node_id_vec contains the correct list of nodes
 		let mut expected_nodes = self
 			.roles
@@ -654,7 +348,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
 	/// data to be moved.
 	/// Staged role changes must be merged with nodes roles before calling this function,
 	/// hence it must only be called from apply_staged_changes() and hence is not public.
-	fn calculate_partition_assignment(&mut self) -> Result<Message, Error> {
+	pub(crate) fn calculate_partition_assignment(&mut self) -> Result<Message, Error> {
 		// We update the node ids, since the node role list might have changed with the
 		// changes in the layout. We retrieve the old_assignment reframed with new ids
 		let old_assignment_opt = self.update_node_id_vec()?;
@@ -911,7 +605,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
 		zone_redundancy: usize,
 	) -> Result<Graph<FlowEdge>, Error> {
 		let vertices =
-			ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
+			LayoutVersion::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
 		let mut g = Graph::<FlowEdge>::new(&vertices);
 		let nb_zones = zone_to_id.len();
 		for p in 0..NB_PARTITIONS {
@@ -1214,7 +908,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
 	// number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2)
 	// With these parameters, the naive algo fails, whereas there is a solution:
 	// (A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E)
-	fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> {
+	fn check_against_naive(cl: &LayoutVersion) -> Result<bool, Error> {
 		let over_size = cl.partition_size + 1;
 		let mut zone_token = HashMap::<String, usize>::new();
@@ -1280,7 +974,7 @@ mod tests {
 	}
 	fn update_layout(
-		cl: &mut ClusterLayout,
+		cl: &mut LayoutVersion,
 		node_id_vec: &Vec<u8>,
 		node_capacity_vec: &Vec<u64>,
 		node_zone_vec: &Vec<String>,
@@ -1316,7 +1010,7 @@ mod tests {
 			.map(|x| x.to_string())
 			.collect();
-		let mut cl = ClusterLayout::new(3);
+		let mut cl = LayoutVersion::new(3);
 		update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 3);
 		let v = cl.version;
 		let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 56bef2f3..3fdb4acd 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -26,7 +26,7 @@
 use garage_util::data::*;
 use garage_util::error::Error;
 use garage_util::metrics::RecordDuration;
-use crate::layout::ClusterLayout;
+use crate::layout::LayoutHistory;
 use crate::metrics::RpcMetrics;
 // Default RPC timeout = 5 minutes
@@ -91,7 +91,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
 struct RpcHelperInner {
 	our_node_id: Uuid,
 	fullmesh: Arc<FullMeshPeeringStrategy>,
-	layout_watch: watch::Receiver<Arc<ClusterLayout>>,
+	layout_watch: watch::Receiver<Arc<LayoutHistory>>,
 	metrics: RpcMetrics,
 	rpc_timeout: Duration,
 }
@@ -100,7 +100,7 @@ impl RpcHelper {
 	pub(crate) fn new(
 		our_node_id: Uuid,
 		fullmesh: Arc<FullMeshPeeringStrategy>,
-		layout_watch: watch::Receiver<Arc<ClusterLayout>>,
+		layout_watch: watch::Receiver<Arc<LayoutHistory>>,
 		rpc_timeout: Option<Duration>,
 	) -> Self {
 		let metrics = RpcMetrics::new();
@@ -392,8 +392,8 @@ impl RpcHelper {
 	pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
 		// Retrieve some status variables that we will use to sort requests
 		let peer_list = self.0.fullmesh.get_peer_list();
-		let layout: Arc<ClusterLayout> = self.0.layout_watch.borrow().clone();
-		let our_zone = match layout.node_role(&self.0.our_node_id) {
+		let layout: Arc<LayoutHistory> = self.0.layout_watch.borrow().clone();
+		let our_zone = match layout.current().node_role(&self.0.our_node_id) {
 			Some(pc) => &pc.zone,
 			None => "",
 		};
@@ -407,7 +407,7 @@ impl RpcHelper {
 		let mut nodes = nodes
 			.iter()
 			.map(|to| {
-				let peer_zone = match layout.node_role(to) {
+				let peer_zone = match layout.current().node_role(to) {
 					Some(pc) => &pc.zone,
 					None => "",
 				};
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 93144e39..86d724f1 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -64,7 +64,7 @@ pub enum SystemRpc {
 	/// Exchanged with every node on a regular basis.
 	AdvertiseStatus(NodeStatus),
 	/// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
-	AdvertiseClusterLayout(ClusterLayout),
+	AdvertiseClusterLayout(LayoutHistory),
 	/// Get known nodes states
 	GetKnownNodes,
 	/// Return known nodes
@@ -84,7 +84,7 @@ pub struct System {
 	/// The id of this node
 	pub id: Uuid,
-	persist_cluster_layout: Persister<ClusterLayout>,
+	persist_cluster_layout: Persister<LayoutHistory>,
 	persist_peer_list: Persister<PeerList>,
 	local_status: ArcSwap<NodeStatus>,
@@ -112,8 +112,8 @@ pub struct System {
 	replication_factor: usize,
 	/// The layout
-	pub layout_watch: watch::Receiver<Arc<ClusterLayout>>,
-	update_layout: Mutex<watch::Sender<Arc<ClusterLayout>>>,
+	pub layout_watch: watch::Receiver<Arc<LayoutHistory>>,
+	update_layout: Mutex<watch::Sender<Arc<LayoutHistory>>>,
 	/// Path to metadata directory
 	pub metadata_dir: PathBuf,
@@ -256,16 +256,16 @@ impl System {
 			hex::encode(&node_key.public_key()[..8])
 		);
-		let persist_cluster_layout: Persister<ClusterLayout> =
+		let persist_cluster_layout: Persister<LayoutHistory> =
 			Persister::new(&config.metadata_dir, "cluster_layout");
 		let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
 		let cluster_layout = match persist_cluster_layout.load() {
 			Ok(x) => {
-				if x.replication_factor != replication_factor {
+				if x.current().replication_factor != replication_factor {
 					return Err(Error::Message(format!(
 						"Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
-						x.replication_factor,
+						x.current().replication_factor,
 						replication_factor
 					)));
 				}
@@ -276,7 +276,7 @@ impl System {
 					"No valid previous cluster layout stored ({}), starting fresh.",
 					e
 				);
-				ClusterLayout::new(replication_factor)
+				LayoutHistory::new(replication_factor)
 			}
 		};
@@ -423,13 +423,13 @@ impl System {
 		known_nodes
 	}
-	pub fn cluster_layout(&self) -> watch::Ref<Arc<ClusterLayout>> {
+	pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> {
 		self.layout_watch.borrow()
 	}
 	pub async fn update_cluster_layout(
 		self: &Arc<Self>,
-		layout: &ClusterLayout,
+		layout: &LayoutHistory,
 	) -> Result<(), Error> {
 		self.handle_advertise_cluster_layout(layout).await?;
 		Ok(())
@@ -475,7 +475,9 @@ impl System {
 			.collect::<HashMap<Uuid, _>>();
 		let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
+		// TODO: not only layout.current()
 		let storage_nodes = layout
+			.current()
 			.roles
 			.items()
 			.iter()
@@ -486,11 +488,11 @@
 			.filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
 			.count();
-		let partitions = layout.partitions();
+		let partitions = layout.current().partitions();
 		let partitions_n_up = partitions
 			.iter()
 			.map(|(_, h)| {
-				let pn = layout.nodes_of(h, layout.replication_factor);
+				let pn = layout.current().nodes_of(h, replication_factor);
 				pn.iter()
 					.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
 					.count()
@@ -581,7 +583,7 @@ impl System {
 	/// Save network configuration to disc
 	async fn save_cluster_layout(&self) -> Result<(), Error> {
-		let layout: Arc<ClusterLayout> = self.layout_watch.borrow().clone();
+		let layout: Arc<LayoutHistory> = self.layout_watch.borrow().clone();
 		self.persist_cluster_layout
 			.save_async(&layout)
 			.await
@@ -593,7 +595,7 @@
 		let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
 		let layout = self.layout_watch.borrow();
-		new_si.cluster_layout_version = layout.version;
+		new_si.cluster_layout_version = layout.current().version;
 		new_si.cluster_layout_staging_hash = layout.staging_hash;
 		new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
@@ -648,12 +650,12 @@ impl System {
 	async fn handle_advertise_cluster_layout(
 		self: &Arc<Self>,
-		adv: &ClusterLayout,
+		adv: &LayoutHistory,
 	) -> Result<SystemRpc, Error> {
-		if adv.replication_factor != self.replication_factor {
+		if adv.current().replication_factor != self.replication_factor {
 			let msg = format!(
 				"Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
-				adv.replication_factor,
+				adv.current().replication_factor,
 				self.replication_factor
 			);
 			error!("{}", msg);
@@ -662,7 +664,7 @@ impl System {
 		let update_layout = self.update_layout.lock().await;
 		// TODO: don't clone each time an AdvertiseClusterLayout is received
-		let mut layout: ClusterLayout = self.layout_watch.borrow().as_ref().clone();
+		let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone();
 		let prev_layout_check = layout.check().is_ok();
 		if layout.merge(adv) {
@@ -724,7 +726,7 @@ impl System {
 		while !*stop_signal.borrow() {
 			let not_configured = self.layout_watch.borrow().check().is_err();
 			let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
-			let expected_n_nodes = self.layout_watch.borrow().num_nodes();
+			let expected_n_nodes = self.layout_watch.borrow().current().num_nodes();
 			let bad_peers = self
 				.fullmesh
 				.get_peer_list()
@@ -863,13 +865,13 @@ impl EndpointHandler<SystemRpc> for System {
 	}
 }
 impl NodeStatus {
-	fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self {
+	fn initial(replication_factor: usize, layout: &LayoutHistory) -> Self {
 		NodeStatus {
 			hostname: gethostname::gethostname()
 				.into_string()
 				.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
 			replication_factor,
-			cluster_layout_version: layout.version,
+			cluster_layout_version: layout.current().version,
 			cluster_layout_staging_hash: layout.staging_hash,
 			meta_disk_avail: None,
 			data_disk_avail: None,
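A rule in the new `LayoutHistory::merge` (history.rs above) that is easy to miss while scanning the diff: a layout version received from another node is appended only when its predecessor is already present locally, so the history never acquires gaps, and a version number that is already known but with different contents is reported as an inconsistency rather than overwritten. Below is a standalone reduction of that acceptance rule, using a hypothetical helper over bare version numbers instead of full `LayoutVersion` values.

```rust
// Hypothetical reduction of the acceptance rule in LayoutHistory::merge
// (src/rpc/layout/history.rs): the history is a list of version numbers,
// oldest first.
fn try_append(history: &mut Vec<u64>, incoming: u64) -> bool {
    if history.contains(&incoming) {
        // Already known. The real code additionally compares the full
        // LayoutVersion contents and logs an error if they differ.
        false
    } else if !history.contains(&incoming.saturating_sub(1)) {
        // Predecessor missing: refuse, otherwise the history would have a gap.
        // The real code logs "Cannot receive new layout version {} ...".
        false
    } else {
        history.push(incoming);
        true
    }
}

fn main() {
    let mut history = vec![1, 2];
    assert!(try_append(&mut history, 3));  // 2 is present, so 3 is accepted
    assert!(!try_append(&mut history, 5)); // 4 is missing, so 5 is refused
    assert!(!try_append(&mut history, 2)); // duplicate, nothing to add
    assert_eq!(history, vec![1, 2, 3]);
}
```

The boolean that `merge` returns is what `handle_advertise_cluster_layout` (system.rs above) checks, via `if layout.merge(adv)`, to decide whether the received advertisement changed anything locally.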