author     Alex Auvolat <alex@adnab.me>    2021-11-09 12:24:04 +0100
committer  Alex Auvolat <alex@adnab.me>    2021-11-16 16:05:53 +0100
commit     c94406f4282d48e2e2ac82ffb57eafaad23f7edc (patch)
tree       01fe1b272e18fdae993e2207d8d3aea4a301ec56 /src/garage
parent     53888995bdd7c672d2e3ab8bb6a3529195c127a9 (diff)
download   garage-c94406f4282d48e2e2ac82ffb57eafaad23f7edc.tar.gz
           garage-c94406f4282d48e2e2ac82ffb57eafaad23f7edc.zip
Improve how node roles are assigned in Garage (v0.5-beta1)
- Change the terminology: the network configuration becomes the role table, and the configuration of a node becomes a node's role.
- The modification of the role table takes place in two steps: first, changes are staged in a CRDT data structure; then, once the user is happy with the changes, they can commit them all at once (or revert them).
- Update documentation.
- Fix tests.
- Implement a smarter partition assignation algorithm.

This patch breaks the format of the network configuration: when migrating, the cluster will be in a state where no roles are assigned. All roles must be re-assigned and committed at once. This migration should not pose an issue.
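The staged two-step workflow described above can be illustrated with a minimal, self-contained Rust sketch. This is not the real garage_rpc::layout::ClusterLayout or the garage_util::crdt types from the diff below; the names here (Layout, Lww, stage, apply, revert, the node id "f00dcafe") are simplified, hypothetical stand-ins that only show how staged role changes are merged into the committed role table on apply and discarded on revert.

// Minimal sketch of the stage -> apply/revert flow from the commit message.
// These types are simplified stand-ins, not the actual Garage CRDT types.
use std::collections::HashMap;

#[derive(Clone, Debug)]
struct NodeRole {
    zone: String,
    capacity: Option<u32>, // None = gateway node (no storage capacity)
    tags: Vec<String>,
}

// Last-writer-wins register: on merge, the entry with the higher timestamp wins.
#[derive(Clone, Debug)]
struct Lww<T> {
    ts: u64,
    value: T,
}

#[derive(Default)]
struct Layout {
    version: u64,
    // Committed role table; a `None` value means "role removed".
    roles: HashMap<String, Lww<Option<NodeRole>>>,
    // Staged changes; merged into `roles` only when the layout is applied.
    staging: HashMap<String, Lww<Option<NodeRole>>>,
}

impl Layout {
    // `garage layout assign` / `garage layout remove` only touch `staging`.
    fn stage(&mut self, node: &str, ts: u64, role: Option<NodeRole>) {
        self.staging.insert(node.to_string(), Lww { ts, value: role });
    }

    // `garage layout apply --version N`: merge staging into the role table,
    // clear staging, and bump the version (N must be previous version + 1).
    fn apply(&mut self, version: u64) -> Result<(), String> {
        if version != self.version + 1 {
            return Err("Invalid value of --version flag".into());
        }
        for (node, staged) in self.staging.drain() {
            let keep_current = self
                .roles
                .get(&node)
                .map(|cur| cur.ts >= staged.ts)
                .unwrap_or(false);
            if !keep_current {
                self.roles.insert(node, staged);
            }
        }
        self.version = version;
        Ok(())
    }

    // `garage layout revert --version N`: discard all staged changes.
    fn revert(&mut self, version: u64) -> Result<(), String> {
        if version != self.version + 1 {
            return Err("Invalid value of --version flag".into());
        }
        self.staging.clear();
        self.version = version;
        Ok(())
    }
}

fn main() {
    let mut layout = Layout::default();
    layout.stage(
        "f00dcafe", // hypothetical node id
        1,
        Some(NodeRole {
            zone: "dc1".into(),
            capacity: Some(10),
            tags: vec!["hdd".into()],
        }),
    );
    // Nothing is committed yet: the role exists only in `staging`.
    assert!(layout.roles.is_empty());
    layout.apply(1).unwrap();
    assert_eq!(layout.version, 1);
    assert!(layout.roles["f00dcafe"].value.is_some());

    // Stage a role removal, then cancel it instead of applying it.
    layout.stage("f00dcafe", 2, None);
    layout.revert(2).unwrap();
    assert!(layout.staging.is_empty());
}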
Diffstat (limited to 'src/garage')
-rw-r--r--   src/garage/Cargo.toml     |  15
-rw-r--r--   src/garage/admin.rs       |   4
-rw-r--r--   src/garage/cli/cmd.rs     | 214
-rw-r--r--   src/garage/cli/layout.rs  | 340
-rw-r--r--   src/garage/cli/mod.rs     |   2
-rw-r--r--   src/garage/cli/structs.rs |  78
-rw-r--r--   src/garage/main.rs        |   4
7 files changed, 465 insertions(+), 192 deletions(-)
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 0a3bc537..74a6ab0e 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -1,11 +1,12 @@
[package]
name = "garage"
-version = "0.4.0"
+version = "0.5.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
description = "Garage, an S3-compatible distributed object store for self-hosted deployments"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
[[bin]]
name = "garage"
@@ -14,12 +15,12 @@ path = "main.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_api = { version = "0.4.0", path = "../api" }
-garage_model = { version = "0.4.0", path = "../model" }
-garage_rpc = { version = "0.4.0", path = "../rpc" }
-garage_table = { version = "0.4.0", path = "../table" }
-garage_util = { version = "0.4.0", path = "../util" }
-garage_web = { version = "0.4.0", path = "../web" }
+garage_api = { version = "0.5.0", path = "../api" }
+garage_model = { version = "0.5.0", path = "../model" }
+garage_rpc = { version = "0.5.0", path = "../rpc" }
+garage_table = { version = "0.5.0", path = "../table" }
+garage_util = { version = "0.5.0", path = "../util" }
+garage_web = { version = "0.5.0", path = "../web" }
bytes = "1.0"
git-version = "0.3.4"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index c3a83d02..f0444988 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -339,7 +339,7 @@ impl AdminRpcHandler {
let mut failures = vec![];
let ring = self.garage.system.ring.borrow().clone();
- for node in ring.config.members.keys() {
+ for node in ring.layout.node_ids().iter() {
let node = (*node).into();
let resp = self
.endpoint
@@ -383,7 +383,7 @@ impl AdminRpcHandler {
let mut ret = String::new();
let ring = self.garage.system.ring.borrow().clone();
- for node in ring.config.members.keys() {
+ for node in ring.layout.node_ids().iter() {
let mut opt = opt.clone();
opt.all_nodes = false;
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 2ff46088..a916974e 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -2,7 +2,7 @@ use std::collections::HashSet;
use garage_util::error::*;
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_rpc::system::*;
use garage_rpc::*;
@@ -20,11 +20,8 @@ pub async fn cli_command_dispatch(
Command::Node(NodeOperation::Connect(connect_opt)) => {
cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await
}
- Command::Node(NodeOperation::Configure(configure_opt)) => {
- cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await
- }
- Command::Node(NodeOperation::Remove(remove_opt)) => {
- cmd_remove(system_rpc_endpoint, rpc_host, remove_opt).await
+ Command::Layout(layout_opt) => {
+ cli_layout_command_dispatch(layout_opt, system_rpc_endpoint, rpc_host).await
}
Command::Bucket(bo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await
@@ -48,56 +45,60 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
- let config = match rpc_cli
- .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
- .await??
- {
- SystemRpc::AdvertiseConfig(cfg) => cfg,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== HEALTHY NODES ====");
- let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity".to_string()];
+ let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity".to_string()];
for adv in status.iter().filter(|adv| adv.is_up) {
- if let Some(cfg) = config.members.get(&adv.id) {
- healthy_nodes.push(format!(
- "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}",
- id = adv.id,
- host = adv.status.hostname,
- addr = adv.addr,
- tag = cfg.tag,
- zone = cfg.zone,
- capacity = cfg.capacity_string(),
- ));
- } else {
- healthy_nodes.push(format!(
- "{id:?}\t{h}\t{addr}\tNO ROLE ASSIGNED",
- id = adv.id,
- h = adv.status.hostname,
- addr = adv.addr,
- ));
+ match layout.roles.get(&adv.id) {
+ Some(NodeRoleV(Some(cfg))) => {
+ healthy_nodes.push(format!(
+ "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}",
+ id = adv.id,
+ host = adv.status.hostname,
+ addr = adv.addr,
+ tags = cfg.tags.join(","),
+ zone = cfg.zone,
+ capacity = cfg.capacity_string(),
+ ));
+ }
+ _ => {
+ let new_role = match layout.staging.get(&adv.id) {
+ Some(NodeRoleV(Some(_))) => "(pending)",
+ _ => "NO ROLE ASSIGNED",
+ };
+ healthy_nodes.push(format!(
+ "{id:?}\t{h}\t{addr}\t{new_role}",
+ id = adv.id,
+ h = adv.status.hostname,
+ addr = adv.addr,
+ new_role = new_role,
+ ));
+ }
}
}
format_table(healthy_nodes);
let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>();
let failure_case_1 = status.iter().any(|adv| !adv.is_up);
- let failure_case_2 = config
- .members
+ let failure_case_2 = layout
+ .roles
+ .items()
.iter()
- .any(|(id, _)| !status_keys.contains(id));
+ .filter(|(_, _, v)| v.0.is_some())
+ .any(|(id, _, _)| !status_keys.contains(id));
if failure_case_1 || failure_case_2 {
println!("\n==== FAILED NODES ====");
let mut failed_nodes =
- vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity\tLast seen".to_string()];
+ vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
for adv in status.iter().filter(|adv| !adv.is_up) {
- if let Some(cfg) = config.members.get(&adv.id) {
+ if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) {
failed_nodes.push(format!(
- "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}\t{last_seen}",
+ "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
id = adv.id,
host = adv.status.hostname,
addr = adv.addr,
- tag = cfg.tag,
+ tags = cfg.tags.join(","),
zone = cfg.zone,
capacity = cfg.capacity_string(),
last_seen = adv
@@ -107,20 +108,28 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
));
}
}
- for (id, cfg) in config.members.iter() {
- if !status_keys.contains(id) {
- failed_nodes.push(format!(
- "{id:?}\t??\t??\t[{tag}]\t{zone}\t{capacity}\tnever seen",
- id = id,
- tag = cfg.tag,
- zone = cfg.zone,
- capacity = cfg.capacity_string(),
- ));
+ for (id, _, role_v) in layout.roles.items().iter() {
+ if let NodeRoleV(Some(cfg)) = role_v {
+ if !status_keys.contains(id) {
+ failed_nodes.push(format!(
+ "{id:?}\t??\t??\t[{tags}]\t{zone}\t{capacity}\tnever seen",
+ id = id,
+ tags = cfg.tags.join(","),
+ zone = cfg.zone,
+ capacity = cfg.capacity_string(),
+ ));
+ }
}
}
format_table(failed_nodes);
}
+ if print_staging_role_changes(&layout) {
+ println!();
+ println!("Please use `garage layout show` to check the proposed new layout and apply it.");
+ println!();
+ }
+
Ok(())
}
@@ -141,115 +150,6 @@ pub async fn cmd_connect(
}
}
-pub async fn cmd_configure(
- rpc_cli: &Endpoint<SystemRpc, ()>,
- rpc_host: NodeID,
- args: ConfigureNodeOpt,
-) -> Result<(), Error> {
- let status = match rpc_cli
- .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
- .await??
- {
- SystemRpc::ReturnKnownNodes(nodes) => nodes,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
-
- let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?;
-
- let mut config = match rpc_cli
- .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
- .await??
- {
- SystemRpc::AdvertiseConfig(cfg) => cfg,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
-
- for replaced in args.replace.iter() {
- let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?;
- if config.members.remove(&replaced_node).is_none() {
- return Err(Error::Message(format!(
- "Cannot replace node {:?} as it is not in current configuration",
- replaced_node
- )));
- }
- }
-
- if args.capacity.is_some() && args.gateway {
- return Err(Error::Message(
- "-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
- }
- if args.capacity == Some(0) {
- return Err(Error::Message("Invalid capacity value: 0".into()));
- }
-
- let new_entry = match config.members.get(&added_node) {
- None => {
- let capacity = match args.capacity {
- Some(c) => Some(c),
- None if args.gateway => None,
- _ => return Err(Error::Message(
- "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
- };
- NetworkConfigEntry {
- zone: args.zone.ok_or("Please specifiy a zone with the -z flag")?,
- capacity,
- tag: args.tag.unwrap_or_default(),
- }
- }
- Some(old) => {
- let capacity = match args.capacity {
- Some(c) => Some(c),
- None if args.gateway => None,
- _ => old.capacity,
- };
- NetworkConfigEntry {
- zone: args.zone.unwrap_or_else(|| old.zone.to_string()),
- capacity,
- tag: args.tag.unwrap_or_else(|| old.tag.to_string()),
- }
- }
- };
-
- config.members.insert(added_node, new_entry);
- config.version += 1;
-
- rpc_cli
- .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
- .await??;
- Ok(())
-}
-
-pub async fn cmd_remove(
- rpc_cli: &Endpoint<SystemRpc, ()>,
- rpc_host: NodeID,
- args: RemoveNodeOpt,
-) -> Result<(), Error> {
- let mut config = match rpc_cli
- .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
- .await??
- {
- SystemRpc::AdvertiseConfig(cfg) => cfg,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
-
- let deleted_node = find_matching_node(config.members.keys().cloned(), &args.node_id)?;
-
- if !args.yes {
- return Err(Error::Message(format!(
- "Add the flag --yes to really remove {:?} from the cluster",
- deleted_node
- )));
- }
-
- config.members.remove(&deleted_node);
- config.version += 1;
-
- rpc_cli
- .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
- .await??;
- Ok(())
-}
-
pub async fn cmd_admin(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
@@ -283,5 +183,3 @@ pub async fn cmd_admin(
}
Ok(())
}
-
-// --- Utility functions ----
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
new file mode 100644
index 00000000..0d9e4fa4
--- /dev/null
+++ b/src/garage/cli/layout.rs
@@ -0,0 +1,340 @@
+use garage_util::crdt::Crdt;
+use garage_util::data::*;
+use garage_util::error::*;
+
+use garage_rpc::layout::*;
+use garage_rpc::system::*;
+use garage_rpc::*;
+
+use crate::cli::*;
+
+pub async fn cli_layout_command_dispatch(
+ cmd: LayoutOperation,
+ system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+) -> Result<(), Error> {
+ match cmd {
+ LayoutOperation::Assign(configure_opt) => {
+ cmd_assign_role(system_rpc_endpoint, rpc_host, configure_opt).await
+ }
+ LayoutOperation::Remove(remove_opt) => {
+ cmd_remove_role(system_rpc_endpoint, rpc_host, remove_opt).await
+ }
+ LayoutOperation::Show => cmd_show_layout(system_rpc_endpoint, rpc_host).await,
+ LayoutOperation::Apply(apply_opt) => {
+ cmd_apply_layout(system_rpc_endpoint, rpc_host, apply_opt).await
+ }
+ LayoutOperation::Revert(revert_opt) => {
+ cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await
+ }
+ }
+}
+
+pub async fn cmd_assign_role(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ args: AssignRoleOpt,
+) -> Result<(), Error> {
+ let status = match rpc_cli
+ .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .await??
+ {
+ SystemRpc::ReturnKnownNodes(nodes) => nodes,
+ resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+ };
+
+ let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?;
+
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ let mut roles = layout.roles.clone();
+ roles.merge(&layout.staging);
+
+ for replaced in args.replace.iter() {
+ let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?;
+ match roles.get(&replaced_node) {
+ Some(NodeRoleV(Some(_))) => {
+ layout
+ .staging
+ .merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
+ }
+ _ => {
+ return Err(Error::Message(format!(
+ "Cannot replace node {:?} as it is not currently in planned layout",
+ replaced_node
+ )));
+ }
+ }
+ }
+
+ if args.capacity.is_some() && args.gateway {
+ return Err(Error::Message(
+ "-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
+ }
+ if args.capacity == Some(0) {
+ return Err(Error::Message("Invalid capacity value: 0".into()));
+ }
+
+ let new_entry = match roles.get(&added_node) {
+ Some(NodeRoleV(Some(old))) => {
+ let capacity = match args.capacity {
+ Some(c) => Some(c),
+ None if args.gateway => None,
+ None => old.capacity,
+ };
+ let tags = if args.tags.is_empty() {
+ old.tags.clone()
+ } else {
+ args.tags
+ };
+ NodeRole {
+ zone: args.zone.unwrap_or_else(|| old.zone.to_string()),
+ capacity,
+ tags,
+ }
+ }
+ _ => {
+ let capacity = match args.capacity {
+ Some(c) => Some(c),
+ None if args.gateway => None,
+ None => return Err(Error::Message(
+ "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
+ };
+ NodeRole {
+ zone: args.zone.ok_or("Please specifiy a zone with the -z flag")?,
+ capacity,
+ tags: args.tags,
+ }
+ }
+ };
+
+ layout
+ .staging
+ .merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
+
+ send_layout(rpc_cli, rpc_host, layout).await?;
+
+ println!("Role change is staged but not yet commited.");
+ println!("Use `garage layout show` to view staged role changes,");
+ println!("and `garage layout apply` to enact staged changes.");
+ Ok(())
+}
+
+pub async fn cmd_remove_role(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ args: RemoveRoleOpt,
+) -> Result<(), Error> {
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ let mut roles = layout.roles.clone();
+ roles.merge(&layout.staging);
+
+ let deleted_node =
+ find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?;
+
+ layout
+ .staging
+ .merge(&roles.update_mutator(deleted_node, NodeRoleV(None)));
+
+ send_layout(rpc_cli, rpc_host, layout).await?;
+
+ println!("Role removal is staged but not yet commited.");
+ println!("Use `garage layout show` to view staged role changes,");
+ println!("and `garage layout apply` to enact staged changes.");
+ Ok(())
+}
+
+pub async fn cmd_show_layout(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+) -> Result<(), Error> {
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ println!("==== CURRENT CLUSTER LAYOUT ====");
+ if !print_cluster_layout(&layout) {
+ println!("No nodes currently have a role in the cluster.");
+ println!("See `garage status` to view available nodes.");
+ }
+ println!();
+ println!("Current cluster layout version: {}", layout.version);
+
+ if print_staging_role_changes(&layout) {
+ layout.roles.merge(&layout.staging);
+
+ println!();
+ println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
+ if !print_cluster_layout(&layout) {
+ println!("No nodes have a role in the new layout.");
+ }
+ println!();
+
+ // this will print the stats of what partitions
+ // will move around when we apply
+ if layout.calculate_partition_assignation() {
+ println!("To enact the staged role changes, type:");
+ println!();
+ println!(" garage layout apply --version {}", layout.version + 1);
+ println!();
+ println!(
+ "You can also revert all proposed changes with: garage layout revert --version {}",
+ layout.version + 1
+ );
+ } else {
+ println!("Not enough nodes have an assigned role to maintain enough copies of data.");
+ println!("This new layout cannot yet be applied.");
+ }
+ }
+
+ Ok(())
+}
+
+pub async fn cmd_apply_layout(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ apply_opt: ApplyLayoutOpt,
+) -> Result<(), Error> {
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ match apply_opt.version {
+ None => {
+ println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
+ println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
+ return Err(Error::Message("--version flag is missing".into()));
+ }
+ Some(v) => {
+ if v != layout.version + 1 {
+ return Err(Error::Message("Invalid value of --version flag".into()));
+ }
+ }
+ }
+
+ layout.roles.merge(&layout.staging);
+
+ if !layout.calculate_partition_assignation() {
+ return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
+ }
+
+ layout.staging.clear();
+ layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
+
+ layout.version += 1;
+
+ send_layout(rpc_cli, rpc_host, layout).await?;
+
+ println!("New cluster layout with updated role assignation has been applied in cluster.");
+ println!("Data will now be moved around between nodes accordingly.");
+
+ Ok(())
+}
+
+pub async fn cmd_revert_layout(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ revert_opt: RevertLayoutOpt,
+) -> Result<(), Error> {
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ match revert_opt.version {
+ None => {
+ println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
+ println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
+ return Err(Error::Message("--version flag is missing".into()));
+ }
+ Some(v) => {
+ if v != layout.version + 1 {
+ return Err(Error::Message("Invalid value of --version flag".into()));
+ }
+ }
+ }
+
+ layout.staging.clear();
+ layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
+
+ layout.version += 1;
+
+ send_layout(rpc_cli, rpc_host, layout).await?;
+
+ println!("All proposed role changes in cluster layout have been canceled.");
+ Ok(())
+}
+
+// --- utility ---
+
+pub async fn fetch_layout(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+) -> Result<ClusterLayout, Error> {
+ match rpc_cli
+ .call(&rpc_host, &SystemRpc::PullClusterLayout, PRIO_NORMAL)
+ .await??
+ {
+ SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
+ resp => Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+ }
+}
+
+pub async fn send_layout(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ layout: ClusterLayout,
+) -> Result<(), Error> {
+ rpc_cli
+ .call(
+ &rpc_host,
+ &SystemRpc::AdvertiseClusterLayout(layout),
+ PRIO_NORMAL,
+ )
+ .await??;
+ Ok(())
+}
+
+pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
+ let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
+ for (id, _, role) in layout.roles.items().iter() {
+ let role = match &role.0 {
+ Some(r) => r,
+ _ => continue,
+ };
+ let tags = role.tags.join(",");
+ table.push(format!(
+ "{:?}\t{}\t{}\t{}",
+ id,
+ tags,
+ role.zone,
+ role.capacity_string()
+ ));
+ }
+ if table.len() == 1 {
+ false
+ } else {
+ format_table(table);
+ true
+ }
+}
+
+pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
+ if !layout.staging.items().is_empty() {
+ println!();
+ println!("==== STAGED ROLE CHANGES ====");
+ let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
+ for (id, _, role) in layout.staging.items().iter() {
+ if let Some(role) = &role.0 {
+ let tags = role.tags.join(",");
+ table.push(format!(
+ "{:?}\t{}\t{}\t{}",
+ id,
+ tags,
+ role.zone,
+ role.capacity_string()
+ ));
+ } else {
+ table.push(format!("{:?}\tREMOVED", id));
+ }
+ }
+ format_table(table);
+ true
+ } else {
+ false
+ }
+}
diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs
index 1567f377..17a2d8ce 100644
--- a/src/garage/cli/mod.rs
+++ b/src/garage/cli/mod.rs
@@ -1,9 +1,11 @@
pub(crate) mod cmd;
pub(crate) mod init;
+pub(crate) mod layout;
pub(crate) mod structs;
pub(crate) mod util;
pub(crate) use cmd::*;
pub(crate) use init::*;
+pub(crate) use layout::*;
pub(crate) use structs::*;
pub(crate) use util::*;
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index b930d8a8..b2b5375d 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -8,23 +8,23 @@ pub enum Command {
#[structopt(name = "server")]
Server,
- /// Print identifier (public key) of this Garage node
- #[structopt(name = "node-id")]
- NodeId(NodeIdOpt),
-
/// Get network status
#[structopt(name = "status")]
Status,
- /// Garage node operations
+ /// Operations on individual Garage nodes
#[structopt(name = "node")]
Node(NodeOperation),
- /// Bucket operations
+ /// Operations on the assignation of node roles in the cluster layout
+ #[structopt(name = "layout")]
+ Layout(LayoutOperation),
+
+ /// Operations on buckets
#[structopt(name = "bucket")]
Bucket(BucketOperation),
- /// Key operations
+ /// Operations on S3 access keys
#[structopt(name = "key")]
Key(KeyOperation),
@@ -39,17 +39,13 @@ pub enum Command {
#[derive(StructOpt, Debug)]
pub enum NodeOperation {
+ /// Print identifier (public key) of this Garage node
+ #[structopt(name = "id")]
+ NodeId(NodeIdOpt),
+
/// Connect to Garage node that is currently isolated from the system
#[structopt(name = "connect")]
Connect(ConnectNodeOpt),
-
- /// Configure Garage node
- #[structopt(name = "configure")]
- Configure(ConfigureNodeOpt),
-
- /// Remove Garage node from cluster
- #[structopt(name = "remove")]
- Remove(RemoveNodeOpt),
}
#[derive(StructOpt, Debug)]
@@ -67,8 +63,31 @@ pub struct ConnectNodeOpt {
}
#[derive(StructOpt, Debug)]
-pub struct ConfigureNodeOpt {
- /// Node to configure (prefix of hexadecimal node id)
+pub enum LayoutOperation {
+ /// Assign role to Garage node
+ #[structopt(name = "assign")]
+ Assign(AssignRoleOpt),
+
+ /// Remove role from Garage cluster node
+ #[structopt(name = "remove")]
+ Remove(RemoveRoleOpt),
+
+ /// Show roles currently assigned to nodes and changes staged for commit
+ #[structopt(name = "show")]
+ Show,
+
+ /// Apply staged changes to cluster layout
+ #[structopt(name = "apply")]
+ Apply(ApplyLayoutOpt),
+
+ /// Revert staged changes to cluster layout
+ #[structopt(name = "revert")]
+ Revert(RevertLayoutOpt),
+}
+
+#[derive(StructOpt, Debug)]
+pub struct AssignRoleOpt {
+ /// Node to which to assign role (prefix of hexadecimal node id)
pub(crate) node_id: String,
/// Location (zone or datacenter) of the node
@@ -83,9 +102,9 @@ pub struct ConfigureNodeOpt {
#[structopt(short = "g", long = "gateway")]
pub(crate) gateway: bool,
- /// Optional node tag
+ /// Optional tags to add to node
#[structopt(short = "t", long = "tag")]
- pub(crate) tag: Option<String>,
+ pub(crate) tags: Vec<String>,
/// Replaced node(s): list of node IDs that will be removed from the current cluster
#[structopt(long = "replace")]
@@ -93,13 +112,24 @@ pub struct ConfigureNodeOpt {
}
#[derive(StructOpt, Debug)]
-pub struct RemoveNodeOpt {
- /// Node to configure (prefix of hexadecimal node id)
+pub struct RemoveRoleOpt {
+ /// Node whose role to remove (prefix of hexadecimal node id)
pub(crate) node_id: String,
+}
- /// If this flag is not given, the node won't be removed
- #[structopt(long = "yes")]
- pub(crate) yes: bool,
+#[derive(StructOpt, Debug)]
+pub struct ApplyLayoutOpt {
+ /// Version number of new configuration: this command will fail if
+ /// it is not exactly 1 + the previous configuration's version
+ #[structopt(long = "version")]
+ pub(crate) version: Option<u64>,
+}
+
+#[derive(StructOpt, Debug)]
+pub struct RevertLayoutOpt {
+ /// Version number of old configuration to which to revert
+ #[structopt(long = "version")]
+ pub(crate) version: Option<u64>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 70c959f8..69cd16e7 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -70,7 +70,9 @@ async fn main() {
server::run_server(opt.config_file).await
}
- Command::NodeId(node_id_opt) => node_id_command(opt.config_file, node_id_opt.quiet),
+ Command::Node(NodeOperation::NodeId(node_id_opt)) => {
+ node_id_command(opt.config_file, node_id_opt.quiet)
+ }
_ => cli_command(opt).await,
};