diff options
Diffstat (limited to 'src/rpc/layout')
-rw-r--r-- | src/rpc/layout/helper.rs | 18 | ||||
-rw-r--r-- | src/rpc/layout/history.rs | 10 | ||||
-rw-r--r-- | src/rpc/layout/manager.rs | 27 | ||||
-rw-r--r-- | src/rpc/layout/test.rs | 3 |
4 files changed, 33 insertions, 25 deletions
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 9fb738ea..2835347a 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; use super::*; -use crate::replication_mode::ReplicationMode; +use crate::replication_mode::*; #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] pub struct RpcLayoutDigest { @@ -29,7 +29,8 @@ pub struct SyncLayoutDigest { } pub struct LayoutHelper { - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, + consistency_mode: ConsistencyMode, layout: Option<LayoutHistory>, // cached values @@ -57,7 +58,8 @@ impl Deref for LayoutHelper { impl LayoutHelper { pub fn new( - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, + consistency_mode: ConsistencyMode, mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>, ) -> Self { @@ -66,7 +68,7 @@ impl LayoutHelper { // correct and we have rapid access to important values such as // the layout versions to use when reading to ensure consistency. - if !replication_mode.is_read_after_write_consistent() { + if consistency_mode != ConsistencyMode::Consistent { // Fast path for when no consistency is required. // In this case we only need to keep the last version of the layout, // we don't care about coordinating stuff in the cluster. @@ -103,7 +105,7 @@ impl LayoutHelper { // This value is calculated using quorums to allow progress even // if not all nodes have successfully completed a sync. let sync_map_min = - layout.calculate_sync_map_min_with_quorum(replication_mode, &all_nongateway_nodes); + layout.calculate_sync_map_min_with_quorum(replication_factor, &all_nongateway_nodes); let trackers_hash = layout.calculate_trackers_hash(); let staging_hash = layout.calculate_staging_hash(); @@ -114,7 +116,8 @@ impl LayoutHelper { .or_insert(AtomicUsize::new(0)); LayoutHelper { - replication_mode, + replication_factor, + consistency_mode, layout: Some(layout), ack_map_min, sync_map_min, @@ -139,7 +142,8 @@ impl LayoutHelper { let changed = f(self.layout.as_mut().unwrap()); if changed { *self = Self::new( - self.replication_mode, + self.replication_factor, + self.consistency_mode, self.layout.take().unwrap(), std::mem::take(&mut self.ack_lock), ); diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index b8cc27da..290f058d 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -6,11 +6,11 @@ use garage_util::encode::nonversioned_encode; use garage_util::error::*; use super::*; -use crate::replication_mode::ReplicationMode; +use crate::replication_mode::*; impl LayoutHistory { - pub fn new(replication_factor: usize) -> Self { - let version = LayoutVersion::new(replication_factor); + pub fn new(replication_factor: ReplicationFactor) -> Self { + let version = LayoutVersion::new(replication_factor.into()); let staging = LayoutStaging { parameters: Lww::<LayoutParameters>::new(version.parameters), @@ -119,7 +119,7 @@ impl LayoutHistory { pub(crate) fn calculate_sync_map_min_with_quorum( &self, - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, all_nongateway_nodes: &[Uuid], ) -> u64 { // This function calculates the minimum layout version from which @@ -133,7 +133,7 @@ impl LayoutHistory { return self.current().version; } - let quorum = replication_mode.write_quorum(); + let quorum = replication_factor.write_quorum(ConsistencyMode::Consistent); let min_version = self.min_stored(); let global_min = self diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 0b6c7e63..8a6eb1c3 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -14,13 +14,13 @@ use garage_util::error::*; use garage_util::persister::Persister; use super::*; -use crate::replication_mode::ReplicationMode; +use crate::replication_mode::*; use crate::rpc_helper::*; use crate::system::*; pub struct LayoutManager { node_id: Uuid, - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, persist_cluster_layout: Persister<LayoutHistory>, layout: Arc<RwLock<LayoutHelper>>, @@ -38,20 +38,19 @@ impl LayoutManager { node_id: NodeID, system_endpoint: Arc<Endpoint<SystemRpc, System>>, peering: Arc<PeeringManager>, - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, + consistency_mode: ConsistencyMode, ) -> Result<Arc<Self>, Error> { - let replication_factor = replication_mode.replication_factor(); - let persist_cluster_layout: Persister<LayoutHistory> = Persister::new(&config.metadata_dir, "cluster_layout"); let cluster_layout = match persist_cluster_layout.load() { Ok(x) => { - if x.current().replication_factor != replication_mode.replication_factor() { + if x.current().replication_factor != replication_factor.replication_factor() { return Err(Error::Message(format!( "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.", x.current().replication_factor, - replication_factor + replication_factor.replication_factor() ))); } x @@ -65,8 +64,12 @@ impl LayoutManager { } }; - let mut cluster_layout = - LayoutHelper::new(replication_mode, cluster_layout, Default::default()); + let mut cluster_layout = LayoutHelper::new( + replication_factor, + consistency_mode, + cluster_layout, + Default::default(), + ); cluster_layout.update_trackers(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); @@ -81,7 +84,7 @@ impl LayoutManager { Ok(Arc::new(Self { node_id: node_id.into(), - replication_mode, + replication_factor, persist_cluster_layout, layout, change_notify, @@ -295,11 +298,11 @@ impl LayoutManager { adv.update_trackers ); - if adv.current().replication_factor != self.replication_mode.replication_factor() { + if adv.current().replication_factor != self.replication_factor.replication_factor() { let msg = format!( "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.", adv.current().replication_factor, - self.replication_mode.replication_factor() + self.replication_factor.replication_factor() ); error!("{}", msg); return Err(Error::Message(msg)); diff --git a/src/rpc/layout/test.rs b/src/rpc/layout/test.rs index 88eb518e..fcbb9dfc 100644 --- a/src/rpc/layout/test.rs +++ b/src/rpc/layout/test.rs @@ -5,6 +5,7 @@ use garage_util::crdt::Crdt; use garage_util::error::*; use crate::layout::*; +use crate::replication_mode::ReplicationFactor; // This function checks that the partition size S computed is at least better than the // one given by a very naive algorithm. To do so, we try to run the naive algorithm @@ -120,7 +121,7 @@ fn test_assignment() { let mut node_capacity_vec = vec![4000, 1000, 2000]; let mut node_zone_vec = vec!["A", "B", "C"]; - let mut cl = LayoutHistory::new(3); + let mut cl = LayoutHistory::new(ReplicationFactor::new(3).unwrap()); update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3); let v = cl.current().version; let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); |