From fe9af1dcaae31a117528a9cfa10c422c9a850201 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Nov 2023 17:49:06 +0100 Subject: WIP: garage_rpc: store layout version history --- src/rpc/layout/history.rs | 170 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 src/rpc/layout/history.rs (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs new file mode 100644 index 00000000..b3019f58 --- /dev/null +++ b/src/rpc/layout/history.rs @@ -0,0 +1,170 @@ +use std::cmp::Ordering; +use std::sync::Arc; + +use garage_util::crdt::{Crdt, Lww, LwwMap}; +use garage_util::data::*; +use garage_util::encode::nonversioned_encode; +use garage_util::error::*; + +use super::schema::*; +use super::*; + +impl LayoutHistory { + pub fn new(replication_factor: usize) -> Self { + let version = LayoutVersion::new(replication_factor); + + let staging_parameters = Lww::::new(version.parameters); + let empty_lwwmap = LwwMap::new(); + + let mut ret = LayoutHistory { + versions: vec![version].into_boxed_slice().into(), + update_trackers: Default::default(), + staging_parameters, + staging_roles: empty_lwwmap, + staging_hash: [0u8; 32].into(), + }; + ret.staging_hash = ret.calculate_staging_hash(); + ret + } + + pub fn current(&self) -> &LayoutVersion { + self.versions.last().as_ref().unwrap() + } + + pub(crate) fn calculate_staging_hash(&self) -> Hash { + let hashed_tuple = (&self.staging_roles, &self.staging_parameters); + blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..]) + } + + // ================== updates to layout, public interface =================== + + pub fn merge(&mut self, other: &LayoutHistory) -> bool { + let mut changed = false; + + // Merge staged layout changes + match other.current().version.cmp(&self.current().version) { + Ordering::Greater => { + self.staging_parameters = other.staging_parameters.clone(); + self.staging_roles = other.staging_roles.clone(); + 
self.staging_hash = other.staging_hash; + changed = true; + } + Ordering::Equal => { + self.staging_parameters.merge(&other.staging_parameters); + self.staging_roles.merge(&other.staging_roles); + + let new_staging_hash = self.calculate_staging_hash(); + if new_staging_hash != self.staging_hash { + changed = true; + } + + self.staging_hash = new_staging_hash; + } + Ordering::Less => (), + } + + // Add any new versions to history + let mut versions = self.versions.to_vec(); + for v2 in other.versions.iter() { + if let Some(v1) = versions.iter().find(|v| v.version == v2.version) { + if v1 != v2 { + error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version); + } + } else if versions.iter().all(|v| v.version != v2.version - 1) { + error!( + "Cannot receive new layout version {}, version {} is missing", + v2.version, + v2.version - 1 + ); + } else { + versions.push(v2.clone()); + changed = true; + } + } + self.versions = Arc::from(versions.into_boxed_slice()); + + // Merge trackers + self.update_trackers.merge(&other.update_trackers); + + changed + } + + pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> { + match version { + None => { + let error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. 
+ "#; + return Err(Error::Message(error.into())); + } + Some(v) => { + if v != self.current().version + 1 { + return Err(Error::Message("Invalid new layout version".into())); + } + } + } + + let mut new_version = self.current().clone(); + new_version.version += 1; + + new_version.roles.merge(&self.staging_roles); + new_version.roles.retain(|(_, _, v)| v.0.is_some()); + new_version.parameters = *self.staging_parameters.get(); + + self.staging_roles.clear(); + self.staging_hash = self.calculate_staging_hash(); + + let msg = new_version.calculate_partition_assignment()?; + + let mut versions = self.versions.to_vec(); + versions.push(new_version); + self.versions = Arc::from(versions.into_boxed_slice()); + + Ok((self, msg)) + } + + pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { + match version { + None => { + let error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. 
+ "#; + return Err(Error::Message(error.into())); + } + Some(v) => { + if v != self.current().version + 1 { + return Err(Error::Message("Invalid new layout version".into())); + } + } + } + + self.staging_roles.clear(); + self.staging_parameters.update(self.current().parameters); + self.staging_hash = self.calculate_staging_hash(); + + // TODO this is stupid, we should have a separate version counter/LWW + // for the staging params + let mut new_version = self.current().clone(); + new_version.version += 1; + + let mut versions = self.versions.to_vec(); + versions.push(new_version); + self.versions = Arc::from(versions.into_boxed_slice()); + + Ok(self) + } + + pub fn check(&self) -> Result<(), String> { + // Check that the hash of the staging data is correct + let staging_hash = self.calculate_staging_hash(); + if staging_hash != self.staging_hash { + return Err("staging_hash is incorrect".into()); + } + + // TODO: anythign more ? + + self.current().check() + } +} -- cgit v1.2.3 From 8dccee3ccfe7793c42203f28c1e91c6f989b6899 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Nov 2023 19:28:36 +0100 Subject: cluster layout: adapt all uses of ClusterLayout to LayoutHistory --- src/rpc/layout/history.rs | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index b3019f58..e59c9e9c 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,4 @@ use std::cmp::Ordering; -use std::sync::Arc; use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; @@ -64,24 +63,22 @@ impl LayoutHistory { } // Add any new versions to history - let mut versions = self.versions.to_vec(); for v2 in other.versions.iter() { - if let Some(v1) = versions.iter().find(|v| v.version == v2.version) { + if let Some(v1) = self.versions.iter().find(|v| v.version == v2.version) { if v1 != v2 { error!("Inconsistent layout histories: different 
layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version); } - } else if versions.iter().all(|v| v.version != v2.version - 1) { + } else if self.versions.iter().all(|v| v.version != v2.version - 1) { error!( "Cannot receive new layout version {}, version {} is missing", v2.version, v2.version - 1 ); } else { - versions.push(v2.clone()); + self.versions.push(v2.clone()); changed = true; } } - self.versions = Arc::from(versions.into_boxed_slice()); // Merge trackers self.update_trackers.merge(&other.update_trackers); @@ -117,9 +114,7 @@ To know the correct value of the new layout version, invoke `garage layout show` let msg = new_version.calculate_partition_assignment()?; - let mut versions = self.versions.to_vec(); - versions.push(new_version); - self.versions = Arc::from(versions.into_boxed_slice()); + self.versions.push(new_version); Ok((self, msg)) } @@ -149,9 +144,7 @@ To know the correct value of the new layout version, invoke `garage layout show` let mut new_version = self.current().clone(); new_version.version += 1; - let mut versions = self.versions.to_vec(); - versions.push(new_version); - self.versions = Arc::from(versions.into_boxed_slice()); + self.versions.push(new_version); Ok(self) } -- cgit v1.2.3 From 523d2ecb9511f74e144cd116b942d6c1bf0f546d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 11:19:43 +0100 Subject: layout: use separate CRDT for staged layout changes --- src/rpc/layout/history.rs | 82 ++++++++++++++--------------------------------- 1 file changed, 24 insertions(+), 58 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index e59c9e9c..9ae28887 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,3 @@ -use std::cmp::Ordering; - use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; use garage_util::encode::nonversioned_encode; @@ -12,14 
+10,15 @@ impl LayoutHistory { pub fn new(replication_factor: usize) -> Self { let version = LayoutVersion::new(replication_factor); - let staging_parameters = Lww::::new(version.parameters); - let empty_lwwmap = LwwMap::new(); + let staging = LayoutStaging { + parameters: Lww::::new(version.parameters), + roles: LwwMap::new(), + }; let mut ret = LayoutHistory { versions: vec![version].into_boxed_slice().into(), update_trackers: Default::default(), - staging_parameters, - staging_roles: empty_lwwmap, + staging: Lww::raw(0, staging), staging_hash: [0u8; 32].into(), }; ret.staging_hash = ret.calculate_staging_hash(); @@ -31,8 +30,7 @@ impl LayoutHistory { } pub(crate) fn calculate_staging_hash(&self) -> Hash { - let hashed_tuple = (&self.staging_roles, &self.staging_parameters); - blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..]) + blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } // ================== updates to layout, public interface =================== @@ -41,26 +39,10 @@ impl LayoutHistory { let mut changed = false; // Merge staged layout changes - match other.current().version.cmp(&self.current().version) { - Ordering::Greater => { - self.staging_parameters = other.staging_parameters.clone(); - self.staging_roles = other.staging_roles.clone(); - self.staging_hash = other.staging_hash; - changed = true; - } - Ordering::Equal => { - self.staging_parameters.merge(&other.staging_parameters); - self.staging_roles.merge(&other.staging_roles); - - let new_staging_hash = self.calculate_staging_hash(); - if new_staging_hash != self.staging_hash { - changed = true; - } - - self.staging_hash = new_staging_hash; - } - Ordering::Less => (), + if self.staging != other.staging { + changed = true; } + self.staging.merge(&other.staging); // Add any new versions to history for v2 in other.versions.iter() { @@ -102,50 +84,34 @@ To know the correct value of the new layout version, invoke `garage layout show` } } + // Compute new version and add it to history 
let mut new_version = self.current().clone(); new_version.version += 1; - new_version.roles.merge(&self.staging_roles); + new_version.roles.merge(&self.staging.get().roles); new_version.roles.retain(|(_, _, v)| v.0.is_some()); - new_version.parameters = *self.staging_parameters.get(); - - self.staging_roles.clear(); - self.staging_hash = self.calculate_staging_hash(); + new_version.parameters = *self.staging.get().parameters.get(); let msg = new_version.calculate_partition_assignment()?; - self.versions.push(new_version); + // Reset the staged layout changes + self.staging.update(LayoutStaging { + parameters: self.staging.get().parameters.clone(), + roles: LwwMap::new(), + }); + self.staging_hash = self.calculate_staging_hash(); + Ok((self, msg)) } - pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { - match version { - None => { - let error = r#" -Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. -To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. 
- "#; - return Err(Error::Message(error.into())); - } - Some(v) => { - if v != self.current().version + 1 { - return Err(Error::Message("Invalid new layout version".into())); - } - } - } - - self.staging_roles.clear(); - self.staging_parameters.update(self.current().parameters); + pub fn revert_staged_changes(mut self) -> Result { + self.staging.update(LayoutStaging { + parameters: Lww::new(self.current().parameters.clone()), + roles: LwwMap::new(), + }); self.staging_hash = self.calculate_staging_hash(); - // TODO this is stupid, we should have a separate version counter/LWW - // for the staging params - let mut new_version = self.current().clone(); - new_version.version += 1; - - self.versions.push(new_version); - Ok(self) } -- cgit v1.2.3 From 94caf9c0c1342ce1d2ba3ac7af39fb133721ee83 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 14:53:34 +0100 Subject: layout: separate code path for synchronizing update trackers only --- src/rpc/layout/history.rs | 51 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 14 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 9ae28887..357b9d62 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -18,10 +18,11 @@ impl LayoutHistory { let mut ret = LayoutHistory { versions: vec![version].into_boxed_slice().into(), update_trackers: Default::default(), + trackers_hash: [0u8; 32].into(), staging: Lww::raw(0, staging), staging_hash: [0u8; 32].into(), }; - ret.staging_hash = ret.calculate_staging_hash(); + ret.update_hashes(); ret } @@ -29,6 +30,15 @@ impl LayoutHistory { self.versions.last().as_ref().unwrap() } + pub(crate) fn update_hashes(&mut self) { + self.trackers_hash = self.calculate_trackers_hash(); + self.staging_hash = self.calculate_staging_hash(); + } + + pub(crate) fn calculate_trackers_hash(&self) -> Hash { + blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) + } 
+ pub(crate) fn calculate_staging_hash(&self) -> Hash { blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } @@ -38,12 +48,6 @@ impl LayoutHistory { pub fn merge(&mut self, other: &LayoutHistory) -> bool { let mut changed = false; - // Merge staged layout changes - if self.staging != other.staging { - changed = true; - } - self.staging.merge(&other.staging); - // Add any new versions to history for v2 in other.versions.iter() { if let Some(v1) = self.versions.iter().find(|v| v.version == v2.version) { @@ -63,7 +67,21 @@ impl LayoutHistory { } // Merge trackers - self.update_trackers.merge(&other.update_trackers); + if self.update_trackers != other.update_trackers { + let c = self.update_trackers.merge(&other.update_trackers); + changed = changed || c; + } + + // Merge staged layout changes + if self.staging != other.staging { + self.staging.merge(&other.staging); + changed = true; + } + + // Update hashes if there are changes + if changed { + self.update_hashes(); + } changed } @@ -100,7 +118,7 @@ To know the correct value of the new layout version, invoke `garage layout show` parameters: self.staging.get().parameters.clone(), roles: LwwMap::new(), }); - self.staging_hash = self.calculate_staging_hash(); + self.update_hashes(); Ok((self, msg)) } @@ -110,20 +128,25 @@ To know the correct value of the new layout version, invoke `garage layout show` parameters: Lww::new(self.current().parameters.clone()), roles: LwwMap::new(), }); - self.staging_hash = self.calculate_staging_hash(); + self.update_hashes(); Ok(self) } pub fn check(&self) -> Result<(), String> { // Check that the hash of the staging data is correct - let staging_hash = self.calculate_staging_hash(); - if staging_hash != self.staging_hash { + if self.trackers_hash != self.calculate_trackers_hash() { + return Err("trackers_hash is incorrect".into()); + } + if self.staging_hash != self.calculate_staging_hash() { return Err("staging_hash is incorrect".into()); } - // TODO: anythign more ? 
+ for version in self.versions.iter() { + version.check()?; + } - self.current().check() + // TODO: anythign more ? + Ok(()) } } -- cgit v1.2.3 From 03ebf18830dff1983f09abe6ecb8d8d26daeb446 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 15:31:59 +0100 Subject: layout: begin managing the update tracker values --- src/rpc/layout/history.rs | 74 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 69 insertions(+), 5 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 357b9d62..347f03db 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; use garage_util::encode::nonversioned_encode; @@ -30,6 +32,14 @@ impl LayoutHistory { self.versions.last().as_ref().unwrap() } + pub fn all_storage_nodes(&self) -> HashSet { + self.versions + .iter() + .map(|x| x.nongateway_nodes()) + .flatten() + .collect::>() + } + pub(crate) fn update_hashes(&mut self) { self.trackers_hash = self.calculate_trackers_hash(); self.staging_hash = self.calculate_staging_hash(); @@ -43,6 +53,65 @@ impl LayoutHistory { blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } + // ------------------ update tracking --------------- + + pub(crate) fn update_trackers(&mut self, node_id: Uuid) { + // Ensure trackers for this node's values are up-to-date + + // 1. Acknowledge the last layout version in the history + self.ack_last(node_id); + + // 2. Assume the data on this node is sync'ed up at least to + // the first layout version in the history + self.sync_first(node_id); + + // 3. Acknowledge everyone has synced up to min(self.sync_map) + self.sync_ack(node_id); + + // 4. 
Cleanup layout versions that are not needed anymore + self.cleanup_old_versions(); + + info!("ack_map: {:?}", self.update_trackers.ack_map); + info!("sync_map: {:?}", self.update_trackers.sync_map); + info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + + // Finally, update hashes + self.update_hashes(); + } + + pub(crate) fn ack_last(&mut self, node: Uuid) { + let last_version = self.current().version; + self.update_trackers.ack_map.set_max(node, last_version); + } + + pub(crate) fn sync_first(&mut self, node: Uuid) { + let first_version = self.versions.first().as_ref().unwrap().version; + self.update_trackers.sync_map.set_max(node, first_version); + } + + pub(crate) fn sync_ack(&mut self, node: Uuid) { + self.update_trackers.sync_ack_map.set_max( + node, + self.calculate_global_min(&self.update_trackers.sync_map), + ); + } + + pub(crate) fn cleanup_old_versions(&mut self) { + let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map); + while self.versions.first().as_ref().unwrap().version < min_sync_ack { + self.versions.remove(0); + } + } + + pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 { + let storage_nodes = self.all_storage_nodes(); + storage_nodes + .iter() + .map(|x| tracker.0.get(x).copied().unwrap_or(0)) + .min() + .unwrap_or(0) + } + // ================== updates to layout, public interface =================== pub fn merge(&mut self, other: &LayoutHistory) -> bool { @@ -78,11 +147,6 @@ impl LayoutHistory { changed = true; } - // Update hashes if there are changes - if changed { - self.update_hashes(); - } - changed } -- cgit v1.2.3 From bad7cc812ead88e9f334405c5c082d79c14c8898 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 15:42:10 +0100 Subject: layout admin: add missing calls to update_hash --- src/rpc/layout/history.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs 
b/src/rpc/layout/history.rs index 347f03db..e17a1c77 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -40,7 +40,7 @@ impl LayoutHistory { .collect::>() } - pub(crate) fn update_hashes(&mut self) { + pub fn update_hashes(&mut self) { self.trackers_hash = self.calculate_trackers_hash(); self.staging_hash = self.calculate_staging_hash(); } -- cgit v1.2.3 From df36cf3099f6010c4fc62109b85d4d1e62f160cc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 16:32:31 +0100 Subject: layout: add helpers to LayoutHistory and prepare integration with Table --- src/rpc/layout/history.rs | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index e17a1c77..dbb02269 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -32,14 +32,6 @@ impl LayoutHistory { self.versions.last().as_ref().unwrap() } - pub fn all_storage_nodes(&self) -> HashSet { - self.versions - .iter() - .map(|x| x.nongateway_nodes()) - .flatten() - .collect::>() - } - pub fn update_hashes(&mut self) { self.trackers_hash = self.calculate_trackers_hash(); self.staging_hash = self.calculate_staging_hash(); @@ -53,6 +45,39 @@ impl LayoutHistory { blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } + // ------------------ who stores what now? 
--------------- + + pub fn max_ack(&self) -> u64 { + self.calculate_global_min(&self.update_trackers.ack_map) + } + + pub fn all_storage_nodes(&self) -> HashSet { + // TODO: cache this + self.versions + .iter() + .map(|x| x.nongateway_nodes()) + .flatten() + .collect::>() + } + + pub fn read_nodes_of(&self, position: &Hash) -> Vec { + let sync_min = self.calculate_global_min(&self.update_trackers.sync_map); + let version = self + .versions + .iter() + .find(|x| x.version == sync_min) + .or(self.versions.last()) + .unwrap(); + version.nodes_of(position, version.replication_factor) + } + + pub fn write_sets_of(&self, position: &Hash) -> Vec> { + self.versions + .iter() + .map(|x| x.nodes_of(position, x.replication_factor)) + .collect::>() + } + // ------------------ update tracking --------------- pub(crate) fn update_trackers(&mut self, node_id: Uuid) { -- cgit v1.2.3 From ce89d1ddabe3b9e638b0173949726522ae9a0311 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 11 Nov 2023 12:08:32 +0100 Subject: table sync: adapt to new layout history --- src/rpc/layout/history.rs | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index dbb02269..185dbb27 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -47,11 +47,19 @@ impl LayoutHistory { // ------------------ who stores what now? 
--------------- - pub fn max_ack(&self) -> u64 { + pub fn all_ack(&self) -> u64 { self.calculate_global_min(&self.update_trackers.ack_map) } - pub fn all_storage_nodes(&self) -> HashSet { + pub fn min_stored(&self) -> u64 { + self.versions.first().as_ref().unwrap().version + } + + pub fn sync_versions(&self) -> (u64, u64, u64) { + (self.current().version, self.all_ack(), self.min_stored()) + } + + pub fn all_nongateway_nodes(&self) -> HashSet { // TODO: cache this self.versions .iter() @@ -71,11 +79,10 @@ impl LayoutHistory { version.nodes_of(position, version.replication_factor) } - pub fn write_sets_of(&self, position: &Hash) -> Vec> { + pub fn write_sets_of<'a>(&'a self, position: &'a Hash) -> impl Iterator> + 'a { self.versions .iter() - .map(|x| x.nodes_of(position, x.replication_factor)) - .collect::>() + .map(move |x| x.nodes_of(position, x.replication_factor)) } // ------------------ update tracking --------------- @@ -129,7 +136,9 @@ impl LayoutHistory { } pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 { - let storage_nodes = self.all_storage_nodes(); + // TODO: for TableFullReplication, counting gateway nodes might be + // necessary? Think about this more. 
+ let storage_nodes = self.all_nongateway_nodes(); storage_nodes .iter() .map(|x| tracker.0.get(x).copied().unwrap_or(0)) -- cgit v1.2.3 From df24bb806d64d5d5e748c35efe3f49ad3dda709e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 11 Nov 2023 12:37:33 +0100 Subject: layout/sync: fix bugs and add tracing --- src/rpc/layout/history.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 185dbb27..cef56647 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -131,7 +131,8 @@ impl LayoutHistory { pub(crate) fn cleanup_old_versions(&mut self) { let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map); while self.versions.first().as_ref().unwrap().version < min_sync_ack { - self.versions.remove(0); + let removed = self.versions.remove(0); + info!("Layout history: pruning old version {}", removed.version); } } -- cgit v1.2.3 From 9a491fa1372a23e91c793ee1d2b313607752826a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 11 Nov 2023 13:10:59 +0100 Subject: layout: fix test --- src/rpc/layout/history.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index cef56647..050f5d0a 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -18,7 +18,7 @@ impl LayoutHistory { }; let mut ret = LayoutHistory { - versions: vec![version].into_boxed_slice().into(), + versions: vec![version], update_trackers: Default::default(), trackers_hash: [0u8; 32].into(), staging: Lww::raw(0, staging), @@ -211,6 +211,11 @@ To know the correct value of the new layout version, invoke `garage layout show` let msg = new_version.calculate_partition_assignment()?; self.versions.push(new_version); + if self.current().check().is_ok() { + while self.versions.first().unwrap().check().is_err() { + 
self.versions.remove(0); + } + } // Reset the staged layout changes self.staging.update(LayoutStaging { @@ -245,7 +250,7 @@ To know the correct value of the new layout version, invoke `garage layout show` version.check()?; } - // TODO: anythign more ? + // TODO: anything more ? Ok(()) } } -- cgit v1.2.3 From 8e292e06b3fde1d3b5b019a26eabd4f0d9ac22c3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 12:48:38 +0100 Subject: layout: some refactoring of nongateway nodes --- src/rpc/layout/history.rs | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 050f5d0a..877ad3a7 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::collections::HashSet; use garage_util::crdt::{Crdt, Lww, LwwMap}; @@ -59,13 +60,19 @@ impl LayoutHistory { (self.current().version, self.all_ack(), self.min_stored()) } - pub fn all_nongateway_nodes(&self) -> HashSet { + pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> { // TODO: cache this - self.versions - .iter() - .map(|x| x.nongateway_nodes()) - .flatten() - .collect::>() + if self.versions.len() == 1 { + self.versions[0].nongateway_nodes().into() + } else { + let set = self + .versions + .iter() + .map(|x| x.nongateway_nodes()) + .flatten() + .collect::>(); + set.into_iter().copied().collect::>().into() + } } pub fn read_nodes_of(&self, position: &Hash) -> Vec { @@ -202,14 +209,11 @@ To know the correct value of the new layout version, invoke `garage layout show` } // Compute new version and add it to history - let mut new_version = self.current().clone(); - new_version.version += 1; - - new_version.roles.merge(&self.staging.get().roles); - new_version.roles.retain(|(_, _, v)| v.0.is_some()); - new_version.parameters = *self.staging.get().parameters.get(); + let (new_version, msg) = self + .current() + .clone() + 
.calculate_next_version(&self.staging.get())?; - let msg = new_version.calculate_partition_assignment()?; self.versions.push(new_version); if self.current().check().is_ok() { while self.versions.first().unwrap().check().is_err() { -- cgit v1.2.3 From 1aab1f4e688ebc3f3adcb41c817c16c688a3291c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 13:06:16 +0100 Subject: layout: refactoring of all_nodes --- src/rpc/layout/history.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 877ad3a7..69348873 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -60,6 +60,21 @@ impl LayoutHistory { (self.current().version, self.all_ack(), self.min_stored()) } + pub fn all_nodes(&self) -> Cow<'_, [Uuid]> { + // TODO: cache this + if self.versions.len() == 1 { + self.versions[0].all_nodes().into() + } else { + let set = self + .versions + .iter() + .map(|x| x.all_nodes()) + .flatten() + .collect::>(); + set.into_iter().copied().collect::>().into() + } + } + pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> { // TODO: cache this if self.versions.len() == 1 { -- cgit v1.2.3 From 3b361d2959e3d577bdae6f8a5ccb0c9d5526b7ea Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 14:28:16 +0100 Subject: layout: prepare for write sets --- src/rpc/layout/history.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 69348873..dce492c9 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -98,13 +98,26 @@ impl LayoutHistory { .find(|x| x.version == sync_min) .or(self.versions.last()) .unwrap(); - version.nodes_of(position, version.replication_factor) + version + .nodes_of(position, version.replication_factor) + .collect() } - pub fn write_sets_of<'a>(&'a self, position: &'a 
Hash) -> impl Iterator> + 'a { + pub fn write_sets_of(&self, position: &Hash) -> Vec> { self.versions .iter() - .map(move |x| x.nodes_of(position, x.replication_factor)) + .map(|x| x.nodes_of(position, x.replication_factor).collect()) + .collect() + } + + pub fn storage_nodes_of(&self, position: &Hash) -> Vec { + let mut ret = vec![]; + for version in self.versions.iter() { + ret.extend(version.nodes_of(position, version.replication_factor)); + } + ret.sort(); + ret.dedup(); + ret } // ------------------ update tracking --------------- -- cgit v1.2.3 From b3e729f4b8ec3b06593f8d3b161c76b1263d9f13 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 15 Nov 2023 12:15:58 +0100 Subject: layout history merge: rm invalid versions when valid versions are added --- src/rpc/layout/history.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index dce492c9..2346b14a 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -211,6 +211,24 @@ impl LayoutHistory { changed = changed || c; } + // If there are invalid versions before valid versions, remove them, + // and increment update trackers + if self.versions.len() > 1 && self.current().check().is_ok() { + while self.versions.first().unwrap().check().is_err() { + self.versions.remove(0); + changed = true; + } + if changed { + let min_v = self.versions.first().unwrap().version; + let nodes = self.all_nongateway_nodes().into_owned(); + for node in nodes { + self.update_trackers.ack_map.set_max(node, min_v); + self.update_trackers.sync_map.set_max(node, min_v); + self.update_trackers.sync_ack_map.set_max(node, min_v); + } + } + } + // Merge staged layout changes if self.staging != other.staging { self.staging.merge(&other.staging); -- cgit v1.2.3 From 65066c70640371cc318faddfb4c05c96de18e86d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 15 Nov 2023 13:28:30 +0100 Subject: layout: wip 
cache global mins --- src/rpc/layout/history.rs | 46 +++++++++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 19 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 2346b14a..1684918e 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -9,6 +9,7 @@ use garage_util::error::*; use super::schema::*; use super::*; + impl LayoutHistory { pub fn new(replication_factor: usize) -> Self { let version = LayoutVersion::new(replication_factor); @@ -49,7 +50,7 @@ impl LayoutHistory { // ------------------ who stores what now? --------------- pub fn all_ack(&self) -> u64 { - self.calculate_global_min(&self.update_trackers.ack_map) + self.update_trackers.ack_map.current_min } pub fn min_stored(&self) -> u64 { @@ -91,7 +92,7 @@ impl LayoutHistory { } pub fn read_nodes_of(&self, position: &Hash) -> Vec { - let sync_min = self.calculate_global_min(&self.update_trackers.sync_map); + let sync_min = self.update_trackers.sync_map.current_min; let version = self .versions .iter() @@ -122,7 +123,7 @@ impl LayoutHistory { // ------------------ update tracking --------------- - pub(crate) fn update_trackers(&mut self, node_id: Uuid) { + pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) { // Ensure trackers for this node's values are up-to-date // 1. Acknowledge the last layout version in the history @@ -138,6 +139,9 @@ impl LayoutHistory { // 4. Cleanup layout versions that are not needed anymore self.cleanup_old_versions(); + // 5. Recalculate global minima + self.update_trackers_min(); + info!("ack_map: {:?}", self.update_trackers.ack_map); info!("sync_map: {:?}", self.update_trackers.sync_map); info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); @@ -146,42 +150,41 @@ impl LayoutHistory { self.update_hashes(); } + fn update_trackers_min(&mut self) { + // TODO: for TableFullReplication, counting gateway nodes might be + // necessary? 
Think about this more. + let storage_nodes = self.all_nongateway_nodes().into_owned(); + let min_version = self.versions.first().unwrap().version; + self.update_trackers.update_min(&storage_nodes, min_version); + } + pub(crate) fn ack_last(&mut self, node: Uuid) { let last_version = self.current().version; self.update_trackers.ack_map.set_max(node, last_version); + self.update_trackers_min(); } pub(crate) fn sync_first(&mut self, node: Uuid) { let first_version = self.versions.first().as_ref().unwrap().version; self.update_trackers.sync_map.set_max(node, first_version); + self.update_trackers_min(); } pub(crate) fn sync_ack(&mut self, node: Uuid) { - self.update_trackers.sync_ack_map.set_max( - node, - self.calculate_global_min(&self.update_trackers.sync_map), - ); + self.update_trackers + .sync_ack_map + .set_max(node, self.update_trackers.sync_map.current_min); + self.update_trackers_min(); } pub(crate) fn cleanup_old_versions(&mut self) { - let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map); + let min_sync_ack = self.update_trackers.sync_ack_map.current_min; while self.versions.first().as_ref().unwrap().version < min_sync_ack { let removed = self.versions.remove(0); info!("Layout history: pruning old version {}", removed.version); } } - pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 { - // TODO: for TableFullReplication, counting gateway nodes might be - // necessary? Think about this more. 
- let storage_nodes = self.all_nongateway_nodes(); - storage_nodes - .iter() - .map(|x| tracker.0.get(x).copied().unwrap_or(0)) - .min() - .unwrap_or(0) - } - // ================== updates to layout, public interface =================== pub fn merge(&mut self, other: &LayoutHistory) -> bool { @@ -229,6 +232,11 @@ impl LayoutHistory { } } + // Update the current_min value in trackers if anything changed + if changed { + self.update_trackers_min(); + } + // Merge staged layout changes if self.staging != other.staging { self.staging.merge(&other.staging); -- cgit v1.2.3 From 393c4d4515e0cdadadc8de8ae2df12e4371cff88 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 15 Nov 2023 14:20:50 +0100 Subject: layout: add helper for cached/external values to centralize recomputation --- src/rpc/layout/history.rs | 311 ++++++++++++++++++++++++++++------------------ 1 file changed, 188 insertions(+), 123 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 1684918e..b6f0e495 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,5 @@ -use std::borrow::Cow; use std::collections::HashSet; +use std::ops::Deref; use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; @@ -9,95 +9,106 @@ use garage_util::error::*; use super::schema::*; use super::*; +pub struct LayoutHelper { + layout: Option, -impl LayoutHistory { - pub fn new(replication_factor: usize) -> Self { - let version = LayoutVersion::new(replication_factor); + // cached values + ack_map_min: u64, + sync_map_min: u64, - let staging = LayoutStaging { - parameters: Lww::::new(version.parameters), - roles: LwwMap::new(), - }; + all_nodes: Vec, + all_nongateway_nodes: Vec, - let mut ret = LayoutHistory { - versions: vec![version], - update_trackers: Default::default(), - trackers_hash: [0u8; 32].into(), - staging: Lww::raw(0, staging), - staging_hash: [0u8; 32].into(), - }; - ret.update_hashes(); - ret - } + 
trackers_hash: Hash, + staging_hash: Hash, +} - pub fn current(&self) -> &LayoutVersion { - self.versions.last().as_ref().unwrap() +impl Deref for LayoutHelper { + type Target = LayoutHistory; + fn deref(&self) -> &LayoutHistory { + self.layout() } +} - pub fn update_hashes(&mut self) { - self.trackers_hash = self.calculate_trackers_hash(); - self.staging_hash = self.calculate_staging_hash(); +impl LayoutHelper { + pub fn new(mut layout: LayoutHistory) -> Self { + layout.cleanup_old_versions(); + + let all_nongateway_nodes = layout.get_all_nongateway_nodes(); + layout.clamp_update_trackers(&all_nongateway_nodes); + + let min_version = layout.min_stored(); + let ack_map_min = layout + .update_trackers + .ack_map + .min(&all_nongateway_nodes, min_version); + let sync_map_min = layout + .update_trackers + .sync_map + .min(&all_nongateway_nodes, min_version); + + let all_nodes = layout.get_all_nodes(); + let trackers_hash = layout.calculate_trackers_hash(); + let staging_hash = layout.calculate_staging_hash(); + + LayoutHelper { + layout: Some(layout), + ack_map_min, + sync_map_min, + all_nodes, + all_nongateway_nodes, + trackers_hash, + staging_hash, + } } - pub(crate) fn calculate_trackers_hash(&self) -> Hash { - blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) - } + // ------------------ single updating function -------------- - pub(crate) fn calculate_staging_hash(&self) -> Hash { - blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) + fn layout(&self) -> &LayoutHistory { + self.layout.as_ref().unwrap() } - // ------------------ who stores what now? 
--------------- - - pub fn all_ack(&self) -> u64 { - self.update_trackers.ack_map.current_min + pub(crate) fn update(&mut self, f: F) -> bool + where + F: FnOnce(&mut LayoutHistory) -> bool, + { + let changed = f(&mut self.layout.as_mut().unwrap()); + if changed { + *self = Self::new(self.layout.take().unwrap()); + } + changed } - pub fn min_stored(&self) -> u64 { - self.versions.first().as_ref().unwrap().version + // ------------------ read helpers --------------- + + pub fn all_nodes(&self) -> &[Uuid] { + &self.all_nodes } - pub fn sync_versions(&self) -> (u64, u64, u64) { - (self.current().version, self.all_ack(), self.min_stored()) + pub fn all_nongateway_nodes(&self) -> &[Uuid] { + &self.all_nongateway_nodes } - pub fn all_nodes(&self) -> Cow<'_, [Uuid]> { - // TODO: cache this - if self.versions.len() == 1 { - self.versions[0].all_nodes().into() - } else { - let set = self - .versions - .iter() - .map(|x| x.all_nodes()) - .flatten() - .collect::>(); - set.into_iter().copied().collect::>().into() - } + pub fn all_ack(&self) -> u64 { + self.ack_map_min } - pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> { - // TODO: cache this - if self.versions.len() == 1 { - self.versions[0].nongateway_nodes().into() - } else { - let set = self - .versions - .iter() - .map(|x| x.nongateway_nodes()) - .flatten() - .collect::>(); - set.into_iter().copied().collect::>().into() - } + pub fn sync_versions(&self) -> (u64, u64, u64) { + ( + self.layout().current().version, + self.all_ack(), + self.layout().min_stored(), + ) } pub fn read_nodes_of(&self, position: &Hash) -> Vec { - let sync_min = self.update_trackers.sync_map.current_min; + let sync_min = self.sync_map_min; let version = self + .layout() .versions .iter() .find(|x| x.version == sync_min) - .or(self.versions.last()) + .or(self.layout().versions.last()) .unwrap(); version .nodes_of(position, version.replication_factor) @@ -105,7 +116,8 @@ impl LayoutHistory { } pub fn write_sets_of(&self, position: &Hash) -> Vec> 
{ - self.versions + self.layout() + .versions .iter() .map(|x| x.nodes_of(position, x.replication_factor).collect()) .collect() @@ -113,7 +125,7 @@ impl LayoutHistory { pub fn storage_nodes_of(&self, position: &Hash) -> Vec { let mut ret = vec![]; - for version in self.versions.iter() { + for version in self.layout().versions.iter() { ret.extend(version.nodes_of(position, version.replication_factor)); } ret.sort(); @@ -121,7 +133,35 @@ impl LayoutHistory { ret } - // ------------------ update tracking --------------- + pub fn trackers_hash(&self) -> Hash { + self.trackers_hash + } + + pub fn staging_hash(&self) -> Hash { + self.staging_hash + } + + // ------------------ helpers for update tracking --------------- + + pub(crate) fn sync_first(&mut self, node: Uuid) { + let first_version = self.versions.first().as_ref().unwrap().version; + self.update(|layout| layout.update_trackers.sync_map.set_max(node, first_version)); + } + + pub(crate) fn sync_ack(&mut self, node: Uuid) { + let sync_map_min = self.sync_map_min; + self.update(|layout| { + layout + .update_trackers + .sync_ack_map + .set_max(node, sync_map_min) + }); + } + + pub(crate) fn ack_last(&mut self, node: Uuid) { + let last_version = self.current().version; + self.update(|layout| layout.update_trackers.ack_map.set_max(node, last_version)); + } pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) { // Ensure trackers for this node's values are up-to-date @@ -136,55 +176,104 @@ impl LayoutHistory { // 3. Acknowledge everyone has synced up to min(self.sync_map) self.sync_ack(node_id); - // 4. Cleanup layout versions that are not needed anymore - self.cleanup_old_versions(); - - // 5. 
Recalculate global minima - self.update_trackers_min(); - info!("ack_map: {:?}", self.update_trackers.ack_map); info!("sync_map: {:?}", self.update_trackers.sync_map); info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + } +} - // Finally, update hashes - self.update_hashes(); +// ---- + +impl LayoutHistory { + pub fn new(replication_factor: usize) -> Self { + let version = LayoutVersion::new(replication_factor); + + let staging = LayoutStaging { + parameters: Lww::::new(version.parameters), + roles: LwwMap::new(), + }; + + LayoutHistory { + versions: vec![version], + update_trackers: Default::default(), + staging: Lww::raw(0, staging), + } } - fn update_trackers_min(&mut self) { - // TODO: for TableFullReplication, counting gateway nodes might be - // necessary? Think about this more. - let storage_nodes = self.all_nongateway_nodes().into_owned(); - let min_version = self.versions.first().unwrap().version; - self.update_trackers.update_min(&storage_nodes, min_version); + // ------------------ who stores what now? 
--------------- + + pub fn current(&self) -> &LayoutVersion { + self.versions.last().as_ref().unwrap() } - pub(crate) fn ack_last(&mut self, node: Uuid) { - let last_version = self.current().version; - self.update_trackers.ack_map.set_max(node, last_version); - self.update_trackers_min(); + pub fn min_stored(&self) -> u64 { + self.versions.first().as_ref().unwrap().version } - pub(crate) fn sync_first(&mut self, node: Uuid) { - let first_version = self.versions.first().as_ref().unwrap().version; - self.update_trackers.sync_map.set_max(node, first_version); - self.update_trackers_min(); + pub fn get_all_nodes(&self) -> Vec { + if self.versions.len() == 1 { + self.versions[0].all_nodes().to_vec() + } else { + let set = self + .versions + .iter() + .map(|x| x.all_nodes()) + .flatten() + .collect::>(); + set.into_iter().copied().collect::>() + } } - pub(crate) fn sync_ack(&mut self, node: Uuid) { - self.update_trackers - .sync_ack_map - .set_max(node, self.update_trackers.sync_map.current_min); - self.update_trackers_min(); + fn get_all_nongateway_nodes(&self) -> Vec { + if self.versions.len() == 1 { + self.versions[0].nongateway_nodes().to_vec() + } else { + let set = self + .versions + .iter() + .map(|x| x.nongateway_nodes()) + .flatten() + .collect::>(); + set.into_iter().copied().collect::>() + } } - pub(crate) fn cleanup_old_versions(&mut self) { - let min_sync_ack = self.update_trackers.sync_ack_map.current_min; - while self.versions.first().as_ref().unwrap().version < min_sync_ack { - let removed = self.versions.remove(0); - info!("Layout history: pruning old version {}", removed.version); + // ---- housekeeping (all invoked by LayoutHelper) ---- + + fn cleanup_old_versions(&mut self) { + loop { + let all_nongateway_nodes = self.get_all_nongateway_nodes(); + let min_version = self.min_stored(); + let sync_ack_map_min = self + .update_trackers + .sync_ack_map + .min(&all_nongateway_nodes, min_version); + if self.min_stored() < sync_ack_map_min { + let removed = 
self.versions.remove(0); + info!("Layout history: pruning old version {}", removed.version); + } else { + break; + } } } + fn clamp_update_trackers(&mut self, nodes: &[Uuid]) { + let min_v = self.min_stored(); + for node in nodes { + self.update_trackers.ack_map.set_max(*node, min_v); + self.update_trackers.sync_map.set_max(*node, min_v); + self.update_trackers.sync_ack_map.set_max(*node, min_v); + } + } + + fn calculate_trackers_hash(&self) -> Hash { + blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) + } + + fn calculate_staging_hash(&self) -> Hash { + blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) + } + // ================== updates to layout, public interface =================== pub fn merge(&mut self, other: &LayoutHistory) -> bool { @@ -221,20 +310,6 @@ impl LayoutHistory { self.versions.remove(0); changed = true; } - if changed { - let min_v = self.versions.first().unwrap().version; - let nodes = self.all_nongateway_nodes().into_owned(); - for node in nodes { - self.update_trackers.ack_map.set_max(node, min_v); - self.update_trackers.sync_map.set_max(node, min_v); - self.update_trackers.sync_ack_map.set_max(node, min_v); - } - } - } - - // Update the current_min value in trackers if anything changed - if changed { - self.update_trackers_min(); } // Merge staged layout changes @@ -280,7 +355,6 @@ To know the correct value of the new layout version, invoke `garage layout show` parameters: self.staging.get().parameters.clone(), roles: LwwMap::new(), }); - self.update_hashes(); Ok((self, msg)) } @@ -290,20 +364,11 @@ To know the correct value of the new layout version, invoke `garage layout show` parameters: Lww::new(self.current().parameters.clone()), roles: LwwMap::new(), }); - self.update_hashes(); Ok(self) } pub fn check(&self) -> Result<(), String> { - // Check that the hash of the staging data is correct - if self.trackers_hash != self.calculate_trackers_hash() { - return Err("trackers_hash is incorrect".into()); - } - if 
self.staging_hash != self.calculate_staging_hash() { - return Err("staging_hash is incorrect".into()); - } - for version in self.versions.iter() { version.check()?; } -- cgit v1.2.3 From 33c8a489b0a9c0e869282bfc19c548f5a3e02e8c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 15 Nov 2023 15:40:44 +0100 Subject: layou: implement ack locking --- src/rpc/layout/history.rs | 98 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 72 insertions(+), 26 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index b6f0e495..dd38efa7 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,7 @@ +use std::collections::HashMap; use std::collections::HashSet; use std::ops::Deref; +use std::sync::atomic::{AtomicUsize, Ordering}; use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; @@ -21,6 +23,11 @@ pub struct LayoutHelper { trackers_hash: Hash, staging_hash: Hash, + + // ack lock: counts in-progress write operations for each + // layout version ; we don't increase the ack update tracker + // while this lock is nonzero + pub(crate) ack_lock: HashMap, } impl Deref for LayoutHelper { @@ -31,7 +38,7 @@ impl Deref for LayoutHelper { } impl LayoutHelper { - pub fn new(mut layout: LayoutHistory) -> Self { + pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap) -> Self { layout.cleanup_old_versions(); let all_nongateway_nodes = layout.get_all_nongateway_nodes(); @@ -51,6 +58,11 @@ impl LayoutHelper { let trackers_hash = layout.calculate_trackers_hash(); let staging_hash = layout.calculate_staging_hash(); + ack_lock.retain(|_, cnt| *cnt.get_mut() > 0); + ack_lock + .entry(layout.current().version) + .or_insert(AtomicUsize::new(0)); + LayoutHelper { layout: Some(layout), ack_map_min, @@ -59,6 +71,7 @@ impl LayoutHelper { all_nongateway_nodes, trackers_hash, staging_hash, + ack_lock, } } @@ -74,7 +87,10 @@ impl LayoutHelper { { let changed = f(&mut 
self.layout.as_mut().unwrap()); if changed { - *self = Self::new(self.layout.take().unwrap()); + *self = Self::new( + self.layout.take().unwrap(), + std::mem::take(&mut self.ack_lock), + ); } changed } @@ -115,7 +131,7 @@ impl LayoutHelper { .collect() } - pub fn write_sets_of(&self, position: &Hash) -> Vec> { + pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec> { self.layout() .versions .iter() @@ -143,42 +159,72 @@ impl LayoutHelper { // ------------------ helpers for update tracking --------------- - pub(crate) fn sync_first(&mut self, node: Uuid) { + pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) { + // Ensure trackers for this node's values are up-to-date + + // 1. Acknowledge the last layout version which is not currently + // locked by an in-progress write operation + self.ack_max_free(local_node_id); + + // 2. Assume the data on this node is sync'ed up at least to + // the first layout version in the history + self.sync_first(local_node_id); + + // 3. Acknowledge everyone has synced up to min(self.sync_map) + self.sync_ack(local_node_id); + + info!("ack_map: {:?}", self.update_trackers.ack_map); + info!("sync_map: {:?}", self.update_trackers.sync_map); + info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + } + + fn sync_first(&mut self, local_node_id: Uuid) { let first_version = self.versions.first().as_ref().unwrap().version; - self.update(|layout| layout.update_trackers.sync_map.set_max(node, first_version)); + self.update(|layout| { + layout + .update_trackers + .sync_map + .set_max(local_node_id, first_version) + }); } - pub(crate) fn sync_ack(&mut self, node: Uuid) { + fn sync_ack(&mut self, local_node_id: Uuid) { let sync_map_min = self.sync_map_min; self.update(|layout| { layout .update_trackers .sync_ack_map - .set_max(node, sync_map_min) + .set_max(local_node_id, sync_map_min) }); } - pub(crate) fn ack_last(&mut self, node: Uuid) { - let last_version = self.current().version; - self.update(|layout| 
layout.update_trackers.ack_map.set_max(node, last_version)); + pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool { + let max_ack = self.max_free_ack(); + let changed = self.update(|layout| { + layout + .update_trackers + .ack_map + .set_max(local_node_id, max_ack) + }); + if changed { + info!("ack_until updated to {}", max_ack); + } + changed } - pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) { - // Ensure trackers for this node's values are up-to-date - - // 1. Acknowledge the last layout version in the history - self.ack_last(node_id); - - // 2. Assume the data on this node is sync'ed up at least to - // the first layout version in the history - self.sync_first(node_id); - - // 3. Acknowledge everyone has synced up to min(self.sync_map) - self.sync_ack(node_id); - - info!("ack_map: {:?}", self.update_trackers.ack_map); - info!("sync_map: {:?}", self.update_trackers.sync_map); - info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + pub(crate) fn max_free_ack(&self) -> u64 { + self.layout() + .versions + .iter() + .map(|x| x.version) + .take_while(|v| { + self.ack_lock + .get(v) + .map(|x| x.load(Ordering::Relaxed) == 0) + .unwrap_or(true) + }) + .max() + .unwrap_or(self.min_stored()) } } -- cgit v1.2.3 From ad5c6f779f7fdfdc0569920c830c59197023515a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 16 Nov 2023 13:26:43 +0100 Subject: layout: split helper in separate file; more precise difference tracking --- src/rpc/layout/history.rs | 278 +++++----------------------------------------- 1 file changed, 26 insertions(+), 252 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index dd38efa7..0a139549 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,7 +1,4 @@ -use std::collections::HashMap; use std::collections::HashSet; -use std::ops::Deref; -use std::sync::atomic::{AtomicUsize, Ordering}; use garage_util::crdt::{Crdt, Lww, 
LwwMap}; use garage_util::data::*; @@ -11,225 +8,6 @@ use garage_util::error::*; use super::schema::*; use super::*; -pub struct LayoutHelper { - layout: Option, - - // cached values - ack_map_min: u64, - sync_map_min: u64, - - all_nodes: Vec, - all_nongateway_nodes: Vec, - - trackers_hash: Hash, - staging_hash: Hash, - - // ack lock: counts in-progress write operations for each - // layout version ; we don't increase the ack update tracker - // while this lock is nonzero - pub(crate) ack_lock: HashMap, -} - -impl Deref for LayoutHelper { - type Target = LayoutHistory; - fn deref(&self) -> &LayoutHistory { - self.layout() - } -} - -impl LayoutHelper { - pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap) -> Self { - layout.cleanup_old_versions(); - - let all_nongateway_nodes = layout.get_all_nongateway_nodes(); - layout.clamp_update_trackers(&all_nongateway_nodes); - - let min_version = layout.min_stored(); - let ack_map_min = layout - .update_trackers - .ack_map - .min(&all_nongateway_nodes, min_version); - let sync_map_min = layout - .update_trackers - .sync_map - .min(&all_nongateway_nodes, min_version); - - let all_nodes = layout.get_all_nodes(); - let trackers_hash = layout.calculate_trackers_hash(); - let staging_hash = layout.calculate_staging_hash(); - - ack_lock.retain(|_, cnt| *cnt.get_mut() > 0); - ack_lock - .entry(layout.current().version) - .or_insert(AtomicUsize::new(0)); - - LayoutHelper { - layout: Some(layout), - ack_map_min, - sync_map_min, - all_nodes, - all_nongateway_nodes, - trackers_hash, - staging_hash, - ack_lock, - } - } - - // ------------------ single updating function -------------- - - fn layout(&self) -> &LayoutHistory { - self.layout.as_ref().unwrap() - } - - pub(crate) fn update(&mut self, f: F) -> bool - where - F: FnOnce(&mut LayoutHistory) -> bool, - { - let changed = f(&mut self.layout.as_mut().unwrap()); - if changed { - *self = Self::new( - self.layout.take().unwrap(), - std::mem::take(&mut self.ack_lock), - ); - } - 
changed - } - - // ------------------ read helpers --------------- - - pub fn all_nodes(&self) -> &[Uuid] { - &self.all_nodes - } - - pub fn all_nongateway_nodes(&self) -> &[Uuid] { - &self.all_nongateway_nodes - } - - pub fn all_ack(&self) -> u64 { - self.ack_map_min - } - - pub fn sync_versions(&self) -> (u64, u64, u64) { - ( - self.layout().current().version, - self.all_ack(), - self.layout().min_stored(), - ) - } - - pub fn read_nodes_of(&self, position: &Hash) -> Vec { - let sync_min = self.sync_map_min; - let version = self - .layout() - .versions - .iter() - .find(|x| x.version == sync_min) - .or(self.layout().versions.last()) - .unwrap(); - version - .nodes_of(position, version.replication_factor) - .collect() - } - - pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec> { - self.layout() - .versions - .iter() - .map(|x| x.nodes_of(position, x.replication_factor).collect()) - .collect() - } - - pub fn storage_nodes_of(&self, position: &Hash) -> Vec { - let mut ret = vec![]; - for version in self.layout().versions.iter() { - ret.extend(version.nodes_of(position, version.replication_factor)); - } - ret.sort(); - ret.dedup(); - ret - } - - pub fn trackers_hash(&self) -> Hash { - self.trackers_hash - } - - pub fn staging_hash(&self) -> Hash { - self.staging_hash - } - - // ------------------ helpers for update tracking --------------- - - pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) { - // Ensure trackers for this node's values are up-to-date - - // 1. Acknowledge the last layout version which is not currently - // locked by an in-progress write operation - self.ack_max_free(local_node_id); - - // 2. Assume the data on this node is sync'ed up at least to - // the first layout version in the history - self.sync_first(local_node_id); - - // 3. 
Acknowledge everyone has synced up to min(self.sync_map) - self.sync_ack(local_node_id); - - info!("ack_map: {:?}", self.update_trackers.ack_map); - info!("sync_map: {:?}", self.update_trackers.sync_map); - info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); - } - - fn sync_first(&mut self, local_node_id: Uuid) { - let first_version = self.versions.first().as_ref().unwrap().version; - self.update(|layout| { - layout - .update_trackers - .sync_map - .set_max(local_node_id, first_version) - }); - } - - fn sync_ack(&mut self, local_node_id: Uuid) { - let sync_map_min = self.sync_map_min; - self.update(|layout| { - layout - .update_trackers - .sync_ack_map - .set_max(local_node_id, sync_map_min) - }); - } - - pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool { - let max_ack = self.max_free_ack(); - let changed = self.update(|layout| { - layout - .update_trackers - .ack_map - .set_max(local_node_id, max_ack) - }); - if changed { - info!("ack_until updated to {}", max_ack); - } - changed - } - - pub(crate) fn max_free_ack(&self) -> u64 { - self.layout() - .versions - .iter() - .map(|x| x.version) - .take_while(|v| { - self.ack_lock - .get(v) - .map(|x| x.load(Ordering::Relaxed) == 0) - .unwrap_or(true) - }) - .max() - .unwrap_or(self.min_stored()) - } -} - -// ---- - impl LayoutHistory { pub fn new(replication_factor: usize) -> Self { let version = LayoutVersion::new(replication_factor); @@ -270,7 +48,7 @@ impl LayoutHistory { } } - fn get_all_nongateway_nodes(&self) -> Vec { + pub(crate) fn get_all_nongateway_nodes(&self) -> Vec { if self.versions.len() == 1 { self.versions[0].nongateway_nodes().to_vec() } else { @@ -286,8 +64,21 @@ impl LayoutHistory { // ---- housekeeping (all invoked by LayoutHelper) ---- - fn cleanup_old_versions(&mut self) { - loop { + pub(crate) fn cleanup_old_versions(&mut self) { + // If there are invalid versions before valid versions, remove them + if self.versions.len() > 1 && self.current().check().is_ok() { + 
while self.versions.len() > 1 && self.versions.first().unwrap().check().is_err() { + let removed = self.versions.remove(0); + info!( + "Layout history: pruning old invalid version {}", + removed.version + ); + } + } + + // If there are old versions that no one is reading from anymore, + // remove them + while self.versions.len() > 1 { let all_nongateway_nodes = self.get_all_nongateway_nodes(); let min_version = self.min_stored(); let sync_ack_map_min = self @@ -303,7 +94,7 @@ impl LayoutHistory { } } - fn clamp_update_trackers(&mut self, nodes: &[Uuid]) { + pub(crate) fn clamp_update_trackers(&mut self, nodes: &[Uuid]) { let min_v = self.min_stored(); for node in nodes { self.update_trackers.ack_map.set_max(*node, min_v); @@ -312,11 +103,11 @@ impl LayoutHistory { } } - fn calculate_trackers_hash(&self) -> Hash { + pub(crate) fn calculate_trackers_hash(&self) -> Hash { blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) } - fn calculate_staging_hash(&self) -> Hash { + pub(crate) fn calculate_staging_hash(&self) -> Hash { blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } @@ -328,6 +119,7 @@ impl LayoutHistory { // Add any new versions to history for v2 in other.versions.iter() { if let Some(v1) = self.versions.iter().find(|v| v.version == v2.version) { + // Version is already present, check consistency if v1 != v2 { error!("Inconsistent layout histories: different layout compositions for version {}. 
Your cluster will be broken as long as this layout version is not replaced.", v2.version); } @@ -344,24 +136,14 @@ impl LayoutHistory { } // Merge trackers - if self.update_trackers != other.update_trackers { - let c = self.update_trackers.merge(&other.update_trackers); - changed = changed || c; - } - - // If there are invalid versions before valid versions, remove them, - // and increment update trackers - if self.versions.len() > 1 && self.current().check().is_ok() { - while self.versions.first().unwrap().check().is_err() { - self.versions.remove(0); - changed = true; - } - } + let c = self.update_trackers.merge(&other.update_trackers); + changed = changed || c; // Merge staged layout changes if self.staging != other.staging { + let prev_staging = self.staging.clone(); self.staging.merge(&other.staging); - changed = true; + changed = changed || self.staging != prev_staging; } changed @@ -390,11 +172,7 @@ To know the correct value of the new layout version, invoke `garage layout show` .calculate_next_version(&self.staging.get())?; self.versions.push(new_version); - if self.current().check().is_ok() { - while self.versions.first().unwrap().check().is_err() { - self.versions.remove(0); - } - } + self.cleanup_old_versions(); // Reset the staged layout changes self.staging.update(LayoutStaging { @@ -415,11 +193,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } pub fn check(&self) -> Result<(), String> { - for version in self.versions.iter() { - version.check()?; - } - // TODO: anything more ? 
- Ok(()) + self.current().check() } } -- cgit v1.2.3 From 707442f5de416fdbed4681a33b739f0a787b7834 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 16 Nov 2023 13:51:40 +0100 Subject: layout: refactor digests and add "!=" assertions before epidemic bcast --- src/rpc/layout/history.rs | 1 - 1 file changed, 1 deletion(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 0a139549..653d2a48 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -5,7 +5,6 @@ use garage_util::data::*; use garage_util::encode::nonversioned_encode; use garage_util::error::*; -use super::schema::*; use super::*; impl LayoutHistory { -- cgit v1.2.3 From d6d239fc7909cbd017da6ea35cceb3d561a87cca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 27 Nov 2023 11:52:57 +0100 Subject: block manager: read_block using old layout versions if necessary --- src/rpc/layout/history.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 653d2a48..7d4a1b48 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -18,6 +18,7 @@ impl LayoutHistory { LayoutHistory { versions: vec![version], + old_versions: vec![], update_trackers: Default::default(), staging: Lww::raw(0, staging), } @@ -86,11 +87,20 @@ impl LayoutHistory { .min(&all_nongateway_nodes, min_version); if self.min_stored() < sync_ack_map_min { let removed = self.versions.remove(0); - info!("Layout history: pruning old version {}", removed.version); + info!( + "Layout history: moving version {} to old_versions", + removed.version + ); + self.old_versions.push(removed); } else { break; } } + + while self.old_versions.len() > OLD_VERSION_COUNT { + let removed = self.old_versions.remove(0); + info!("Layout history: removing old_version {}", removed.version); + } } pub(crate) fn clamp_update_trackers(&mut self, nodes: 
&[Uuid]) { -- cgit v1.2.3 From c8356a91d9bf1d1488ec288099f2a55a1019918f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Dec 2023 10:30:26 +0100 Subject: layout updates: fix the set of nodes among which minima are calculated --- src/rpc/layout/history.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 7d4a1b48..c448ac24 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -77,14 +77,16 @@ impl LayoutHistory { } // If there are old versions that no one is reading from anymore, - // remove them + // remove them (keep them in self.old_versions). + // ASSUMPTION: we only care about where nodes in the current layout version + // are reading from, as we assume older nodes are being discarded. while self.versions.len() > 1 { - let all_nongateway_nodes = self.get_all_nongateway_nodes(); + let current_nodes = &self.current().node_id_vec; let min_version = self.min_stored(); let sync_ack_map_min = self .update_trackers .sync_ack_map - .min(&all_nongateway_nodes, min_version); + .min_among(&current_nodes, min_version); if self.min_stored() < sync_ack_map_min { let removed = self.versions.remove(0); info!( -- cgit v1.2.3 From 9cecea64d4509e95ac9793b29c947e2ecf9bb0b8 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Dec 2023 14:27:53 +0100 Subject: layout: allow sync update tracker to progress with only quorums --- src/rpc/layout/history.rs | 101 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index c448ac24..a53256cc 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -6,6 +6,7 @@ use garage_util::encode::nonversioned_encode; use garage_util::error::*; use super::*; +use crate::replication_mode::ReplicationMode; impl LayoutHistory { pub fn new(replication_factor:
usize) -> Self { @@ -64,6 +65,13 @@ impl LayoutHistory { // ---- housekeeping (all invoked by LayoutHelper) ---- + pub(crate) fn keep_current_version_only(&mut self) { + while self.versions.len() > 1 { + let removed = self.versions.remove(0); + self.old_versions.push(removed); + } + } + pub(crate) fn cleanup_old_versions(&mut self) { // If there are invalid versions before valid versions, remove them if self.versions.len() > 1 && self.current().check().is_ok() { @@ -114,6 +122,99 @@ impl LayoutHistory { } } + pub(crate) fn calculate_sync_map_min_with_quorum( + &self, + replication_mode: ReplicationMode, + all_nongateway_nodes: &[Uuid], + ) -> u64 { + // This function calculates the minimum layout version from which + // it is safe to read if we want to maintain read-after-write consistency. + // In the general case the computation can be a bit expensive so + // we try to optimize it in several ways. + + // If there is only one layout version, we know that's the one + // we need to read from. + if self.versions.len() == 1 { + return self.current().version; + } + + let quorum = replication_mode.write_quorum(); + + let min_version = self.min_stored(); + let global_min = self + .update_trackers + .sync_map + .min_among(&all_nongateway_nodes, min_version); + + // If the write quorums are equal to the total number of nodes, + // i.e. no writes can succeed while they are not written to all nodes, + // then we must in all case wait for all nodes to complete a sync. + // This is represented by reading from the layout with version + // number global_min, the smallest layout version for which all nodes + // have completed a sync. + if quorum == self.current().replication_factor { + return global_min; + } + + // In the general case, we need to look at all write sets for all partitions, + // and find a safe layout version to read for that partition. 
We then + // take the minimum value among all partition as the safe layout version + // to read in all cases (the layout version to which all reads are directed). + let mut current_min = self.current().version; + let mut sets_done = HashSet::<Vec<Uuid>>::new(); + + for (_, p_hash) in self.current().partitions() { + for v in self.versions.iter() { + if v.version == self.current().version { + // We don't care about whether nodes in the latest layout version + // have completed a sync or not, as the sync is push-only + // and by definition nodes in the latest layout version do not + // hold data that must be pushed to nodes in the latest layout + // version, since that's the same version (any data that's + // already in the latest version is assumed to have been written + // by an operation that ensured a quorum of writes within + // that version). + continue; + } + + // Determine set of nodes for partition p in layout version v. + // Sort the node set to avoid duplicate computations. + let mut set = v + .nodes_of(&p_hash, v.replication_factor) + .collect::<Vec<_>>(); + set.sort(); + + // If this set was already processed, skip it. + if sets_done.contains(&set) { + continue; + } + + // Find the value of the sync update trackers that is the + // highest possible minimum within a quorum of nodes. 
+ let mut sync_values = set + .iter() + .map(|x| self.update_trackers.sync_map.get(x, min_version)) + .collect::<Vec<_>>(); + sync_values.sort(); + let set_min = sync_values[sync_values.len() - quorum]; + if set_min < current_min { + current_min = set_min; + } + // defavorable case, we know we are at the smallest possible version, + // so we can stop early + assert!(current_min >= global_min); + if current_min == global_min { + return current_min; + } + + // Add set to already processed sets + sets_done.insert(set); + } + } + + current_min + } + pub(crate) fn calculate_trackers_hash(&self) -> Hash { blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) } -- cgit v1.2.3 From 85b5a6bcd11c0a7651e4c589569e1935a3d18e46 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 11 Dec 2023 15:31:47 +0100 Subject: fix some clippy lints --- src/rpc/layout/history.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index a53256cc..23196aee 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -42,8 +42,7 @@ impl LayoutHistory { let set = self .versions .iter() - .map(|x| x.all_nodes()) - .flatten() + .flat_map(|x| x.all_nodes()) .collect::<HashSet<_>>(); set.into_iter().copied().collect::<Vec<_>>() } @@ -56,8 +55,7 @@ impl LayoutHistory { let set = self .versions .iter() - .map(|x| x.nongateway_nodes()) - .flatten() + .flat_map(|x| x.nongateway_nodes()) .collect::<HashSet<_>>(); set.into_iter().copied().collect::<Vec<_>>() } @@ -94,7 +92,7 @@ impl LayoutHistory { let sync_ack_map_min = self .update_trackers .sync_ack_map - .min_among(&current_nodes, min_version); + .min_among(current_nodes, min_version); if self.min_stored() < sync_ack_map_min { let removed = self.versions.remove(0); info!( @@ -144,7 +142,7 @@ impl LayoutHistory { let global_min = self .update_trackers .sync_map - .min_among(&all_nongateway_nodes, min_version); + .min_among(all_nongateway_nodes, 
min_version); // If the write quorums are equal to the total number of nodes, // i.e. no writes can succeed while they are not written to all nodes, @@ -281,7 +279,7 @@ To know the correct value of the new layout version, invoke `garage layout show` let (new_version, msg) = self .current() .clone() - .calculate_next_version(&self.staging.get())?; + .calculate_next_version(self.staging.get())?; self.versions.push(new_version); self.cleanup_old_versions(); @@ -297,7 +295,7 @@ To know the correct value of the new layout version, invoke `garage layout show` pub fn revert_staged_changes(mut self) -> Result<Self, Error> { self.staging.update(LayoutStaging { - parameters: Lww::new(self.current().parameters.clone()), + parameters: Lww::new(self.current().parameters), roles: LwwMap::new(), }); -- cgit v1.2.3 From adccce1145d5d82581e4a5da707be35badb2d5a6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 11 Dec 2023 15:45:14 +0100 Subject: layout: refactor/fix bad while loop --- src/rpc/layout/history.rs | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 23196aee..b8cc27da 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -86,23 +86,20 @@ impl LayoutHistory { // remove them (keep them in self.old_versions). // ASSUMPTION: we only care about where nodes in the current layout version // are reading from, as we assume older nodes are being discarded. 
- while self.versions.len() > 1 { - let current_nodes = &self.current().node_id_vec; - let min_version = self.min_stored(); - let sync_ack_map_min = self - .update_trackers - .sync_ack_map - .min_among(current_nodes, min_version); - if self.min_stored() < sync_ack_map_min { - let removed = self.versions.remove(0); - info!( - "Layout history: moving version {} to old_versions", - removed.version - ); - self.old_versions.push(removed); - } else { - break; - } + let current_nodes = &self.current().node_id_vec; + let min_version = self.min_stored(); + let sync_ack_map_min = self + .update_trackers + .sync_ack_map + .min_among(current_nodes, min_version); + while self.min_stored() < sync_ack_map_min { + assert!(self.versions.len() > 1); + let removed = self.versions.remove(0); + info!( + "Layout history: moving version {} to old_versions", + removed.version + ); + self.old_versions.push(removed); } while self.old_versions.len() > OLD_VERSION_COUNT { -- cgit v1.2.3 From c1769bbe69f723fb3980cf4fdac7615cfb782720 Mon Sep 17 00:00:00 2001 From: Yureka Date: Mon, 4 Mar 2024 19:58:32 +0100 Subject: ReplicationMode -> ConsistencyMode+ReplicationFactor --- src/rpc/layout/history.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index b8cc27da..290f058d 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -6,11 +6,11 @@ use garage_util::encode::nonversioned_encode; use garage_util::error::*; use super::*; -use crate::replication_mode::ReplicationMode; +use crate::replication_mode::*; impl LayoutHistory { - pub fn new(replication_factor: usize) -> Self { - let version = LayoutVersion::new(replication_factor); + pub fn new(replication_factor: ReplicationFactor) -> Self { + let version = LayoutVersion::new(replication_factor.into()); let staging = LayoutStaging { parameters: Lww::<LayoutParameters>::new(version.parameters), @@ -119,7 +119,7 @@ impl 
LayoutHistory { pub(crate) fn calculate_sync_map_min_with_quorum( &self, - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, all_nongateway_nodes: &[Uuid], ) -> u64 { // This function calculates the minimum layout version from which @@ -133,7 +133,7 @@ impl LayoutHistory { return self.current().version; } - let quorum = replication_mode.write_quorum(); + let quorum = replication_factor.write_quorum(ConsistencyMode::Consistent); let min_version = self.min_stored(); let global_min = self -- cgit v1.2.3 From c0eeb0b0f32ed0a27cfdf9297d0e71e1b9948b73 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 27 Mar 2024 10:44:03 +0100 Subject: [next-0.10] fixes to k2v rpc + comment fixes --- src/rpc/layout/history.rs | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'src/rpc/layout/history.rs') diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 290f058d..af2cbc63 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -27,14 +27,18 @@ impl LayoutHistory { // ------------------ who stores what now? --------------- + /// Returns the layout version with the highest number pub fn current(&self) -> &LayoutVersion { self.versions.last().as_ref().unwrap() } + /// Returns the version number of the oldest layout version still active pub fn min_stored(&self) -> u64 { self.versions.first().as_ref().unwrap().version } + /// Calculate the set of all nodes that have a role (gateway or storage) + /// in one of the currently active layout versions pub fn get_all_nodes(&self) -> Vec<Uuid> { if self.versions.len() == 1 { self.versions[0].all_nodes().to_vec() @@ -48,6 +52,8 @@ impl LayoutHistory { } } + /// Calculate the set of all nodes that are configured to store data + /// in one of the currently active layout versions pub(crate) fn get_all_nongateway_nodes(&self) -> Vec<Uuid> { if self.versions.len() == 1 { self.versions[0].nongateway_nodes().to_vec() -- cgit v1.2.3