Diffstat (limited to 'src/garage/cli/layout.rs')
-rw-r--r--  src/garage/cli/layout.rs  230
1 file changed, 195 insertions, 35 deletions
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index ce2b11e0..f76e33c5 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -32,6 +32,10 @@ pub async fn cli_layout_command_dispatch(
LayoutOperation::Config(config_opt) => {
cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await
}
+ LayoutOperation::History => cmd_layout_history(system_rpc_endpoint, rpc_host).await,
+ LayoutOperation::SkipDeadNodes(assume_sync_opt) => {
+ cmd_layout_skip_dead_nodes(system_rpc_endpoint, rpc_host, assume_sync_opt).await
+ }
}
}
@@ -49,6 +53,7 @@ pub async fn cmd_assign_role(
};
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+ let all_nodes = layout.get_all_nodes();
let added_nodes = args
.node_ids
@@ -58,21 +63,23 @@ pub async fn cmd_assign_role(
status
.iter()
.map(|adv| adv.id)
- .chain(layout.node_ids().iter().cloned()),
+ .chain(all_nodes.iter().cloned()),
node_id,
)
})
.collect::<Result<Vec<_>, _>>()?;
- let mut roles = layout.roles.clone();
- roles.merge(&layout.staging_roles);
+ let mut roles = layout.current().roles.clone();
+ roles.merge(&layout.staging.get().roles);
for replaced in args.replace.iter() {
- let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?;
+ let replaced_node = find_matching_node(all_nodes.iter().cloned(), replaced)?;
match roles.get(&replaced_node) {
Some(NodeRoleV(Some(_))) => {
layout
- .staging_roles
+ .staging
+ .get_mut()
+ .roles
.merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
}
_ => {
@@ -130,7 +137,9 @@ pub async fn cmd_assign_role(
};
layout
- .staging_roles
+ .staging
+ .get_mut()
+ .roles
.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
}
@@ -149,14 +158,16 @@ pub async fn cmd_remove_role(
) -> Result<(), Error> {
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
- let mut roles = layout.roles.clone();
- roles.merge(&layout.staging_roles);
+ let mut roles = layout.current().roles.clone();
+ roles.merge(&layout.staging.get().roles);
let deleted_node =
find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?;
layout
- .staging_roles
+ .staging
+ .get_mut()
+ .roles
.merge(&roles.update_mutator(deleted_node, NodeRoleV(None)));
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -174,13 +185,16 @@ pub async fn cmd_show_layout(
let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== CURRENT CLUSTER LAYOUT ====");
- print_cluster_layout(&layout, "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes.");
+ print_cluster_layout(layout.current(), "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes.");
println!();
- println!("Current cluster layout version: {}", layout.version);
+ println!(
+ "Current cluster layout version: {}",
+ layout.current().version
+ );
let has_role_changes = print_staging_role_changes(&layout);
if has_role_changes {
- let v = layout.version;
+ let v = layout.current().version;
let res_apply = layout.apply_staged_changes(Some(v + 1));
// this will print the stats of what partitions
@@ -189,7 +203,7 @@ pub async fn cmd_show_layout(
Ok((layout, msg)) => {
println!();
println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
- print_cluster_layout(&layout, "No nodes have a role in the new layout.");
+ print_cluster_layout(layout.current(), "No nodes have a role in the new layout.");
println!();
for line in msg.iter() {
@@ -199,16 +213,12 @@ pub async fn cmd_show_layout(
println!();
println!(" garage layout apply --version {}", v + 1);
println!();
- println!(
- "You can also revert all proposed changes with: garage layout revert --version {}",
- v + 1)
+ println!("You can also revert all proposed changes with: garage layout revert");
}
Err(e) => {
println!("Error while trying to compute the assignment: {}", e);
println!("This new layout cannot yet be applied.");
- println!(
- "You can also revert all proposed changes with: garage layout revert --version {}",
- v + 1)
+ println!("You can also revert all proposed changes with: garage layout revert");
}
}
}
@@ -241,9 +251,15 @@ pub async fn cmd_revert_layout(
rpc_host: NodeID,
revert_opt: RevertLayoutOpt,
) -> Result<(), Error> {
+ if !revert_opt.yes {
+ return Err(Error::Message(
+ "Please add the --yes flag to run the layout revert operation".into(),
+ ));
+ }
+
let layout = fetch_layout(rpc_cli, rpc_host).await?;
- let layout = layout.revert_staged_changes(revert_opt.version)?;
+ let layout = layout.revert_staged_changes()?;
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -266,11 +282,11 @@ pub async fn cmd_config_layout(
.parse::<ZoneRedundancy>()
.ok_or_message("invalid zone redundancy value")?;
if let ZoneRedundancy::AtLeast(r_int) = r {
- if r_int > layout.replication_factor {
+ if r_int > layout.current().replication_factor {
return Err(Error::Message(format!(
"The zone redundancy must be smaller or equal to the \
replication factor ({}).",
- layout.replication_factor
+ layout.current().replication_factor
)));
} else if r_int < 1 {
return Err(Error::Message(
@@ -280,7 +296,9 @@ pub async fn cmd_config_layout(
}
layout
- .staging_parameters
+ .staging
+ .get_mut()
+ .parameters
.update(LayoutParameters { zone_redundancy: r });
println!("The zone redundancy parameter has been set to '{}'.", r);
did_something = true;
@@ -297,25 +315,166 @@ pub async fn cmd_config_layout(
Ok(())
}
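+/// Print the history of layout versions stored by the cluster, plus the
+/// per-node update trackers when several versions are still live.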
+pub async fn cmd_layout_history(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+) -> Result<(), Error> {
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
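+ // min_stored() is the oldest layout version for which nodes may still
+ // hold data; anything older is purely historical.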
+ let min_stored = layout.min_stored();
+
+ println!("==== LAYOUT HISTORY ====");
+ let mut table = vec!["Version\tStatus\tStorage nodes\tGateway nodes".to_string()];
+ for ver in layout
+ .versions
+ .iter()
+ .rev()
+ .chain(layout.old_versions.iter().rev())
+ {
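+ // The newest version is "current"; older versions that may still hold
+ // data are "draining"; fully migrated versions are "historical".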
+ let status = if ver.version == layout.current().version {
+ "current"
+ } else if ver.version >= min_stored {
+ "draining"
+ } else {
+ "historical"
+ };
+ table.push(format!(
+ "#{}\t{}\t{}\t{}",
+ ver.version,
+ status,
+ ver.roles
+ .items()
+ .iter()
+ .filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_some()))
+ .count(),
+ ver.roles
+ .items()
+ .iter()
+ .filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_none()))
+ .count(),
+ ));
+ }
+ format_table(table);
+ println!();
+
+ if layout.versions.len() > 1 {
+ println!("==== UPDATE TRACKERS ====");
+ println!("Several layout versions are currently live in the version, and data is being migrated.");
+ println!(
+ "This is the internal data that Garage stores to know which nodes have what data."
+ );
+ println!();
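+ // One row per node, showing how far its ACK, SYNC and SYNC_ACK update
+ // trackers have progressed (min_stored is used as the lower bound).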
+ let mut table = vec!["Node\tAck\tSync\tSync_ack".to_string()];
+ let all_nodes = layout.get_all_nodes();
+ for node in all_nodes.iter() {
+ table.push(format!(
+ "{:?}\t#{}\t#{}\t#{}",
+ node,
+ layout.update_trackers.ack_map.get(node, min_stored),
+ layout.update_trackers.sync_map.get(node, min_stored),
+ layout.update_trackers.sync_ack_map.get(node, min_stored),
+ ));
+ }
+ table[1..].sort();
+ format_table(table);
+
+ println!();
+ println!(
+ "If some nodes are not catching up to the latest layout version in the update trackers,"
+ );
+ println!("it might be because they are offline or unable to complete a sync successfully.");
+ println!(
+ "You may force progress using `garage layout skip-dead-nodes --version {}`",
+ layout.current().version
+ );
+ } else {
+ println!("Your cluster is currently in a stable state with a single live layout version.");
+ println!("No metadata migration is in progress. Note that the migration of data blocks is not tracked,");
+ println!(
+ "so you might want to keep old nodes online until their data directories become empty."
+ );
+ }
+
+ Ok(())
+}
+
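+/// Manually advance the update trackers of nodes that are down, so that a
+/// layout migration blocked on unreachable nodes can make progress.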
+pub async fn cmd_layout_skip_dead_nodes(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ opt: SkipDeadNodesOpt,
+) -> Result<(), Error> {
+ let status = fetch_status(rpc_cli, rpc_host).await?;
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ if layout.versions.len() == 1 {
+ return Err(Error::Message(
+ "This command cannot be called when there is only one live cluster layout version"
+ .into(),
+ ));
+ }
+
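+ // The target version must be newer than the oldest stored version and
+ // no newer than the current one.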
+ let min_v = layout.min_stored();
+ if opt.version <= min_v || opt.version > layout.current().version {
+ return Err(Error::Message(format!(
+ "Invalid version, you may use the following version numbers: {}",
+ (min_v + 1..=layout.current().version)
+ .map(|x| x.to_string())
+ .collect::<Vec<_>>()
+ .join(" ")
+ )));
+ }
+
+ let all_nodes = layout.get_all_nodes();
+ let mut did_something = false;
+ for node in all_nodes.iter() {
+ if status.iter().any(|x| x.id == *node && x.is_up) {
+ continue;
+ }
+
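+ // The node is down: advance its ACK tracker so the rest of the cluster
+ // can move to the new layout version without waiting for it.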
+ if layout.update_trackers.ack_map.set_max(*node, opt.version) {
+ println!("Increased the ACK tracker for node {:?}", node);
+ did_something = true;
+ }
+
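+ // Advancing the SYNC tracker pretends the dead node completed a metadata
+ // sync, which can sacrifice data it held; require an explicit opt-in.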
+ if opt.allow_missing_data {
+ if layout.update_trackers.sync_map.set_max(*node, opt.version) {
+ println!("Increased the SYNC tracker for node {:?}", node);
+ did_something = true;
+ }
+ }
+ }
+
+ if did_something {
+ send_layout(rpc_cli, rpc_host, layout).await?;
+ println!("Success.");
+ Ok(())
+ } else if !opt.allow_missing_data {
+ Err(Error::Message("Nothing was done, try passing the `--allow-missing-data` flag to force progress even when not enough nodes can complete a metadata sync.".into()))
+ } else {
+ Err(Error::Message(
+ "Sorry, there is nothing I can do for you. Please wait patiently. If you ask for help, please send the output of the `garage layout history` command.".into(),
+ ))
+ }
+}
+
// --- utility ---
pub async fn fetch_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
-) -> Result<ClusterLayout, Error> {
+) -> Result<LayoutHistory, Error> {
match rpc_cli
.call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL)
.await??
{
SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
- resp => Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+ resp => Err(Error::unexpected_rpc_message(resp)),
}
}
pub async fn send_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
- layout: ClusterLayout,
+ layout: LayoutHistory,
) -> Result<(), Error> {
rpc_cli
.call(
@@ -327,7 +486,7 @@ pub async fn send_layout(
Ok(())
}
-pub fn print_cluster_layout(layout: &ClusterLayout, empty_msg: &str) {
+pub fn print_cluster_layout(layout: &LayoutVersion, empty_msg: &str) {
let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()];
for (id, _, role) in layout.roles.items().iter() {
let role = match &role.0 {
@@ -366,21 +525,22 @@ pub fn print_cluster_layout(layout: &ClusterLayout, empty_msg: &str) {
}
}
-pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
- let has_role_changes = layout
- .staging_roles
+pub fn print_staging_role_changes(layout: &LayoutHistory) -> bool {
+ let staging = layout.staging.get();
+ let has_role_changes = staging
+ .roles
.items()
.iter()
- .any(|(k, _, v)| layout.roles.get(k) != Some(v));
- let has_layout_changes = *layout.staging_parameters.get() != layout.parameters;
+ .any(|(k, _, v)| layout.current().roles.get(k) != Some(v));
+ let has_layout_changes = *staging.parameters.get() != layout.current().parameters;
if has_role_changes || has_layout_changes {
println!();
println!("==== STAGED ROLE CHANGES ====");
if has_role_changes {
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
- for (id, _, role) in layout.staging_roles.items().iter() {
- if layout.roles.get(id) == Some(role) {
+ for (id, _, role) in staging.roles.items().iter() {
+ if layout.current().roles.get(id) == Some(role) {
continue;
}
if let Some(role) = &role.0 {
@@ -402,7 +562,7 @@ pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
if has_layout_changes {
println!(
"Zone redundancy: {}",
- layout.staging_parameters.get().zone_redundancy
+ staging.parameters.get().zone_redundancy
);
}
true