aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout/schema.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-11-09 11:19:43 +0100
committerAlex Auvolat <alex@adnab.me>2023-11-09 11:19:43 +0100
commit523d2ecb9511f74e144cd116b942d6c1bf0f546d (patch)
tree7ba0323fb691eac4f05308676cd24771a8a6a8bb /src/rpc/layout/schema.rs
parent1da0a5676edcd20fc5c7412596edb5772da9f606 (diff)
downloadgarage-523d2ecb9511f74e144cd116b942d6c1bf0f546d.tar.gz
garage-523d2ecb9511f74e144cd116b942d6c1bf0f546d.zip
layout: use separate CRDT for staged layout changes
Diffstat (limited to 'src/rpc/layout/schema.rs')
-rw-r--r--src/rpc/layout/schema.rs106
1 files changed, 100 insertions, 6 deletions
diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs
index 14e797be..c5b9b1d3 100644
--- a/src/rpc/layout/schema.rs
+++ b/src/rpc/layout/schema.rs
@@ -1,3 +1,9 @@
+use std::fmt;
+
+use bytesize::ByteSize;
+
+use garage_util::crdt::{AutoCrdt, Crdt};
+
mod v08 {
use crate::layout::CompactNodeType;
use garage_util::crdt::LwwMap;
@@ -210,6 +216,15 @@ mod v010 {
pub ring_assignment_data: Vec<CompactNodeType>,
}
+ /// The staged changes for the next layout version
+ #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+ pub struct LayoutStaging {
+ /// Parameters to be used in the next partition assignment computation.
+ pub parameters: Lww<LayoutParameters>,
+ /// Role changes which are staged for the next version of the layout
+ pub roles: LwwMap<Uuid, NodeRoleV>,
+ }
+
/// The history of cluster layouts
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LayoutHistory {
@@ -219,10 +234,8 @@ mod v010 {
/// Update trackers
pub update_trackers: UpdateTrackers,
- /// Parameters to be used in the next partition assignment computation.
- pub staging_parameters: Lww<LayoutParameters>,
- /// Role changes which are staged for the next version of the layout
- pub staging_roles: LwwMap<Uuid, NodeRoleV>,
+ /// Staged changes for the next version
+ pub staging: Lww<LayoutStaging>,
/// Hash of the serialized staging_parameters + staging_roles
pub staging_hash: Hash,
}
@@ -265,6 +278,10 @@ mod v010 {
.map(|x| (*x, version.version))
.collect::<HashMap<Uuid, u64>>(),
);
+ let staging = LayoutStaging {
+ parameters: previous.staging_parameters,
+ roles: previous.staging_roles,
+ };
let mut ret = Self {
versions: vec![version],
update_trackers: UpdateTrackers {
@@ -272,8 +289,7 @@ mod v010 {
sync_map: update_tracker.clone(),
sync_ack_map: update_tracker.clone(),
},
- staging_parameters: previous.staging_parameters,
- staging_roles: previous.staging_roles,
+ staging: Lww::raw(previous.version, staging),
staging_hash: [0u8; 32].into(),
};
ret.staging_hash = ret.calculate_staging_hash();
@@ -283,3 +299,81 @@ mod v010 {
}
pub use v010::*;
+
+// ---- utility functions ----
+
+impl AutoCrdt for LayoutParameters {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl AutoCrdt for NodeRoleV {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl Crdt for LayoutStaging {
+ fn merge(&mut self, other: &LayoutStaging) {
+ self.parameters.merge(&other.parameters);
+ self.roles.merge(&other.roles);
+ }
+}
+
+impl NodeRole {
+ pub fn capacity_string(&self) -> String {
+ match self.capacity {
+ Some(c) => ByteSize::b(c).to_string_as(false),
+ None => "gateway".to_string(),
+ }
+ }
+
+ pub fn tags_string(&self) -> String {
+ self.tags.join(",")
+ }
+}
+
+impl fmt::Display for ZoneRedundancy {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ ZoneRedundancy::Maximum => write!(f, "maximum"),
+ ZoneRedundancy::AtLeast(x) => write!(f, "{}", x),
+ }
+ }
+}
+
+impl core::str::FromStr for ZoneRedundancy {
+ type Err = &'static str;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
+ x => {
+ let v = x
+ .parse::<usize>()
+ .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?;
+ Ok(ZoneRedundancy::AtLeast(v))
+ }
+ }
+ }
+}
+
+impl UpdateTracker {
+ fn merge(&mut self, other: &UpdateTracker) {
+ for (k, v) in other.0.iter() {
+ if let Some(v_mut) = self.0.get_mut(k) {
+ *v_mut = std::cmp::max(*v_mut, *v);
+ } else {
+ self.0.insert(*k, *v);
+ }
+ }
+ }
+
+ pub(crate) fn min(&self) -> u64 {
+ self.0.iter().map(|(_, v)| *v).min().unwrap_or(0)
+ }
+}
+
+impl UpdateTrackers {
+ pub(crate) fn merge(&mut self, other: &UpdateTrackers) {
+ self.ack_map.merge(&other.ack_map);
+ self.sync_map.merge(&other.sync_map);
+ self.sync_ack_map.merge(&other.sync_ack_map);
+ }
+}