aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout/history.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/layout/history.rs')
-rw-r--r--src/rpc/layout/history.rs101
1 files changed, 101 insertions, 0 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index c448ac24..a53256cc 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -6,6 +6,7 @@ use garage_util::encode::nonversioned_encode;
use garage_util::error::*;
use super::*;
+use crate::replication_mode::ReplicationMode;
impl LayoutHistory {
pub fn new(replication_factor: usize) -> Self {
@@ -64,6 +65,13 @@ impl LayoutHistory {
// ---- housekeeping (all invoked by LayoutHelper) ----
+ pub(crate) fn keep_current_version_only(&mut self) {
+ while self.versions.len() > 1 {
+ let removed = self.versions.remove(0);
+ self.old_versions.push(removed);
+ }
+ }
+
pub(crate) fn cleanup_old_versions(&mut self) {
// If there are invalid versions before valid versions, remove them
if self.versions.len() > 1 && self.current().check().is_ok() {
@@ -114,6 +122,99 @@ impl LayoutHistory {
}
}
+ pub(crate) fn calculate_sync_map_min_with_quorum(
+ &self,
+ replication_mode: ReplicationMode,
+ all_nongateway_nodes: &[Uuid],
+ ) -> u64 {
+ // This function calculates the minimum layout version from which
+ // it is safe to read if we want to maintain read-after-write consistency.
+ // In the general case the computation can be a bit expensive so
+ // we try to optimize it in several ways.
+
+ // If there is only one layout version, we know that's the one
+ // we need to read from.
+ if self.versions.len() == 1 {
+ return self.current().version;
+ }
+
+ let quorum = replication_mode.write_quorum();
+
+ let min_version = self.min_stored();
+ let global_min = self
+ .update_trackers
+ .sync_map
+ .min_among(&all_nongateway_nodes, min_version);
+
+ // If the write quorums are equal to the total number of nodes,
+ // i.e. no writes can succeed while they are not written to all nodes,
+ // then we must in all case wait for all nodes to complete a sync.
+ // This is represented by reading from the layout with version
+ // number global_min, the smallest layout version for which all nodes
+ // have completed a sync.
+ if quorum == self.current().replication_factor {
+ return global_min;
+ }
+
+ // In the general case, we need to look at all write sets for all partitions,
+ // and find a safe layout version to read for that partition. We then
+ // take the minimum value among all partition as the safe layout version
+ // to read in all cases (the layout version to which all reads are directed).
+ let mut current_min = self.current().version;
+ let mut sets_done = HashSet::<Vec<Uuid>>::new();
+
+ for (_, p_hash) in self.current().partitions() {
+ for v in self.versions.iter() {
+ if v.version == self.current().version {
+ // We don't care about whether nodes in the latest layout version
+ // have completed a sync or not, as the sync is push-only
+ // and by definition nodes in the latest layout version do not
+ // hold data that must be pushed to nodes in the latest layout
+ // version, since that's the same version (any data that's
+ // already in the latest version is assumed to have been written
+ // by an operation that ensured a quorum of writes within
+ // that version).
+ continue;
+ }
+
+ // Determine set of nodes for partition p in layout version v.
+ // Sort the node set to avoid duplicate computations.
+ let mut set = v
+ .nodes_of(&p_hash, v.replication_factor)
+ .collect::<Vec<Uuid>>();
+ set.sort();
+
+ // If this set was already processed, skip it.
+ if sets_done.contains(&set) {
+ continue;
+ }
+
+ // Find the value of the sync update trackers that is the
+ // highest possible minimum within a quorum of nodes.
+ let mut sync_values = set
+ .iter()
+ .map(|x| self.update_trackers.sync_map.get(x, min_version))
+ .collect::<Vec<_>>();
+ sync_values.sort();
+ let set_min = sync_values[sync_values.len() - quorum];
+ if set_min < current_min {
+ current_min = set_min;
+ }
+ // defavorable case, we know we are at the smallest possible version,
+ // so we can stop early
+ assert!(current_min >= global_min);
+ if current_min == global_min {
+ return current_min;
+ }
+
+ // Add set to already processed sets
+ sets_done.insert(set);
+ }
+ }
+
+ current_min
+ }
+
pub(crate) fn calculate_trackers_hash(&self) -> Hash {
blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..])
}