aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout/history.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/layout/history.rs')
-rw-r--r--src/rpc/layout/history.rs74
1 files changed, 69 insertions, 5 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index 357b9d62..347f03db 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -1,3 +1,5 @@
+use std::collections::HashSet;
+
use garage_util::crdt::{Crdt, Lww, LwwMap};
use garage_util::data::*;
use garage_util::encode::nonversioned_encode;
@@ -30,6 +32,14 @@ impl LayoutHistory {
self.versions.last().as_ref().unwrap()
}
+ pub fn all_storage_nodes(&self) -> HashSet<Uuid> {
+ self.versions
+ .iter()
+ .map(|x| x.nongateway_nodes())
+ .flatten()
+ .collect::<HashSet<_>>()
+ }
+
pub(crate) fn update_hashes(&mut self) {
self.trackers_hash = self.calculate_trackers_hash();
self.staging_hash = self.calculate_staging_hash();
@@ -43,6 +53,65 @@ impl LayoutHistory {
blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
}
+ // ------------------ update tracking ---------------
+
+ pub(crate) fn update_trackers(&mut self, node_id: Uuid) {
+ // Ensure trackers for this node's values are up-to-date
+
+ // 1. Acknowledge the last layout version in the history
+ self.ack_last(node_id);
+
+ // 2. Assume the data on this node is sync'ed up at least to
+ // the first layout version in the history
+ self.sync_first(node_id);
+
+ // 3. Acknowledge everyone has synced up to min(self.sync_map)
+ self.sync_ack(node_id);
+
+ // 4. Cleanup layout versions that are not needed anymore
+ self.cleanup_old_versions();
+
+ info!("ack_map: {:?}", self.update_trackers.ack_map);
+ info!("sync_map: {:?}", self.update_trackers.sync_map);
+ info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
+
+ // Finally, update hashes
+ self.update_hashes();
+ }
+
+ pub(crate) fn ack_last(&mut self, node: Uuid) {
+ let last_version = self.current().version;
+ self.update_trackers.ack_map.set_max(node, last_version);
+ }
+
+ pub(crate) fn sync_first(&mut self, node: Uuid) {
+ let first_version = self.versions.first().as_ref().unwrap().version;
+ self.update_trackers.sync_map.set_max(node, first_version);
+ }
+
+ pub(crate) fn sync_ack(&mut self, node: Uuid) {
+ self.update_trackers.sync_ack_map.set_max(
+ node,
+ self.calculate_global_min(&self.update_trackers.sync_map),
+ );
+ }
+
+ pub(crate) fn cleanup_old_versions(&mut self) {
+ let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map);
+ while self.versions.first().as_ref().unwrap().version < min_sync_ack {
+ self.versions.remove(0);
+ }
+ }
+
+ pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 {
+ let storage_nodes = self.all_storage_nodes();
+ storage_nodes
+ .iter()
+ .map(|x| tracker.0.get(x).copied().unwrap_or(0))
+ .min()
+ .unwrap_or(0)
+ }
+
// ================== updates to layout, public interface ===================
pub fn merge(&mut self, other: &LayoutHistory) -> bool {
@@ -78,11 +147,6 @@ impl LayoutHistory {
changed = true;
}
- // Update hashes if there are changes
- if changed {
- self.update_hashes();
- }
-
changed
}