diff options
author | Alex Auvolat <alex@adnab.me> | 2023-11-09 11:19:43 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-11-09 11:19:43 +0100 |
commit | 523d2ecb9511f74e144cd116b942d6c1bf0f546d (patch) | |
tree | 7ba0323fb691eac4f05308676cd24771a8a6a8bb /src/rpc | |
parent | 1da0a5676edcd20fc5c7412596edb5772da9f606 (diff) | |
download | garage-523d2ecb9511f74e144cd116b942d6c1bf0f546d.tar.gz garage-523d2ecb9511f74e144cd116b942d6c1bf0f546d.zip |
layout: use separate CRDT for staged layout changes
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/layout/graph_algo.rs (renamed from src/rpc/graph_algo.rs) | 10 | ||||
-rw-r--r-- | src/rpc/layout/history.rs | 82 | ||||
-rw-r--r-- | src/rpc/layout/mod.rs | 4 | ||||
-rw-r--r-- | src/rpc/layout/schema.rs | 106 | ||||
-rw-r--r-- | src/rpc/layout/tracker.rs | 21 | ||||
-rw-r--r-- | src/rpc/layout/version.rs | 54 | ||||
-rw-r--r-- | src/rpc/lib.rs | 1 |
7 files changed, 130 insertions, 148 deletions
diff --git a/src/rpc/graph_algo.rs b/src/rpc/layout/graph_algo.rs index d8c6c9b9..bd33e97f 100644 --- a/src/rpc/graph_algo.rs +++ b/src/rpc/layout/graph_algo.rs @@ -114,16 +114,6 @@ impl Graph<FlowEdge> { Ok(result) } - /// This function returns the value of the flow incoming to v. - pub fn get_inflow(&self, v: Vertex) -> Result<i64, String> { - let idv = self.get_vertex_id(&v)?; - let mut result = 0; - for edge in self.graph[idv].iter() { - result += max(0, self.graph[edge.dest][edge.rev].flow); - } - Ok(result) - } - /// This function returns the value of the flow outgoing from v. pub fn get_outflow(&self, v: Vertex) -> Result<i64, String> { let idv = self.get_vertex_id(&v)?; diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index e59c9e9c..9ae28887 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,3 @@ -use std::cmp::Ordering; - use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; use garage_util::encode::nonversioned_encode; @@ -12,14 +10,15 @@ impl LayoutHistory { pub fn new(replication_factor: usize) -> Self { let version = LayoutVersion::new(replication_factor); - let staging_parameters = Lww::<LayoutParameters>::new(version.parameters); - let empty_lwwmap = LwwMap::new(); + let staging = LayoutStaging { + parameters: Lww::<LayoutParameters>::new(version.parameters), + roles: LwwMap::new(), + }; let mut ret = LayoutHistory { versions: vec![version].into_boxed_slice().into(), update_trackers: Default::default(), - staging_parameters, - staging_roles: empty_lwwmap, + staging: Lww::raw(0, staging), staging_hash: [0u8; 32].into(), }; ret.staging_hash = ret.calculate_staging_hash(); @@ -31,8 +30,7 @@ impl LayoutHistory { } pub(crate) fn calculate_staging_hash(&self) -> Hash { - let hashed_tuple = (&self.staging_roles, &self.staging_parameters); - blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..]) + blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } // ================== updates to layout, public interface =================== @@ -41,26 +39,10 @@ impl LayoutHistory { let mut changed = false; // Merge staged layout changes - match other.current().version.cmp(&self.current().version) { - Ordering::Greater => { - self.staging_parameters = other.staging_parameters.clone(); - self.staging_roles = other.staging_roles.clone(); - self.staging_hash = other.staging_hash; - changed = true; - } - Ordering::Equal => { - self.staging_parameters.merge(&other.staging_parameters); - self.staging_roles.merge(&other.staging_roles); - - let new_staging_hash = self.calculate_staging_hash(); - if new_staging_hash != self.staging_hash { - changed = true; - } - - self.staging_hash = new_staging_hash; - } - Ordering::Less => (), + if self.staging != other.staging { + changed = true; } + self.staging.merge(&other.staging); // Add any new versions to history for v2 in other.versions.iter() { @@ -102,50 +84,34 @@ To know the correct value of the new layout version, invoke `garage layout show` } } + // Compute new version and add it to history let mut new_version = self.current().clone(); new_version.version += 1; - new_version.roles.merge(&self.staging_roles); + new_version.roles.merge(&self.staging.get().roles); new_version.roles.retain(|(_, _, v)| v.0.is_some()); - new_version.parameters = *self.staging_parameters.get(); - - self.staging_roles.clear(); - self.staging_hash = self.calculate_staging_hash(); + new_version.parameters = *self.staging.get().parameters.get(); let msg = new_version.calculate_partition_assignment()?; - self.versions.push(new_version); + // Reset the staged layout changes + self.staging.update(LayoutStaging { + parameters: self.staging.get().parameters.clone(), + roles: LwwMap::new(), + }); + self.staging_hash = self.calculate_staging_hash(); + Ok((self, msg)) } - pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { - match version { - None => { - let error = r#" -Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. -To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. - "#; - return Err(Error::Message(error.into())); - } - Some(v) => { - if v != self.current().version + 1 { - return Err(Error::Message("Invalid new layout version".into())); - } - } - } - - self.staging_roles.clear(); - self.staging_parameters.update(self.current().parameters); + pub fn revert_staged_changes(mut self) -> Result<Self, Error> { + self.staging.update(LayoutStaging { + parameters: Lww::new(self.current().parameters.clone()), + roles: LwwMap::new(), + }); self.staging_hash = self.calculate_staging_hash(); - // TODO this is stupid, we should have a separate version counter/LWW - // for the staging params - let mut new_version = self.current().clone(); - new_version.version += 1; - - self.versions.push(new_version); - Ok(self) } diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index 122d4b65..7c15988a 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -1,8 +1,10 @@ +mod graph_algo; mod history; mod schema; -mod tracker; mod version; +// ---- re-exports ---- + pub use history::*; pub use schema::*; pub use version::*; diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 14e797be..c5b9b1d3 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -1,3 +1,9 @@ +use std::fmt; + +use bytesize::ByteSize; + +use garage_util::crdt::{AutoCrdt, Crdt}; + mod v08 { use crate::layout::CompactNodeType; use garage_util::crdt::LwwMap; @@ -210,6 +216,15 @@ mod v010 { pub ring_assignment_data: Vec<CompactNodeType>, } + /// The staged changes for the next layout version + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutStaging { + /// Parameters to be used in the next partition assignment computation. + pub parameters: Lww<LayoutParameters>, + /// Role changes which are staged for the next version of the layout + pub roles: LwwMap<Uuid, NodeRoleV>, + } + /// The history of cluster layouts #[derive(Clone, Debug, Serialize, Deserialize)] pub struct LayoutHistory { @@ -219,10 +234,8 @@ mod v010 { /// Update trackers pub update_trackers: UpdateTrackers, - /// Parameters to be used in the next partition assignment computation. - pub staging_parameters: Lww<LayoutParameters>, - /// Role changes which are staged for the next version of the layout - pub staging_roles: LwwMap<Uuid, NodeRoleV>, + /// Staged changes for the next version + pub staging: Lww<LayoutStaging>, /// Hash of the serialized staging_parameters + staging_roles pub staging_hash: Hash, } @@ -265,6 +278,10 @@ mod v010 { .map(|x| (*x, version.version)) .collect::<HashMap<Uuid, u64>>(), ); + let staging = LayoutStaging { + parameters: previous.staging_parameters, + roles: previous.staging_roles, + }; let mut ret = Self { versions: vec![version], update_trackers: UpdateTrackers { @@ -272,8 +289,7 @@ mod v010 { sync_map: update_tracker.clone(), sync_ack_map: update_tracker.clone(), }, - staging_parameters: previous.staging_parameters, - staging_roles: previous.staging_roles, + staging: Lww::raw(previous.version, staging), staging_hash: [0u8; 32].into(), }; ret.staging_hash = ret.calculate_staging_hash(); @@ -283,3 +299,81 @@ mod v010 { } pub use v010::*; + +// ---- utility functions ---- + +impl AutoCrdt for LayoutParameters { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCrdt for NodeRoleV { + const WARN_IF_DIFFERENT: bool = true; +} + +impl Crdt for LayoutStaging { + fn merge(&mut self, other: &LayoutStaging) { + self.parameters.merge(&other.parameters); + self.roles.merge(&other.roles); + } +} + +impl NodeRole { + pub fn capacity_string(&self) -> String { + match self.capacity { + Some(c) => ByteSize::b(c).to_string_as(false), + None => "gateway".to_string(), + } + } + + pub fn tags_string(&self) -> String { + self.tags.join(",") + } +} + +impl fmt::Display for ZoneRedundancy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ZoneRedundancy::Maximum => write!(f, "maximum"), + ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), + } + } +} + +impl core::str::FromStr for ZoneRedundancy { + type Err = &'static str; + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), + x => { + let v = x + .parse::<usize>() + .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; + Ok(ZoneRedundancy::AtLeast(v)) + } + } + } +} + +impl UpdateTracker { + fn merge(&mut self, other: &UpdateTracker) { + for (k, v) in other.0.iter() { + if let Some(v_mut) = self.0.get_mut(k) { + *v_mut = std::cmp::max(*v_mut, *v); + } else { + self.0.insert(*k, *v); + } + } + } + + pub(crate) fn min(&self) -> u64 { + self.0.iter().map(|(_, v)| *v).min().unwrap_or(0) + } +} + +impl UpdateTrackers { + pub(crate) fn merge(&mut self, other: &UpdateTrackers) { + self.ack_map.merge(&other.ack_map); + self.sync_map.merge(&other.sync_map); + self.sync_ack_map.merge(&other.sync_ack_map); + } +} diff --git a/src/rpc/layout/tracker.rs b/src/rpc/layout/tracker.rs deleted file mode 100644 index 778121e4..00000000 --- a/src/rpc/layout/tracker.rs +++ /dev/null @@ -1,21 +0,0 @@ -use super::*; - -impl UpdateTracker { - fn merge(&mut self, other: &UpdateTracker) { - for (k, v) in other.0.iter() { - if let Some(v_mut) = self.0.get_mut(k) { - *v_mut = std::cmp::max(*v_mut, *v); - } else { - self.0.insert(*k, *v); - } - } - } -} - -impl UpdateTrackers { - pub(crate) fn merge(&mut self, other: &UpdateTrackers) { - self.ack_map.merge(&other.ack_map); - self.sync_map.merge(&other.sync_map); - self.sync_ack_map.merge(&other.sync_ack_map); - } -} diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 363bc204..6918fdf9 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -1,69 +1,21 @@ use std::collections::HashMap; use std::collections::HashSet; -use std::fmt; +use std::convert::TryInto; use bytesize::ByteSize; use itertools::Itertools; -use garage_util::crdt::{AutoCrdt, LwwMap}; +use garage_util::crdt::LwwMap; use garage_util::data::*; use garage_util::error::*; -use crate::graph_algo::*; - -use std::convert::TryInto; - +use super::graph_algo::*; use super::schema::*; use super::*; // The Message type will be used to collect information on the algorithm. pub type Message = Vec<String>; -impl AutoCrdt for LayoutParameters { - const WARN_IF_DIFFERENT: bool = true; -} - -impl AutoCrdt for NodeRoleV { - const WARN_IF_DIFFERENT: bool = true; -} - -impl NodeRole { - pub fn capacity_string(&self) -> String { - match self.capacity { - Some(c) => ByteSize::b(c).to_string_as(false), - None => "gateway".to_string(), - } - } - - pub fn tags_string(&self) -> String { - self.tags.join(",") - } -} - -impl fmt::Display for ZoneRedundancy { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ZoneRedundancy::Maximum => write!(f, "maximum"), - ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), - } - } -} - -impl core::str::FromStr for ZoneRedundancy { - type Err = &'static str; - fn from_str(s: &str) -> Result<Self, Self::Err> { - match s { - "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), - x => { - let v = x - .parse::<usize>() - .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; - Ok(ZoneRedundancy::AtLeast(v)) - } - } - } -} - impl LayoutVersion { pub fn new(replication_factor: usize) -> Self { // We set the default zone redundancy to be Maximum, meaning that the maximum diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 1af8b78e..b5b31c05 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -11,7 +11,6 @@ mod consul; #[cfg(feature = "kubernetes-discovery")] mod kubernetes; -pub mod graph_algo; pub mod layout; pub mod replication_mode; pub mod system; |