aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout/history.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-11-08 17:49:06 +0100
committerAlex Auvolat <alex@adnab.me>2023-11-08 17:49:06 +0100
commitfe9af1dcaae31a117528a9cfa10c422c9a850201 (patch)
tree6e43dbb97d37d48f6af5398b4d067747e652108c /src/rpc/layout/history.rs
parent4a9c94514f49aa4e9880a8e0f5cf5a52d11ae993 (diff)
downloadgarage-fe9af1dcaae31a117528a9cfa10c422c9a850201.tar.gz
garage-fe9af1dcaae31a117528a9cfa10c422c9a850201.zip
WIP: garage_rpc: store layout version history
Diffstat (limited to 'src/rpc/layout/history.rs')
-rw-r--r--src/rpc/layout/history.rs170
1 files changed, 170 insertions, 0 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
new file mode 100644
index 00000000..b3019f58
--- /dev/null
+++ b/src/rpc/layout/history.rs
@@ -0,0 +1,170 @@
+use std::cmp::Ordering;
+use std::sync::Arc;
+
+use garage_util::crdt::{Crdt, Lww, LwwMap};
+use garage_util::data::*;
+use garage_util::encode::nonversioned_encode;
+use garage_util::error::*;
+
+use super::schema::*;
+use super::*;
+
+impl LayoutHistory {
+ pub fn new(replication_factor: usize) -> Self {
+ let version = LayoutVersion::new(replication_factor);
+
+ let staging_parameters = Lww::<LayoutParameters>::new(version.parameters);
+ let empty_lwwmap = LwwMap::new();
+
+ let mut ret = LayoutHistory {
+ versions: vec![version].into_boxed_slice().into(),
+ update_trackers: Default::default(),
+ staging_parameters,
+ staging_roles: empty_lwwmap,
+ staging_hash: [0u8; 32].into(),
+ };
+ ret.staging_hash = ret.calculate_staging_hash();
+ ret
+ }
+
+ pub fn current(&self) -> &LayoutVersion {
+ self.versions.last().as_ref().unwrap()
+ }
+
+ pub(crate) fn calculate_staging_hash(&self) -> Hash {
+ let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
+ blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..])
+ }
+
+ // ================== updates to layout, public interface ===================
+
+ pub fn merge(&mut self, other: &LayoutHistory) -> bool {
+ let mut changed = false;
+
+ // Merge staged layout changes
+ match other.current().version.cmp(&self.current().version) {
+ Ordering::Greater => {
+ self.staging_parameters = other.staging_parameters.clone();
+ self.staging_roles = other.staging_roles.clone();
+ self.staging_hash = other.staging_hash;
+ changed = true;
+ }
+ Ordering::Equal => {
+ self.staging_parameters.merge(&other.staging_parameters);
+ self.staging_roles.merge(&other.staging_roles);
+
+ let new_staging_hash = self.calculate_staging_hash();
+ if new_staging_hash != self.staging_hash {
+ changed = true;
+ }
+
+ self.staging_hash = new_staging_hash;
+ }
+ Ordering::Less => (),
+ }
+
+ // Add any new versions to history
+ let mut versions = self.versions.to_vec();
+ for v2 in other.versions.iter() {
+ if let Some(v1) = versions.iter().find(|v| v.version == v2.version) {
+ if v1 != v2 {
+ error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version);
+ }
+ } else if versions.iter().all(|v| v.version != v2.version - 1) {
+ error!(
+ "Cannot receive new layout version {}, version {} is missing",
+ v2.version,
+ v2.version - 1
+ );
+ } else {
+ versions.push(v2.clone());
+ changed = true;
+ }
+ }
+ self.versions = Arc::from(versions.into_boxed_slice());
+
+ // Merge trackers
+ self.update_trackers.merge(&other.update_trackers);
+
+ changed
+ }
+
+ pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> {
+ match version {
+ None => {
+ let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+ "#;
+ return Err(Error::Message(error.into()));
+ }
+ Some(v) => {
+ if v != self.current().version + 1 {
+ return Err(Error::Message("Invalid new layout version".into()));
+ }
+ }
+ }
+
+ let mut new_version = self.current().clone();
+ new_version.version += 1;
+
+ new_version.roles.merge(&self.staging_roles);
+ new_version.roles.retain(|(_, _, v)| v.0.is_some());
+ new_version.parameters = *self.staging_parameters.get();
+
+ self.staging_roles.clear();
+ self.staging_hash = self.calculate_staging_hash();
+
+ let msg = new_version.calculate_partition_assignment()?;
+
+ let mut versions = self.versions.to_vec();
+ versions.push(new_version);
+ self.versions = Arc::from(versions.into_boxed_slice());
+
+ Ok((self, msg))
+ }
+
+ pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
+ match version {
+ None => {
+ let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+ "#;
+ return Err(Error::Message(error.into()));
+ }
+ Some(v) => {
+ if v != self.current().version + 1 {
+ return Err(Error::Message("Invalid new layout version".into()));
+ }
+ }
+ }
+
+ self.staging_roles.clear();
+ self.staging_parameters.update(self.current().parameters);
+ self.staging_hash = self.calculate_staging_hash();
+
+ // TODO this is stupid, we should have a separate version counter/LWW
+ // for the staging params
+ let mut new_version = self.current().clone();
+ new_version.version += 1;
+
+ let mut versions = self.versions.to_vec();
+ versions.push(new_version);
+ self.versions = Arc::from(versions.into_boxed_slice());
+
+ Ok(self)
+ }
+
+ pub fn check(&self) -> Result<(), String> {
+ // Check that the hash of the staging data is correct
+ let staging_hash = self.calculate_staging_hash();
+ if staging_hash != self.staging_hash {
+ return Err("staging_hash is incorrect".into());
+ }
+
+ // TODO: anythign more ?
+
+ self.current().check()
+ }
+}