diff options
author | Alex <alex@adnab.me> | 2024-03-07 16:32:52 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2024-03-07 16:32:52 +0000 |
commit | 20c0b4ffb2ae8e250068f8bf8001b5811a6bb6f2 (patch) | |
tree | ee7ab2f5eaf862927ef87d43e661557906742cc5 /src/rpc | |
parent | 2fd13c7d135949a83ed52ed81672ac7e1956f134 (diff) | |
parent | c1769bbe69f723fb3980cf4fdac7615cfb782720 (diff) | |
download | garage-20c0b4ffb2ae8e250068f8bf8001b5811a6bb6f2.tar.gz garage-20c0b4ffb2ae8e250068f8bf8001b5811a6bb6f2.zip |
Merge pull request 'ReplicationMode -> ConsistencyMode+ReplicationFactor' (#750) from yuka/garage:split-consistency-mode into next-0.10
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/750
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/layout/helper.rs | 18 | ||||
-rw-r--r-- | src/rpc/layout/history.rs | 10 | ||||
-rw-r--r-- | src/rpc/layout/manager.rs | 27 | ||||
-rw-r--r-- | src/rpc/layout/test.rs | 3 | ||||
-rw-r--r-- | src/rpc/replication_mode.rs | 119 | ||||
-rw-r--r-- | src/rpc/system.rs | 22 | ||||
-rw-r--r-- | src/rpc/system_metrics.rs | 2 |
7 files changed, 123 insertions, 78 deletions
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 9fb738ea..2835347a 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; use super::*; -use crate::replication_mode::ReplicationMode; +use crate::replication_mode::*; #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] pub struct RpcLayoutDigest { @@ -29,7 +29,8 @@ pub struct SyncLayoutDigest { } pub struct LayoutHelper { - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, + consistency_mode: ConsistencyMode, layout: Option<LayoutHistory>, // cached values @@ -57,7 +58,8 @@ impl Deref for LayoutHelper { impl LayoutHelper { pub fn new( - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, + consistency_mode: ConsistencyMode, mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>, ) -> Self { @@ -66,7 +68,7 @@ impl LayoutHelper { // correct and we have rapid access to important values such as // the layout versions to use when reading to ensure consistency. - if !replication_mode.is_read_after_write_consistent() { + if consistency_mode != ConsistencyMode::Consistent { // Fast path for when no consistency is required. // In this case we only need to keep the last version of the layout, // we don't care about coordinating stuff in the cluster. @@ -103,7 +105,7 @@ impl LayoutHelper { // This value is calculated using quorums to allow progress even // if not all nodes have successfully completed a sync. let sync_map_min = - layout.calculate_sync_map_min_with_quorum(replication_mode, &all_nongateway_nodes); + layout.calculate_sync_map_min_with_quorum(replication_factor, &all_nongateway_nodes); let trackers_hash = layout.calculate_trackers_hash(); let staging_hash = layout.calculate_staging_hash(); @@ -114,7 +116,8 @@ impl LayoutHelper { .or_insert(AtomicUsize::new(0)); LayoutHelper { - replication_mode, + replication_factor, + consistency_mode, layout: Some(layout), ack_map_min, sync_map_min, @@ -139,7 +142,8 @@ impl LayoutHelper { let changed = f(self.layout.as_mut().unwrap()); if changed { *self = Self::new( - self.replication_mode, + self.replication_factor, + self.consistency_mode, self.layout.take().unwrap(), std::mem::take(&mut self.ack_lock), ); diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index b8cc27da..290f058d 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -6,11 +6,11 @@ use garage_util::encode::nonversioned_encode; use garage_util::error::*; use super::*; -use crate::replication_mode::ReplicationMode; +use crate::replication_mode::*; impl LayoutHistory { - pub fn new(replication_factor: usize) -> Self { - let version = LayoutVersion::new(replication_factor); + pub fn new(replication_factor: ReplicationFactor) -> Self { + let version = LayoutVersion::new(replication_factor.into()); let staging = LayoutStaging { parameters: Lww::<LayoutParameters>::new(version.parameters), @@ -119,7 +119,7 @@ impl LayoutHistory { pub(crate) fn calculate_sync_map_min_with_quorum( &self, - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, all_nongateway_nodes: &[Uuid], ) -> u64 { // This function calculates the minimum layout version from which @@ -133,7 +133,7 @@ impl LayoutHistory { return self.current().version; } - let quorum = replication_mode.write_quorum(); + let quorum = replication_factor.write_quorum(ConsistencyMode::Consistent); let min_version = self.min_stored(); let global_min = self diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 0b6c7e63..8a6eb1c3 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -14,13 +14,13 @@ use garage_util::error::*; use garage_util::persister::Persister; use super::*; -use crate::replication_mode::ReplicationMode; +use crate::replication_mode::*; use crate::rpc_helper::*; use crate::system::*; pub struct LayoutManager { node_id: Uuid, - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, persist_cluster_layout: Persister<LayoutHistory>, layout: Arc<RwLock<LayoutHelper>>, @@ -38,20 +38,19 @@ impl LayoutManager { node_id: NodeID, system_endpoint: Arc<Endpoint<SystemRpc, System>>, peering: Arc<PeeringManager>, - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, + consistency_mode: ConsistencyMode, ) -> Result<Arc<Self>, Error> { - let replication_factor = replication_mode.replication_factor(); - let persist_cluster_layout: Persister<LayoutHistory> = Persister::new(&config.metadata_dir, "cluster_layout"); let cluster_layout = match persist_cluster_layout.load() { Ok(x) => { - if x.current().replication_factor != replication_mode.replication_factor() { + if x.current().replication_factor != replication_factor.replication_factor() { return Err(Error::Message(format!( "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.", x.current().replication_factor, - replication_factor + replication_factor.replication_factor() ))); } x @@ -65,8 +64,12 @@ impl LayoutManager { } }; - let mut cluster_layout = - LayoutHelper::new(replication_mode, cluster_layout, Default::default()); + let mut cluster_layout = LayoutHelper::new( + replication_factor, + consistency_mode, + cluster_layout, + Default::default(), + ); cluster_layout.update_trackers(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); @@ -81,7 +84,7 @@ impl LayoutManager { Ok(Arc::new(Self { node_id: node_id.into(), - replication_mode, + replication_factor, persist_cluster_layout, layout, change_notify, @@ -295,11 +298,11 @@ impl LayoutManager { adv.update_trackers ); - if adv.current().replication_factor != self.replication_mode.replication_factor() { + if adv.current().replication_factor != self.replication_factor.replication_factor() { let msg = format!( "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.", adv.current().replication_factor, - self.replication_mode.replication_factor() + self.replication_factor.replication_factor() ); error!("{}", msg); return Err(Error::Message(msg)); diff --git a/src/rpc/layout/test.rs b/src/rpc/layout/test.rs index 88eb518e..fcbb9dfc 100644 --- a/src/rpc/layout/test.rs +++ b/src/rpc/layout/test.rs @@ -5,6 +5,7 @@ use garage_util::crdt::Crdt; use garage_util::error::*; use crate::layout::*; +use crate::replication_mode::ReplicationFactor; // This function checks that the partition size S computed is at least better than the // one given by a very naive algorithm. To do so, we try to run the naive algorithm @@ -120,7 +121,7 @@ fn test_assignment() { let mut node_capacity_vec = vec![4000, 1000, 2000]; let mut node_zone_vec = vec!["A", "B", "C"]; - let mut cl = LayoutHistory::new(3); + let mut cl = LayoutHistory::new(ReplicationFactor::new(3).unwrap()); update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3); let v = cl.current().version; let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); diff --git a/src/rpc/replication_mode.rs b/src/rpc/replication_mode.rs index b142ea10..a3a94085 100644 --- a/src/rpc/replication_mode.rs +++ b/src/rpc/replication_mode.rs @@ -1,57 +1,94 @@ -#[derive(Clone, Copy)] -pub enum ReplicationMode { - None, - TwoWay, - TwoWayDangerous, - ThreeWay, - ThreeWayDegraded, - ThreeWayDangerous, +use garage_util::config::Config; +use garage_util::crdt::AutoCrdt; +use garage_util::error::*; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +#[serde(transparent)] +pub struct ReplicationFactor(usize); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum ConsistencyMode { + /// Read- and Write-quorum are 1 + Dangerous, + /// Read-quorum is 1 + Degraded, + /// Read- and Write-quorum are determined for read-after-write-consistency + #[default] + Consistent, +} + +impl ConsistencyMode { + pub fn parse(s: &str) -> Option<Self> { + serde_json::from_value(serde_json::Value::String(s.to_string())).ok() + } +} + +impl AutoCrdt for ConsistencyMode { + const WARN_IF_DIFFERENT: bool = true; } -impl ReplicationMode { - pub fn parse(v: &str) -> Option<Self> { - match v { - "none" | "1" => Some(Self::None), - "2" => Some(Self::TwoWay), - "2-dangerous" => Some(Self::TwoWayDangerous), - "3" => Some(Self::ThreeWay), - "3-degraded" => Some(Self::ThreeWayDegraded), - "3-dangerous" => Some(Self::ThreeWayDangerous), - _ => None, +impl ReplicationFactor { + pub fn new(replication_factor: usize) -> Option<Self> { + if replication_factor < 1 { + None + } else { + Some(Self(replication_factor)) } } pub fn replication_factor(&self) -> usize { - match self { - Self::None => 1, - Self::TwoWay | Self::TwoWayDangerous => 2, - Self::ThreeWay | Self::ThreeWayDegraded | Self::ThreeWayDangerous => 3, - } + self.0 } - pub fn read_quorum(&self) -> usize { - match self { - Self::None => 1, - Self::TwoWay | Self::TwoWayDangerous => 1, - Self::ThreeWay => 2, - Self::ThreeWayDegraded | Self::ThreeWayDangerous => 1, + pub fn read_quorum(&self, consistency_mode: ConsistencyMode) -> usize { + match consistency_mode { + ConsistencyMode::Dangerous | ConsistencyMode::Degraded => 1, + ConsistencyMode::Consistent => self.replication_factor().div_ceil(2), } } - pub fn write_quorum(&self) -> usize { - match self { - Self::None => 1, - Self::TwoWay => 2, - Self::TwoWayDangerous => 1, - Self::ThreeWay | Self::ThreeWayDegraded => 2, - Self::ThreeWayDangerous => 1, + pub fn write_quorum(&self, consistency_mode: ConsistencyMode) -> usize { + match consistency_mode { + ConsistencyMode::Dangerous => 1, + ConsistencyMode::Degraded | ConsistencyMode::Consistent => { + (self.replication_factor() + 1) - self.read_quorum(ConsistencyMode::Consistent) + } } } +} - pub fn is_read_after_write_consistent(&self) -> bool { - match self { - Self::None | Self::TwoWay | Self::ThreeWay => true, - _ => false, - } +impl std::convert::From<ReplicationFactor> for usize { + fn from(replication_factor: ReplicationFactor) -> usize { + replication_factor.0 } } + +pub fn parse_replication_mode( + config: &Config, +) -> Result<(ReplicationFactor, ConsistencyMode), Error> { + match (&config.replication_mode, config.replication_factor, config.consistency_mode.as_str()) { + (Some(replication_mode), None, "consistent") => { + tracing::warn!("Legacy config option replication_mode in use. Please migrate to replication_factor and consistency_mode"); + let parsed_replication_mode = match replication_mode.as_str() { + "1" | "none" => Some((ReplicationFactor(1), ConsistencyMode::Consistent)), + "2" => Some((ReplicationFactor(2), ConsistencyMode::Consistent)), + "2-dangerous" => Some((ReplicationFactor(2), ConsistencyMode::Dangerous)), + "3" => Some((ReplicationFactor(3), ConsistencyMode::Consistent)), + "3-degraded" => Some((ReplicationFactor(3), ConsistencyMode::Degraded)), + "3-dangerous" => Some((ReplicationFactor(3), ConsistencyMode::Dangerous)), + _ => None, + }; + Some(parsed_replication_mode.ok_or_message("Invalid replication_mode in config file.")?) + }, + (None, Some(replication_factor), consistency_mode) => { + let replication_factor = ReplicationFactor::new(replication_factor) + .ok_or_message("Invalid replication_factor in config file.")?; + let consistency_mode = ConsistencyMode::parse(consistency_mode) + .ok_or_message("Invalid consistency_mode in config file.")?; + Some((replication_factor, consistency_mode)) + } + _ => None, + }.ok_or_message("Either the legacy replication_mode or replication_level and consistency_mode can be set, not both.") +} diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 1c668306..54d589d2 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -112,8 +112,7 @@ pub struct System { metrics: ArcSwapOption<SystemMetrics>, - replication_mode: ReplicationMode, - pub(crate) replication_factor: usize, + pub(crate) replication_factor: ReplicationFactor, /// Path to metadata directory pub metadata_dir: PathBuf, @@ -243,7 +242,8 @@ impl System { /// Create this node's membership manager pub fn new( network_key: NetworkKey, - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, + consistency_mode: ConsistencyMode, config: &Config, ) -> Result<Arc<Self>, Error> { // ---- setup netapp RPC protocol ---- @@ -274,14 +274,13 @@ impl System { let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); // ---- setup cluster layout and layout manager ---- - let replication_factor = replication_mode.replication_factor(); - let layout_manager = LayoutManager::new( config, netapp.id, system_endpoint.clone(), peering.clone(), - replication_mode, + replication_factor, + consistency_mode, )?; let mut local_status = NodeStatus::initial(replication_factor, &layout_manager); @@ -315,7 +314,6 @@ impl System { netapp: netapp.clone(), peering: peering.clone(), system_endpoint, - replication_mode, replication_factor, rpc_listen_addr: config.rpc_bind_addr, rpc_public_addr, @@ -427,7 +425,9 @@ impl System { } pub fn health(&self) -> ClusterHealth { - let quorum = self.replication_mode.write_quorum(); + let quorum = self + .replication_factor + .write_quorum(ConsistencyMode::Consistent); // Gather information about running nodes. // Technically, `nodes` contains currently running nodes, as well @@ -631,7 +631,7 @@ impl System { .count(); let not_configured = self.cluster_layout().check().is_err(); - let no_peers = n_connected < self.replication_factor; + let no_peers = n_connected < self.replication_factor.into(); let expected_n_nodes = self.cluster_layout().all_nodes().len(); let bad_peers = n_connected != expected_n_nodes; @@ -774,14 +774,14 @@ impl EndpointHandler<SystemRpc> for System { } impl NodeStatus { - fn initial(replication_factor: usize, layout_manager: &LayoutManager) -> Self { + fn initial(replication_factor: ReplicationFactor, layout_manager: &LayoutManager) -> Self { NodeStatus { hostname: Some( gethostname::gethostname() .into_string() .unwrap_or_else(|_| "<invalid utf-8>".to_string()), ), - replication_factor, + replication_factor: replication_factor.into(), layout_digest: layout_manager.layout().digest(), meta_disk_avail: None, data_disk_avail: None, diff --git a/src/rpc/system_metrics.rs b/src/rpc/system_metrics.rs index 0bb55bf3..a64daec8 100644 --- a/src/rpc/system_metrics.rs +++ b/src/rpc/system_metrics.rs @@ -68,7 +68,7 @@ impl SystemMetrics { let replication_factor = system.replication_factor; meter .u64_value_observer("garage_replication_factor", move |observer| { - observer.observe(replication_factor as u64, &[]) + observer.observe(replication_factor.replication_factor() as u64, &[]) }) .with_description("Garage replication factor setting") .init() |