aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/Cargo.toml2
-rw-r--r--src/api/admin/api_server.rs2
-rw-r--r--src/api/admin/bucket.rs4
-rw-r--r--src/api/admin/cluster.rs176
-rw-r--r--src/api/common_error.rs8
-rw-r--r--src/api/k2v/index.rs11
-rw-r--r--src/api/s3/put.rs2
-rw-r--r--src/block/Cargo.toml2
-rw-r--r--src/block/manager.rs20
-rw-r--r--src/block/resync.rs8
-rw-r--r--src/db/Cargo.toml2
-rw-r--r--src/garage/Cargo.toml2
-rw-r--r--src/garage/admin/bucket.rs4
-rw-r--r--src/garage/admin/mod.rs30
-rw-r--r--src/garage/cli/cmd.rs189
-rw-r--r--src/garage/cli/layout.rs230
-rw-r--r--src/garage/cli/structs.rs24
-rw-r--r--src/garage/cli/util.rs4
-rw-r--r--src/garage/tests/common/ext/process.rs44
-rw-r--r--src/garage/tests/common/garage.rs2
-rw-r--r--src/model/Cargo.toml2
-rw-r--r--src/model/helper/bucket.rs12
-rw-r--r--src/model/index_counter.rs8
-rw-r--r--src/model/k2v/rpc.rs45
-rw-r--r--src/net/Cargo.toml2
-rw-r--r--src/rpc/Cargo.toml2
-rw-r--r--src/rpc/layout/graph_algo.rs (renamed from src/rpc/graph_algo.rs)10
-rw-r--r--src/rpc/layout/helper.rs294
-rw-r--r--src/rpc/layout/history.rs306
-rw-r--r--src/rpc/layout/manager.rs378
-rw-r--r--src/rpc/layout/mod.rs478
-rw-r--r--src/rpc/layout/test.rs157
-rw-r--r--src/rpc/layout/version.rs (renamed from src/rpc/layout.rs)713
-rw-r--r--src/rpc/lib.rs2
-rw-r--r--src/rpc/replication_mode.rs7
-rw-r--r--src/rpc/ring.rs164
-rw-r--r--src/rpc/rpc_helper.rs492
-rw-r--r--src/rpc/system.rs429
-rw-r--r--src/table/Cargo.toml2
-rw-r--r--src/table/data.rs3
-rw-r--r--src/table/gc.rs10
-rw-r--r--src/table/merkle.rs2
-rw-r--r--src/table/replication/fullcopy.rs40
-rw-r--r--src/table/replication/parameters.rs29
-rw-r--r--src/table/replication/sharded.rs51
-rw-r--r--src/table/sync.rs249
-rw-r--r--src/table/table.rs159
-rw-r--r--src/util/Cargo.toml2
-rw-r--r--src/util/error.rs7
-rw-r--r--src/web/Cargo.toml2
50 files changed, 3216 insertions, 1607 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index bc6b6aa7..3b555b8b 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_api"
-version = "0.9.1"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 50813d11..2b9be24e 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -284,7 +284,7 @@ impl ApiHandler for AdminApiServer {
Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await,
Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await,
Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await,
- Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await,
+ Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage).await,
// Keys
Endpoint::ListKeys => handle_list_keys(&self.garage).await,
Endpoint::GetKeyInfo {
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
index 1b22dd03..a8718a9f 100644
--- a/src/api/admin/bucket.rs
+++ b/src/api/admin/bucket.rs
@@ -123,7 +123,7 @@ async fn bucket_info_results(
.table
.get(&bucket_id, &EmptyKey)
.await?
- .map(|x| x.filtered_values(&garage.system.ring.borrow()))
+ .map(|x| x.filtered_values(&garage.system.cluster_layout()))
.unwrap_or_default();
let mpu_counters = garage
@@ -131,7 +131,7 @@ async fn bucket_info_results(
.table
.get(&bucket_id, &EmptyKey)
.await?
- .map(|x| x.filtered_values(&garage.system.ring.borrow()))
+ .map(|x| x.filtered_values(&garage.system.cluster_layout()))
.unwrap_or_default();
let mut relevant_keys = HashMap::new();
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index 3876c608..8ce6c5ed 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -1,3 +1,4 @@
+use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -16,25 +17,95 @@ use crate::admin::error::*;
use crate::helpers::{json_ok_response, parse_json_body};
pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
+ let layout = garage.system.cluster_layout();
+ let mut nodes = garage
+ .system
+ .get_known_nodes()
+ .into_iter()
+ .map(|i| {
+ (
+ i.id,
+ NodeResp {
+ id: hex::encode(i.id),
+ addr: Some(i.addr),
+ hostname: i.status.hostname,
+ is_up: i.is_up,
+ last_seen_secs_ago: i.last_seen_secs_ago,
+ data_partition: i
+ .status
+ .data_disk_avail
+ .map(|(avail, total)| FreeSpaceResp {
+ available: avail,
+ total,
+ }),
+ metadata_partition: i.status.meta_disk_avail.map(|(avail, total)| {
+ FreeSpaceResp {
+ available: avail,
+ total,
+ }
+ }),
+ ..Default::default()
+ },
+ )
+ })
+ .collect::<HashMap<_, _>>();
+
+ for (id, _, role) in layout.current().roles.items().iter() {
+ if let layout::NodeRoleV(Some(r)) = role {
+ let role = NodeRoleResp {
+ id: hex::encode(id),
+ zone: r.zone.to_string(),
+ capacity: r.capacity,
+ tags: r.tags.clone(),
+ };
+ match nodes.get_mut(id) {
+ None => {
+ nodes.insert(
+ *id,
+ NodeResp {
+ id: hex::encode(id),
+ role: Some(role),
+ ..Default::default()
+ },
+ );
+ }
+ Some(n) => {
+ if n.role.is_none() {
+ n.role = Some(role);
+ }
+ }
+ }
+ }
+ }
+
+ for ver in layout.versions.iter().rev().skip(1) {
+ for (id, _, role) in ver.roles.items().iter() {
+ if let layout::NodeRoleV(Some(r)) = role {
+ if !nodes.contains_key(id) && r.capacity.is_some() {
+ nodes.insert(
+ *id,
+ NodeResp {
+ id: hex::encode(id),
+ draining: true,
+ ..Default::default()
+ },
+ );
+ }
+ }
+ }
+ }
+
+ let mut nodes = nodes.into_values().collect::<Vec<_>>();
+ nodes.sort_by(|x, y| x.id.cmp(&y.id));
+
let res = GetClusterStatusResponse {
node: hex::encode(garage.system.id),
garage_version: garage_util::version::garage_version(),
garage_features: garage_util::version::garage_features(),
rust_version: garage_util::version::rust_version(),
db_engine: garage.db.engine(),
- known_nodes: garage
- .system
- .get_known_nodes()
- .into_iter()
- .map(|i| KnownNodeResp {
- id: hex::encode(i.id),
- addr: i.addr,
- is_up: i.is_up,
- last_seen_secs_ago: i.last_seen_secs_ago,
- hostname: i.status.hostname,
- })
- .collect(),
- layout: format_cluster_layout(&garage.system.get_cluster_layout()),
+ layout_version: layout.current().version,
+ nodes,
};
Ok(json_ok_response(&res)?)
@@ -85,13 +156,14 @@ pub async fn handle_connect_cluster_nodes(
}
pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
- let res = format_cluster_layout(&garage.system.get_cluster_layout());
+ let res = format_cluster_layout(&garage.system.cluster_layout());
Ok(json_ok_response(&res)?)
}
-fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResponse {
+fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResponse {
let roles = layout
+ .current()
.roles
.items()
.iter()
@@ -105,10 +177,12 @@ fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResp
.collect::<Vec<_>>();
let staged_role_changes = layout
- .staging_roles
+ .staging
+ .get()
+ .roles
.items()
.iter()
- .filter(|(k, _, v)| layout.roles.get(k) != Some(v))
+ .filter(|(k, _, v)| layout.current().roles.get(k) != Some(v))
.map(|(k, _, v)| match &v.0 {
None => NodeRoleChange {
id: hex::encode(k),
@@ -126,7 +200,7 @@ fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResp
.collect::<Vec<_>>();
GetClusterLayoutResponse {
- version: layout.version,
+ version: layout.current().version,
roles,
staged_role_changes,
}
@@ -155,8 +229,8 @@ struct GetClusterStatusResponse {
garage_features: Option<&'static [&'static str]>,
rust_version: &'static str,
db_engine: String,
- known_nodes: Vec<KnownNodeResp>,
- layout: GetClusterLayoutResponse,
+ layout_version: u64,
+ nodes: Vec<NodeResp>,
}
#[derive(Serialize)]
@@ -190,14 +264,27 @@ struct NodeRoleResp {
tags: Vec<String>,
}
-#[derive(Serialize)]
+#[derive(Serialize, Default)]
+#[serde(rename_all = "camelCase")]
+struct FreeSpaceResp {
+ available: u64,
+ total: u64,
+}
+
+#[derive(Serialize, Default)]
#[serde(rename_all = "camelCase")]
-struct KnownNodeResp {
+struct NodeResp {
id: String,
- addr: SocketAddr,
+ role: Option<NodeRoleResp>,
+ addr: Option<SocketAddr>,
+ hostname: Option<String>,
is_up: bool,
last_seen_secs_ago: Option<u64>,
- hostname: String,
+ draining: bool,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ data_partition: Option<FreeSpaceResp>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ metadata_partition: Option<FreeSpaceResp>,
}
// ---- update functions ----
@@ -208,10 +295,10 @@ pub async fn handle_update_cluster_layout(
) -> Result<Response<ResBody>, Error> {
let updates = parse_json_body::<UpdateClusterLayoutRequest, _, Error>(req).await?;
- let mut layout = garage.system.get_cluster_layout();
+ let mut layout = garage.system.cluster_layout().clone();
- let mut roles = layout.roles.clone();
- roles.merge(&layout.staging_roles);
+ let mut roles = layout.current().roles.clone();
+ roles.merge(&layout.staging.get().roles);
for change in updates {
let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?;
@@ -232,11 +319,17 @@ pub async fn handle_update_cluster_layout(
};
layout
- .staging_roles
+ .staging
+ .get_mut()
+ .roles
.merge(&roles.update_mutator(node, layout::NodeRoleV(new_role)));
}
- garage.system.update_cluster_layout(&layout).await?;
+ garage
+ .system
+ .layout_manager
+ .update_cluster_layout(&layout)
+ .await?;
let res = format_cluster_layout(&layout);
Ok(json_ok_response(&res)?)
@@ -246,12 +339,16 @@ pub async fn handle_apply_cluster_layout(
garage: &Arc<Garage>,
req: Request<IncomingBody>,
) -> Result<Response<ResBody>, Error> {
- let param = parse_json_body::<ApplyRevertLayoutRequest, _, Error>(req).await?;
+ let param = parse_json_body::<ApplyLayoutRequest, _, Error>(req).await?;
- let layout = garage.system.get_cluster_layout();
+ let layout = garage.system.cluster_layout().clone();
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
- garage.system.update_cluster_layout(&layout).await?;
+ garage
+ .system
+ .layout_manager
+ .update_cluster_layout(&layout)
+ .await?;
let res = ApplyClusterLayoutResponse {
message: msg,
@@ -262,13 +359,14 @@ pub async fn handle_apply_cluster_layout(
pub async fn handle_revert_cluster_layout(
garage: &Arc<Garage>,
- req: Request<IncomingBody>,
) -> Result<Response<ResBody>, Error> {
- let param = parse_json_body::<ApplyRevertLayoutRequest, _, Error>(req).await?;
-
- let layout = garage.system.get_cluster_layout();
- let layout = layout.revert_staged_changes(Some(param.version))?;
- garage.system.update_cluster_layout(&layout).await?;
+ let layout = garage.system.cluster_layout().clone();
+ let layout = layout.revert_staged_changes()?;
+ garage
+ .system
+ .layout_manager
+ .update_cluster_layout(&layout)
+ .await?;
let res = format_cluster_layout(&layout);
Ok(json_ok_response(&res)?)
@@ -280,7 +378,7 @@ type UpdateClusterLayoutRequest = Vec<NodeRoleChange>;
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
-struct ApplyRevertLayoutRequest {
+struct ApplyLayoutRequest {
version: u64,
}
diff --git a/src/api/common_error.rs b/src/api/common_error.rs
index 4381f227..c47555d4 100644
--- a/src/api/common_error.rs
+++ b/src/api/common_error.rs
@@ -59,9 +59,7 @@ impl CommonError {
pub fn http_status_code(&self) -> StatusCode {
match self {
CommonError::InternalError(
- GarageError::Timeout
- | GarageError::RemoteError(_)
- | GarageError::Quorum(_, _, _, _),
+ GarageError::Timeout | GarageError::RemoteError(_) | GarageError::Quorum(..),
) => StatusCode::SERVICE_UNAVAILABLE,
CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => {
StatusCode::INTERNAL_SERVER_ERROR
@@ -80,9 +78,7 @@ impl CommonError {
match self {
CommonError::Forbidden(_) => "AccessDenied",
CommonError::InternalError(
- GarageError::Timeout
- | GarageError::RemoteError(_)
- | GarageError::Quorum(_, _, _, _),
+ GarageError::Timeout | GarageError::RemoteError(_) | GarageError::Quorum(..),
) => "ServiceUnavailable",
CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => {
"InternalError"
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index 1baec1db..291464ab 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -5,7 +5,6 @@ use serde::Serialize;
use garage_util::data::*;
-use garage_rpc::ring::Ring;
use garage_table::util::*;
use garage_model::garage::Garage;
@@ -27,7 +26,11 @@ pub async fn handle_read_index(
) -> Result<Response<ResBody>, Error> {
let reverse = reverse.unwrap_or(false);
- let ring: Arc<Ring> = garage.system.ring.borrow().clone();
+ let node_id_vec = garage
+ .system
+ .cluster_layout()
+ .all_nongateway_nodes()
+ .to_vec();
let (partition_keys, more, next_start) = read_range(
&garage.k2v.counter_table.table,
@@ -36,7 +39,7 @@ pub async fn handle_read_index(
&start,
&end,
limit,
- Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
+ Some((DeletedFilter::NotDeleted, node_id_vec)),
EnumerationOrder::from_reverse(reverse),
)
.await?;
@@ -55,7 +58,7 @@ pub async fn handle_read_index(
partition_keys: partition_keys
.into_iter()
.map(|part| {
- let vals = part.filtered_values(&ring);
+ let vals = part.filtered_values(&garage.system.cluster_layout());
ReadIndexResponseEntry {
pk: part.sk,
entries: *vals.get(&s_entries).unwrap_or(&0),
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index fdfa567d..6dd68360 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -255,7 +255,7 @@ pub(crate) async fn check_quotas(
.await?;
let counters = counters
- .map(|x| x.filtered_values(&garage.system.ring.borrow()))
+ .map(|x| x.filtered_values(&garage.system.cluster_layout()))
.unwrap_or_default();
let (prev_cnt_obj, prev_cnt_size) = match prev_object {
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index d2666b10..b5763120 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_block"
-version = "0.9.1"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 848d9141..5283886c 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -267,8 +267,10 @@ impl BlockManager {
F: Fn(DataBlockHeader, ByteStream) -> Fut,
Fut: futures::Future<Output = Result<T, Error>>,
{
- let who = self.replication.read_nodes(hash);
- let who = self.system.rpc.request_order(&who);
+ let who = self
+ .system
+ .rpc_helper()
+ .block_read_nodes_of(hash, self.system.rpc_helper());
for node in who.iter() {
let node_id = NodeID::from(*node);
@@ -308,7 +310,7 @@ impl BlockManager {
// if the first one doesn't succeed rapidly
// TODO: keep first request running when initiating a new one and take the
// one that finishes earlier
- _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
+ _ = tokio::time::sleep(self.system.rpc_helper().rpc_timeout()) => {
debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node);
}
};
@@ -354,7 +356,7 @@ impl BlockManager {
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> {
- let who = self.replication.write_nodes(&hash);
+ let who = self.replication.write_sets(&hash);
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
.await
@@ -363,10 +365,10 @@ impl BlockManager {
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
self.system
- .rpc
- .try_call_many(
+ .rpc_helper()
+ .try_write_many_sets(
&self.endpoint,
- &who[..],
+ who.as_ref(),
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_quorum(self.replication.write_quorum()),
@@ -439,7 +441,7 @@ impl BlockManager {
tokio::spawn(async move {
if let Err(e) = this
.resync
- .put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout())
+ .put_to_resync(&hash, 2 * this.system.rpc_helper().rpc_timeout())
{
error!("Block {:?} could not be put in resync queue: {}.", hash, e);
}
@@ -533,7 +535,7 @@ impl BlockManager {
None => {
// Not found but maybe we should have had it ??
self.resync
- .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
+ .put_to_resync(hash, 2 * self.system.rpc_helper().rpc_timeout())?;
return Err(Error::Message(format!(
"block {:?} not found on node",
hash
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 9c1da4a7..15f210e4 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -377,7 +377,7 @@ impl BlockResyncManager {
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
- let mut who = manager.replication.write_nodes(hash);
+ let mut who = manager.replication.storage_nodes(hash);
if who.len() < manager.replication.write_quorum() {
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
}
@@ -385,7 +385,7 @@ impl BlockResyncManager {
let who_needs_resps = manager
.system
- .rpc
+ .rpc_helper()
.call_many(
&manager.endpoint,
&who,
@@ -431,10 +431,10 @@ impl BlockResyncManager {
.with_stream_from_buffer(bytes);
manager
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&manager.endpoint,
- &need_nodes[..],
+ &need_nodes,
put_block_message,
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_quorum(need_nodes.len()),
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index 9a925136..fddc5cca 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_db"
-version = "0.9.1"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 65135530..00ecb35e 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage"
-version = "0.9.1"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs
index 0781cb8b..9e642f57 100644
--- a/src/garage/admin/bucket.rs
+++ b/src/garage/admin/bucket.rs
@@ -70,7 +70,7 @@ impl AdminRpcHandler {
.table
.get(&bucket_id, &EmptyKey)
.await?
- .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+ .map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
.unwrap_or_default();
let mpu_counters = self
@@ -79,7 +79,7 @@ impl AdminRpcHandler {
.table
.get(&bucket_id, &EmptyKey)
.await?
- .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+ .map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
.unwrap_or_default();
let mut relevant_keys = HashMap::new();
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index b6f9c426..de7851e1 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -18,7 +18,7 @@ use garage_util::error::Error as GarageError;
use garage_table::replication::*;
use garage_table::*;
-use garage_rpc::ring::PARTITION_BITS;
+use garage_rpc::layout::PARTITION_BITS;
use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo;
@@ -126,8 +126,8 @@ impl AdminRpcHandler {
opt_to_send.all_nodes = false;
let mut failures = vec![];
- let ring = self.garage.system.ring.borrow().clone();
- for node in ring.layout.node_ids().iter() {
+ let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ for node in all_nodes.iter() {
let node = (*node).into();
let resp = self
.endpoint
@@ -163,9 +163,9 @@ impl AdminRpcHandler {
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
if opt.all_nodes {
let mut ret = String::new();
- let ring = self.garage.system.ring.borrow().clone();
+ let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
- for node in ring.layout.node_ids().iter() {
+ for node in all_nodes.iter() {
let mut opt = opt.clone();
opt.all_nodes = false;
opt.skip_global = true;
@@ -274,11 +274,11 @@ impl AdminRpcHandler {
fn gather_cluster_stats(&self) -> String {
let mut ret = String::new();
- // Gather storage node and free space statistics
- let layout = &self.garage.system.ring.borrow().layout;
+ // Gather storage node and free space statistics for current nodes
+ let layout = &self.garage.system.cluster_layout();
let mut node_partition_count = HashMap::<Uuid, u64>::new();
- for short_id in layout.ring_assignment_data.iter() {
- let id = layout.node_id_vec[*short_id as usize];
+ for short_id in layout.current().ring_assignment_data.iter() {
+ let id = layout.current().node_id_vec[*short_id as usize];
*node_partition_count.entry(id).or_default() += 1;
}
let node_info = self
@@ -293,8 +293,8 @@ impl AdminRpcHandler {
for (id, parts) in node_partition_count.iter() {
let info = node_info.get(id);
let status = info.map(|x| &x.status);
- let role = layout.roles.get(id).and_then(|x| x.0.as_ref());
- let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?");
+ let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
+ let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?");
let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
let capacity = role
.map(|x| x.capacity_string())
@@ -440,8 +440,8 @@ impl AdminRpcHandler {
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
- let ring = self.garage.system.ring.borrow().clone();
- for node in ring.layout.node_ids().iter() {
+ let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ for node in all_nodes.iter() {
let node = (*node).into();
match self
.endpoint
@@ -488,8 +488,8 @@ impl AdminRpcHandler {
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
- let ring = self.garage.system.ring.borrow().clone();
- for node in ring.layout.node_ids().iter() {
+ let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ for node in all_nodes.iter() {
let node = (*node).into();
match self
.endpoint
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 48359614..fb6dface 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -1,4 +1,4 @@
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
use std::time::Duration;
use format_table::format_table;
@@ -49,50 +49,61 @@ pub async fn cli_command_dispatch(
}
pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
- let status = match rpc_cli
- .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
- .await??
- {
- SystemRpc::ReturnKnownNodes(nodes) => nodes,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
+ let status = fetch_status(rpc_cli, rpc_host).await?;
let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== HEALTHY NODES ====");
let mut healthy_nodes =
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
for adv in status.iter().filter(|adv| adv.is_up) {
- match layout.roles.get(&adv.id) {
- Some(NodeRoleV(Some(cfg))) => {
- let data_avail = match &adv.status.data_disk_avail {
- _ if cfg.capacity.is_none() => "N/A".into(),
- Some((avail, total)) => {
- let pct = (*avail as f64) / (*total as f64) * 100.;
- let avail = bytesize::ByteSize::b(*avail);
- format!("{} ({:.1}%)", avail, pct)
- }
- None => "?".into(),
- };
+ let host = adv.status.hostname.as_deref().unwrap_or("?");
+ if let Some(NodeRoleV(Some(cfg))) = layout.current().roles.get(&adv.id) {
+ let data_avail = match &adv.status.data_disk_avail {
+ _ if cfg.capacity.is_none() => "N/A".into(),
+ Some((avail, total)) => {
+ let pct = (*avail as f64) / (*total as f64) * 100.;
+ let avail = bytesize::ByteSize::b(*avail);
+ format!("{} ({:.1}%)", avail, pct)
+ }
+ None => "?".into(),
+ };
+ healthy_nodes.push(format!(
+ "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
+ id = adv.id,
+ host = host,
+ addr = adv.addr,
+ tags = cfg.tags.join(","),
+ zone = cfg.zone,
+ capacity = cfg.capacity_string(),
+ data_avail = data_avail,
+ ));
+ } else {
+ let prev_role = layout
+ .versions
+ .iter()
+ .rev()
+ .find_map(|x| match x.roles.get(&adv.id) {
+ Some(NodeRoleV(Some(cfg))) => Some(cfg),
+ _ => None,
+ });
+ if let Some(cfg) = prev_role {
healthy_nodes.push(format!(
- "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
+ "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\tdraining metadata...",
id = adv.id,
- host = adv.status.hostname,
+ host = host,
addr = adv.addr,
tags = cfg.tags.join(","),
zone = cfg.zone,
- capacity = cfg.capacity_string(),
- data_avail = data_avail,
));
- }
- _ => {
- let new_role = match layout.staging_roles.get(&adv.id) {
- Some(NodeRoleV(Some(_))) => "(pending)",
+ } else {
+ let new_role = match layout.staging.get().roles.get(&adv.id) {
+ Some(NodeRoleV(Some(_))) => "pending...",
_ => "NO ROLE ASSIGNED",
};
healthy_nodes.push(format!(
- "{id:?}\t{h}\t{addr}\t{new_role}",
+ "{id:?}\t{h}\t{addr}\t\t\t{new_role}",
id = adv.id,
- h = adv.status.hostname,
+ h = host,
addr = adv.addr,
new_role = new_role,
));
@@ -101,51 +112,76 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
}
format_table(healthy_nodes);
- let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>();
- let failure_case_1 = status
- .iter()
- .any(|adv| !adv.is_up && matches!(layout.roles.get(&adv.id), Some(NodeRoleV(Some(_)))));
- let failure_case_2 = layout
- .roles
- .items()
+ // Determine which nodes are unhealthy and print that to stdout
+ let status_map = status
.iter()
- .any(|(id, _, v)| !status_keys.contains(id) && v.0.is_some());
- if failure_case_1 || failure_case_2 {
- println!("\n==== FAILED NODES ====");
- let mut failed_nodes =
- vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
- for adv in status.iter().filter(|adv| !adv.is_up) {
- if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) {
- let tf = timeago::Formatter::new();
- failed_nodes.push(format!(
- "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
- id = adv.id,
- host = adv.status.hostname,
- addr = adv.addr,
- tags = cfg.tags.join(","),
- zone = cfg.zone,
- capacity = cfg.capacity_string(),
- last_seen = adv
- .last_seen_secs_ago
- .map(|s| tf.convert(Duration::from_secs(s)))
- .unwrap_or_else(|| "never seen".into()),
- ));
+ .map(|adv| (adv.id, adv))
+ .collect::<HashMap<_, _>>();
+
+ let tf = timeago::Formatter::new();
+ let mut drain_msg = false;
+ let mut failed_nodes =
+ vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
+ let mut listed = HashSet::new();
+ for ver in layout.versions.iter().rev() {
+ for (node, _, role) in ver.roles.items().iter() {
+ let cfg = match role {
+ NodeRoleV(Some(role)) if role.capacity.is_some() => role,
+ _ => continue,
+ };
+
+ if listed.contains(node) {
+ continue;
}
- }
- for (id, _, role_v) in layout.roles.items().iter() {
- if let NodeRoleV(Some(cfg)) = role_v {
- if !status_keys.contains(id) {
- failed_nodes.push(format!(
- "{id:?}\t??\t??\t[{tags}]\t{zone}\t{capacity}\tnever seen",
- id = id,
- tags = cfg.tags.join(","),
- zone = cfg.zone,
- capacity = cfg.capacity_string(),
- ));
- }
+ listed.insert(*node);
+
+ let adv = status_map.get(node);
+ if adv.map(|x| x.is_up).unwrap_or(false) {
+ continue;
}
+
+ // Node is in a layout version, is not a gateway node, and is not up:
+ // it is in a failed state, add proper line to the output
+ let (host, addr, last_seen) = match adv {
+ Some(adv) => (
+ adv.status.hostname.as_deref().unwrap_or("?"),
+ adv.addr.to_string(),
+ adv.last_seen_secs_ago
+ .map(|s| tf.convert(Duration::from_secs(s)))
+ .unwrap_or_else(|| "never seen".into()),
+ ),
+ None => ("??", "??".into(), "never seen".into()),
+ };
+ let capacity = if ver.version == layout.current().version {
+ cfg.capacity_string()
+ } else {
+ drain_msg = true;
+ "draining metadata...".to_string()
+ };
+ failed_nodes.push(format!(
+ "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
+ id = node,
+ host = host,
+ addr = addr,
+ tags = cfg.tags.join(","),
+ zone = cfg.zone,
+ capacity = capacity,
+ last_seen = last_seen,
+ ));
}
+ }
+
+ if failed_nodes.len() > 1 {
+ println!("\n==== FAILED NODES ====");
format_table(failed_nodes);
+ if drain_msg {
+ println!();
+ println!("Your cluster is expecting to drain data from nodes that are currently unavailable.");
+ println!("If these nodes are definitely dead, please review the layout history with");
+ println!(
+ "`garage layout history` and use `garage layout skip-dead-nodes` to force progress."
+ );
+ }
}
if print_staging_role_changes(&layout) {
@@ -226,3 +262,18 @@ pub async fn cmd_admin(
}
Ok(())
}
+
+// ---- utility ----
+
+pub async fn fetch_status(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+) -> Result<Vec<KnownNodeInfo>, Error> {
+ match rpc_cli
+ .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .await??
+ {
+ SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes),
+ resp => Err(Error::unexpected_rpc_message(resp)),
+ }
+}
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index ce2b11e0..f76e33c5 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -32,6 +32,10 @@ pub async fn cli_layout_command_dispatch(
LayoutOperation::Config(config_opt) => {
cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await
}
+ LayoutOperation::History => cmd_layout_history(system_rpc_endpoint, rpc_host).await,
+ LayoutOperation::SkipDeadNodes(assume_sync_opt) => {
+ cmd_layout_skip_dead_nodes(system_rpc_endpoint, rpc_host, assume_sync_opt).await
+ }
}
}
@@ -49,6 +53,7 @@ pub async fn cmd_assign_role(
};
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+ let all_nodes = layout.get_all_nodes();
let added_nodes = args
.node_ids
@@ -58,21 +63,23 @@ pub async fn cmd_assign_role(
status
.iter()
.map(|adv| adv.id)
- .chain(layout.node_ids().iter().cloned()),
+ .chain(all_nodes.iter().cloned()),
node_id,
)
})
.collect::<Result<Vec<_>, _>>()?;
- let mut roles = layout.roles.clone();
- roles.merge(&layout.staging_roles);
+ let mut roles = layout.current().roles.clone();
+ roles.merge(&layout.staging.get().roles);
for replaced in args.replace.iter() {
- let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?;
+ let replaced_node = find_matching_node(all_nodes.iter().cloned(), replaced)?;
match roles.get(&replaced_node) {
Some(NodeRoleV(Some(_))) => {
layout
- .staging_roles
+ .staging
+ .get_mut()
+ .roles
.merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
}
_ => {
@@ -130,7 +137,9 @@ pub async fn cmd_assign_role(
};
layout
- .staging_roles
+ .staging
+ .get_mut()
+ .roles
.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
}
@@ -149,14 +158,16 @@ pub async fn cmd_remove_role(
) -> Result<(), Error> {
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
- let mut roles = layout.roles.clone();
- roles.merge(&layout.staging_roles);
+ let mut roles = layout.current().roles.clone();
+ roles.merge(&layout.staging.get().roles);
let deleted_node =
find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?;
layout
- .staging_roles
+ .staging
+ .get_mut()
+ .roles
.merge(&roles.update_mutator(deleted_node, NodeRoleV(None)));
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -174,13 +185,16 @@ pub async fn cmd_show_layout(
let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== CURRENT CLUSTER LAYOUT ====");
- print_cluster_layout(&layout, "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes.");
+ print_cluster_layout(layout.current(), "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes.");
println!();
- println!("Current cluster layout version: {}", layout.version);
+ println!(
+ "Current cluster layout version: {}",
+ layout.current().version
+ );
let has_role_changes = print_staging_role_changes(&layout);
if has_role_changes {
- let v = layout.version;
+ let v = layout.current().version;
let res_apply = layout.apply_staged_changes(Some(v + 1));
// this will print the stats of what partitions
@@ -189,7 +203,7 @@ pub async fn cmd_show_layout(
Ok((layout, msg)) => {
println!();
println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
- print_cluster_layout(&layout, "No nodes have a role in the new layout.");
+ print_cluster_layout(layout.current(), "No nodes have a role in the new layout.");
println!();
for line in msg.iter() {
@@ -199,16 +213,12 @@ pub async fn cmd_show_layout(
println!();
println!(" garage layout apply --version {}", v + 1);
println!();
- println!(
- "You can also revert all proposed changes with: garage layout revert --version {}",
- v + 1)
+ println!("You can also revert all proposed changes with: garage layout revert");
}
Err(e) => {
println!("Error while trying to compute the assignment: {}", e);
println!("This new layout cannot yet be applied.");
- println!(
- "You can also revert all proposed changes with: garage layout revert --version {}",
- v + 1)
+ println!("You can also revert all proposed changes with: garage layout revert");
}
}
}
@@ -241,9 +251,15 @@ pub async fn cmd_revert_layout(
rpc_host: NodeID,
revert_opt: RevertLayoutOpt,
) -> Result<(), Error> {
+ if !revert_opt.yes {
+ return Err(Error::Message(
+ "Please add the --yes flag to run the layout revert operation".into(),
+ ));
+ }
+
let layout = fetch_layout(rpc_cli, rpc_host).await?;
- let layout = layout.revert_staged_changes(revert_opt.version)?;
+ let layout = layout.revert_staged_changes()?;
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -266,11 +282,11 @@ pub async fn cmd_config_layout(
.parse::<ZoneRedundancy>()
.ok_or_message("invalid zone redundancy value")?;
if let ZoneRedundancy::AtLeast(r_int) = r {
- if r_int > layout.replication_factor {
+ if r_int > layout.current().replication_factor {
return Err(Error::Message(format!(
"The zone redundancy must be smaller or equal to the \
replication factor ({}).",
- layout.replication_factor
+ layout.current().replication_factor
)));
} else if r_int < 1 {
return Err(Error::Message(
@@ -280,7 +296,9 @@ pub async fn cmd_config_layout(
}
layout
- .staging_parameters
+ .staging
+ .get_mut()
+ .parameters
.update(LayoutParameters { zone_redundancy: r });
println!("The zone redundancy parameter has been set to '{}'.", r);
did_something = true;
@@ -297,25 +315,166 @@ pub async fn cmd_config_layout(
Ok(())
}
+pub async fn cmd_layout_history(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+) -> Result<(), Error> {
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
+ let min_stored = layout.min_stored();
+
+ println!("==== LAYOUT HISTORY ====");
+ let mut table = vec!["Version\tStatus\tStorage nodes\tGateway nodes".to_string()];
+ for ver in layout
+ .versions
+ .iter()
+ .rev()
+ .chain(layout.old_versions.iter().rev())
+ {
+ let status = if ver.version == layout.current().version {
+ "current"
+ } else if ver.version >= min_stored {
+ "draining"
+ } else {
+ "historical"
+ };
+ table.push(format!(
+ "#{}\t{}\t{}\t{}",
+ ver.version,
+ status,
+ ver.roles
+ .items()
+ .iter()
+ .filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_some()))
+ .count(),
+ ver.roles
+ .items()
+ .iter()
+ .filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_none()))
+ .count(),
+ ));
+ }
+ format_table(table);
+ println!();
+
+ if layout.versions.len() > 1 {
+ println!("==== UPDATE TRACKERS ====");
+ println!("Several layout versions are currently live in the version, and data is being migrated.");
+ println!(
+ "This is the internal data that Garage stores to know which nodes have what data."
+ );
+ println!();
+ let mut table = vec!["Node\tAck\tSync\tSync_ack".to_string()];
+ let all_nodes = layout.get_all_nodes();
+ for node in all_nodes.iter() {
+ table.push(format!(
+ "{:?}\t#{}\t#{}\t#{}",
+ node,
+ layout.update_trackers.ack_map.get(node, min_stored),
+ layout.update_trackers.sync_map.get(node, min_stored),
+ layout.update_trackers.sync_ack_map.get(node, min_stored),
+ ));
+ }
+ table[1..].sort();
+ format_table(table);
+
+ println!();
+ println!(
+ "If some nodes are not catching up to the latest layout version in the update trackers,"
+ );
+ println!("it might be because they are offline or unable to complete a sync successfully.");
+ println!(
+ "You may force progress using `garage layout skip-dead-nodes --version {}`",
+ layout.current().version
+ );
+ } else {
+ println!("Your cluster is currently in a stable state with a single live layout version.");
+ println!("No metadata migration is in progress. Note that the migration of data blocks is not tracked,");
+ println!(
+ "so you might want to keep old nodes online until their data directories become empty."
+ );
+ }
+
+ Ok(())
+}
+
+pub async fn cmd_layout_skip_dead_nodes(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ opt: SkipDeadNodesOpt,
+) -> Result<(), Error> {
+ let status = fetch_status(rpc_cli, rpc_host).await?;
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ if layout.versions.len() == 1 {
+ return Err(Error::Message(
+ "This command cannot be called when there is only one live cluster layout version"
+ .into(),
+ ));
+ }
+
+ let min_v = layout.min_stored();
+ if opt.version <= min_v || opt.version > layout.current().version {
+ return Err(Error::Message(format!(
+ "Invalid version, you may use the following version numbers: {}",
+ (min_v + 1..=layout.current().version)
+ .map(|x| x.to_string())
+ .collect::<Vec<_>>()
+ .join(" ")
+ )));
+ }
+
+ let all_nodes = layout.get_all_nodes();
+ let mut did_something = false;
+ for node in all_nodes.iter() {
+ if status.iter().any(|x| x.id == *node && x.is_up) {
+ continue;
+ }
+
+ if layout.update_trackers.ack_map.set_max(*node, opt.version) {
+ println!("Increased the ACK tracker for node {:?}", node);
+ did_something = true;
+ }
+
+ if opt.allow_missing_data {
+ if layout.update_trackers.sync_map.set_max(*node, opt.version) {
+ println!("Increased the SYNC tracker for node {:?}", node);
+ did_something = true;
+ }
+ }
+ }
+
+ if did_something {
+ send_layout(rpc_cli, rpc_host, layout).await?;
+ println!("Success.");
+ Ok(())
+ } else if !opt.allow_missing_data {
+ Err(Error::Message("Nothing was done, try passing the `--allow-missing-data` flag to force progress even when not enough nodes can complete a metadata sync.".into()))
+ } else {
+ Err(Error::Message(
+ "Sorry, there is nothing I can do for you. Please wait patiently. If you ask for help, please send the output of the `garage layout history` command.".into(),
+ ))
+ }
+}
+
// --- utility ---
pub async fn fetch_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
-) -> Result<ClusterLayout, Error> {
+) -> Result<LayoutHistory, Error> {
match rpc_cli
.call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL)
.await??
{
SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
- resp => Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+ resp => Err(Error::unexpected_rpc_message(resp)),
}
}
pub async fn send_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
- layout: ClusterLayout,
+ layout: LayoutHistory,
) -> Result<(), Error> {
rpc_cli
.call(
@@ -327,7 +486,7 @@ pub async fn send_layout(
Ok(())
}
-pub fn print_cluster_layout(layout: &ClusterLayout, empty_msg: &str) {
+pub fn print_cluster_layout(layout: &LayoutVersion, empty_msg: &str) {
let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()];
for (id, _, role) in layout.roles.items().iter() {
let role = match &role.0 {
@@ -366,21 +525,22 @@ pub fn print_cluster_layout(layout: &ClusterLayout, empty_msg: &str) {
}
}
-pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
- let has_role_changes = layout
- .staging_roles
+pub fn print_staging_role_changes(layout: &LayoutHistory) -> bool {
+ let staging = layout.staging.get();
+ let has_role_changes = staging
+ .roles
.items()
.iter()
- .any(|(k, _, v)| layout.roles.get(k) != Some(v));
- let has_layout_changes = *layout.staging_parameters.get() != layout.parameters;
+ .any(|(k, _, v)| layout.current().roles.get(k) != Some(v));
+ let has_layout_changes = *staging.parameters.get() != layout.current().parameters;
if has_role_changes || has_layout_changes {
println!();
println!("==== STAGED ROLE CHANGES ====");
if has_role_changes {
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
- for (id, _, role) in layout.staging_roles.items().iter() {
- if layout.roles.get(id) == Some(role) {
+ for (id, _, role) in staging.roles.items().iter() {
+ if layout.current().roles.get(id) == Some(role) {
continue;
}
if let Some(role) = &role.0 {
@@ -402,7 +562,7 @@ pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
if has_layout_changes {
println!(
"Zone redundancy: {}",
- layout.staging_parameters.get().zone_redundancy
+ staging.parameters.get().zone_redundancy
);
}
true
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index be4d5bd6..40e47ee1 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -114,6 +114,14 @@ pub enum LayoutOperation {
/// Revert staged changes to cluster layout
#[structopt(name = "revert", version = garage_version())]
Revert(RevertLayoutOpt),
+
+ /// View the history of layouts in the cluster
+ #[structopt(name = "history", version = garage_version())]
+ History,
+
+ /// Skip dead nodes when awaiting for a new layout version to be synchronized
+ #[structopt(name = "skip-dead-nodes", version = garage_version())]
+ SkipDeadNodes(SkipDeadNodesOpt),
}
#[derive(StructOpt, Debug)]
@@ -166,9 +174,21 @@ pub struct ApplyLayoutOpt {
#[derive(StructOpt, Debug)]
pub struct RevertLayoutOpt {
- /// Version number of old configuration to which to revert
+ /// The revert operation will not be ran unless this flag is added
+ #[structopt(long = "yes")]
+ pub(crate) yes: bool,
+}
+
+#[derive(StructOpt, Debug)]
+pub struct SkipDeadNodesOpt {
+ /// Version number of the layout to assume is currently up-to-date.
+ /// This will generally be the current layout version.
#[structopt(long = "version")]
- pub(crate) version: Option<u64>,
+ pub(crate) version: u64,
+ /// Allow the skip even if a quorum of ndoes could not be found for
+ /// the data among the remaining nodes
+ #[structopt(long = "allow-missing-data")]
+ pub(crate) allow_missing_data: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 2232d395..0511e2b1 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -450,6 +450,8 @@ pub fn print_block_info(
if refcount != nondeleted_count {
println!();
- println!("Warning: refcount does not match number of non-deleted versions");
+ println!(
+ "Warning: refcount does not match number of non-deleted versions (see issue #644)."
+ );
}
}
diff --git a/src/garage/tests/common/ext/process.rs b/src/garage/tests/common/ext/process.rs
index ba533b6c..8e20bf7c 100644
--- a/src/garage/tests/common/ext/process.rs
+++ b/src/garage/tests/common/ext/process.rs
@@ -14,42 +14,20 @@ impl CommandExt for process::Command {
}
fn expect_success_status(&mut self, msg: &str) -> process::ExitStatus {
- let status = self.status().expect(msg);
- status.expect_success(msg);
- status
+ self.expect_success_output(msg).status
}
fn expect_success_output(&mut self, msg: &str) -> process::Output {
let output = self.output().expect(msg);
- output.expect_success(msg);
- output
- }
-}
-
-pub trait OutputExt {
- fn expect_success(&self, msg: &str);
-}
-
-impl OutputExt for process::Output {
- fn expect_success(&self, msg: &str) {
- self.status.expect_success(msg)
- }
-}
-
-pub trait ExitStatusExt {
- fn expect_success(&self, msg: &str);
-}
-
-impl ExitStatusExt for process::ExitStatus {
- fn expect_success(&self, msg: &str) {
- if !self.success() {
- match self.code() {
- Some(code) => panic!(
- "Command exited with code {code}: {msg}",
- code = code,
- msg = msg
- ),
- None => panic!("Command exited with signal: {msg}", msg = msg),
- }
+ if !output.status.success() {
+ panic!(
+ "{}: command {:?} exited with error {:?}\nSTDOUT: {}\nSTDERR: {}",
+ msg,
+ self,
+ output.status.code(),
+ String::from_utf8_lossy(&output.stdout),
+ String::from_utf8_lossy(&output.stderr)
+ );
}
+ output
}
}
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index d1f0867a..ebc82f37 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -96,7 +96,7 @@ api_bind_addr = "127.0.0.1:{admin_port}"
.arg("server")
.stdout(stdout)
.stderr(stderr)
- .env("RUST_LOG", "garage=info,garage_api=trace")
+ .env("RUST_LOG", "garage=debug,garage_api=trace")
.spawn()
.expect("Could not start garage");
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index ce0ccff0..33898e20 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_model"
-version = "0.9.1"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index e9842a91..222cfc83 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -450,10 +450,12 @@ impl<'a> BucketHelper<'a> {
#[cfg(feature = "k2v")]
{
- use garage_rpc::ring::Ring;
- use std::sync::Arc;
-
- let ring: Arc<Ring> = self.0.system.ring.borrow().clone();
+ let node_id_vec = self
+ .0
+ .system
+ .cluster_layout()
+ .all_nongateway_nodes()
+ .to_vec();
let k2vindexes = self
.0
.k2v
@@ -462,7 +464,7 @@ impl<'a> BucketHelper<'a> {
.get_range(
&bucket_id,
None,
- Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
+ Some((DeletedFilter::NotDeleted, node_id_vec)),
10,
EnumerationOrder::Forward,
)
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index c0bf38d8..aa13ee7b 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use garage_db as db;
-use garage_rpc::ring::Ring;
+use garage_rpc::layout::LayoutHelper;
use garage_rpc::system::System;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -83,9 +83,9 @@ impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
}
impl<T: CountedItem> CounterEntry<T> {
- pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
- let nodes = &ring.layout.node_id_vec[..];
- self.filtered_values_with_nodes(nodes)
+ pub fn filtered_values(&self, layout: &LayoutHelper) -> HashMap<String, i64> {
+ let nodes = layout.all_nongateway_nodes();
+ self.filtered_values_with_nodes(&nodes)
}
pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> {
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 4ab44c22..e15f2df8 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -127,23 +127,21 @@ impl K2VRpcHandler {
.item_table
.data
.replication
- .write_nodes(&partition.hash());
+ .storage_nodes(&partition.hash());
who.sort();
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
- &who[..],
+ &who,
K2VRpc::InsertItem(InsertedItem {
partition,
sort_key,
causal_context,
value,
}),
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(1)
- .interrupt_after_quorum(true),
+ RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1),
)
.await?;
@@ -168,7 +166,7 @@ impl K2VRpcHandler {
.item_table
.data
.replication
- .write_nodes(&partition.hash());
+ .storage_nodes(&partition.hash());
who.sort();
call_list.entry(who).or_default().push(InsertedItem {
@@ -187,14 +185,12 @@ impl K2VRpcHandler {
let call_futures = call_list.into_iter().map(|(nodes, items)| async move {
let resp = self
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
&nodes[..],
K2VRpc::InsertManyItems(items),
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(1)
- .interrupt_after_quorum(true),
+ RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1),
)
.await?;
Ok::<_, Error>((nodes, resp))
@@ -223,15 +219,16 @@ impl K2VRpcHandler {
},
sort_key,
};
+ // TODO figure this out with write sets, is it still appropriate???
let nodes = self
.item_table
.data
.replication
- .write_nodes(&poll_key.partition.hash());
+ .read_nodes(&poll_key.partition.hash());
- let rpc = self.system.rpc.try_call_many(
+ let rpc = self.system.rpc_helper().try_call_many(
&self.endpoint,
- &nodes[..],
+ &nodes,
K2VRpc::PollItem {
key: poll_key,
causal_context,
@@ -239,9 +236,11 @@ impl K2VRpcHandler {
},
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.item_table.data.replication.read_quorum())
+ .send_all_at_once(true)
.without_timeout(),
);
- let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let timeout_duration =
+ Duration::from_millis(timeout_msec) + self.system.rpc_helper().rpc_timeout();
let resps = select! {
r = rpc => r?,
_ = tokio::time::sleep(timeout_duration) => return Ok(None),
@@ -283,11 +282,12 @@ impl K2VRpcHandler {
seen.restrict(&range);
// Prepare PollRange RPC to send to the storage nodes responsible for the parititon
+ // TODO figure this out with write sets, does it still work????
let nodes = self
.item_table
.data
.replication
- .write_nodes(&range.partition.hash());
+ .read_nodes(&range.partition.hash());
let quorum = self.item_table.data.replication.read_quorum();
let msg = K2VRpc::PollRange {
range,
@@ -300,7 +300,11 @@ impl K2VRpcHandler {
let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
let mut requests = nodes
.iter()
- .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
+ .map(|node| {
+ self.system
+ .rpc_helper()
+ .call(&self.endpoint, *node, msg.clone(), rs)
+ })
.collect::<FuturesUnordered<_>>();
// Fetch responses. This procedure stops fetching responses when any of the following
@@ -316,8 +320,9 @@ impl K2VRpcHandler {
// kind: all items produced by that node until time ts have been returned, so we can
// bump the entry in the global vector clock and possibly remove some item-specific
// vector clocks)
- let mut deadline =
- Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let mut deadline = Instant::now()
+ + Duration::from_millis(timeout_msec)
+ + self.system.rpc_helper().rpc_timeout();
let mut resps = vec![];
let mut errors = vec![];
loop {
@@ -339,7 +344,7 @@ impl K2VRpcHandler {
}
if errors.len() > nodes.len() - quorum {
let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
- return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into());
+ return Err(Error::Quorum(quorum, None, resps.len(), nodes.len(), errors).into());
}
// Take all returned items into account to produce the response.
diff --git a/src/net/Cargo.toml b/src/net/Cargo.toml
index a2674498..df81c437 100644
--- a/src/net/Cargo.toml
+++ b/src/net/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_net"
-version = "0.9.1"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license-file = "AGPL-3.0"
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 5d5750c4..3e7ac635 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_rpc"
-version = "0.9.1"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
diff --git a/src/rpc/graph_algo.rs b/src/rpc/layout/graph_algo.rs
index d8c6c9b9..bd33e97f 100644
--- a/src/rpc/graph_algo.rs
+++ b/src/rpc/layout/graph_algo.rs
@@ -114,16 +114,6 @@ impl Graph<FlowEdge> {
Ok(result)
}
- /// This function returns the value of the flow incoming to v.
- pub fn get_inflow(&self, v: Vertex) -> Result<i64, String> {
- let idv = self.get_vertex_id(&v)?;
- let mut result = 0;
- for edge in self.graph[idv].iter() {
- result += max(0, self.graph[edge.dest][edge.rev].flow);
- }
- Ok(result)
- }
-
/// This function returns the value of the flow outgoing from v.
pub fn get_outflow(&self, v: Vertex) -> Result<i64, String> {
let idv = self.get_vertex_id(&v)?;
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
new file mode 100644
index 00000000..9fb738ea
--- /dev/null
+++ b/src/rpc/layout/helper.rs
@@ -0,0 +1,294 @@
+use std::collections::HashMap;
+use std::ops::Deref;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::*;
+
+use super::*;
+use crate::replication_mode::ReplicationMode;
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
+pub struct RpcLayoutDigest {
+ /// Cluster layout version
+ pub current_version: u64,
+ /// Number of active layout versions
+ pub active_versions: usize,
+ /// Hash of cluster layout update trackers
+ pub trackers_hash: Hash,
+ /// Hash of cluster layout staging data
+ pub staging_hash: Hash,
+}
+
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub struct SyncLayoutDigest {
+ current: u64,
+ ack_map_min: u64,
+ min_stored: u64,
+}
+
+pub struct LayoutHelper {
+ replication_mode: ReplicationMode,
+ layout: Option<LayoutHistory>,
+
+ // cached values
+ ack_map_min: u64,
+ sync_map_min: u64,
+
+ all_nodes: Vec<Uuid>,
+ all_nongateway_nodes: Vec<Uuid>,
+
+ trackers_hash: Hash,
+ staging_hash: Hash,
+
+ // ack lock: counts in-progress write operations for each
+ // layout version ; we don't increase the ack update tracker
+ // while this lock is nonzero
+ pub(crate) ack_lock: HashMap<u64, AtomicUsize>,
+}
+
+impl Deref for LayoutHelper {
+ type Target = LayoutHistory;
+ fn deref(&self) -> &LayoutHistory {
+ self.layout()
+ }
+}
+
+impl LayoutHelper {
+ pub fn new(
+ replication_mode: ReplicationMode,
+ mut layout: LayoutHistory,
+ mut ack_lock: HashMap<u64, AtomicUsize>,
+ ) -> Self {
+ // In the new() function of the helper, we do a bunch of cleanup
+ // and calculations on the layout history to make sure things are
+ // correct and we have rapid access to important values such as
+ // the layout versions to use when reading to ensure consistency.
+
+ if !replication_mode.is_read_after_write_consistent() {
+ // Fast path for when no consistency is required.
+ // In this case we only need to keep the last version of the layout,
+ // we don't care about coordinating stuff in the cluster.
+ layout.keep_current_version_only();
+ }
+
+ layout.cleanup_old_versions();
+
+ let all_nodes = layout.get_all_nodes();
+ let all_nongateway_nodes = layout.get_all_nongateway_nodes();
+
+ layout.clamp_update_trackers(&all_nodes);
+
+ let min_version = layout.min_stored();
+
+ // ack_map_min is the minimum value of ack_map among all nodes
+ // in the cluster (gateway, non-gateway, current and previous layouts).
+ // It is the highest layout version which all of these nodes have
+ // acknowledged, indicating that they are aware of it and are no
+ // longer processing write operations that did not take it into account.
+ let ack_map_min = layout
+ .update_trackers
+ .ack_map
+ .min_among(&all_nodes, min_version);
+
+ // sync_map_min is the minimum value of sync_map among storage nodes
+ // in the cluster (non-gateway nodes only, current and previous layouts).
+ // It is the highest layout version for which we know that all relevant
+ // storage nodes have fullfilled a sync, and therefore it is safe to
+ // use a read quorum within that layout to ensure consistency.
+ // Gateway nodes are excluded here because they hold no relevant data
+ // (they store the bucket and access key tables, but we don't have
+ // consistency on those).
+ // This value is calculated using quorums to allow progress even
+ // if not all nodes have successfully completed a sync.
+ let sync_map_min =
+ layout.calculate_sync_map_min_with_quorum(replication_mode, &all_nongateway_nodes);
+
+ let trackers_hash = layout.calculate_trackers_hash();
+ let staging_hash = layout.calculate_staging_hash();
+
+ ack_lock.retain(|_, cnt| *cnt.get_mut() > 0);
+ ack_lock
+ .entry(layout.current().version)
+ .or_insert(AtomicUsize::new(0));
+
+ LayoutHelper {
+ replication_mode,
+ layout: Some(layout),
+ ack_map_min,
+ sync_map_min,
+ all_nodes,
+ all_nongateway_nodes,
+ trackers_hash,
+ staging_hash,
+ ack_lock,
+ }
+ }
+
+ // ------------------ single updating function --------------
+
+ fn layout(&self) -> &LayoutHistory {
+ self.layout.as_ref().unwrap()
+ }
+
+ pub(crate) fn update<F>(&mut self, f: F) -> bool
+ where
+ F: FnOnce(&mut LayoutHistory) -> bool,
+ {
+ let changed = f(self.layout.as_mut().unwrap());
+ if changed {
+ *self = Self::new(
+ self.replication_mode,
+ self.layout.take().unwrap(),
+ std::mem::take(&mut self.ack_lock),
+ );
+ }
+ changed
+ }
+
+ // ------------------ read helpers ---------------
+
+ pub fn all_nodes(&self) -> &[Uuid] {
+ &self.all_nodes
+ }
+
+ pub fn all_nongateway_nodes(&self) -> &[Uuid] {
+ &self.all_nongateway_nodes
+ }
+
+ pub fn ack_map_min(&self) -> u64 {
+ self.ack_map_min
+ }
+
+ pub fn sync_map_min(&self) -> u64 {
+ self.sync_map_min
+ }
+
+ pub fn sync_digest(&self) -> SyncLayoutDigest {
+ SyncLayoutDigest {
+ current: self.layout().current().version,
+ ack_map_min: self.ack_map_min(),
+ min_stored: self.layout().min_stored(),
+ }
+ }
+
+ pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
+ let sync_min = self.sync_map_min;
+ let version = self
+ .layout()
+ .versions
+ .iter()
+ .find(|x| x.version == sync_min)
+ .or(self.layout().versions.last())
+ .unwrap();
+ version
+ .nodes_of(position, version.replication_factor)
+ .collect()
+ }
+
+ pub fn storage_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
+ self.layout()
+ .versions
+ .iter()
+ .map(|x| x.nodes_of(position, x.replication_factor).collect())
+ .collect()
+ }
+
+ pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
+ let mut ret = vec![];
+ for version in self.layout().versions.iter() {
+ ret.extend(version.nodes_of(position, version.replication_factor));
+ }
+ ret.sort();
+ ret.dedup();
+ ret
+ }
+
+ pub fn trackers_hash(&self) -> Hash {
+ self.trackers_hash
+ }
+
+ pub fn staging_hash(&self) -> Hash {
+ self.staging_hash
+ }
+
+ pub fn digest(&self) -> RpcLayoutDigest {
+ RpcLayoutDigest {
+ current_version: self.current().version,
+ active_versions: self.versions.len(),
+ trackers_hash: self.trackers_hash,
+ staging_hash: self.staging_hash,
+ }
+ }
+
+ // ------------------ helpers for update tracking ---------------
+
+ pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) {
+ // Ensure trackers for this node's values are up-to-date
+
+ // 1. Acknowledge the last layout version which is not currently
+ // locked by an in-progress write operation
+ self.ack_max_free(local_node_id);
+
+ // 2. Assume the data on this node is sync'ed up at least to
+ // the first layout version in the history
+ self.sync_first(local_node_id);
+
+ // 3. Acknowledge everyone has synced up to min(self.sync_map)
+ self.sync_ack(local_node_id);
+
+ debug!("ack_map: {:?}", self.update_trackers.ack_map);
+ debug!("sync_map: {:?}", self.update_trackers.sync_map);
+ debug!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
+ }
+
+ fn sync_first(&mut self, local_node_id: Uuid) {
+ let first_version = self.min_stored();
+ self.update(|layout| {
+ layout
+ .update_trackers
+ .sync_map
+ .set_max(local_node_id, first_version)
+ });
+ }
+
+ fn sync_ack(&mut self, local_node_id: Uuid) {
+ let sync_map_min = self.sync_map_min;
+ self.update(|layout| {
+ layout
+ .update_trackers
+ .sync_ack_map
+ .set_max(local_node_id, sync_map_min)
+ });
+ }
+
+ pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool {
+ let max_ack = self.max_free_ack();
+ let changed = self.update(|layout| {
+ layout
+ .update_trackers
+ .ack_map
+ .set_max(local_node_id, max_ack)
+ });
+ if changed {
+ info!("ack_until updated to {}", max_ack);
+ }
+ changed
+ }
+
+ pub(crate) fn max_free_ack(&self) -> u64 {
+ self.layout()
+ .versions
+ .iter()
+ .map(|x| x.version)
+ .skip_while(|v| {
+ self.ack_lock
+ .get(v)
+ .map(|x| x.load(Ordering::Relaxed) == 0)
+ .unwrap_or(true)
+ })
+ .next()
+ .unwrap_or(self.current().version)
+ }
+}
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
new file mode 100644
index 00000000..b8cc27da
--- /dev/null
+++ b/src/rpc/layout/history.rs
@@ -0,0 +1,306 @@
+use std::collections::HashSet;
+
+use garage_util::crdt::{Crdt, Lww, LwwMap};
+use garage_util::data::*;
+use garage_util::encode::nonversioned_encode;
+use garage_util::error::*;
+
+use super::*;
+use crate::replication_mode::ReplicationMode;
+
+impl LayoutHistory {
+ pub fn new(replication_factor: usize) -> Self {
+ let version = LayoutVersion::new(replication_factor);
+
+ let staging = LayoutStaging {
+ parameters: Lww::<LayoutParameters>::new(version.parameters),
+ roles: LwwMap::new(),
+ };
+
+ LayoutHistory {
+ versions: vec![version],
+ old_versions: vec![],
+ update_trackers: Default::default(),
+ staging: Lww::raw(0, staging),
+ }
+ }
+
+ // ------------------ who stores what now? ---------------
+
+ pub fn current(&self) -> &LayoutVersion {
+ self.versions.last().as_ref().unwrap()
+ }
+
+ pub fn min_stored(&self) -> u64 {
+ self.versions.first().as_ref().unwrap().version
+ }
+
+ pub fn get_all_nodes(&self) -> Vec<Uuid> {
+ if self.versions.len() == 1 {
+ self.versions[0].all_nodes().to_vec()
+ } else {
+ let set = self
+ .versions
+ .iter()
+ .flat_map(|x| x.all_nodes())
+ .collect::<HashSet<_>>();
+ set.into_iter().copied().collect::<Vec<_>>()
+ }
+ }
+
+ pub(crate) fn get_all_nongateway_nodes(&self) -> Vec<Uuid> {
+ if self.versions.len() == 1 {
+ self.versions[0].nongateway_nodes().to_vec()
+ } else {
+ let set = self
+ .versions
+ .iter()
+ .flat_map(|x| x.nongateway_nodes())
+ .collect::<HashSet<_>>();
+ set.into_iter().copied().collect::<Vec<_>>()
+ }
+ }
+
+ // ---- housekeeping (all invoked by LayoutHelper) ----
+
+ pub(crate) fn keep_current_version_only(&mut self) {
+ while self.versions.len() > 1 {
+ let removed = self.versions.remove(0);
+ self.old_versions.push(removed);
+ }
+ }
+
+ pub(crate) fn cleanup_old_versions(&mut self) {
+ // If there are invalid versions before valid versions, remove them
+ if self.versions.len() > 1 && self.current().check().is_ok() {
+ while self.versions.len() > 1 && self.versions.first().unwrap().check().is_err() {
+ let removed = self.versions.remove(0);
+ info!(
+ "Layout history: pruning old invalid version {}",
+ removed.version
+ );
+ }
+ }
+
+ // If there are old versions that no one is reading from anymore,
+ // remove them (keep them in self.old_versions).
+ // ASSUMPTION: we only care about where nodes in the current layout version
+ // are reading from, as we assume older nodes are being discarded.
+ let current_nodes = &self.current().node_id_vec;
+ let min_version = self.min_stored();
+ let sync_ack_map_min = self
+ .update_trackers
+ .sync_ack_map
+ .min_among(current_nodes, min_version);
+ while self.min_stored() < sync_ack_map_min {
+ assert!(self.versions.len() > 1);
+ let removed = self.versions.remove(0);
+ info!(
+ "Layout history: moving version {} to old_versions",
+ removed.version
+ );
+ self.old_versions.push(removed);
+ }
+
+ while self.old_versions.len() > OLD_VERSION_COUNT {
+ let removed = self.old_versions.remove(0);
+ info!("Layout history: removing old_version {}", removed.version);
+ }
+ }
+
+ pub(crate) fn clamp_update_trackers(&mut self, nodes: &[Uuid]) {
+ let min_v = self.min_stored();
+ for node in nodes {
+ self.update_trackers.ack_map.set_max(*node, min_v);
+ self.update_trackers.sync_map.set_max(*node, min_v);
+ self.update_trackers.sync_ack_map.set_max(*node, min_v);
+ }
+ }
+
+ pub(crate) fn calculate_sync_map_min_with_quorum(
+ &self,
+ replication_mode: ReplicationMode,
+ all_nongateway_nodes: &[Uuid],
+ ) -> u64 {
+ // This function calculates the minimum layout version from which
+ // it is safe to read if we want to maintain read-after-write consistency.
+ // In the general case the computation can be a bit expensive so
+ // we try to optimize it in several ways.
+
+ // If there is only one layout version, we know that's the one
+ // we need to read from.
+ if self.versions.len() == 1 {
+ return self.current().version;
+ }
+
+ let quorum = replication_mode.write_quorum();
+
+ let min_version = self.min_stored();
+ let global_min = self
+ .update_trackers
+ .sync_map
+ .min_among(all_nongateway_nodes, min_version);
+
+ // If the write quorums are equal to the total number of nodes,
+ // i.e. no writes can succeed while they are not written to all nodes,
+ // then we must in all case wait for all nodes to complete a sync.
+ // This is represented by reading from the layout with version
+ // number global_min, the smallest layout version for which all nodes
+ // have completed a sync.
+ if quorum == self.current().replication_factor {
+ return global_min;
+ }
+
+ // In the general case, we need to look at all write sets for all partitions,
+ // and find a safe layout version to read for that partition. We then
+ // take the minimum value among all partition as the safe layout version
+ // to read in all cases (the layout version to which all reads are directed).
+ let mut current_min = self.current().version;
+ let mut sets_done = HashSet::<Vec<Uuid>>::new();
+
+ for (_, p_hash) in self.current().partitions() {
+ for v in self.versions.iter() {
+ if v.version == self.current().version {
+ // We don't care about whether nodes in the latest layout version
+ // have completed a sync or not, as the sync is push-only
+ // and by definition nodes in the latest layout version do not
+ // hold data that must be pushed to nodes in the latest layout
+ // version, since that's the same version (any data that's
+ // already in the latest version is assumed to have been written
+ // by an operation that ensured a quorum of writes within
+ // that version).
+ continue;
+ }
+
+ // Determine set of nodes for partition p in layout version v.
+ // Sort the node set to avoid duplicate computations.
+ let mut set = v
+ .nodes_of(&p_hash, v.replication_factor)
+ .collect::<Vec<Uuid>>();
+ set.sort();
+
+ // If this set was already processed, skip it.
+ if sets_done.contains(&set) {
+ continue;
+ }
+
+ // Find the value of the sync update trackers that is the
+ // highest possible minimum within a quorum of nodes.
+ let mut sync_values = set
+ .iter()
+ .map(|x| self.update_trackers.sync_map.get(x, min_version))
+ .collect::<Vec<_>>();
+ sync_values.sort();
+ let set_min = sync_values[sync_values.len() - quorum];
+ if set_min < current_min {
+ current_min = set_min;
+ }
+ // defavorable case, we know we are at the smallest possible version,
+ // so we can stop early
+ assert!(current_min >= global_min);
+ if current_min == global_min {
+ return current_min;
+ }
+
+ // Add set to already processed sets
+ sets_done.insert(set);
+ }
+ }
+
+ current_min
+ }
+
+ pub(crate) fn calculate_trackers_hash(&self) -> Hash {
+ blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..])
+ }
+
+ pub(crate) fn calculate_staging_hash(&self) -> Hash {
+ blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
+ }
+
+ // ================== updates to layout, public interface ===================
+
+ pub fn merge(&mut self, other: &LayoutHistory) -> bool {
+ let mut changed = false;
+
+ // Add any new versions to history
+ for v2 in other.versions.iter() {
+ if let Some(v1) = self.versions.iter().find(|v| v.version == v2.version) {
+ // Version is already present, check consistency
+ if v1 != v2 {
+ error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version);
+ }
+ } else if self.versions.iter().all(|v| v.version != v2.version - 1) {
+ error!(
+ "Cannot receive new layout version {}, version {} is missing",
+ v2.version,
+ v2.version - 1
+ );
+ } else {
+ self.versions.push(v2.clone());
+ changed = true;
+ }
+ }
+
+ // Merge trackers
+ let c = self.update_trackers.merge(&other.update_trackers);
+ changed = changed || c;
+
+ // Merge staged layout changes
+ if self.staging != other.staging {
+ let prev_staging = self.staging.clone();
+ self.staging.merge(&other.staging);
+ changed = changed || self.staging != prev_staging;
+ }
+
+ changed
+ }
+
+ pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> {
+ match version {
+ None => {
+ let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+ "#;
+ return Err(Error::Message(error.into()));
+ }
+ Some(v) => {
+ if v != self.current().version + 1 {
+ return Err(Error::Message("Invalid new layout version".into()));
+ }
+ }
+ }
+
+ // Compute new version and add it to history
+ let (new_version, msg) = self
+ .current()
+ .clone()
+ .calculate_next_version(self.staging.get())?;
+
+ self.versions.push(new_version);
+ self.cleanup_old_versions();
+
+ // Reset the staged layout changes
+ self.staging.update(LayoutStaging {
+ parameters: self.staging.get().parameters.clone(),
+ roles: LwwMap::new(),
+ });
+
+ Ok((self, msg))
+ }
+
+ pub fn revert_staged_changes(mut self) -> Result<Self, Error> {
+ self.staging.update(LayoutStaging {
+ parameters: Lww::new(self.current().parameters),
+ roles: LwwMap::new(),
+ });
+
+ Ok(self)
+ }
+
+ pub fn check(&self) -> Result<(), String> {
+ // TODO: anything more ?
+ self.current().check()
+ }
+}
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
new file mode 100644
index 00000000..0b6c7e63
--- /dev/null
+++ b/src/rpc/layout/manager.rs
@@ -0,0 +1,378 @@
+use std::collections::HashMap;
+use std::sync::{atomic::Ordering, Arc, Mutex, RwLock, RwLockReadGuard};
+use std::time::Duration;
+
+use tokio::sync::Notify;
+
+use garage_net::endpoint::Endpoint;
+use garage_net::peering::PeeringManager;
+use garage_net::NodeID;
+
+use garage_util::config::Config;
+use garage_util::data::*;
+use garage_util::error::*;
+use garage_util::persister::Persister;
+
+use super::*;
+use crate::replication_mode::ReplicationMode;
+use crate::rpc_helper::*;
+use crate::system::*;
+
+pub struct LayoutManager {
+ node_id: Uuid,
+ replication_mode: ReplicationMode,
+ persist_cluster_layout: Persister<LayoutHistory>,
+
+ layout: Arc<RwLock<LayoutHelper>>,
+ pub(crate) change_notify: Arc<Notify>,
+
+ table_sync_version: Mutex<HashMap<String, u64>>,
+
+ pub(crate) rpc_helper: RpcHelper,
+ system_endpoint: Arc<Endpoint<SystemRpc, System>>,
+}
+
+impl LayoutManager {
+ pub fn new(
+ config: &Config,
+ node_id: NodeID,
+ system_endpoint: Arc<Endpoint<SystemRpc, System>>,
+ peering: Arc<PeeringManager>,
+ replication_mode: ReplicationMode,
+ ) -> Result<Arc<Self>, Error> {
+ let replication_factor = replication_mode.replication_factor();
+
+ let persist_cluster_layout: Persister<LayoutHistory> =
+ Persister::new(&config.metadata_dir, "cluster_layout");
+
+ let cluster_layout = match persist_cluster_layout.load() {
+ Ok(x) => {
+ if x.current().replication_factor != replication_mode.replication_factor() {
+ return Err(Error::Message(format!(
+ "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
+ x.current().replication_factor,
+ replication_factor
+ )));
+ }
+ x
+ }
+ Err(e) => {
+ info!(
+ "No valid previous cluster layout stored ({}), starting fresh.",
+ e
+ );
+ LayoutHistory::new(replication_factor)
+ }
+ };
+
+ let mut cluster_layout =
+ LayoutHelper::new(replication_mode, cluster_layout, Default::default());
+ cluster_layout.update_trackers(node_id.into());
+
+ let layout = Arc::new(RwLock::new(cluster_layout));
+ let change_notify = Arc::new(Notify::new());
+
+ let rpc_helper = RpcHelper::new(
+ node_id.into(),
+ peering,
+ layout.clone(),
+ config.rpc_timeout_msec.map(Duration::from_millis),
+ );
+
+ Ok(Arc::new(Self {
+ node_id: node_id.into(),
+ replication_mode,
+ persist_cluster_layout,
+ layout,
+ change_notify,
+ table_sync_version: Mutex::new(HashMap::new()),
+ system_endpoint,
+ rpc_helper,
+ }))
+ }
+
+ // ---- PUBLIC INTERFACE ----
+
+ pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHelper> {
+ self.layout.read().unwrap()
+ }
+
+ pub async fn update_cluster_layout(
+ self: &Arc<Self>,
+ layout: &LayoutHistory,
+ ) -> Result<(), Error> {
+ self.handle_advertise_cluster_layout(layout).await?;
+ Ok(())
+ }
+
+ pub fn add_table(&self, table_name: &'static str) {
+ let first_version = self.layout().versions.first().unwrap().version;
+
+ self.table_sync_version
+ .lock()
+ .unwrap()
+ .insert(table_name.to_string(), first_version);
+ }
+
+ pub fn sync_table_until(self: &Arc<Self>, table_name: &'static str, version: u64) {
+ let mut table_sync_version = self.table_sync_version.lock().unwrap();
+ *table_sync_version.get_mut(table_name).unwrap() = version;
+ let sync_until = table_sync_version.iter().map(|(_, v)| *v).min().unwrap();
+ drop(table_sync_version);
+
+ let mut layout = self.layout.write().unwrap();
+ if layout.update(|l| l.update_trackers.sync_map.set_max(self.node_id, sync_until)) {
+ info!("sync_until updated to {}", sync_until);
+ self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
+ layout.update_trackers.clone(),
+ ));
+ }
+ }
+
+ fn ack_new_version(self: &Arc<Self>) {
+ let mut layout = self.layout.write().unwrap();
+ if layout.ack_max_free(self.node_id) {
+ self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
+ layout.update_trackers.clone(),
+ ));
+ }
+ }
+
+ // ---- ACK LOCKING ----
+
+ pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
+ let layout = self.layout();
+ let version = layout.current().version;
+ let nodes = layout.storage_sets_of(position);
+ layout
+ .ack_lock
+ .get(&version)
+ .unwrap()
+ .fetch_add(1, Ordering::Relaxed);
+ WriteLock::new(version, self, nodes)
+ }
+
+ // ---- INTERNALS ---
+
+ fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
+ let mut layout = self.layout.write().unwrap();
+ let prev_digest = layout.digest();
+ let prev_layout_check = layout.check().is_ok();
+
+ if !prev_layout_check || adv.check().is_ok() {
+ if layout.update(|l| l.merge(adv)) {
+ layout.update_trackers(self.node_id);
+ if prev_layout_check && layout.check().is_err() {
+ panic!("Merged two correct layouts and got an incorrect layout.");
+ }
+ assert!(layout.digest() != prev_digest);
+ return Some(layout.clone());
+ }
+ }
+
+ None
+ }
+
+ fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option<UpdateTrackers> {
+ let mut layout = self.layout.write().unwrap();
+ let prev_digest = layout.digest();
+
+ if layout.update_trackers != *adv {
+ if layout.update(|l| l.update_trackers.merge(adv)) {
+ layout.update_trackers(self.node_id);
+ assert!(layout.digest() != prev_digest);
+ return Some(layout.update_trackers.clone());
+ }
+ }
+
+ None
+ }
+
+ async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) {
+ let resp = self
+ .rpc_helper
+ .call(
+ &self.system_endpoint,
+ peer,
+ SystemRpc::PullClusterLayout,
+ RequestStrategy::with_priority(PRIO_HIGH),
+ )
+ .await;
+ if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
+ if let Err(e) = self.handle_advertise_cluster_layout(&layout).await {
+ warn!("In pull_cluster_layout: {}", e);
+ }
+ }
+ }
+
+ async fn pull_cluster_layout_trackers(self: &Arc<Self>, peer: Uuid) {
+ let resp = self
+ .rpc_helper
+ .call(
+ &self.system_endpoint,
+ peer,
+ SystemRpc::PullClusterLayoutTrackers,
+ RequestStrategy::with_priority(PRIO_HIGH),
+ )
+ .await;
+ if let Ok(SystemRpc::AdvertiseClusterLayoutTrackers(trackers)) = resp {
+ if let Err(e) = self
+ .handle_advertise_cluster_layout_trackers(&trackers)
+ .await
+ {
+ warn!("In pull_cluster_layout_trackers: {}", e);
+ }
+ }
+ }
+
+ /// Save cluster layout data to disk
+ async fn save_cluster_layout(&self) -> Result<(), Error> {
+ let layout = self.layout.read().unwrap().clone();
+ self.persist_cluster_layout
+ .save_async(&layout)
+ .await
+ .expect("Cannot save current cluster layout");
+ Ok(())
+ }
+
+ fn broadcast_update(self: &Arc<Self>, rpc: SystemRpc) {
+ tokio::spawn({
+ let this = self.clone();
+ async move {
+ if let Err(e) = this
+ .rpc_helper
+ .broadcast(
+ &this.system_endpoint,
+ rpc,
+ RequestStrategy::with_priority(PRIO_HIGH),
+ )
+ .await
+ {
+ warn!("Error while broadcasting new cluster layout: {}", e);
+ }
+ }
+ });
+ }
+
+ // ---- RPC HANDLERS ----
+
+ pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &RpcLayoutDigest) {
+ let local = self.layout().digest();
+ if remote.current_version > local.current_version
+ || remote.active_versions != local.active_versions
+ || remote.staging_hash != local.staging_hash
+ {
+ tokio::spawn({
+ let this = self.clone();
+ async move { this.pull_cluster_layout(from).await }
+ });
+ } else if remote.trackers_hash != local.trackers_hash {
+ tokio::spawn({
+ let this = self.clone();
+ async move { this.pull_cluster_layout_trackers(from).await }
+ });
+ }
+ }
+
+ pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc {
+ let layout = self.layout.read().unwrap().clone();
+ SystemRpc::AdvertiseClusterLayout(layout)
+ }
+
+ pub(crate) fn handle_pull_cluster_layout_trackers(&self) -> SystemRpc {
+ let layout = self.layout.read().unwrap();
+ SystemRpc::AdvertiseClusterLayoutTrackers(layout.update_trackers.clone())
+ }
+
+ pub(crate) async fn handle_advertise_cluster_layout(
+ self: &Arc<Self>,
+ adv: &LayoutHistory,
+ ) -> Result<SystemRpc, Error> {
+ debug!(
+ "handle_advertise_cluster_layout: {} versions, last={}, trackers={:?}",
+ adv.versions.len(),
+ adv.current().version,
+ adv.update_trackers
+ );
+
+ if adv.current().replication_factor != self.replication_mode.replication_factor() {
+ let msg = format!(
+ "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
+ adv.current().replication_factor,
+ self.replication_mode.replication_factor()
+ );
+ error!("{}", msg);
+ return Err(Error::Message(msg));
+ }
+
+ if let Some(new_layout) = self.merge_layout(adv) {
+ debug!("handle_advertise_cluster_layout: some changes were added to the current stuff");
+
+ self.change_notify.notify_waiters();
+ self.broadcast_update(SystemRpc::AdvertiseClusterLayout(new_layout));
+ self.save_cluster_layout().await?;
+ }
+
+ Ok(SystemRpc::Ok)
+ }
+
+ pub(crate) async fn handle_advertise_cluster_layout_trackers(
+ self: &Arc<Self>,
+ trackers: &UpdateTrackers,
+ ) -> Result<SystemRpc, Error> {
+ debug!("handle_advertise_cluster_layout_trackers: {:?}", trackers);
+
+ if let Some(new_trackers) = self.merge_layout_trackers(trackers) {
+ self.change_notify.notify_waiters();
+ self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(new_trackers));
+ self.save_cluster_layout().await?;
+ }
+
+ Ok(SystemRpc::Ok)
+ }
+}
+
+// ---- ack lock ----
+
+pub struct WriteLock<T> {
+ layout_version: u64,
+ layout_manager: Arc<LayoutManager>,
+ value: T,
+}
+
+impl<T> WriteLock<T> {
+ fn new(version: u64, layout_manager: &Arc<LayoutManager>, value: T) -> Self {
+ Self {
+ layout_version: version,
+ layout_manager: layout_manager.clone(),
+ value,
+ }
+ }
+}
+
+impl<T> AsRef<T> for WriteLock<T> {
+ fn as_ref(&self) -> &T {
+ &self.value
+ }
+}
+
+impl<T> AsMut<T> for WriteLock<T> {
+ fn as_mut(&mut self) -> &mut T {
+ &mut self.value
+ }
+}
+
+impl<T> Drop for WriteLock<T> {
+ fn drop(&mut self) {
+ let layout = self.layout_manager.layout(); // acquire read lock
+ if let Some(counter) = layout.ack_lock.get(&self.layout_version) {
+ let prev_lock = counter.fetch_sub(1, Ordering::Relaxed);
+ if prev_lock == 1 && layout.current().version > self.layout_version {
+ drop(layout); // release read lock, write lock will be acquired
+ self.layout_manager.ack_new_version();
+ }
+ } else {
+ error!("Could not find ack lock counter for layout version {}. This probably indicates a bug in Garage.", self.layout_version);
+ }
+ }
+}
diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs
new file mode 100644
index 00000000..33676c37
--- /dev/null
+++ b/src/rpc/layout/mod.rs
@@ -0,0 +1,478 @@
+use std::fmt;
+
+use bytesize::ByteSize;
+
+use garage_util::crdt::{AutoCrdt, Crdt};
+use garage_util::data::Uuid;
+
+mod graph_algo;
+mod helper;
+mod history;
+mod version;
+
+#[cfg(test)]
+mod test;
+
+pub mod manager;
+
+// ---- re-exports ----
+
+pub use helper::{LayoutHelper, RpcLayoutDigest, SyncLayoutDigest};
+pub use manager::WriteLock;
+pub use version::*;
+
+// ---- defines: partitions ----
+
+/// A partition id, which is stored on 16 bits
+/// i.e. we have up to 2**16 partitions.
+/// (in practice we have exactly 2**PARTITION_BITS partitions)
+pub type Partition = u16;
+
+// TODO: make this constant parametrizable in the config file
+// For deployments with many nodes it might make sense to bump
+// it up to 10.
+// Maximum value : 16
+/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in
+/// presence of numerous nodes, but exponentially bigger ring. Max 16
+pub const PARTITION_BITS: usize = 8;
+
+const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
+
+// ---- defines: nodes ----
+
+// Type to store compactly the id of a node in the system
+// Change this to u16 the day we want to have more than 256 nodes in a cluster
+pub type CompactNodeType = u8;
+pub const MAX_NODE_NUMBER: usize = 256;
+
+// ======== actual data structures for the layout data ========
+// ======== that is persisted to disk ========
+// some small utility impls are at the end of this file,
+// but most of the code that actually computes stuff is in
+// version.rs, history.rs and helper.rs
+
+mod v08 {
+ use crate::layout::CompactNodeType;
+ use garage_util::crdt::LwwMap;
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
+
+ /// The layout of the cluster, i.e. the list of roles
+ /// which are assigned to each cluster node
+ #[derive(Clone, Debug, Serialize, Deserialize)]
+ pub struct ClusterLayout {
+ pub version: u64,
+
+ pub replication_factor: usize,
+ pub roles: LwwMap<Uuid, NodeRoleV>,
+
+ // see comments in v010::ClusterLayout
+ pub node_id_vec: Vec<Uuid>,
+ #[serde(with = "serde_bytes")]
+ pub ring_assignation_data: Vec<CompactNodeType>,
+
+ /// Role changes which are staged for the next version of the layout
+ pub staging: LwwMap<Uuid, NodeRoleV>,
+ pub staging_hash: Hash,
+ }
+
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct NodeRoleV(pub Option<NodeRole>);
+
+ /// The user-assigned roles of cluster nodes
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct NodeRole {
+ /// Datacenter at which this entry belong. This information is used to
+ /// perform a better geodistribution
+ pub zone: String,
+ /// The capacity of the node
+ /// If this is set to None, the node does not participate in storing data for the system
+ /// and is only active as an API gateway to other nodes
+ pub capacity: Option<u64>,
+ /// A set of tags to recognize the node
+ pub tags: Vec<String>,
+ }
+
+ impl garage_util::migrate::InitialFormat for ClusterLayout {}
+}
+
+mod v09 {
+ use super::v08;
+ use crate::layout::CompactNodeType;
+ use garage_util::crdt::{Lww, LwwMap};
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
+ pub use v08::{NodeRole, NodeRoleV};
+
+ /// The layout of the cluster, i.e. the list of roles
+ /// which are assigned to each cluster node
+ #[derive(Clone, Debug, Serialize, Deserialize)]
+ pub struct ClusterLayout {
+ pub version: u64,
+
+ pub replication_factor: usize,
+
+ /// This attribute is only used to retain the previously computed partition size,
+ /// to know to what extent does it change with the layout update.
+ pub partition_size: u64,
+ /// Parameters used to compute the assignment currently given by
+ /// ring_assignment_data
+ pub parameters: LayoutParameters,
+
+ pub roles: LwwMap<Uuid, NodeRoleV>,
+
+ // see comments in v010::ClusterLayout
+ pub node_id_vec: Vec<Uuid>,
+ #[serde(with = "serde_bytes")]
+ pub ring_assignment_data: Vec<CompactNodeType>,
+
+ /// Parameters to be used in the next partition assignment computation.
+ pub staging_parameters: Lww<LayoutParameters>,
+ /// Role changes which are staged for the next version of the layout
+ pub staging_roles: LwwMap<Uuid, NodeRoleV>,
+ pub staging_hash: Hash,
+ }
+
+ /// This struct is used to set the parameters to be used in the assignment computation
+ /// algorithm. It is stored as a Crdt.
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub struct LayoutParameters {
+ pub zone_redundancy: ZoneRedundancy,
+ }
+
+ /// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies
+ /// of each partition on at least that number of different zones.
+ /// Otherwise, copies will be stored on the maximum possible number of zones.
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub enum ZoneRedundancy {
+ AtLeast(usize),
+ Maximum,
+ }
+
+ impl garage_util::migrate::Migrate for ClusterLayout {
+ const VERSION_MARKER: &'static [u8] = b"G09layout";
+
+ type Previous = v08::ClusterLayout;
+
+ fn migrate(previous: Self::Previous) -> Self {
+ use itertools::Itertools;
+
+ // In the old layout, capacities are in an arbitrary unit,
+ // but in the new layout they are in bytes.
+ // Here we arbitrarily multiply everything by 1G,
+ // such that 1 old capacity unit = 1GB in the new units.
+ // This is totally arbitrary and won't work for most users.
+ let cap_mul = 1024 * 1024 * 1024;
+ let roles = multiply_all_capacities(previous.roles, cap_mul);
+ let staging_roles = multiply_all_capacities(previous.staging, cap_mul);
+ let node_id_vec = previous.node_id_vec;
+
+ // Determine partition size
+ let mut tmp = previous.ring_assignation_data.clone();
+ tmp.sort();
+ let partition_size = tmp
+ .into_iter()
+ .dedup_with_count()
+ .map(|(npart, node)| {
+ roles
+ .get(&node_id_vec[node as usize])
+ .and_then(|p| p.0.as_ref().and_then(|r| r.capacity))
+ .unwrap_or(0) / npart as u64
+ })
+ .min()
+ .unwrap_or(0);
+
+ // By default, zone_redundancy is maximum possible value
+ let parameters = LayoutParameters {
+ zone_redundancy: ZoneRedundancy::Maximum,
+ };
+
+ Self {
+ version: previous.version,
+ replication_factor: previous.replication_factor,
+ partition_size,
+ parameters,
+ roles,
+ node_id_vec,
+ ring_assignment_data: previous.ring_assignation_data,
+ staging_parameters: Lww::new(parameters),
+ staging_roles,
+ staging_hash: [0u8; 32].into(), // will be set in the next migration
+ }
+ }
+ }
+
+ fn multiply_all_capacities(
+ old_roles: LwwMap<Uuid, NodeRoleV>,
+ mul: u64,
+ ) -> LwwMap<Uuid, NodeRoleV> {
+ let mut new_roles = LwwMap::new();
+ for (node, ts, role) in old_roles.items() {
+ let mut role = role.clone();
+ if let NodeRoleV(Some(NodeRole {
+ capacity: Some(ref mut cap),
+ ..
+ })) = role
+ {
+ *cap *= mul;
+ }
+ new_roles.merge_raw(node, *ts, &role);
+ }
+ new_roles
+ }
+}
+
+mod v010 {
+ use super::v09;
+ use crate::layout::CompactNodeType;
+ use garage_util::crdt::{Lww, LwwMap};
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+ use std::collections::BTreeMap;
+ pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy};
+
+ /// Number of old (non-live) versions to keep, see LayoutHistory::old_versions
+ pub const OLD_VERSION_COUNT: usize = 5;
+
+ /// The history of cluster layouts, with trackers to keep a record
+ /// of which nodes are up-to-date to current cluster data
+ #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+ pub struct LayoutHistory {
+ /// The versions currently in use in the cluster
+ pub versions: Vec<LayoutVersion>,
+ /// At most 5 of the previous versions, not used by the garage_table
+ /// module, but usefull for the garage_block module to find data blocks
+ /// that have not yet been moved
+ pub old_versions: Vec<LayoutVersion>,
+
+ /// Update trackers
+ pub update_trackers: UpdateTrackers,
+
+ /// Staged changes for the next version
+ pub staging: Lww<LayoutStaging>,
+ }
+
+ /// A version of the layout of the cluster, i.e. the list of roles
+ /// which are assigned to each cluster node
+ #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+ pub struct LayoutVersion {
+ /// The number of this version
+ pub version: u64,
+
+ /// Roles assigned to nodes in this version
+ pub roles: LwwMap<Uuid, NodeRoleV>,
+ /// Parameters used to compute the assignment currently given by
+ /// ring_assignment_data
+ pub parameters: LayoutParameters,
+
+ /// The number of replicas for each data partition
+ pub replication_factor: usize,
+ /// This attribute is only used to retain the previously computed partition size,
+ /// to know to what extent does it change with the layout update.
+ pub partition_size: u64,
+
+ /// node_id_vec: a vector of node IDs with a role assigned
+ /// in the system (this includes gateway nodes).
+ /// The order here is different than the vec stored by `roles`, because:
+ /// 1. non-gateway nodes are first so that they have lower numbers
+ /// 2. nodes that don't have a role are excluded (but they need to
+ /// stay in the CRDT as tombstones)
+ pub node_id_vec: Vec<Uuid>,
+ /// number of non-gateway nodes, which are the first ids in node_id_vec
+ pub nongateway_node_count: usize,
+ /// The assignation of data partitions to nodes, the values
+ /// are indices in node_id_vec
+ #[serde(with = "serde_bytes")]
+ pub ring_assignment_data: Vec<CompactNodeType>,
+ }
+
+ /// The staged changes for the next layout version
+ #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+ pub struct LayoutStaging {
+ /// Parameters to be used in the next partition assignment computation.
+ pub parameters: Lww<LayoutParameters>,
+ /// Role changes which are staged for the next version of the layout
+ pub roles: LwwMap<Uuid, NodeRoleV>,
+ }
+
+ /// The tracker of acknowlegments and data syncs around the cluster
+ #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
+ pub struct UpdateTrackers {
+ /// The highest layout version number each node has ack'ed
+ pub ack_map: UpdateTracker,
+ /// The highest layout version number each node has synced data for
+ pub sync_map: UpdateTracker,
+ /// The highest layout version number each node has
+ /// ack'ed that all other nodes have synced data for
+ pub sync_ack_map: UpdateTracker,
+ }
+
+ /// Generic update tracker struct
+ #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
+ pub struct UpdateTracker(pub BTreeMap<Uuid, u64>);
+
+ impl garage_util::migrate::Migrate for LayoutHistory {
+ const VERSION_MARKER: &'static [u8] = b"G010lh";
+
+ type Previous = v09::ClusterLayout;
+
+ fn migrate(previous: Self::Previous) -> Self {
+ let nongateway_node_count = previous
+ .node_id_vec
+ .iter()
+ .enumerate()
+ .filter(|(_, uuid)| {
+ let role = previous.roles.get(uuid);
+ matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some())
+ })
+ .map(|(i, _)| i + 1)
+ .max()
+ .unwrap_or(0);
+
+ let version = LayoutVersion {
+ version: previous.version,
+ replication_factor: previous.replication_factor,
+ partition_size: previous.partition_size,
+ parameters: previous.parameters,
+ roles: previous.roles,
+ node_id_vec: previous.node_id_vec,
+ nongateway_node_count,
+ ring_assignment_data: previous.ring_assignment_data,
+ };
+ let update_tracker = UpdateTracker(
+ version
+ .nongateway_nodes()
+ .iter()
+ .copied()
+ .map(|x| (x, version.version))
+ .collect::<BTreeMap<Uuid, u64>>(),
+ );
+ let staging = LayoutStaging {
+ parameters: previous.staging_parameters,
+ roles: previous.staging_roles,
+ };
+ Self {
+ versions: vec![version],
+ old_versions: vec![],
+ update_trackers: UpdateTrackers {
+ ack_map: update_tracker.clone(),
+ sync_map: update_tracker.clone(),
+ sync_ack_map: update_tracker,
+ },
+ staging: Lww::raw(previous.version, staging),
+ }
+ }
+ }
+}
+
+pub use v010::*;
+
+// ---- utility functions ----
+
+impl AutoCrdt for LayoutParameters {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl AutoCrdt for NodeRoleV {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl Crdt for LayoutStaging {
+ fn merge(&mut self, other: &LayoutStaging) {
+ self.parameters.merge(&other.parameters);
+ self.roles.merge(&other.roles);
+ }
+}
+
+impl NodeRole {
+ pub fn capacity_string(&self) -> String {
+ match self.capacity {
+ Some(c) => ByteSize::b(c).to_string_as(false),
+ None => "gateway".to_string(),
+ }
+ }
+
+ pub fn tags_string(&self) -> String {
+ self.tags.join(",")
+ }
+}
+
+impl fmt::Display for ZoneRedundancy {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ ZoneRedundancy::Maximum => write!(f, "maximum"),
+ ZoneRedundancy::AtLeast(x) => write!(f, "{}", x),
+ }
+ }
+}
+
+impl core::str::FromStr for ZoneRedundancy {
+ type Err = &'static str;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
+ x => {
+ let v = x
+ .parse::<usize>()
+ .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?;
+ Ok(ZoneRedundancy::AtLeast(v))
+ }
+ }
+ }
+}
+
+impl UpdateTracker {
+ fn merge(&mut self, other: &UpdateTracker) -> bool {
+ let mut changed = false;
+ for (k, v) in other.0.iter() {
+ if let Some(v_mut) = self.0.get_mut(k) {
+ if *v > *v_mut {
+ *v_mut = *v;
+ changed = true;
+ }
+ } else {
+ self.0.insert(*k, *v);
+ changed = true;
+ }
+ }
+ changed
+ }
+
+ /// This bumps the update tracker for a given node up to the specified value.
+ /// This has potential impacts on the correctness of Garage and should only
+ /// be used in very specific circumstances.
+ pub fn set_max(&mut self, peer: Uuid, value: u64) -> bool {
+ match self.0.get_mut(&peer) {
+ Some(e) if *e < value => {
+ *e = value;
+ true
+ }
+ None => {
+ self.0.insert(peer, value);
+ true
+ }
+ _ => false,
+ }
+ }
+
+ pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 {
+ storage_nodes
+ .iter()
+ .map(|x| self.get(x, min_version))
+ .min()
+ .unwrap_or(min_version)
+ }
+
+ pub fn get(&self, node: &Uuid, min_version: u64) -> u64 {
+ self.0.get(node).copied().unwrap_or(min_version)
+ }
+}
+
+impl UpdateTrackers {
+ pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool {
+ let c1 = self.ack_map.merge(&other.ack_map);
+ let c2 = self.sync_map.merge(&other.sync_map);
+ let c3 = self.sync_ack_map.merge(&other.sync_ack_map);
+ c1 || c2 || c3
+ }
+}
diff --git a/src/rpc/layout/test.rs b/src/rpc/layout/test.rs
new file mode 100644
index 00000000..88eb518e
--- /dev/null
+++ b/src/rpc/layout/test.rs
@@ -0,0 +1,157 @@
+use std::cmp::min;
+use std::collections::HashMap;
+
+use garage_util::crdt::Crdt;
+use garage_util::error::*;
+
+use crate::layout::*;
+
+// This function checks that the partition size S computed is at least better than the
+// one given by a very naive algorithm. To do so, we try to run the naive algorithm
+// assuming a partion size of S+1. If we succed, it means that the optimal assignment
+// was not optimal. The naive algorithm is the following :
+// - we compute the max number of partitions associated to every node, capped at the
+// partition number. It gives the number of tokens of every node.
+// - every zone has a number of tokens equal to the sum of the tokens of its nodes.
+// - we cycle over the partitions and associate zone tokens while respecting the
+// zone redundancy constraint.
+// NOTE: the naive algorithm is not optimal. Counter example:
+// take nb_partition = 3 ; replication_factor = 5; redundancy = 4;
+// number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2)
+// With these parameters, the naive algo fails, whereas there is a solution:
+// (A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E)
+fn check_against_naive(cl: &LayoutVersion) -> Result<bool, Error> {
+ let over_size = cl.partition_size + 1;
+ let mut zone_token = HashMap::<String, usize>::new();
+
+ let (zones, zone_to_id) = cl.generate_nongateway_zone_ids()?;
+
+ if zones.is_empty() {
+ return Ok(false);
+ }
+
+ for z in zones.iter() {
+ zone_token.insert(z.clone(), 0);
+ }
+ for uuid in cl.nongateway_nodes() {
+ let z = cl.expect_get_node_zone(&uuid);
+ let c = cl.expect_get_node_capacity(&uuid);
+ zone_token.insert(
+ z.to_string(),
+ zone_token[z] + min(NB_PARTITIONS, (c / over_size) as usize),
+ );
+ }
+
+ // For every partition, we count the number of zone already associated and
+ // the name of the last zone associated
+
+ let mut id_zone_token = vec![0; zones.len()];
+ for (z, t) in zone_token.iter() {
+ id_zone_token[zone_to_id[z]] = *t;
+ }
+
+ let mut nb_token = vec![0; NB_PARTITIONS];
+ let mut last_zone = vec![zones.len(); NB_PARTITIONS];
+
+ let mut curr_zone = 0;
+
+ let redundancy = cl.effective_zone_redundancy();
+
+ for replic in 0..cl.replication_factor {
+ for p in 0..NB_PARTITIONS {
+ while id_zone_token[curr_zone] == 0
+ || (last_zone[p] == curr_zone
+ && redundancy - nb_token[p] <= cl.replication_factor - replic)
+ {
+ curr_zone += 1;
+ if curr_zone >= zones.len() {
+ return Ok(true);
+ }
+ }
+ id_zone_token[curr_zone] -= 1;
+ if last_zone[p] != curr_zone {
+ nb_token[p] += 1;
+ last_zone[p] = curr_zone;
+ }
+ }
+ }
+
+ return Ok(false);
+}
+
+fn show_msg(msg: &Message) {
+ for s in msg.iter() {
+ println!("{}", s);
+ }
+}
+
+fn update_layout(
+ cl: &mut LayoutHistory,
+ node_capacity_vec: &[u64],
+ node_zone_vec: &[&'static str],
+ zone_redundancy: usize,
+) {
+ let staging = cl.staging.get_mut();
+
+ for (i, (capacity, zone)) in node_capacity_vec
+ .iter()
+ .zip(node_zone_vec.iter())
+ .enumerate()
+ {
+ let node_id = [i as u8; 32].into();
+
+ let update = staging.roles.update_mutator(
+ node_id,
+ NodeRoleV(Some(NodeRole {
+ zone: zone.to_string(),
+ capacity: Some(*capacity),
+ tags: (vec![]),
+ })),
+ );
+ staging.roles.merge(&update);
+ }
+ staging.parameters.update(LayoutParameters {
+ zone_redundancy: ZoneRedundancy::AtLeast(zone_redundancy),
+ });
+}
+
+#[test]
+fn test_assignment() {
+ let mut node_capacity_vec = vec![4000, 1000, 2000];
+ let mut node_zone_vec = vec!["A", "B", "C"];
+
+ let mut cl = LayoutHistory::new(3);
+ update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3);
+ let v = cl.current().version;
+ let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
+ show_msg(&msg);
+ assert_eq!(cl.check(), Ok(()));
+ assert!(check_against_naive(cl.current()).unwrap());
+
+ node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000];
+ node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"];
+ update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 2);
+ let v = cl.current().version;
+ let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
+ show_msg(&msg);
+ assert_eq!(cl.check(), Ok(()));
+ assert!(check_against_naive(cl.current()).unwrap());
+
+ node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000];
+ update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3);
+ let v = cl.current().version;
+ let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
+ show_msg(&msg);
+ assert_eq!(cl.check(), Ok(()));
+ assert!(check_against_naive(cl.current()).unwrap());
+
+ node_capacity_vec = vec![
+ 4000000, 4000000, 2000000, 7000000, 1000000, 9000000, 2000000, 10000, 2000000,
+ ];
+ update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 1);
+ let v = cl.current().version;
+ let (cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
+ show_msg(&msg);
+ assert_eq!(cl.check(), Ok(()));
+ assert!(check_against_naive(cl.current()).unwrap());
+}
diff --git a/src/rpc/layout.rs b/src/rpc/layout/version.rs
index e02a180b..ee4b2821 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout/version.rs
@@ -1,375 +1,55 @@
-use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::HashSet;
-use std::fmt;
+use std::convert::TryInto;
use bytesize::ByteSize;
use itertools::Itertools;
-use garage_util::crdt::{AutoCrdt, Crdt, Lww, LwwMap};
+use garage_util::crdt::{Crdt, LwwMap};
use garage_util::data::*;
-use garage_util::encode::nonversioned_encode;
use garage_util::error::*;
-use crate::graph_algo::*;
-
-use crate::ring::*;
-
-use std::convert::TryInto;
-
-const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
+use super::graph_algo::*;
+use super::*;
// The Message type will be used to collect information on the algorithm.
-type Message = Vec<String>;
-
-mod v08 {
- use crate::ring::CompactNodeType;
- use garage_util::crdt::LwwMap;
- use garage_util::data::{Hash, Uuid};
- use serde::{Deserialize, Serialize};
-
- /// The layout of the cluster, i.e. the list of roles
- /// which are assigned to each cluster node
- #[derive(Clone, Debug, Serialize, Deserialize)]
- pub struct ClusterLayout {
- pub version: u64,
-
- pub replication_factor: usize,
- pub roles: LwwMap<Uuid, NodeRoleV>,
-
- /// node_id_vec: a vector of node IDs with a role assigned
- /// in the system (this includes gateway nodes).
- /// The order here is different than the vec stored by `roles`, because:
- /// 1. non-gateway nodes are first so that they have lower numbers
- /// 2. nodes that don't have a role are excluded (but they need to
- /// stay in the CRDT as tombstones)
- pub node_id_vec: Vec<Uuid>,
- /// the assignation of data partitions to node, the values
- /// are indices in node_id_vec
- #[serde(with = "serde_bytes")]
- pub ring_assignation_data: Vec<CompactNodeType>,
-
- /// Role changes which are staged for the next version of the layout
- pub staging: LwwMap<Uuid, NodeRoleV>,
- pub staging_hash: Hash,
- }
-
- #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
- pub struct NodeRoleV(pub Option<NodeRole>);
-
- /// The user-assigned roles of cluster nodes
- #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
- pub struct NodeRole {
- /// Datacenter at which this entry belong. This information is used to
- /// perform a better geodistribution
- pub zone: String,
- /// The capacity of the node
- /// If this is set to None, the node does not participate in storing data for the system
- /// and is only active as an API gateway to other nodes
- pub capacity: Option<u64>,
- /// A set of tags to recognize the node
- pub tags: Vec<String>,
- }
-
- impl garage_util::migrate::InitialFormat for ClusterLayout {}
-}
-
-mod v09 {
- use super::v08;
- use crate::ring::CompactNodeType;
- use garage_util::crdt::{Lww, LwwMap};
- use garage_util::data::{Hash, Uuid};
- use serde::{Deserialize, Serialize};
- pub use v08::{NodeRole, NodeRoleV};
-
- /// The layout of the cluster, i.e. the list of roles
- /// which are assigned to each cluster node
- #[derive(Clone, Debug, Serialize, Deserialize)]
- pub struct ClusterLayout {
- pub version: u64,
-
- pub replication_factor: usize,
-
- /// This attribute is only used to retain the previously computed partition size,
- /// to know to what extent does it change with the layout update.
- pub partition_size: u64,
- /// Parameters used to compute the assignment currently given by
- /// ring_assignment_data
- pub parameters: LayoutParameters,
-
- pub roles: LwwMap<Uuid, NodeRoleV>,
-
- /// see comment in v08::ClusterLayout
- pub node_id_vec: Vec<Uuid>,
- /// see comment in v08::ClusterLayout
- #[serde(with = "serde_bytes")]
- pub ring_assignment_data: Vec<CompactNodeType>,
-
- /// Parameters to be used in the next partition assignment computation.
- pub staging_parameters: Lww<LayoutParameters>,
- /// Role changes which are staged for the next version of the layout
- pub staging_roles: LwwMap<Uuid, NodeRoleV>,
- pub staging_hash: Hash,
- }
+pub type Message = Vec<String>;
- /// This struct is used to set the parameters to be used in the assignment computation
- /// algorithm. It is stored as a Crdt.
- #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
- pub struct LayoutParameters {
- pub zone_redundancy: ZoneRedundancy,
- }
-
- /// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies
- /// of each partition on at least that number of different zones.
- /// Otherwise, copies will be stored on the maximum possible number of zones.
- #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
- pub enum ZoneRedundancy {
- AtLeast(usize),
- Maximum,
- }
-
- impl garage_util::migrate::Migrate for ClusterLayout {
- const VERSION_MARKER: &'static [u8] = b"G09layout";
-
- type Previous = v08::ClusterLayout;
-
- fn migrate(previous: Self::Previous) -> Self {
- use itertools::Itertools;
-
- // In the old layout, capacities are in an arbitrary unit,
- // but in the new layout they are in bytes.
- // Here we arbitrarily multiply everything by 1G,
- // such that 1 old capacity unit = 1GB in the new units.
- // This is totally arbitrary and won't work for most users.
- let cap_mul = 1024 * 1024 * 1024;
- let roles = multiply_all_capacities(previous.roles, cap_mul);
- let staging_roles = multiply_all_capacities(previous.staging, cap_mul);
- let node_id_vec = previous.node_id_vec;
-
- // Determine partition size
- let mut tmp = previous.ring_assignation_data.clone();
- tmp.sort();
- let partition_size = tmp
- .into_iter()
- .dedup_with_count()
- .map(|(npart, node)| {
- roles
- .get(&node_id_vec[node as usize])
- .and_then(|p| p.0.as_ref().and_then(|r| r.capacity))
- .unwrap_or(0) / npart as u64
- })
- .min()
- .unwrap_or(0);
-
- // By default, zone_redundancy is maximum possible value
- let parameters = LayoutParameters {
- zone_redundancy: ZoneRedundancy::Maximum,
- };
-
- let mut res = Self {
- version: previous.version,
- replication_factor: previous.replication_factor,
- partition_size,
- parameters,
- roles,
- node_id_vec,
- ring_assignment_data: previous.ring_assignation_data,
- staging_parameters: Lww::new(parameters),
- staging_roles,
- staging_hash: [0u8; 32].into(),
- };
- res.staging_hash = res.calculate_staging_hash();
- res
- }
- }
-
- fn multiply_all_capacities(
- old_roles: LwwMap<Uuid, NodeRoleV>,
- mul: u64,
- ) -> LwwMap<Uuid, NodeRoleV> {
- let mut new_roles = LwwMap::new();
- for (node, ts, role) in old_roles.items() {
- let mut role = role.clone();
- if let NodeRoleV(Some(NodeRole {
- capacity: Some(ref mut cap),
- ..
- })) = role
- {
- *cap *= mul;
- }
- new_roles.merge_raw(node, *ts, &role);
- }
- new_roles
- }
-}
-
-pub use v09::*;
-
-impl AutoCrdt for LayoutParameters {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
-impl AutoCrdt for NodeRoleV {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
-impl NodeRole {
- pub fn capacity_string(&self) -> String {
- match self.capacity {
- Some(c) => ByteSize::b(c).to_string_as(false),
- None => "gateway".to_string(),
- }
- }
-
- pub fn tags_string(&self) -> String {
- self.tags.join(",")
- }
-}
-
-impl fmt::Display for ZoneRedundancy {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- ZoneRedundancy::Maximum => write!(f, "maximum"),
- ZoneRedundancy::AtLeast(x) => write!(f, "{}", x),
- }
- }
-}
-
-impl core::str::FromStr for ZoneRedundancy {
- type Err = &'static str;
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- match s {
- "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
- x => {
- let v = x
- .parse::<usize>()
- .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?;
- Ok(ZoneRedundancy::AtLeast(v))
- }
- }
- }
-}
-
-// Implementation of the ClusterLayout methods unrelated to the assignment algorithm.
-impl ClusterLayout {
+impl LayoutVersion {
pub fn new(replication_factor: usize) -> Self {
// We set the default zone redundancy to be Maximum, meaning that the maximum
// possible value will be used depending on the cluster topology
let parameters = LayoutParameters {
zone_redundancy: ZoneRedundancy::Maximum,
};
- let staging_parameters = Lww::<LayoutParameters>::new(parameters);
-
- let empty_lwwmap = LwwMap::new();
- let mut ret = ClusterLayout {
+ LayoutVersion {
version: 0,
replication_factor,
partition_size: 0,
roles: LwwMap::new(),
node_id_vec: Vec::new(),
+ nongateway_node_count: 0,
ring_assignment_data: Vec::new(),
parameters,
- staging_parameters,
- staging_roles: empty_lwwmap,
- staging_hash: [0u8; 32].into(),
- };
- ret.staging_hash = ret.calculate_staging_hash();
- ret
- }
-
- fn calculate_staging_hash(&self) -> Hash {
- let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
- blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..])
- }
-
- pub fn merge(&mut self, other: &ClusterLayout) -> bool {
- match other.version.cmp(&self.version) {
- Ordering::Greater => {
- *self = other.clone();
- true
- }
- Ordering::Equal => {
- self.staging_parameters.merge(&other.staging_parameters);
- self.staging_roles.merge(&other.staging_roles);
-
- let new_staging_hash = self.calculate_staging_hash();
- let changed = new_staging_hash != self.staging_hash;
-
- self.staging_hash = new_staging_hash;
-
- changed
- }
- Ordering::Less => false,
}
}
- pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> {
- match version {
- None => {
- let error = r#"
-Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
-To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
- "#;
- return Err(Error::Message(error.into()));
- }
- Some(v) => {
- if v != self.version + 1 {
- return Err(Error::Message("Invalid new layout version".into()));
- }
- }
- }
-
- self.roles.merge(&self.staging_roles);
- self.roles.retain(|(_, _, v)| v.0.is_some());
- self.parameters = *self.staging_parameters.get();
-
- self.staging_roles.clear();
- self.staging_hash = self.calculate_staging_hash();
+ // ===================== accessors ======================
- let msg = self.calculate_partition_assignment()?;
-
- self.version += 1;
-
- Ok((self, msg))
- }
-
- pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
- match version {
- None => {
- let error = r#"
-Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
-To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
- "#;
- return Err(Error::Message(error.into()));
- }
- Some(v) => {
- if v != self.version + 1 {
- return Err(Error::Message("Invalid new layout version".into()));
- }
- }
- }
-
- self.staging_roles.clear();
- self.staging_parameters.update(self.parameters);
- self.staging_hash = self.calculate_staging_hash();
-
- self.version += 1;
-
- Ok(self)
- }
-
- /// Returns a list of IDs of nodes that currently have
- /// a role in the cluster
- pub fn node_ids(&self) -> &[Uuid] {
+ /// Returns a list of IDs of nodes that have a role in this
+ /// version of the cluster layout, including gateway nodes
+ pub fn all_nodes(&self) -> &[Uuid] {
&self.node_id_vec[..]
}
- pub fn num_nodes(&self) -> usize {
- self.node_id_vec.len()
+ /// Returns a list of IDs of nodes that have a storage capacity
+ /// assigned in this version of the cluster layout
+ pub fn nongateway_nodes(&self) -> &[Uuid] {
+ &self.node_id_vec[..self.nongateway_node_count]
}
- /// Returns the role of a node in the layout
+ /// Returns the role of a node in the layout, if it has one
pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> {
match self.roles.get(node) {
Some(NodeRoleV(Some(v))) => Some(v),
@@ -377,41 +57,23 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
- /// Returns the uuids of the non_gateway nodes in self.node_id_vec.
- fn nongateway_nodes(&self) -> Vec<Uuid> {
- let mut result = Vec::<Uuid>::new();
- for uuid in self.node_id_vec.iter() {
- match self.node_role(uuid) {
- Some(role) if role.capacity.is_some() => result.push(*uuid),
- _ => (),
- }
- }
- result
- }
-
- /// Given a node uuids, this function returns the label of its zone
- fn get_node_zone(&self, uuid: &Uuid) -> Result<String, Error> {
- match self.node_role(uuid) {
- Some(role) => Ok(role.zone.clone()),
- _ => Err(Error::Message(
- "The Uuid does not correspond to a node present in the cluster.".into(),
- )),
- }
- }
-
- /// Given a node uuids, this function returns its capacity or fails if it does not have any
- pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u64, Error> {
+ /// Returns the capacity of a node in the layout, if it has one
+ pub fn get_node_capacity(&self, uuid: &Uuid) -> Option<u64> {
match self.node_role(uuid) {
Some(NodeRole {
capacity: Some(cap),
zone: _,
tags: _,
- }) => Ok(*cap),
- _ => Err(Error::Message(
- "The Uuid does not correspond to a node present in the \
- cluster or this node does not have a positive capacity."
- .into(),
- )),
+ }) => Some(*cap),
+ _ => None,
+ }
+ }
+
+ /// Given a node uuids, this function returns the label of its zone if it has one
+ pub fn get_node_zone(&self, uuid: &Uuid) -> Option<&str> {
+ match self.node_role(uuid) {
+ Some(role) => Some(&role.zone),
+ _ => None,
}
}
@@ -435,17 +97,65 @@ To know the correct value of the new layout version, invoke `garage layout show`
))
}
+ /// Get the partition in which data would fall on
+ pub fn partition_of(&self, position: &Hash) -> Partition {
+ let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
+ top >> (16 - PARTITION_BITS)
+ }
+
+ /// Get the list of partitions and the first hash of a partition key that would fall in it
+ pub fn partitions(&self) -> impl Iterator<Item = (Partition, Hash)> + '_ {
+ (0..(1 << PARTITION_BITS)).map(|i| {
+ let top = (i as u16) << (16 - PARTITION_BITS);
+ let mut location = [0u8; 32];
+ location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]);
+ (i as u16, Hash::from(location))
+ })
+ }
+
+ /// Return the n servers in which data for this hash should be replicated
+ pub fn nodes_of(&self, position: &Hash, n: usize) -> impl Iterator<Item = Uuid> + '_ {
+ assert_eq!(n, self.replication_factor);
+
+ let data = &self.ring_assignment_data;
+
+ let partition_nodes = if data.len() == self.replication_factor * (1 << PARTITION_BITS) {
+ let partition_idx = self.partition_of(position) as usize;
+ let partition_start = partition_idx * self.replication_factor;
+ let partition_end = (partition_idx + 1) * self.replication_factor;
+ &data[partition_start..partition_end]
+ } else {
+ warn!("Ring not yet ready, read/writes will be lost!");
+ &[]
+ };
+
+ partition_nodes
+ .iter()
+ .map(move |i| self.node_id_vec[*i as usize])
+ }
+
+ // ===================== internal information extractors ======================
+
+ pub(crate) fn expect_get_node_capacity(&self, uuid: &Uuid) -> u64 {
+ self.get_node_capacity(uuid)
+ .expect("non-gateway node with zero capacity")
+ }
+
+ pub(crate) fn expect_get_node_zone(&self, uuid: &Uuid) -> &str {
+ self.get_node_zone(uuid).expect("node without a zone")
+ }
+
/// Returns the sum of capacities of non gateway nodes in the cluster
- fn get_total_capacity(&self) -> Result<u64, Error> {
+ fn get_total_capacity(&self) -> u64 {
let mut total_capacity = 0;
- for uuid in self.nongateway_nodes().iter() {
- total_capacity += self.get_node_capacity(uuid)?;
+ for uuid in self.nongateway_nodes() {
+ total_capacity += self.expect_get_node_capacity(uuid);
}
- Ok(total_capacity)
+ total_capacity
}
/// Returns the effective value of the zone_redundancy parameter
- fn effective_zone_redundancy(&self) -> usize {
+ pub(crate) fn effective_zone_redundancy(&self) -> usize {
match self.parameters.zone_redundancy {
ZoneRedundancy::AtLeast(v) => v,
ZoneRedundancy::Maximum => {
@@ -465,10 +175,14 @@ To know the correct value of the new layout version, invoke `garage layout show`
/// (assignment, roles, parameters, partition size)
/// returns true if consistent, false if error
pub fn check(&self) -> Result<(), String> {
- // Check that the hash of the staging data is correct
- let staging_hash = self.calculate_staging_hash();
- if staging_hash != self.staging_hash {
- return Err("staging_hash is incorrect".into());
+ // Check that the assignment data has the correct length
+ let expected_assignment_data_len = (1 << PARTITION_BITS) * self.replication_factor;
+ if self.ring_assignment_data.len() != expected_assignment_data_len {
+ return Err(format!(
+ "ring_assignment_data has incorrect length {} instead of {}",
+ self.ring_assignment_data.len(),
+ expected_assignment_data_len
+ ));
}
// Check that node_id_vec contains the correct list of nodes
@@ -486,16 +200,6 @@ To know the correct value of the new layout version, invoke `garage layout show`
return Err(format!("node_id_vec does not contain the correct set of nodes\nnode_id_vec: {:?}\nexpected: {:?}", node_id_vec, expected_nodes));
}
- // Check that the assignment data has the correct length
- let expected_assignment_data_len = (1 << PARTITION_BITS) * self.replication_factor;
- if self.ring_assignment_data.len() != expected_assignment_data_len {
- return Err(format!(
- "ring_assignment_data has incorrect length {} instead of {}",
- self.ring_assignment_data.len(),
- expected_assignment_data_len
- ));
- }
-
// Check that the assigned nodes are correct identifiers
// of nodes that are assigned a role
// and that role is not the role of a gateway nodes
@@ -524,10 +228,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
// Check that every partition is spread over at least zone_redundancy zones.
let zones_of_p = nodes_of_p
.iter()
- .map(|n| {
- self.get_node_zone(&self.node_id_vec[*n as usize])
- .expect("Zone not found.")
- })
+ .map(|n| self.expect_get_node_zone(&self.node_id_vec[*n as usize]))
.collect::<Vec<_>>();
if zones_of_p.iter().unique().count() < zone_redundancy {
return Err(format!(
@@ -546,7 +247,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
if *usage > 0 {
let uuid = self.node_id_vec[n];
let partusage = usage * self.partition_size;
- let nodecap = self.get_node_capacity(&uuid).unwrap();
+ let nodecap = self.expect_get_node_capacity(&uuid);
if partusage > nodecap {
return Err(format!(
"node usage ({}) is bigger than node capacity ({})",
@@ -574,12 +275,24 @@ To know the correct value of the new layout version, invoke `garage layout show`
Ok(())
}
-}
-// ====================================================================================
+ // ================== updates to layout, internals ===================
+
+ pub(crate) fn calculate_next_version(
+ mut self,
+ staging: &LayoutStaging,
+ ) -> Result<(Self, Message), Error> {
+ self.version += 1;
+
+ self.roles.merge(&staging.roles);
+ self.roles.retain(|(_, _, v)| v.0.is_some());
+ self.parameters = *staging.parameters.get();
+
+ let msg = self.calculate_partition_assignment()?;
+
+ Ok((self, msg))
+ }
-// Implementation of the ClusterLayout methods related to the assignment algorithm.
-impl ClusterLayout {
/// This function calculates a new partition-to-node assignment.
/// The computed assignment respects the node replication factor
/// and the zone redundancy parameter It maximizes the capacity of a
@@ -609,12 +322,12 @@ impl ClusterLayout {
// to use them as indices in the flow graphs.
let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?;
- let nb_nongateway_nodes = self.nongateway_nodes().len();
- if nb_nongateway_nodes < self.replication_factor {
+ if self.nongateway_nodes().len() < self.replication_factor {
return Err(Error::Message(format!(
"The number of nodes with positive \
capacity ({}) is smaller than the replication factor ({}).",
- nb_nongateway_nodes, self.replication_factor
+ self.nongateway_nodes().len(),
+ self.replication_factor
)));
}
if id_to_zone.len() < zone_redundancy {
@@ -712,12 +425,14 @@ impl ClusterLayout {
.map(|(k, _, _)| *k)
.collect();
- let mut new_node_id_vec = Vec::<Uuid>::new();
- new_node_id_vec.extend(new_non_gateway_nodes);
- new_node_id_vec.extend(new_gateway_nodes);
+ let old_node_id_vec = std::mem::take(&mut self.node_id_vec);
- let old_node_id_vec = self.node_id_vec.clone();
- self.node_id_vec = new_node_id_vec.clone();
+ self.nongateway_node_count = new_non_gateway_nodes.len();
+ self.node_id_vec.clear();
+ self.node_id_vec.extend(new_non_gateway_nodes);
+ self.node_id_vec.extend(new_gateway_nodes);
+
+ let new_node_id_vec = &self.node_id_vec;
// (2) We retrieve the old association
// We rewrite the old association with the new indices. We only consider partition
@@ -756,7 +471,7 @@ impl ClusterLayout {
}
}
- // We write the ring
+ // We clear the ring assignemnt data
self.ring_assignment_data = Vec::<CompactNodeType>::new();
Ok(Some(old_assignment))
@@ -764,7 +479,9 @@ impl ClusterLayout {
/// This function generates ids for the zone of the nodes appearing in
/// self.node_id_vec.
- fn generate_nongateway_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>), Error> {
+ pub(crate) fn generate_nongateway_zone_ids(
+ &self,
+ ) -> Result<(Vec<String>, HashMap<String, usize>), Error> {
let mut id_to_zone = Vec::<String>::new();
let mut zone_to_id = HashMap::<String, usize>::new();
@@ -797,7 +514,7 @@ impl ClusterLayout {
}
let mut s_down = 1;
- let mut s_up = self.get_total_capacity()?;
+ let mut s_up = self.get_total_capacity();
while s_down + 1 < s_up {
g = self.generate_flow_graph(
(s_down + s_up) / 2,
@@ -846,7 +563,7 @@ impl ClusterLayout {
zone_redundancy: usize,
) -> Result<Graph<FlowEdge>, Error> {
let vertices =
- ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
+ LayoutVersion::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
let mut g = Graph::<FlowEdge>::new(&vertices);
let nb_zones = zone_to_id.len();
for p in 0..NB_PARTITIONS {
@@ -866,8 +583,8 @@ impl ClusterLayout {
}
}
for n in 0..self.nongateway_nodes().len() {
- let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?;
- let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[n])?];
+ let node_capacity = self.expect_get_node_capacity(&self.node_id_vec[n]);
+ let node_zone = zone_to_id[self.expect_get_node_zone(&self.node_id_vec[n])];
g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?;
for p in 0..NB_PARTITIONS {
if !exclude_assoc.contains(&(p, n)) {
@@ -913,7 +630,7 @@ impl ClusterLayout {
// The algorithm is such that it will start with the flow that we just computed
// and find ameliorating paths from that.
for (p, n) in exclude_edge.iter() {
- let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
+ let node_zone = zone_to_id[self.expect_get_node_zone(&self.node_id_vec[*n])];
g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?;
}
g.compute_maximal_flow()?;
@@ -933,7 +650,7 @@ impl ClusterLayout {
let mut cost = CostFunction::new();
for (p, assoc_p) in prev_assign.iter().enumerate() {
for n in assoc_p.iter() {
- let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
+ let node_zone = zone_to_id[self.expect_get_node_zone(&self.node_id_vec[*n])];
cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1);
}
}
@@ -988,7 +705,7 @@ impl ClusterLayout {
let mut msg = Message::new();
let used_cap = self.partition_size * NB_PARTITIONS as u64 * self.replication_factor as u64;
- let total_cap = self.get_total_capacity()?;
+ let total_cap = self.get_total_capacity();
let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32);
msg.push(format!(
"Usable capacity / total cluster capacity: {} / {} ({:.1} %)",
@@ -1035,7 +752,7 @@ impl ClusterLayout {
let mut old_zones_of_p = Vec::<usize>::new();
for n in prev_assign[p].iter() {
old_zones_of_p
- .push(zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]);
+ .push(zone_to_id[self.expect_get_node_zone(&self.node_id_vec[*n])]);
}
if !old_zones_of_p.contains(&z) {
new_partitions_zone[z] += 1;
@@ -1077,7 +794,7 @@ impl ClusterLayout {
for z in 0..id_to_zone.len() {
let mut nodes_of_z = Vec::<usize>::new();
for n in 0..storing_nodes.len() {
- if self.get_node_zone(&self.node_id_vec[n])? == id_to_zone[z] {
+ if self.expect_get_node_zone(&self.node_id_vec[n]) == id_to_zone[z] {
nodes_of_z.push(n);
}
}
@@ -1091,13 +808,13 @@ impl ClusterLayout {
let available_cap_z: u64 = self.partition_size * replicated_partitions as u64;
let mut total_cap_z = 0;
for n in nodes_of_z.iter() {
- total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?;
+ total_cap_z += self.expect_get_node_capacity(&self.node_id_vec[*n]);
}
let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32);
for n in nodes_of_z.iter() {
let available_cap_n = stored_partitions[*n] as u64 * self.partition_size;
- let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?;
+ let total_cap_n = self.expect_get_node_capacity(&self.node_id_vec[*n]);
let tags_n = (self.node_role(&self.node_id_vec[*n]).ok_or("<??>"))?.tags_string();
table.push(format!(
" {:?}\t{}\t{} ({} new)\t{}\t{} ({:.1}%)",
@@ -1127,167 +844,3 @@ impl ClusterLayout {
Ok(msg)
}
}
-
-// ====================================================================================
-
-#[cfg(test)]
-mod tests {
- use super::{Error, *};
- use std::cmp::min;
-
- // This function checks that the partition size S computed is at least better than the
- // one given by a very naive algorithm. To do so, we try to run the naive algorithm
- // assuming a partion size of S+1. If we succed, it means that the optimal assignment
- // was not optimal. The naive algorithm is the following :
- // - we compute the max number of partitions associated to every node, capped at the
- // partition number. It gives the number of tokens of every node.
- // - every zone has a number of tokens equal to the sum of the tokens of its nodes.
- // - we cycle over the partitions and associate zone tokens while respecting the
- // zone redundancy constraint.
- // NOTE: the naive algorithm is not optimal. Counter example:
- // take nb_partition = 3 ; replication_factor = 5; redundancy = 4;
- // number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2)
- // With these parameters, the naive algo fails, whereas there is a solution:
- // (A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E)
- fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> {
- let over_size = cl.partition_size + 1;
- let mut zone_token = HashMap::<String, usize>::new();
-
- let (zones, zone_to_id) = cl.generate_nongateway_zone_ids()?;
-
- if zones.is_empty() {
- return Ok(false);
- }
-
- for z in zones.iter() {
- zone_token.insert(z.clone(), 0);
- }
- for uuid in cl.nongateway_nodes().iter() {
- let z = cl.get_node_zone(uuid)?;
- let c = cl.get_node_capacity(uuid)?;
- zone_token.insert(
- z.clone(),
- zone_token[&z] + min(NB_PARTITIONS, (c / over_size) as usize),
- );
- }
-
- // For every partition, we count the number of zone already associated and
- // the name of the last zone associated
-
- let mut id_zone_token = vec![0; zones.len()];
- for (z, t) in zone_token.iter() {
- id_zone_token[zone_to_id[z]] = *t;
- }
-
- let mut nb_token = vec![0; NB_PARTITIONS];
- let mut last_zone = vec![zones.len(); NB_PARTITIONS];
-
- let mut curr_zone = 0;
-
- let redundancy = cl.effective_zone_redundancy();
-
- for replic in 0..cl.replication_factor {
- for p in 0..NB_PARTITIONS {
- while id_zone_token[curr_zone] == 0
- || (last_zone[p] == curr_zone
- && redundancy - nb_token[p] <= cl.replication_factor - replic)
- {
- curr_zone += 1;
- if curr_zone >= zones.len() {
- return Ok(true);
- }
- }
- id_zone_token[curr_zone] -= 1;
- if last_zone[p] != curr_zone {
- nb_token[p] += 1;
- last_zone[p] = curr_zone;
- }
- }
- }
-
- return Ok(false);
- }
-
- fn show_msg(msg: &Message) {
- for s in msg.iter() {
- println!("{}", s);
- }
- }
-
- fn update_layout(
- cl: &mut ClusterLayout,
- node_id_vec: &Vec<u8>,
- node_capacity_vec: &Vec<u64>,
- node_zone_vec: &Vec<String>,
- zone_redundancy: usize,
- ) {
- for i in 0..node_id_vec.len() {
- if let Some(x) = FixedBytes32::try_from(&[i as u8; 32]) {
- cl.node_id_vec.push(x);
- }
-
- let update = cl.staging_roles.update_mutator(
- cl.node_id_vec[i],
- NodeRoleV(Some(NodeRole {
- zone: (node_zone_vec[i].to_string()),
- capacity: (Some(node_capacity_vec[i])),
- tags: (vec![]),
- })),
- );
- cl.staging_roles.merge(&update);
- }
- cl.staging_parameters.update(LayoutParameters {
- zone_redundancy: ZoneRedundancy::AtLeast(zone_redundancy),
- });
- cl.staging_hash = cl.calculate_staging_hash();
- }
-
- #[test]
- fn test_assignment() {
- let mut node_id_vec = vec![1, 2, 3];
- let mut node_capacity_vec = vec![4000, 1000, 2000];
- let mut node_zone_vec = vec!["A", "B", "C"]
- .into_iter()
- .map(|x| x.to_string())
- .collect();
-
- let mut cl = ClusterLayout::new(3);
- update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 3);
- let v = cl.version;
- let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
- show_msg(&msg);
- assert_eq!(cl.check(), Ok(()));
- assert!(matches!(check_against_naive(&cl), Ok(true)));
-
- node_id_vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
- node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000];
- node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"]
- .into_iter()
- .map(|x| x.to_string())
- .collect();
- update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 2);
- let v = cl.version;
- let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
- show_msg(&msg);
- assert_eq!(cl.check(), Ok(()));
- assert!(matches!(check_against_naive(&cl), Ok(true)));
-
- node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000];
- update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 3);
- let v = cl.version;
- let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
- show_msg(&msg);
- assert_eq!(cl.check(), Ok(()));
- assert!(matches!(check_against_naive(&cl), Ok(true)));
-
- node_capacity_vec = vec![
- 4000000, 4000000, 2000000, 7000000, 1000000, 9000000, 2000000, 10000, 2000000,
- ];
- update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 1);
- let v = cl.version;
- let (cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
- show_msg(&msg);
- assert_eq!(cl.check(), Ok(()));
- assert!(matches!(check_against_naive(&cl), Ok(true)));
- }
-}
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index a5f8fc6e..b5b31c05 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -11,10 +11,8 @@ mod consul;
#[cfg(feature = "kubernetes-discovery")]
mod kubernetes;
-pub mod graph_algo;
pub mod layout;
pub mod replication_mode;
-pub mod ring;
pub mod system;
pub mod rpc_helper;
diff --git a/src/rpc/replication_mode.rs b/src/rpc/replication_mode.rs
index e244e063..2f7e2fec 100644
--- a/src/rpc/replication_mode.rs
+++ b/src/rpc/replication_mode.rs
@@ -54,4 +54,11 @@ impl ReplicationMode {
Self::ThreeWayDangerous => 1,
}
}
+
+ pub fn is_read_after_write_consistent(&self) -> bool {
+ match self {
+ Self::None | Self::TwoWay | Self::ThreeWay => true,
+ _ => false,
+ }
+ }
}
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
deleted file mode 100644
index 6a2e5c72..00000000
--- a/src/rpc/ring.rs
+++ /dev/null
@@ -1,164 +0,0 @@
-//! Module containing types related to computing nodes which should receive a copy of data blocks
-//! and metadata
-use std::convert::TryInto;
-
-use garage_util::data::*;
-
-use crate::layout::ClusterLayout;
-
-/// A partition id, which is stored on 16 bits
-/// i.e. we have up to 2**16 partitions.
-/// (in practice we have exactly 2**PARTITION_BITS partitions)
-pub type Partition = u16;
-
-// TODO: make this constant parametrizable in the config file
-// For deployments with many nodes it might make sense to bump
-// it up to 10.
-// Maximum value : 16
-/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in
-/// presence of numerous nodes, but exponentially bigger ring. Max 16
-pub const PARTITION_BITS: usize = 8;
-
-const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
-
-/// A ring distributing fairly objects to nodes
-#[derive(Clone)]
-pub struct Ring {
- /// The replication factor for this ring
- pub replication_factor: usize,
-
- /// The network configuration used to generate this ring
- pub layout: ClusterLayout,
-
- // Internal order of nodes used to make a more compact representation of the ring
- nodes: Vec<Uuid>,
-
- // The list of entries in the ring
- ring: Vec<RingEntry>,
-}
-
-// Type to store compactly the id of a node in the system
-// Change this to u16 the day we want to have more than 256 nodes in a cluster
-pub type CompactNodeType = u8;
-pub const MAX_NODE_NUMBER: usize = 256;
-
-// The maximum number of times an object might get replicated
-// This must be at least 3 because Garage supports 3-way replication
-// Here we use 6 so that the size of a ring entry is 8 bytes
-// (2 bytes partition id, 6 bytes node numbers as u8s)
-const MAX_REPLICATION: usize = 6;
-
-/// An entry in the ring
-#[derive(Clone, Debug)]
-struct RingEntry {
- // The two first bytes of the first hash that goes in this partition
- // (the next bytes are zeroes)
- hash_prefix: u16,
- // The nodes that store this partition, stored as a list of positions in the `nodes`
- // field of the Ring structure
- // Only items 0 up to ring.replication_factor - 1 are used, others are zeros
- nodes_buf: [CompactNodeType; MAX_REPLICATION],
-}
-
-impl Ring {
- pub(crate) fn new(layout: ClusterLayout, replication_factor: usize) -> Self {
- if replication_factor != layout.replication_factor {
- warn!("Could not build ring: replication factor does not match between local configuration and network role assignment.");
- return Self::empty(layout, replication_factor);
- }
-
- if layout.ring_assignment_data.len() != replication_factor * (1 << PARTITION_BITS) {
- warn!("Could not build ring: network role assignment data has invalid length");
- return Self::empty(layout, replication_factor);
- }
-
- let nodes = layout.node_id_vec.clone();
- let ring = (0..(1 << PARTITION_BITS))
- .map(|i| {
- let top = (i as u16) << (16 - PARTITION_BITS);
- let mut nodes_buf = [0u8; MAX_REPLICATION];
- nodes_buf[..replication_factor].copy_from_slice(
- &layout.ring_assignment_data
- [replication_factor * i..replication_factor * (i + 1)],
- );
- RingEntry {
- hash_prefix: top,
- nodes_buf,
- }
- })
- .collect::<Vec<_>>();
-
- Self {
- replication_factor,
- layout,
- nodes,
- ring,
- }
- }
-
- fn empty(layout: ClusterLayout, replication_factor: usize) -> Self {
- Self {
- replication_factor,
- layout,
- nodes: vec![],
- ring: vec![],
- }
- }
-
- /// Get the partition in which data would fall on
- pub fn partition_of(&self, position: &Hash) -> Partition {
- let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
- top >> (16 - PARTITION_BITS)
- }
-
- /// Get the list of partitions and the first hash of a partition key that would fall in it
- pub fn partitions(&self) -> Vec<(Partition, Hash)> {
- let mut ret = vec![];
-
- for (i, entry) in self.ring.iter().enumerate() {
- let mut location = [0u8; 32];
- location[..2].copy_from_slice(&u16::to_be_bytes(entry.hash_prefix)[..]);
- ret.push((i as u16, location.into()));
- }
- if !ret.is_empty() {
- assert_eq!(ret[0].1, [0u8; 32].into());
- }
-
- ret
- }
-
- /// Walk the ring to find the n servers in which data should be replicated
- pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec<Uuid> {
- if self.ring.len() != 1 << PARTITION_BITS {
- warn!("Ring not yet ready, read/writes will be lost!");
- return vec![];
- }
-
- let partition_idx = self.partition_of(position) as usize;
- let partition = &self.ring[partition_idx];
-
- let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
- // Check that we haven't messed up our partition table, i.e. that this partition
- // table entrey indeed corresponds to the item we are storing
- assert_eq!(
- partition.hash_prefix & PARTITION_MASK_U16,
- top & PARTITION_MASK_U16
- );
-
- assert!(n <= self.replication_factor);
- partition.nodes_buf[..n]
- .iter()
- .map(|i| self.nodes[*i as usize])
- .collect::<Vec<_>>()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_ring_entry_size() {
- assert_eq!(std::mem::size_of::<RingEntry>(), 8);
- }
-}
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index c46e577f..977c6ed8 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -1,12 +1,12 @@
//! Contain structs related to making RPCs
-use std::sync::Arc;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
use std::time::Duration;
use futures::future::join_all;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use tokio::select;
-use tokio::sync::watch;
use opentelemetry::KeyValue;
use opentelemetry::{
@@ -26,8 +26,8 @@ use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
+use crate::layout::{LayoutHelper, LayoutHistory};
use crate::metrics::RpcMetrics;
-use crate::ring::Ring;
// Default RPC timeout = 5 minutes
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
@@ -36,11 +36,11 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
#[derive(Copy, Clone)]
pub struct RequestStrategy {
/// Min number of response to consider the request successful
- pub rs_quorum: Option<usize>,
- /// Should requests be dropped after enough response are received
- pub rs_interrupt_after_quorum: bool,
+ rs_quorum: Option<usize>,
+ /// Send all requests at once
+ rs_send_all_at_once: Option<bool>,
/// Request priority
- pub rs_priority: RequestPriority,
+ rs_priority: RequestPriority,
/// Custom timeout for this request
rs_timeout: Timeout,
}
@@ -57,7 +57,7 @@ impl RequestStrategy {
pub fn with_priority(prio: RequestPriority) -> Self {
RequestStrategy {
rs_quorum: None,
- rs_interrupt_after_quorum: false,
+ rs_send_all_at_once: None,
rs_priority: prio,
rs_timeout: Timeout::Default,
}
@@ -67,10 +67,9 @@ impl RequestStrategy {
self.rs_quorum = Some(quorum);
self
}
- /// Set if requests can be dropped after quorum has been reached
- /// In general true for read requests, and false for write
- pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
- self.rs_interrupt_after_quorum = interrupt;
+ /// Set quorum to be reached for request
+ pub fn send_all_at_once(mut self, value: bool) -> Self {
+ self.rs_send_all_at_once = Some(value);
self
}
/// Deactivate timeout for this request
@@ -91,7 +90,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
struct RpcHelperInner {
our_node_id: Uuid,
peering: Arc<PeeringManager>,
- ring: watch::Receiver<Arc<Ring>>,
+ layout: Arc<RwLock<LayoutHelper>>,
metrics: RpcMetrics,
rpc_timeout: Duration,
}
@@ -100,7 +99,7 @@ impl RpcHelper {
pub(crate) fn new(
our_node_id: Uuid,
peering: Arc<PeeringManager>,
- ring: watch::Receiver<Arc<Ring>>,
+ layout: Arc<RwLock<LayoutHelper>>,
rpc_timeout: Option<Duration>,
) -> Self {
let metrics = RpcMetrics::new();
@@ -108,7 +107,7 @@ impl RpcHelper {
Self(Arc::new(RpcHelperInner {
our_node_id,
peering,
- ring,
+ layout,
metrics,
rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
}))
@@ -130,6 +129,12 @@ impl RpcHelper {
N: IntoReq<M> + Send,
H: StreamingEndpointHandler<M>,
{
+ let tracer = opentelemetry::global::tracer("garage");
+ let span_name = format!("RPC [{}] to {:?}", endpoint.path(), to);
+ let mut span = tracer.start(span_name);
+ span.set_attribute(KeyValue::new("from", format!("{:?}", self.0.our_node_id)));
+ span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
+
let metric_tags = [
KeyValue::new("rpc_endpoint", endpoint.path().to_string()),
KeyValue::new("from", format!("{:?}", self.0.our_node_id)),
@@ -141,6 +146,7 @@ impl RpcHelper {
let node_id = to.into();
let rpc_call = endpoint
.call_streaming(&node_id, msg, strat.rs_priority)
+ .with_context(Context::current_with_span(span))
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
let timeout = async {
@@ -183,12 +189,17 @@ impl RpcHelper {
N: IntoReq<M>,
H: StreamingEndpointHandler<M>,
{
+ let tracer = opentelemetry::global::tracer("garage");
+ let span_name = format!("RPC [{}] call_many {} nodes", endpoint.path(), to.len());
+ let span = tracer.start(span_name);
+
let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
let resps = join_all(
to.iter()
.map(|to| self.call(endpoint, *to, msg.clone(), strat)),
)
+ .with_context(Context::current_with_span(span))
.await;
Ok(to
.iter()
@@ -220,6 +231,22 @@ impl RpcHelper {
/// Make a RPC call to multiple servers, returning either a Vec of responses,
/// or an error if quorum could not be reached due to too many errors
+ ///
+ /// If RequestStrategy has send_all_at_once set, then all requests will be
+ /// sent at once, and `try_call_many` will return as soon as a quorum of
+ /// responses is achieved, dropping and cancelling the remaining requests.
+ ///
+ /// Otherwise, `quorum` requests will be sent at the same time, and if an
+ /// error response is received, a new request will be sent to replace it.
+ /// The ordering of nodes to which requests are sent is determined by
+ /// the `RpcHelper::request_order` function, which takes into account
+ /// parameters such as node zones and measured ping values.
+ ///
+ /// In both cases, the basic contract of this function is that even in the
+ /// absence of failures, the RPC call might not be driven to completion
+ /// on all of the specified nodes. It is therefore unfit for broadcast
+ /// write operations where we expect all nodes to successfully store
+ /// the written date.
pub async fn try_call_many<M, N, H, S>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
@@ -236,31 +263,24 @@ impl RpcHelper {
let quorum = strategy.rs_quorum.unwrap_or(to.len());
let tracer = opentelemetry::global::tracer("garage");
- let span_name = if strategy.rs_interrupt_after_quorum {
- format!("RPC {} to {} of {}", endpoint.path(), quorum, to.len())
- } else {
- format!(
- "RPC {} to {} (quorum {})",
- endpoint.path(),
- to.len(),
- quorum
- )
- };
+ let span_name = format!(
+ "RPC [{}] try_call_many (quorum {}/{})",
+ endpoint.path(),
+ quorum,
+ to.len()
+ );
+
let mut span = tracer.start(span_name);
span.set_attribute(KeyValue::new("from", format!("{:?}", self.0.our_node_id)));
span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
span.set_attribute(KeyValue::new("quorum", quorum as i64));
- span.set_attribute(KeyValue::new(
- "interrupt_after_quorum",
- strategy.rs_interrupt_after_quorum.to_string(),
- ));
- self.try_call_many_internal(endpoint, to, msg, strategy, quorum)
+ self.try_call_many_inner(endpoint, to, msg, strategy, quorum)
.with_context(Context::current_with_span(span))
.await
}
- async fn try_call_many_internal<M, N, H, S>(
+ async fn try_call_many_inner<M, N, H, S>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid],
@@ -274,129 +294,238 @@ impl RpcHelper {
H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static,
{
- let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
+ // Once quorum is reached, other requests don't matter.
+ // What we do here is only send the required number of requests
+ // to reach a quorum, priorizing nodes with the lowest latency.
+ // When there are errors, we start new requests to compensate.
+
+ // TODO: this could be made more aggressive, e.g. if after 2x the
+ // average ping of a given request, the response is not yet received,
+ // preemptively send an additional request to any remaining nodes.
+
+ // Reorder requests to priorize closeness / low latency
+ let request_order = self.request_order(&self.0.layout.read().unwrap(), to.iter().copied());
+ let send_all_at_once = strategy.rs_send_all_at_once.unwrap_or(false);
// Build future for each request
// They are not started now: they are added below in a FuturesUnordered
// object that will take care of polling them (see below)
- let requests = to.iter().cloned().map(|to| {
+ let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
+ let mut requests = request_order.into_iter().map(|to| {
let self2 = self.clone();
let msg = msg.clone();
let endpoint2 = endpoint.clone();
- (to, async move {
- self2.call(&endpoint2, to, msg, strategy).await
- })
+ async move { self2.call(&endpoint2, to, msg, strategy).await }
});
// Vectors in which success results and errors will be collected
let mut successes = vec![];
let mut errors = vec![];
- if strategy.rs_interrupt_after_quorum {
- // Case 1: once quorum is reached, other requests don't matter.
- // What we do here is only send the required number of requests
- // to reach a quorum, priorizing nodes with the lowest latency.
- // When there are errors, we start new requests to compensate.
-
- // Reorder requests to priorize closeness / low latency
- let request_order = self.request_order(to);
- let mut ord_requests = vec![(); request_order.len()]
- .into_iter()
- .map(|_| None)
- .collect::<Vec<_>>();
- for (to, fut) in requests {
- let i = request_order.iter().position(|x| *x == to).unwrap();
- ord_requests[i] = Some((to, fut));
+ // resp_stream will contain all of the requests that are currently in flight.
+ // (for the moment none, they will be added in the loop below)
+ let mut resp_stream = FuturesUnordered::new();
+
+ // Do some requests and collect results
+ while successes.len() < quorum {
+ // If the current set of requests that are running is not enough to possibly
+ // reach quorum, start some new requests.
+ while send_all_at_once || successes.len() + resp_stream.len() < quorum {
+ if let Some(fut) = requests.next() {
+ resp_stream.push(fut)
+ } else {
+ break;
+ }
+ }
+
+ if successes.len() + resp_stream.len() < quorum {
+ // We know we won't ever reach quorum
+ break;
}
- // Make an iterator to take requests in their sorted order
- let mut requests = ord_requests.into_iter().map(Option::unwrap);
-
- // resp_stream will contain all of the requests that are currently in flight.
- // (for the moment none, they will be added in the loop below)
- let mut resp_stream = FuturesUnordered::new();
-
- // Do some requests and collect results
- 'request_loop: while successes.len() < quorum {
- // If the current set of requests that are running is not enough to possibly
- // reach quorum, start some new requests.
- while successes.len() + resp_stream.len() < quorum {
- if let Some((req_to, fut)) = requests.next() {
- let tracer = opentelemetry::global::tracer("garage");
- let span = tracer.start(format!("RPC to {:?}", req_to));
- resp_stream.push(tokio::spawn(
- fut.with_context(Context::current_with_span(span)),
- ));
- } else {
- // If we have no request to add, we know that we won't ever
- // reach quorum: bail out now.
- break 'request_loop;
- }
+ // Wait for one request to terminate
+ match resp_stream.next().await.unwrap() {
+ Ok(msg) => {
+ successes.push(msg);
}
- assert!(!resp_stream.is_empty()); // because of loop invariants
-
- // Wait for one request to terminate
- match resp_stream.next().await.unwrap().unwrap() {
- Ok(msg) => {
- successes.push(msg);
- }
- Err(e) => {
- errors.push(e);
- }
+ Err(e) => {
+ errors.push(e);
}
}
+ }
+
+ if successes.len() >= quorum {
+ Ok(successes)
} else {
- // Case 2: all of the requests need to be sent in all cases,
- // and need to terminate. (this is the case for writes that
- // must be spread to n nodes)
- // Just start all the requests in parallel and return as soon
- // as the quorum is reached.
- let mut resp_stream = requests
- .map(|(_, fut)| fut)
- .collect::<FuturesUnordered<_>>();
-
- while let Some(resp) = resp_stream.next().await {
- match resp {
- Ok(msg) => {
- successes.push(msg);
- if successes.len() >= quorum {
- break;
- }
- }
- Err(e) => {
- errors.push(e);
- }
- }
- }
+ let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
+ Err(Error::Quorum(
+ quorum,
+ None,
+ successes.len(),
+ to.len(),
+ errors,
+ ))
+ }
+ }
+
+ /// Make a RPC call to multiple servers, returning either a Vec of responses,
+ /// or an error if quorum could not be reached due to too many errors
+ ///
+ /// Contrary to try_call_many, this fuction is especially made for broadcast
+ /// write operations. In particular:
+ ///
+ /// - The request are sent to all specified nodes as soon as `try_write_many_sets`
+ /// is invoked.
+ ///
+ /// - When `try_write_many_sets` returns, all remaining requests that haven't
+ /// completed move to a background task so that they have a chance to
+ /// complete successfully if there are no failures.
+ ///
+ /// In addition, the nodes to which requests should be sent are divided in
+ /// "quorum sets", and `try_write_many_sets` only returns once a quorum
+ /// has been validated in each set. This is used in the case of cluster layout
+ /// changes, where data has to be written both in the old layout and in the
+ /// new one as long as all nodes have not successfully tranisitionned and
+ /// moved all data to the new layout.
+ pub async fn try_write_many_sets<M, N, H, S>(
+ &self,
+ endpoint: &Arc<Endpoint<M, H>>,
+ to_sets: &[Vec<Uuid>],
+ msg: N,
+ strategy: RequestStrategy,
+ ) -> Result<Vec<S>, Error>
+ where
+ M: Rpc<Response = Result<S, Error>> + 'static,
+ N: IntoReq<M>,
+ H: StreamingEndpointHandler<M> + 'static,
+ S: Send + 'static,
+ {
+ let quorum = strategy
+ .rs_quorum
+ .expect("internal error: missing quorum value in try_write_many_sets");
+
+ let tracer = opentelemetry::global::tracer("garage");
+ let span_name = format!(
+ "RPC [{}] try_write_many_sets (quorum {} in {} sets)",
+ endpoint.path(),
+ quorum,
+ to_sets.len()
+ );
+
+ let mut span = tracer.start(span_name);
+ span.set_attribute(KeyValue::new("from", format!("{:?}", self.0.our_node_id)));
+ span.set_attribute(KeyValue::new("to", format!("{:?}", to_sets)));
+ span.set_attribute(KeyValue::new("quorum", quorum as i64));
+
+ self.try_write_many_sets_inner(endpoint, to_sets, msg, strategy, quorum)
+ .with_context(Context::current_with_span(span))
+ .await
+ }
+
+ async fn try_write_many_sets_inner<M, N, H, S>(
+ &self,
+ endpoint: &Arc<Endpoint<M, H>>,
+ to_sets: &[Vec<Uuid>],
+ msg: N,
+ strategy: RequestStrategy,
+ quorum: usize,
+ ) -> Result<Vec<S>, Error>
+ where
+ M: Rpc<Response = Result<S, Error>> + 'static,
+ N: IntoReq<M>,
+ H: StreamingEndpointHandler<M> + 'static,
+ S: Send + 'static,
+ {
+ // Peers may appear in many quorum sets. Here, build a list of peers,
+ // mapping to the index of the quorum sets in which they appear.
+ let mut result_tracker = QuorumSetResultTracker::new(to_sets, quorum);
+
+ // Send one request to each peer of the quorum sets
+ let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
+ let requests = result_tracker.nodes.keys().map(|peer| {
+ let self2 = self.clone();
+ let msg = msg.clone();
+ let endpoint2 = endpoint.clone();
+ let to = *peer;
+ async move { (to, self2.call(&endpoint2, to, msg, strategy).await) }
+ });
+ let mut resp_stream = requests.collect::<FuturesUnordered<_>>();
+
+ // Drive requests to completion
+ while let Some((node, resp)) = resp_stream.next().await {
+ // Store the response in the correct vector and increment the
+ // appropriate counters
+ result_tracker.register_result(node, resp);
- if !resp_stream.is_empty() {
- // Continue remaining requests in background.
- // Note: these requests can get interrupted on process shutdown,
- // we must not count on them being executed for certain.
- // For all background things that have to happen with certainty,
- // they have to be put in a proper queue that is persisted to disk.
+ // If we have a quorum of ok in all quorum sets, then it's a success!
+ if result_tracker.all_quorums_ok() {
+ // Continue all other requets in background
tokio::spawn(async move {
- resp_stream.collect::<Vec<Result<_, _>>>().await;
+ resp_stream.collect::<Vec<(Uuid, Result<_, _>)>>().await;
});
+
+ return Ok(result_tracker.success_values());
+ }
+
+ // If there is a quorum set for which too many errors were received,
+ // we know it's impossible to get a quorum, so return immediately.
+ if result_tracker.too_many_failures() {
+ break;
}
}
- if successes.len() >= quorum {
- Ok(successes)
- } else {
- let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
- Err(Error::Quorum(quorum, successes.len(), to.len(), errors))
+ // At this point, there is no quorum and we know that a quorum
+ // will never be achieved. Currently, we drop all remaining requests.
+ // Should we still move them to background so that they can continue
+ // for non-failed nodes? Not doing so has no impact on correctness,
+ // but it means that more cancellation messages will be sent. Idk.
+ // (When an in-progress request future is dropped, Netapp automatically
+ // sends a cancellation message to the remote node to inform it that
+ // the result is no longer needed. In turn, if the remote node receives
+ // the cancellation message in time, it interrupts the task of the
+ // running request handler.)
+
+ // Failure, could not get quorum
+ Err(result_tracker.quorum_error())
+ }
+
+ // ---- functions not related to MAKING RPCs, but just determining to what nodes
+ // they should be made and in which order ----
+
+ pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> {
+ let layout = self.0.layout.read().unwrap();
+
+ let mut ret = Vec::with_capacity(12);
+ let ver_iter = layout
+ .versions
+ .iter()
+ .rev()
+ .chain(layout.old_versions.iter().rev());
+ for ver in ver_iter {
+ if ver.version > layout.sync_map_min() {
+ continue;
+ }
+ let nodes = ver.nodes_of(position, ver.replication_factor);
+ for node in rpc_helper.request_order(&layout, nodes) {
+ if !ret.contains(&node) {
+ ret.push(node);
+ }
+ }
}
+ ret
}
- pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
+ fn request_order(
+ &self,
+ layout: &LayoutHistory,
+ nodes: impl Iterator<Item = Uuid>,
+ ) -> Vec<Uuid> {
// Retrieve some status variables that we will use to sort requests
let peer_list = self.0.peering.get_peer_list();
- let ring: Arc<Ring> = self.0.ring.borrow().clone();
- let our_zone = match ring.layout.node_role(&self.0.our_node_id) {
- Some(pc) => &pc.zone,
- None => "",
- };
+ let our_zone = layout
+ .current()
+ .get_node_zone(&self.0.our_node_id)
+ .unwrap_or("");
// Augment requests with some information used to sort them.
// The tuples are as follows:
@@ -405,22 +534,18 @@ impl RpcHelper {
// By sorting this vec, we priorize ourself, then nodes in the same zone,
// and within a same zone we priorize nodes with the lowest latency.
let mut nodes = nodes
- .iter()
.map(|to| {
- let peer_zone = match ring.layout.node_role(to) {
- Some(pc) => &pc.zone,
- None => "",
- };
+ let peer_zone = layout.current().get_node_zone(&to).unwrap_or("");
let peer_avg_ping = peer_list
.iter()
.find(|x| x.id.as_ref() == to.as_slice())
.and_then(|pi| pi.avg_ping)
.unwrap_or_else(|| Duration::from_secs(10));
(
- *to != self.0.our_node_id,
+ to != self.0.our_node_id,
peer_zone != our_zone,
peer_avg_ping,
- *to,
+ to,
)
})
.collect::<Vec<_>>();
@@ -434,3 +559,108 @@ impl RpcHelper {
.collect::<Vec<_>>()
}
}
+
+// ------- utility for tracking successes/errors among write sets --------
+
+pub struct QuorumSetResultTracker<S, E> {
+ /// The set of nodes and the index of the quorum sets they belong to
+ pub nodes: HashMap<Uuid, Vec<usize>>,
+ /// The quorum value, i.e. number of success responses to await in each set
+ pub quorum: usize,
+
+ /// The success responses received
+ pub successes: Vec<(Uuid, S)>,
+ /// The error responses received
+ pub failures: Vec<(Uuid, E)>,
+
+ /// The counters for successes in each set
+ pub success_counters: Box<[usize]>,
+ /// The counters for failures in each set
+ pub failure_counters: Box<[usize]>,
+ /// The total number of nodes in each set
+ pub set_lens: Box<[usize]>,
+}
+
+impl<S, E> QuorumSetResultTracker<S, E>
+where
+ E: std::fmt::Display,
+{
+ pub fn new<A>(sets: &[A], quorum: usize) -> Self
+ where
+ A: AsRef<[Uuid]>,
+ {
+ let mut nodes = HashMap::<Uuid, Vec<usize>>::new();
+ for (i, set) in sets.iter().enumerate() {
+ for node in set.as_ref().iter() {
+ nodes.entry(*node).or_default().push(i);
+ }
+ }
+
+ let num_nodes = nodes.len();
+ Self {
+ nodes,
+ quorum,
+ successes: Vec::with_capacity(num_nodes),
+ failures: vec![],
+ success_counters: vec![0; sets.len()].into_boxed_slice(),
+ failure_counters: vec![0; sets.len()].into_boxed_slice(),
+ set_lens: sets
+ .iter()
+ .map(|x| x.as_ref().len())
+ .collect::<Vec<_>>()
+ .into_boxed_slice(),
+ }
+ }
+
+ pub fn register_result(&mut self, node: Uuid, result: Result<S, E>) {
+ match result {
+ Ok(s) => {
+ self.successes.push((node, s));
+ for set in self.nodes.get(&node).unwrap().iter() {
+ self.success_counters[*set] += 1;
+ }
+ }
+ Err(e) => {
+ self.failures.push((node, e));
+ for set in self.nodes.get(&node).unwrap().iter() {
+ self.failure_counters[*set] += 1;
+ }
+ }
+ }
+ }
+
+ pub fn all_quorums_ok(&self) -> bool {
+ self.success_counters
+ .iter()
+ .all(|ok_cnt| *ok_cnt >= self.quorum)
+ }
+
+ pub fn too_many_failures(&self) -> bool {
+ self.failure_counters
+ .iter()
+ .zip(self.set_lens.iter())
+ .any(|(err_cnt, set_len)| *err_cnt + self.quorum > *set_len)
+ }
+
+ pub fn success_values(self) -> Vec<S> {
+ self.successes
+ .into_iter()
+ .map(|(_, x)| x)
+ .collect::<Vec<_>>()
+ }
+
+ pub fn quorum_error(self) -> Error {
+ let errors = self
+ .failures
+ .iter()
+ .map(|(n, e)| format!("{:?}: {}", n, e))
+ .collect::<Vec<_>>();
+ Error::Quorum(
+ self.quorum,
+ Some(self.set_lens.len()),
+ self.successes.len(),
+ self.nodes.len(),
+ errors,
+ )
+ }
+}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index de44e656..21156d15 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -1,10 +1,10 @@
//! Module containing structs related to membership management
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::{Duration, Instant};
use arc_swap::ArcSwap;
@@ -13,8 +13,7 @@ use futures::join;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::sign::ed25519;
use tokio::select;
-use tokio::sync::watch;
-use tokio::sync::Mutex;
+use tokio::sync::{watch, Notify};
use garage_net::endpoint::{Endpoint, EndpointHandler};
use garage_net::message::*;
@@ -34,9 +33,10 @@ use garage_util::time::*;
use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
-use crate::layout::*;
+use crate::layout::{
+ self, manager::LayoutManager, LayoutHelper, LayoutHistory, NodeRoleV, RpcLayoutDigest,
+};
use crate::replication_mode::*;
-use crate::ring::*;
use crate::rpc_helper::*;
use crate::system_metrics::*;
@@ -47,10 +47,10 @@ const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
/// Version tag used for version check upon Netapp connection.
/// Cluster nodes with different version tags are deemed
/// incompatible and will refuse to connect.
-pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650008; // garage 0x0008
+pub const GARAGE_VERSION_TAG: u64 = 0x676172616765000A; // garage 0x000A
/// RPC endpoint used for calls related to membership
-pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
+pub const SYSTEM_RPC_PATH: &str = "garage_rpc/system.rs/SystemRpc";
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -59,17 +59,22 @@ pub enum SystemRpc {
Ok,
/// Request to connect to a specific node (in <pubkey>@<host>:<port> format, pubkey = full-length node ID)
Connect(String),
- /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
- PullClusterLayout,
/// Advertise Garage status. Answered with another AdvertiseStatus.
/// Exchanged with every node on a regular basis.
AdvertiseStatus(NodeStatus),
- /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
- AdvertiseClusterLayout(ClusterLayout),
/// Get known nodes states
GetKnownNodes,
/// Return known nodes
ReturnKnownNodes(Vec<KnownNodeInfo>),
+
+ /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
+ PullClusterLayout,
+ /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
+ AdvertiseClusterLayout(LayoutHistory),
+ /// Ask other node its cluster layout update trackers.
+ PullClusterLayoutTrackers,
+ /// Advertisement of cluster layout update trackers.
+ AdvertiseClusterLayoutTrackers(layout::UpdateTrackers),
}
impl Rpc for SystemRpc {
@@ -85,7 +90,6 @@ pub struct System {
/// The id of this node
pub id: Uuid,
- persist_cluster_layout: Persister<ClusterLayout>,
persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>,
@@ -93,9 +97,8 @@ pub struct System {
pub netapp: Arc<NetApp>,
peering: Arc<PeeringManager>,
- pub rpc: RpcHelper,
- system_endpoint: Arc<Endpoint<SystemRpc, System>>,
+ pub(crate) system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
@@ -107,15 +110,13 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
+ pub layout_manager: Arc<LayoutManager>,
+
metrics: SystemMetrics,
replication_mode: ReplicationMode,
replication_factor: usize,
- /// The ring
- pub ring: watch::Receiver<Arc<Ring>>,
- update_ring: Mutex<watch::Sender<Arc<Ring>>>,
-
/// Path to metadata directory
pub metadata_dir: PathBuf,
/// Path to data directory
@@ -125,14 +126,13 @@ pub struct System {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeStatus {
/// Hostname of the node
- pub hostname: String,
+ pub hostname: Option<String>,
/// Replication factor configured on the node
pub replication_factor: usize,
- /// Cluster layout version
- pub cluster_layout_version: u64,
- /// Hash of cluster layout staging data
- pub cluster_layout_staging_hash: Hash,
+
+ /// Cluster layout digest
+ pub layout_digest: RpcLayoutDigest,
/// Disk usage on partition containing metadata directory (tuple: `(avail, total)`)
#[serde(default)]
@@ -248,8 +248,7 @@ impl System {
replication_mode: ReplicationMode,
config: &Config,
) -> Result<Arc<Self>, Error> {
- let replication_factor = replication_mode.replication_factor();
-
+ // ---- setup netapp RPC protocol ----
let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!(
@@ -257,82 +256,40 @@ impl System {
hex::encode(&node_key.public_key()[..8])
);
- let persist_cluster_layout: Persister<ClusterLayout> =
- Persister::new(&config.metadata_dir, "cluster_layout");
- let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
-
- let cluster_layout = match persist_cluster_layout.load() {
- Ok(x) => {
- if x.replication_factor != replication_factor {
- return Err(Error::Message(format!(
- "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
- x.replication_factor,
- replication_factor
- )));
- }
- x
- }
- Err(e) => {
- info!(
- "No valid previous cluster layout stored ({}), starting fresh.",
- e
- );
- ClusterLayout::new(replication_factor)
- }
- };
-
- let metrics = SystemMetrics::new(replication_factor);
-
- let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout);
- local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
+ let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
+ let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
- let ring = Ring::new(cluster_layout, replication_factor);
- let (update_ring, ring) = watch::channel(Arc::new(ring));
-
- let rpc_public_addr = match &config.rpc_public_addr {
- Some(a_str) => {
- use std::net::ToSocketAddrs;
- match a_str.to_socket_addrs() {
- Err(e) => {
- error!(
- "Cannot resolve rpc_public_addr {} from config file: {}.",
- a_str, e
- );
- None
- }
- Ok(a) => {
- let a = a.collect::<Vec<_>>();
- if a.is_empty() {
- error!("rpc_public_addr {} resolve to no known IP address", a_str);
- }
- if a.len() > 1 {
- warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a);
- }
- a.into_iter().next()
- }
- }
- }
- None => {
- let addr =
- get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
- if let Some(a) = addr {
- warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
- }
- addr
- }
- };
+ // ---- setup netapp public listener and full mesh peering strategy ----
+ let rpc_public_addr = get_rpc_public_addr(config);
if rpc_public_addr.is_none() {
warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication.");
}
- let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
let peering = PeeringManager::new(netapp.clone(), vec![], rpc_public_addr);
if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
peering.set_ping_timeout_millis(ping_timeout);
}
- let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
+ let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
+
+ // ---- setup cluster layout and layout manager ----
+ let replication_factor = replication_mode.replication_factor();
+
+ let layout_manager = LayoutManager::new(
+ config,
+ netapp.id,
+ system_endpoint.clone(),
+ peering.clone(),
+ replication_mode,
+ )?;
+
+ // ---- set up metrics and status exchange ----
+ let metrics = SystemMetrics::new(replication_factor);
+
+ let mut local_status = NodeStatus::initial(replication_factor, &layout_manager);
+ local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
+ // ---- if enabled, set up additionnal peer discovery methods ----
#[cfg(feature = "consul-discovery")]
let consul_discovery = match &config.consul_discovery {
Some(cfg) => Some(
@@ -351,20 +308,14 @@ impl System {
warn!("Kubernetes discovery is not enabled in this build.");
}
+ // ---- done ----
let sys = Arc::new(System {
id: netapp.id.into(),
- persist_cluster_layout,
persist_peer_list,
local_status: ArcSwap::new(Arc::new(local_status)),
node_status: RwLock::new(HashMap::new()),
netapp: netapp.clone(),
peering: peering.clone(),
- rpc: RpcHelper::new(
- netapp.id.into(),
- peering,
- ring.clone(),
- config.rpc_timeout_msec.map(Duration::from_millis),
- ),
system_endpoint,
replication_mode,
replication_factor,
@@ -376,10 +327,9 @@ impl System {
consul_discovery,
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(),
+ layout_manager,
metrics,
- ring,
- update_ring: Mutex::new(update_ring),
metadata_dir: config.metadata_dir.clone(),
data_dir: config.data_dir.clone(),
});
@@ -399,6 +349,20 @@ impl System {
);
}
+ // ---- Public utilities / accessors ----
+
+ pub fn cluster_layout(&self) -> RwLockReadGuard<'_, LayoutHelper> {
+ self.layout_manager.layout()
+ }
+
+ pub fn layout_notify(&self) -> Arc<Notify> {
+ self.layout_manager.change_notify.clone()
+ }
+
+ pub fn rpc_helper(&self) -> &RpcHelper {
+ &self.layout_manager.rpc_helper
+ }
+
// ---- Administrative operations (directly available and
// also available through RPC) ----
@@ -425,18 +389,6 @@ impl System {
known_nodes
}
- pub fn get_cluster_layout(&self) -> ClusterLayout {
- self.ring.borrow().layout.clone()
- }
-
- pub async fn update_cluster_layout(
- self: &Arc<Self>,
- layout: &ClusterLayout,
- ) -> Result<(), Error> {
- self.handle_advertise_cluster_layout(layout).await?;
- Ok(())
- }
-
pub async fn connect(&self, node: &str) -> Result<(), Error> {
let (pubkey, addrs) = parse_and_resolve_peer_addr_async(node)
.await
@@ -466,47 +418,63 @@ impl System {
}
pub fn health(&self) -> ClusterHealth {
- let ring: Arc<_> = self.ring.borrow().clone();
let quorum = self.replication_mode.write_quorum();
- let replication_factor = self.replication_factor;
+ // Gather information about running nodes.
+ // Technically, `nodes` contains currently running nodes, as well
+ // as nodes that this Garage process has been connected to at least
+ // once since it started.
let nodes = self
.get_known_nodes()
.into_iter()
.map(|n| (n.id, n))
.collect::<HashMap<Uuid, _>>();
let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
+ let node_up = |x: &Uuid| nodes.get(x).map(|n| n.is_up).unwrap_or(false);
+
+ // Acquire a rwlock read-lock to the current cluster layout
+ let layout = self.cluster_layout();
+
+ // Obtain information about nodes that have a role as storage nodes
+ // in one of the active layout versions
+ let mut storage_nodes = HashSet::<Uuid>::with_capacity(16);
+ for ver in layout.versions.iter() {
+ storage_nodes.extend(
+ ver.roles
+ .items()
+ .iter()
+ .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
+ .map(|(n, _, _)| *n),
+ )
+ }
+ let storage_nodes_ok = storage_nodes.iter().filter(|x| node_up(x)).count();
+
+ // Determine the number of partitions that have:
+ // - a quorum of up nodes for all write sets (i.e. are available)
+ // - for which all nodes in all write sets are up (i.e. are fully healthy)
+ let partitions = layout.current().partitions().collect::<Vec<_>>();
+ let mut partitions_quorum = 0;
+ let mut partitions_all_ok = 0;
+ for (_, hash) in partitions.iter() {
+ let mut write_sets = layout
+ .versions
+ .iter()
+ .map(|x| x.nodes_of(hash, x.replication_factor));
+ let has_quorum = write_sets
+ .clone()
+ .all(|set| set.filter(|x| node_up(x)).count() >= quorum);
+ let all_ok = write_sets.all(|mut set| set.all(|x| node_up(&x)));
+ if has_quorum {
+ partitions_quorum += 1;
+ }
+ if all_ok {
+ partitions_all_ok += 1;
+ }
+ }
- let storage_nodes = ring
- .layout
- .roles
- .items()
- .iter()
- .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
- .collect::<Vec<_>>();
- let storage_nodes_ok = storage_nodes
- .iter()
- .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
- .count();
-
- let partitions = ring.partitions();
- let partitions_n_up = partitions
- .iter()
- .map(|(_, h)| {
- let pn = ring.get_nodes(h, ring.replication_factor);
- pn.iter()
- .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
- .count()
- })
- .collect::<Vec<usize>>();
- let partitions_all_ok = partitions_n_up
- .iter()
- .filter(|c| **c == replication_factor)
- .count();
- let partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count();
-
+ // Determine overall cluster status
let status =
- if partitions_quorum == partitions.len() && storage_nodes_ok == storage_nodes.len() {
+ if partitions_all_ok == partitions.len() && storage_nodes_ok == storage_nodes.len() {
ClusterHealthStatus::Healthy
} else if partitions_quorum == partitions.len() {
ClusterHealthStatus::Degraded
@@ -546,7 +514,7 @@ impl System {
if let Err(e) = c
.publish_consul_service(
self.netapp.id,
- &self.local_status.load_full().hostname,
+ &self.local_status.load_full().hostname.as_deref().unwrap(),
rpc_public_addr,
)
.await
@@ -573,7 +541,7 @@ impl System {
if let Err(e) = publish_kubernetes_node(
k,
self.netapp.id,
- &self.local_status.load_full().hostname,
+ &self.local_status.load_full().hostname.as_deref().unwrap(),
rpc_public_addr,
)
.await
@@ -582,22 +550,10 @@ impl System {
}
}
- /// Save network configuration to disc
- async fn save_cluster_layout(&self) -> Result<(), Error> {
- let ring: Arc<Ring> = self.ring.borrow().clone();
- self.persist_cluster_layout
- .save_async(&ring.layout)
- .await
- .expect("Cannot save current cluster layout");
- Ok(())
- }
-
fn update_local_status(&self) {
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
- let ring = self.ring.borrow();
- new_si.cluster_layout_version = ring.layout.version;
- new_si.cluster_layout_staging_hash = ring.layout.staging_hash;
+ new_si.layout_digest = self.layout_manager.layout().digest();
new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
@@ -611,11 +567,6 @@ impl System {
Ok(SystemRpc::Ok)
}
- fn handle_pull_cluster_layout(&self) -> SystemRpc {
- let ring = self.ring.borrow().clone();
- SystemRpc::AdvertiseClusterLayout(ring.layout.clone())
- }
-
fn handle_get_known_nodes(&self) -> SystemRpc {
let known_nodes = self.get_known_nodes();
SystemRpc::ReturnKnownNodes(known_nodes)
@@ -635,11 +586,8 @@ impl System {
std::process::exit(1);
}
- if info.cluster_layout_version > local_info.cluster_layout_version
- || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
- {
- tokio::spawn(self.clone().pull_cluster_layout(from));
- }
+ self.layout_manager
+ .handle_advertise_status(from, &info.layout_digest);
self.node_status
.write()
@@ -649,57 +597,6 @@ impl System {
Ok(SystemRpc::Ok)
}
- async fn handle_advertise_cluster_layout(
- self: &Arc<Self>,
- adv: &ClusterLayout,
- ) -> Result<SystemRpc, Error> {
- if adv.replication_factor != self.replication_factor {
- let msg = format!(
- "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
- adv.replication_factor,
- self.replication_factor
- );
- error!("{}", msg);
- return Err(Error::Message(msg));
- }
-
- let update_ring = self.update_ring.lock().await;
- let mut layout: ClusterLayout = self.ring.borrow().layout.clone();
-
- let prev_layout_check = layout.check().is_ok();
- if layout.merge(adv) {
- if prev_layout_check && layout.check().is_err() {
- error!("New cluster layout is invalid, discarding.");
- return Err(Error::Message(
- "New cluster layout is invalid, discarding.".into(),
- ));
- }
-
- let ring = Ring::new(layout.clone(), self.replication_factor);
- update_ring.send(Arc::new(ring))?;
- drop(update_ring);
-
- let self2 = self.clone();
- tokio::spawn(async move {
- if let Err(e) = self2
- .rpc
- .broadcast(
- &self2.system_endpoint,
- SystemRpc::AdvertiseClusterLayout(layout),
- RequestStrategy::with_priority(PRIO_HIGH),
- )
- .await
- {
- warn!("Error while broadcasting new cluster layout: {}", e);
- }
- });
-
- self.save_cluster_layout().await?;
- }
-
- Ok(SystemRpc::Ok)
- }
-
async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL;
@@ -707,7 +604,7 @@ impl System {
self.update_local_status();
let local_status: NodeStatus = self.local_status.load().as_ref().clone();
let _ = self
- .rpc
+ .rpc_helper()
.broadcast(
&self.system_endpoint,
SystemRpc::AdvertiseStatus(local_status),
@@ -725,9 +622,9 @@ impl System {
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
- let not_configured = self.ring.borrow().layout.check().is_err();
+ let not_configured = self.cluster_layout().check().is_err();
let no_peers = self.peering.get_peer_list().len() < self.replication_factor;
- let expected_n_nodes = self.ring.borrow().layout.num_nodes();
+ let expected_n_nodes = self.cluster_layout().all_nodes().len();
let bad_peers = self
.peering
.get_peer_list()
@@ -832,48 +729,49 @@ impl System {
.save_async(&PeerList(peer_list))
.await
}
-
- async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {
- let resp = self
- .rpc
- .call(
- &self.system_endpoint,
- peer,
- SystemRpc::PullClusterLayout,
- RequestStrategy::with_priority(PRIO_HIGH),
- )
- .await;
- if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
- let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await;
- }
- }
}
#[async_trait]
impl EndpointHandler<SystemRpc> for System {
async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
match msg {
+ // ---- system functions -> System ----
SystemRpc::Connect(node) => self.handle_connect(node).await,
- SystemRpc::PullClusterLayout => Ok(self.handle_pull_cluster_layout()),
SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await,
+ SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
+
+ // ---- layout functions -> LayoutManager ----
+ SystemRpc::PullClusterLayout => Ok(self.layout_manager.handle_pull_cluster_layout()),
SystemRpc::AdvertiseClusterLayout(adv) => {
- self.clone().handle_advertise_cluster_layout(adv).await
+ self.layout_manager
+ .handle_advertise_cluster_layout(adv)
+ .await
}
- SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
+ SystemRpc::PullClusterLayoutTrackers => {
+ Ok(self.layout_manager.handle_pull_cluster_layout_trackers())
+ }
+ SystemRpc::AdvertiseClusterLayoutTrackers(adv) => {
+ self.layout_manager
+ .handle_advertise_cluster_layout_trackers(adv)
+ .await
+ }
+
+ // ---- other -> Error ----
m => Err(Error::unexpected_rpc_message(m)),
}
}
}
impl NodeStatus {
- fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self {
+ fn initial(replication_factor: usize, layout_manager: &LayoutManager) -> Self {
NodeStatus {
- hostname: gethostname::gethostname()
- .into_string()
- .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
+ hostname: Some(
+ gethostname::gethostname()
+ .into_string()
+ .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
+ ),
replication_factor,
- cluster_layout_version: layout.version,
- cluster_layout_staging_hash: layout.staging_hash,
+ layout_digest: layout_manager.layout().digest(),
meta_disk_avail: None,
data_disk_avail: None,
}
@@ -881,10 +779,9 @@ impl NodeStatus {
fn unknown() -> Self {
NodeStatus {
- hostname: "?".to_string(),
+ hostname: None,
replication_factor: 0,
- cluster_layout_version: 0,
- cluster_layout_staging_hash: Hash::from([0u8; 32]),
+ layout_digest: Default::default(),
meta_disk_avail: None,
data_disk_avail: None,
}
@@ -963,6 +860,40 @@ fn get_default_ip() -> Option<IpAddr> {
.map(|a| a.ip())
}
+fn get_rpc_public_addr(config: &Config) -> Option<SocketAddr> {
+ match &config.rpc_public_addr {
+ Some(a_str) => {
+ use std::net::ToSocketAddrs;
+ match a_str.to_socket_addrs() {
+ Err(e) => {
+ error!(
+ "Cannot resolve rpc_public_addr {} from config file: {}.",
+ a_str, e
+ );
+ None
+ }
+ Ok(a) => {
+ let a = a.collect::<Vec<_>>();
+ if a.is_empty() {
+ error!("rpc_public_addr {} resolve to no known IP address", a_str);
+ }
+ if a.len() > 1 {
+ warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a);
+ }
+ a.into_iter().next()
+ }
+ }
+ }
+ None => {
+ let addr = get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
+ if let Some(a) = addr {
+ warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
+ }
+ addr
+ }
+ }
+}
+
async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
let mut ret = vec![];
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 4f2aed7a..cac17da6 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_table"
-version = "0.9.1"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
diff --git a/src/table/data.rs b/src/table/data.rs
index bbfdf58b..7f6b7847 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -254,7 +254,8 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
// of the GC algorithm, as in all cases GC is suspended if
// any node of the partition is unavailable.
let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
- let nodes = self.replication.write_nodes(&pk_hash);
+ // TODO: this probably breaks when the layout changes
+ let nodes = self.replication.storage_nodes(&pk_hash);
if nodes.first() == Some(&self.system.id) {
GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
}
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 5b9124a7..ef788749 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -152,7 +152,7 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
let mut partitions = HashMap::new();
for entry in entries {
let pkh = Hash::try_from(&entry.key[..32]).unwrap();
- let mut nodes = self.data.replication.write_nodes(&pkh);
+ let mut nodes = self.data.replication.storage_nodes(&pkh);
nodes.retain(|x| *x != self.system.id);
nodes.sort();
@@ -227,10 +227,10 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// GC'ing is not a critical function of the system, so it's not a big
// deal if we can't do it right now.
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
- &nodes[..],
+ &nodes,
GcRpc::Update(updates),
RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
@@ -248,10 +248,10 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// it means that the garbage collection wasn't completed and has
// to be retried later.
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
- &nodes[..],
+ &nodes,
GcRpc::DeleteIfEqualHash(deletes),
RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 4577f872..01271c58 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -13,7 +13,7 @@ use garage_util::data::*;
use garage_util::encode::{nonversioned_decode, nonversioned_encode};
use garage_util::error::Error;
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use crate::data::*;
use crate::replication::*;
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index 18682ace..30122f39 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -1,15 +1,22 @@
use std::sync::Arc;
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_rpc::system::System;
use garage_util::data::*;
use crate::replication::*;
+// TODO: find a way to track layout changes for this as well
+// The hard thing is that this data is stored also on gateway nodes,
+// whereas sharded data is stored only on non-Gateway nodes (storage nodes)
+// Also we want to be more tolerant to failures of gateways so we don't
+// want to do too much holding back of data when progress of gateway
+// nodes is not reported in the layout history's ack/sync/sync_ack maps.
+
/// Full replication schema: all nodes store everything
-/// Writes are disseminated in an epidemic manner in the network
/// Advantage: do all reads locally, extremely fast
/// Inconvenient: only suitable to reasonably small tables
+/// Inconvenient: if some writes fail, nodes will read outdated data
#[derive(Clone)]
pub struct TableFullReplication {
/// The membership manager of this node
@@ -19,6 +26,13 @@ pub struct TableFullReplication {
}
impl TableReplication for TableFullReplication {
+ type WriteSets = Vec<Vec<Uuid>>;
+
+ fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
+ let layout = self.system.cluster_layout();
+ layout.current().all_nodes().to_vec()
+ }
+
fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
vec![self.system.id]
}
@@ -26,12 +40,11 @@ impl TableReplication for TableFullReplication {
1
}
- fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
- let ring = self.system.ring.borrow();
- ring.layout.node_ids().to_vec()
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
+ vec![self.storage_nodes(hash)]
}
fn write_quorum(&self) -> usize {
- let nmembers = self.system.ring.borrow().layout.node_ids().len();
+ let nmembers = self.system.cluster_layout().current().all_nodes().len();
if nmembers > self.max_faults {
nmembers - self.max_faults
} else {
@@ -45,7 +58,18 @@ impl TableReplication for TableFullReplication {
fn partition_of(&self, _hash: &Hash) -> Partition {
0u16
}
- fn partitions(&self) -> Vec<(Partition, Hash)> {
- vec![(0u16, [0u8; 32].into())]
+
+ fn sync_partitions(&self) -> SyncPartitions {
+ let layout = self.system.cluster_layout();
+ let layout_version = layout.current().version;
+ SyncPartitions {
+ layout_version,
+ partitions: vec![SyncPartition {
+ partition: 0u16,
+ first_hash: [0u8; 32].into(),
+ last_hash: [0xff; 32].into(),
+ storage_sets: vec![layout.current().all_nodes().to_vec()],
+ }],
+ }
}
}
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index f00815a2..78470f35 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -1,25 +1,44 @@
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_util::data::*;
/// Trait to describe how a table shall be replicated
pub trait TableReplication: Send + Sync + 'static {
+ type WriteSets: AsRef<Vec<Vec<Uuid>>> + AsMut<Vec<Vec<Uuid>>> + Send + Sync + 'static;
+
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
+ /// The entire list of all nodes that store a partition
+ fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid>;
+
/// Which nodes to send read requests to
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
/// Responses needed to consider a read succesfull
fn read_quorum(&self) -> usize;
/// Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>;
- /// Responses needed to consider a write succesfull
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets;
+ /// Responses needed to consider a write succesfull in each set
fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
// Accessing partitions, for Merkle tree & sync
/// Get partition for data with given hash
fn partition_of(&self, hash: &Hash) -> Partition;
- /// List of existing partitions
- fn partitions(&self) -> Vec<(Partition, Hash)>;
+ /// List of partitions and nodes to sync with in current layout
+ fn sync_partitions(&self) -> SyncPartitions;
+}
+
+#[derive(Debug)]
+pub struct SyncPartitions {
+ pub layout_version: u64,
+ pub partitions: Vec<SyncPartition>,
+}
+
+#[derive(Debug)]
+pub struct SyncPartition {
+ pub partition: Partition,
+ pub first_hash: Hash,
+ pub last_hash: Hash,
+ pub storage_sets: Vec<Vec<Uuid>>,
}
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 1cf964af..8ba3700f 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_rpc::system::System;
use garage_util::data::*;
@@ -25,17 +25,21 @@ pub struct TableShardedReplication {
}
impl TableReplication for TableShardedReplication {
+ type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
+
+ fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
+ self.system.cluster_layout().storage_nodes_of(hash)
+ }
+
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- let ring = self.system.ring.borrow();
- ring.get_nodes(hash, self.replication_factor)
+ self.system.cluster_layout().read_nodes_of(hash)
}
fn read_quorum(&self) -> usize {
self.read_quorum
}
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- let ring = self.system.ring.borrow();
- ring.get_nodes(hash, self.replication_factor)
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
+ self.system.layout_manager.write_sets_of(hash)
}
fn write_quorum(&self) -> usize {
self.write_quorum
@@ -45,9 +49,38 @@ impl TableReplication for TableShardedReplication {
}
fn partition_of(&self, hash: &Hash) -> Partition {
- self.system.ring.borrow().partition_of(hash)
+ self.system.cluster_layout().current().partition_of(hash)
}
- fn partitions(&self) -> Vec<(Partition, Hash)> {
- self.system.ring.borrow().partitions()
+
+ fn sync_partitions(&self) -> SyncPartitions {
+ let layout = self.system.cluster_layout();
+ let layout_version = layout.ack_map_min();
+
+ let mut partitions = layout
+ .current()
+ .partitions()
+ .map(|(partition, first_hash)| {
+ let storage_sets = layout.storage_sets_of(&first_hash);
+ SyncPartition {
+ partition,
+ first_hash,
+ last_hash: [0u8; 32].into(), // filled in just after
+ storage_sets,
+ }
+ })
+ .collect::<Vec<_>>();
+
+ for i in 0..partitions.len() {
+ partitions[i].last_hash = if i + 1 < partitions.len() {
+ partitions[i + 1].first_hash
+ } else {
+ [0xFFu8; 32].into()
+ };
+ }
+
+ SyncPartitions {
+ layout_version,
+ partitions,
+ }
}
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 92a353c6..cd080df0 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -6,18 +6,19 @@ use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures_util::stream::*;
use opentelemetry::KeyValue;
-use rand::Rng;
+use rand::prelude::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use tokio::select;
-use tokio::sync::{mpsc, watch};
+use tokio::sync::{mpsc, watch, Notify};
use garage_util::background::*;
use garage_util::data::*;
use garage_util::encode::{debug_serialize, nonversioned_encode};
use garage_util::error::{Error, OkOrMessage};
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
+use garage_rpc::rpc_helper::QuorumSetResultTracker;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -52,16 +53,6 @@ impl Rpc for SyncRpc {
type Response = Result<SyncRpc, Error>;
}
-#[derive(Debug, Clone)]
-struct TodoPartition {
- partition: Partition,
- begin: Hash,
- end: Hash,
-
- // Are we a node that stores this partition or not?
- retain: bool,
-}
-
impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
pub(crate) fn new(
system: Arc<System>,
@@ -91,10 +82,10 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
bg.spawn_worker(SyncWorker {
syncer: self.clone(),
- ring_recv: self.system.ring.clone(),
- ring: self.system.ring.borrow().clone(),
+ layout_notify: self.system.layout_notify(),
+ layout_digest: self.system.cluster_layout().sync_digest(),
add_full_sync_rx,
- todo: vec![],
+ todo: None,
next_full_sync: Instant::now() + Duration::from_secs(20),
});
}
@@ -112,53 +103,56 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
async fn sync_partition(
self: &Arc<Self>,
- partition: &TodoPartition,
+ partition: &SyncPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
- if partition.retain {
- let my_id = self.system.id;
-
- let nodes = self
- .data
- .replication
- .write_nodes(&partition.begin)
- .into_iter()
- .filter(|node| *node != my_id)
- .collect::<Vec<_>>();
+ let my_id = self.system.id;
+ let retain = partition.storage_sets.iter().any(|x| x.contains(&my_id));
+ if retain {
debug!(
"({}) Syncing {:?} with {:?}...",
F::TABLE_NAME,
partition,
- nodes
+ partition.storage_sets
);
- let mut sync_futures = nodes
- .iter()
+ let mut result_tracker = QuorumSetResultTracker::new(
+ &partition.storage_sets,
+ self.data.replication.write_quorum(),
+ );
+
+ let mut sync_futures = result_tracker
+ .nodes
+ .keys()
+ .copied()
.map(|node| {
- self.clone()
- .do_sync_with(partition.clone(), *node, must_exit.clone())
+ let must_exit = must_exit.clone();
+ async move {
+ if node == my_id {
+ (node, Ok(()))
+ } else {
+ (node, self.do_sync_with(partition, node, must_exit).await)
+ }
+ }
})
.collect::<FuturesUnordered<_>>();
- let mut n_errors = 0;
- while let Some(r) = sync_futures.next().await {
- if let Err(e) = r {
- n_errors += 1;
- warn!("({}) Sync error: {}", F::TABLE_NAME, e);
+ while let Some((node, res)) = sync_futures.next().await {
+ if let Err(e) = &res {
+ warn!("({}) Sync error with {:?}: {}", F::TABLE_NAME, node, e);
}
+ result_tracker.register_result(node, res);
}
- if n_errors > self.data.replication.max_write_errors() {
- return Err(Error::Message(format!(
- "Sync failed with too many nodes (should have been: {:?}).",
- nodes
- )));
+
+ if result_tracker.too_many_failures() {
+ Err(result_tracker.quorum_error())
+ } else {
+ Ok(())
}
} else {
- self.offload_partition(&partition.begin, &partition.end, must_exit)
- .await?;
+ self.offload_partition(&partition.first_hash, &partition.last_hash, must_exit)
+ .await
}
-
- Ok(())
}
// Offload partition: this partition is not something we are storing,
@@ -188,12 +182,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
}
if !items.is_empty() {
- let nodes = self
- .data
- .replication
- .write_nodes(begin)
- .into_iter()
- .collect::<Vec<_>>();
+ let nodes = self.data.replication.storage_nodes(begin);
if nodes.contains(&self.system.id) {
warn!(
"({}) Interrupting offload as partitions seem to have changed",
@@ -217,7 +206,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
end,
counter
);
- self.offload_items(&items, &nodes[..]).await?;
+ self.offload_items(&items, &nodes).await?;
} else {
break;
}
@@ -244,7 +233,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
}
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
nodes,
@@ -284,8 +273,8 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
}
async fn do_sync_with(
- self: Arc<Self>,
- partition: TodoPartition,
+ self: &Arc<Self>,
+ partition: &SyncPartition,
who: Uuid,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
@@ -305,7 +294,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
// If so, do nothing.
let root_resp = self
.system
- .rpc
+ .rpc_helper()
.call(
&self.endpoint,
who,
@@ -361,7 +350,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
// and compare it with local node
let remote_node = match self
.system
- .rpc
+ .rpc_helper()
.call(
&self.endpoint,
who,
@@ -437,7 +426,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
let rpc_resp = self
.system
- .rpc
+ .rpc_helper()
.call(
&self.endpoint,
who,
@@ -492,75 +481,41 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSync
struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>,
- ring_recv: watch::Receiver<Arc<Ring>>,
- ring: Arc<Ring>,
+
+ layout_notify: Arc<Notify>,
+ layout_digest: SyncLayoutDigest,
+
add_full_sync_rx: mpsc::UnboundedReceiver<()>,
- todo: Vec<TodoPartition>,
next_full_sync: Instant,
+
+ todo: Option<SyncPartitions>,
}
impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
- fn add_full_sync(&mut self) {
- let system = &self.syncer.system;
- let data = &self.syncer.data;
-
- let my_id = system.id;
-
- self.todo.clear();
-
- let partitions = data.replication.partitions();
-
- for i in 0..partitions.len() {
- let begin = partitions[i].1;
-
- let end = if i + 1 < partitions.len() {
- partitions[i + 1].1
- } else {
- [0xFFu8; 32].into()
- };
-
- let nodes = data.replication.write_nodes(&begin);
-
- let retain = nodes.contains(&my_id);
- if !retain {
- // Check if we have some data to send, otherwise skip
- match data.store.range(begin..end) {
- Ok(mut iter) => {
- if iter.next().is_none() {
- continue;
- }
- }
- Err(e) => {
- warn!("DB error in add_full_sync: {}", e);
- continue;
- }
- }
- }
-
- self.todo.push(TodoPartition {
- partition: partitions[i].0,
- begin,
- end,
- retain,
- });
+ fn check_add_full_sync(&mut self) {
+ let layout_digest = self.syncer.system.cluster_layout().sync_digest();
+ if layout_digest != self.layout_digest {
+ self.layout_digest = layout_digest;
+ info!(
+ "({}) Layout versions changed ({:?}), adding full sync to syncer todo list",
+ F::TABLE_NAME,
+ layout_digest,
+ );
+ self.add_full_sync();
}
-
- self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
- fn pop_task(&mut self) -> Option<TodoPartition> {
- if self.todo.is_empty() {
- return None;
- }
+ fn add_full_sync(&mut self) {
+ let mut partitions = self.syncer.data.replication.sync_partitions();
+ info!(
+ "{}: Adding full sync for ack layout version {}",
+ F::TABLE_NAME,
+ partitions.layout_version
+ );
- let i = rand::thread_rng().gen_range(0..self.todo.len());
- if i == self.todo.len() - 1 {
- self.todo.pop()
- } else {
- let replacement = self.todo.pop().unwrap();
- let ret = std::mem::replace(&mut self.todo[i], replacement);
- Some(ret)
- }
+ partitions.partitions.shuffle(&mut thread_rng());
+ self.todo = Some(partitions);
+ self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
}
@@ -572,14 +527,48 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
fn status(&self) -> WorkerStatus {
WorkerStatus {
- queue_length: Some(self.todo.len() as u64),
+ queue_length: Some(self.todo.as_ref().map(|x| x.partitions.len()).unwrap_or(0) as u64),
..Default::default()
}
}
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- if let Some(partition) = self.pop_task() {
- self.syncer.sync_partition(&partition, must_exit).await?;
+ self.check_add_full_sync();
+
+ if let Some(todo) = &mut self.todo {
+ let partition = todo.partitions.pop().unwrap();
+
+ // process partition
+ if let Err(e) = self.syncer.sync_partition(&partition, must_exit).await {
+ error!(
+ "{}: Failed to sync partition {:?}: {}",
+ F::TABLE_NAME,
+ partition,
+ e
+ );
+ // if error, put partition back at the other side of the queue,
+ // so that other partitions will be tried in the meantime
+ todo.partitions.insert(0, partition);
+ // TODO: returning an error here will cause the background job worker
+ // to delay this task for some time, but maybe we don't want to
+ // delay it if there are lots of failures from nodes that are gone
+ // (we also don't want zero delays as that will cause lots of useless retries)
+ return Err(e);
+ }
+
+ if todo.partitions.is_empty() {
+ info!(
+ "{}: Completed full sync for ack layout version {}",
+ F::TABLE_NAME,
+ todo.layout_version
+ );
+ self.syncer
+ .system
+ .layout_manager
+ .sync_table_until(F::TABLE_NAME, todo.layout_version);
+ self.todo = None;
+ }
+
Ok(WorkerState::Busy)
} else {
Ok(WorkerState::Idle)
@@ -593,22 +582,16 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
self.add_full_sync();
}
},
- _ = self.ring_recv.changed() => {
- let new_ring = self.ring_recv.borrow();
- if !Arc::ptr_eq(&new_ring, &self.ring) {
- self.ring = new_ring.clone();
- drop(new_ring);
- debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
- self.add_full_sync();
- }
+ _ = self.layout_notify.notified() => {
+ self.check_add_full_sync();
},
_ = tokio::time::sleep_until(self.next_full_sync.into()) => {
self.add_full_sync();
}
}
- match self.todo.is_empty() {
- false => WorkerState::Busy,
- true => WorkerState::Idle,
+ match self.todo.is_some() {
+ true => WorkerState::Busy,
+ false => WorkerState::Idle,
}
}
}
diff --git a/src/table/table.rs b/src/table/table.rs
index 7ad79677..a5be2910 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -20,6 +20,7 @@ use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
use garage_util::migrate::Migrate;
+use garage_rpc::rpc_helper::QuorumSetResultTracker;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -80,6 +81,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone());
let gc = TableGc::new(system.clone(), data.clone());
+ system.layout_manager.add_table(F::TABLE_NAME);
+
let table = Arc::new(Self {
system,
data,
@@ -117,16 +120,16 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let who = self.data.replication.write_nodes(&hash);
+ let who = self.data.replication.write_sets(&hash);
let e_enc = Arc::new(ByteBuf::from(e.encode()?));
let rpc = TableRpc::<F>::Update(vec![e_enc]);
self.system
- .rpc
- .try_call_many(
+ .rpc_helper()
+ .try_write_many_sets(
&self.endpoint,
- &who[..],
+ who.as_ref(),
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.write_quorum()),
@@ -141,7 +144,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
self.data.queue_insert(tx, e)
}
- pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
+ pub async fn insert_many<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
where
I: IntoIterator<Item = IE> + Send + Sync,
IE: Borrow<F::E> + Send + Sync,
@@ -159,51 +162,123 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
Ok(())
}
- async fn insert_many_internal<I, IE>(&self, entries: I) -> Result<(), Error>
+ async fn insert_many_internal<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
where
I: IntoIterator<Item = IE> + Send + Sync,
IE: Borrow<F::E> + Send + Sync,
{
- let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
-
+ // The different items will have to be stored on possibly different nodes.
+ // We will here batch all items into a single request for each concerned
+ // node, with all of the entries it must store within that request.
+ // Each entry has to be saved to a specific list of "write sets", i.e. a set
+ // of node within wich a quorum must be achieved. In normal operation, there
+ // is a single write set which corresponds to the quorum in the current
+ // cluster layout, but when the layout is updated, multiple write sets might
+ // have to be handled at once. Here, since we are sending many entries, we
+ // will have to handle many write sets in all cases. The algorihtm is thus
+ // to send one request to each node with all the items it must save,
+ // and keep track of the OK responses within each write set: if for all sets
+ // a quorum of nodes has answered OK, then the insert has succeeded and
+ // consistency properties (read-after-write) are preserved.
+
+ let quorum = self.data.replication.write_quorum();
+
+ // Serialize all entries and compute the write sets for each of them.
+ // In the case of sharded table replication, this also takes an "ack lock"
+ // to the layout manager to avoid ack'ing newer versions which are not
+ // taken into account by writes in progress (the ack can happen later, once
+ // all writes that didn't take the new layout into account are finished).
+ // These locks are released when entries_vec is dropped, i.e. when this
+ // function returns.
+ let mut entries_vec = Vec::new();
for entry in entries.into_iter() {
let entry = entry.borrow();
let hash = entry.partition_key().hash();
- let who = self.data.replication.write_nodes(&hash);
+ let mut write_sets = self.data.replication.write_sets(&hash);
+ for set in write_sets.as_mut().iter_mut() {
+ // Sort nodes in each write sets to merge write sets with same
+ // nodes but in possibly different orders
+ set.sort();
+ }
let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
- for node in who {
- call_list.entry(node).or_default().push(e_enc.clone());
+ entries_vec.push((write_sets, e_enc));
+ }
+
+ // Compute a deduplicated list of all of the write sets,
+ // and compute an index from each node to the position of the sets in which
+ // it takes part, to optimize the detection of a quorum.
+ let mut write_sets = entries_vec
+ .iter()
+ .flat_map(|(wss, _)| wss.as_ref().iter().map(|ws| ws.as_slice()))
+ .collect::<Vec<&[Uuid]>>();
+ write_sets.sort();
+ write_sets.dedup();
+
+ let mut result_tracker = QuorumSetResultTracker::new(&write_sets, quorum);
+
+ // Build a map of all nodes to the entries that must be sent to that node.
+ let mut call_list: HashMap<Uuid, Vec<_>> = HashMap::new();
+ for (write_sets, entry_enc) in entries_vec.iter() {
+ for write_set in write_sets.as_ref().iter() {
+ for node in write_set.iter() {
+ let node_entries = call_list.entry(*node).or_default();
+ match node_entries.last() {
+ Some(x) if Arc::ptr_eq(x, entry_enc) => {
+ // skip if entry already in list to send to this node
+ // (could happen if node is in several write sets for this entry)
+ }
+ _ => {
+ node_entries.push(entry_enc.clone());
+ }
+ }
+ }
}
}
- let call_futures = call_list.drain().map(|(node, entries)| async move {
- let rpc = TableRpc::<F>::Update(entries);
-
- let resp = self
- .system
- .rpc
- .call(
- &self.endpoint,
- node,
- rpc,
- RequestStrategy::with_priority(PRIO_NORMAL),
- )
- .await?;
- Ok::<_, Error>((node, resp))
+ // Build futures to actually perform each of the corresponding RPC calls
+ let call_futures = call_list.into_iter().map(|(node, entries)| {
+ let this = self.clone();
+ async move {
+ let rpc = TableRpc::<F>::Update(entries);
+ let resp = this
+ .system
+ .rpc_helper()
+ .call(
+ &this.endpoint,
+ node,
+ rpc,
+ RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(quorum),
+ )
+ .await;
+ (node, resp)
+ }
});
+
+ // Run all requests in parallel thanks to FuturesUnordered, and collect results.
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
- let mut errors = vec![];
- while let Some(resp) = resps.next().await {
- if let Err(e) = resp {
- errors.push(e);
+ while let Some((node, resp)) = resps.next().await {
+ result_tracker.register_result(node, resp.map(|_| ()));
+
+ if result_tracker.all_quorums_ok() {
+ // Success
+
+ // Continue all other requests in background
+ tokio::spawn(async move {
+ resps.collect::<Vec<(Uuid, Result<_, _>)>>().await;
+ });
+
+ return Ok(());
+ }
+
+ if result_tracker.too_many_failures() {
+ // Too many errors in this set, we know we won't get a quorum
+ break;
}
}
- if errors.len() > self.data.replication.max_write_errors() {
- Err(Error::Message("Too many errors".into()))
- } else {
- Ok(())
- }
+
+ // Failure, could not get quorum within at least one set
+ Err(result_tracker.quorum_error())
}
pub async fn get(
@@ -236,14 +311,13 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
- &who[..],
+ &who,
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.data.replication.read_quorum())
- .interrupt_after_quorum(true),
+ .with_quorum(self.data.replication.read_quorum()),
)
.await?;
@@ -332,14 +406,13 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let resps = self
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
- &who[..],
+ &who,
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.data.replication.read_quorum())
- .interrupt_after_quorum(true),
+ .with_quorum(self.data.replication.read_quorum()),
)
.await?;
@@ -411,7 +484,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(what.encode()?));
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
who,
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index b4d8477c..e4c31460 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_util"
-version = "0.9.1"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
diff --git a/src/util/error.rs b/src/util/error.rs
index e73d88ba..da9eda10 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -55,13 +55,14 @@ pub enum Error {
Timeout,
#[error(
- display = "Could not reach quorum of {}. {} of {} request succeeded, others returned errors: {:?}",
+ display = "Could not reach quorum of {} (sets={:?}). {} of {} request succeeded, others returned errors: {:?}",
_0,
_1,
_2,
- _3
+ _3,
+ _4
)]
- Quorum(usize, usize, usize, Vec<String>),
+ Quorum(usize, Option<usize>, usize, usize, Vec<String>),
#[error(display = "Unexpected RPC message: {}", _0)]
UnexpectedRpcMessage(String),
diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml
index 3add5200..49549c9b 100644
--- a/src/web/Cargo.toml
+++ b/src/web/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_web"
-version = "0.9.1"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>", "Quentin Dufour <quentin@dufour.io>"]
edition = "2018"
license = "AGPL-3.0"