diff options
Diffstat (limited to 'src/rpc/layout')
-rw-r--r-- | src/rpc/layout/helper.rs | 33 | ||||
-rw-r--r-- | src/rpc/layout/history.rs | 101 | ||||
-rw-r--r-- | src/rpc/layout/manager.rs | 18 | ||||
-rw-r--r-- | src/rpc/layout/schema.rs | 6 |
4 files changed, 141 insertions, 17 deletions
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 0aa7c6aa..eeaf4ffa 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; use super::schema::*; +use crate::replication_mode::ReplicationMode; use crate::rpc_helper::RpcHelper; #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] @@ -22,6 +23,7 @@ pub struct LayoutDigest { } pub struct LayoutHelper { + replication_mode: ReplicationMode, layout: Option<LayoutHistory>, // cached values @@ -48,7 +50,23 @@ impl Deref for LayoutHelper { } impl LayoutHelper { - pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>) -> Self { + pub fn new( + replication_mode: ReplicationMode, + mut layout: LayoutHistory, + mut ack_lock: HashMap<u64, AtomicUsize>, + ) -> Self { + // In the new() function of the helper, we do a bunch of cleanup + // and calculations on the layout history to make sure things are + // correct and we have rapid access to important values such as + // the layout versions to use when reading to ensure consistency. + + if !replication_mode.is_read_after_write_consistent() { + // Fast path for when no consistency is required. + // In this case we only need to keep the last version of the layout, + // we don't care about coordinating stuff in the cluster. + layout.keep_current_version_only(); + } + layout.cleanup_old_versions(); let all_nodes = layout.get_all_nodes(); @@ -68,7 +86,7 @@ impl LayoutHelper { .ack_map .min_among(&all_nodes, min_version); - // sync_map_min is the minimum value of sync_map among all storage nodes + // sync_map_min is the minimum value of sync_map among storage nodes // in the cluster (non-gateway nodes only, current and previous layouts). // It is the highest layout version for which we know that all relevant // storage nodes have fullfilled a sync, and therefore it is safe to @@ -76,11 +94,10 @@ impl LayoutHelper { // Gateway nodes are excluded here because they hold no relevant data // (they store the bucket and access key tables, but we don't have // consistency on those). - // TODO: this value could take quorums into account instead. - let sync_map_min = layout - .update_trackers - .sync_map - .min_among(&all_nongateway_nodes, min_version); + // This value is calculated using quorums to allow progress even + // if not all nodes have successfully completed a sync. + let sync_map_min = + layout.calculate_sync_map_min_with_quorum(replication_mode, &all_nongateway_nodes); let trackers_hash = layout.calculate_trackers_hash(); let staging_hash = layout.calculate_staging_hash(); @@ -91,6 +108,7 @@ impl LayoutHelper { .or_insert(AtomicUsize::new(0)); LayoutHelper { + replication_mode, layout: Some(layout), ack_map_min, sync_map_min, @@ -115,6 +133,7 @@ impl LayoutHelper { let changed = f(&mut self.layout.as_mut().unwrap()); if changed { *self = Self::new( + self.replication_mode, self.layout.take().unwrap(), std::mem::take(&mut self.ack_lock), ); diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index c448ac24..a53256cc 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -6,6 +6,7 @@ use garage_util::encode::nonversioned_encode; use garage_util::error::*; use super::*; +use crate::replication_mode::ReplicationMode; impl LayoutHistory { pub fn new(replication_factor: usize) -> Self { @@ -64,6 +65,13 @@ impl LayoutHistory { // ---- housekeeping (all invoked by LayoutHelper) ---- + pub(crate) fn keep_current_version_only(&mut self) { + while self.versions.len() > 1 { + let removed = self.versions.remove(0); + self.old_versions.push(removed); + } + } + pub(crate) fn cleanup_old_versions(&mut self) { // If there are invalid versions before valid versions, remove them if self.versions.len() > 1 && self.current().check().is_ok() { @@ -114,6 +122,99 @@ impl LayoutHistory { } } + pub(crate) fn calculate_sync_map_min_with_quorum( + &self, + replication_mode: ReplicationMode, + all_nongateway_nodes: &[Uuid], + ) -> u64 { + // This function calculates the minimum layout version from which + // it is safe to read if we want to maintain read-after-write consistency. + // In the general case the computation can be a bit expensive so + // we try to optimize it in several ways. + + // If there is only one layout version, we know that's the one + // we need to read from. + if self.versions.len() == 1 { + return self.current().version; + } + + let quorum = replication_mode.write_quorum(); + + let min_version = self.min_stored(); + let global_min = self + .update_trackers + .sync_map + .min_among(&all_nongateway_nodes, min_version); + + // If the write quorums are equal to the total number of nodes, + // i.e. no writes can succeed while they are not written to all nodes, + // then we must in all case wait for all nodes to complete a sync. + // This is represented by reading from the layout with version + // number global_min, the smallest layout version for which all nodes + // have completed a sync. + if quorum == self.current().replication_factor { + return global_min; + } + + // In the general case, we need to look at all write sets for all partitions, + // and find a safe layout version to read for that partition. We then + // take the minimum value among all partition as the safe layout version + // to read in all cases (the layout version to which all reads are directed). + let mut current_min = self.current().version; + let mut sets_done = HashSet::<Vec<Uuid>>::new(); + + for (_, p_hash) in self.current().partitions() { + for v in self.versions.iter() { + if v.version == self.current().version { + // We don't care about whether nodes in the latest layout version + // have completed a sync or not, as the sync is push-only + // and by definition nodes in the latest layout version do not + // hold data that must be pushed to nodes in the latest layout + // version, since that's the same version (any data that's + // already in the latest version is assumed to have been written + // by an operation that ensured a quorum of writes within + // that version). + continue; + } + + // Determine set of nodes for partition p in layout version v. + // Sort the node set to avoid duplicate computations. + let mut set = v + .nodes_of(&p_hash, v.replication_factor) + .collect::<Vec<Uuid>>(); + set.sort(); + + // If this set was already processed, skip it. + if sets_done.contains(&set) { + continue; + } + + // Find the value of the sync update trackers that is the + // highest possible minimum within a quorum of nodes. + let mut sync_values = set + .iter() + .map(|x| self.update_trackers.sync_map.get(x, min_version)) + .collect::<Vec<_>>(); + sync_values.sort(); + let set_min = sync_values[sync_values.len() - quorum]; + if set_min < current_min { + current_min = set_min; + } + // defavorable case, we know we are at the smallest possible version, + // so we can stop early + assert!(current_min >= global_min); + if current_min == global_min { + return current_min; + } + + // Add set to already processed sets + sets_done.insert(set); + } + } + + current_min + } + pub(crate) fn calculate_trackers_hash(&self) -> Hash { blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index dc963ba0..ec8a2a15 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -14,12 +14,13 @@ use garage_util::error::*; use garage_util::persister::Persister; use super::*; +use crate::replication_mode::ReplicationMode; use crate::rpc_helper::*; use crate::system::*; pub struct LayoutManager { node_id: Uuid, - replication_factor: usize, + replication_mode: ReplicationMode, persist_cluster_layout: Persister<LayoutHistory>, layout: Arc<RwLock<LayoutHelper>>, @@ -37,14 +38,16 @@ impl LayoutManager { node_id: NodeID, system_endpoint: Arc<Endpoint<SystemRpc, System>>, fullmesh: Arc<FullMeshPeeringStrategy>, - replication_factor: usize, + replication_mode: ReplicationMode, ) -> Result<Arc<Self>, Error> { + let replication_factor = replication_mode.replication_factor(); + let persist_cluster_layout: Persister<LayoutHistory> = Persister::new(&config.metadata_dir, "cluster_layout"); let cluster_layout = match persist_cluster_layout.load() { Ok(x) => { - if x.current().replication_factor != replication_factor { + if x.current().replication_factor != replication_mode.replication_factor() { return Err(Error::Message(format!( "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.", x.current().replication_factor, @@ -62,7 +65,8 @@ impl LayoutManager { } }; - let mut cluster_layout = LayoutHelper::new(cluster_layout, Default::default()); + let mut cluster_layout = + LayoutHelper::new(replication_mode, cluster_layout, Default::default()); cluster_layout.update_trackers(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); @@ -77,7 +81,7 @@ impl LayoutManager { Ok(Arc::new(Self { node_id: node_id.into(), - replication_factor, + replication_mode, persist_cluster_layout, layout, change_notify, @@ -291,11 +295,11 @@ impl LayoutManager { adv.update_trackers ); - if adv.current().replication_factor != self.replication_factor { + if adv.current().replication_factor != self.replication_mode.replication_factor() { let msg = format!( "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.", adv.current().replication_factor, - self.replication_factor + self.replication_mode.replication_factor() ); error!("{}", msg); return Err(Error::Message(msg)); diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 49e84420..df949906 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -411,13 +411,13 @@ impl UpdateTracker { pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 { storage_nodes .iter() - .map(|x| self.0.get(x).copied().unwrap_or(min_version)) + .map(|x| self.get(x, min_version)) .min() .unwrap_or(min_version) } - pub fn get(&self, node: &Uuid) -> u64 { - self.0.get(node).copied().unwrap_or(0) + pub fn get(&self, node: &Uuid, min_version: u64) -> u64 { + self.0.get(node).copied().unwrap_or(min_version) } } |