diff options
Diffstat (limited to 'src/rpc/layout/history.rs')
-rw-r--r-- | src/rpc/layout/history.rs | 311 |
1 files changed, 188 insertions, 123 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 1684918e..b6f0e495 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,5 @@ -use std::borrow::Cow; use std::collections::HashSet; +use std::ops::Deref; use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; @@ -9,95 +9,106 @@ use garage_util::error::*; use super::schema::*; use super::*; +pub struct LayoutHelper { + layout: Option<LayoutHistory>, -impl LayoutHistory { - pub fn new(replication_factor: usize) -> Self { - let version = LayoutVersion::new(replication_factor); + // cached values + ack_map_min: u64, + sync_map_min: u64, - let staging = LayoutStaging { - parameters: Lww::<LayoutParameters>::new(version.parameters), - roles: LwwMap::new(), - }; + all_nodes: Vec<Uuid>, + all_nongateway_nodes: Vec<Uuid>, - let mut ret = LayoutHistory { - versions: vec![version], - update_trackers: Default::default(), - trackers_hash: [0u8; 32].into(), - staging: Lww::raw(0, staging), - staging_hash: [0u8; 32].into(), - }; - ret.update_hashes(); - ret - } + trackers_hash: Hash, + staging_hash: Hash, +} - pub fn current(&self) -> &LayoutVersion { - self.versions.last().as_ref().unwrap() +impl Deref for LayoutHelper { + type Target = LayoutHistory; + fn deref(&self) -> &LayoutHistory { + self.layout() } +} - pub fn update_hashes(&mut self) { - self.trackers_hash = self.calculate_trackers_hash(); - self.staging_hash = self.calculate_staging_hash(); +impl LayoutHelper { + pub fn new(mut layout: LayoutHistory) -> Self { + layout.cleanup_old_versions(); + + let all_nongateway_nodes = layout.get_all_nongateway_nodes(); + layout.clamp_update_trackers(&all_nongateway_nodes); + + let min_version = layout.min_stored(); + let ack_map_min = layout + .update_trackers + .ack_map + .min(&all_nongateway_nodes, min_version); + let sync_map_min = layout + .update_trackers + .sync_map + .min(&all_nongateway_nodes, min_version); + + let all_nodes = layout.get_all_nodes(); + let trackers_hash = layout.calculate_trackers_hash(); + let staging_hash = layout.calculate_staging_hash(); + + LayoutHelper { + layout: Some(layout), + ack_map_min, + sync_map_min, + all_nodes, + all_nongateway_nodes, + trackers_hash, + staging_hash, + } } - pub(crate) fn calculate_trackers_hash(&self) -> Hash { - blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) - } + // ------------------ single updating function -------------- - pub(crate) fn calculate_staging_hash(&self) -> Hash { - blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) + fn layout(&self) -> &LayoutHistory { + self.layout.as_ref().unwrap() } - // ------------------ who stores what now? --------------- - - pub fn all_ack(&self) -> u64 { - self.update_trackers.ack_map.current_min + pub(crate) fn update<F>(&mut self, f: F) -> bool + where + F: FnOnce(&mut LayoutHistory) -> bool, + { + let changed = f(&mut self.layout.as_mut().unwrap()); + if changed { + *self = Self::new(self.layout.take().unwrap()); + } + changed } - pub fn min_stored(&self) -> u64 { - self.versions.first().as_ref().unwrap().version + // ------------------ read helpers --------------- + + pub fn all_nodes(&self) -> &[Uuid] { + &self.all_nodes } - pub fn sync_versions(&self) -> (u64, u64, u64) { - (self.current().version, self.all_ack(), self.min_stored()) + pub fn all_nongateway_nodes(&self) -> &[Uuid] { + &self.all_nongateway_nodes } - pub fn all_nodes(&self) -> Cow<'_, [Uuid]> { - // TODO: cache this - if self.versions.len() == 1 { - self.versions[0].all_nodes().into() - } else { - let set = self - .versions - .iter() - .map(|x| x.all_nodes()) - .flatten() - .collect::<HashSet<_>>(); - set.into_iter().copied().collect::<Vec<_>>().into() - } + pub fn all_ack(&self) -> u64 { + self.ack_map_min } - pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> { - // TODO: cache this - if self.versions.len() == 1 { - self.versions[0].nongateway_nodes().into() - } else { - let set = self - .versions - .iter() - .map(|x| x.nongateway_nodes()) - .flatten() - .collect::<HashSet<_>>(); - set.into_iter().copied().collect::<Vec<_>>().into() - } + pub fn sync_versions(&self) -> (u64, u64, u64) { + ( + self.layout().current().version, + self.all_ack(), + self.layout().min_stored(), + ) } pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> { - let sync_min = self.update_trackers.sync_map.current_min; + let sync_min = self.sync_map_min; let version = self + .layout() .versions .iter() .find(|x| x.version == sync_min) - .or(self.versions.last()) + .or(self.layout().versions.last()) .unwrap(); version .nodes_of(position, version.replication_factor) @@ -105,7 +116,8 @@ impl LayoutHistory { } pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> { - self.versions + self.layout() + .versions .iter() .map(|x| x.nodes_of(position, x.replication_factor).collect()) .collect() @@ -113,7 +125,7 @@ impl LayoutHistory { pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> { let mut ret = vec![]; - for version in self.versions.iter() { + for version in self.layout().versions.iter() { ret.extend(version.nodes_of(position, version.replication_factor)); } ret.sort(); @@ -121,7 +133,35 @@ impl LayoutHistory { ret } - // ------------------ update tracking --------------- + pub fn trackers_hash(&self) -> Hash { + self.trackers_hash + } + + pub fn staging_hash(&self) -> Hash { + self.staging_hash + } + + // ------------------ helpers for update tracking --------------- + + pub(crate) fn sync_first(&mut self, node: Uuid) { + let first_version = self.versions.first().as_ref().unwrap().version; + self.update(|layout| layout.update_trackers.sync_map.set_max(node, first_version)); + } + + pub(crate) fn sync_ack(&mut self, node: Uuid) { + let sync_map_min = self.sync_map_min; + self.update(|layout| { + layout + .update_trackers + .sync_ack_map + .set_max(node, sync_map_min) + }); + } + + pub(crate) fn ack_last(&mut self, node: Uuid) { + let last_version = self.current().version; + self.update(|layout| layout.update_trackers.ack_map.set_max(node, last_version)); + } pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) { // Ensure trackers for this node's values are up-to-date @@ -136,55 +176,104 @@ impl LayoutHistory { // 3. Acknowledge everyone has synced up to min(self.sync_map) self.sync_ack(node_id); - // 4. Cleanup layout versions that are not needed anymore - self.cleanup_old_versions(); - - // 5. Recalculate global minima - self.update_trackers_min(); - info!("ack_map: {:?}", self.update_trackers.ack_map); info!("sync_map: {:?}", self.update_trackers.sync_map); info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + } +} - // Finally, update hashes - self.update_hashes(); +// ---- + +impl LayoutHistory { + pub fn new(replication_factor: usize) -> Self { + let version = LayoutVersion::new(replication_factor); + + let staging = LayoutStaging { + parameters: Lww::<LayoutParameters>::new(version.parameters), + roles: LwwMap::new(), + }; + + LayoutHistory { + versions: vec![version], + update_trackers: Default::default(), + staging: Lww::raw(0, staging), + } } - fn update_trackers_min(&mut self) { - // TODO: for TableFullReplication, counting gateway nodes might be - // necessary? Think about this more. - let storage_nodes = self.all_nongateway_nodes().into_owned(); - let min_version = self.versions.first().unwrap().version; - self.update_trackers.update_min(&storage_nodes, min_version); + // ------------------ who stores what now? --------------- + + pub fn current(&self) -> &LayoutVersion { + self.versions.last().as_ref().unwrap() } - pub(crate) fn ack_last(&mut self, node: Uuid) { - let last_version = self.current().version; - self.update_trackers.ack_map.set_max(node, last_version); - self.update_trackers_min(); + pub fn min_stored(&self) -> u64 { + self.versions.first().as_ref().unwrap().version } - pub(crate) fn sync_first(&mut self, node: Uuid) { - let first_version = self.versions.first().as_ref().unwrap().version; - self.update_trackers.sync_map.set_max(node, first_version); - self.update_trackers_min(); + pub fn get_all_nodes(&self) -> Vec<Uuid> { + if self.versions.len() == 1 { + self.versions[0].all_nodes().to_vec() + } else { + let set = self + .versions + .iter() + .map(|x| x.all_nodes()) + .flatten() + .collect::<HashSet<_>>(); + set.into_iter().copied().collect::<Vec<_>>() + } } - pub(crate) fn sync_ack(&mut self, node: Uuid) { - self.update_trackers - .sync_ack_map - .set_max(node, self.update_trackers.sync_map.current_min); - self.update_trackers_min(); + fn get_all_nongateway_nodes(&self) -> Vec<Uuid> { + if self.versions.len() == 1 { + self.versions[0].nongateway_nodes().to_vec() + } else { + let set = self + .versions + .iter() + .map(|x| x.nongateway_nodes()) + .flatten() + .collect::<HashSet<_>>(); + set.into_iter().copied().collect::<Vec<_>>() + } } - pub(crate) fn cleanup_old_versions(&mut self) { - let min_sync_ack = self.update_trackers.sync_ack_map.current_min; - while self.versions.first().as_ref().unwrap().version < min_sync_ack { - let removed = self.versions.remove(0); - info!("Layout history: pruning old version {}", removed.version); + // ---- housekeeping (all invoked by LayoutHelper) ---- + + fn cleanup_old_versions(&mut self) { + loop { + let all_nongateway_nodes = self.get_all_nongateway_nodes(); + let min_version = self.min_stored(); + let sync_ack_map_min = self + .update_trackers + .sync_ack_map + .min(&all_nongateway_nodes, min_version); + if self.min_stored() < sync_ack_map_min { + let removed = self.versions.remove(0); + info!("Layout history: pruning old version {}", removed.version); + } else { + break; + } } } + fn clamp_update_trackers(&mut self, nodes: &[Uuid]) { + let min_v = self.min_stored(); + for node in nodes { + self.update_trackers.ack_map.set_max(*node, min_v); + self.update_trackers.sync_map.set_max(*node, min_v); + self.update_trackers.sync_ack_map.set_max(*node, min_v); + } + } + + fn calculate_trackers_hash(&self) -> Hash { + blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) + } + + fn calculate_staging_hash(&self) -> Hash { + blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) + } + // ================== updates to layout, public interface =================== pub fn merge(&mut self, other: &LayoutHistory) -> bool { @@ -221,20 +310,6 @@ impl LayoutHistory { self.versions.remove(0); changed = true; } - if changed { - let min_v = self.versions.first().unwrap().version; - let nodes = self.all_nongateway_nodes().into_owned(); - for node in nodes { - self.update_trackers.ack_map.set_max(node, min_v); - self.update_trackers.sync_map.set_max(node, min_v); - self.update_trackers.sync_ack_map.set_max(node, min_v); - } - } - } - - // Update the current_min value in trackers if anything changed - if changed { - self.update_trackers_min(); } // Merge staged layout changes @@ -280,7 +355,6 @@ To know the correct value of the new layout version, invoke `garage layout show` parameters: self.staging.get().parameters.clone(), roles: LwwMap::new(), }); - self.update_hashes(); Ok((self, msg)) } @@ -290,20 +364,11 @@ To know the correct value of the new layout version, invoke `garage layout show` parameters: Lww::new(self.current().parameters.clone()), roles: LwwMap::new(), }); - self.update_hashes(); Ok(self) } pub fn check(&self) -> Result<(), String> { - // Check that the hash of the staging data is correct - if self.trackers_hash != self.calculate_trackers_hash() { - return Err("trackers_hash is incorrect".into()); - } - if self.staging_hash != self.calculate_staging_hash() { - return Err("staging_hash is incorrect".into()); - } - for version in self.versions.iter() { version.check()?; } |