aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/layout/helper.rs33
-rw-r--r--src/rpc/layout/history.rs101
-rw-r--r--src/rpc/layout/manager.rs18
-rw-r--r--src/rpc/layout/schema.rs6
-rw-r--r--src/rpc/replication_mode.rs7
-rw-r--r--src/rpc/system.rs2
6 files changed, 149 insertions, 18 deletions
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
index 0aa7c6aa..eeaf4ffa 100644
--- a/src/rpc/layout/helper.rs
+++ b/src/rpc/layout/helper.rs
@@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
use super::schema::*;
+use crate::replication_mode::ReplicationMode;
use crate::rpc_helper::RpcHelper;
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
@@ -22,6 +23,7 @@ pub struct LayoutDigest {
}
pub struct LayoutHelper {
+ replication_mode: ReplicationMode,
layout: Option<LayoutHistory>,
// cached values
@@ -48,7 +50,23 @@ impl Deref for LayoutHelper {
}
impl LayoutHelper {
- pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>) -> Self {
+ pub fn new(
+ replication_mode: ReplicationMode,
+ mut layout: LayoutHistory,
+ mut ack_lock: HashMap<u64, AtomicUsize>,
+ ) -> Self {
+ // In the new() function of the helper, we do a bunch of cleanup
+ // and calculations on the layout history to make sure things are
+ // correct and we have rapid access to important values such as
+ // the layout versions to use when reading to ensure consistency.
+
+ if !replication_mode.is_read_after_write_consistent() {
+ // Fast path for when no consistency is required.
+ // In this case we only need to keep the last version of the layout,
+ // we don't care about coordinating stuff in the cluster.
+ layout.keep_current_version_only();
+ }
+
layout.cleanup_old_versions();
let all_nodes = layout.get_all_nodes();
@@ -68,7 +86,7 @@ impl LayoutHelper {
.ack_map
.min_among(&all_nodes, min_version);
- // sync_map_min is the minimum value of sync_map among all storage nodes
+ // sync_map_min is the minimum value of sync_map among storage nodes
// in the cluster (non-gateway nodes only, current and previous layouts).
// It is the highest layout version for which we know that all relevant
// storage nodes have fullfilled a sync, and therefore it is safe to
@@ -76,11 +94,10 @@ impl LayoutHelper {
// Gateway nodes are excluded here because they hold no relevant data
// (they store the bucket and access key tables, but we don't have
// consistency on those).
- // TODO: this value could take quorums into account instead.
- let sync_map_min = layout
- .update_trackers
- .sync_map
- .min_among(&all_nongateway_nodes, min_version);
+ // This value is calculated using quorums to allow progress even
+ // if not all nodes have successfully completed a sync.
+ let sync_map_min =
+ layout.calculate_sync_map_min_with_quorum(replication_mode, &all_nongateway_nodes);
let trackers_hash = layout.calculate_trackers_hash();
let staging_hash = layout.calculate_staging_hash();
@@ -91,6 +108,7 @@ impl LayoutHelper {
.or_insert(AtomicUsize::new(0));
LayoutHelper {
+ replication_mode,
layout: Some(layout),
ack_map_min,
sync_map_min,
@@ -115,6 +133,7 @@ impl LayoutHelper {
let changed = f(&mut self.layout.as_mut().unwrap());
if changed {
*self = Self::new(
+ self.replication_mode,
self.layout.take().unwrap(),
std::mem::take(&mut self.ack_lock),
);
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index c448ac24..a53256cc 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -6,6 +6,7 @@ use garage_util::encode::nonversioned_encode;
use garage_util::error::*;
use super::*;
+use crate::replication_mode::ReplicationMode;
impl LayoutHistory {
pub fn new(replication_factor: usize) -> Self {
@@ -64,6 +65,13 @@ impl LayoutHistory {
// ---- housekeeping (all invoked by LayoutHelper) ----
+ pub(crate) fn keep_current_version_only(&mut self) {
+ while self.versions.len() > 1 {
+ let removed = self.versions.remove(0);
+ self.old_versions.push(removed);
+ }
+ }
+
pub(crate) fn cleanup_old_versions(&mut self) {
// If there are invalid versions before valid versions, remove them
if self.versions.len() > 1 && self.current().check().is_ok() {
@@ -114,6 +122,99 @@ impl LayoutHistory {
}
}
+ pub(crate) fn calculate_sync_map_min_with_quorum(
+ &self,
+ replication_mode: ReplicationMode,
+ all_nongateway_nodes: &[Uuid],
+ ) -> u64 {
+ // This function calculates the minimum layout version from which
+ // it is safe to read if we want to maintain read-after-write consistency.
+ // In the general case the computation can be a bit expensive so
+ // we try to optimize it in several ways.
+
+ // If there is only one layout version, we know that's the one
+ // we need to read from.
+ if self.versions.len() == 1 {
+ return self.current().version;
+ }
+
+ let quorum = replication_mode.write_quorum();
+
+ let min_version = self.min_stored();
+ let global_min = self
+ .update_trackers
+ .sync_map
+ .min_among(&all_nongateway_nodes, min_version);
+
+ // If the write quorums are equal to the total number of nodes,
+ // i.e. no writes can succeed while they are not written to all nodes,
+ // then we must in all case wait for all nodes to complete a sync.
+ // This is represented by reading from the layout with version
+ // number global_min, the smallest layout version for which all nodes
+ // have completed a sync.
+ if quorum == self.current().replication_factor {
+ return global_min;
+ }
+
+ // In the general case, we need to look at all write sets for all partitions,
+ // and find a safe layout version to read for that partition. We then
+ // take the minimum value among all partition as the safe layout version
+ // to read in all cases (the layout version to which all reads are directed).
+ let mut current_min = self.current().version;
+ let mut sets_done = HashSet::<Vec<Uuid>>::new();
+
+ for (_, p_hash) in self.current().partitions() {
+ for v in self.versions.iter() {
+ if v.version == self.current().version {
+ // We don't care about whether nodes in the latest layout version
+ // have completed a sync or not, as the sync is push-only
+ // and by definition nodes in the latest layout version do not
+ // hold data that must be pushed to nodes in the latest layout
+ // version, since that's the same version (any data that's
+ // already in the latest version is assumed to have been written
+ // by an operation that ensured a quorum of writes within
+ // that version).
+ continue;
+ }
+
+ // Determine set of nodes for partition p in layout version v.
+ // Sort the node set to avoid duplicate computations.
+ let mut set = v
+ .nodes_of(&p_hash, v.replication_factor)
+ .collect::<Vec<Uuid>>();
+ set.sort();
+
+ // If this set was already processed, skip it.
+ if sets_done.contains(&set) {
+ continue;
+ }
+
+ // Find the value of the sync update trackers that is the
+ // highest possible minimum within a quorum of nodes.
+ let mut sync_values = set
+ .iter()
+ .map(|x| self.update_trackers.sync_map.get(x, min_version))
+ .collect::<Vec<_>>();
+ sync_values.sort();
+ let set_min = sync_values[sync_values.len() - quorum];
+ if set_min < current_min {
+ current_min = set_min;
+ }
+ // defavorable case, we know we are at the smallest possible version,
+ // so we can stop early
+ assert!(current_min >= global_min);
+ if current_min == global_min {
+ return current_min;
+ }
+
+ // Add set to already processed sets
+ sets_done.insert(set);
+ }
+ }
+
+ current_min
+ }
+
pub(crate) fn calculate_trackers_hash(&self) -> Hash {
blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..])
}
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index dc963ba0..ec8a2a15 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -14,12 +14,13 @@ use garage_util::error::*;
use garage_util::persister::Persister;
use super::*;
+use crate::replication_mode::ReplicationMode;
use crate::rpc_helper::*;
use crate::system::*;
pub struct LayoutManager {
node_id: Uuid,
- replication_factor: usize,
+ replication_mode: ReplicationMode,
persist_cluster_layout: Persister<LayoutHistory>,
layout: Arc<RwLock<LayoutHelper>>,
@@ -37,14 +38,16 @@ impl LayoutManager {
node_id: NodeID,
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
fullmesh: Arc<FullMeshPeeringStrategy>,
- replication_factor: usize,
+ replication_mode: ReplicationMode,
) -> Result<Arc<Self>, Error> {
+ let replication_factor = replication_mode.replication_factor();
+
let persist_cluster_layout: Persister<LayoutHistory> =
Persister::new(&config.metadata_dir, "cluster_layout");
let cluster_layout = match persist_cluster_layout.load() {
Ok(x) => {
- if x.current().replication_factor != replication_factor {
+ if x.current().replication_factor != replication_mode.replication_factor() {
return Err(Error::Message(format!(
"Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
x.current().replication_factor,
@@ -62,7 +65,8 @@ impl LayoutManager {
}
};
- let mut cluster_layout = LayoutHelper::new(cluster_layout, Default::default());
+ let mut cluster_layout =
+ LayoutHelper::new(replication_mode, cluster_layout, Default::default());
cluster_layout.update_trackers(node_id.into());
let layout = Arc::new(RwLock::new(cluster_layout));
@@ -77,7 +81,7 @@ impl LayoutManager {
Ok(Arc::new(Self {
node_id: node_id.into(),
- replication_factor,
+ replication_mode,
persist_cluster_layout,
layout,
change_notify,
@@ -291,11 +295,11 @@ impl LayoutManager {
adv.update_trackers
);
- if adv.current().replication_factor != self.replication_factor {
+ if adv.current().replication_factor != self.replication_mode.replication_factor() {
let msg = format!(
"Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
adv.current().replication_factor,
- self.replication_factor
+ self.replication_mode.replication_factor()
);
error!("{}", msg);
return Err(Error::Message(msg));
diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs
index 49e84420..df949906 100644
--- a/src/rpc/layout/schema.rs
+++ b/src/rpc/layout/schema.rs
@@ -411,13 +411,13 @@ impl UpdateTracker {
pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 {
storage_nodes
.iter()
- .map(|x| self.0.get(x).copied().unwrap_or(min_version))
+ .map(|x| self.get(x, min_version))
.min()
.unwrap_or(min_version)
}
- pub fn get(&self, node: &Uuid) -> u64 {
- self.0.get(node).copied().unwrap_or(0)
+ pub fn get(&self, node: &Uuid, min_version: u64) -> u64 {
+ self.0.get(node).copied().unwrap_or(min_version)
}
}
diff --git a/src/rpc/replication_mode.rs b/src/rpc/replication_mode.rs
index e244e063..2f7e2fec 100644
--- a/src/rpc/replication_mode.rs
+++ b/src/rpc/replication_mode.rs
@@ -54,4 +54,11 @@ impl ReplicationMode {
Self::ThreeWayDangerous => 1,
}
}
+
+ pub fn is_read_after_write_consistent(&self) -> bool {
+ match self {
+ Self::None | Self::TwoWay | Self::ThreeWay => true,
+ _ => false,
+ }
+ }
}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index be4aefa2..81a47ff3 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -280,7 +280,7 @@ impl System {
netapp.id,
system_endpoint.clone(),
fullmesh.clone(),
- replication_factor,
+ replication_mode,
)?;
// ---- set up metrics and status exchange ----