diff options

| author | Alex Auvolat <alex@adnab.me> | 2021-11-09 12:24:04 +0100 |
|---|---|---|
| committer | Alex Auvolat <alex@adnab.me> | 2021-11-16 16:05:53 +0100 |
| commit | c94406f4282d48e2e2ac82ffb57eafaad23f7edc (patch) | |
| tree | 01fe1b272e18fdae993e2207d8d3aea4a301ec56 /src/garage/cli/layout.rs | |
| parent | 53888995bdd7c672d2e3ab8bb6a3529195c127a9 (diff) | |
| download | garage-c94406f4282d48e2e2ac82ffb57eafaad23f7edc.tar.gz garage-c94406f4282d48e2e2ac82ffb57eafaad23f7edc.zip | |
Improve how node roles are assigned in Garage (tag: v0.5-beta1)
- change the terminology: the network configuration becomes the role
  table, and the configuration of a node becomes that node's role
- the modification of the role table takes place in two steps: first,
  changes are staged in a CRDT data structure; then, once the user is
  happy with the changes, they can commit them all at once, or revert
  them (a sketch of this staging mechanism follows the commit message)
- update documentation
- fix tests
- implement a smarter partition assignation algorithm (a toy
  illustration follows this list)
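The assignation algorithm itself lives outside this file (the CLI below only calls `calculate_partition_assignation`), but the flavor of capacity- and zone-aware placement can be illustrated with a toy greedy assigner. Everything here (the `Node` struct, the partition and replica counts, the greedy choice) is an illustrative assumption, not Garage's actual algorithm, which also accounts for which partitions move between the old and new layouts:

```rust
// Toy greedy, capacity- and zone-aware partition assigner; a sketch only.
#[derive(Clone)]
struct Node {
    id: &'static str,
    zone: &'static str,
    capacity: u32, // hypothetical relative capacity units
    assigned: u32, // partitions assigned so far
}

fn assign(nodes: &mut [Node], partitions: usize, replicas: usize) -> Vec<Vec<&'static str>> {
    let mut out = Vec::with_capacity(partitions);
    for _ in 0..partitions {
        let mut chosen: Vec<&'static str> = Vec::new();
        let mut zones: Vec<&'static str> = Vec::new();
        for _ in 0..replicas {
            // Pick the least-loaded node (relative to capacity) in a zone not
            // already used by this partition; compare the ratios
            // assigned/capacity by cross-multiplication to stay in integers.
            let best = nodes
                .iter_mut()
                .filter(|n| !zones.contains(&n.zone))
                .min_by(|a, b| (a.assigned * b.capacity).cmp(&(b.assigned * a.capacity)));
            if let Some(n) = best {
                n.assigned += 1;
                chosen.push(n.id);
                zones.push(n.zone);
            }
        }
        out.push(chosen);
    }
    out
}

fn main() {
    let mut nodes = vec![
        Node { id: "n1", zone: "dc1", capacity: 100, assigned: 0 },
        Node { id: "n2", zone: "dc1", capacity: 100, assigned: 0 },
        Node { id: "n3", zone: "dc2", capacity: 200, assigned: 0 },
    ];
    // 256 partitions, 2 replicas each, spread across the 2 available zones.
    let assignment = assign(&mut nodes, 256, 2);
    assert!(assignment.iter().all(|p| p.len() == 2));
}
```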
This patch breaks the format of the network configuration: when
migrating, the cluster will be in a state where no roles are assigned.
All roles must be re-assigned and committed at once. This migration
should not pose an issue.
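As a minimal sketch of the staged-changes mechanism described above (the `LwwMap` stand-in, the `Layout` struct, and the `apply`/`revert` helpers are illustrative assumptions, not Garage's actual `garage_util::crdt` types):

```rust
use std::collections::HashMap;

// Toy last-writer-wins map: each key keeps the value carrying the highest
// timestamp, so two nodes merging each other's tables converge. Stand-in
// for Garage's crdt::LwwMap, not the real type; a real implementation
// would also break timestamp ties deterministically.
#[derive(Clone, Default)]
struct LwwMap {
    // node name -> (timestamp, capacity); capacity None = role removed
    entries: HashMap<String, (u64, Option<u32>)>,
}

impl LwwMap {
    fn set(&mut self, node: &str, ts: u64, capacity: Option<u32>) {
        self.entries.insert(node.to_string(), (ts, capacity));
    }
    fn merge(&mut self, other: &LwwMap) {
        for (k, v) in &other.entries {
            let newer = match self.entries.get(k) {
                Some((ts, _)) => v.0 > *ts,
                None => true,
            };
            if newer {
                self.entries.insert(k.clone(), *v);
            }
        }
    }
}

// Hypothetical layout: a committed role table plus a staging area.
#[derive(Clone, Default)]
struct Layout {
    version: u64,
    roles: LwwMap,
    staging: LwwMap,
}

impl Layout {
    // Commit: fold every staged change into the role table at once.
    fn apply(&mut self) {
        self.roles.merge(&self.staging);
        self.staging = LwwMap::default();
        self.version += 1;
    }
    // Revert: discard staged changes, leaving committed roles untouched.
    fn revert(&mut self) {
        self.staging = LwwMap::default();
        self.version += 1;
    }
}

fn main() {
    let mut layout = Layout::default();
    layout.staging.set("node1", 1, Some(100)); // stage: node1 stores data
    layout.staging.set("node2", 2, None);      // stage: drop node2's role
    layout.apply();                            // both changes land atomically
    assert_eq!(layout.version, 1);
    assert_eq!(layout.roles.entries.len(), 2);

    layout.staging.set("node3", 3, Some(50));
    layout.revert();                           // node3 change is dropped
    assert_eq!(layout.version, 2);
    assert!(layout.roles.entries.get("node3").is_none());
}
```

Because the merge keeps the newest write per key, several administrators can stage changes concurrently and every node converges on the same staging area; nothing takes effect until `apply` folds it into the role table.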
Diffstat (limited to 'src/garage/cli/layout.rs')
-rw-r--r-- | src/garage/cli/layout.rs | 340 |
1 file changed, 340 insertions, 0 deletions
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
new file mode 100644
index 00000000..0d9e4fa4
--- /dev/null
+++ b/src/garage/cli/layout.rs
@@ -0,0 +1,340 @@
+use garage_util::crdt::Crdt;
+use garage_util::data::*;
+use garage_util::error::*;
+
+use garage_rpc::layout::*;
+use garage_rpc::system::*;
+use garage_rpc::*;
+
+use crate::cli::*;
+
+pub async fn cli_layout_command_dispatch(
+	cmd: LayoutOperation,
+	system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
+	rpc_host: NodeID,
+) -> Result<(), Error> {
+	match cmd {
+		LayoutOperation::Assign(configure_opt) => {
+			cmd_assign_role(system_rpc_endpoint, rpc_host, configure_opt).await
+		}
+		LayoutOperation::Remove(remove_opt) => {
+			cmd_remove_role(system_rpc_endpoint, rpc_host, remove_opt).await
+		}
+		LayoutOperation::Show => cmd_show_layout(system_rpc_endpoint, rpc_host).await,
+		LayoutOperation::Apply(apply_opt) => {
+			cmd_apply_layout(system_rpc_endpoint, rpc_host, apply_opt).await
+		}
+		LayoutOperation::Revert(revert_opt) => {
+			cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await
+		}
+	}
+}
+
+pub async fn cmd_assign_role(
+	rpc_cli: &Endpoint<SystemRpc, ()>,
+	rpc_host: NodeID,
+	args: AssignRoleOpt,
+) -> Result<(), Error> {
+	let status = match rpc_cli
+		.call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+		.await??
+	{
+		SystemRpc::ReturnKnownNodes(nodes) => nodes,
+		resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+	};
+
+	let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?;
+
+	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+	let mut roles = layout.roles.clone();
+	roles.merge(&layout.staging);
+
+	for replaced in args.replace.iter() {
+		let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?;
+		match roles.get(&replaced_node) {
+			Some(NodeRoleV(Some(_))) => {
+				layout
+					.staging
+					.merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
+			}
+			_ => {
+				return Err(Error::Message(format!(
+					"Cannot replace node {:?} as it is not currently in planned layout",
+					replaced_node
+				)));
+			}
+		}
+	}
+
+	if args.capacity.is_some() && args.gateway {
+		return Err(Error::Message(
+			"-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
+	}
+	if args.capacity == Some(0) {
+		return Err(Error::Message("Invalid capacity value: 0".into()));
+	}
+
+	let new_entry = match roles.get(&added_node) {
+		Some(NodeRoleV(Some(old))) => {
+			let capacity = match args.capacity {
+				Some(c) => Some(c),
+				None if args.gateway => None,
+				None => old.capacity,
+			};
+			let tags = if args.tags.is_empty() {
+				old.tags.clone()
+			} else {
+				args.tags
+			};
+			NodeRole {
+				zone: args.zone.unwrap_or_else(|| old.zone.to_string()),
+				capacity,
+				tags,
+			}
+		}
+		_ => {
+			let capacity = match args.capacity {
+				Some(c) => Some(c),
+				None if args.gateway => None,
+				None => return Err(Error::Message(
+					"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
+			};
+			NodeRole {
+				zone: args.zone.ok_or("Please specify a zone with the -z flag")?,
+				capacity,
+				tags: args.tags,
+			}
+		}
+	};
+
+	layout
+		.staging
+		.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
+
+	send_layout(rpc_cli, rpc_host, layout).await?;
+
+	println!("Role change is staged but not yet committed.");
+	println!("Use `garage layout show` to view staged role changes,");
+	println!("and `garage layout apply` to enact staged changes.");
+	Ok(())
+}
+
+pub async fn cmd_remove_role(
+	rpc_cli: &Endpoint<SystemRpc, ()>,
+	rpc_host: NodeID,
+	args: RemoveRoleOpt,
+) -> Result<(), Error> {
+	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+	let mut roles = layout.roles.clone();
+	roles.merge(&layout.staging);
+
+	let deleted_node =
+		find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?;
+
+	layout
+		.staging
+		.merge(&roles.update_mutator(deleted_node, NodeRoleV(None)));
+
+	send_layout(rpc_cli, rpc_host, layout).await?;
+
+	println!("Role removal is staged but not yet committed.");
+	println!("Use `garage layout show` to view staged role changes,");
+	println!("and `garage layout apply` to enact staged changes.");
+	Ok(())
+}
+
+pub async fn cmd_show_layout(
+	rpc_cli: &Endpoint<SystemRpc, ()>,
+	rpc_host: NodeID,
+) -> Result<(), Error> {
+	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+	println!("==== CURRENT CLUSTER LAYOUT ====");
+	if !print_cluster_layout(&layout) {
+		println!("No nodes currently have a role in the cluster.");
+		println!("See `garage status` to view available nodes.");
+	}
+	println!();
+	println!("Current cluster layout version: {}", layout.version);
+
+	if print_staging_role_changes(&layout) {
+		layout.roles.merge(&layout.staging);
+
+		println!();
+		println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
+		if !print_cluster_layout(&layout) {
+			println!("No nodes have a role in the new layout.");
+		}
+		println!();
+
+		// this will print the stats of what partitions
+		// will move around when we apply
+		if layout.calculate_partition_assignation() {
+			println!("To enact the staged role changes, type:");
+			println!();
+			println!("  garage layout apply --version {}", layout.version + 1);
+			println!();
+			println!(
+				"You can also revert all proposed changes with: garage layout revert --version {}",
+				layout.version + 1
+			);
+		} else {
+			println!("Not enough nodes have an assigned role to maintain enough copies of data.");
+			println!("This new layout cannot yet be applied.");
+		}
+	}
+
+	Ok(())
+}
+
+pub async fn cmd_apply_layout(
+	rpc_cli: &Endpoint<SystemRpc, ()>,
+	rpc_host: NodeID,
+	apply_opt: ApplyLayoutOpt,
+) -> Result<(), Error> {
+	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+	match apply_opt.version {
+		None => {
+			println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
+			println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
+			return Err(Error::Message("--version flag is missing".into()));
+		}
+		Some(v) => {
+			if v != layout.version + 1 {
+				return Err(Error::Message("Invalid value of --version flag".into()));
+			}
+		}
+	}
+
+	layout.roles.merge(&layout.staging);
+
+	if !layout.calculate_partition_assignation() {
+		return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are fewer nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
+	}
+
+	layout.staging.clear();
+	layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
+
+	layout.version += 1;
+
+	send_layout(rpc_cli, rpc_host, layout).await?;
+
+	println!("New cluster layout with updated role assignation has been applied in cluster.");
+	println!("Data will now be moved around between nodes accordingly.");
+
+	Ok(())
+}
+
+pub async fn cmd_revert_layout(
+	rpc_cli: &Endpoint<SystemRpc, ()>,
+	rpc_host: NodeID,
+	revert_opt: RevertLayoutOpt,
+) -> Result<(), Error> {
+	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+	match revert_opt.version {
+		None => {
+			println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
+			println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
+			return Err(Error::Message("--version flag is missing".into()));
+		}
+		Some(v) => {
+			if v != layout.version + 1 {
+				return Err(Error::Message("Invalid value of --version flag".into()));
+			}
+		}
+	}
+
+	layout.staging.clear();
+	layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
+
+	layout.version += 1;
+
+	send_layout(rpc_cli, rpc_host, layout).await?;
+
+	println!("All proposed role changes in cluster layout have been canceled.");
+	Ok(())
+}
+
+// --- utility ---
+
+pub async fn fetch_layout(
+	rpc_cli: &Endpoint<SystemRpc, ()>,
+	rpc_host: NodeID,
+) -> Result<ClusterLayout, Error> {
+	match rpc_cli
+		.call(&rpc_host, &SystemRpc::PullClusterLayout, PRIO_NORMAL)
+		.await??
+	{
+		SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
+		resp => Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+	}
+}
+
+pub async fn send_layout(
+	rpc_cli: &Endpoint<SystemRpc, ()>,
+	rpc_host: NodeID,
+	layout: ClusterLayout,
+) -> Result<(), Error> {
+	rpc_cli
+		.call(
+			&rpc_host,
+			&SystemRpc::AdvertiseClusterLayout(layout),
+			PRIO_NORMAL,
+		)
+		.await??;
+	Ok(())
+}
+
+pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
+	let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
+	for (id, _, role) in layout.roles.items().iter() {
+		let role = match &role.0 {
+			Some(r) => r,
+			_ => continue,
+		};
+		let tags = role.tags.join(",");
+		table.push(format!(
+			"{:?}\t{}\t{}\t{}",
+			id,
+			tags,
+			role.zone,
+			role.capacity_string()
+		));
+	}
+	if table.len() == 1 {
+		false
+	} else {
+		format_table(table);
+		true
+	}
+}
+
+pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
+	if !layout.staging.items().is_empty() {
+		println!();
+		println!("==== STAGED ROLE CHANGES ====");
+		let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
+		for (id, _, role) in layout.staging.items().iter() {
+			if let Some(role) = &role.0 {
+				let tags = role.tags.join(",");
+				table.push(format!(
+					"{:?}\t{}\t{}\t{}",
+					id,
+					tags,
+					role.zone,
+					role.capacity_string()
+				));
+			} else {
+				table.push(format!("{:?}\tREMOVED", id));
+			}
+		}
+		format_table(table);
+		true
+	} else {
+		false
+	}
+}
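A pattern worth noting in `cmd_apply_layout` and `cmd_revert_layout` above: both refuse to act unless the operator passes `--version` equal to the current layout version plus one, which makes layout mutation an optimistic-concurrency update (a command computed against a stale layout fails instead of silently overwriting a newer one), and both recompute `staging_hash` whenever the staging area changes. Below is a stand-alone sketch of those two ideas; `require_next_version` and `staging_digest` are hypothetical helpers, and `DefaultHasher` is only a stand-in for Garage's `blake2sum` over a MessagePack encoding:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Guard used before mutating the layout: the operator must name the exact
// version they expect to create, so a request computed against an older
// layout fails instead of clobbering a newer one.
fn require_next_version(current: u64, requested: Option<u64>) -> Result<u64, String> {
    match requested {
        None => Err("--version flag is missing".to_string()),
        Some(v) if v == current + 1 => Ok(v),
        Some(_) => Err("Invalid value of --version flag".to_string()),
    }
}

// Digest over the staged entries, recomputed after every staging change.
fn staging_digest(staged: &[(String, Option<u32>)]) -> u64 {
    let mut h = DefaultHasher::new();
    staged.hash(&mut h);
    h.finish()
}

fn main() {
    assert!(require_next_version(3, Some(4)).is_ok());
    assert!(require_next_version(3, Some(3)).is_err()); // stale request
    assert!(require_next_version(3, None).is_err());    // flag required

    let empty = staging_digest(&[]);
    let one = staging_digest(&[("node1".to_string(), Some(100))]);
    assert_ne!(empty, one); // digest changes when the staged set does
}
```

The sketch only demonstrates that the digest tracks the staged set; how Garage uses `staging_hash` internally is outside the scope of this diff.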