diff options
Diffstat (limited to 'src/rpc/layout/schema.rs')
-rw-r--r-- | src/rpc/layout/schema.rs | 106 |
1 files changed, 100 insertions, 6 deletions
diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 14e797be..c5b9b1d3 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -1,3 +1,9 @@ +use std::fmt; + +use bytesize::ByteSize; + +use garage_util::crdt::{AutoCrdt, Crdt}; + mod v08 { use crate::layout::CompactNodeType; use garage_util::crdt::LwwMap; @@ -210,6 +216,15 @@ mod v010 { pub ring_assignment_data: Vec<CompactNodeType>, } + /// The staged changes for the next layout version + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutStaging { + /// Parameters to be used in the next partition assignment computation. + pub parameters: Lww<LayoutParameters>, + /// Role changes which are staged for the next version of the layout + pub roles: LwwMap<Uuid, NodeRoleV>, + } + /// The history of cluster layouts #[derive(Clone, Debug, Serialize, Deserialize)] pub struct LayoutHistory { @@ -219,10 +234,8 @@ mod v010 { /// Update trackers pub update_trackers: UpdateTrackers, - /// Parameters to be used in the next partition assignment computation. - pub staging_parameters: Lww<LayoutParameters>, - /// Role changes which are staged for the next version of the layout - pub staging_roles: LwwMap<Uuid, NodeRoleV>, + /// Staged changes for the next version + pub staging: Lww<LayoutStaging>, /// Hash of the serialized staging_parameters + staging_roles pub staging_hash: Hash, } @@ -265,6 +278,10 @@ mod v010 { .map(|x| (*x, version.version)) .collect::<HashMap<Uuid, u64>>(), ); + let staging = LayoutStaging { + parameters: previous.staging_parameters, + roles: previous.staging_roles, + }; let mut ret = Self { versions: vec![version], update_trackers: UpdateTrackers { @@ -272,8 +289,7 @@ mod v010 { sync_map: update_tracker.clone(), sync_ack_map: update_tracker.clone(), }, - staging_parameters: previous.staging_parameters, - staging_roles: previous.staging_roles, + staging: Lww::raw(previous.version, staging), staging_hash: [0u8; 32].into(), }; ret.staging_hash = ret.calculate_staging_hash(); @@ -283,3 +299,81 @@ mod v010 { } pub use v010::*; + +// ---- utility functions ---- + +impl AutoCrdt for LayoutParameters { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCrdt for NodeRoleV { + const WARN_IF_DIFFERENT: bool = true; +} + +impl Crdt for LayoutStaging { + fn merge(&mut self, other: &LayoutStaging) { + self.parameters.merge(&other.parameters); + self.roles.merge(&other.roles); + } +} + +impl NodeRole { + pub fn capacity_string(&self) -> String { + match self.capacity { + Some(c) => ByteSize::b(c).to_string_as(false), + None => "gateway".to_string(), + } + } + + pub fn tags_string(&self) -> String { + self.tags.join(",") + } +} + +impl fmt::Display for ZoneRedundancy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ZoneRedundancy::Maximum => write!(f, "maximum"), + ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), + } + } +} + +impl core::str::FromStr for ZoneRedundancy { + type Err = &'static str; + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), + x => { + let v = x + .parse::<usize>() + .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; + Ok(ZoneRedundancy::AtLeast(v)) + } + } + } +} + +impl UpdateTracker { + fn merge(&mut self, other: &UpdateTracker) { + for (k, v) in other.0.iter() { + if let Some(v_mut) = self.0.get_mut(k) { + *v_mut = std::cmp::max(*v_mut, *v); + } else { + self.0.insert(*k, *v); + } + } + } + + pub(crate) fn min(&self) -> u64 { + self.0.iter().map(|(_, v)| *v).min().unwrap_or(0) + } +} + +impl UpdateTrackers { + pub(crate) fn merge(&mut self, other: &UpdateTrackers) { + self.ack_map.merge(&other.ack_map); + self.sync_map.merge(&other.sync_map); + self.sync_ack_map.merge(&other.sync_ack_map); + } +} |