Diffstat (limited to 'src')
 src/api/Cargo.toml                 |   9
 src/garage/Cargo.toml              |  15
 src/garage/admin.rs                |   4
 src/garage/cli/cmd.rs              | 214
 src/garage/cli/layout.rs           | 340
 src/garage/cli/mod.rs              |   2
 src/garage/cli/structs.rs          |  78
 src/garage/main.rs                 |   4
 src/model/Cargo.toml               |   9
 src/rpc/Cargo.toml                 |   6
 src/rpc/layout.rs                  | 579
 src/rpc/lib.rs                     |   1
 src/rpc/ring.rs                    | 197
 src/rpc/rpc_helper.rs              |   4
 src/rpc/system.rs                  | 100
 src/table/Cargo.toml               |   7
 src/table/lib.rs                   |   5
 src/table/replication/fullcopy.rs  |   4
 src/util/Cargo.toml                |   3
 src/util/crdt/bool.rs (renamed from src/table/crdt/bool.rs)       |   0
 src/util/crdt/crdt.rs (renamed from src/table/crdt/crdt.rs)       |   2
 src/util/crdt/lww.rs (renamed from src/table/crdt/lww.rs)         |  18
 src/util/crdt/lww_map.rs (renamed from src/table/crdt/lww_map.rs) |  18
 src/util/crdt/map.rs (renamed from src/table/crdt/map.rs)         |   0
 src/util/crdt/mod.rs (renamed from src/table/crdt/mod.rs)         |   0
 src/util/lib.rs                    |   1
 src/web/Cargo.toml                 |  11
 27 files changed, 1193 insertions(+), 438 deletions(-)
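
The diffs below replace the old `garage node configure` / `garage node remove` commands with a staged `garage layout` workflow: role changes are first recorded in the layout's staging area (an LwwMap CRDT) and only take effect once an operator applies them with an explicit version number. A minimal sketch of the intended command sequence, assuming a node whose ID starts with the hypothetical prefix 3fa8 and the hypothetical zone and tag names dc1 and fast:

    garage layout assign 3fa8 -z dc1 -c 10 -t fast   # stage a storage role (use -g instead of -c for a gateway)
    garage layout show                               # review staged changes and the proposed partition assignation
    garage layout apply --version 1                  # commit; --version must be exactly previous version + 1
    garage layout revert --version 1                 # or cancel all staged changes instead
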
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index f06c67e4..3ca46764 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -1,11 +1,12 @@
[package]
name = "garage_api"
-version = "0.4.0"
+version = "0.5.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
description = "S3 API server crate for the Garage object store"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
[lib]
path = "lib.rs"
@@ -13,9 +14,9 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_model = { version = "0.4.0", path = "../model" }
-garage_table = { version = "0.4.0", path = "../table" }
-garage_util = { version = "0.4.0", path = "../util" }
+garage_model = { version = "0.5.0", path = "../model" }
+garage_table = { version = "0.5.0", path = "../table" }
+garage_util = { version = "0.5.0", path = "../util" }
base64 = "0.13"
bytes = "1.0"
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 0a3bc537..74a6ab0e 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -1,11 +1,12 @@
[package]
name = "garage"
-version = "0.4.0"
+version = "0.5.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
description = "Garage, an S3-compatible distributed object store for self-hosted deployments"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
[[bin]]
name = "garage"
@@ -14,12 +15,12 @@ path = "main.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_api = { version = "0.4.0", path = "../api" }
-garage_model = { version = "0.4.0", path = "../model" }
-garage_rpc = { version = "0.4.0", path = "../rpc" }
-garage_table = { version = "0.4.0", path = "../table" }
-garage_util = { version = "0.4.0", path = "../util" }
-garage_web = { version = "0.4.0", path = "../web" }
+garage_api = { version = "0.5.0", path = "../api" }
+garage_model = { version = "0.5.0", path = "../model" }
+garage_rpc = { version = "0.5.0", path = "../rpc" }
+garage_table = { version = "0.5.0", path = "../table" }
+garage_util = { version = "0.5.0", path = "../util" }
+garage_web = { version = "0.5.0", path = "../web" }
bytes = "1.0"
git-version = "0.3.4"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index c3a83d02..f0444988 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -339,7 +339,7 @@ impl AdminRpcHandler {
let mut failures = vec![];
let ring = self.garage.system.ring.borrow().clone();
- for node in ring.config.members.keys() {
+ for node in ring.layout.node_ids().iter() {
let node = (*node).into();
let resp = self
.endpoint
@@ -383,7 +383,7 @@ impl AdminRpcHandler {
let mut ret = String::new();
let ring = self.garage.system.ring.borrow().clone();
- for node in ring.config.members.keys() {
+ for node in ring.layout.node_ids().iter() {
let mut opt = opt.clone();
opt.all_nodes = false;
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 2ff46088..a916974e 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -2,7 +2,7 @@ use std::collections::HashSet;
use garage_util::error::*;
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_rpc::system::*;
use garage_rpc::*;
@@ -20,11 +20,8 @@ pub async fn cli_command_dispatch(
Command::Node(NodeOperation::Connect(connect_opt)) => {
cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await
}
- Command::Node(NodeOperation::Configure(configure_opt)) => {
- cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await
- }
- Command::Node(NodeOperation::Remove(remove_opt)) => {
- cmd_remove(system_rpc_endpoint, rpc_host, remove_opt).await
+ Command::Layout(layout_opt) => {
+ cli_layout_command_dispatch(layout_opt, system_rpc_endpoint, rpc_host).await
}
Command::Bucket(bo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await
@@ -48,56 +45,60 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
- let config = match rpc_cli
- .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
- .await??
- {
- SystemRpc::AdvertiseConfig(cfg) => cfg,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== HEALTHY NODES ====");
- let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity".to_string()];
+ let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity".to_string()];
for adv in status.iter().filter(|adv| adv.is_up) {
- if let Some(cfg) = config.members.get(&adv.id) {
- healthy_nodes.push(format!(
- "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}",
- id = adv.id,
- host = adv.status.hostname,
- addr = adv.addr,
- tag = cfg.tag,
- zone = cfg.zone,
- capacity = cfg.capacity_string(),
- ));
- } else {
- healthy_nodes.push(format!(
- "{id:?}\t{h}\t{addr}\tNO ROLE ASSIGNED",
- id = adv.id,
- h = adv.status.hostname,
- addr = adv.addr,
- ));
+ match layout.roles.get(&adv.id) {
+ Some(NodeRoleV(Some(cfg))) => {
+ healthy_nodes.push(format!(
+ "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}",
+ id = adv.id,
+ host = adv.status.hostname,
+ addr = adv.addr,
+ tags = cfg.tags.join(","),
+ zone = cfg.zone,
+ capacity = cfg.capacity_string(),
+ ));
+ }
+ _ => {
+ let new_role = match layout.staging.get(&adv.id) {
+ Some(NodeRoleV(Some(_))) => "(pending)",
+ _ => "NO ROLE ASSIGNED",
+ };
+ healthy_nodes.push(format!(
+ "{id:?}\t{h}\t{addr}\t{new_role}",
+ id = adv.id,
+ h = adv.status.hostname,
+ addr = adv.addr,
+ new_role = new_role,
+ ));
+ }
}
}
format_table(healthy_nodes);
let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>();
let failure_case_1 = status.iter().any(|adv| !adv.is_up);
- let failure_case_2 = config
- .members
+ let failure_case_2 = layout
+ .roles
+ .items()
.iter()
- .any(|(id, _)| !status_keys.contains(id));
+ .filter(|(_, _, v)| v.0.is_some())
+ .any(|(id, _, _)| !status_keys.contains(id));
if failure_case_1 || failure_case_2 {
println!("\n==== FAILED NODES ====");
let mut failed_nodes =
- vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity\tLast seen".to_string()];
+ vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
for adv in status.iter().filter(|adv| !adv.is_up) {
- if let Some(cfg) = config.members.get(&adv.id) {
+ if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) {
failed_nodes.push(format!(
- "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}\t{last_seen}",
+ "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
id = adv.id,
host = adv.status.hostname,
addr = adv.addr,
- tag = cfg.tag,
+ tags = cfg.tags.join(","),
zone = cfg.zone,
capacity = cfg.capacity_string(),
last_seen = adv
@@ -107,20 +108,28 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
));
}
}
- for (id, cfg) in config.members.iter() {
- if !status_keys.contains(id) {
- failed_nodes.push(format!(
- "{id:?}\t??\t??\t[{tag}]\t{zone}\t{capacity}\tnever seen",
- id = id,
- tag = cfg.tag,
- zone = cfg.zone,
- capacity = cfg.capacity_string(),
- ));
+ for (id, _, role_v) in layout.roles.items().iter() {
+ if let NodeRoleV(Some(cfg)) = role_v {
+ if !status_keys.contains(id) {
+ failed_nodes.push(format!(
+ "{id:?}\t??\t??\t[{tags}]\t{zone}\t{capacity}\tnever seen",
+ id = id,
+ tags = cfg.tags.join(","),
+ zone = cfg.zone,
+ capacity = cfg.capacity_string(),
+ ));
+ }
}
}
format_table(failed_nodes);
}
+ if print_staging_role_changes(&layout) {
+ println!();
+ println!("Please use `garage layout show` to check the proposed new layout and apply it.");
+ println!();
+ }
+
Ok(())
}
@@ -141,115 +150,6 @@ pub async fn cmd_connect(
}
}
-pub async fn cmd_configure(
- rpc_cli: &Endpoint<SystemRpc, ()>,
- rpc_host: NodeID,
- args: ConfigureNodeOpt,
-) -> Result<(), Error> {
- let status = match rpc_cli
- .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
- .await??
- {
- SystemRpc::ReturnKnownNodes(nodes) => nodes,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
-
- let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?;
-
- let mut config = match rpc_cli
- .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
- .await??
- {
- SystemRpc::AdvertiseConfig(cfg) => cfg,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
-
- for replaced in args.replace.iter() {
- let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?;
- if config.members.remove(&replaced_node).is_none() {
- return Err(Error::Message(format!(
- "Cannot replace node {:?} as it is not in current configuration",
- replaced_node
- )));
- }
- }
-
- if args.capacity.is_some() && args.gateway {
- return Err(Error::Message(
- "-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
- }
- if args.capacity == Some(0) {
- return Err(Error::Message("Invalid capacity value: 0".into()));
- }
-
- let new_entry = match config.members.get(&added_node) {
- None => {
- let capacity = match args.capacity {
- Some(c) => Some(c),
- None if args.gateway => None,
- _ => return Err(Error::Message(
- "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
- };
- NetworkConfigEntry {
- zone: args.zone.ok_or("Please specifiy a zone with the -z flag")?,
- capacity,
- tag: args.tag.unwrap_or_default(),
- }
- }
- Some(old) => {
- let capacity = match args.capacity {
- Some(c) => Some(c),
- None if args.gateway => None,
- _ => old.capacity,
- };
- NetworkConfigEntry {
- zone: args.zone.unwrap_or_else(|| old.zone.to_string()),
- capacity,
- tag: args.tag.unwrap_or_else(|| old.tag.to_string()),
- }
- }
- };
-
- config.members.insert(added_node, new_entry);
- config.version += 1;
-
- rpc_cli
- .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
- .await??;
- Ok(())
-}
-
-pub async fn cmd_remove(
- rpc_cli: &Endpoint<SystemRpc, ()>,
- rpc_host: NodeID,
- args: RemoveNodeOpt,
-) -> Result<(), Error> {
- let mut config = match rpc_cli
- .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
- .await??
- {
- SystemRpc::AdvertiseConfig(cfg) => cfg,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
-
- let deleted_node = find_matching_node(config.members.keys().cloned(), &args.node_id)?;
-
- if !args.yes {
- return Err(Error::Message(format!(
- "Add the flag --yes to really remove {:?} from the cluster",
- deleted_node
- )));
- }
-
- config.members.remove(&deleted_node);
- config.version += 1;
-
- rpc_cli
- .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
- .await??;
- Ok(())
-}
-
pub async fn cmd_admin(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
@@ -283,5 +183,3 @@ pub async fn cmd_admin(
}
Ok(())
}
-
-// --- Utility functions ----
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
new file mode 100644
index 00000000..0d9e4fa4
--- /dev/null
+++ b/src/garage/cli/layout.rs
@@ -0,0 +1,340 @@
+use garage_util::crdt::Crdt;
+use garage_util::data::*;
+use garage_util::error::*;
+
+use garage_rpc::layout::*;
+use garage_rpc::system::*;
+use garage_rpc::*;
+
+use crate::cli::*;
+
+pub async fn cli_layout_command_dispatch(
+ cmd: LayoutOperation,
+ system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+) -> Result<(), Error> {
+ match cmd {
+ LayoutOperation::Assign(configure_opt) => {
+ cmd_assign_role(system_rpc_endpoint, rpc_host, configure_opt).await
+ }
+ LayoutOperation::Remove(remove_opt) => {
+ cmd_remove_role(system_rpc_endpoint, rpc_host, remove_opt).await
+ }
+ LayoutOperation::Show => cmd_show_layout(system_rpc_endpoint, rpc_host).await,
+ LayoutOperation::Apply(apply_opt) => {
+ cmd_apply_layout(system_rpc_endpoint, rpc_host, apply_opt).await
+ }
+ LayoutOperation::Revert(revert_opt) => {
+ cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await
+ }
+ }
+}
+
+pub async fn cmd_assign_role(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ args: AssignRoleOpt,
+) -> Result<(), Error> {
+ let status = match rpc_cli
+ .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .await??
+ {
+ SystemRpc::ReturnKnownNodes(nodes) => nodes,
+ resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+ };
+
+ let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?;
+
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ let mut roles = layout.roles.clone();
+ roles.merge(&layout.staging);
+
+ for replaced in args.replace.iter() {
+ let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?;
+ match roles.get(&replaced_node) {
+ Some(NodeRoleV(Some(_))) => {
+ layout
+ .staging
+ .merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
+ }
+ _ => {
+ return Err(Error::Message(format!(
+ "Cannot replace node {:?} as it is not currently in planned layout",
+ replaced_node
+ )));
+ }
+ }
+ }
+
+ if args.capacity.is_some() && args.gateway {
+ return Err(Error::Message(
+ "-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
+ }
+ if args.capacity == Some(0) {
+ return Err(Error::Message("Invalid capacity value: 0".into()));
+ }
+
+ let new_entry = match roles.get(&added_node) {
+ Some(NodeRoleV(Some(old))) => {
+ let capacity = match args.capacity {
+ Some(c) => Some(c),
+ None if args.gateway => None,
+ None => old.capacity,
+ };
+ let tags = if args.tags.is_empty() {
+ old.tags.clone()
+ } else {
+ args.tags
+ };
+ NodeRole {
+ zone: args.zone.unwrap_or_else(|| old.zone.to_string()),
+ capacity,
+ tags,
+ }
+ }
+ _ => {
+ let capacity = match args.capacity {
+ Some(c) => Some(c),
+ None if args.gateway => None,
+ None => return Err(Error::Message(
+ "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
+ };
+ NodeRole {
+ zone: args.zone.ok_or("Please specifiy a zone with the -z flag")?,
+ capacity,
+ tags: args.tags,
+ }
+ }
+ };
+
+ layout
+ .staging
+ .merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
+
+ send_layout(rpc_cli, rpc_host, layout).await?;
+
+ println!("Role change is staged but not yet commited.");
+ println!("Use `garage layout show` to view staged role changes,");
+ println!("and `garage layout apply` to enact staged changes.");
+ Ok(())
+}
+
+pub async fn cmd_remove_role(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ args: RemoveRoleOpt,
+) -> Result<(), Error> {
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ let mut roles = layout.roles.clone();
+ roles.merge(&layout.staging);
+
+ let deleted_node =
+ find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?;
+
+ layout
+ .staging
+ .merge(&roles.update_mutator(deleted_node, NodeRoleV(None)));
+
+ send_layout(rpc_cli, rpc_host, layout).await?;
+
+ println!("Role removal is staged but not yet commited.");
+ println!("Use `garage layout show` to view staged role changes,");
+ println!("and `garage layout apply` to enact staged changes.");
+ Ok(())
+}
+
+pub async fn cmd_show_layout(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+) -> Result<(), Error> {
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ println!("==== CURRENT CLUSTER LAYOUT ====");
+ if !print_cluster_layout(&layout) {
+ println!("No nodes currently have a role in the cluster.");
+ println!("See `garage status` to view available nodes.");
+ }
+ println!();
+ println!("Current cluster layout version: {}", layout.version);
+
+ if print_staging_role_changes(&layout) {
+ layout.roles.merge(&layout.staging);
+
+ println!();
+ println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
+ if !print_cluster_layout(&layout) {
+ println!("No nodes have a role in the new layout.");
+ }
+ println!();
+
+ // this will print the stats of what partitions
+ // will move around when we apply
+ if layout.calculate_partition_assignation() {
+ println!("To enact the staged role changes, type:");
+ println!();
+ println!(" garage layout apply --version {}", layout.version + 1);
+ println!();
+ println!(
+ "You can also revert all proposed changes with: garage layout revert --version {}",
+ layout.version + 1
+ );
+ } else {
+ println!("Not enough nodes have an assigned role to maintain enough copies of data.");
+ println!("This new layout cannot yet be applied.");
+ }
+ }
+
+ Ok(())
+}
+
+pub async fn cmd_apply_layout(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ apply_opt: ApplyLayoutOpt,
+) -> Result<(), Error> {
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ match apply_opt.version {
+ None => {
+ println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
+ println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
+ return Err(Error::Message("--version flag is missing".into()));
+ }
+ Some(v) => {
+ if v != layout.version + 1 {
+ return Err(Error::Message("Invalid value of --version flag".into()));
+ }
+ }
+ }
+
+ layout.roles.merge(&layout.staging);
+
+ if !layout.calculate_partition_assignation() {
+ return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
+ }
+
+ layout.staging.clear();
+ layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
+
+ layout.version += 1;
+
+ send_layout(rpc_cli, rpc_host, layout).await?;
+
+ println!("New cluster layout with updated role assignation has been applied in cluster.");
+ println!("Data will now be moved around between nodes accordingly.");
+
+ Ok(())
+}
+
+pub async fn cmd_revert_layout(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ revert_opt: RevertLayoutOpt,
+) -> Result<(), Error> {
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ match revert_opt.version {
+ None => {
+ println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
+ println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
+ return Err(Error::Message("--version flag is missing".into()));
+ }
+ Some(v) => {
+ if v != layout.version + 1 {
+ return Err(Error::Message("Invalid value of --version flag".into()));
+ }
+ }
+ }
+
+ layout.staging.clear();
+ layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
+
+ layout.version += 1;
+
+ send_layout(rpc_cli, rpc_host, layout).await?;
+
+ println!("All proposed role changes in cluster layout have been canceled.");
+ Ok(())
+}
+
+// --- utility ---
+
+pub async fn fetch_layout(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+) -> Result<ClusterLayout, Error> {
+ match rpc_cli
+ .call(&rpc_host, &SystemRpc::PullClusterLayout, PRIO_NORMAL)
+ .await??
+ {
+ SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
+ resp => Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+ }
+}
+
+pub async fn send_layout(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ layout: ClusterLayout,
+) -> Result<(), Error> {
+ rpc_cli
+ .call(
+ &rpc_host,
+ &SystemRpc::AdvertiseClusterLayout(layout),
+ PRIO_NORMAL,
+ )
+ .await??;
+ Ok(())
+}
+
+pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
+ let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
+ for (id, _, role) in layout.roles.items().iter() {
+ let role = match &role.0 {
+ Some(r) => r,
+ _ => continue,
+ };
+ let tags = role.tags.join(",");
+ table.push(format!(
+ "{:?}\t{}\t{}\t{}",
+ id,
+ tags,
+ role.zone,
+ role.capacity_string()
+ ));
+ }
+ if table.len() == 1 {
+ false
+ } else {
+ format_table(table);
+ true
+ }
+}
+
+pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
+ if !layout.staging.items().is_empty() {
+ println!();
+ println!("==== STAGED ROLE CHANGES ====");
+ let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
+ for (id, _, role) in layout.staging.items().iter() {
+ if let Some(role) = &role.0 {
+ let tags = role.tags.join(",");
+ table.push(format!(
+ "{:?}\t{}\t{}\t{}",
+ id,
+ tags,
+ role.zone,
+ role.capacity_string()
+ ));
+ } else {
+ table.push(format!("{:?}\tREMOVED", id));
+ }
+ }
+ format_table(table);
+ true
+ } else {
+ false
+ }
+}
diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs
index 1567f377..17a2d8ce 100644
--- a/src/garage/cli/mod.rs
+++ b/src/garage/cli/mod.rs
@@ -1,9 +1,11 @@
pub(crate) mod cmd;
pub(crate) mod init;
+pub(crate) mod layout;
pub(crate) mod structs;
pub(crate) mod util;
pub(crate) use cmd::*;
pub(crate) use init::*;
+pub(crate) use layout::*;
pub(crate) use structs::*;
pub(crate) use util::*;
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index b930d8a8..b2b5375d 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -8,23 +8,23 @@ pub enum Command {
#[structopt(name = "server")]
Server,
- /// Print identifier (public key) of this Garage node
- #[structopt(name = "node-id")]
- NodeId(NodeIdOpt),
-
/// Get network status
#[structopt(name = "status")]
Status,
- /// Garage node operations
+ /// Operations on individual Garage nodes
#[structopt(name = "node")]
Node(NodeOperation),
- /// Bucket operations
+ /// Operations on the assignation of node roles in the cluster layout
+ #[structopt(name = "layout")]
+ Layout(LayoutOperation),
+
+ /// Operations on buckets
#[structopt(name = "bucket")]
Bucket(BucketOperation),
- /// Key operations
+ /// Operations on S3 access keys
#[structopt(name = "key")]
Key(KeyOperation),
@@ -39,17 +39,13 @@ pub enum Command {
#[derive(StructOpt, Debug)]
pub enum NodeOperation {
+ /// Print identifier (public key) of this Garage node
+ #[structopt(name = "id")]
+ NodeId(NodeIdOpt),
+
/// Connect to Garage node that is currently isolated from the system
#[structopt(name = "connect")]
Connect(ConnectNodeOpt),
-
- /// Configure Garage node
- #[structopt(name = "configure")]
- Configure(ConfigureNodeOpt),
-
- /// Remove Garage node from cluster
- #[structopt(name = "remove")]
- Remove(RemoveNodeOpt),
}
#[derive(StructOpt, Debug)]
@@ -67,8 +63,31 @@ pub struct ConnectNodeOpt {
}
#[derive(StructOpt, Debug)]
-pub struct ConfigureNodeOpt {
- /// Node to configure (prefix of hexadecimal node id)
+pub enum LayoutOperation {
+ /// Assign role to Garage node
+ #[structopt(name = "assign")]
+ Assign(AssignRoleOpt),
+
+ /// Remove role from Garage cluster node
+ #[structopt(name = "remove")]
+ Remove(RemoveRoleOpt),
+
+ /// Show roles currently assigned to nodes and changes staged for commit
+ #[structopt(name = "show")]
+ Show,
+
+ /// Apply staged changes to cluster layout
+ #[structopt(name = "apply")]
+ Apply(ApplyLayoutOpt),
+
+ /// Revert staged changes to cluster layout
+ #[structopt(name = "revert")]
+ Revert(RevertLayoutOpt),
+}
+
+#[derive(StructOpt, Debug)]
+pub struct AssignRoleOpt {
+ /// Node to which to assign role (prefix of hexadecimal node id)
pub(crate) node_id: String,
/// Location (zone or datacenter) of the node
@@ -83,9 +102,9 @@ pub struct ConfigureNodeOpt {
#[structopt(short = "g", long = "gateway")]
pub(crate) gateway: bool,
- /// Optional node tag
+ /// Optional tags to add to node
#[structopt(short = "t", long = "tag")]
- pub(crate) tag: Option<String>,
+ pub(crate) tags: Vec<String>,
/// Replaced node(s): list of node IDs that will be removed from the current cluster
#[structopt(long = "replace")]
@@ -93,13 +112,24 @@ pub struct ConfigureNodeOpt {
}
#[derive(StructOpt, Debug)]
-pub struct RemoveNodeOpt {
- /// Node to configure (prefix of hexadecimal node id)
+pub struct RemoveRoleOpt {
+ /// Node whose role to remove (prefix of hexadecimal node id)
pub(crate) node_id: String,
+}
- /// If this flag is not given, the node won't be removed
- #[structopt(long = "yes")]
- pub(crate) yes: bool,
+#[derive(StructOpt, Debug)]
+pub struct ApplyLayoutOpt {
+ /// Version number of new configuration: this command will fail if
+ /// it is not exactly 1 + the previous configuration's version
+ #[structopt(long = "version")]
+ pub(crate) version: Option<u64>,
+}
+
+#[derive(StructOpt, Debug)]
+pub struct RevertLayoutOpt {
+ /// Version number of old configuration to which to revert
+ #[structopt(long = "version")]
+ pub(crate) version: Option<u64>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 70c959f8..69cd16e7 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -70,7 +70,9 @@ async fn main() {
server::run_server(opt.config_file).await
}
- Command::NodeId(node_id_opt) => node_id_command(opt.config_file, node_id_opt.quiet),
+ Command::Node(NodeOperation::NodeId(node_id_opt)) => {
+ node_id_command(opt.config_file, node_id_opt.quiet)
+ }
_ => cli_command(opt).await,
};
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index f4085c13..7979a79a 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -1,11 +1,12 @@
[package]
name = "garage_model"
-version = "0.4.0"
+version = "0.5.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
description = "Core data model for the Garage object store"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
[lib]
path = "lib.rs"
@@ -13,9 +14,9 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_rpc = { version = "0.4.0", path = "../rpc" }
-garage_table = { version = "0.4.0", path = "../table" }
-garage_util = { version = "0.4.0", path = "../util" }
+garage_rpc = { version = "0.5.0", path = "../rpc" }
+garage_table = { version = "0.5.0", path = "../table" }
+garage_util = { version = "0.5.0", path = "../util" }
async-trait = "0.1.7"
arc-swap = "1.0"
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index ac7c2a2e..d8ebb71e 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -1,11 +1,12 @@
[package]
name = "garage_rpc"
-version = "0.4.0"
+version = "0.5.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
description = "Cluster membership management and RPC protocol for the Garage object store"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
[lib]
path = "lib.rs"
@@ -13,7 +14,7 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_util = { version = "0.4.0", path = "../util" }
+garage_util = { version = "0.5.0", path = "../util" }
arc-swap = "1.0"
bytes = "1.0"
@@ -26,6 +27,7 @@ sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
async-trait = "0.1.7"
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
+serde_bytes = "0.11"
serde_json = "1.0"
futures = "0.3"
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
new file mode 100644
index 00000000..895dbf1c
--- /dev/null
+++ b/src/rpc/layout.rs
@@ -0,0 +1,579 @@
+use std::cmp::Ordering;
+use std::collections::{HashMap, HashSet};
+
+use serde::{Deserialize, Serialize};
+
+use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
+use garage_util::data::*;
+
+use crate::ring::*;
+
+/// The layout of the cluster, i.e. the list of roles
+/// which are assigned to each cluster node
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct ClusterLayout {
+ pub version: u64,
+
+ pub replication_factor: usize,
+ pub roles: LwwMap<Uuid, NodeRoleV>,
+
+ /// node_id_vec: a vector of node IDs with a role assigned
+ /// in the system (this includes gateway nodes).
+ /// The order here is different than the vec stored by `roles`, because:
+ /// 1. non-gateway nodes are first so that they have lower numbers
+ /// 2. nodes that don't have a role are excluded (but they need to
+ /// stay in the CRDT as tombstones)
+ pub node_id_vec: Vec<Uuid>,
+ /// the assignation of data partitions to node, the values
+ /// are indices in node_id_vec
+ #[serde(with = "serde_bytes")]
+ pub ring_assignation_data: Vec<CompactNodeType>,
+
+ /// Role changes which are staged for the next version of the layout
+ pub staging: LwwMap<Uuid, NodeRoleV>,
+ pub staging_hash: Hash,
+}
+
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+pub struct NodeRoleV(pub Option<NodeRole>);
+
+impl AutoCrdt for NodeRoleV {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+/// The user-assigned roles of cluster nodes
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+pub struct NodeRole {
+ /// Datacenter at which this entry belong. This information might be used to perform a better
+ /// geodistribution
+ pub zone: String,
+ /// The (relative) capacity of the node
+ /// If this is set to None, the node does not participate in storing data for the system
+ /// and is only active as an API gateway to other nodes
+ pub capacity: Option<u32>,
+ /// A set of tags to recognize the node
+ pub tags: Vec<String>,
+}
+
+impl NodeRole {
+ pub fn capacity_string(&self) -> String {
+ match self.capacity {
+ Some(c) => format!("{}", c),
+ None => "gateway".to_string(),
+ }
+ }
+}
+
+impl ClusterLayout {
+ pub fn new(replication_factor: usize) -> Self {
+ let empty_lwwmap = LwwMap::new();
+ let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]);
+
+ ClusterLayout {
+ version: 0,
+ replication_factor,
+ roles: LwwMap::new(),
+ node_id_vec: Vec::new(),
+ ring_assignation_data: Vec::new(),
+ staging: empty_lwwmap,
+ staging_hash: empty_lwwmap_hash,
+ }
+ }
+
+ pub fn merge(&mut self, other: &ClusterLayout) -> bool {
+ match other.version.cmp(&self.version) {
+ Ordering::Greater => {
+ *self = other.clone();
+ true
+ }
+ Ordering::Equal => {
+ self.staging.merge(&other.staging);
+
+ let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ let changed = new_staging_hash != self.staging_hash;
+
+ self.staging_hash = new_staging_hash;
+
+ changed
+ }
+ Ordering::Less => false,
+ }
+ }
+
+ /// Returns a list of IDs of nodes that currently have
+ /// a role in the cluster
+ pub fn node_ids(&self) -> &[Uuid] {
+ &self.node_id_vec[..]
+ }
+
+ pub fn num_nodes(&self) -> usize {
+ self.node_id_vec.len()
+ }
+
+ /// Returns the role of a node in the layout
+ pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> {
+ match self.roles.get(node) {
+ Some(NodeRoleV(Some(v))) => Some(v),
+ _ => None,
+ }
+ }
+
+ /// Check a cluster layout for internal consistency
+ /// returns true if consistent, false if error
+ pub fn check(&self) -> bool {
+ // Check that the hash of the staging data is correct
+ let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ if staging_hash != self.staging_hash {
+ return false;
+ }
+
+ // Check that node_id_vec contains the correct list of nodes
+ let mut expected_nodes = self
+ .roles
+ .items()
+ .iter()
+ .filter(|(_, _, v)| v.0.is_some())
+ .map(|(id, _, _)| *id)
+ .collect::<Vec<_>>();
+ expected_nodes.sort();
+ let mut node_id_vec = self.node_id_vec.clone();
+ node_id_vec.sort();
+ if expected_nodes != node_id_vec {
+ return false;
+ }
+
+ // Check that the assignation data has the correct length
+ if self.ring_assignation_data.len() != (1 << PARTITION_BITS) * self.replication_factor {
+ return false;
+ }
+
+ // Check that the assigned nodes are correct identifiers
+ // of nodes that are assigned a role
+ // and that role is not the role of a gateway nodes
+ for x in self.ring_assignation_data.iter() {
+ if *x as usize >= self.node_id_vec.len() {
+ return false;
+ }
+ let node = self.node_id_vec[*x as usize];
+ match self.roles.get(&node) {
+ Some(NodeRoleV(Some(x))) if x.capacity.is_some() => (),
+ _ => return false,
+ }
+ }
+
+ true
+ }
+
+ /// Calculate an assignation of partitions to nodes
+ pub fn calculate_partition_assignation(&mut self) -> bool {
+ let (configured_nodes, zones) = self.configured_nodes_and_zones();
+ let n_zones = zones.len();
+
+ println!("Calculating updated partition assignation, this may take some time...");
+ println!();
+
+ let old_partitions = self.parse_assignation_data();
+
+ let mut partitions = old_partitions.clone();
+ for part in partitions.iter_mut() {
+ part.nodes
+ .retain(|(_, info)| info.map(|x| x.capacity.is_some()).unwrap_or(false));
+ }
+
+ // When nodes are removed, or when bootstraping an assignation from
+ // scratch for a new cluster, the old partitions will have holes (or be empty).
+ // Here we add more nodes to make a complete (sub-optimal) assignation,
+ // using an initial partition assignation that is calculated using the multi-dc maglev trick
+ match self.initial_partition_assignation() {
+ Some(initial_partitions) => {
+ for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) {
+ for (id, info) in ipart.nodes.iter() {
+ if part.nodes.len() < self.replication_factor {
+ part.add(part.nodes.len() + 1, n_zones, id, info.unwrap());
+ }
+ }
+ assert!(part.nodes.len() == self.replication_factor);
+ }
+ }
+ None => {
+ return false;
+ }
+ }
+
+ // Calculate how many partitions each node should ideally store,
+ // and how many partitions they are storing with the current assignation
+ // This defines our target for which we will optimize in the following loop.
+ let total_capacity = configured_nodes
+ .iter()
+ .map(|(_, info)| info.capacity.unwrap_or(0))
+ .sum::<u32>() as usize;
+ let total_partitions = self.replication_factor * (1 << PARTITION_BITS);
+ let target_partitions_per_node = configured_nodes
+ .iter()
+ .map(|(id, info)| {
+ (
+ *id,
+ info.capacity.unwrap_or(0) as usize * total_partitions / total_capacity,
+ )
+ })
+ .collect::<HashMap<&Uuid, usize>>();
+
+ let mut partitions_per_node = self.partitions_per_node(&partitions[..]);
+
+ println!("Target number of partitions per node:");
+ for (node, npart) in target_partitions_per_node.iter() {
+ println!("{:?}\t{}", node, npart);
+ }
+ println!();
+
+ // Shuffle partitions between nodes so that nodes will reach (or better approach)
+ // their target number of stored partitions
+ loop {
+ let mut option = None;
+ for (i, part) in partitions.iter_mut().enumerate() {
+ for (irm, (idrm, _)) in part.nodes.iter().enumerate() {
+ let suprm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as i32
+ - target_partitions_per_node.get(*idrm).cloned().unwrap_or(0) as i32;
+
+ for (idadd, infoadd) in configured_nodes.iter() {
+ // skip replacing a node by itself
+ // and skip replacing by gateway nodes
+ if idadd == idrm || infoadd.capacity.is_none() {
+ continue;
+ }
+
+ let supadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as i32
+ - target_partitions_per_node.get(*idadd).cloned().unwrap_or(0) as i32;
+
+ // We want to try replacing node idrm by node idadd
+ // if that brings us close to our goal.
+ let square = |i: i32| i * i;
+ let oldcost = square(suprm) + square(supadd);
+ let newcost = square(suprm - 1) + square(supadd + 1);
+ if newcost >= oldcost {
+ // not closer to our goal
+ continue;
+ }
+ let gain = oldcost - newcost;
+
+ let mut newpart = part.clone();
+
+ newpart.nodes.remove(irm);
+ if !newpart.add(newpart.nodes.len() + 1, n_zones, idadd, infoadd) {
+ continue;
+ }
+ assert!(newpart.nodes.len() == self.replication_factor);
+
+ if !old_partitions[i]
+ .is_valid_transition_to(&newpart, self.replication_factor)
+ {
+ continue;
+ }
+
+ if option
+ .as_ref()
+ .map(|(old_gain, _, _, _, _)| gain > *old_gain)
+ .unwrap_or(true)
+ {
+ option = Some((gain, i, idadd, idrm, newpart));
+ }
+ }
+ }
+ }
+ if let Some((_gain, i, idadd, idrm, newpart)) = option {
+ *partitions_per_node.entry(idadd).or_insert(0) += 1;
+ *partitions_per_node.get_mut(idrm).unwrap() -= 1;
+ partitions[i] = newpart;
+ } else {
+ break;
+ }
+ }
+
+ // Check we completed the assignation correctly
+ // (this is a set of checks for the algorithm's consistency)
+ assert!(partitions.len() == (1 << PARTITION_BITS));
+ assert!(partitions
+ .iter()
+ .all(|p| p.nodes.len() == self.replication_factor));
+
+ let new_partitions_per_node = self.partitions_per_node(&partitions[..]);
+ assert!(new_partitions_per_node == partitions_per_node);
+
+ // Show statistics
+ println!("New number of partitions per node:");
+ for (node, npart) in partitions_per_node.iter() {
+ println!("{:?}\t{}", node, npart);
+ }
+ println!();
+
+ let mut diffcount = HashMap::new();
+ for (oldpart, newpart) in old_partitions.iter().zip(partitions.iter()) {
+ let nminus = oldpart.txtplus(newpart);
+ let nplus = newpart.txtplus(oldpart);
+ if nminus != "[...]" || nplus != "[...]" {
+ let tup = (nminus, nplus);
+ *diffcount.entry(tup).or_insert(0) += 1;
+ }
+ }
+ if diffcount.is_empty() {
+ println!("No data will be moved between nodes.");
+ } else {
+ let mut diffcount = diffcount.into_iter().collect::<Vec<_>>();
+ diffcount.sort();
+ println!("Number of partitions that move:");
+ for ((nminus, nplus), npart) in diffcount {
+ println!("\t{}\t{} -> {}", npart, nminus, nplus);
+ }
+ }
+ println!();
+
+ // Calculate and save new assignation data
+ let (nodes, assignation_data) =
+ self.compute_assignation_data(&configured_nodes[..], &partitions[..]);
+
+ self.node_id_vec = nodes;
+ self.ring_assignation_data = assignation_data;
+
+ true
+ }
+
+ fn initial_partition_assignation(&self) -> Option<Vec<PartitionAss<'_>>> {
+ let (configured_nodes, zones) = self.configured_nodes_and_zones();
+ let n_zones = zones.len();
+
+ // Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
+ let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
+
+ // Prepare ring
+ let mut partitions: Vec<PartitionAss> = partitions_idx
+ .iter()
+ .map(|_i| PartitionAss::new())
+ .collect::<Vec<_>>();
+
+ // Create MagLev priority queues for each node
+ let mut queues = configured_nodes
+ .iter()
+ .filter(|(_id, info)| info.capacity.is_some())
+ .map(|(node_id, node_info)| {
+ let mut parts = partitions_idx
+ .iter()
+ .map(|i| {
+ let part_data =
+ [&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat();
+ (*i, fasthash(&part_data[..]))
+ })
+ .collect::<Vec<_>>();
+ parts.sort_by_key(|(_i, h)| *h);
+ let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>();
+ (node_id, node_info, parts_i, 0)
+ })
+ .collect::<Vec<_>>();
+
+ let max_capacity = configured_nodes
+ .iter()
+ .filter_map(|(_, node_info)| node_info.capacity)
+ .fold(0, std::cmp::max);
+
+ // Fill up ring
+ for rep in 0..self.replication_factor {
+ queues.sort_by_key(|(ni, _np, _q, _p)| {
+ let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat();
+ fasthash(&queue_data[..])
+ });
+
+ for (_, _, _, pos) in queues.iter_mut() {
+ *pos = 0;
+ }
+
+ let mut remaining = partitions_idx.len();
+ while remaining > 0 {
+ let remaining0 = remaining;
+ for i_round in 0..max_capacity {
+ for (node_id, node_info, q, pos) in queues.iter_mut() {
+ if i_round >= node_info.capacity.unwrap() {
+ continue;
+ }
+ for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
+ if partitions[qv].add(rep + 1, n_zones, node_id, node_info) {
+ remaining -= 1;
+ *pos = pos2 + 1;
+ break;
+ }
+ }
+ }
+ }
+ if remaining == remaining0 {
+ // No progress made, exit
+ return None;
+ }
+ }
+ }
+
+ Some(partitions)
+ }
+
+ fn configured_nodes_and_zones(&self) -> (Vec<(&Uuid, &NodeRole)>, HashSet<&str>) {
+ let configured_nodes = self
+ .roles
+ .items()
+ .iter()
+ .filter(|(_id, _, info)| info.0.is_some())
+ .map(|(id, _, info)| (id, info.0.as_ref().unwrap()))
+ .collect::<Vec<(&Uuid, &NodeRole)>>();
+
+ let zones = configured_nodes
+ .iter()
+ .filter(|(_id, info)| info.capacity.is_some())
+ .map(|(_id, info)| info.zone.as_str())
+ .collect::<HashSet<&str>>();
+
+ (configured_nodes, zones)
+ }
+
+ fn compute_assignation_data<'a>(
+ &self,
+ configured_nodes: &[(&'a Uuid, &'a NodeRole)],
+ partitions: &[PartitionAss<'a>],
+ ) -> (Vec<Uuid>, Vec<CompactNodeType>) {
+ assert!(partitions.len() == (1 << PARTITION_BITS));
+
+ // Make a canonical order for nodes
+ let mut nodes = configured_nodes
+ .iter()
+ .filter(|(_id, info)| info.capacity.is_some())
+ .map(|(id, _)| **id)
+ .collect::<Vec<_>>();
+ let nodes_rev = nodes
+ .iter()
+ .enumerate()
+ .map(|(i, id)| (*id, i as CompactNodeType))
+ .collect::<HashMap<Uuid, CompactNodeType>>();
+
+ let mut assignation_data = vec![];
+ for partition in partitions.iter() {
+ assert!(partition.nodes.len() == self.replication_factor);
+ for (id, _) in partition.nodes.iter() {
+ assignation_data.push(*nodes_rev.get(id).unwrap());
+ }
+ }
+
+ nodes.extend(
+ configured_nodes
+ .iter()
+ .filter(|(_id, info)| info.capacity.is_none())
+ .map(|(id, _)| **id),
+ );
+
+ (nodes, assignation_data)
+ }
+
+ fn parse_assignation_data(&self) -> Vec<PartitionAss<'_>> {
+ if self.ring_assignation_data.len() == self.replication_factor * (1 << PARTITION_BITS) {
+ // If the previous assignation data is correct, use that
+ let mut partitions = vec![];
+ for i in 0..(1 << PARTITION_BITS) {
+ let mut part = PartitionAss::new();
+ for node_i in self.ring_assignation_data
+ [i * self.replication_factor..(i + 1) * self.replication_factor]
+ .iter()
+ {
+ let node_id = &self.node_id_vec[*node_i as usize];
+
+ if let Some(NodeRoleV(Some(info))) = self.roles.get(node_id) {
+ part.nodes.push((node_id, Some(info)));
+ } else {
+ part.nodes.push((node_id, None));
+ }
+ }
+ partitions.push(part);
+ }
+ partitions
+ } else {
+ // Otherwise start fresh
+ (0..(1 << PARTITION_BITS))
+ .map(|_| PartitionAss::new())
+ .collect()
+ }
+ }
+
+ fn partitions_per_node<'a>(&self, partitions: &[PartitionAss<'a>]) -> HashMap<&'a Uuid, usize> {
+ let mut partitions_per_node = HashMap::<&Uuid, usize>::new();
+ for p in partitions.iter() {
+ for (id, _) in p.nodes.iter() {
+ *partitions_per_node.entry(*id).or_insert(0) += 1;
+ }
+ }
+ partitions_per_node
+ }
+}
+
+// ---- Internal structs for partition assignation in layout ----
+
+#[derive(Clone)]
+struct PartitionAss<'a> {
+ nodes: Vec<(&'a Uuid, Option<&'a NodeRole>)>,
+}
+
+impl<'a> PartitionAss<'a> {
+ fn new() -> Self {
+ Self { nodes: Vec::new() }
+ }
+
+ fn nplus(&self, other: &PartitionAss<'a>) -> usize {
+ self.nodes
+ .iter()
+ .filter(|x| !other.nodes.contains(x))
+ .count()
+ }
+
+ fn txtplus(&self, other: &PartitionAss<'a>) -> String {
+ let mut nodes = self
+ .nodes
+ .iter()
+ .filter(|x| !other.nodes.contains(x))
+ .map(|x| format!("{:?}", x.0))
+ .collect::<Vec<_>>();
+ nodes.sort();
+ if self.nodes.iter().any(|x| other.nodes.contains(x)) {
+ nodes.push("...".into());
+ }
+ format!("[{}]", nodes.join(" "))
+ }
+
+ fn is_valid_transition_to(&self, other: &PartitionAss<'a>, replication_factor: usize) -> bool {
+ let min_keep_nodes_per_part = (replication_factor + 1) / 2;
+ let n_removed = self.nplus(other);
+
+ if self.nodes.len() <= min_keep_nodes_per_part {
+ n_removed == 0
+ } else {
+ n_removed <= self.nodes.len() - min_keep_nodes_per_part
+ }
+ }
+
+ fn add(
+ &mut self,
+ target_len: usize,
+ n_zones: usize,
+ node: &'a Uuid,
+ role: &'a NodeRole,
+ ) -> bool {
+ if self.nodes.len() != target_len - 1 {
+ return false;
+ }
+
+ let p_zns = self
+ .nodes
+ .iter()
+ .map(|(_id, info)| info.unwrap().zone.as_str())
+ .collect::<HashSet<&str>>();
+ if (p_zns.len() < n_zones && !p_zns.contains(&role.zone.as_str()))
+ || (p_zns.len() == n_zones && !self.nodes.iter().any(|(id, _)| *id == node))
+ {
+ self.nodes.push((node, Some(role)));
+ true
+ } else {
+ false
+ }
+ }
+}
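
For reference on the encoding used by the new ClusterLayout above: with PARTITION_BITS = 8 and a replication factor of 3, ring_assignation_data holds 256 * 3 = 768 entries, and the entries at positions 3*p .. 3*p+3 are indices into node_id_vec naming the nodes responsible for partition p. A minimal sketch of reading one partition back out (hypothetical helper, not part of this diff):

    // Assumes layout.check() has passed, i.e. ring_assignation_data has the
    // expected length and every index is valid for node_id_vec.
    fn partition_nodes(layout: &ClusterLayout, p: usize) -> Vec<Uuid> {
        let rf = layout.replication_factor;
        layout.ring_assignation_data[p * rf..(p + 1) * rf]
            .iter()
            .map(|i| layout.node_id_vec[*i as usize])
            .collect()
    }
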
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index ea3f1139..b72392ab 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -5,6 +5,7 @@ extern crate log;
mod consul;
+pub mod layout;
pub mod ring;
pub mod system;
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
index 3cb0d233..73a126a2 100644
--- a/src/rpc/ring.rs
+++ b/src/rpc/ring.rs
@@ -1,12 +1,11 @@
//! Module containing types related to computing nodes which should receive a copy of data blocks
//! and metadata
-use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
-use serde::{Deserialize, Serialize};
-
use garage_util::data::*;
+use crate::layout::ClusterLayout;
+
/// A partition id, which is stored on 16 bits
/// i.e. we have up to 2**16 partitions.
/// (in practice we have exactly 2**PARTITION_BITS partitions)
@@ -22,47 +21,6 @@ pub const PARTITION_BITS: usize = 8;
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
-/// The user-defined configuration of the cluster's nodes
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct NetworkConfig {
- /// Map of each node's id to it's configuration
- pub members: HashMap<Uuid, NetworkConfigEntry>,
- /// Version of this config
- pub version: u64,
-}
-
-impl NetworkConfig {
- pub(crate) fn new() -> Self {
- Self {
- members: HashMap::new(),
- version: 0,
- }
- }
-}
-
-/// The overall configuration of one (possibly remote) node
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct NetworkConfigEntry {
- /// Datacenter at which this entry belong. This infromation might be used to perform a better
- /// geodistribution
- pub zone: String,
- /// The (relative) capacity of the node
- /// If this is set to None, the node does not participate in storing data for the system
- /// and is only active as an API gateway to other nodes
- pub capacity: Option<u32>,
- /// A tag to recognize the entry, not used for other things than display
- pub tag: String,
-}
-
-impl NetworkConfigEntry {
- pub fn capacity_string(&self) -> String {
- match self.capacity {
- Some(c) => format!("{}", c),
- None => "gateway".to_string(),
- }
- }
-}
-
/// A ring distributing fairly objects to nodes
#[derive(Clone)]
pub struct Ring {
@@ -70,7 +28,7 @@ pub struct Ring {
pub replication_factor: usize,
/// The network configuration used to generate this ring
- pub config: NetworkConfig,
+ pub layout: ClusterLayout,
// Internal order of nodes used to make a more compact representation of the ring
nodes: Vec<Uuid>,
@@ -81,7 +39,7 @@ pub struct Ring {
// Type to store compactly the id of a node in the system
// Change this to u16 the day we want to have more than 256 nodes in a cluster
-type CompactNodeType = u8;
+pub type CompactNodeType = u8;
// The maximum number of times an object might get replicated
// This must be at least 3 because Garage supports 3-way replication
@@ -102,132 +60,26 @@ struct RingEntry {
}
impl Ring {
- // TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6
- // levels of imbrication. It is basically impossible to test, maintain, or understand for an
- // outsider.
- pub(crate) fn new(config: NetworkConfig, replication_factor: usize) -> Self {
- // Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
- let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
-
- let zones = config
- .members
- .iter()
- .filter(|(_id, info)| info.capacity.is_some())
- .map(|(_id, info)| info.zone.as_str())
- .collect::<HashSet<&str>>();
- let n_zones = zones.len();
-
- // Prepare ring
- let mut partitions: Vec<Vec<(&Uuid, &NetworkConfigEntry)>> = partitions_idx
- .iter()
- .map(|_i| Vec::new())
- .collect::<Vec<_>>();
-
- // Create MagLev priority queues for each node
- let mut queues = config
- .members
- .iter()
- .filter(|(_id, info)| info.capacity.is_some())
- .map(|(node_id, node_info)| {
- let mut parts = partitions_idx
- .iter()
- .map(|i| {
- let part_data =
- [&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat();
- (*i, fasthash(&part_data[..]))
- })
- .collect::<Vec<_>>();
- parts.sort_by_key(|(_i, h)| *h);
- let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>();
- (node_id, node_info, parts_i, 0)
- })
- .collect::<Vec<_>>();
-
- let max_capacity = config
- .members
- .iter()
- .filter_map(|(_, node_info)| node_info.capacity)
- .fold(0, std::cmp::max);
-
- assert!(replication_factor <= MAX_REPLICATION);
-
- // Fill up ring
- for rep in 0..replication_factor {
- queues.sort_by_key(|(ni, _np, _q, _p)| {
- let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat();
- fasthash(&queue_data[..])
- });
-
- for (_, _, _, pos) in queues.iter_mut() {
- *pos = 0;
- }
-
- let mut remaining = partitions_idx.len();
- while remaining > 0 {
- let remaining0 = remaining;
- for i_round in 0..max_capacity {
- for (node_id, node_info, q, pos) in queues.iter_mut() {
- if i_round >= node_info.capacity.unwrap() {
- continue;
- }
- for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
- if partitions[qv].len() != rep {
- continue;
- }
- let p_zns = partitions[qv]
- .iter()
- .map(|(_id, info)| info.zone.as_str())
- .collect::<HashSet<&str>>();
- if (p_zns.len() < n_zones && !p_zns.contains(&node_info.zone.as_str()))
- || (p_zns.len() == n_zones
- && !partitions[qv].iter().any(|(id, _i)| id == node_id))
- {
- partitions[qv].push((node_id, node_info));
- remaining -= 1;
- *pos = pos2 + 1;
- break;
- }
- }
- }
- }
- if remaining == remaining0 {
- // No progress made, exit
- warn!("Could not build ring, not enough nodes configured.");
- return Self {
- replication_factor,
- config,
- nodes: vec![],
- ring: vec![],
- };
- }
- }
+ pub(crate) fn new(layout: ClusterLayout, replication_factor: usize) -> Self {
+ if replication_factor != layout.replication_factor {
+ warn!("Could not build ring: replication factor does not match between local configuration and network role assignation.");
+ return Self::empty(layout, replication_factor);
}
- // Make a canonical order for nodes
- let nodes = config
- .members
- .iter()
- .filter(|(_id, info)| info.capacity.is_some())
- .map(|(id, _)| *id)
- .collect::<Vec<_>>();
- let nodes_rev = nodes
- .iter()
- .enumerate()
- .map(|(i, id)| (*id, i as CompactNodeType))
- .collect::<HashMap<Uuid, CompactNodeType>>();
+ if layout.ring_assignation_data.len() != replication_factor * (1 << PARTITION_BITS) {
+ warn!("Could not build ring: network role assignation data has invalid length");
+ return Self::empty(layout, replication_factor);
+ }
- let ring = partitions
- .iter()
- .enumerate()
- .map(|(i, nodes)| {
+ let nodes = layout.node_id_vec.clone();
+ let ring = (0..(1 << PARTITION_BITS))
+ .map(|i| {
let top = (i as u16) << (16 - PARTITION_BITS);
- let nodes = nodes
- .iter()
- .map(|(id, _info)| *nodes_rev.get(id).unwrap())
- .collect::<Vec<CompactNodeType>>();
- assert!(nodes.len() == replication_factor);
let mut nodes_buf = [0u8; MAX_REPLICATION];
- nodes_buf[..replication_factor].copy_from_slice(&nodes[..]);
+ nodes_buf[..replication_factor].copy_from_slice(
+ &layout.ring_assignation_data
+ [replication_factor * i..replication_factor * (i + 1)],
+ );
RingEntry {
hash_prefix: top,
nodes_buf,
@@ -237,12 +89,21 @@ impl Ring {
Self {
replication_factor,
- config,
+ layout,
nodes,
ring,
}
}
+ fn empty(layout: ClusterLayout, replication_factor: usize) -> Self {
+ Self {
+ replication_factor,
+ layout,
+ nodes: vec![],
+ ring: vec![],
+ }
+ }
+
/// Get the partition in which data would fall on
pub fn partition_of(&self, position: &Hash) -> Partition {
let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index df0e94f8..68bdfc4f 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -225,7 +225,7 @@ impl RpcHelper {
// Retrieve some status variables that we will use to sort requests
let peer_list = self.0.fullmesh.get_peer_list();
let ring: Arc<Ring> = self.0.ring.borrow().clone();
- let our_zone = match ring.config.members.get(&self.0.our_node_id) {
+ let our_zone = match ring.layout.node_role(&self.0.our_node_id) {
Some(pc) => &pc.zone,
None => "",
};
@@ -238,7 +238,7 @@ impl RpcHelper {
// and within a same zone we priorize nodes with the lowest latency.
let mut requests = requests
.map(|(to, fut)| {
- let peer_zone = match ring.config.members.get(&to) {
+ let peer_zone = match ring.layout.node_role(&to) {
Some(pc) => &pc.zone,
None => "",
};
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 3f5f7fb1..aa8947ea 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -23,12 +23,13 @@ use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
use garage_util::background::BackgroundRunner;
use garage_util::config::Config;
-use garage_util::data::Uuid;
+use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;
use garage_util::time::*;
use crate::consul::*;
+use crate::layout::*;
use crate::ring::*;
use crate::rpc_helper::*;
@@ -48,13 +49,13 @@ pub enum SystemRpc {
Ok,
/// Request to connect to a specific node (in <pubkey>@<host>:<port> format)
Connect(String),
- /// Ask other node its config. Answered with AdvertiseConfig
- PullConfig,
+ /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
+ PullClusterLayout,
/// Advertise Garage status. Answered with another AdvertiseStatus.
/// Exchanged with every node on a regular basis.
AdvertiseStatus(NodeStatus),
- /// Advertisement of nodes config. Sent spontanously or in response to PullConfig
- AdvertiseConfig(NetworkConfig),
+ /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
+ AdvertiseClusterLayout(ClusterLayout),
/// Get known nodes states
GetKnownNodes,
/// Return known nodes
@@ -70,7 +71,7 @@ pub struct System {
/// The id of this node
pub id: Uuid,
- persist_config: Persister<NetworkConfig>,
+ persist_cluster_layout: Persister<ClusterLayout>,
persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>,
local_status: ArcSwap<NodeStatus>,
@@ -103,8 +104,10 @@ pub struct NodeStatus {
pub hostname: String,
/// Replication factor configured on the node
pub replication_factor: usize,
- /// Configuration version
- pub config_version: u64,
+ /// Cluster layout version
+ pub cluster_layout_version: u64,
+ /// Hash of cluster layout staging data
+ pub cluster_layout_staging_hash: Hash,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -187,17 +190,17 @@ impl System {
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!("Node public key: {}", hex::encode(&node_key.public_key()));
- let persist_config = Persister::new(&config.metadata_dir, "network_config");
+ let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout");
let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
- let net_config = match persist_config.load() {
+ let cluster_layout = match persist_cluster_layout.load() {
Ok(x) => x,
Err(e) => {
info!(
- "No valid previous network configuration stored ({}), starting fresh.",
+ "No valid previous cluster layout stored ({}), starting fresh.",
e
);
- NetworkConfig::new()
+ ClusterLayout::new(replication_factor)
}
};
@@ -206,10 +209,11 @@ impl System {
.into_string()
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
replication_factor,
- config_version: net_config.version,
+ cluster_layout_version: cluster_layout.version,
+ cluster_layout_staging_hash: cluster_layout.staging_hash,
};
- let ring = Ring::new(net_config, replication_factor);
+ let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
if let Some(addr) = config.rpc_public_addr {
@@ -229,7 +233,7 @@ impl System {
let sys = Arc::new(System {
id: netapp.id.into(),
- persist_config,
+ persist_cluster_layout,
persist_peer_list,
local_status: ArcSwap::new(Arc::new(local_status)),
node_status: RwLock::new(HashMap::new()),
@@ -292,12 +296,12 @@ impl System {
}
/// Save network configuration to disc
- async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
+ async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone();
- self.persist_config
- .save_async(&ring.config)
+ self.persist_cluster_layout
+ .save_async(&ring.layout)
.await
- .expect("Cannot save current cluster configuration");
+ .expect("Cannot save current cluster layout");
Ok(())
}
@@ -305,7 +309,8 @@ impl System {
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
let ring = self.ring.borrow();
- new_si.config_version = ring.config.version;
+ new_si.cluster_layout_version = ring.layout.version;
+ new_si.cluster_layout_staging_hash = ring.layout.staging_hash;
self.local_status.swap(Arc::new(new_si));
}
@@ -337,9 +342,9 @@ impl System {
)));
}
- fn handle_pull_config(&self) -> SystemRpc {
+ fn handle_pull_cluster_layout(&self) -> SystemRpc {
let ring = self.ring.borrow().clone();
- SystemRpc::AdvertiseConfig(ring.config.clone())
+ SystemRpc::AdvertiseClusterLayout(ring.layout.clone())
}
fn handle_get_known_nodes(&self) -> SystemRpc {
@@ -360,7 +365,8 @@ impl System {
.unwrap_or(NodeStatus {
hostname: "?".to_string(),
replication_factor: 0,
- config_version: 0,
+ cluster_layout_version: 0,
+ cluster_layout_staging_hash: Hash::from([0u8; 32]),
}),
})
.collect::<Vec<_>>();
@@ -381,10 +387,12 @@ impl System {
std::process::exit(1);
}
- if info.config_version > local_info.config_version {
+ if info.cluster_layout_version > local_info.cluster_layout_version
+ || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
+ {
let self2 = self.clone();
self.background.spawn_cancellable(async move {
- self2.pull_config(from).await;
+ self2.pull_cluster_layout(from).await;
Ok(())
});
}
@@ -397,32 +405,39 @@ impl System {
Ok(SystemRpc::Ok)
}
- async fn handle_advertise_config(
+ async fn handle_advertise_cluster_layout(
self: Arc<Self>,
- adv: &NetworkConfig,
+ adv: &ClusterLayout,
) -> Result<SystemRpc, Error> {
let update_ring = self.update_ring.lock().await;
- let ring: Arc<Ring> = self.ring.borrow().clone();
+ let mut layout: ClusterLayout = self.ring.borrow().layout.clone();
+
+ let prev_layout_check = layout.check();
+ if layout.merge(adv) {
+ if prev_layout_check && !layout.check() {
+ error!("New cluster layout is invalid, discarding.");
+ return Err(Error::Message(
+ "New cluster layout is invalid, discarding.".into(),
+ ));
+ }
- if adv.version > ring.config.version {
- let ring = Ring::new(adv.clone(), self.replication_factor);
+ let ring = Ring::new(layout.clone(), self.replication_factor);
update_ring.send(Arc::new(ring))?;
drop(update_ring);
let self2 = self.clone();
- let adv = adv.clone();
self.background.spawn_cancellable(async move {
self2
.rpc
.broadcast(
&self2.system_endpoint,
- SystemRpc::AdvertiseConfig(adv),
+ SystemRpc::AdvertiseClusterLayout(layout),
RequestStrategy::with_priority(PRIO_HIGH),
)
.await;
Ok(())
});
- self.background.spawn(self.clone().save_network_config());
+ self.background.spawn(self.clone().save_cluster_layout());
}
Ok(SystemRpc::Ok)
@@ -456,14 +471,15 @@ impl System {
};
while !*stop_signal.borrow() {
- let not_configured = self.ring.borrow().config.members.is_empty();
+ let not_configured = !self.ring.borrow().layout.check();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
+ let expected_n_nodes = self.ring.borrow().layout.num_nodes();
let bad_peers = self
.fullmesh
.get_peer_list()
.iter()
.filter(|p| p.is_up())
- .count() != self.ring.borrow().config.members.len();
+ .count() != expected_n_nodes;
if not_configured || no_peers || bad_peers {
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
@@ -533,18 +549,18 @@ impl System {
self.persist_peer_list.save_async(&peer_list).await
}
- async fn pull_config(self: Arc<Self>, peer: Uuid) {
+ async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {
let resp = self
.rpc
.call(
&self.system_endpoint,
peer,
- SystemRpc::PullConfig,
+ SystemRpc::PullClusterLayout,
RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT),
)
.await;
- if let Ok(SystemRpc::AdvertiseConfig(config)) = resp {
- let _: Result<_, _> = self.handle_advertise_config(&config).await;
+ if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
+ let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await;
}
}
}
@@ -554,9 +570,11 @@ impl EndpointHandler<SystemRpc> for System {
async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
match msg {
SystemRpc::Connect(node) => self.handle_connect(node).await,
- SystemRpc::PullConfig => Ok(self.handle_pull_config()),
+ SystemRpc::PullClusterLayout => Ok(self.handle_pull_cluster_layout()),
SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await,
- SystemRpc::AdvertiseConfig(adv) => self.clone().handle_advertise_config(adv).await,
+ SystemRpc::AdvertiseClusterLayout(adv) => {
+ self.clone().handle_advertise_cluster_layout(adv).await
+ }
SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
_ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
}
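
The new handle_advertise_cluster_layout merges the advertised layout into a clone of the local one CRDT-style and only installs the result if it still passes check(); a node whose staging hash or version differs triggers a pull, so layouts converge across the cluster. Below is a condensed sketch of that control flow with simplified stand-in types (Layout, merge and check here are placeholders, not Garage's real ClusterLayout API, whose check covers roles, staging hash and partition assignation):

// Simplified stand-in for the merge-then-validate flow; not the real types.
#[derive(Clone)]
struct Layout {
    version: u64,
    nodes: Vec<String>,
}

impl Layout {
    /// Placeholder validity check.
    fn check(&self) -> bool {
        !self.nodes.is_empty()
    }

    /// Toy merge rule: keep the highest version; returns true if anything changed.
    fn merge(&mut self, other: &Layout) -> bool {
        if other.version > self.version {
            *self = other.clone();
            true
        } else {
            false
        }
    }
}

/// Returns the merged layout if the advertisement changed anything and the
/// result is still valid, None if nothing changed, and an error otherwise.
fn handle_advertise(local: &Layout, adv: &Layout) -> Result<Option<Layout>, String> {
    let mut merged = local.clone();
    let prev_ok = merged.check();
    if merged.merge(adv) {
        if prev_ok && !merged.check() {
            return Err("New cluster layout is invalid, discarding.".into());
        }
        // Here the caller would rebuild the Ring from `merged`, broadcast it
        // to the other nodes and persist it to disk, as in the diff above.
        return Ok(Some(merged));
    }
    Ok(None)
}

fn main() {
    let local = Layout { version: 1, nodes: vec!["n1".into()] };
    let adv = Layout { version: 2, nodes: vec!["n1".into(), "n2".into()] };
    match handle_advertise(&local, &adv) {
        Ok(Some(new)) => println!("accepted layout v{}", new.version),
        Ok(None) => println!("no change"),
        Err(e) => println!("rejected: {}", e),
    }
}
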
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 616bf275..dc37f12c 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -1,11 +1,12 @@
[package]
name = "garage_table"
-version = "0.4.0"
+version = "0.5.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
description = "Table sharding and replication engine (DynamoDB-like) for the Garage object store"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
[lib]
path = "lib.rs"
@@ -13,8 +14,8 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_rpc = { version = "0.4.0", path = "../rpc" }
-garage_util = { version = "0.4.0", path = "../util" }
+garage_rpc = { version = "0.5.0", path = "../rpc" }
+garage_util = { version = "0.5.0", path = "../util" }
async-trait = "0.1.7"
bytes = "1.0"
diff --git a/src/table/lib.rs b/src/table/lib.rs
index 53d2c93b..d6c19f1b 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -4,7 +4,6 @@
#[macro_use]
extern crate log;
-pub mod crdt;
pub mod schema;
pub mod util;
@@ -18,3 +17,7 @@ pub mod table;
pub use schema::*;
pub use table::*;
pub use util::*;
+
+pub mod crdt {
+ pub use garage_util::crdt::*;
+}
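
The crdt module moves to garage_util, but garage_table keeps re-exporting it under its old path, so code that imported CRDT types through garage_table continues to compile unchanged. A generic, self-contained illustration of that re-export pattern (module and type names are made up; this is not the garage_table/garage_util code itself):

// Illustration of preserving an old module path via a re-export.
mod util_crate {
    pub mod crdt {
        pub struct Lww<T>(pub T);
    }
}

mod table_crate {
    // Old path preserved: table_crate::crdt::* now just forwards to util_crate.
    pub mod crdt {
        pub use crate::util_crate::crdt::*;
    }
}

fn main() {
    // Callers that used the old path keep working unchanged.
    let v = table_crate::crdt::Lww(42);
    println!("{}", v.0);
}
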
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index 8f01fbdd..18682ace 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -28,10 +28,10 @@ impl TableReplication for TableFullReplication {
fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
let ring = self.system.ring.borrow();
- ring.config.members.keys().cloned().collect::<Vec<_>>()
+ ring.layout.node_ids().to_vec()
}
fn write_quorum(&self) -> usize {
- let nmembers = self.system.ring.borrow().config.members.len();
+ let nmembers = self.system.ring.borrow().layout.node_ids().len();
if nmembers > self.max_faults {
nmembers - self.max_faults
} else {
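
With the layout API, the member count comes from layout.node_ids() instead of config.members, but the quorum arithmetic is unchanged: with n members and a tolerance of max_faults failures, require n - max_faults acknowledgements. The hunk above cuts off before the else branch; the sketch below assumes it falls back to a quorum of 1, and the function name is illustrative rather than the TableFullReplication API:

/// Write quorum for full-copy replication under the stated assumption.
fn full_copy_write_quorum(n_members: usize, max_faults: usize) -> usize {
    if n_members > max_faults {
        n_members - max_faults
    } else {
        1
    }
}

fn main() {
    assert_eq!(full_copy_write_quorum(4, 1), 3);
    assert_eq!(full_copy_write_quorum(1, 1), 1);
    println!("quorum(4, 1) = {}", full_copy_write_quorum(4, 1));
}
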
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index f2a001fa..e33f8a66 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -1,11 +1,12 @@
[package]
name = "garage_util"
-version = "0.4.0"
+version = "0.5.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
description = "Utility crate for the Garage object store"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
[lib]
path = "lib.rs"
diff --git a/src/table/crdt/bool.rs b/src/util/crdt/bool.rs
index 53af8f82..53af8f82 100644
--- a/src/table/crdt/bool.rs
+++ b/src/util/crdt/bool.rs
diff --git a/src/table/crdt/crdt.rs b/src/util/crdt/crdt.rs
index a8f1b9aa..9b5f230d 100644
--- a/src/table/crdt/crdt.rs
+++ b/src/util/crdt/crdt.rs
@@ -1,4 +1,4 @@
-use garage_util::data::*;
+use crate::data::*;
/// Definition of a CRDT - all CRDT Rust types implement this.
///
diff --git a/src/table/crdt/lww.rs b/src/util/crdt/lww.rs
index be197d88..43d13f27 100644
--- a/src/table/crdt/lww.rs
+++ b/src/util/crdt/lww.rs
@@ -1,6 +1,8 @@
+use std::cmp::Ordering;
+
use serde::{Deserialize, Serialize};
-use garage_util::time::now_msec;
+use crate::time::now_msec;
use crate::crdt::crdt::*;
@@ -104,11 +106,15 @@ where
T: Clone + Crdt,
{
fn merge(&mut self, other: &Self) {
- if other.ts > self.ts {
- self.ts = other.ts;
- self.v = other.v.clone();
- } else if other.ts == self.ts {
- self.v.merge(&other.v);
+ match other.ts.cmp(&self.ts) {
+ Ordering::Greater => {
+ self.ts = other.ts;
+ self.v = other.v.clone();
+ }
+ Ordering::Equal => {
+ self.v.merge(&other.v);
+ }
+ Ordering::Less => (),
}
}
}
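
The change above is a purely mechanical rewrite (comparison chain replaced by a match on Ordering); the last-writer-wins semantics are untouched: a greater timestamp replaces the value, an equal timestamp merges the inner values, a smaller timestamp is ignored. A self-contained toy version of that merge rule (not the real Lww<T>, which is generic and also manages timestamp generation via now_msec):

use std::cmp::Ordering;

/// Toy last-writer-wins register over a u64 counter: on equal timestamps,
/// the inner values are merged by taking the max (a simple commutative rule).
#[derive(Clone, Debug, PartialEq)]
struct ToyLww {
    ts: u64,
    v: u64,
}

impl ToyLww {
    fn merge(&mut self, other: &Self) {
        match other.ts.cmp(&self.ts) {
            Ordering::Greater => {
                self.ts = other.ts;
                self.v = other.v;
            }
            Ordering::Equal => {
                self.v = self.v.max(other.v);
            }
            Ordering::Less => (),
        }
    }
}

fn main() {
    let mut a = ToyLww { ts: 10, v: 1 };
    let b = ToyLww { ts: 12, v: 7 };
    a.merge(&b);
    assert_eq!(a, ToyLww { ts: 12, v: 7 }); // newer timestamp wins
}
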
diff --git a/src/table/crdt/lww_map.rs b/src/util/crdt/lww_map.rs
index fb25fd46..3e9aba79 100644
--- a/src/table/crdt/lww_map.rs
+++ b/src/util/crdt/lww_map.rs
@@ -1,6 +1,8 @@
+use std::cmp::Ordering;
+
use serde::{Deserialize, Serialize};
-use garage_util::time::now_msec;
+use crate::time::now_msec;
use crate::crdt::crdt::*;
@@ -135,11 +137,15 @@ where
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
Ok(i) => {
let (_, ts1, _v1) = &self.vals[i];
- if ts2 > ts1 {
- self.vals[i].1 = *ts2;
- self.vals[i].2 = v2.clone();
- } else if ts1 == ts2 {
- self.vals[i].2.merge(v2);
+ match ts2.cmp(ts1) {
+ Ordering::Greater => {
+ self.vals[i].1 = *ts2;
+ self.vals[i].2 = v2.clone();
+ }
+ Ordering::Equal => {
+ self.vals[i].2.merge(v2);
+ }
+ Ordering::Less => (),
}
}
Err(i) => {
diff --git a/src/table/crdt/map.rs b/src/util/crdt/map.rs
index 7553cd50..7553cd50 100644
--- a/src/table/crdt/map.rs
+++ b/src/util/crdt/map.rs
diff --git a/src/table/crdt/mod.rs b/src/util/crdt/mod.rs
index 9663a5a5..9663a5a5 100644
--- a/src/table/crdt/mod.rs
+++ b/src/util/crdt/mod.rs
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 478b9ea4..64874095 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -5,6 +5,7 @@ extern crate log;
pub mod background;
pub mod config;
+pub mod crdt;
pub mod data;
pub mod error;
pub mod persister;
diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml
index 634ce282..72701c90 100644
--- a/src/web/Cargo.toml
+++ b/src/web/Cargo.toml
@@ -1,11 +1,12 @@
[package]
name = "garage_web"
-version = "0.4.0"
+version = "0.5.0"
authors = ["Alex Auvolat <alex@adnab.me>", "Quentin Dufour <quentin@dufour.io>"]
edition = "2018"
license = "AGPL-3.0"
description = "S3-like website endpoint crate for the Garage object store"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
[lib]
path = "lib.rs"
@@ -13,10 +14,10 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_api = { version = "0.4.0", path = "../api" }
-garage_model = { version = "0.4.0", path = "../model" }
-garage_util = { version = "0.4.0", path = "../util" }
-garage_table = { version = "0.4.0", path = "../table" }
+garage_api = { version = "0.5.0", path = "../api" }
+garage_model = { version = "0.5.0", path = "../model" }
+garage_util = { version = "0.5.0", path = "../util" }
+garage_table = { version = "0.5.0", path = "../table" }
err-derive = "0.3"
log = "0.4"