diff options
Diffstat (limited to 'src/rpc/layout/history.rs')
-rw-r--r-- | src/rpc/layout/history.rs | 46 |
1 files changed, 27 insertions, 19 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 2346b14a..1684918e 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -9,6 +9,7 @@ use garage_util::error::*; use super::schema::*; use super::*; + impl LayoutHistory { pub fn new(replication_factor: usize) -> Self { let version = LayoutVersion::new(replication_factor); @@ -49,7 +50,7 @@ impl LayoutHistory { // ------------------ who stores what now? --------------- pub fn all_ack(&self) -> u64 { - self.calculate_global_min(&self.update_trackers.ack_map) + self.update_trackers.ack_map.current_min } pub fn min_stored(&self) -> u64 { @@ -91,7 +92,7 @@ impl LayoutHistory { } pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> { - let sync_min = self.calculate_global_min(&self.update_trackers.sync_map); + let sync_min = self.update_trackers.sync_map.current_min; let version = self .versions .iter() @@ -122,7 +123,7 @@ impl LayoutHistory { // ------------------ update tracking --------------- - pub(crate) fn update_trackers(&mut self, node_id: Uuid) { + pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) { // Ensure trackers for this node's values are up-to-date // 1. Acknowledge the last layout version in the history @@ -138,6 +139,9 @@ impl LayoutHistory { // 4. Cleanup layout versions that are not needed anymore self.cleanup_old_versions(); + // 5. Recalculate global minima + self.update_trackers_min(); + info!("ack_map: {:?}", self.update_trackers.ack_map); info!("sync_map: {:?}", self.update_trackers.sync_map); info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); @@ -146,42 +150,41 @@ impl LayoutHistory { self.update_hashes(); } + fn update_trackers_min(&mut self) { + // TODO: for TableFullReplication, counting gateway nodes might be + // necessary? Think about this more. + let storage_nodes = self.all_nongateway_nodes().into_owned(); + let min_version = self.versions.first().unwrap().version; + self.update_trackers.update_min(&storage_nodes, min_version); + } + pub(crate) fn ack_last(&mut self, node: Uuid) { let last_version = self.current().version; self.update_trackers.ack_map.set_max(node, last_version); + self.update_trackers_min(); } pub(crate) fn sync_first(&mut self, node: Uuid) { let first_version = self.versions.first().as_ref().unwrap().version; self.update_trackers.sync_map.set_max(node, first_version); + self.update_trackers_min(); } pub(crate) fn sync_ack(&mut self, node: Uuid) { - self.update_trackers.sync_ack_map.set_max( - node, - self.calculate_global_min(&self.update_trackers.sync_map), - ); + self.update_trackers + .sync_ack_map + .set_max(node, self.update_trackers.sync_map.current_min); + self.update_trackers_min(); } pub(crate) fn cleanup_old_versions(&mut self) { - let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map); + let min_sync_ack = self.update_trackers.sync_ack_map.current_min; while self.versions.first().as_ref().unwrap().version < min_sync_ack { let removed = self.versions.remove(0); info!("Layout history: pruning old version {}", removed.version); } } - pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 { - // TODO: for TableFullReplication, counting gateway nodes might be - // necessary? Think about this more. - let storage_nodes = self.all_nongateway_nodes(); - storage_nodes - .iter() - .map(|x| tracker.0.get(x).copied().unwrap_or(0)) - .min() - .unwrap_or(0) - } - // ================== updates to layout, public interface =================== pub fn merge(&mut self, other: &LayoutHistory) -> bool { @@ -229,6 +232,11 @@ impl LayoutHistory { } } + // Update the current_min value in trackers if anything changed + if changed { + self.update_trackers_min(); + } + // Merge staged layout changes if self.staging != other.staging { self.staging.merge(&other.staging); |