aboutsummaryrefslogtreecommitdiff
path: root/src/garage/cli/cmd.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-11-09 12:24:04 +0100
committerAlex Auvolat <alex@adnab.me>2021-11-16 16:05:53 +0100
commitc94406f4282d48e2e2ac82ffb57eafaad23f7edc (patch)
tree01fe1b272e18fdae993e2207d8d3aea4a301ec56 /src/garage/cli/cmd.rs
parent53888995bdd7c672d2e3ab8bb6a3529195c127a9 (diff)
downloadgarage-0.5-beta1.tar.gz
garage-0.5-beta1.zip
Improve how node roles are assigned in Garagev0.5-beta1
- change the terminology: the network configuration becomes the role table, the configuration of a nodes becomes a node's role - the modification of the role table takes place in two steps: first, changes are staged in a CRDT data structure. Then, once the user is happy with the changes, they can commit them all at once (or revert them). - update documentation - fix tests - implement smarter partition assignation algorithm This patch breaks the format of the network configuration: when migrating, the cluster will be in a state where no roles are assigned. All roles must be re-assigned and commited at once. This migration should not pose an issue.
Diffstat (limited to 'src/garage/cli/cmd.rs')
-rw-r--r--src/garage/cli/cmd.rs214
1 files changed, 56 insertions, 158 deletions
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 2ff46088..a916974e 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -2,7 +2,7 @@ use std::collections::HashSet;
use garage_util::error::*;
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_rpc::system::*;
use garage_rpc::*;
@@ -20,11 +20,8 @@ pub async fn cli_command_dispatch(
Command::Node(NodeOperation::Connect(connect_opt)) => {
cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await
}
- Command::Node(NodeOperation::Configure(configure_opt)) => {
- cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await
- }
- Command::Node(NodeOperation::Remove(remove_opt)) => {
- cmd_remove(system_rpc_endpoint, rpc_host, remove_opt).await
+ Command::Layout(layout_opt) => {
+ cli_layout_command_dispatch(layout_opt, system_rpc_endpoint, rpc_host).await
}
Command::Bucket(bo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await
@@ -48,56 +45,60 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
- let config = match rpc_cli
- .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
- .await??
- {
- SystemRpc::AdvertiseConfig(cfg) => cfg,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== HEALTHY NODES ====");
- let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity".to_string()];
+ let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity".to_string()];
for adv in status.iter().filter(|adv| adv.is_up) {
- if let Some(cfg) = config.members.get(&adv.id) {
- healthy_nodes.push(format!(
- "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}",
- id = adv.id,
- host = adv.status.hostname,
- addr = adv.addr,
- tag = cfg.tag,
- zone = cfg.zone,
- capacity = cfg.capacity_string(),
- ));
- } else {
- healthy_nodes.push(format!(
- "{id:?}\t{h}\t{addr}\tNO ROLE ASSIGNED",
- id = adv.id,
- h = adv.status.hostname,
- addr = adv.addr,
- ));
+ match layout.roles.get(&adv.id) {
+ Some(NodeRoleV(Some(cfg))) => {
+ healthy_nodes.push(format!(
+ "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}",
+ id = adv.id,
+ host = adv.status.hostname,
+ addr = adv.addr,
+ tags = cfg.tags.join(","),
+ zone = cfg.zone,
+ capacity = cfg.capacity_string(),
+ ));
+ }
+ _ => {
+ let new_role = match layout.staging.get(&adv.id) {
+ Some(NodeRoleV(Some(_))) => "(pending)",
+ _ => "NO ROLE ASSIGNED",
+ };
+ healthy_nodes.push(format!(
+ "{id:?}\t{h}\t{addr}\t{new_role}",
+ id = adv.id,
+ h = adv.status.hostname,
+ addr = adv.addr,
+ new_role = new_role,
+ ));
+ }
}
}
format_table(healthy_nodes);
let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>();
let failure_case_1 = status.iter().any(|adv| !adv.is_up);
- let failure_case_2 = config
- .members
+ let failure_case_2 = layout
+ .roles
+ .items()
.iter()
- .any(|(id, _)| !status_keys.contains(id));
+ .filter(|(_, _, v)| v.0.is_some())
+ .any(|(id, _, _)| !status_keys.contains(id));
if failure_case_1 || failure_case_2 {
println!("\n==== FAILED NODES ====");
let mut failed_nodes =
- vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity\tLast seen".to_string()];
+ vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
for adv in status.iter().filter(|adv| !adv.is_up) {
- if let Some(cfg) = config.members.get(&adv.id) {
+ if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) {
failed_nodes.push(format!(
- "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}\t{last_seen}",
+ "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
id = adv.id,
host = adv.status.hostname,
addr = adv.addr,
- tag = cfg.tag,
+ tags = cfg.tags.join(","),
zone = cfg.zone,
capacity = cfg.capacity_string(),
last_seen = adv
@@ -107,20 +108,28 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
));
}
}
- for (id, cfg) in config.members.iter() {
- if !status_keys.contains(id) {
- failed_nodes.push(format!(
- "{id:?}\t??\t??\t[{tag}]\t{zone}\t{capacity}\tnever seen",
- id = id,
- tag = cfg.tag,
- zone = cfg.zone,
- capacity = cfg.capacity_string(),
- ));
+ for (id, _, role_v) in layout.roles.items().iter() {
+ if let NodeRoleV(Some(cfg)) = role_v {
+ if !status_keys.contains(id) {
+ failed_nodes.push(format!(
+ "{id:?}\t??\t??\t[{tags}]\t{zone}\t{capacity}\tnever seen",
+ id = id,
+ tags = cfg.tags.join(","),
+ zone = cfg.zone,
+ capacity = cfg.capacity_string(),
+ ));
+ }
}
}
format_table(failed_nodes);
}
+ if print_staging_role_changes(&layout) {
+ println!();
+ println!("Please use `garage layout show` to check the proposed new layout and apply it.");
+ println!();
+ }
+
Ok(())
}
@@ -141,115 +150,6 @@ pub async fn cmd_connect(
}
}
-pub async fn cmd_configure(
- rpc_cli: &Endpoint<SystemRpc, ()>,
- rpc_host: NodeID,
- args: ConfigureNodeOpt,
-) -> Result<(), Error> {
- let status = match rpc_cli
- .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
- .await??
- {
- SystemRpc::ReturnKnownNodes(nodes) => nodes,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
-
- let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?;
-
- let mut config = match rpc_cli
- .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
- .await??
- {
- SystemRpc::AdvertiseConfig(cfg) => cfg,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
-
- for replaced in args.replace.iter() {
- let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?;
- if config.members.remove(&replaced_node).is_none() {
- return Err(Error::Message(format!(
- "Cannot replace node {:?} as it is not in current configuration",
- replaced_node
- )));
- }
- }
-
- if args.capacity.is_some() && args.gateway {
- return Err(Error::Message(
- "-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
- }
- if args.capacity == Some(0) {
- return Err(Error::Message("Invalid capacity value: 0".into()));
- }
-
- let new_entry = match config.members.get(&added_node) {
- None => {
- let capacity = match args.capacity {
- Some(c) => Some(c),
- None if args.gateway => None,
- _ => return Err(Error::Message(
- "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
- };
- NetworkConfigEntry {
- zone: args.zone.ok_or("Please specifiy a zone with the -z flag")?,
- capacity,
- tag: args.tag.unwrap_or_default(),
- }
- }
- Some(old) => {
- let capacity = match args.capacity {
- Some(c) => Some(c),
- None if args.gateway => None,
- _ => old.capacity,
- };
- NetworkConfigEntry {
- zone: args.zone.unwrap_or_else(|| old.zone.to_string()),
- capacity,
- tag: args.tag.unwrap_or_else(|| old.tag.to_string()),
- }
- }
- };
-
- config.members.insert(added_node, new_entry);
- config.version += 1;
-
- rpc_cli
- .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
- .await??;
- Ok(())
-}
-
-pub async fn cmd_remove(
- rpc_cli: &Endpoint<SystemRpc, ()>,
- rpc_host: NodeID,
- args: RemoveNodeOpt,
-) -> Result<(), Error> {
- let mut config = match rpc_cli
- .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
- .await??
- {
- SystemRpc::AdvertiseConfig(cfg) => cfg,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
-
- let deleted_node = find_matching_node(config.members.keys().cloned(), &args.node_id)?;
-
- if !args.yes {
- return Err(Error::Message(format!(
- "Add the flag --yes to really remove {:?} from the cluster",
- deleted_node
- )));
- }
-
- config.members.remove(&deleted_node);
- config.version += 1;
-
- rpc_cli
- .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
- .await??;
- Ok(())
-}
-
pub async fn cmd_admin(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
@@ -283,5 +183,3 @@ pub async fn cmd_admin(
}
Ok(())
}
-
-// --- Utility functions ----