diff options
Diffstat (limited to 'src/rpc/layout/history.rs')
-rw-r--r-- | src/rpc/layout/history.rs | 74 |
1 files changed, 69 insertions, 5 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 357b9d62..347f03db 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; use garage_util::encode::nonversioned_encode; @@ -30,6 +32,14 @@ impl LayoutHistory { self.versions.last().as_ref().unwrap() } + pub fn all_storage_nodes(&self) -> HashSet<Uuid> { + self.versions + .iter() + .map(|x| x.nongateway_nodes()) + .flatten() + .collect::<HashSet<_>>() + } + pub(crate) fn update_hashes(&mut self) { self.trackers_hash = self.calculate_trackers_hash(); self.staging_hash = self.calculate_staging_hash(); @@ -43,6 +53,65 @@ impl LayoutHistory { blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } + // ------------------ update tracking --------------- + + pub(crate) fn update_trackers(&mut self, node_id: Uuid) { + // Ensure trackers for this node's values are up-to-date + + // 1. Acknowledge the last layout version in the history + self.ack_last(node_id); + + // 2. Assume the data on this node is sync'ed up at least to + // the first layout version in the history + self.sync_first(node_id); + + // 3. Acknowledge everyone has synced up to min(self.sync_map) + self.sync_ack(node_id); + + // 4. Cleanup layout versions that are not needed anymore + self.cleanup_old_versions(); + + info!("ack_map: {:?}", self.update_trackers.ack_map); + info!("sync_map: {:?}", self.update_trackers.sync_map); + info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + + // Finally, update hashes + self.update_hashes(); + } + + pub(crate) fn ack_last(&mut self, node: Uuid) { + let last_version = self.current().version; + self.update_trackers.ack_map.set_max(node, last_version); + } + + pub(crate) fn sync_first(&mut self, node: Uuid) { + let first_version = self.versions.first().as_ref().unwrap().version; + self.update_trackers.sync_map.set_max(node, first_version); + } + + pub(crate) fn sync_ack(&mut self, node: Uuid) { + self.update_trackers.sync_ack_map.set_max( + node, + self.calculate_global_min(&self.update_trackers.sync_map), + ); + } + + pub(crate) fn cleanup_old_versions(&mut self) { + let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map); + while self.versions.first().as_ref().unwrap().version < min_sync_ack { + self.versions.remove(0); + } + } + + pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 { + let storage_nodes = self.all_storage_nodes(); + storage_nodes + .iter() + .map(|x| tracker.0.get(x).copied().unwrap_or(0)) + .min() + .unwrap_or(0) + } + // ================== updates to layout, public interface =================== pub fn merge(&mut self, other: &LayoutHistory) -> bool { @@ -78,11 +147,6 @@ impl LayoutHistory { changed = true; } - // Update hashes if there are changes - if changed { - self.update_hashes(); - } - changed } |