aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout/history.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/layout/history.rs')
-rw-r--r--src/rpc/layout/history.rs46
1 files changed, 27 insertions, 19 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index 2346b14a..1684918e 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -9,6 +9,7 @@ use garage_util::error::*;
use super::schema::*;
use super::*;
+
impl LayoutHistory {
pub fn new(replication_factor: usize) -> Self {
let version = LayoutVersion::new(replication_factor);
@@ -49,7 +50,7 @@ impl LayoutHistory {
// ------------------ who stores what now? ---------------
pub fn all_ack(&self) -> u64 {
- self.calculate_global_min(&self.update_trackers.ack_map)
+ self.update_trackers.ack_map.current_min
}
pub fn min_stored(&self) -> u64 {
@@ -91,7 +92,7 @@ impl LayoutHistory {
}
pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
- let sync_min = self.calculate_global_min(&self.update_trackers.sync_map);
+ let sync_min = self.update_trackers.sync_map.current_min;
let version = self
.versions
.iter()
@@ -122,7 +123,7 @@ impl LayoutHistory {
// ------------------ update tracking ---------------
- pub(crate) fn update_trackers(&mut self, node_id: Uuid) {
+ pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) {
// Ensure trackers for this node's values are up-to-date
// 1. Acknowledge the last layout version in the history
@@ -138,6 +139,9 @@ impl LayoutHistory {
// 4. Cleanup layout versions that are not needed anymore
self.cleanup_old_versions();
+ // 5. Recalculate global minima
+ self.update_trackers_min();
+
info!("ack_map: {:?}", self.update_trackers.ack_map);
info!("sync_map: {:?}", self.update_trackers.sync_map);
info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
@@ -146,42 +150,41 @@ impl LayoutHistory {
self.update_hashes();
}
+ fn update_trackers_min(&mut self) {
+ // TODO: for TableFullReplication, counting gateway nodes might be
+ // necessary? Think about this more.
+ let storage_nodes = self.all_nongateway_nodes().into_owned();
+ let min_version = self.versions.first().unwrap().version;
+ self.update_trackers.update_min(&storage_nodes, min_version);
+ }
+
pub(crate) fn ack_last(&mut self, node: Uuid) {
let last_version = self.current().version;
self.update_trackers.ack_map.set_max(node, last_version);
+ self.update_trackers_min();
}
pub(crate) fn sync_first(&mut self, node: Uuid) {
let first_version = self.versions.first().as_ref().unwrap().version;
self.update_trackers.sync_map.set_max(node, first_version);
+ self.update_trackers_min();
}
pub(crate) fn sync_ack(&mut self, node: Uuid) {
- self.update_trackers.sync_ack_map.set_max(
- node,
- self.calculate_global_min(&self.update_trackers.sync_map),
- );
+ self.update_trackers
+ .sync_ack_map
+ .set_max(node, self.update_trackers.sync_map.current_min);
+ self.update_trackers_min();
}
pub(crate) fn cleanup_old_versions(&mut self) {
- let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map);
+ let min_sync_ack = self.update_trackers.sync_ack_map.current_min;
while self.versions.first().as_ref().unwrap().version < min_sync_ack {
let removed = self.versions.remove(0);
info!("Layout history: pruning old version {}", removed.version);
}
}
- pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 {
- // TODO: for TableFullReplication, counting gateway nodes might be
- // necessary? Think about this more.
- let storage_nodes = self.all_nongateway_nodes();
- storage_nodes
- .iter()
- .map(|x| tracker.0.get(x).copied().unwrap_or(0))
- .min()
- .unwrap_or(0)
- }
-
// ================== updates to layout, public interface ===================
pub fn merge(&mut self, other: &LayoutHistory) -> bool {
@@ -229,6 +232,11 @@ impl LayoutHistory {
}
}
+ // Update the current_min value in trackers if anything changed
+ if changed {
+ self.update_trackers_min();
+ }
+
// Merge staged layout changes
if self.staging != other.staging {
self.staging.merge(&other.staging);