aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorYureka <yuka@yuka.dev>2024-03-04 19:58:32 +0100
committerYureka <yuka@yuka.dev>2024-03-07 12:45:33 +0100
commitc1769bbe69f723fb3980cf4fdac7615cfb782720 (patch)
treee623904bbdc861b42039057d6a1e982486fc60b4 /src
parent8f86af52ed917bce506989ae1f378d977aa6c3ef (diff)
downloadgarage-c1769bbe69f723fb3980cf4fdac7615cfb782720.tar.gz
garage-c1769bbe69f723fb3980cf4fdac7615cfb782720.zip
ReplicationMode -> ConsistencyMode+ReplicationFactor
Diffstat (limited to 'src')
-rw-r--r--src/garage/secrets.rs6
-rw-r--r--src/garage/tests/common/garage.rs2
-rw-r--r--src/model/garage.rs23
-rw-r--r--src/rpc/layout/helper.rs18
-rw-r--r--src/rpc/layout/history.rs10
-rw-r--r--src/rpc/layout/manager.rs27
-rw-r--r--src/rpc/layout/test.rs3
-rw-r--r--src/rpc/replication_mode.rs119
-rw-r--r--src/rpc/system.rs22
-rw-r--r--src/rpc/system_metrics.rs2
-rw-r--r--src/util/config.rs27
11 files changed, 158 insertions, 101 deletions
diff --git a/src/garage/secrets.rs b/src/garage/secrets.rs
index c3d704aa..8d2ff475 100644
--- a/src/garage/secrets.rs
+++ b/src/garage/secrets.rs
@@ -163,7 +163,7 @@ mod tests {
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
- replication_mode = "3"
+ replication_factor = 3
rpc_bind_addr = "[::]:3901"
rpc_secret_file = "{}"
@@ -185,7 +185,7 @@ mod tests {
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
- replication_mode = "3"
+ replication_factor = 3
rpc_bind_addr = "[::]:3901"
rpc_secret_file = "{}"
allow_world_readable_secrets = true
@@ -296,7 +296,7 @@ mod tests {
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
- replication_mode = "3"
+ replication_factor = 3
rpc_bind_addr = "[::]:3901"
rpc_secret= "dummy"
rpc_secret_file = "dummy"
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index ebc82f37..f1c1efc8 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -54,7 +54,7 @@ metadata_dir = "{path}/meta"
data_dir = "{path}/data"
db_engine = "lmdb"
-replication_mode = "1"
+replication_factor = 1
rpc_bind_addr = "127.0.0.1:{rpc_port}"
rpc_public_addr = "127.0.0.1:{rpc_port}"
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 561aca8f..19f58077 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -9,7 +9,7 @@ use garage_util::config::*;
use garage_util::error::*;
use garage_util::persister::PersisterShared;
-use garage_rpc::replication_mode::ReplicationMode;
+use garage_rpc::replication_mode::*;
use garage_rpc::system::System;
use garage_block::manager::*;
@@ -39,8 +39,8 @@ pub struct Garage {
/// The set of background variables that can be viewed/modified at runtime
pub bg_vars: vars::BgVars,
- /// The replication mode of this cluster
- pub replication_mode: ReplicationMode,
+ /// The replication factor of this cluster
+ pub replication_factor: ReplicationFactor,
/// The local database
pub db: db::Db,
@@ -222,27 +222,26 @@ impl Garage {
.and_then(|x| NetworkKey::from_slice(&x))
.ok_or_message("Invalid RPC secret key")?;
- let replication_mode = ReplicationMode::parse(&config.replication_mode)
- .ok_or_message("Invalid replication_mode in config file.")?;
+ let (replication_factor, consistency_mode) = parse_replication_mode(&config)?;
info!("Initialize background variable system...");
let mut bg_vars = vars::BgVars::new();
info!("Initialize membership management system...");
- let system = System::new(network_key, replication_mode, &config)?;
+ let system = System::new(network_key, replication_factor, consistency_mode, &config)?;
let data_rep_param = TableShardedReplication {
system: system.clone(),
- replication_factor: replication_mode.replication_factor(),
- write_quorum: replication_mode.write_quorum(),
+ replication_factor: replication_factor.into(),
+ write_quorum: replication_factor.write_quorum(consistency_mode),
read_quorum: 1,
};
let meta_rep_param = TableShardedReplication {
system: system.clone(),
- replication_factor: replication_mode.replication_factor(),
- write_quorum: replication_mode.write_quorum(),
- read_quorum: replication_mode.read_quorum(),
+ replication_factor: replication_factor.into(),
+ write_quorum: replication_factor.write_quorum(consistency_mode),
+ read_quorum: replication_factor.read_quorum(consistency_mode),
};
let control_rep_param = TableFullReplication {
@@ -338,7 +337,7 @@ impl Garage {
Ok(Arc::new(Self {
config,
bg_vars,
- replication_mode,
+ replication_factor,
db,
system,
block_manager,
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
index 9fb738ea..2835347a 100644
--- a/src/rpc/layout/helper.rs
+++ b/src/rpc/layout/helper.rs
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
use super::*;
-use crate::replication_mode::ReplicationMode;
+use crate::replication_mode::*;
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct RpcLayoutDigest {
@@ -29,7 +29,8 @@ pub struct SyncLayoutDigest {
}
pub struct LayoutHelper {
- replication_mode: ReplicationMode,
+ replication_factor: ReplicationFactor,
+ consistency_mode: ConsistencyMode,
layout: Option<LayoutHistory>,
// cached values
@@ -57,7 +58,8 @@ impl Deref for LayoutHelper {
impl LayoutHelper {
pub fn new(
- replication_mode: ReplicationMode,
+ replication_factor: ReplicationFactor,
+ consistency_mode: ConsistencyMode,
mut layout: LayoutHistory,
mut ack_lock: HashMap<u64, AtomicUsize>,
) -> Self {
@@ -66,7 +68,7 @@ impl LayoutHelper {
// correct and we have rapid access to important values such as
// the layout versions to use when reading to ensure consistency.
- if !replication_mode.is_read_after_write_consistent() {
+ if consistency_mode != ConsistencyMode::Consistent {
// Fast path for when no consistency is required.
// In this case we only need to keep the last version of the layout,
// we don't care about coordinating stuff in the cluster.
@@ -103,7 +105,7 @@ impl LayoutHelper {
// This value is calculated using quorums to allow progress even
// if not all nodes have successfully completed a sync.
let sync_map_min =
- layout.calculate_sync_map_min_with_quorum(replication_mode, &all_nongateway_nodes);
+ layout.calculate_sync_map_min_with_quorum(replication_factor, &all_nongateway_nodes);
let trackers_hash = layout.calculate_trackers_hash();
let staging_hash = layout.calculate_staging_hash();
@@ -114,7 +116,8 @@ impl LayoutHelper {
.or_insert(AtomicUsize::new(0));
LayoutHelper {
- replication_mode,
+ replication_factor,
+ consistency_mode,
layout: Some(layout),
ack_map_min,
sync_map_min,
@@ -139,7 +142,8 @@ impl LayoutHelper {
let changed = f(self.layout.as_mut().unwrap());
if changed {
*self = Self::new(
- self.replication_mode,
+ self.replication_factor,
+ self.consistency_mode,
self.layout.take().unwrap(),
std::mem::take(&mut self.ack_lock),
);
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index b8cc27da..290f058d 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -6,11 +6,11 @@ use garage_util::encode::nonversioned_encode;
use garage_util::error::*;
use super::*;
-use crate::replication_mode::ReplicationMode;
+use crate::replication_mode::*;
impl LayoutHistory {
- pub fn new(replication_factor: usize) -> Self {
- let version = LayoutVersion::new(replication_factor);
+ pub fn new(replication_factor: ReplicationFactor) -> Self {
+ let version = LayoutVersion::new(replication_factor.into());
let staging = LayoutStaging {
parameters: Lww::<LayoutParameters>::new(version.parameters),
@@ -119,7 +119,7 @@ impl LayoutHistory {
pub(crate) fn calculate_sync_map_min_with_quorum(
&self,
- replication_mode: ReplicationMode,
+ replication_factor: ReplicationFactor,
all_nongateway_nodes: &[Uuid],
) -> u64 {
// This function calculates the minimum layout version from which
@@ -133,7 +133,7 @@ impl LayoutHistory {
return self.current().version;
}
- let quorum = replication_mode.write_quorum();
+ let quorum = replication_factor.write_quorum(ConsistencyMode::Consistent);
let min_version = self.min_stored();
let global_min = self
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index 0b6c7e63..8a6eb1c3 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -14,13 +14,13 @@ use garage_util::error::*;
use garage_util::persister::Persister;
use super::*;
-use crate::replication_mode::ReplicationMode;
+use crate::replication_mode::*;
use crate::rpc_helper::*;
use crate::system::*;
pub struct LayoutManager {
node_id: Uuid,
- replication_mode: ReplicationMode,
+ replication_factor: ReplicationFactor,
persist_cluster_layout: Persister<LayoutHistory>,
layout: Arc<RwLock<LayoutHelper>>,
@@ -38,20 +38,19 @@ impl LayoutManager {
node_id: NodeID,
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
peering: Arc<PeeringManager>,
- replication_mode: ReplicationMode,
+ replication_factor: ReplicationFactor,
+ consistency_mode: ConsistencyMode,
) -> Result<Arc<Self>, Error> {
- let replication_factor = replication_mode.replication_factor();
-
let persist_cluster_layout: Persister<LayoutHistory> =
Persister::new(&config.metadata_dir, "cluster_layout");
let cluster_layout = match persist_cluster_layout.load() {
Ok(x) => {
- if x.current().replication_factor != replication_mode.replication_factor() {
+ if x.current().replication_factor != replication_factor.replication_factor() {
return Err(Error::Message(format!(
"Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
x.current().replication_factor,
- replication_factor
+ replication_factor.replication_factor()
)));
}
x
@@ -65,8 +64,12 @@ impl LayoutManager {
}
};
- let mut cluster_layout =
- LayoutHelper::new(replication_mode, cluster_layout, Default::default());
+ let mut cluster_layout = LayoutHelper::new(
+ replication_factor,
+ consistency_mode,
+ cluster_layout,
+ Default::default(),
+ );
cluster_layout.update_trackers(node_id.into());
let layout = Arc::new(RwLock::new(cluster_layout));
@@ -81,7 +84,7 @@ impl LayoutManager {
Ok(Arc::new(Self {
node_id: node_id.into(),
- replication_mode,
+ replication_factor,
persist_cluster_layout,
layout,
change_notify,
@@ -295,11 +298,11 @@ impl LayoutManager {
adv.update_trackers
);
- if adv.current().replication_factor != self.replication_mode.replication_factor() {
+ if adv.current().replication_factor != self.replication_factor.replication_factor() {
let msg = format!(
"Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
adv.current().replication_factor,
- self.replication_mode.replication_factor()
+ self.replication_factor.replication_factor()
);
error!("{}", msg);
return Err(Error::Message(msg));
diff --git a/src/rpc/layout/test.rs b/src/rpc/layout/test.rs
index 88eb518e..fcbb9dfc 100644
--- a/src/rpc/layout/test.rs
+++ b/src/rpc/layout/test.rs
@@ -5,6 +5,7 @@ use garage_util::crdt::Crdt;
use garage_util::error::*;
use crate::layout::*;
+use crate::replication_mode::ReplicationFactor;
// This function checks that the partition size S computed is at least better than the
// one given by a very naive algorithm. To do so, we try to run the naive algorithm
@@ -120,7 +121,7 @@ fn test_assignment() {
let mut node_capacity_vec = vec![4000, 1000, 2000];
let mut node_zone_vec = vec!["A", "B", "C"];
- let mut cl = LayoutHistory::new(3);
+ let mut cl = LayoutHistory::new(ReplicationFactor::new(3).unwrap());
update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3);
let v = cl.current().version;
let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
diff --git a/src/rpc/replication_mode.rs b/src/rpc/replication_mode.rs
index b142ea10..a3a94085 100644
--- a/src/rpc/replication_mode.rs
+++ b/src/rpc/replication_mode.rs
@@ -1,57 +1,94 @@
-#[derive(Clone, Copy)]
-pub enum ReplicationMode {
- None,
- TwoWay,
- TwoWayDangerous,
- ThreeWay,
- ThreeWayDegraded,
- ThreeWayDangerous,
+use garage_util::config::Config;
+use garage_util::crdt::AutoCrdt;
+use garage_util::error::*;
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
+#[serde(transparent)]
+pub struct ReplicationFactor(usize);
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum ConsistencyMode {
+ /// Read- and Write-quorum are 1
+ Dangerous,
+ /// Read-quorum is 1
+ Degraded,
+ /// Read- and Write-quorum are determined for read-after-write-consistency
+ #[default]
+ Consistent,
+}
+
+impl ConsistencyMode {
+ pub fn parse(s: &str) -> Option<Self> {
+ serde_json::from_value(serde_json::Value::String(s.to_string())).ok()
+ }
+}
+
+impl AutoCrdt for ConsistencyMode {
+ const WARN_IF_DIFFERENT: bool = true;
}
-impl ReplicationMode {
- pub fn parse(v: &str) -> Option<Self> {
- match v {
- "none" | "1" => Some(Self::None),
- "2" => Some(Self::TwoWay),
- "2-dangerous" => Some(Self::TwoWayDangerous),
- "3" => Some(Self::ThreeWay),
- "3-degraded" => Some(Self::ThreeWayDegraded),
- "3-dangerous" => Some(Self::ThreeWayDangerous),
- _ => None,
+impl ReplicationFactor {
+ pub fn new(replication_factor: usize) -> Option<Self> {
+ if replication_factor < 1 {
+ None
+ } else {
+ Some(Self(replication_factor))
}
}
pub fn replication_factor(&self) -> usize {
- match self {
- Self::None => 1,
- Self::TwoWay | Self::TwoWayDangerous => 2,
- Self::ThreeWay | Self::ThreeWayDegraded | Self::ThreeWayDangerous => 3,
- }
+ self.0
}
- pub fn read_quorum(&self) -> usize {
- match self {
- Self::None => 1,
- Self::TwoWay | Self::TwoWayDangerous => 1,
- Self::ThreeWay => 2,
- Self::ThreeWayDegraded | Self::ThreeWayDangerous => 1,
+ pub fn read_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
+ match consistency_mode {
+ ConsistencyMode::Dangerous | ConsistencyMode::Degraded => 1,
+ ConsistencyMode::Consistent => self.replication_factor().div_ceil(2),
}
}
- pub fn write_quorum(&self) -> usize {
- match self {
- Self::None => 1,
- Self::TwoWay => 2,
- Self::TwoWayDangerous => 1,
- Self::ThreeWay | Self::ThreeWayDegraded => 2,
- Self::ThreeWayDangerous => 1,
+ pub fn write_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
+ match consistency_mode {
+ ConsistencyMode::Dangerous => 1,
+ ConsistencyMode::Degraded | ConsistencyMode::Consistent => {
+ (self.replication_factor() + 1) - self.read_quorum(ConsistencyMode::Consistent)
+ }
}
}
+}
- pub fn is_read_after_write_consistent(&self) -> bool {
- match self {
- Self::None | Self::TwoWay | Self::ThreeWay => true,
- _ => false,
- }
+impl std::convert::From<ReplicationFactor> for usize {
+ fn from(replication_factor: ReplicationFactor) -> usize {
+ replication_factor.0
}
}
+
+pub fn parse_replication_mode(
+ config: &Config,
+) -> Result<(ReplicationFactor, ConsistencyMode), Error> {
+ match (&config.replication_mode, config.replication_factor, config.consistency_mode.as_str()) {
+ (Some(replication_mode), None, "consistent") => {
+ tracing::warn!("Legacy config option replication_mode in use. Please migrate to replication_factor and consistency_mode");
+ let parsed_replication_mode = match replication_mode.as_str() {
+ "1" | "none" => Some((ReplicationFactor(1), ConsistencyMode::Consistent)),
+ "2" => Some((ReplicationFactor(2), ConsistencyMode::Consistent)),
+ "2-dangerous" => Some((ReplicationFactor(2), ConsistencyMode::Dangerous)),
+ "3" => Some((ReplicationFactor(3), ConsistencyMode::Consistent)),
+ "3-degraded" => Some((ReplicationFactor(3), ConsistencyMode::Degraded)),
+ "3-dangerous" => Some((ReplicationFactor(3), ConsistencyMode::Dangerous)),
+ _ => None,
+ };
+ Some(parsed_replication_mode.ok_or_message("Invalid replication_mode in config file.")?)
+ },
+ (None, Some(replication_factor), consistency_mode) => {
+ let replication_factor = ReplicationFactor::new(replication_factor)
+ .ok_or_message("Invalid replication_factor in config file.")?;
+ let consistency_mode = ConsistencyMode::parse(consistency_mode)
+ .ok_or_message("Invalid consistency_mode in config file.")?;
+ Some((replication_factor, consistency_mode))
+ }
+ _ => None,
+ }.ok_or_message("Either the legacy replication_mode or replication_level and consistency_mode can be set, not both.")
+}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 1c668306..54d589d2 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -112,8 +112,7 @@ pub struct System {
metrics: ArcSwapOption<SystemMetrics>,
- replication_mode: ReplicationMode,
- pub(crate) replication_factor: usize,
+ pub(crate) replication_factor: ReplicationFactor,
/// Path to metadata directory
pub metadata_dir: PathBuf,
@@ -243,7 +242,8 @@ impl System {
/// Create this node's membership manager
pub fn new(
network_key: NetworkKey,
- replication_mode: ReplicationMode,
+ replication_factor: ReplicationFactor,
+ consistency_mode: ConsistencyMode,
config: &Config,
) -> Result<Arc<Self>, Error> {
// ---- setup netapp RPC protocol ----
@@ -274,14 +274,13 @@ impl System {
let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
// ---- setup cluster layout and layout manager ----
- let replication_factor = replication_mode.replication_factor();
-
let layout_manager = LayoutManager::new(
config,
netapp.id,
system_endpoint.clone(),
peering.clone(),
- replication_mode,
+ replication_factor,
+ consistency_mode,
)?;
let mut local_status = NodeStatus::initial(replication_factor, &layout_manager);
@@ -315,7 +314,6 @@ impl System {
netapp: netapp.clone(),
peering: peering.clone(),
system_endpoint,
- replication_mode,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
rpc_public_addr,
@@ -427,7 +425,9 @@ impl System {
}
pub fn health(&self) -> ClusterHealth {
- let quorum = self.replication_mode.write_quorum();
+ let quorum = self
+ .replication_factor
+ .write_quorum(ConsistencyMode::Consistent);
// Gather information about running nodes.
// Technically, `nodes` contains currently running nodes, as well
@@ -631,7 +631,7 @@ impl System {
.count();
let not_configured = self.cluster_layout().check().is_err();
- let no_peers = n_connected < self.replication_factor;
+ let no_peers = n_connected < self.replication_factor.into();
let expected_n_nodes = self.cluster_layout().all_nodes().len();
let bad_peers = n_connected != expected_n_nodes;
@@ -774,14 +774,14 @@ impl EndpointHandler<SystemRpc> for System {
}
impl NodeStatus {
- fn initial(replication_factor: usize, layout_manager: &LayoutManager) -> Self {
+ fn initial(replication_factor: ReplicationFactor, layout_manager: &LayoutManager) -> Self {
NodeStatus {
hostname: Some(
gethostname::gethostname()
.into_string()
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
),
- replication_factor,
+ replication_factor: replication_factor.into(),
layout_digest: layout_manager.layout().digest(),
meta_disk_avail: None,
data_disk_avail: None,
diff --git a/src/rpc/system_metrics.rs b/src/rpc/system_metrics.rs
index 0bb55bf3..a64daec8 100644
--- a/src/rpc/system_metrics.rs
+++ b/src/rpc/system_metrics.rs
@@ -68,7 +68,7 @@ impl SystemMetrics {
let replication_factor = system.replication_factor;
meter
.u64_value_observer("garage_replication_factor", move |observer| {
- observer.observe(replication_factor as u64, &[])
+ observer.observe(replication_factor.replication_factor() as u64, &[])
})
.with_description("Garage replication factor setting")
.init()
diff --git a/src/util/config.rs b/src/util/config.rs
index 056c625d..b7f27676 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -30,12 +30,20 @@ pub struct Config {
)]
pub block_size: usize,
- /// Replication mode. Supported values:
- /// - none, 1 -> no replication
- /// - 2 -> 2-way replication
- /// - 3 -> 3-way replication
- // (we can add more aliases for this later)
- pub replication_mode: String,
+ /// Number of replicas. Can be any positive integer, but uneven numbers are more favorable.
+ /// - 1 for single-node clusters, or to disable replication
+ /// - 3 is the recommended and supported setting.
+ #[serde(default)]
+ pub replication_factor: Option<usize>,
+
+ /// Consistency mode for all for requests through this node
+ /// - Degraded -> Disable read quorum
+ /// - Dangerous -> Disable read and write quorum
+ #[serde(default = "default_consistency_mode")]
+ pub consistency_mode: String,
+
+ /// Legacy option
+ pub replication_mode: Option<String>,
/// Zstd compression level used on data blocks
#[serde(
@@ -244,10 +252,15 @@ fn default_sled_cache_capacity() -> usize {
fn default_sled_flush_every_ms() -> u64 {
2000
}
+
fn default_block_size() -> usize {
1048576
}
+fn default_consistency_mode() -> String {
+ "consistent".into()
+}
+
fn default_compression() -> Option<i32> {
Some(1)
}
@@ -359,7 +372,7 @@ mod tests {
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
- replication_mode = "3"
+ replication_factor = 3
rpc_bind_addr = "[::]:3901"
rpc_secret = "foo"