diff options
author | Alex <alex@adnab.me> | 2024-01-11 10:58:08 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2024-01-11 10:58:08 +0000 |
commit | 8a6ec1d6111a60e602c90ade2200b2dab5733fe3 (patch) | |
tree | b8daac4f41050339c87106d72ce7224f7eef38aa /src/garage | |
parent | 723e56b37f13f078a15e067343191fb1bf96e8b2 (diff) | |
parent | 0041b013a473e3ae72f50209d8f79db75a72848b (diff) | |
download | garage-8a6ec1d6111a60e602c90ade2200b2dab5733fe3.tar.gz garage-8a6ec1d6111a60e602c90ade2200b2dab5733fe3.zip |
Merge pull request 'NLnet task 3' (#667) from nlnet-task3 into next-0.10
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/667
Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/admin/bucket.rs | 4 | ||||
-rw-r--r-- | src/garage/admin/mod.rs | 30 | ||||
-rw-r--r-- | src/garage/cli/cmd.rs | 189 | ||||
-rw-r--r-- | src/garage/cli/layout.rs | 230 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 24 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 4 | ||||
-rw-r--r-- | src/garage/tests/common/ext/process.rs | 44 | ||||
-rw-r--r-- | src/garage/tests/common/garage.rs | 2 |
8 files changed, 369 insertions, 158 deletions
diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs index 0781cb8b..9e642f57 100644 --- a/src/garage/admin/bucket.rs +++ b/src/garage/admin/bucket.rs @@ -70,7 +70,7 @@ impl AdminRpcHandler { .table .get(&bucket_id, &EmptyKey) .await? - .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) + .map(|x| x.filtered_values(&self.garage.system.cluster_layout())) .unwrap_or_default(); let mpu_counters = self @@ -79,7 +79,7 @@ impl AdminRpcHandler { .table .get(&bucket_id, &EmptyKey) .await? - .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) + .map(|x| x.filtered_values(&self.garage.system.cluster_layout())) .unwrap_or_default(); let mut relevant_keys = HashMap::new(); diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index b6f9c426..de7851e1 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -18,7 +18,7 @@ use garage_util::error::Error as GarageError; use garage_table::replication::*; use garage_table::*; -use garage_rpc::ring::PARTITION_BITS; +use garage_rpc::layout::PARTITION_BITS; use garage_rpc::*; use garage_block::manager::BlockResyncErrorInfo; @@ -126,8 +126,8 @@ impl AdminRpcHandler { opt_to_send.all_nodes = false; let mut failures = vec![]; - let ring = self.garage.system.ring.borrow().clone(); - for node in ring.layout.node_ids().iter() { + let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); + for node in all_nodes.iter() { let node = (*node).into(); let resp = self .endpoint @@ -163,9 +163,9 @@ impl AdminRpcHandler { async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> { if opt.all_nodes { let mut ret = String::new(); - let ring = self.garage.system.ring.borrow().clone(); + let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in ring.layout.node_ids().iter() { + for node in all_nodes.iter() { let mut opt = opt.clone(); opt.all_nodes = false; opt.skip_global = true; @@ -274,11 +274,11 @@ impl AdminRpcHandler { fn gather_cluster_stats(&self) -> String { let mut ret = String::new(); - // Gather storage node and free space statistics - let layout = &self.garage.system.ring.borrow().layout; + // Gather storage node and free space statistics for current nodes + let layout = &self.garage.system.cluster_layout(); let mut node_partition_count = HashMap::<Uuid, u64>::new(); - for short_id in layout.ring_assignment_data.iter() { - let id = layout.node_id_vec[*short_id as usize]; + for short_id in layout.current().ring_assignment_data.iter() { + let id = layout.current().node_id_vec[*short_id as usize]; *node_partition_count.entry(id).or_default() += 1; } let node_info = self @@ -293,8 +293,8 @@ impl AdminRpcHandler { for (id, parts) in node_partition_count.iter() { let info = node_info.get(id); let status = info.map(|x| &x.status); - let role = layout.roles.get(id).and_then(|x| x.0.as_ref()); - let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?"); + let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref()); + let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?"); let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); let capacity = role .map(|x| x.capacity_string()) @@ -440,8 +440,8 @@ impl AdminRpcHandler { ) -> Result<AdminRpc, Error> { if all_nodes { let mut ret = vec![]; - let ring = self.garage.system.ring.borrow().clone(); - for node in ring.layout.node_ids().iter() { + let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); + for node in all_nodes.iter() { let node = (*node).into(); match self .endpoint @@ -488,8 +488,8 @@ impl AdminRpcHandler { ) -> Result<AdminRpc, Error> { if all_nodes { let mut ret = vec![]; - let ring = self.garage.system.ring.borrow().clone(); - for node in ring.layout.node_ids().iter() { + let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); + for node in all_nodes.iter() { let node = (*node).into(); match self .endpoint diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 48359614..fb6dface 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::time::Duration; use format_table::format_table; @@ -49,50 +49,61 @@ pub async fn cli_command_dispatch( } pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> { - let status = match rpc_cli - .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await?? - { - SystemRpc::ReturnKnownNodes(nodes) => nodes, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; + let status = fetch_status(rpc_cli, rpc_host).await?; let layout = fetch_layout(rpc_cli, rpc_host).await?; println!("==== HEALTHY NODES ===="); let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()]; for adv in status.iter().filter(|adv| adv.is_up) { - match layout.roles.get(&adv.id) { - Some(NodeRoleV(Some(cfg))) => { - let data_avail = match &adv.status.data_disk_avail { - _ if cfg.capacity.is_none() => "N/A".into(), - Some((avail, total)) => { - let pct = (*avail as f64) / (*total as f64) * 100.; - let avail = bytesize::ByteSize::b(*avail); - format!("{} ({:.1}%)", avail, pct) - } - None => "?".into(), - }; + let host = adv.status.hostname.as_deref().unwrap_or("?"); + if let Some(NodeRoleV(Some(cfg))) = layout.current().roles.get(&adv.id) { + let data_avail = match &adv.status.data_disk_avail { + _ if cfg.capacity.is_none() => "N/A".into(), + Some((avail, total)) => { + let pct = (*avail as f64) / (*total as f64) * 100.; + let avail = bytesize::ByteSize::b(*avail); + format!("{} ({:.1}%)", avail, pct) + } + None => "?".into(), + }; + healthy_nodes.push(format!( + "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}", + id = adv.id, + host = host, + addr = adv.addr, + tags = cfg.tags.join(","), + zone = cfg.zone, + capacity = cfg.capacity_string(), + data_avail = data_avail, + )); + } else { + let prev_role = layout + .versions + .iter() + .rev() + .find_map(|x| match x.roles.get(&adv.id) { + Some(NodeRoleV(Some(cfg))) => Some(cfg), + _ => None, + }); + if let Some(cfg) = prev_role { healthy_nodes.push(format!( - "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}", + "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\tdraining metadata...", id = adv.id, - host = adv.status.hostname, + host = host, addr = adv.addr, tags = cfg.tags.join(","), zone = cfg.zone, - capacity = cfg.capacity_string(), - data_avail = data_avail, )); - } - _ => { - let new_role = match layout.staging_roles.get(&adv.id) { - Some(NodeRoleV(Some(_))) => "(pending)", + } else { + let new_role = match layout.staging.get().roles.get(&adv.id) { + Some(NodeRoleV(Some(_))) => "pending...", _ => "NO ROLE ASSIGNED", }; healthy_nodes.push(format!( - "{id:?}\t{h}\t{addr}\t{new_role}", + "{id:?}\t{h}\t{addr}\t\t\t{new_role}", id = adv.id, - h = adv.status.hostname, + h = host, addr = adv.addr, new_role = new_role, )); @@ -101,51 +112,76 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> } format_table(healthy_nodes); - let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>(); - let failure_case_1 = status - .iter() - .any(|adv| !adv.is_up && matches!(layout.roles.get(&adv.id), Some(NodeRoleV(Some(_))))); - let failure_case_2 = layout - .roles - .items() + // Determine which nodes are unhealthy and print that to stdout + let status_map = status .iter() - .any(|(id, _, v)| !status_keys.contains(id) && v.0.is_some()); - if failure_case_1 || failure_case_2 { - println!("\n==== FAILED NODES ===="); - let mut failed_nodes = - vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()]; - for adv in status.iter().filter(|adv| !adv.is_up) { - if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) { - let tf = timeago::Formatter::new(); - failed_nodes.push(format!( - "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}", - id = adv.id, - host = adv.status.hostname, - addr = adv.addr, - tags = cfg.tags.join(","), - zone = cfg.zone, - capacity = cfg.capacity_string(), - last_seen = adv - .last_seen_secs_ago - .map(|s| tf.convert(Duration::from_secs(s))) - .unwrap_or_else(|| "never seen".into()), - )); + .map(|adv| (adv.id, adv)) + .collect::<HashMap<_, _>>(); + + let tf = timeago::Formatter::new(); + let mut drain_msg = false; + let mut failed_nodes = + vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()]; + let mut listed = HashSet::new(); + for ver in layout.versions.iter().rev() { + for (node, _, role) in ver.roles.items().iter() { + let cfg = match role { + NodeRoleV(Some(role)) if role.capacity.is_some() => role, + _ => continue, + }; + + if listed.contains(node) { + continue; } - } - for (id, _, role_v) in layout.roles.items().iter() { - if let NodeRoleV(Some(cfg)) = role_v { - if !status_keys.contains(id) { - failed_nodes.push(format!( - "{id:?}\t??\t??\t[{tags}]\t{zone}\t{capacity}\tnever seen", - id = id, - tags = cfg.tags.join(","), - zone = cfg.zone, - capacity = cfg.capacity_string(), - )); - } + listed.insert(*node); + + let adv = status_map.get(node); + if adv.map(|x| x.is_up).unwrap_or(false) { + continue; } + + // Node is in a layout version, is not a gateway node, and is not up: + // it is in a failed state, add proper line to the output + let (host, addr, last_seen) = match adv { + Some(adv) => ( + adv.status.hostname.as_deref().unwrap_or("?"), + adv.addr.to_string(), + adv.last_seen_secs_ago + .map(|s| tf.convert(Duration::from_secs(s))) + .unwrap_or_else(|| "never seen".into()), + ), + None => ("??", "??".into(), "never seen".into()), + }; + let capacity = if ver.version == layout.current().version { + cfg.capacity_string() + } else { + drain_msg = true; + "draining metadata...".to_string() + }; + failed_nodes.push(format!( + "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}", + id = node, + host = host, + addr = addr, + tags = cfg.tags.join(","), + zone = cfg.zone, + capacity = capacity, + last_seen = last_seen, + )); } + } + + if failed_nodes.len() > 1 { + println!("\n==== FAILED NODES ===="); format_table(failed_nodes); + if drain_msg { + println!(); + println!("Your cluster is expecting to drain data from nodes that are currently unavailable."); + println!("If these nodes are definitely dead, please review the layout history with"); + println!( + "`garage layout history` and use `garage layout skip-dead-nodes` to force progress." + ); + } } if print_staging_role_changes(&layout) { @@ -226,3 +262,18 @@ pub async fn cmd_admin( } Ok(()) } + +// ---- utility ---- + +pub async fn fetch_status( + rpc_cli: &Endpoint<SystemRpc, ()>, + rpc_host: NodeID, +) -> Result<Vec<KnownNodeInfo>, Error> { + match rpc_cli + .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) + .await?? + { + SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes), + resp => Err(Error::unexpected_rpc_message(resp)), + } +} diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index ce2b11e0..f76e33c5 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -32,6 +32,10 @@ pub async fn cli_layout_command_dispatch( LayoutOperation::Config(config_opt) => { cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await } + LayoutOperation::History => cmd_layout_history(system_rpc_endpoint, rpc_host).await, + LayoutOperation::SkipDeadNodes(assume_sync_opt) => { + cmd_layout_skip_dead_nodes(system_rpc_endpoint, rpc_host, assume_sync_opt).await + } } } @@ -49,6 +53,7 @@ pub async fn cmd_assign_role( }; let mut layout = fetch_layout(rpc_cli, rpc_host).await?; + let all_nodes = layout.get_all_nodes(); let added_nodes = args .node_ids @@ -58,21 +63,23 @@ pub async fn cmd_assign_role( status .iter() .map(|adv| adv.id) - .chain(layout.node_ids().iter().cloned()), + .chain(all_nodes.iter().cloned()), node_id, ) }) .collect::<Result<Vec<_>, _>>()?; - let mut roles = layout.roles.clone(); - roles.merge(&layout.staging_roles); + let mut roles = layout.current().roles.clone(); + roles.merge(&layout.staging.get().roles); for replaced in args.replace.iter() { - let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?; + let replaced_node = find_matching_node(all_nodes.iter().cloned(), replaced)?; match roles.get(&replaced_node) { Some(NodeRoleV(Some(_))) => { layout - .staging_roles + .staging + .get_mut() + .roles .merge(&roles.update_mutator(replaced_node, NodeRoleV(None))); } _ => { @@ -130,7 +137,9 @@ pub async fn cmd_assign_role( }; layout - .staging_roles + .staging + .get_mut() + .roles .merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry)))); } @@ -149,14 +158,16 @@ pub async fn cmd_remove_role( ) -> Result<(), Error> { let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - let mut roles = layout.roles.clone(); - roles.merge(&layout.staging_roles); + let mut roles = layout.current().roles.clone(); + roles.merge(&layout.staging.get().roles); let deleted_node = find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?; layout - .staging_roles + .staging + .get_mut() + .roles .merge(&roles.update_mutator(deleted_node, NodeRoleV(None))); send_layout(rpc_cli, rpc_host, layout).await?; @@ -174,13 +185,16 @@ pub async fn cmd_show_layout( let layout = fetch_layout(rpc_cli, rpc_host).await?; println!("==== CURRENT CLUSTER LAYOUT ===="); - print_cluster_layout(&layout, "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes."); + print_cluster_layout(layout.current(), "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes."); println!(); - println!("Current cluster layout version: {}", layout.version); + println!( + "Current cluster layout version: {}", + layout.current().version + ); let has_role_changes = print_staging_role_changes(&layout); if has_role_changes { - let v = layout.version; + let v = layout.current().version; let res_apply = layout.apply_staged_changes(Some(v + 1)); // this will print the stats of what partitions @@ -189,7 +203,7 @@ pub async fn cmd_show_layout( Ok((layout, msg)) => { println!(); println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ===="); - print_cluster_layout(&layout, "No nodes have a role in the new layout."); + print_cluster_layout(layout.current(), "No nodes have a role in the new layout."); println!(); for line in msg.iter() { @@ -199,16 +213,12 @@ pub async fn cmd_show_layout( println!(); println!(" garage layout apply --version {}", v + 1); println!(); - println!( - "You can also revert all proposed changes with: garage layout revert --version {}", - v + 1) + println!("You can also revert all proposed changes with: garage layout revert"); } Err(e) => { println!("Error while trying to compute the assignment: {}", e); println!("This new layout cannot yet be applied."); - println!( - "You can also revert all proposed changes with: garage layout revert --version {}", - v + 1) + println!("You can also revert all proposed changes with: garage layout revert"); } } } @@ -241,9 +251,15 @@ pub async fn cmd_revert_layout( rpc_host: NodeID, revert_opt: RevertLayoutOpt, ) -> Result<(), Error> { + if !revert_opt.yes { + return Err(Error::Message( + "Please add the --yes flag to run the layout revert operation".into(), + )); + } + let layout = fetch_layout(rpc_cli, rpc_host).await?; - let layout = layout.revert_staged_changes(revert_opt.version)?; + let layout = layout.revert_staged_changes()?; send_layout(rpc_cli, rpc_host, layout).await?; @@ -266,11 +282,11 @@ pub async fn cmd_config_layout( .parse::<ZoneRedundancy>() .ok_or_message("invalid zone redundancy value")?; if let ZoneRedundancy::AtLeast(r_int) = r { - if r_int > layout.replication_factor { + if r_int > layout.current().replication_factor { return Err(Error::Message(format!( "The zone redundancy must be smaller or equal to the \ replication factor ({}).", - layout.replication_factor + layout.current().replication_factor ))); } else if r_int < 1 { return Err(Error::Message( @@ -280,7 +296,9 @@ pub async fn cmd_config_layout( } layout - .staging_parameters + .staging + .get_mut() + .parameters .update(LayoutParameters { zone_redundancy: r }); println!("The zone redundancy parameter has been set to '{}'.", r); did_something = true; @@ -297,25 +315,166 @@ pub async fn cmd_config_layout( Ok(()) } +pub async fn cmd_layout_history( + rpc_cli: &Endpoint<SystemRpc, ()>, + rpc_host: NodeID, +) -> Result<(), Error> { + let layout = fetch_layout(rpc_cli, rpc_host).await?; + let min_stored = layout.min_stored(); + + println!("==== LAYOUT HISTORY ===="); + let mut table = vec!["Version\tStatus\tStorage nodes\tGateway nodes".to_string()]; + for ver in layout + .versions + .iter() + .rev() + .chain(layout.old_versions.iter().rev()) + { + let status = if ver.version == layout.current().version { + "current" + } else if ver.version >= min_stored { + "draining" + } else { + "historical" + }; + table.push(format!( + "#{}\t{}\t{}\t{}", + ver.version, + status, + ver.roles + .items() + .iter() + .filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_some())) + .count(), + ver.roles + .items() + .iter() + .filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_none())) + .count(), + )); + } + format_table(table); + println!(); + + if layout.versions.len() > 1 { + println!("==== UPDATE TRACKERS ===="); + println!("Several layout versions are currently live in the version, and data is being migrated."); + println!( + "This is the internal data that Garage stores to know which nodes have what data." + ); + println!(); + let mut table = vec!["Node\tAck\tSync\tSync_ack".to_string()]; + let all_nodes = layout.get_all_nodes(); + for node in all_nodes.iter() { + table.push(format!( + "{:?}\t#{}\t#{}\t#{}", + node, + layout.update_trackers.ack_map.get(node, min_stored), + layout.update_trackers.sync_map.get(node, min_stored), + layout.update_trackers.sync_ack_map.get(node, min_stored), + )); + } + table[1..].sort(); + format_table(table); + + println!(); + println!( + "If some nodes are not catching up to the latest layout version in the update trackers," + ); + println!("it might be because they are offline or unable to complete a sync successfully."); + println!( + "You may force progress using `garage layout skip-dead-nodes --version {}`", + layout.current().version + ); + } else { + println!("Your cluster is currently in a stable state with a single live layout version."); + println!("No metadata migration is in progress. Note that the migration of data blocks is not tracked,"); + println!( + "so you might want to keep old nodes online until their data directories become empty." + ); + } + + Ok(()) +} + +pub async fn cmd_layout_skip_dead_nodes( + rpc_cli: &Endpoint<SystemRpc, ()>, + rpc_host: NodeID, + opt: SkipDeadNodesOpt, +) -> Result<(), Error> { + let status = fetch_status(rpc_cli, rpc_host).await?; + let mut layout = fetch_layout(rpc_cli, rpc_host).await?; + + if layout.versions.len() == 1 { + return Err(Error::Message( + "This command cannot be called when there is only one live cluster layout version" + .into(), + )); + } + + let min_v = layout.min_stored(); + if opt.version <= min_v || opt.version > layout.current().version { + return Err(Error::Message(format!( + "Invalid version, you may use the following version numbers: {}", + (min_v + 1..=layout.current().version) + .map(|x| x.to_string()) + .collect::<Vec<_>>() + .join(" ") + ))); + } + + let all_nodes = layout.get_all_nodes(); + let mut did_something = false; + for node in all_nodes.iter() { + if status.iter().any(|x| x.id == *node && x.is_up) { + continue; + } + + if layout.update_trackers.ack_map.set_max(*node, opt.version) { + println!("Increased the ACK tracker for node {:?}", node); + did_something = true; + } + + if opt.allow_missing_data { + if layout.update_trackers.sync_map.set_max(*node, opt.version) { + println!("Increased the SYNC tracker for node {:?}", node); + did_something = true; + } + } + } + + if did_something { + send_layout(rpc_cli, rpc_host, layout).await?; + println!("Success."); + Ok(()) + } else if !opt.allow_missing_data { + Err(Error::Message("Nothing was done, try passing the `--allow-missing-data` flag to force progress even when not enough nodes can complete a metadata sync.".into())) + } else { + Err(Error::Message( + "Sorry, there is nothing I can do for you. Please wait patiently. If you ask for help, please send the output of the `garage layout history` command.".into(), + )) + } +} + // --- utility --- pub async fn fetch_layout( rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID, -) -> Result<ClusterLayout, Error> { +) -> Result<LayoutHistory, Error> { match rpc_cli .call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL) .await?? { SystemRpc::AdvertiseClusterLayout(t) => Ok(t), - resp => Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + resp => Err(Error::unexpected_rpc_message(resp)), } } pub async fn send_layout( rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID, - layout: ClusterLayout, + layout: LayoutHistory, ) -> Result<(), Error> { rpc_cli .call( @@ -327,7 +486,7 @@ pub async fn send_layout( Ok(()) } -pub fn print_cluster_layout(layout: &ClusterLayout, empty_msg: &str) { +pub fn print_cluster_layout(layout: &LayoutVersion, empty_msg: &str) { let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()]; for (id, _, role) in layout.roles.items().iter() { let role = match &role.0 { @@ -366,21 +525,22 @@ pub fn print_cluster_layout(layout: &ClusterLayout, empty_msg: &str) { } } -pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool { - let has_role_changes = layout - .staging_roles +pub fn print_staging_role_changes(layout: &LayoutHistory) -> bool { + let staging = layout.staging.get(); + let has_role_changes = staging + .roles .items() .iter() - .any(|(k, _, v)| layout.roles.get(k) != Some(v)); - let has_layout_changes = *layout.staging_parameters.get() != layout.parameters; + .any(|(k, _, v)| layout.current().roles.get(k) != Some(v)); + let has_layout_changes = *staging.parameters.get() != layout.current().parameters; if has_role_changes || has_layout_changes { println!(); println!("==== STAGED ROLE CHANGES ===="); if has_role_changes { let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; - for (id, _, role) in layout.staging_roles.items().iter() { - if layout.roles.get(id) == Some(role) { + for (id, _, role) in staging.roles.items().iter() { + if layout.current().roles.get(id) == Some(role) { continue; } if let Some(role) = &role.0 { @@ -402,7 +562,7 @@ pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool { if has_layout_changes { println!( "Zone redundancy: {}", - layout.staging_parameters.get().zone_redundancy + staging.parameters.get().zone_redundancy ); } true diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index aba57551..6bc3da22 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -112,6 +112,14 @@ pub enum LayoutOperation { /// Revert staged changes to cluster layout #[structopt(name = "revert", version = garage_version())] Revert(RevertLayoutOpt), + + /// View the history of layouts in the cluster + #[structopt(name = "history", version = garage_version())] + History, + + /// Skip dead nodes when awaiting for a new layout version to be synchronized + #[structopt(name = "skip-dead-nodes", version = garage_version())] + SkipDeadNodes(SkipDeadNodesOpt), } #[derive(StructOpt, Debug)] @@ -164,9 +172,21 @@ pub struct ApplyLayoutOpt { #[derive(StructOpt, Debug)] pub struct RevertLayoutOpt { - /// Version number of old configuration to which to revert + /// The revert operation will not be ran unless this flag is added + #[structopt(long = "yes")] + pub(crate) yes: bool, +} + +#[derive(StructOpt, Debug)] +pub struct SkipDeadNodesOpt { + /// Version number of the layout to assume is currently up-to-date. + /// This will generally be the current layout version. #[structopt(long = "version")] - pub(crate) version: Option<u64>, + pub(crate) version: u64, + /// Allow the skip even if a quorum of ndoes could not be found for + /// the data among the remaining nodes + #[structopt(long = "allow-missing-data")] + pub(crate) allow_missing_data: bool, } #[derive(Serialize, Deserialize, StructOpt, Debug)] diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 2232d395..0511e2b1 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -450,6 +450,8 @@ pub fn print_block_info( if refcount != nondeleted_count { println!(); - println!("Warning: refcount does not match number of non-deleted versions"); + println!( + "Warning: refcount does not match number of non-deleted versions (see issue #644)." + ); } } diff --git a/src/garage/tests/common/ext/process.rs b/src/garage/tests/common/ext/process.rs index ba533b6c..8e20bf7c 100644 --- a/src/garage/tests/common/ext/process.rs +++ b/src/garage/tests/common/ext/process.rs @@ -14,42 +14,20 @@ impl CommandExt for process::Command { } fn expect_success_status(&mut self, msg: &str) -> process::ExitStatus { - let status = self.status().expect(msg); - status.expect_success(msg); - status + self.expect_success_output(msg).status } fn expect_success_output(&mut self, msg: &str) -> process::Output { let output = self.output().expect(msg); - output.expect_success(msg); - output - } -} - -pub trait OutputExt { - fn expect_success(&self, msg: &str); -} - -impl OutputExt for process::Output { - fn expect_success(&self, msg: &str) { - self.status.expect_success(msg) - } -} - -pub trait ExitStatusExt { - fn expect_success(&self, msg: &str); -} - -impl ExitStatusExt for process::ExitStatus { - fn expect_success(&self, msg: &str) { - if !self.success() { - match self.code() { - Some(code) => panic!( - "Command exited with code {code}: {msg}", - code = code, - msg = msg - ), - None => panic!("Command exited with signal: {msg}", msg = msg), - } + if !output.status.success() { + panic!( + "{}: command {:?} exited with error {:?}\nSTDOUT: {}\nSTDERR: {}", + msg, + self, + output.status.code(), + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); } + output } } diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs index d1f0867a..ebc82f37 100644 --- a/src/garage/tests/common/garage.rs +++ b/src/garage/tests/common/garage.rs @@ -96,7 +96,7 @@ api_bind_addr = "127.0.0.1:{admin_port}" .arg("server") .stdout(stdout) .stderr(stderr) - .env("RUST_LOG", "garage=info,garage_api=trace") + .env("RUST_LOG", "garage=debug,garage_api=trace") .spawn() .expect("Could not start garage"); |