aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/layout')
-rw-r--r--src/rpc/layout/helper.rs18
-rw-r--r--src/rpc/layout/history.rs10
-rw-r--r--src/rpc/layout/manager.rs27
-rw-r--r--src/rpc/layout/test.rs3
4 files changed, 33 insertions, 25 deletions
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
index 9fb738ea..2835347a 100644
--- a/src/rpc/layout/helper.rs
+++ b/src/rpc/layout/helper.rs
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
use super::*;
-use crate::replication_mode::ReplicationMode;
+use crate::replication_mode::*;
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct RpcLayoutDigest {
@@ -29,7 +29,8 @@ pub struct SyncLayoutDigest {
}
pub struct LayoutHelper {
- replication_mode: ReplicationMode,
+ replication_factor: ReplicationFactor,
+ consistency_mode: ConsistencyMode,
layout: Option<LayoutHistory>,
// cached values
@@ -57,7 +58,8 @@ impl Deref for LayoutHelper {
impl LayoutHelper {
pub fn new(
- replication_mode: ReplicationMode,
+ replication_factor: ReplicationFactor,
+ consistency_mode: ConsistencyMode,
mut layout: LayoutHistory,
mut ack_lock: HashMap<u64, AtomicUsize>,
) -> Self {
@@ -66,7 +68,7 @@ impl LayoutHelper {
// correct and we have rapid access to important values such as
// the layout versions to use when reading to ensure consistency.
- if !replication_mode.is_read_after_write_consistent() {
+ if consistency_mode != ConsistencyMode::Consistent {
// Fast path for when no consistency is required.
// In this case we only need to keep the last version of the layout,
// we don't care about coordinating stuff in the cluster.
@@ -103,7 +105,7 @@ impl LayoutHelper {
// This value is calculated using quorums to allow progress even
// if not all nodes have successfully completed a sync.
let sync_map_min =
- layout.calculate_sync_map_min_with_quorum(replication_mode, &all_nongateway_nodes);
+ layout.calculate_sync_map_min_with_quorum(replication_factor, &all_nongateway_nodes);
let trackers_hash = layout.calculate_trackers_hash();
let staging_hash = layout.calculate_staging_hash();
@@ -114,7 +116,8 @@ impl LayoutHelper {
.or_insert(AtomicUsize::new(0));
LayoutHelper {
- replication_mode,
+ replication_factor,
+ consistency_mode,
layout: Some(layout),
ack_map_min,
sync_map_min,
@@ -139,7 +142,8 @@ impl LayoutHelper {
let changed = f(self.layout.as_mut().unwrap());
if changed {
*self = Self::new(
- self.replication_mode,
+ self.replication_factor,
+ self.consistency_mode,
self.layout.take().unwrap(),
std::mem::take(&mut self.ack_lock),
);
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index b8cc27da..290f058d 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -6,11 +6,11 @@ use garage_util::encode::nonversioned_encode;
use garage_util::error::*;
use super::*;
-use crate::replication_mode::ReplicationMode;
+use crate::replication_mode::*;
impl LayoutHistory {
- pub fn new(replication_factor: usize) -> Self {
- let version = LayoutVersion::new(replication_factor);
+ pub fn new(replication_factor: ReplicationFactor) -> Self {
+ let version = LayoutVersion::new(replication_factor.into());
let staging = LayoutStaging {
parameters: Lww::<LayoutParameters>::new(version.parameters),
@@ -119,7 +119,7 @@ impl LayoutHistory {
pub(crate) fn calculate_sync_map_min_with_quorum(
&self,
- replication_mode: ReplicationMode,
+ replication_factor: ReplicationFactor,
all_nongateway_nodes: &[Uuid],
) -> u64 {
// This function calculates the minimum layout version from which
@@ -133,7 +133,7 @@ impl LayoutHistory {
return self.current().version;
}
- let quorum = replication_mode.write_quorum();
+ let quorum = replication_factor.write_quorum(ConsistencyMode::Consistent);
let min_version = self.min_stored();
let global_min = self
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index 0b6c7e63..8a6eb1c3 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -14,13 +14,13 @@ use garage_util::error::*;
use garage_util::persister::Persister;
use super::*;
-use crate::replication_mode::ReplicationMode;
+use crate::replication_mode::*;
use crate::rpc_helper::*;
use crate::system::*;
pub struct LayoutManager {
node_id: Uuid,
- replication_mode: ReplicationMode,
+ replication_factor: ReplicationFactor,
persist_cluster_layout: Persister<LayoutHistory>,
layout: Arc<RwLock<LayoutHelper>>,
@@ -38,20 +38,19 @@ impl LayoutManager {
node_id: NodeID,
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
peering: Arc<PeeringManager>,
- replication_mode: ReplicationMode,
+ replication_factor: ReplicationFactor,
+ consistency_mode: ConsistencyMode,
) -> Result<Arc<Self>, Error> {
- let replication_factor = replication_mode.replication_factor();
-
let persist_cluster_layout: Persister<LayoutHistory> =
Persister::new(&config.metadata_dir, "cluster_layout");
let cluster_layout = match persist_cluster_layout.load() {
Ok(x) => {
- if x.current().replication_factor != replication_mode.replication_factor() {
+ if x.current().replication_factor != replication_factor.replication_factor() {
return Err(Error::Message(format!(
"Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
x.current().replication_factor,
- replication_factor
+ replication_factor.replication_factor()
)));
}
x
@@ -65,8 +64,12 @@ impl LayoutManager {
}
};
- let mut cluster_layout =
- LayoutHelper::new(replication_mode, cluster_layout, Default::default());
+ let mut cluster_layout = LayoutHelper::new(
+ replication_factor,
+ consistency_mode,
+ cluster_layout,
+ Default::default(),
+ );
cluster_layout.update_trackers(node_id.into());
let layout = Arc::new(RwLock::new(cluster_layout));
@@ -81,7 +84,7 @@ impl LayoutManager {
Ok(Arc::new(Self {
node_id: node_id.into(),
- replication_mode,
+ replication_factor,
persist_cluster_layout,
layout,
change_notify,
@@ -295,11 +298,11 @@ impl LayoutManager {
adv.update_trackers
);
- if adv.current().replication_factor != self.replication_mode.replication_factor() {
+ if adv.current().replication_factor != self.replication_factor.replication_factor() {
let msg = format!(
"Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
adv.current().replication_factor,
- self.replication_mode.replication_factor()
+ self.replication_factor.replication_factor()
);
error!("{}", msg);
return Err(Error::Message(msg));
diff --git a/src/rpc/layout/test.rs b/src/rpc/layout/test.rs
index 88eb518e..fcbb9dfc 100644
--- a/src/rpc/layout/test.rs
+++ b/src/rpc/layout/test.rs
@@ -5,6 +5,7 @@ use garage_util::crdt::Crdt;
use garage_util::error::*;
use crate::layout::*;
+use crate::replication_mode::ReplicationFactor;
// This function checks that the partition size S computed is at least better than the
// one given by a very naive algorithm. To do so, we try to run the naive algorithm
@@ -120,7 +121,7 @@ fn test_assignment() {
let mut node_capacity_vec = vec![4000, 1000, 2000];
let mut node_zone_vec = vec!["A", "B", "C"];
- let mut cl = LayoutHistory::new(3);
+ let mut cl = LayoutHistory::new(ReplicationFactor::new(3).unwrap());
update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3);
let v = cl.current().version;
let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();