From ceac3713d6639f9170fc3b4475fae4a30b34483c Mon Sep 17 00:00:00 2001
From: Mendes
Date: Wed, 5 Oct 2022 15:29:48 +0200
Subject: modifications in several files to:
- have consistent error return types
- store the zone redundancy in an Lww
- print the error and message in the CLI
(TODO: for the server API, should msg be returned in the response body?)

--- src/garage/cli/layout.rs | 35 +++++++++++++++++++++--------------
1 file changed, 21 insertions(+), 14 deletions(-)
(limited to 'src/garage')

diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 3884bb92..a5b838e7 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -188,19 +188,23 @@ pub async fn cmd_show_layout(
 // this will print the stats of what partitions
 // will move around when we apply
- if layout.calculate_partition_assignation() {
- println!("To enact the staged role changes, type:");
- println!();
- println!(" garage layout apply --version {}", layout.version + 1);
- println!();
- println!(
- "You can also revert all proposed changes with: garage layout revert --version {}",
- layout.version + 1
- );
- } else {
- println!("Not enough nodes have an assigned role to maintain enough copies of data.");
- println!("This new layout cannot yet be applied.");
- }
+ match layout.calculate_partition_assignation() {
+ Ok(msg) => {
+ for line in msg.iter() {
+ println!("{}", line);
+ }
+ println!("To enact the staged role changes, type:");
+ println!();
+ println!(" garage layout apply --version {}", layout.version + 1);
+ println!();
+ println!(
"You can also revert all proposed changes with: garage layout revert --version {}",
+ layout.version + 1)},
+ Err(Error::Message(s)) => {
+ println!("Error while trying to compute the assignation: {}", s);
+ println!("This new layout cannot yet be applied.");},
+ _ => { println!("Unknown Error"); },
+ }
 }

 Ok(())
@@ -213,7 +217,10 @@ pub async fn cmd_apply_layout(
 ) -> Result<(), Error> {
 let layout = fetch_layout(rpc_cli, rpc_host).await?;

- let layout = layout.apply_staged_changes(apply_opt.version)?;
+ let (layout, msg) = layout.apply_staged_changes(apply_opt.version)?;
+ for line in msg.iter() {
+ println!("{}", line);
+ }

 send_layout(rpc_cli, rpc_host, layout).await?;
-- cgit v1.2.3

From a951b6c45273e59b98f974937aebb8ada8816ab8 Mon Sep 17 00:00:00 2001
From: Mendes
Date: Wed, 5 Oct 2022 16:04:19 +0200
Subject: Added a CLI command to update the parameters for the layout computation (for now, only the zone redundancy)

--- src/garage/cli/layout.rs | 35 +++++++++++++++++++++++++++++++++--
src/garage/cli/structs.rs | 14 +++++++++++++-
2 files changed, 46 insertions(+), 3 deletions(-)
(limited to 'src/garage')

diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index a5b838e7..6b86e46d 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -14,8 +14,8 @@ pub async fn cli_layout_command_dispatch(
 rpc_host: NodeID,
 ) -> Result<(), Error> {
 match cmd {
- LayoutOperation::Assign(configure_opt) => {
- cmd_assign_role(system_rpc_endpoint, rpc_host, configure_opt).await
+ LayoutOperation::Assign(assign_opt) => {
+ cmd_assign_role(system_rpc_endpoint, rpc_host, assign_opt).await
 }
 LayoutOperation::Remove(remove_opt) => {
 cmd_remove_role(system_rpc_endpoint, rpc_host, remove_opt).await
@@ -27,6 +27,9 @@ pub async fn cli_layout_command_dispatch(
 LayoutOperation::Revert(revert_opt) => {
 cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await
 }
+ LayoutOperation::Config(config_opt) => {
+ cmd_config_layout(system_rpc_endpoint, 
rpc_host, config_opt).await + } } } @@ -245,6 +248,34 @@ pub async fn cmd_revert_layout( Ok(()) } +pub async fn cmd_config_layout( + rpc_cli: &Endpoint, + rpc_host: NodeID, + config_opt: ConfigLayoutOpt, +) -> Result<(), Error> { + let mut layout = fetch_layout(rpc_cli, rpc_host).await?; + + match config_opt.redundancy { + None => (), + Some(r) => { + if r > layout.replication_factor { + println!("The zone redundancy must be smaller or equal to the \ + replication factor ({}).", layout.replication_factor); + } + else if r < 1 { + println!("The zone redundancy must be at least 1."); + } + else { + layout.parameters.update(LayoutParameters{ zone_redundancy: r }); + println!("The new zone redundancy has been staged."); + } + } + } + + send_layout(rpc_cli, rpc_host, layout).await?; + Ok(()) +} + // --- utility --- pub async fn fetch_layout( diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 06548e89..896379bb 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -86,6 +86,10 @@ pub enum LayoutOperation { /// Remove role from Garage cluster node #[structopt(name = "remove", version = garage_version())] Remove(RemoveRoleOpt), + + /// Configure parameters value for the layout computation + #[structopt(name = "config", version = garage_version())] + Config(ConfigLayoutOpt), /// Show roles currently assigned to nodes and changes staged for commit #[structopt(name = "show", version = garage_version())] @@ -100,6 +104,7 @@ pub enum LayoutOperation { Revert(RevertLayoutOpt), } + #[derive(StructOpt, Debug)] pub struct AssignRoleOpt { /// Node(s) to which to assign role (prefix of hexadecimal node id) @@ -110,7 +115,7 @@ pub struct AssignRoleOpt { #[structopt(short = "z", long = "zone")] pub(crate) zone: Option, - /// Capacity (in relative terms, use 1 to represent your smallest server) + /// Capacity (in relative terms) #[structopt(short = "c", long = "capacity")] pub(crate) capacity: Option, @@ -133,6 +138,13 @@ pub struct RemoveRoleOpt { pub(crate) node_id: String, } +#[derive(StructOpt, Debug)] +pub struct ConfigLayoutOpt { + /// Zone redundancy parameter + #[structopt(short = "r", long = "redundancy")] + pub(crate) redundancy: Option, +} + #[derive(StructOpt, Debug)] pub struct ApplyLayoutOpt { /// Version number of new configuration: this command will fail if -- cgit v1.2.3 From 9407df60cc00fc70c10f73bc4b600085789d5353 Mon Sep 17 00:00:00 2001 From: Mendes Date: Thu, 6 Oct 2022 12:54:51 +0200 Subject: Corrected two bugs: - self.node_id_vec was not properly updated when the previous ring was empty - ClusterLayout::merge was not considering changes in the layout parameters --- src/garage/cli/layout.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'src/garage') diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 6b86e46d..9e5bdaea 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -188,6 +188,10 @@ pub async fn cmd_show_layout( println!("No nodes have a role in the new layout."); } println!(); + + println!("==== PARAMETERS OF THE LAYOUT COMPUTATION ===="); + println!("Zone redundancy: {}", layout.parameters.get().zone_redundancy); + println!(); // this will print the stats of what partitions // will move around when we apply @@ -267,7 +271,7 @@ pub async fn cmd_config_layout( } else { layout.parameters.update(LayoutParameters{ zone_redundancy: r }); - println!("The new zone redundancy has been staged."); + println!("The new zone redundancy has been saved ({}).", r); } } } -- cgit v1.2.3 From 
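
The cmd_config_layout handler above stages the new redundancy through an update on a last-writer-wins register, and the bugfix just before this point makes ClusterLayout::merge take such parameter changes into account. As a rough illustration of why an Lww register fits here (concurrent edits from two administrators converge to the most recent one), here is a minimal self-contained sketch of the pattern; it is a hypothetical stand-in with a simplified API, not the actual garage_util::crdt::Lww type:

// Minimal last-writer-wins register, in the spirit of the Lww used
// above to store LayoutParameters. Hypothetical stand-in, not the
// real garage_util::crdt::Lww (whose API differs).
#[derive(Clone, Debug, PartialEq, Eq)]
struct Lww<T> {
    ts: u64, // logical timestamp; Garage uses wall-clock milliseconds
    v: T,
}

impl<T: Clone> Lww<T> {
    fn new(ts: u64, v: T) -> Self {
        Self { ts, v }
    }
    // Overwrite the value with a strictly larger timestamp so that
    // this write wins any subsequent merge.
    fn update(&mut self, now: u64, v: T) {
        self.ts = std::cmp::max(now, self.ts + 1);
        self.v = v;
    }
    // CRDT merge: the side written last wins; ties keep the local value.
    fn merge(&mut self, other: &Self) {
        if other.ts > self.ts {
            *self = other.clone();
        }
    }
    fn get(&self) -> &T {
        &self.v
    }
}

#[derive(Clone, Debug, PartialEq, Eq)]
struct LayoutParameters {
    zone_redundancy: usize,
}

fn main() {
    let mut a = Lww::new(1, LayoutParameters { zone_redundancy: 2 });
    let mut b = a.clone();
    b.update(2, LayoutParameters { zone_redundancy: 3 });
    a.merge(&b); // a converges to the later write
    assert_eq!(a.get().zone_redundancy, 3);
}

Since update() always advances the timestamp, a redundancy staged on one node wins the merge on every other node, which is what the ClusterLayout::merge fix above restores for parameter changes.
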
fcf9ac674a2842b2b55d933e60af5af93dcc4592 Mon Sep 17 00:00:00 2001
From: Mendes
Date: Mon, 10 Oct 2022 17:19:25 +0200
Subject: Tests written in layout.rs; added staged_parameters to ClusterLayout; removed the serde(default) -> will need a migration function

--- src/garage/cli/layout.rs | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
(limited to 'src/garage')

diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 9e5bdaea..32f637eb 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -190,7 +190,7 @@ pub async fn cmd_show_layout(
 println!();

 println!("==== PARAMETERS OF THE LAYOUT COMPUTATION ====");
- println!("Zone redundancy: {}", layout.parameters.get().zone_redundancy);
+ println!("Zone redundancy: {}", layout.staged_parameters.get().zone_redundancy);
 println!();

 // this will print the stats of what partitions
@@ -270,7 +270,7 @@ pub async fn cmd_config_layout(
 println!("The zone redundancy must be at least 1.");
 }
 else {
- layout.parameters.update(LayoutParameters{ zone_redundancy: r });
+ layout.staged_parameters.update(LayoutParameters{ zone_redundancy: r });
 println!("The new zone redundancy has been saved ({}).", r);
 }
 }
-- cgit v1.2.3

From 4abab246f1113a9a1988fdfca81c1dd8ffa323c8 Mon Sep 17 00:00:00 2001
From: Mendes
Date: Mon, 10 Oct 2022 17:21:13 +0200
Subject: cargo fmt

--- src/garage/cli/layout.rs | 94 ++++++++++++++++++++++++++---------------------
src/garage/cli/structs.rs | 7 ++--
2 files changed, 55 insertions(+), 46 deletions(-)
(limited to 'src/garage')

diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 32f637eb..f747fbe4 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -27,9 +27,9 @@ pub async fn cli_layout_command_dispatch(
 LayoutOperation::Revert(revert_opt) => {
 cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await
 }
- LayoutOperation::Config(config_opt) => {
- cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await
- }
+ LayoutOperation::Config(config_opt) => {
+ cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await
+ }
 }
 }

@@ -188,30 +188,37 @@ pub async fn cmd_show_layout(
 println!("No nodes have a role in the new layout.");
 }
 println!();
-
+
 println!("==== PARAMETERS OF THE LAYOUT COMPUTATION ====");
- println!("Zone redundancy: {}", layout.staged_parameters.get().zone_redundancy);
+ println!(
+ "Zone redundancy: {}",
+ layout.staged_parameters.get().zone_redundancy
+ );
 println!();

 // this will print the stats of what partitions
 // will move around when we apply
- match layout.calculate_partition_assignation() {
- Ok(msg) => {
- for line in msg.iter() {
- println!("{}", line);
- }
- println!("To enact the staged role changes, type:");
- println!();
- println!(" garage layout apply --version {}", layout.version + 1);
- println!();
- println!(
+ match layout.calculate_partition_assignation() {
+ Ok(msg) => {
+ for line in msg.iter() {
+ println!("{}", line);
+ }
+ println!("To enact the staged role changes, type:");
+ println!();
+ println!(" garage layout apply --version {}", layout.version + 1);
+ println!();
+ println!(
 "You can also revert all proposed changes with: garage layout revert --version {}",
- layout.version + 1)},
- Err(Error::Message(s)) => {
- println!("Error while trying to compute the assignation: {}", s); 
+ println!("This new layout cannot yet be applied."); + } + _ => { + println!("Unknown Error"); + } + } } Ok(()) @@ -225,9 +232,9 @@ pub async fn cmd_apply_layout( let layout = fetch_layout(rpc_cli, rpc_host).await?; let (layout, msg) = layout.apply_staged_changes(apply_opt.version)?; - for line in msg.iter() { - println!("{}", line); - } + for line in msg.iter() { + println!("{}", line); + } send_layout(rpc_cli, rpc_host, layout).await?; @@ -258,26 +265,29 @@ pub async fn cmd_config_layout( config_opt: ConfigLayoutOpt, ) -> Result<(), Error> { let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - - match config_opt.redundancy { - None => (), - Some(r) => { - if r > layout.replication_factor { - println!("The zone redundancy must be smaller or equal to the \ - replication factor ({}).", layout.replication_factor); - } - else if r < 1 { - println!("The zone redundancy must be at least 1."); - } - else { - layout.staged_parameters.update(LayoutParameters{ zone_redundancy: r }); - println!("The new zone redundancy has been saved ({}).", r); - } - } - } + + match config_opt.redundancy { + None => (), + Some(r) => { + if r > layout.replication_factor { + println!( + "The zone redundancy must be smaller or equal to the \ + replication factor ({}).", + layout.replication_factor + ); + } else if r < 1 { + println!("The zone redundancy must be at least 1."); + } else { + layout + .staged_parameters + .update(LayoutParameters { zone_redundancy: r }); + println!("The new zone redundancy has been saved ({}).", r); + } + } + } send_layout(rpc_cli, rpc_host, layout).await?; - Ok(()) + Ok(()) } // --- utility --- diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 896379bb..02ed8992 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -86,10 +86,10 @@ pub enum LayoutOperation { /// Remove role from Garage cluster node #[structopt(name = "remove", version = garage_version())] Remove(RemoveRoleOpt), - - /// Configure parameters value for the layout computation + + /// Configure parameters value for the layout computation #[structopt(name = "config", version = garage_version())] - Config(ConfigLayoutOpt), + Config(ConfigLayoutOpt), /// Show roles currently assigned to nodes and changes staged for commit #[structopt(name = "show", version = garage_version())] @@ -104,7 +104,6 @@ pub enum LayoutOperation { Revert(RevertLayoutOpt), } - #[derive(StructOpt, Debug)] pub struct AssignRoleOpt { /// Node(s) to which to assign role (prefix of hexadecimal node id) -- cgit v1.2.3 From e5664c9822c6ed1ecb30cac41b6a4125da3f88e7 Mon Sep 17 00:00:00 2001 From: Mendes Date: Tue, 11 Oct 2022 17:17:13 +0200 Subject: Improved the statistics displayed in layout show corrected a few bugs --- src/garage/cli/layout.rs | 69 +++++++++++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 24 deletions(-) (limited to 'src/garage') diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index f747fbe4..5056e57d 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -169,7 +169,7 @@ pub async fn cmd_show_layout( rpc_cli: &Endpoint, rpc_host: NodeID, ) -> Result<(), Error> { - let mut layout = fetch_layout(rpc_cli, rpc_host).await?; + let layout = fetch_layout(rpc_cli, rpc_host).await?; println!("==== CURRENT CLUSTER LAYOUT ===="); if !print_cluster_layout(&layout) { @@ -179,41 +179,40 @@ pub async fn cmd_show_layout( println!(); println!("Current cluster layout version: {}", layout.version); - if print_staging_role_changes(&layout) { - 
layout.roles.merge(&layout.staging); - - println!(); - println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ===="); - if !print_cluster_layout(&layout) { - println!("No nodes have a role in the new layout."); - } - println!(); - - println!("==== PARAMETERS OF THE LAYOUT COMPUTATION ===="); - println!( - "Zone redundancy: {}", - layout.staged_parameters.get().zone_redundancy - ); - println!(); + let has_role_changes = print_staging_role_changes(&layout); + let has_param_changes = print_staging_parameters_changes(&layout); + if has_role_changes || has_param_changes { + let v = layout.version; + let res_apply = layout.apply_staged_changes(Some(v + 1)); // this will print the stats of what partitions // will move around when we apply - match layout.calculate_partition_assignation() { - Ok(msg) => { + match res_apply { + Ok((layout, msg)) => { + println!(); + println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ===="); + if !print_cluster_layout(&layout) { + println!("No nodes have a role in the new layout."); + } + println!(); + for line in msg.iter() { println!("{}", line); } println!("To enact the staged role changes, type:"); println!(); - println!(" garage layout apply --version {}", layout.version + 1); + println!(" garage layout apply --version {}", v + 1); println!(); println!( "You can also revert all proposed changes with: garage layout revert --version {}", - layout.version + 1) + v + 1) } Err(Error::Message(s)) => { println!("Error while trying to compute the assignation: {}", s); println!("This new layout cannot yet be applied."); + println!( + "You can also revert all proposed changes with: garage layout revert --version {}", + v + 1) } _ => { println!("Unknown Error"); @@ -321,21 +320,29 @@ pub async fn send_layout( } pub fn print_cluster_layout(layout: &ClusterLayout) -> bool { - let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; + let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable".to_string()]; for (id, _, role) in layout.roles.items().iter() { let role = match &role.0 { Some(r) => r, _ => continue, }; let tags = role.tags.join(","); + let usage = layout.get_node_usage(id).unwrap_or(0); + let capacity = layout.get_node_capacity(id).unwrap_or(1); table.push(format!( - "{:?}\t{}\t{}\t{}", + "{:?}\t{}\t{}\t{}\t{} ({:.1}%)", id, tags, role.zone, - role.capacity_string() + role.capacity_string(), + usage as u32 * layout.partition_size, + (100.0 * usage as f32 * layout.partition_size as f32) / (capacity as f32) )); } + println!(); + println!("Parameters of the layout computation:"); + println!("Zone redundancy: {}", layout.parameters.zone_redundancy); + println!(); if table.len() == 1 { false } else { @@ -344,6 +351,20 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool { } } +pub fn print_staging_parameters_changes(layout: &ClusterLayout) -> bool { + let has_changes = layout.staged_parameters.get().clone() != layout.parameters; + if has_changes { + println!(); + println!("==== NEW LAYOUT PARAMETERS ===="); + println!( + "Zone redundancy: {}", + layout.staged_parameters.get().zone_redundancy + ); + println!(); + } + has_changes +} + pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool { let has_changes = layout .staging -- cgit v1.2.3 From ea5afc251106b3f6e2d07f942ba1f88abeef8765 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 7 Nov 2022 19:34:40 +0100 Subject: Style improvements --- src/garage/cli/cmd.rs | 2 +- src/garage/cli/layout.rs | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) (limited to 
'src/garage') diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index c8b96489..e352ddf2 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -71,7 +71,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint, rpc_host: NodeID) -> )); } _ => { - let new_role = match layout.staging.get(&adv.id) { + let new_role = match layout.staging_roles.get(&adv.id) { Some(NodeRoleV(Some(_))) => "(pending)", _ => "NO ROLE ASSIGNED", }; diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 5056e57d..4b23a096 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -63,14 +63,14 @@ pub async fn cmd_assign_role( .collect::, _>>()?; let mut roles = layout.roles.clone(); - roles.merge(&layout.staging); + roles.merge(&layout.staging_roles); for replaced in args.replace.iter() { let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?; match roles.get(&replaced_node) { Some(NodeRoleV(Some(_))) => { layout - .staging + .staging_roles .merge(&roles.update_mutator(replaced_node, NodeRoleV(None))); } _ => { @@ -128,7 +128,7 @@ pub async fn cmd_assign_role( }; layout - .staging + .staging_roles .merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry)))); } @@ -148,13 +148,13 @@ pub async fn cmd_remove_role( let mut layout = fetch_layout(rpc_cli, rpc_host).await?; let mut roles = layout.roles.clone(); - roles.merge(&layout.staging); + roles.merge(&layout.staging_roles); let deleted_node = find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?; layout - .staging + .staging_roles .merge(&roles.update_mutator(deleted_node, NodeRoleV(None))); send_layout(rpc_cli, rpc_host, layout).await?; @@ -278,7 +278,7 @@ pub async fn cmd_config_layout( println!("The zone redundancy must be at least 1."); } else { layout - .staged_parameters + .staging_parameters .update(LayoutParameters { zone_redundancy: r }); println!("The new zone redundancy has been saved ({}).", r); } @@ -352,13 +352,13 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool { } pub fn print_staging_parameters_changes(layout: &ClusterLayout) -> bool { - let has_changes = layout.staged_parameters.get().clone() != layout.parameters; + let has_changes = layout.staging_parameters.get().clone() != layout.parameters; if has_changes { println!(); println!("==== NEW LAYOUT PARAMETERS ===="); println!( "Zone redundancy: {}", - layout.staged_parameters.get().zone_redundancy + layout.staging_parameters.get().zone_redundancy ); println!(); } @@ -367,7 +367,7 @@ pub fn print_staging_parameters_changes(layout: &ClusterLayout) -> bool { pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool { let has_changes = layout - .staging + .staging_roles .items() .iter() .any(|(k, _, v)| layout.roles.get(k) != Some(v)); @@ -376,7 +376,7 @@ pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool { println!(); println!("==== STAGED ROLE CHANGES ===="); let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; - for (id, _, role) in layout.staging.items().iter() { + for (id, _, role) in layout.staging_roles.items().iter() { if layout.roles.get(id) == Some(role) { continue; } -- cgit v1.2.3 From 73a4ca8b1515f95bf7860fc292c12db83d3c6228 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 7 Nov 2022 21:12:11 +0100 Subject: Use bytes as capacity units --- src/garage/cli/layout.rs | 18 ++++++++++++++---- src/garage/cli/structs.rs | 4 ++-- 2 files changed, 16 insertions(+), 6 deletions(-) (limited to 'src/garage') diff --git a/src/garage/cli/layout.rs 
b/src/garage/cli/layout.rs index 4b23a096..85af345a 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -1,3 +1,5 @@ +use bytesize::ByteSize; + use garage_util::crdt::Crdt; use garage_util::error::*; use garage_util::formater::format_table; @@ -86,7 +88,7 @@ pub async fn cmd_assign_role( return Err(Error::Message( "-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into())); } - if args.capacity == Some(0) { + if args.capacity == Some(ByteSize::b(0)) { return Err(Error::Message("Invalid capacity value: 0".into())); } @@ -94,7 +96,7 @@ pub async fn cmd_assign_role( let new_entry = match roles.get(&added_node) { Some(NodeRoleV(Some(old))) => { let capacity = match args.capacity { - Some(c) => Some(c), + Some(c) => Some(c.as_u64()), None if args.gateway => None, None => old.capacity, }; @@ -111,7 +113,7 @@ pub async fn cmd_assign_role( } _ => { let capacity = match args.capacity { - Some(c) => Some(c), + Some(c) => Some(c.as_u64()), None if args.gateway => None, None => return Err(Error::Message( "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())), @@ -265,6 +267,7 @@ pub async fn cmd_config_layout( ) -> Result<(), Error> { let mut layout = fetch_layout(rpc_cli, rpc_host).await?; + let mut did_something = false; match config_opt.redundancy { None => (), Some(r) => { @@ -282,9 +285,16 @@ pub async fn cmd_config_layout( .update(LayoutParameters { zone_redundancy: r }); println!("The new zone redundancy has been saved ({}).", r); } + did_something = true; } } + if !did_something { + return Err(Error::Message( + "Please specify an action for `garage layout config` to do".into(), + )); + } + send_layout(rpc_cli, rpc_host, layout).await?; Ok(()) } @@ -335,7 +345,7 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool { tags, role.zone, role.capacity_string(), - usage as u32 * layout.partition_size, + ByteSize::b(usage as u64 * layout.partition_size).to_string_as(false), (100.0 * usage as f32 * layout.partition_size as f32) / (capacity as f32) )); } diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 64798952..49a1f267 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -114,9 +114,9 @@ pub struct AssignRoleOpt { #[structopt(short = "z", long = "zone")] pub(crate) zone: Option, - /// Capacity (in relative terms) + /// Storage capacity, in bytes (supported suffixes: B, KB, MB, GB, TB, PB) #[structopt(short = "c", long = "capacity")] - pub(crate) capacity: Option, + pub(crate) capacity: Option, /// Gateway-only node #[structopt(short = "g", long = "gateway")] -- cgit v1.2.3 From d75b37b018fc0ce8e3832c8531d9556ff7a345c9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 8 Nov 2022 14:23:08 +0100 Subject: Return more info when layout's .check() fails, fix compilation, fix test --- src/garage/cli/layout.rs | 32 +++++++++++++++++++++----------- src/garage/main.rs | 3 +++ 2 files changed, 24 insertions(+), 11 deletions(-) (limited to 'src/garage') diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 85af345a..53430e6b 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -330,7 +330,7 @@ pub async fn send_layout( } pub fn print_cluster_layout(layout: &ClusterLayout) -> bool { - let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable".to_string()]; + let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()]; for (id, _, role) in 
layout.roles.items().iter() { let role = match &role.0 { Some(r) => r, @@ -338,16 +338,26 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool { }; let tags = role.tags.join(","); let usage = layout.get_node_usage(id).unwrap_or(0); - let capacity = layout.get_node_capacity(id).unwrap_or(1); - table.push(format!( - "{:?}\t{}\t{}\t{}\t{} ({:.1}%)", - id, - tags, - role.zone, - role.capacity_string(), - ByteSize::b(usage as u64 * layout.partition_size).to_string_as(false), - (100.0 * usage as f32 * layout.partition_size as f32) / (capacity as f32) - )); + let capacity = layout.get_node_capacity(id).unwrap_or(0); + if capacity > 0 { + table.push(format!( + "{:?}\t{}\t{}\t{}\t{} ({:.1}%)", + id, + tags, + role.zone, + role.capacity_string(), + ByteSize::b(usage as u64 * layout.partition_size).to_string_as(false), + (100.0 * usage as f32 * layout.partition_size as f32) / (capacity as f32) + )); + } else { + table.push(format!( + "{:?}\t{}\t{}\t{}", + id, + tags, + role.zone, + role.capacity_string(), + )); + }; } println!(); println!("Parameters of the layout computation:"); diff --git a/src/garage/main.rs b/src/garage/main.rs index edda734b..8e64273f 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -17,6 +17,9 @@ compile_error!("Either bundled-libs or system-libs Cargo feature must be enabled #[cfg(all(feature = "bundled-libs", feature = "system-libs"))] compile_error!("Only one of bundled-libs and system-libs Cargo features must be enabled"); +#[cfg(not(any(feature = "lmdb", feature = "sled", feature = "sqlite")))] +compile_error!("Must activate the Cargo feature for at least one DB engine: lmdb, sled or sqlite."); + use std::net::SocketAddr; use std::path::PathBuf; -- cgit v1.2.3 From fc2729cd810b94c47d89e9039ea65726c9a85988 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 8 Nov 2022 15:13:37 +0100 Subject: Fix integration test --- src/garage/tests/common/garage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/garage') diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs index 44d727f9..a539abb7 100644 --- a/src/garage/tests/common/garage.rs +++ b/src/garage/tests/common/garage.rs @@ -126,7 +126,7 @@ api_bind_addr = "127.0.0.1:{admin_port}" self.command() .args(["layout", "assign"]) .arg(node_short_id) - .args(["-c", "1", "-z", "unzonned"]) + .args(["-c", "1G", "-z", "unzonned"]) .quiet() .expect_success_status("Could not assign garage node layout"); self.command() -- cgit v1.2.3 From ec12d6c8ddde0f1dc908e43fef0ecc88d1e5406b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 8 Nov 2022 16:15:45 +0100 Subject: Slightly simplify code at places --- src/garage/cli/layout.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) (limited to 'src/garage') diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 53430e6b..27bb7eb8 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -209,16 +209,13 @@ pub async fn cmd_show_layout( "You can also revert all proposed changes with: garage layout revert --version {}", v + 1) } - Err(Error::Message(s)) => { - println!("Error while trying to compute the assignation: {}", s); + Err(e) => { + println!("Error while trying to compute the assignation: {}", e); println!("This new layout cannot yet be applied."); println!( "You can also revert all proposed changes with: garage layout revert --version {}", v + 1) } - _ => { - println!("Unknown Error"); - } } } @@ -355,7 +352,7 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool 
{ id, tags,
 role.zone,
- role.capacity_string(),
+ role.capacity_string()
 ));
 };
 }
@@ -372,7 +369,7 @@ pub fn print_staging_parameters_changes(layout: &ClusterLayout) -> bool {
- let has_changes = layout.staging_parameters.get().clone() != layout.parameters;
+ let has_changes = *layout.staging_parameters.get() != layout.parameters;
 if has_changes {
 println!();
 println!("==== NEW LAYOUT PARAMETERS ====");
-- cgit v1.2.3

From 8be862aa193ebe3081d1a74c3c5fc493ae9c82b0 Mon Sep 17 00:00:00 2001
From: Jonathan Davies
Date: Mon, 2 Jan 2023 13:35:26 +0000
Subject: Changed all instances of 'key new' to 'key create' to make it consistent with bucket commands issued normally around the same time.

--- src/garage/admin.rs | 2 +-
src/garage/cli/structs.rs | 6 +++---
src/garage/tests/common/garage.rs | 3 +--
3 files changed, 5 insertions(+), 6 deletions(-)
(limited to 'src/garage')

diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index 1ca3698a..af9e9ea9 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -571,7 +571,7 @@ impl AdminRpcHandler {
 match cmd {
 KeyOperation::List => self.handle_list_keys().await,
 KeyOperation::Info(query) => self.handle_key_info(query).await,
- KeyOperation::New(query) => self.handle_create_key(query).await,
+ KeyOperation::Create(query) => self.handle_create_key(query).await,
 KeyOperation::Rename(query) => self.handle_rename_key(query).await,
 KeyOperation::Delete(query) => self.handle_delete_key(query).await,
 KeyOperation::Allow(query) => self.handle_allow_key(query).await,
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index e2f632f3..fe2c2a26 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -320,8 +320,8 @@ pub enum KeyOperation {
 Info(KeyOpt),

 /// Create new key
- #[structopt(name = "new", version = garage_version())]
- New(KeyNewOpt),
+ #[structopt(name = "create", version = garage_version())]
+ Create(KeyNewOpt),

 /// Rename key
 #[structopt(name = "rename", version = garage_version())]
@@ -353,7 +353,7 @@ pub struct KeyOpt {

 #[derive(Serialize, Deserialize, StructOpt, Debug)]
 pub struct KeyNewOpt {
 /// Name of the key
- #[structopt(long = "name", default_value = "Unnamed key")]
+ #[structopt(default_value = "Unnamed key")]
 pub name: String,
 }

diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index 44d727f9..730d5889 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -172,8 +172,7 @@ api_bind_addr = "127.0.0.1:{admin_port}"
 let output = self
 .command()
- .args(["key", "new"])
- .args(["--name", name])
+ .args(["key", "create", name])
 .expect_success_output("Could not create key");

 let stdout = String::from_utf8(output.stdout).unwrap();
-- cgit v1.2.3

From cb07e6145cf26a9bbbe44fd06090a099030d0750 Mon Sep 17 00:00:00 2001
From: Jonathan Davies
Date: Thu, 5 Jan 2023 11:09:25 +0000
Subject: Changed all instances of assignation to assignment. 
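
In the 'key create' commit above, the KeyNewOpt struct drops the --name flag in favor of a positional argument with a default value. As a rough sketch of how such a structopt definition parses (assuming structopt 0.3 and a hypothetical binary name; this is a reduction for illustration, not Garage's code):

use structopt::StructOpt;

// Stand-alone reduction of the KeyNewOpt change above: the name is
// now positional, with "Unnamed key" as the fallback.
#[derive(StructOpt, Debug)]
struct KeyNewOpt {
    /// Name of the key
    #[structopt(default_value = "Unnamed key")]
    name: String,
}

fn main() {
    // from_iter takes an explicit argv; the first element is the binary name.
    let named = KeyNewOpt::from_iter(&["garage-key-create", "backup-key"]);
    assert_eq!(named.name, "backup-key");

    let unnamed = KeyNewOpt::from_iter(&["garage-key-create"]);
    assert_eq!(unnamed.name, "Unnamed key");
    println!("{:?} / {:?}", named, unnamed);
}

With this shape, both `garage key create my-key` and a bare `garage key create` parse, matching the updated integration test, which now passes the name positionally.
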
--- src/garage/cli/layout.rs | 4 ++-- src/garage/cli/structs.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'src/garage') diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 27bb7eb8..cf8631a4 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -210,7 +210,7 @@ pub async fn cmd_show_layout( v + 1) } Err(e) => { - println!("Error while trying to compute the assignation: {}", e); + println!("Error while trying to compute the assignment: {}", e); println!("This new layout cannot yet be applied."); println!( "You can also revert all proposed changes with: garage layout revert --version {}", @@ -236,7 +236,7 @@ pub async fn cmd_apply_layout( send_layout(rpc_cli, rpc_host, layout).await?; - println!("New cluster layout with updated role assignation has been applied in cluster."); + println!("New cluster layout with updated role assignment has been applied in cluster."); println!("Data will now be moved around between nodes accordingly."); Ok(()) diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 531501bf..dcb9fef9 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -17,7 +17,7 @@ pub enum Command { #[structopt(name = "node", version = garage_version())] Node(NodeOperation), - /// Operations on the assignation of node roles in the cluster layout + /// Operations on the assignment of node roles in the cluster layout #[structopt(name = "layout", version = garage_version())] Layout(LayoutOperation), -- cgit v1.2.3 From 19639705e6242861bd43a123456793e2d8f2c69a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 17 May 2023 14:30:53 +0200 Subject: Mark sled as deprecated, make lmdb default, and improve sqlite and lmdb defaults --- src/garage/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/garage') diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 2b366ff1..7f7c3287 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -74,7 +74,7 @@ base64 = "0.21" [features] -default = [ "bundled-libs", "metrics", "sled", "k2v" ] +default = [ "bundled-libs", "metrics", "sled", "lmdb", "sqlite", "k2v" ] k2v = [ "garage_util/k2v", "garage_api/k2v" ] -- cgit v1.2.3 From 8a74e1c2bd7f202dfaa55ecc16a91ad84812fff9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Jun 2023 15:39:15 +0200 Subject: Split garage/admin.rs into smaller files --- src/garage/admin.rs | 1298 -------------------------------------------- src/garage/admin/block.rs | 160 ++++++ src/garage/admin/bucket.rs | 496 +++++++++++++++++ src/garage/admin/key.rs | 149 +++++ src/garage/admin/mod.rs | 537 ++++++++++++++++++ 5 files changed, 1342 insertions(+), 1298 deletions(-) delete mode 100644 src/garage/admin.rs create mode 100644 src/garage/admin/block.rs create mode 100644 src/garage/admin/bucket.rs create mode 100644 src/garage/admin/key.rs create mode 100644 src/garage/admin/mod.rs (limited to 'src/garage') diff --git a/src/garage/admin.rs b/src/garage/admin.rs deleted file mode 100644 index e4e50520..00000000 --- a/src/garage/admin.rs +++ /dev/null @@ -1,1298 +0,0 @@ -use std::collections::HashMap; -use std::fmt::Write; -use std::sync::Arc; - -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; - -use garage_util::background::BackgroundRunner; -use garage_util::crdt::*; -use garage_util::data::*; -use garage_util::error::Error as GarageError; -use garage_util::formater::format_table_to_string; -use garage_util::time::*; - -use garage_table::replication::*; -use 
garage_table::*; - -use garage_rpc::ring::PARTITION_BITS; -use garage_rpc::*; - -use garage_block::manager::BlockResyncErrorInfo; - -use garage_model::bucket_alias_table::*; -use garage_model::bucket_table::*; -use garage_model::garage::Garage; -use garage_model::helper::error::{Error, OkOrBadRequest}; -use garage_model::key_table::*; -use garage_model::migrate::Migrate; -use garage_model::permission::*; -use garage_model::s3::object_table::*; -use garage_model::s3::version_table::Version; - -use crate::cli::*; -use crate::repair::online::launch_online_repair; - -pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; - -#[derive(Debug, Serialize, Deserialize)] -#[allow(clippy::large_enum_variant)] -pub enum AdminRpc { - BucketOperation(BucketOperation), - KeyOperation(KeyOperation), - LaunchRepair(RepairOpt), - Migrate(MigrateOpt), - Stats(StatsOpt), - Worker(WorkerOperation), - BlockOperation(BlockOperation), - - // Replies - Ok(String), - BucketList(Vec), - BucketInfo { - bucket: Bucket, - relevant_keys: HashMap, - counters: HashMap, - }, - KeyList(Vec<(String, String)>), - KeyInfo(Key, HashMap), - WorkerList( - HashMap, - WorkerListOpt, - ), - WorkerVars(Vec<(Uuid, String, String)>), - WorkerInfo(usize, garage_util::background::WorkerInfo), - BlockErrorList(Vec), - BlockInfo { - hash: Hash, - refcount: u64, - versions: Vec>, - }, -} - -impl Rpc for AdminRpc { - type Response = Result; -} - -pub struct AdminRpcHandler { - garage: Arc, - background: Arc, - endpoint: Arc>, -} - -impl AdminRpcHandler { - pub fn new(garage: Arc, background: Arc) -> Arc { - let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); - let admin = Arc::new(Self { - garage, - background, - endpoint, - }); - admin.endpoint.set_handler(admin.clone()); - admin - } - - // ================ BUCKET COMMANDS ==================== - - async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result { - match cmd { - BucketOperation::List => self.handle_list_buckets().await, - BucketOperation::Info(query) => self.handle_bucket_info(query).await, - BucketOperation::Create(query) => self.handle_create_bucket(&query.name).await, - BucketOperation::Delete(query) => self.handle_delete_bucket(query).await, - BucketOperation::Alias(query) => self.handle_alias_bucket(query).await, - BucketOperation::Unalias(query) => self.handle_unalias_bucket(query).await, - BucketOperation::Allow(query) => self.handle_bucket_allow(query).await, - BucketOperation::Deny(query) => self.handle_bucket_deny(query).await, - BucketOperation::Website(query) => self.handle_bucket_website(query).await, - BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await, - BucketOperation::CleanupIncompleteUploads(query) => { - self.handle_bucket_cleanup_incomplete_uploads(query).await - } - } - } - - async fn handle_list_buckets(&self) -> Result { - let buckets = self - .garage - .bucket_table - .get_range( - &EmptyKey, - None, - Some(DeletedFilter::NotDeleted), - 10000, - EnumerationOrder::Forward, - ) - .await?; - - Ok(AdminRpc::BucketList(buckets)) - } - - async fn handle_bucket_info(&self, query: &BucketOpt) -> Result { - let bucket_id = self - .garage - .bucket_helper() - .resolve_global_bucket_name(&query.name) - .await? - .ok_or_bad_request("Bucket not found")?; - - let bucket = self - .garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - - let counters = self - .garage - .object_counter_table - .table - .get(&bucket_id, &EmptyKey) - .await? 
- .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) - .unwrap_or_default(); - - let mut relevant_keys = HashMap::new(); - for (k, _) in bucket - .state - .as_option() - .unwrap() - .authorized_keys - .items() - .iter() - { - if let Some(key) = self - .garage - .key_table - .get(&EmptyKey, k) - .await? - .filter(|k| !k.is_deleted()) - { - relevant_keys.insert(k.clone(), key); - } - } - for ((k, _), _, _) in bucket - .state - .as_option() - .unwrap() - .local_aliases - .items() - .iter() - { - if relevant_keys.contains_key(k) { - continue; - } - if let Some(key) = self.garage.key_table.get(&EmptyKey, k).await? { - relevant_keys.insert(k.clone(), key); - } - } - - Ok(AdminRpc::BucketInfo { - bucket, - relevant_keys, - counters, - }) - } - - #[allow(clippy::ptr_arg)] - async fn handle_create_bucket(&self, name: &String) -> Result { - if !is_valid_bucket_name(name) { - return Err(Error::BadRequest(format!( - "{}: {}", - name, INVALID_BUCKET_NAME_MESSAGE - ))); - } - - if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? { - if alias.state.get().is_some() { - return Err(Error::BadRequest(format!("Bucket {} already exists", name))); - } - } - - // ---- done checking, now commit ---- - - let bucket = Bucket::new(); - self.garage.bucket_table.insert(&bucket).await?; - - self.garage - .bucket_helper() - .set_global_bucket_alias(bucket.id, name) - .await?; - - Ok(AdminRpc::Ok(format!("Bucket {} was created.", name))) - } - - async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result { - let helper = self.garage.bucket_helper(); - - let bucket_id = helper - .resolve_global_bucket_name(&query.name) - .await? - .ok_or_bad_request("Bucket not found")?; - - // Get the alias, but keep in minde here the bucket name - // given in parameter can also be directly the bucket's ID. - // In that case bucket_alias will be None, and - // we can still delete the bucket if it has zero aliases - // (a condition which we try to prevent but that could still happen somehow). - // We just won't try to delete an alias entry because there isn't one. - let bucket_alias = self - .garage - .bucket_alias_table - .get(&EmptyKey, &query.name) - .await?; - - // Check bucket doesn't have other aliases - let mut bucket = helper.get_existing_bucket(bucket_id).await?; - let bucket_state = bucket.state.as_option().unwrap(); - if bucket_state - .aliases - .items() - .iter() - .filter(|(_, _, active)| *active) - .any(|(name, _, _)| name != &query.name) - { - return Err(Error::BadRequest(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name))); - } - if bucket_state - .local_aliases - .items() - .iter() - .any(|(_, _, active)| *active) - { - return Err(Error::BadRequest(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name))); - } - - // Check bucket is empty - if !helper.is_bucket_empty(bucket_id).await? { - return Err(Error::BadRequest(format!( - "Bucket {} is not empty", - query.name - ))); - } - - if !query.yes { - return Err(Error::BadRequest( - "Add --yes flag to really perform this operation".to_string(), - )); - } - - // --- done checking, now commit --- - // 1. delete authorization from keys that had access - for (key_id, _) in bucket.authorized_keys() { - helper - .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS) - .await?; - } - - // 2. 
delete bucket alias - if bucket_alias.is_some() { - helper - .purge_global_bucket_alias(bucket_id, &query.name) - .await?; - } - - // 3. delete bucket - bucket.state = Deletable::delete(); - self.garage.bucket_table.insert(&bucket).await?; - - Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name))) - } - - async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result { - let helper = self.garage.bucket_helper(); - let key_helper = self.garage.key_helper(); - - let bucket_id = helper - .resolve_global_bucket_name(&query.existing_bucket) - .await? - .ok_or_bad_request("Bucket not found")?; - - if let Some(key_pattern) = &query.local { - let key = key_helper.get_existing_matching_key(key_pattern).await?; - - helper - .set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name) - .await?; - Ok(AdminRpc::Ok(format!( - "Alias {} now points to bucket {:?} in namespace of key {}", - query.new_name, bucket_id, key.key_id - ))) - } else { - helper - .set_global_bucket_alias(bucket_id, &query.new_name) - .await?; - Ok(AdminRpc::Ok(format!( - "Alias {} now points to bucket {:?}", - query.new_name, bucket_id - ))) - } - } - - async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result { - let helper = self.garage.bucket_helper(); - let key_helper = self.garage.key_helper(); - - if let Some(key_pattern) = &query.local { - let key = key_helper.get_existing_matching_key(key_pattern).await?; - - let bucket_id = key - .state - .as_option() - .unwrap() - .local_aliases - .get(&query.name) - .cloned() - .flatten() - .ok_or_bad_request("Bucket not found")?; - - helper - .unset_local_bucket_alias(bucket_id, &key.key_id, &query.name) - .await?; - - Ok(AdminRpc::Ok(format!( - "Alias {} no longer points to bucket {:?} in namespace of key {}", - &query.name, bucket_id, key.key_id - ))) - } else { - let bucket_id = helper - .resolve_global_bucket_name(&query.name) - .await? - .ok_or_bad_request("Bucket not found")?; - - helper - .unset_global_bucket_alias(bucket_id, &query.name) - .await?; - - Ok(AdminRpc::Ok(format!( - "Alias {} no longer points to bucket {:?}", - &query.name, bucket_id - ))) - } - } - - async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result { - let helper = self.garage.bucket_helper(); - let key_helper = self.garage.key_helper(); - - let bucket_id = helper - .resolve_global_bucket_name(&query.bucket) - .await? - .ok_or_bad_request("Bucket not found")?; - let key = key_helper - .get_existing_matching_key(&query.key_pattern) - .await?; - - let allow_read = query.read || key.allow_read(&bucket_id); - let allow_write = query.write || key.allow_write(&bucket_id); - let allow_owner = query.owner || key.allow_owner(&bucket_id); - - helper - .set_bucket_key_permissions( - bucket_id, - &key.key_id, - BucketKeyPerm { - timestamp: now_msec(), - allow_read, - allow_write, - allow_owner, - }, - ) - .await?; - - Ok(AdminRpc::Ok(format!( - "New permissions for {} on {}: read {}, write {}, owner {}.", - &key.key_id, &query.bucket, allow_read, allow_write, allow_owner - ))) - } - - async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result { - let helper = self.garage.bucket_helper(); - let key_helper = self.garage.key_helper(); - - let bucket_id = helper - .resolve_global_bucket_name(&query.bucket) - .await? 
- .ok_or_bad_request("Bucket not found")?; - let key = key_helper - .get_existing_matching_key(&query.key_pattern) - .await?; - - let allow_read = !query.read && key.allow_read(&bucket_id); - let allow_write = !query.write && key.allow_write(&bucket_id); - let allow_owner = !query.owner && key.allow_owner(&bucket_id); - - helper - .set_bucket_key_permissions( - bucket_id, - &key.key_id, - BucketKeyPerm { - timestamp: now_msec(), - allow_read, - allow_write, - allow_owner, - }, - ) - .await?; - - Ok(AdminRpc::Ok(format!( - "New permissions for {} on {}: read {}, write {}, owner {}.", - &key.key_id, &query.bucket, allow_read, allow_write, allow_owner - ))) - } - - async fn handle_bucket_website(&self, query: &WebsiteOpt) -> Result { - let bucket_id = self - .garage - .bucket_helper() - .resolve_global_bucket_name(&query.bucket) - .await? - .ok_or_bad_request("Bucket not found")?; - - let mut bucket = self - .garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - let bucket_state = bucket.state.as_option_mut().unwrap(); - - if !(query.allow ^ query.deny) { - return Err(Error::BadRequest( - "You must specify exactly one flag, either --allow or --deny".to_string(), - )); - } - - let website = if query.allow { - Some(WebsiteConfig { - index_document: query.index_document.clone(), - error_document: query.error_document.clone(), - }) - } else { - None - }; - - bucket_state.website_config.update(website); - self.garage.bucket_table.insert(&bucket).await?; - - let msg = if query.allow { - format!("Website access allowed for {}", &query.bucket) - } else { - format!("Website access denied for {}", &query.bucket) - }; - - Ok(AdminRpc::Ok(msg)) - } - - async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result { - let bucket_id = self - .garage - .bucket_helper() - .resolve_global_bucket_name(&query.bucket) - .await? - .ok_or_bad_request("Bucket not found")?; - - let mut bucket = self - .garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - let bucket_state = bucket.state.as_option_mut().unwrap(); - - if query.max_size.is_none() && query.max_objects.is_none() { - return Err(Error::BadRequest( - "You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(), - )); - } - - let mut quotas = bucket_state.quotas.get().clone(); - - match query.max_size.as_ref().map(String::as_ref) { - Some("none") => quotas.max_size = None, - Some(v) => { - let bs = v - .parse::() - .ok_or_bad_request(format!("Invalid size specified: {}", v))?; - quotas.max_size = Some(bs.as_u64()); - } - _ => (), - } - - match query.max_objects.as_ref().map(String::as_ref) { - Some("none") => quotas.max_objects = None, - Some(v) => { - let mo = v - .parse::() - .ok_or_bad_request(format!("Invalid number specified: {}", v))?; - quotas.max_objects = Some(mo); - } - _ => (), - } - - bucket_state.quotas.update(quotas); - self.garage.bucket_table.insert(&bucket).await?; - - Ok(AdminRpc::Ok(format!( - "Quotas updated for {}", - &query.bucket - ))) - } - - async fn handle_bucket_cleanup_incomplete_uploads( - &self, - query: &CleanupIncompleteUploadsOpt, - ) -> Result { - let mut bucket_ids = vec![]; - for b in query.buckets.iter() { - bucket_ids.push( - self.garage - .bucket_helper() - .resolve_global_bucket_name(b) - .await? 
- .ok_or_bad_request(format!("Bucket not found: {}", b))?, - ); - } - - let duration = parse_duration::parse::parse(&query.older_than) - .ok_or_bad_request("Invalid duration passed for --older-than parameter")?; - - let mut ret = String::new(); - for bucket in bucket_ids { - let count = self - .garage - .bucket_helper() - .cleanup_incomplete_uploads(&bucket, duration) - .await?; - writeln!( - &mut ret, - "Bucket {:?}: {} incomplete uploads aborted", - bucket, count - ) - .unwrap(); - } - - Ok(AdminRpc::Ok(ret)) - } - - // ================ KEY COMMANDS ==================== - - async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result { - match cmd { - KeyOperation::List => self.handle_list_keys().await, - KeyOperation::Info(query) => self.handle_key_info(query).await, - KeyOperation::Create(query) => self.handle_create_key(query).await, - KeyOperation::Rename(query) => self.handle_rename_key(query).await, - KeyOperation::Delete(query) => self.handle_delete_key(query).await, - KeyOperation::Allow(query) => self.handle_allow_key(query).await, - KeyOperation::Deny(query) => self.handle_deny_key(query).await, - KeyOperation::Import(query) => self.handle_import_key(query).await, - } - } - - async fn handle_list_keys(&self) -> Result { - let key_ids = self - .garage - .key_table - .get_range( - &EmptyKey, - None, - Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), - 10000, - EnumerationOrder::Forward, - ) - .await? - .iter() - .map(|k| (k.key_id.to_string(), k.params().unwrap().name.get().clone())) - .collect::>(); - Ok(AdminRpc::KeyList(key_ids)) - } - - async fn handle_key_info(&self, query: &KeyOpt) -> Result { - let key = self - .garage - .key_helper() - .get_existing_matching_key(&query.key_pattern) - .await?; - self.key_info_result(key).await - } - - async fn handle_create_key(&self, query: &KeyNewOpt) -> Result { - let key = Key::new(&query.name); - self.garage.key_table.insert(&key).await?; - self.key_info_result(key).await - } - - async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result { - let mut key = self - .garage - .key_helper() - .get_existing_matching_key(&query.key_pattern) - .await?; - key.params_mut() - .unwrap() - .name - .update(query.new_name.clone()); - self.garage.key_table.insert(&key).await?; - self.key_info_result(key).await - } - - async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result { - let key_helper = self.garage.key_helper(); - - let mut key = key_helper - .get_existing_matching_key(&query.key_pattern) - .await?; - - if !query.yes { - return Err(Error::BadRequest( - "Add --yes flag to really perform this operation".to_string(), - )); - } - - key_helper.delete_key(&mut key).await?; - - Ok(AdminRpc::Ok(format!( - "Key {} was deleted successfully.", - key.key_id - ))) - } - - async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result { - let mut key = self - .garage - .key_helper() - .get_existing_matching_key(&query.key_pattern) - .await?; - if query.create_bucket { - key.params_mut().unwrap().allow_create_bucket.update(true); - } - self.garage.key_table.insert(&key).await?; - self.key_info_result(key).await - } - - async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result { - let mut key = self - .garage - .key_helper() - .get_existing_matching_key(&query.key_pattern) - .await?; - if query.create_bucket { - key.params_mut().unwrap().allow_create_bucket.update(false); - } - self.garage.key_table.insert(&key).await?; - self.key_info_result(key).await - } - - async fn handle_import_key(&self, query: &KeyImportOpt) -> Result { - let 
prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?; - if prev_key.is_some() { - return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id))); - } - let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name); - self.garage.key_table.insert(&imported_key).await?; - - self.key_info_result(imported_key).await - } - - async fn key_info_result(&self, key: Key) -> Result { - let mut relevant_buckets = HashMap::new(); - - for (id, _) in key - .state - .as_option() - .unwrap() - .authorized_buckets - .items() - .iter() - { - if let Some(b) = self.garage.bucket_table.get(&EmptyKey, id).await? { - relevant_buckets.insert(*id, b); - } - } - - Ok(AdminRpc::KeyInfo(key, relevant_buckets)) - } - - // ================ MIGRATION COMMANDS ==================== - - async fn handle_migrate(self: &Arc, opt: MigrateOpt) -> Result { - if !opt.yes { - return Err(Error::BadRequest( - "Please provide the --yes flag to initiate migration operation.".to_string(), - )); - } - - let m = Migrate { - garage: self.garage.clone(), - }; - match opt.what { - MigrateWhat::Buckets050 => m.migrate_buckets050().await, - }?; - Ok(AdminRpc::Ok("Migration successfull.".into())) - } - - // ================ REPAIR COMMANDS ==================== - - async fn handle_launch_repair(self: &Arc, opt: RepairOpt) -> Result { - if !opt.yes { - return Err(Error::BadRequest( - "Please provide the --yes flag to initiate repair operations.".to_string(), - )); - } - if opt.all_nodes { - let mut opt_to_send = opt.clone(); - opt_to_send.all_nodes = false; - - let mut failures = vec![]; - let ring = self.garage.system.ring.borrow().clone(); - for node in ring.layout.node_ids().iter() { - let node = (*node).into(); - let resp = self - .endpoint - .call( - &node, - AdminRpc::LaunchRepair(opt_to_send.clone()), - PRIO_NORMAL, - ) - .await; - if !matches!(resp, Ok(Ok(_))) { - failures.push(node); - } - } - if failures.is_empty() { - Ok(AdminRpc::Ok("Repair launched on all nodes".to_string())) - } else { - Err(Error::BadRequest(format!( - "Could not launch repair on nodes: {:?} (launched successfully on other nodes)", - failures - ))) - } - } else { - launch_online_repair(&self.garage, &self.background, opt).await?; - Ok(AdminRpc::Ok(format!( - "Repair launched on {:?}", - self.garage.system.id - ))) - } - } - - // ================ STATS COMMANDS ==================== - - async fn handle_stats(&self, opt: StatsOpt) -> Result { - if opt.all_nodes { - let mut ret = String::new(); - let ring = self.garage.system.ring.borrow().clone(); - - for node in ring.layout.node_ids().iter() { - let mut opt = opt.clone(); - opt.all_nodes = false; - opt.skip_global = true; - - writeln!(&mut ret, "\n======================").unwrap(); - writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); - - let node_id = (*node).into(); - match self - .endpoint - .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL) - .await - { - Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(), - Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(), - Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(), - Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(), - } - } - - writeln!(&mut ret, "\n======================").unwrap(); - write!( - &mut ret, - "Cluster statistics:\n\n{}", - self.gather_cluster_stats() - ) - .unwrap(); - - Ok(AdminRpc::Ok(ret)) - } else { - 
Ok(AdminRpc::Ok(self.gather_stats_local(opt)?)) - } - } - - fn gather_stats_local(&self, opt: StatsOpt) -> Result { - let mut ret = String::new(); - writeln!( - &mut ret, - "\nGarage version: {} [features: {}]\nRust compiler version: {}", - garage_util::version::garage_version(), - garage_util::version::garage_features() - .map(|list| list.join(", ")) - .unwrap_or_else(|| "(unknown)".into()), - garage_util::version::rust_version(), - ) - .unwrap(); - - writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap(); - - // Gather table statistics - let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; - table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?); - table.push(self.gather_table_stats(&self.garage.key_table, opt.detailed)?); - table.push(self.gather_table_stats(&self.garage.object_table, opt.detailed)?); - table.push(self.gather_table_stats(&self.garage.version_table, opt.detailed)?); - table.push(self.gather_table_stats(&self.garage.block_ref_table, opt.detailed)?); - write!( - &mut ret, - "\nTable stats:\n{}", - format_table_to_string(table) - ) - .unwrap(); - - // Gather block manager statistics - writeln!(&mut ret, "\nBlock manager stats:").unwrap(); - let rc_len = if opt.detailed { - self.garage.block_manager.rc_len()?.to_string() - } else { - self.garage - .block_manager - .rc_fast_len()? - .map(|x| x.to_string()) - .unwrap_or_else(|| "NC".into()) - }; - - writeln!( - &mut ret, - " number of RC entries (~= number of blocks): {}", - rc_len - ) - .unwrap(); - writeln!( - &mut ret, - " resync queue length: {}", - self.garage.block_manager.resync.queue_len()? - ) - .unwrap(); - writeln!( - &mut ret, - " blocks with resync errors: {}", - self.garage.block_manager.resync.errors_len()? 
- ) - .unwrap(); - - if !opt.detailed { - writeln!(&mut ret, "\nIf values are missing above (marked as NC), consider adding the --detailed flag (this will be slow).").unwrap(); - } - - if !opt.skip_global { - write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap(); - } - - Ok(ret) - } - - fn gather_cluster_stats(&self) -> String { - let mut ret = String::new(); - - // Gather storage node and free space statistics - let layout = &self.garage.system.ring.borrow().layout; - let mut node_partition_count = HashMap::::new(); - for short_id in layout.ring_assignment_data.iter() { - let id = layout.node_id_vec[*short_id as usize]; - *node_partition_count.entry(id).or_default() += 1; - } - let node_info = self - .garage - .system - .get_known_nodes() - .into_iter() - .map(|n| (n.id, n)) - .collect::>(); - - let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()]; - for (id, parts) in node_partition_count.iter() { - let info = node_info.get(id); - let status = info.map(|x| &x.status); - let role = layout.roles.get(id).and_then(|x| x.0.as_ref()); - let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?"); - let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); - let capacity = role - .map(|x| x.capacity_string()) - .unwrap_or_else(|| "?".into()); - let avail_str = |x| match x { - Some((avail, total)) => { - let pct = (avail as f64) / (total as f64) * 100.; - let avail = bytesize::ByteSize::b(avail); - let total = bytesize::ByteSize::b(total); - format!("{}/{} ({:.1}%)", avail, total, pct) - } - None => "?".into(), - }; - let data_avail = avail_str(status.and_then(|x| x.data_disk_avail)); - let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail)); - table.push(format!( - " {:?}\t{}\t{}\t{}\t{}\t{}\t{}", - id, hostname, zone, capacity, parts, data_avail, meta_avail - )); - } - write!( - &mut ret, - "Storage nodes:\n{}", - format_table_to_string(table) - ) - .unwrap(); - - let meta_part_avail = node_partition_count - .iter() - .filter_map(|(id, parts)| { - node_info - .get(id) - .and_then(|x| x.status.meta_disk_avail) - .map(|c| c.0 / *parts) - }) - .collect::>(); - let data_part_avail = node_partition_count - .iter() - .filter_map(|(id, parts)| { - node_info - .get(id) - .and_then(|x| x.status.data_disk_avail) - .map(|c| c.0 / *parts) - }) - .collect::>(); - if !meta_part_avail.is_empty() && !data_part_avail.is_empty() { - let meta_avail = - bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); - let data_avail = - bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); - writeln!( - &mut ret, - "\nEstimated available storage space cluster-wide (might be lower in practice):" - ) - .unwrap(); - if meta_part_avail.len() < node_partition_count.len() - || data_part_avail.len() < node_partition_count.len() - { - writeln!(&mut ret, " data: < {}", data_avail).unwrap(); - writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap(); - writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap(); - } else { - writeln!(&mut ret, " data: {}", data_avail).unwrap(); - writeln!(&mut ret, " metadata: {}", meta_avail).unwrap(); - } - } - - ret - } - - fn gather_table_stats( - &self, - t: &Arc>, - detailed: bool, - ) -> Result - where - F: TableSchema + 'static, - R: TableReplication + 'static, - { - let (data_len, mkl_len) = if detailed { - ( - t.data.store.len().map_err(GarageError::from)?.to_string(), - 
t.merkle_updater.merkle_tree_len()?.to_string(), - ) - } else { - ( - t.data - .store - .fast_len() - .map_err(GarageError::from)? - .map(|x| x.to_string()) - .unwrap_or_else(|| "NC".into()), - t.merkle_updater - .merkle_tree_fast_len()? - .map(|x| x.to_string()) - .unwrap_or_else(|| "NC".into()), - ) - }; - - Ok(format!( - " {}\t{}\t{}\t{}\t{}", - F::TABLE_NAME, - data_len, - mkl_len, - t.merkle_updater.todo_len()?, - t.data.gc_todo_len()? - )) - } - - // ================ WORKER COMMANDS ==================== - - async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result { - match cmd { - WorkerOperation::List { opt } => { - let workers = self.background.get_worker_info(); - Ok(AdminRpc::WorkerList(workers, *opt)) - } - WorkerOperation::Info { tid } => { - let info = self - .background - .get_worker_info() - .get(tid) - .ok_or_bad_request(format!("No worker with TID {}", tid))? - .clone(); - Ok(AdminRpc::WorkerInfo(*tid, info)) - } - WorkerOperation::Get { - all_nodes, - variable, - } => self.handle_get_var(*all_nodes, variable).await, - WorkerOperation::Set { - all_nodes, - variable, - value, - } => self.handle_set_var(*all_nodes, variable, value).await, - } - } - - async fn handle_get_var( - &self, - all_nodes: bool, - variable: &Option, - ) -> Result { - if all_nodes { - let mut ret = vec![]; - let ring = self.garage.system.ring.borrow().clone(); - for node in ring.layout.node_ids().iter() { - let node = (*node).into(); - match self - .endpoint - .call( - &node, - AdminRpc::Worker(WorkerOperation::Get { - all_nodes: false, - variable: variable.clone(), - }), - PRIO_NORMAL, - ) - .await?? - { - AdminRpc::WorkerVars(v) => ret.extend(v), - m => return Err(GarageError::unexpected_rpc_message(m).into()), - } - } - Ok(AdminRpc::WorkerVars(ret)) - } else { - #[allow(clippy::collapsible_else_if)] - if let Some(v) = variable { - Ok(AdminRpc::WorkerVars(vec![( - self.garage.system.id, - v.clone(), - self.garage.bg_vars.get(v)?, - )])) - } else { - let mut vars = self.garage.bg_vars.get_all(); - vars.sort(); - Ok(AdminRpc::WorkerVars( - vars.into_iter() - .map(|(k, v)| (self.garage.system.id, k.to_string(), v)) - .collect(), - )) - } - } - } - - async fn handle_set_var( - &self, - all_nodes: bool, - variable: &str, - value: &str, - ) -> Result { - if all_nodes { - let mut ret = vec![]; - let ring = self.garage.system.ring.borrow().clone(); - for node in ring.layout.node_ids().iter() { - let node = (*node).into(); - match self - .endpoint - .call( - &node, - AdminRpc::Worker(WorkerOperation::Set { - all_nodes: false, - variable: variable.to_string(), - value: value.to_string(), - }), - PRIO_NORMAL, - ) - .await?? 
- { - AdminRpc::WorkerVars(v) => ret.extend(v), - m => return Err(GarageError::unexpected_rpc_message(m).into()), - } - } - Ok(AdminRpc::WorkerVars(ret)) - } else { - self.garage.bg_vars.set(variable, value)?; - Ok(AdminRpc::WorkerVars(vec![( - self.garage.system.id, - variable.to_string(), - value.to_string(), - )])) - } - } - - // ================ BLOCK COMMANDS ==================== - - async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result { - match cmd { - BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList( - self.garage.block_manager.list_resync_errors()?, - )), - BlockOperation::Info { hash } => { - let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; - let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; - let refcount = self.garage.block_manager.get_block_rc(&hash)?; - let block_refs = self - .garage - .block_ref_table - .get_range(&hash, None, None, 10000, Default::default()) - .await?; - let mut versions = vec![]; - for br in block_refs { - if let Some(v) = self - .garage - .version_table - .get(&br.version, &EmptyKey) - .await? - { - versions.push(Ok(v)); - } else { - versions.push(Err(br.version)); - } - } - Ok(AdminRpc::BlockInfo { - hash, - refcount, - versions, - }) - } - BlockOperation::RetryNow { all, blocks } => { - if *all { - if !blocks.is_empty() { - return Err(Error::BadRequest( - "--all was specified, cannot also specify blocks".into(), - )); - } - let blocks = self.garage.block_manager.list_resync_errors()?; - for b in blocks.iter() { - self.garage.block_manager.resync.clear_backoff(&b.hash)?; - } - Ok(AdminRpc::Ok(format!( - "{} blocks returned in queue for a retry now (check logs to see results)", - blocks.len() - ))) - } else { - for hash in blocks { - let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; - let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; - self.garage.block_manager.resync.clear_backoff(&hash)?; - } - Ok(AdminRpc::Ok(format!( - "{} blocks returned in queue for a retry now (check logs to see results)", - blocks.len() - ))) - } - } - BlockOperation::Purge { yes, blocks } => { - if !yes { - return Err(Error::BadRequest( - "Pass the --yes flag to confirm block purge operation.".into(), - )); - } - - let mut obj_dels = 0; - let mut ver_dels = 0; - - for hash in blocks { - let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; - let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; - let block_refs = self - .garage - .block_ref_table - .get_range(&hash, None, None, 10000, Default::default()) - .await?; - - for br in block_refs { - let version = match self - .garage - .version_table - .get(&br.version, &EmptyKey) - .await? - { - Some(v) => v, - None => continue, - }; - - if let Some(object) = self - .garage - .object_table - .get(&version.bucket_id, &version.key) - .await? 
- { - let ov = object.versions().iter().rev().find(|v| v.is_complete()); - if let Some(ov) = ov { - if ov.uuid == br.version { - let del_uuid = gen_uuid(); - let deleted_object = Object::new( - version.bucket_id, - version.key.clone(), - vec![ObjectVersion { - uuid: del_uuid, - timestamp: ov.timestamp + 1, - state: ObjectVersionState::Complete( - ObjectVersionData::DeleteMarker, - ), - }], - ); - self.garage.object_table.insert(&deleted_object).await?; - obj_dels += 1; - } - } - } - - if !version.deleted.get() { - let deleted_version = Version::new( - version.uuid, - version.bucket_id, - version.key.clone(), - true, - ); - self.garage.version_table.insert(&deleted_version).await?; - ver_dels += 1; - } - } - } - Ok(AdminRpc::Ok(format!( - "{} blocks were purged: {} object deletion markers added, {} versions marked deleted", - blocks.len(), - obj_dels, - ver_dels - ))) - } - } - } -} - -#[async_trait] -impl EndpointHandler for AdminRpcHandler { - async fn handle( - self: &Arc, - message: &AdminRpc, - _from: NodeID, - ) -> Result { - match message { - AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await, - AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await, - AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await, - AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, - AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, - AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await, - AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await, - m => Err(GarageError::unexpected_rpc_message(m).into()), - } - } -} diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs new file mode 100644 index 00000000..e9e3ff96 --- /dev/null +++ b/src/garage/admin/block.rs @@ -0,0 +1,160 @@ +use garage_util::data::*; + +use garage_table::*; + +use garage_model::helper::error::{Error, OkOrBadRequest}; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; + +use crate::cli::*; + +use super::*; + +impl AdminRpcHandler { + pub(super) async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result { + match cmd { + BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList( + self.garage.block_manager.list_resync_errors()?, + )), + BlockOperation::Info { hash } => self.handle_block_info(hash).await, + BlockOperation::RetryNow { all, blocks } => { + self.handle_block_retry_now(*all, blocks).await + } + BlockOperation::Purge { yes, blocks } => self.handle_block_purge(*yes, blocks).await, + } + } + + async fn handle_block_info(&self, hash: &String) -> Result { + let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; + let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; + let refcount = self.garage.block_manager.get_block_rc(&hash)?; + let block_refs = self + .garage + .block_ref_table + .get_range(&hash, None, None, 10000, Default::default()) + .await?; + let mut versions = vec![]; + for br in block_refs { + if let Some(v) = self + .garage + .version_table + .get(&br.version, &EmptyKey) + .await? 
+ { + versions.push(Ok(v)); + } else { + versions.push(Err(br.version)); + } + } + Ok(AdminRpc::BlockInfo { + hash, + refcount, + versions, + }) + } + + async fn handle_block_retry_now( + &self, + all: bool, + blocks: &[String], + ) -> Result { + if all { + if !blocks.is_empty() { + return Err(Error::BadRequest( + "--all was specified, cannot also specify blocks".into(), + )); + } + let blocks = self.garage.block_manager.list_resync_errors()?; + for b in blocks.iter() { + self.garage.block_manager.resync.clear_backoff(&b.hash)?; + } + Ok(AdminRpc::Ok(format!( + "{} blocks returned in queue for a retry now (check logs to see results)", + blocks.len() + ))) + } else { + for hash in blocks { + let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; + let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; + self.garage.block_manager.resync.clear_backoff(&hash)?; + } + Ok(AdminRpc::Ok(format!( + "{} blocks returned in queue for a retry now (check logs to see results)", + blocks.len() + ))) + } + } + + async fn handle_block_purge(&self, yes: bool, blocks: &[String]) -> Result { + if !yes { + return Err(Error::BadRequest( + "Pass the --yes flag to confirm block purge operation.".into(), + )); + } + + let mut obj_dels = 0; + let mut ver_dels = 0; + + for hash in blocks { + let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; + let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; + let block_refs = self + .garage + .block_ref_table + .get_range(&hash, None, None, 10000, Default::default()) + .await?; + + for br in block_refs { + let version = match self + .garage + .version_table + .get(&br.version, &EmptyKey) + .await? + { + Some(v) => v, + None => continue, + }; + + if let Some(object) = self + .garage + .object_table + .get(&version.bucket_id, &version.key) + .await? 
+ { + let ov = object.versions().iter().rev().find(|v| v.is_complete()); + if let Some(ov) = ov { + if ov.uuid == br.version { + let del_uuid = gen_uuid(); + let deleted_object = Object::new( + version.bucket_id, + version.key.clone(), + vec![ObjectVersion { + uuid: del_uuid, + timestamp: ov.timestamp + 1, + state: ObjectVersionState::Complete( + ObjectVersionData::DeleteMarker, + ), + }], + ); + self.garage.object_table.insert(&deleted_object).await?; + obj_dels += 1; + } + } + } + + if !version.deleted.get() { + let deleted_version = + Version::new(version.uuid, version.bucket_id, version.key.clone(), true); + self.garage.version_table.insert(&deleted_version).await?; + ver_dels += 1; + } + } + } + Ok(AdminRpc::Ok(format!( + "{} blocks were purged: {} object deletion markers added, {} versions marked deleted", + blocks.len(), + obj_dels, + ver_dels + ))) + } +} diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs new file mode 100644 index 00000000..11bb8730 --- /dev/null +++ b/src/garage/admin/bucket.rs @@ -0,0 +1,496 @@ +use std::collections::HashMap; +use std::fmt::Write; + +use garage_util::crdt::*; +use garage_util::time::*; + +use garage_table::*; + +use garage_model::bucket_alias_table::*; +use garage_model::bucket_table::*; +use garage_model::helper::error::{Error, OkOrBadRequest}; +use garage_model::permission::*; + +use crate::cli::*; + +use super::*; + +impl AdminRpcHandler { + pub(super) async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result { + match cmd { + BucketOperation::List => self.handle_list_buckets().await, + BucketOperation::Info(query) => self.handle_bucket_info(query).await, + BucketOperation::Create(query) => self.handle_create_bucket(&query.name).await, + BucketOperation::Delete(query) => self.handle_delete_bucket(query).await, + BucketOperation::Alias(query) => self.handle_alias_bucket(query).await, + BucketOperation::Unalias(query) => self.handle_unalias_bucket(query).await, + BucketOperation::Allow(query) => self.handle_bucket_allow(query).await, + BucketOperation::Deny(query) => self.handle_bucket_deny(query).await, + BucketOperation::Website(query) => self.handle_bucket_website(query).await, + BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await, + BucketOperation::CleanupIncompleteUploads(query) => { + self.handle_bucket_cleanup_incomplete_uploads(query).await + } + } + } + + async fn handle_list_buckets(&self) -> Result { + let buckets = self + .garage + .bucket_table + .get_range( + &EmptyKey, + None, + Some(DeletedFilter::NotDeleted), + 10000, + EnumerationOrder::Forward, + ) + .await?; + + Ok(AdminRpc::BucketList(buckets)) + } + + async fn handle_bucket_info(&self, query: &BucketOpt) -> Result { + let bucket_id = self + .garage + .bucket_helper() + .resolve_global_bucket_name(&query.name) + .await? + .ok_or_bad_request("Bucket not found")?; + + let bucket = self + .garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + + let counters = self + .garage + .object_counter_table + .table + .get(&bucket_id, &EmptyKey) + .await? + .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) + .unwrap_or_default(); + + let mut relevant_keys = HashMap::new(); + for (k, _) in bucket + .state + .as_option() + .unwrap() + .authorized_keys + .items() + .iter() + { + if let Some(key) = self + .garage + .key_table + .get(&EmptyKey, k) + .await? 
+			.filter(|k| !k.is_deleted())
+		{
+			relevant_keys.insert(k.clone(), key);
+		}
+	}
+	for ((k, _), _, _) in bucket
+		.state
+		.as_option()
+		.unwrap()
+		.local_aliases
+		.items()
+		.iter()
+	{
+		if relevant_keys.contains_key(k) {
+			continue;
+		}
+		if let Some(key) = self.garage.key_table.get(&EmptyKey, k).await? {
+			relevant_keys.insert(k.clone(), key);
+		}
+	}
+
+	Ok(AdminRpc::BucketInfo {
+		bucket,
+		relevant_keys,
+		counters,
+	})
+	}
+
+	#[allow(clippy::ptr_arg)]
+	async fn handle_create_bucket(&self, name: &String) -> Result<AdminRpc, Error> {
+		if !is_valid_bucket_name(name) {
+			return Err(Error::BadRequest(format!(
+				"{}: {}",
+				name, INVALID_BUCKET_NAME_MESSAGE
+			)));
+		}
+
+		if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
+			if alias.state.get().is_some() {
+				return Err(Error::BadRequest(format!("Bucket {} already exists", name)));
+			}
+		}
+
+		// ---- done checking, now commit ----
+
+		let bucket = Bucket::new();
+		self.garage.bucket_table.insert(&bucket).await?;
+
+		self.garage
+			.bucket_helper()
+			.set_global_bucket_alias(bucket.id, name)
+			.await?;
+
+		Ok(AdminRpc::Ok(format!("Bucket {} was created.", name)))
+	}
+
+	async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result<AdminRpc, Error> {
+		let helper = self.garage.bucket_helper();
+
+		let bucket_id = helper
+			.resolve_global_bucket_name(&query.name)
+			.await?
+			.ok_or_bad_request("Bucket not found")?;
+
+		// Get the alias, but keep in mind here that the bucket name
+		// given as parameter can also directly be the bucket's ID.
+		// In that case bucket_alias will be None, and
+		// we can still delete the bucket if it has zero aliases
+		// (a condition we try to prevent, but which could still happen somehow).
+		// We just won't try to delete an alias entry because there isn't one.
+		let bucket_alias = self
+			.garage
+			.bucket_alias_table
+			.get(&EmptyKey, &query.name)
+			.await?;
+
+		// Check bucket doesn't have other aliases
+		let mut bucket = helper.get_existing_bucket(bucket_id).await?;
+		let bucket_state = bucket.state.as_option().unwrap();
+		if bucket_state
+			.aliases
+			.items()
+			.iter()
+			.filter(|(_, _, active)| *active)
+			.any(|(name, _, _)| name != &query.name)
+		{
+			return Err(Error::BadRequest(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name)));
+		}
+		if bucket_state
+			.local_aliases
+			.items()
+			.iter()
+			.any(|(_, _, active)| *active)
+		{
+			return Err(Error::BadRequest(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name)));
+		}
+
+		// Check bucket is empty
+		if !helper.is_bucket_empty(bucket_id).await? {
+			return Err(Error::BadRequest(format!(
+				"Bucket {} is not empty",
+				query.name
+			)));
+		}
+
+		if !query.yes {
+			return Err(Error::BadRequest(
+				"Add --yes flag to really perform this operation".to_string(),
+			));
+		}
+
+		// --- done checking, now commit ---
+		// 1. delete authorization from keys that had access
+		for (key_id, _) in bucket.authorized_keys() {
+			helper
+				.set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
+				.await?;
+		}
+
+		// 2. delete bucket alias
+		if bucket_alias.is_some() {
+			helper
+				.purge_global_bucket_alias(bucket_id, &query.name)
+				.await?;
+		}
+
+		// 3.
delete bucket + bucket.state = Deletable::delete(); + self.garage.bucket_table.insert(&bucket).await?; + + Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name))) + } + + async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result { + let helper = self.garage.bucket_helper(); + let key_helper = self.garage.key_helper(); + + let bucket_id = helper + .resolve_global_bucket_name(&query.existing_bucket) + .await? + .ok_or_bad_request("Bucket not found")?; + + if let Some(key_pattern) = &query.local { + let key = key_helper.get_existing_matching_key(key_pattern).await?; + + helper + .set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name) + .await?; + Ok(AdminRpc::Ok(format!( + "Alias {} now points to bucket {:?} in namespace of key {}", + query.new_name, bucket_id, key.key_id + ))) + } else { + helper + .set_global_bucket_alias(bucket_id, &query.new_name) + .await?; + Ok(AdminRpc::Ok(format!( + "Alias {} now points to bucket {:?}", + query.new_name, bucket_id + ))) + } + } + + async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result { + let helper = self.garage.bucket_helper(); + let key_helper = self.garage.key_helper(); + + if let Some(key_pattern) = &query.local { + let key = key_helper.get_existing_matching_key(key_pattern).await?; + + let bucket_id = key + .state + .as_option() + .unwrap() + .local_aliases + .get(&query.name) + .cloned() + .flatten() + .ok_or_bad_request("Bucket not found")?; + + helper + .unset_local_bucket_alias(bucket_id, &key.key_id, &query.name) + .await?; + + Ok(AdminRpc::Ok(format!( + "Alias {} no longer points to bucket {:?} in namespace of key {}", + &query.name, bucket_id, key.key_id + ))) + } else { + let bucket_id = helper + .resolve_global_bucket_name(&query.name) + .await? + .ok_or_bad_request("Bucket not found")?; + + helper + .unset_global_bucket_alias(bucket_id, &query.name) + .await?; + + Ok(AdminRpc::Ok(format!( + "Alias {} no longer points to bucket {:?}", + &query.name, bucket_id + ))) + } + } + + async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result { + let helper = self.garage.bucket_helper(); + let key_helper = self.garage.key_helper(); + + let bucket_id = helper + .resolve_global_bucket_name(&query.bucket) + .await? + .ok_or_bad_request("Bucket not found")?; + let key = key_helper + .get_existing_matching_key(&query.key_pattern) + .await?; + + let allow_read = query.read || key.allow_read(&bucket_id); + let allow_write = query.write || key.allow_write(&bucket_id); + let allow_owner = query.owner || key.allow_owner(&bucket_id); + + helper + .set_bucket_key_permissions( + bucket_id, + &key.key_id, + BucketKeyPerm { + timestamp: now_msec(), + allow_read, + allow_write, + allow_owner, + }, + ) + .await?; + + Ok(AdminRpc::Ok(format!( + "New permissions for {} on {}: read {}, write {}, owner {}.", + &key.key_id, &query.bucket, allow_read, allow_write, allow_owner + ))) + } + + async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result { + let helper = self.garage.bucket_helper(); + let key_helper = self.garage.key_helper(); + + let bucket_id = helper + .resolve_global_bucket_name(&query.bucket) + .await? 
+ .ok_or_bad_request("Bucket not found")?; + let key = key_helper + .get_existing_matching_key(&query.key_pattern) + .await?; + + let allow_read = !query.read && key.allow_read(&bucket_id); + let allow_write = !query.write && key.allow_write(&bucket_id); + let allow_owner = !query.owner && key.allow_owner(&bucket_id); + + helper + .set_bucket_key_permissions( + bucket_id, + &key.key_id, + BucketKeyPerm { + timestamp: now_msec(), + allow_read, + allow_write, + allow_owner, + }, + ) + .await?; + + Ok(AdminRpc::Ok(format!( + "New permissions for {} on {}: read {}, write {}, owner {}.", + &key.key_id, &query.bucket, allow_read, allow_write, allow_owner + ))) + } + + async fn handle_bucket_website(&self, query: &WebsiteOpt) -> Result { + let bucket_id = self + .garage + .bucket_helper() + .resolve_global_bucket_name(&query.bucket) + .await? + .ok_or_bad_request("Bucket not found")?; + + let mut bucket = self + .garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + let bucket_state = bucket.state.as_option_mut().unwrap(); + + if !(query.allow ^ query.deny) { + return Err(Error::BadRequest( + "You must specify exactly one flag, either --allow or --deny".to_string(), + )); + } + + let website = if query.allow { + Some(WebsiteConfig { + index_document: query.index_document.clone(), + error_document: query.error_document.clone(), + }) + } else { + None + }; + + bucket_state.website_config.update(website); + self.garage.bucket_table.insert(&bucket).await?; + + let msg = if query.allow { + format!("Website access allowed for {}", &query.bucket) + } else { + format!("Website access denied for {}", &query.bucket) + }; + + Ok(AdminRpc::Ok(msg)) + } + + async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result { + let bucket_id = self + .garage + .bucket_helper() + .resolve_global_bucket_name(&query.bucket) + .await? + .ok_or_bad_request("Bucket not found")?; + + let mut bucket = self + .garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + let bucket_state = bucket.state.as_option_mut().unwrap(); + + if query.max_size.is_none() && query.max_objects.is_none() { + return Err(Error::BadRequest( + "You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(), + )); + } + + let mut quotas = bucket_state.quotas.get().clone(); + + match query.max_size.as_ref().map(String::as_ref) { + Some("none") => quotas.max_size = None, + Some(v) => { + let bs = v + .parse::() + .ok_or_bad_request(format!("Invalid size specified: {}", v))?; + quotas.max_size = Some(bs.as_u64()); + } + _ => (), + } + + match query.max_objects.as_ref().map(String::as_ref) { + Some("none") => quotas.max_objects = None, + Some(v) => { + let mo = v + .parse::() + .ok_or_bad_request(format!("Invalid number specified: {}", v))?; + quotas.max_objects = Some(mo); + } + _ => (), + } + + bucket_state.quotas.update(quotas); + self.garage.bucket_table.insert(&bucket).await?; + + Ok(AdminRpc::Ok(format!( + "Quotas updated for {}", + &query.bucket + ))) + } + + async fn handle_bucket_cleanup_incomplete_uploads( + &self, + query: &CleanupIncompleteUploadsOpt, + ) -> Result { + let mut bucket_ids = vec![]; + for b in query.buckets.iter() { + bucket_ids.push( + self.garage + .bucket_helper() + .resolve_global_bucket_name(b) + .await? 
+ .ok_or_bad_request(format!("Bucket not found: {}", b))?, + ); + } + + let duration = parse_duration::parse::parse(&query.older_than) + .ok_or_bad_request("Invalid duration passed for --older-than parameter")?; + + let mut ret = String::new(); + for bucket in bucket_ids { + let count = self + .garage + .bucket_helper() + .cleanup_incomplete_uploads(&bucket, duration) + .await?; + writeln!( + &mut ret, + "Bucket {:?}: {} incomplete uploads aborted", + bucket, count + ) + .unwrap(); + } + + Ok(AdminRpc::Ok(ret)) + } +} diff --git a/src/garage/admin/key.rs b/src/garage/admin/key.rs new file mode 100644 index 00000000..a4064539 --- /dev/null +++ b/src/garage/admin/key.rs @@ -0,0 +1,149 @@ +use std::collections::HashMap; + +use garage_table::*; + +use garage_model::helper::error::Error; +use garage_model::key_table::*; + +use crate::cli::*; + +use super::*; + +impl AdminRpcHandler { + pub(super) async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result { + match cmd { + KeyOperation::List => self.handle_list_keys().await, + KeyOperation::Info(query) => self.handle_key_info(query).await, + KeyOperation::Create(query) => self.handle_create_key(query).await, + KeyOperation::Rename(query) => self.handle_rename_key(query).await, + KeyOperation::Delete(query) => self.handle_delete_key(query).await, + KeyOperation::Allow(query) => self.handle_allow_key(query).await, + KeyOperation::Deny(query) => self.handle_deny_key(query).await, + KeyOperation::Import(query) => self.handle_import_key(query).await, + } + } + + async fn handle_list_keys(&self) -> Result { + let key_ids = self + .garage + .key_table + .get_range( + &EmptyKey, + None, + Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), + 10000, + EnumerationOrder::Forward, + ) + .await? + .iter() + .map(|k| (k.key_id.to_string(), k.params().unwrap().name.get().clone())) + .collect::>(); + Ok(AdminRpc::KeyList(key_ids)) + } + + async fn handle_key_info(&self, query: &KeyOpt) -> Result { + let key = self + .garage + .key_helper() + .get_existing_matching_key(&query.key_pattern) + .await?; + self.key_info_result(key).await + } + + async fn handle_create_key(&self, query: &KeyNewOpt) -> Result { + let key = Key::new(&query.name); + self.garage.key_table.insert(&key).await?; + self.key_info_result(key).await + } + + async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result { + let mut key = self + .garage + .key_helper() + .get_existing_matching_key(&query.key_pattern) + .await?; + key.params_mut() + .unwrap() + .name + .update(query.new_name.clone()); + self.garage.key_table.insert(&key).await?; + self.key_info_result(key).await + } + + async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result { + let key_helper = self.garage.key_helper(); + + let mut key = key_helper + .get_existing_matching_key(&query.key_pattern) + .await?; + + if !query.yes { + return Err(Error::BadRequest( + "Add --yes flag to really perform this operation".to_string(), + )); + } + + key_helper.delete_key(&mut key).await?; + + Ok(AdminRpc::Ok(format!( + "Key {} was deleted successfully.", + key.key_id + ))) + } + + async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result { + let mut key = self + .garage + .key_helper() + .get_existing_matching_key(&query.key_pattern) + .await?; + if query.create_bucket { + key.params_mut().unwrap().allow_create_bucket.update(true); + } + self.garage.key_table.insert(&key).await?; + self.key_info_result(key).await + } + + async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result { + let mut key = self + .garage + 
.key_helper() + .get_existing_matching_key(&query.key_pattern) + .await?; + if query.create_bucket { + key.params_mut().unwrap().allow_create_bucket.update(false); + } + self.garage.key_table.insert(&key).await?; + self.key_info_result(key).await + } + + async fn handle_import_key(&self, query: &KeyImportOpt) -> Result { + let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?; + if prev_key.is_some() { + return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id))); + } + let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name); + self.garage.key_table.insert(&imported_key).await?; + + self.key_info_result(imported_key).await + } + + async fn key_info_result(&self, key: Key) -> Result { + let mut relevant_buckets = HashMap::new(); + + for (id, _) in key + .state + .as_option() + .unwrap() + .authorized_buckets + .items() + .iter() + { + if let Some(b) = self.garage.bucket_table.get(&EmptyKey, id).await? { + relevant_buckets.insert(*id, b); + } + } + + Ok(AdminRpc::KeyInfo(key, relevant_buckets)) + } +} diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs new file mode 100644 index 00000000..93f6dd08 --- /dev/null +++ b/src/garage/admin/mod.rs @@ -0,0 +1,537 @@ +mod block; +mod bucket; +mod key; + +use std::collections::HashMap; +use std::fmt::Write; +use std::sync::Arc; + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +use garage_util::background::BackgroundRunner; +use garage_util::data::*; +use garage_util::error::Error as GarageError; +use garage_util::formater::format_table_to_string; + +use garage_table::replication::*; +use garage_table::*; + +use garage_rpc::ring::PARTITION_BITS; +use garage_rpc::*; + +use garage_block::manager::BlockResyncErrorInfo; + +use garage_model::bucket_table::*; +use garage_model::garage::Garage; +use garage_model::helper::error::{Error, OkOrBadRequest}; +use garage_model::key_table::*; +use garage_model::migrate::Migrate; +use garage_model::s3::version_table::Version; + +use crate::cli::*; +use crate::repair::online::launch_online_repair; + +pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; + +#[derive(Debug, Serialize, Deserialize)] +#[allow(clippy::large_enum_variant)] +pub enum AdminRpc { + BucketOperation(BucketOperation), + KeyOperation(KeyOperation), + LaunchRepair(RepairOpt), + Migrate(MigrateOpt), + Stats(StatsOpt), + Worker(WorkerOperation), + BlockOperation(BlockOperation), + + // Replies + Ok(String), + BucketList(Vec), + BucketInfo { + bucket: Bucket, + relevant_keys: HashMap, + counters: HashMap, + }, + KeyList(Vec<(String, String)>), + KeyInfo(Key, HashMap), + WorkerList( + HashMap, + WorkerListOpt, + ), + WorkerVars(Vec<(Uuid, String, String)>), + WorkerInfo(usize, garage_util::background::WorkerInfo), + BlockErrorList(Vec), + BlockInfo { + hash: Hash, + refcount: u64, + versions: Vec>, + }, +} + +impl Rpc for AdminRpc { + type Response = Result; +} + +pub struct AdminRpcHandler { + garage: Arc, + background: Arc, + endpoint: Arc>, +} + +impl AdminRpcHandler { + pub fn new(garage: Arc, background: Arc) -> Arc { + let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); + let admin = Arc::new(Self { + garage, + background, + endpoint, + }); + admin.endpoint.set_handler(admin.clone()); + admin + } + + // ================ MIGRATION COMMANDS ==================== + + async fn handle_migrate(self: &Arc, opt: MigrateOpt) -> Result { + if 
!opt.yes {
+			return Err(Error::BadRequest(
+				"Please provide the --yes flag to initiate the migration operation.".to_string(),
+			));
+		}
+
+		let m = Migrate {
+			garage: self.garage.clone(),
+		};
+		match opt.what {
+			MigrateWhat::Buckets050 => m.migrate_buckets050().await,
+		}?;
+		Ok(AdminRpc::Ok("Migration successful.".into()))
+	}
+
+	// ================ REPAIR COMMANDS ====================
+
+	async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
+		if !opt.yes {
+			return Err(Error::BadRequest(
+				"Please provide the --yes flag to initiate repair operations.".to_string(),
+			));
+		}
+		if opt.all_nodes {
+			let mut opt_to_send = opt.clone();
+			opt_to_send.all_nodes = false;
+
+			let mut failures = vec![];
+			let ring = self.garage.system.ring.borrow().clone();
+			for node in ring.layout.node_ids().iter() {
+				let node = (*node).into();
+				let resp = self
+					.endpoint
+					.call(
+						&node,
+						AdminRpc::LaunchRepair(opt_to_send.clone()),
+						PRIO_NORMAL,
+					)
+					.await;
+				if !matches!(resp, Ok(Ok(_))) {
+					failures.push(node);
+				}
+			}
+			if failures.is_empty() {
+				Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
+			} else {
+				Err(Error::BadRequest(format!(
+					"Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
+					failures
+				)))
+			}
+		} else {
+			launch_online_repair(&self.garage, &self.background, opt).await?;
+			Ok(AdminRpc::Ok(format!(
+				"Repair launched on {:?}",
+				self.garage.system.id
+			)))
+		}
+	}
+
+	// ================ STATS COMMANDS ====================
+
+	async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
+		if opt.all_nodes {
+			let mut ret = String::new();
+			let ring = self.garage.system.ring.borrow().clone();
+
+			for node in ring.layout.node_ids().iter() {
+				let mut opt = opt.clone();
+				opt.all_nodes = false;
+				opt.skip_global = true;
+
+				writeln!(&mut ret, "\n======================").unwrap();
+				writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
+
+				let node_id = (*node).into();
+				match self
+					.endpoint
+					.call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
+					.await
+				{
+					Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
+					Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
+					Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(),
+					Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
+				}
+			}
+
+			writeln!(&mut ret, "\n======================").unwrap();
+			write!(
+				&mut ret,
+				"Cluster statistics:\n\n{}",
+				self.gather_cluster_stats()
+			)
+			.unwrap();
+
+			Ok(AdminRpc::Ok(ret))
+		} else {
+			Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
+		}
+	}
+
+	fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
+		let mut ret = String::new();
+		writeln!(
+			&mut ret,
+			"\nGarage version: {} [features: {}]\nRust compiler version: {}",
+			garage_util::version::garage_version(),
+			garage_util::version::garage_features()
+				.map(|list| list.join(", "))
+				.unwrap_or_else(|| "(unknown)".into()),
+			garage_util::version::rust_version(),
+		)
+		.unwrap();
+
+		writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
+
+		// Gather table statistics
+		let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
+		table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?);
+		table.push(self.gather_table_stats(&self.garage.key_table, opt.detailed)?);
+		table.push(self.gather_table_stats(&self.garage.object_table, opt.detailed)?);
+		table.push(self.gather_table_stats(&self.garage.version_table, opt.detailed)?);
+
table.push(self.gather_table_stats(&self.garage.block_ref_table, opt.detailed)?); + write!( + &mut ret, + "\nTable stats:\n{}", + format_table_to_string(table) + ) + .unwrap(); + + // Gather block manager statistics + writeln!(&mut ret, "\nBlock manager stats:").unwrap(); + let rc_len = if opt.detailed { + self.garage.block_manager.rc_len()?.to_string() + } else { + self.garage + .block_manager + .rc_fast_len()? + .map(|x| x.to_string()) + .unwrap_or_else(|| "NC".into()) + }; + + writeln!( + &mut ret, + " number of RC entries (~= number of blocks): {}", + rc_len + ) + .unwrap(); + writeln!( + &mut ret, + " resync queue length: {}", + self.garage.block_manager.resync.queue_len()? + ) + .unwrap(); + writeln!( + &mut ret, + " blocks with resync errors: {}", + self.garage.block_manager.resync.errors_len()? + ) + .unwrap(); + + if !opt.detailed { + writeln!(&mut ret, "\nIf values are missing above (marked as NC), consider adding the --detailed flag (this will be slow).").unwrap(); + } + + if !opt.skip_global { + write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap(); + } + + Ok(ret) + } + + fn gather_cluster_stats(&self) -> String { + let mut ret = String::new(); + + // Gather storage node and free space statistics + let layout = &self.garage.system.ring.borrow().layout; + let mut node_partition_count = HashMap::::new(); + for short_id in layout.ring_assignment_data.iter() { + let id = layout.node_id_vec[*short_id as usize]; + *node_partition_count.entry(id).or_default() += 1; + } + let node_info = self + .garage + .system + .get_known_nodes() + .into_iter() + .map(|n| (n.id, n)) + .collect::>(); + + let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()]; + for (id, parts) in node_partition_count.iter() { + let info = node_info.get(id); + let status = info.map(|x| &x.status); + let role = layout.roles.get(id).and_then(|x| x.0.as_ref()); + let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?"); + let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); + let capacity = role + .map(|x| x.capacity_string()) + .unwrap_or_else(|| "?".into()); + let avail_str = |x| match x { + Some((avail, total)) => { + let pct = (avail as f64) / (total as f64) * 100.; + let avail = bytesize::ByteSize::b(avail); + let total = bytesize::ByteSize::b(total); + format!("{}/{} ({:.1}%)", avail, total, pct) + } + None => "?".into(), + }; + let data_avail = avail_str(status.and_then(|x| x.data_disk_avail)); + let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail)); + table.push(format!( + " {:?}\t{}\t{}\t{}\t{}\t{}\t{}", + id, hostname, zone, capacity, parts, data_avail, meta_avail + )); + } + write!( + &mut ret, + "Storage nodes:\n{}", + format_table_to_string(table) + ) + .unwrap(); + + let meta_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.meta_disk_avail) + .map(|c| c.0 / *parts) + }) + .collect::>(); + let data_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.data_disk_avail) + .map(|c| c.0 / *parts) + }) + .collect::>(); + if !meta_part_avail.is_empty() && !data_part_avail.is_empty() { + let meta_avail = + bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + let data_avail = + bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + writeln!( + &mut ret, + "\nEstimated available storage space cluster-wide (might be lower in practice):" + ) + 
.unwrap(); + if meta_part_avail.len() < node_partition_count.len() + || data_part_avail.len() < node_partition_count.len() + { + writeln!(&mut ret, " data: < {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap(); + writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap(); + } else { + writeln!(&mut ret, " data: {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: {}", meta_avail).unwrap(); + } + } + + ret + } + + fn gather_table_stats( + &self, + t: &Arc>, + detailed: bool, + ) -> Result + where + F: TableSchema + 'static, + R: TableReplication + 'static, + { + let (data_len, mkl_len) = if detailed { + ( + t.data.store.len().map_err(GarageError::from)?.to_string(), + t.merkle_updater.merkle_tree_len()?.to_string(), + ) + } else { + ( + t.data + .store + .fast_len() + .map_err(GarageError::from)? + .map(|x| x.to_string()) + .unwrap_or_else(|| "NC".into()), + t.merkle_updater + .merkle_tree_fast_len()? + .map(|x| x.to_string()) + .unwrap_or_else(|| "NC".into()), + ) + }; + + Ok(format!( + " {}\t{}\t{}\t{}\t{}", + F::TABLE_NAME, + data_len, + mkl_len, + t.merkle_updater.todo_len()?, + t.data.gc_todo_len()? + )) + } + + // ================ WORKER COMMANDS ==================== + + async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result { + match cmd { + WorkerOperation::List { opt } => { + let workers = self.background.get_worker_info(); + Ok(AdminRpc::WorkerList(workers, *opt)) + } + WorkerOperation::Info { tid } => { + let info = self + .background + .get_worker_info() + .get(tid) + .ok_or_bad_request(format!("No worker with TID {}", tid))? + .clone(); + Ok(AdminRpc::WorkerInfo(*tid, info)) + } + WorkerOperation::Get { + all_nodes, + variable, + } => self.handle_get_var(*all_nodes, variable).await, + WorkerOperation::Set { + all_nodes, + variable, + value, + } => self.handle_set_var(*all_nodes, variable, value).await, + } + } + + async fn handle_get_var( + &self, + all_nodes: bool, + variable: &Option, + ) -> Result { + if all_nodes { + let mut ret = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.layout.node_ids().iter() { + let node = (*node).into(); + match self + .endpoint + .call( + &node, + AdminRpc::Worker(WorkerOperation::Get { + all_nodes: false, + variable: variable.clone(), + }), + PRIO_NORMAL, + ) + .await?? + { + AdminRpc::WorkerVars(v) => ret.extend(v), + m => return Err(GarageError::unexpected_rpc_message(m).into()), + } + } + Ok(AdminRpc::WorkerVars(ret)) + } else { + #[allow(clippy::collapsible_else_if)] + if let Some(v) = variable { + Ok(AdminRpc::WorkerVars(vec![( + self.garage.system.id, + v.clone(), + self.garage.bg_vars.get(v)?, + )])) + } else { + let mut vars = self.garage.bg_vars.get_all(); + vars.sort(); + Ok(AdminRpc::WorkerVars( + vars.into_iter() + .map(|(k, v)| (self.garage.system.id, k.to_string(), v)) + .collect(), + )) + } + } + } + + async fn handle_set_var( + &self, + all_nodes: bool, + variable: &str, + value: &str, + ) -> Result { + if all_nodes { + let mut ret = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.layout.node_ids().iter() { + let node = (*node).into(); + match self + .endpoint + .call( + &node, + AdminRpc::Worker(WorkerOperation::Set { + all_nodes: false, + variable: variable.to_string(), + value: value.to_string(), + }), + PRIO_NORMAL, + ) + .await?? 
+ { + AdminRpc::WorkerVars(v) => ret.extend(v), + m => return Err(GarageError::unexpected_rpc_message(m).into()), + } + } + Ok(AdminRpc::WorkerVars(ret)) + } else { + self.garage.bg_vars.set(variable, value)?; + Ok(AdminRpc::WorkerVars(vec![( + self.garage.system.id, + variable.to_string(), + value.to_string(), + )])) + } + } +} + +#[async_trait] +impl EndpointHandler for AdminRpcHandler { + async fn handle( + self: &Arc, + message: &AdminRpc, + _from: NodeID, + ) -> Result { + match message { + AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await, + AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await, + AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await, + AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, + AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, + AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await, + AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await, + m => Err(GarageError::unexpected_rpc_message(m).into()), + } + } +} -- cgit v1.2.3 From 1e466b11eb9a3d5de2b8247fc6b635f9278bc3ac Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 9 Jun 2023 13:23:08 +0200 Subject: Revert integration tests to using Sled as LMDB causes failures --- src/garage/tests/common/garage.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/garage') diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs index 3beed7c4..cb1b7ea5 100644 --- a/src/garage/tests/common/garage.rs +++ b/src/garage/tests/common/garage.rs @@ -52,6 +52,7 @@ impl Instance { r#" metadata_dir = "{path}/meta" data_dir = "{path}/data" +db_engine = "sled" replication_mode = "1" -- cgit v1.2.3 From 87be8eeb930f37e7ebc23037eecf7f79f173434a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 3 May 2023 16:17:40 +0200 Subject: updaet block admin for new multipartupload models --- src/garage/admin/block.rs | 113 ++++++++++++++++++++++++++++++---------------- src/garage/admin/mod.rs | 2 + src/garage/cli/cmd.rs | 3 +- src/garage/cli/util.rs | 43 +++++++++++++----- 4 files changed, 109 insertions(+), 52 deletions(-) (limited to 'src/garage') diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs index e9e3ff96..2d84b5cf 100644 --- a/src/garage/admin/block.rs +++ b/src/garage/admin/block.rs @@ -34,6 +34,7 @@ impl AdminRpcHandler { .get_range(&hash, None, None, 10000, Default::default()) .await?; let mut versions = vec![]; + let mut uploads = vec![]; for br in block_refs { if let Some(v) = self .garage @@ -41,6 +42,11 @@ impl AdminRpcHandler { .get(&br.version, &EmptyKey) .await? { + if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink { + if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { + uploads.push(u); + } + } versions.push(Ok(v)); } else { versions.push(Err(br.version)); @@ -50,6 +56,7 @@ impl AdminRpcHandler { hash, refcount, versions, + uploads, }) } @@ -93,6 +100,7 @@ impl AdminRpcHandler { } let mut obj_dels = 0; + let mut mpu_dels = 0; let mut ver_dels = 0; for hash in blocks { @@ -105,56 +113,81 @@ impl AdminRpcHandler { .await?; for br in block_refs { - let version = match self + if let Some(version) = self .garage .version_table .get(&br.version, &EmptyKey) .await? { - Some(v) => v, - None => continue, - }; + self.handle_block_purge_version_backlink(&version, &mut obj_dels, &mut mpu_dels).await?; - if let Some(object) = self - .garage - .object_table - .get(&version.bucket_id, &version.key) - .await? 
- { - let ov = object.versions().iter().rev().find(|v| v.is_complete()); - if let Some(ov) = ov { - if ov.uuid == br.version { - let del_uuid = gen_uuid(); - let deleted_object = Object::new( - version.bucket_id, - version.key.clone(), - vec![ObjectVersion { - uuid: del_uuid, - timestamp: ov.timestamp + 1, - state: ObjectVersionState::Complete( - ObjectVersionData::DeleteMarker, - ), - }], - ); - self.garage.object_table.insert(&deleted_object).await?; - obj_dels += 1; - } - } - } + if !version.deleted.get() { + let deleted_version = + Version::new(version.uuid, version.backlink, true); + self.garage.version_table.insert(&deleted_version).await?; + ver_dels += 1; + } + } + } + } - if !version.deleted.get() { - let deleted_version = - Version::new(version.uuid, version.bucket_id, version.key.clone(), true); - self.garage.version_table.insert(&deleted_version).await?; - ver_dels += 1; - } - } - } Ok(AdminRpc::Ok(format!( - "{} blocks were purged: {} object deletion markers added, {} versions marked deleted", + "Purged {} blocks, {} versions, {} objects, {} multipart uploads", blocks.len(), + ver_dels, obj_dels, - ver_dels + mpu_dels, ))) + } + + async fn handle_block_purge_version_backlink(&self, version: &Version, obj_dels: &mut usize, mpu_dels: &mut usize) -> Result<(), Error> { + let (bucket_id, key, ov_id) = match &version.backlink { + VersionBacklink::Object{bucket_id, key} => { + (*bucket_id, key.clone(), version.uuid) + } + VersionBacklink::MultipartUpload{upload_id} => { + if let Some(mut mpu) = self.garage.mpu_table.get(&upload_id, &EmptyKey).await? { + if !mpu.deleted.get() { + mpu.parts.clear(); + mpu.deleted.set(); + self.garage.mpu_table.insert(&mpu).await?; + *mpu_dels += 1; + } + (mpu.bucket_id, mpu.key.clone(), *upload_id) + } else { + return Ok(()); + } + } + }; + + if let Some(object) = self + .garage + .object_table + .get(&bucket_id, &key) + .await? 
+ { + let ov = object.versions().iter().rev().find(|v| v.is_complete()); + if let Some(ov) = ov { + if ov.uuid == ov_id { + let del_uuid = gen_uuid(); + let deleted_object = Object::new( + bucket_id, + key, + vec![ObjectVersion { + uuid: del_uuid, + timestamp: ov.timestamp + 1, + state: ObjectVersionState::Complete( + ObjectVersionData::DeleteMarker, + ), + }], + ); + self.garage.object_table.insert(&deleted_object).await?; + *obj_dels += 1; + } + } + } + + Ok(()) } + } diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index 93f6dd08..07b0012d 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -27,6 +27,7 @@ use garage_model::garage::Garage; use garage_model::helper::error::{Error, OkOrBadRequest}; use garage_model::key_table::*; use garage_model::migrate::Migrate; +use garage_model::s3::mpu_table::MultipartUpload; use garage_model::s3::version_table::Version; use crate::cli::*; @@ -66,6 +67,7 @@ pub enum AdminRpc { hash: Hash, refcount: u64, versions: Vec>, + uploads: Vec, }, } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 905b14d3..fb77a927 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -215,8 +215,9 @@ pub async fn cmd_admin( hash, refcount, versions, + uploads, } => { - print_block_info(hash, refcount, versions); + print_block_info(hash, refcount, versions, uploads); } r => { error!("Unexpected response: {:?}", r); diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 2c6be2f4..22a3442d 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -12,8 +12,9 @@ use garage_block::manager::BlockResyncErrorInfo; use garage_model::bucket_table::*; use garage_model::key_table::*; +use garage_model::s3::mpu_table::MultipartUpload; use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; -use garage_model::s3::version_table::Version; +use garage_model::s3::version_table::*; use crate::cli::structs::WorkerListOpt; @@ -385,29 +386,49 @@ pub fn print_block_error_list(el: Vec) { format_table(table); } -pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec>) { +pub fn print_block_info( + hash: Hash, + refcount: u64, + versions: Vec>, + uploads: Vec, +) { println!("Block hash: {}", hex::encode(hash.as_slice())); println!("Refcount: {}", refcount); println!(); - let mut table = vec!["Version\tBucket\tKey\tDeleted".into()]; + let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()]; let mut nondeleted_count = 0; for v in versions.iter() { match v { Ok(ver) => { - table.push(format!( - "{:?}\t{:?}\t{}\t{:?}", - ver.uuid, - ver.bucket_id, - ver.key, - ver.deleted.get() - )); + match &ver.backlink { + VersionBacklink::Object { bucket_id, key } => { + table.push(format!( + "{:?}\t{:?}\t{}\t\t{:?}", + ver.uuid, + bucket_id, + key, + ver.deleted.get() + )); + } + VersionBacklink::MultipartUpload { upload_id } => { + let upload = uploads.iter().find(|x| x.upload_id == *upload_id); + table.push(format!( + "{:?}\t{:?}\t{}\t{:?}\t{:?}", + ver.uuid, + upload.map(|u| u.bucket_id).unwrap_or_default(), + upload.map(|u| u.key.as_str()).unwrap_or_default(), + upload_id, + ver.deleted.get() + )); + } + } if !ver.deleted.get() { nondeleted_count += 1; } } Err(vh) => { - table.push(format!("{:?}\t\t\tyes", vh)); + table.push(format!("{:?}\t\t\t\tyes", vh)); } } } -- cgit v1.2.3 From bb176ebcb87ea77b07480b696c9e87be92136c70 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 3 May 2023 16:43:36 +0200 Subject: cargo fmt --- src/garage/admin/block.rs | 123 
+++++++++++++++++++++++----------------------- 1 file changed, 61 insertions(+), 62 deletions(-) (limited to 'src/garage') diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs index 2d84b5cf..6c2649e0 100644 --- a/src/garage/admin/block.rs +++ b/src/garage/admin/block.rs @@ -119,75 +119,74 @@ impl AdminRpcHandler { .get(&br.version, &EmptyKey) .await? { - self.handle_block_purge_version_backlink(&version, &mut obj_dels, &mut mpu_dels).await?; - - if !version.deleted.get() { - let deleted_version = - Version::new(version.uuid, version.backlink, true); - self.garage.version_table.insert(&deleted_version).await?; - ver_dels += 1; - } - } - } - } + self.handle_block_purge_version_backlink( + &version, + &mut obj_dels, + &mut mpu_dels, + ) + .await?; + + if !version.deleted.get() { + let deleted_version = Version::new(version.uuid, version.backlink, true); + self.garage.version_table.insert(&deleted_version).await?; + ver_dels += 1; + } + } + } + } Ok(AdminRpc::Ok(format!( "Purged {} blocks, {} versions, {} objects, {} multipart uploads", blocks.len(), ver_dels, obj_dels, - mpu_dels, + mpu_dels, ))) - } - - async fn handle_block_purge_version_backlink(&self, version: &Version, obj_dels: &mut usize, mpu_dels: &mut usize) -> Result<(), Error> { - let (bucket_id, key, ov_id) = match &version.backlink { - VersionBacklink::Object{bucket_id, key} => { - (*bucket_id, key.clone(), version.uuid) - } - VersionBacklink::MultipartUpload{upload_id} => { - if let Some(mut mpu) = self.garage.mpu_table.get(&upload_id, &EmptyKey).await? { - if !mpu.deleted.get() { - mpu.parts.clear(); - mpu.deleted.set(); - self.garage.mpu_table.insert(&mpu).await?; - *mpu_dels += 1; - } - (mpu.bucket_id, mpu.key.clone(), *upload_id) - } else { - return Ok(()); - } - } - }; - - if let Some(object) = self - .garage - .object_table - .get(&bucket_id, &key) - .await? - { - let ov = object.versions().iter().rev().find(|v| v.is_complete()); - if let Some(ov) = ov { - if ov.uuid == ov_id { - let del_uuid = gen_uuid(); - let deleted_object = Object::new( - bucket_id, - key, - vec![ObjectVersion { - uuid: del_uuid, - timestamp: ov.timestamp + 1, - state: ObjectVersionState::Complete( - ObjectVersionData::DeleteMarker, - ), - }], - ); - self.garage.object_table.insert(&deleted_object).await?; - *obj_dels += 1; - } - } - } - - Ok(()) } + async fn handle_block_purge_version_backlink( + &self, + version: &Version, + obj_dels: &mut usize, + mpu_dels: &mut usize, + ) -> Result<(), Error> { + let (bucket_id, key, ov_id) = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid), + VersionBacklink::MultipartUpload { upload_id } => { + if let Some(mut mpu) = self.garage.mpu_table.get(&upload_id, &EmptyKey).await? { + if !mpu.deleted.get() { + mpu.parts.clear(); + mpu.deleted.set(); + self.garage.mpu_table.insert(&mpu).await?; + *mpu_dels += 1; + } + (mpu.bucket_id, mpu.key.clone(), *upload_id) + } else { + return Ok(()); + } + } + }; + + if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? 
{ + let ov = object.versions().iter().rev().find(|v| v.is_complete()); + if let Some(ov) = ov { + if ov.uuid == ov_id { + let del_uuid = gen_uuid(); + let deleted_object = Object::new( + bucket_id, + key, + vec![ObjectVersion { + uuid: del_uuid, + timestamp: ov.timestamp + 1, + state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), + }], + ); + self.garage.object_table.insert(&deleted_object).await?; + *obj_dels += 1; + } + } + } + + Ok(()) + } } -- cgit v1.2.3 From 75a0e013725f984077a6d0fe85138afee82cebcc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 3 May 2023 19:21:35 +0200 Subject: fix online repair --- src/garage/repair/online.rs | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) (limited to 'src/garage') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 0e14ed51..9b6b3cad 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -107,28 +107,29 @@ impl Worker for RepairVersionsWorker { let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?; if !version.deleted.get() { - let object = self - .garage - .object_table - .get(&version.bucket_id, &version.key) - .await?; - let version_exists = match object { - Some(o) => o - .versions() - .iter() - .any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted), - None => false, + let version_exists = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => { + let object = self.garage.object_table.get(&bucket_id, &key).await?; + match object { + Some(o) => o.versions().iter().any(|x| { + x.uuid == version.uuid && x.state != ObjectVersionState::Aborted + }), + None => false, + } + } + VersionBacklink::MultipartUpload { upload_id } => { + let mpu = self.garage.mpu_table.get(&upload_id, &EmptyKey).await?; + match mpu { + Some(u) => !u.deleted.get(), + None => false, + } + } }; if !version_exists { info!("Repair versions: marking version as deleted: {:?}", version); self.garage .version_table - .insert(&Version::new( - version.uuid, - version.bucket_id, - version.key, - true, - )) + .insert(&Version::new(version.uuid, version.backlink, true)) .await?; } } -- cgit v1.2.3 From 8644376ac2dd8015e9212c19f30df63811426e1c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 10:09:52 +0200 Subject: fix test; simplify code --- src/garage/tests/s3/multipart.rs | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) (limited to 'src/garage') diff --git a/src/garage/tests/s3/multipart.rs b/src/garage/tests/s3/multipart.rs index 895a2993..ee1373cc 100644 --- a/src/garage/tests/s3/multipart.rs +++ b/src/garage/tests/s3/multipart.rs @@ -65,7 +65,8 @@ async fn test_uploadlistpart() { let ps = r.parts.unwrap(); assert_eq!(ps.len(), 1); - let fp = ps.iter().find(|x| x.part_number == 2).unwrap(); + assert_eq!(ps[0].part_number, 2); + let fp = &ps[0]; assert!(fp.last_modified.is_some()); assert_eq!( fp.e_tag.as_ref().unwrap(), @@ -100,13 +101,24 @@ async fn test_uploadlistpart() { let ps = r.parts.unwrap(); assert_eq!(ps.len(), 2); - let fp = ps.iter().find(|x| x.part_number == 1).unwrap(); + + assert_eq!(ps[0].part_number, 1); + let fp = &ps[0]; assert!(fp.last_modified.is_some()); assert_eq!( fp.e_tag.as_ref().unwrap(), "\"3c484266f9315485694556e6c693bfa2\"" ); assert_eq!(fp.size, SZ_5MB as i64); + + assert_eq!(ps[1].part_number, 2); + let fp = &ps[1]; + assert!(fp.last_modified.is_some()); + assert_eq!( + fp.e_tag.as_ref().unwrap(), + 
"\"3366bb9dcf710d6801b5926467d02e19\"" + ); + assert_eq!(fp.size, SZ_5MB as i64); } { @@ -123,12 +135,19 @@ async fn test_uploadlistpart() { .unwrap(); assert!(r.part_number_marker.is_none()); - assert!(r.next_part_number_marker.is_some()); + assert_eq!(r.next_part_number_marker.as_deref(), Some("1")); assert_eq!(r.max_parts, 1_i32); assert!(r.is_truncated); assert_eq!(r.key.unwrap(), "a"); assert_eq!(r.upload_id.unwrap().as_str(), uid.as_str()); - assert_eq!(r.parts.unwrap().len(), 1); + let parts = r.parts.unwrap(); + assert_eq!(parts.len(), 1); + let fp = &parts[0]; + assert_eq!(fp.part_number, 1); + assert_eq!( + fp.e_tag.as_ref().unwrap(), + "\"3c484266f9315485694556e6c693bfa2\"" + ); let r2 = ctx .client @@ -147,10 +166,18 @@ async fn test_uploadlistpart() { r.next_part_number_marker.as_ref().unwrap() ); assert_eq!(r2.max_parts, 1_i32); - assert!(r2.is_truncated); assert_eq!(r2.key.unwrap(), "a"); assert_eq!(r2.upload_id.unwrap().as_str(), uid.as_str()); - assert_eq!(r2.parts.unwrap().len(), 1); + let parts = r2.parts.unwrap(); + assert_eq!(parts.len(), 1); + let fp = &parts[0]; + assert_eq!(fp.part_number, 2); + assert_eq!( + fp.e_tag.as_ref().unwrap(), + "\"3366bb9dcf710d6801b5926467d02e19\"" + ); + //assert!(r2.is_truncated); // WHY? (this was the test before) + assert!(!r2.is_truncated); } let cmp = CompletedMultipartUpload::builder() -- cgit v1.2.3 From 058518c22b701d5d2dc3e838518d88ce9a4cc875 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 10:36:48 +0200 Subject: refactor repair workers with a trait --- src/garage/repair/online.rs | 149 ++++++++++++++++++++++++-------------------- 1 file changed, 81 insertions(+), 68 deletions(-) (limited to 'src/garage') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 9b6b3cad..6d8a91fe 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -5,11 +5,15 @@ use async_trait::async_trait; use tokio::sync::watch; use garage_block::repair::ScrubWorkerCommand; + use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; + +use garage_table::replication::*; use garage_table::*; + use garage_util::background::*; use garage_util::error::Error; use garage_util::migrate::Migrate; @@ -32,11 +36,11 @@ pub async fn launch_online_repair( } RepairWhat::Versions => { info!("Repairing the versions table"); - bg.spawn_worker(RepairVersionsWorker::new(garage.clone())); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); - bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); } RepairWhat::Blocks => { info!("Repairing the stored blocks"); @@ -67,49 +71,100 @@ pub async fn launch_online_repair( // ---- -struct RepairVersionsWorker { +#[async_trait] +trait TableRepair: Send + Sync + 'static { + type T: TableSchema; + + fn table(garage: &Garage) -> &Table; + + async fn process( + &mut self, + garage: &Garage, + entry: <::T as TableSchema>::E, + ) -> Result; +} + +struct TableRepairWorker { garage: Arc, pos: Vec, counter: usize, + repairs: usize, + inner: T, } -impl RepairVersionsWorker { - fn new(garage: Arc) -> Self { +impl TableRepairWorker { + fn new(garage: Arc, inner: R) -> Self { Self { garage, + inner, pos: vec![], counter: 0, + repairs: 0, } } } #[async_trait] -impl Worker for RepairVersionsWorker { +impl Worker for 
TableRepairWorker { fn name(&self) -> String { - "Version repair worker".into() + format!("{} repair worker", R::T::TABLE_NAME) } fn status(&self) -> WorkerStatus { WorkerStatus { - progress: Some(self.counter.to_string()), + progress: Some(format!("{} ({})", self.counter, self.repairs)), ..Default::default() } } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - let (item_bytes, next_pos) = match self.garage.version_table.data.store.get_gt(&self.pos)? { + let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? { Some((k, v)) => (v, k), None => { - info!("repair_versions: finished, done {}", self.counter); + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); return Ok(WorkerState::Done); } }; - let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?; + let entry = ::E::decode(&item_bytes) + .ok_or_message("Cannot decode table entry")?; + if self.inner.process(&self.garage, entry).await? { + self.repairs += 1; + } + + self.counter += 1; + self.pos = next_pos; + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self) -> WorkerState { + unreachable!() + } +} + +// ---- + +struct RepairVersions; + +#[async_trait] +impl TableRepair for RepairVersions { + type T = VersionTable; + + fn table(garage: &Garage) -> &Table { + &garage.version_table + } + + async fn process(&mut self, garage: &Garage, version: Version) -> Result { if !version.deleted.get() { let version_exists = match &version.backlink { VersionBacklink::Object { bucket_id, key } => { - let object = self.garage.object_table.get(&bucket_id, &key).await?; + let object = garage.object_table.get(&bucket_id, &key).await?; match object { Some(o) => o.versions().iter().any(|x| { x.uuid == version.uuid && x.state != ObjectVersionState::Aborted @@ -118,7 +173,7 @@ impl Worker for RepairVersionsWorker { } } VersionBacklink::MultipartUpload { upload_id } => { - let mpu = self.garage.mpu_table.get(&upload_id, &EmptyKey).await?; + let mpu = garage.mpu_table.get(&upload_id, &EmptyKey).await?; match mpu { Some(u) => !u.deleted.get(), None => false, @@ -127,69 +182,33 @@ impl Worker for RepairVersionsWorker { }; if !version_exists { info!("Repair versions: marking version as deleted: {:?}", version); - self.garage + garage .version_table .insert(&Version::new(version.uuid, version.backlink, true)) .await?; + return Ok(true); } } - self.counter += 1; - self.pos = next_pos; - - Ok(WorkerState::Busy) - } - - async fn wait_for_work(&mut self) -> WorkerState { - unreachable!() + Ok(false) } } // ---- -struct RepairBlockrefsWorker { - garage: Arc, - pos: Vec, - counter: usize, -} - -impl RepairBlockrefsWorker { - fn new(garage: Arc) -> Self { - Self { - garage, - pos: vec![], - counter: 0, - } - } -} +struct RepairBlockRefs; #[async_trait] -impl Worker for RepairBlockrefsWorker { - fn name(&self) -> String { - "Block refs repair worker".into() - } +impl TableRepair for RepairBlockRefs { + type T = BlockRefTable; - fn status(&self) -> WorkerStatus { - WorkerStatus { - progress: Some(self.counter.to_string()), - ..Default::default() - } + fn table(garage: &Garage) -> &Table { + &garage.block_ref_table } - async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - let (item_bytes, next_pos) = - match self.garage.block_ref_table.data.store.get_gt(&self.pos)? 
{ - Some((k, v)) => (v, k), - None => { - info!("repair_block_ref: finished, done {}", self.counter); - return Ok(WorkerState::Done); - } - }; - - let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?; + async fn process(&mut self, garage: &Garage, block_ref: BlockRef) -> Result { if !block_ref.deleted.get() { - let version = self - .garage + let version = garage .version_table .get(&block_ref.version, &EmptyKey) .await?; @@ -200,7 +219,7 @@ impl Worker for RepairBlockrefsWorker { "Repair block ref: marking block_ref as deleted: {:?}", block_ref ); - self.garage + garage .block_ref_table .insert(&BlockRef { block: block_ref.block, @@ -208,16 +227,10 @@ impl Worker for RepairBlockrefsWorker { deleted: true.into(), }) .await?; + return Ok(true); } } - self.counter += 1; - self.pos = next_pos; - - Ok(WorkerState::Busy) - } - - async fn wait_for_work(&mut self) -> WorkerState { - unreachable!() + Ok(false) } } -- cgit v1.2.3 From 4ea53dc75930d813b84b79c3427b194b6e664ce7 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 10:45:44 +0200 Subject: Add multipart upload repair --- src/garage/cli/structs.rs | 3 ++ src/garage/repair/online.rs | 102 +++++++++++++++++++++++++++++++------------- 2 files changed, 75 insertions(+), 30 deletions(-) (limited to 'src/garage') diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 986592ae..6444d374 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -452,6 +452,9 @@ pub enum RepairWhat { /// Only redo the propagation of object deletions to the version table (slow) #[structopt(name = "versions", version = garage_version())] Versions, + /// Only redo the propagation of object deletions to the multipart upload table (slow) + #[structopt(name = "mpu", version = garage_version())] + MultipartUploads, /// Only redo the propagation of version deletions to the block ref table (extremely slow) #[structopt(name = "block_refs", version = garage_version())] BlockRefs, diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 6d8a91fe..9de6166d 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -8,6 +8,7 @@ use garage_block::repair::ScrubWorkerCommand; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; +use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; @@ -38,6 +39,10 @@ pub async fn launch_online_repair( info!("Repairing the versions table"); bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); } + RepairWhat::MultipartUploads => { + info!("Repairing the multipart uploads table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu)); + } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); @@ -162,25 +167,26 @@ impl TableRepair for RepairVersions { async fn process(&mut self, garage: &Garage, version: Version) -> Result { if !version.deleted.get() { - let version_exists = match &version.backlink { - VersionBacklink::Object { bucket_id, key } => { - let object = garage.object_table.get(&bucket_id, &key).await?; - match object { - Some(o) => o.versions().iter().any(|x| { + let ref_exists = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => garage + .object_table + .get(&bucket_id, &key) + .await? 
+ .map(|o| { + o.versions().iter().any(|x| { x.uuid == version.uuid && x.state != ObjectVersionState::Aborted - }), - None => false, - } - } - VersionBacklink::MultipartUpload { upload_id } => { - let mpu = garage.mpu_table.get(&upload_id, &EmptyKey).await?; - match mpu { - Some(u) => !u.deleted.get(), - None => false, - } - } + }) + }) + .unwrap_or(false), + VersionBacklink::MultipartUpload { upload_id } => garage + .mpu_table + .get(&upload_id, &EmptyKey) + .await? + .map(|u| !u.deleted.get()) + .unwrap_or(false), }; - if !version_exists { + + if !ref_exists { info!("Repair versions: marking version as deleted: {:?}", version); garage .version_table @@ -206,27 +212,63 @@ impl TableRepair for RepairBlockRefs { &garage.block_ref_table } - async fn process(&mut self, garage: &Garage, block_ref: BlockRef) -> Result { + async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result { if !block_ref.deleted.get() { - let version = garage + let ref_exists = garage .version_table .get(&block_ref.version, &EmptyKey) - .await?; - // The version might not exist if it has been GC'ed - let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false); + .await? + .map(|v| !v.deleted.get()) + .unwrap_or(false); + if !ref_exists { info!( "Repair block ref: marking block_ref as deleted: {:?}", block_ref ); - garage - .block_ref_table - .insert(&BlockRef { - block: block_ref.block, - version: block_ref.version, - deleted: true.into(), - }) - .await?; + block_ref.deleted.set(); + garage.block_ref_table.insert(&block_ref).await?; + return Ok(true); + } + } + + Ok(false) + } +} + +// ---- + +struct RepairMpu; + +#[async_trait] +impl TableRepair for RepairMpu { + type T = MultipartUploadTable; + + fn table(garage: &Garage) -> &Table { + &garage.mpu_table + } + + async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result { + if !mpu.deleted.get() { + let ref_exists = garage + .object_table + .get(&mpu.bucket_id, &mpu.key) + .await? + .map(|o| { + o.versions() + .iter() + .any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true))) + }) + .unwrap_or(false); + + if !ref_exists { + info!( + "Repair multipart uploads: marking mpu as deleted: {:?}", + mpu + ); + mpu.parts.clear(); + mpu.deleted.set(); + garage.mpu_table.insert(&mpu).await?; return Ok(true); } } -- cgit v1.2.3 From 511e07ecd489fa72040171fe908323873a57ac19 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 11:49:23 +0200 Subject: fix mpu counter (add missing workers) and report info at appropriate places --- src/garage/admin/bucket.rs | 10 ++++++++++ src/garage/admin/mod.rs | 1 + src/garage/cli/cmd.rs | 3 ++- src/garage/cli/structs.rs | 12 ++++++------ src/garage/cli/util.rs | 25 +++++++++++++++++-------- 5 files changed, 36 insertions(+), 15 deletions(-) (limited to 'src/garage') diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs index 11bb8730..0781cb8b 100644 --- a/src/garage/admin/bucket.rs +++ b/src/garage/admin/bucket.rs @@ -73,6 +73,15 @@ impl AdminRpcHandler { .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) .unwrap_or_default(); + let mpu_counters = self + .garage + .mpu_counter_table + .table + .get(&bucket_id, &EmptyKey) + .await? 
+ .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) + .unwrap_or_default(); + let mut relevant_keys = HashMap::new(); for (k, _) in bucket .state @@ -112,6 +121,7 @@ impl AdminRpcHandler { bucket, relevant_keys, counters, + mpu_counters, }) } diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index 07b0012d..33c21eba 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -53,6 +53,7 @@ pub enum AdminRpc { bucket: Bucket, relevant_keys: HashMap, counters: HashMap, + mpu_counters: HashMap, }, KeyList(Vec<(String, String)>), KeyInfo(Key, HashMap), diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index fb77a927..045f050c 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -190,8 +190,9 @@ pub async fn cmd_admin( bucket, relevant_keys, counters, + mpu_counters, } => { - print_bucket_info(&bucket, &relevant_keys, &counters); + print_bucket_info(&bucket, &relevant_keys, &counters, &mpu_counters); } AdminRpc::KeyList(kl) => { print_key_list(kl); diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 6444d374..5dc99a0d 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -443,22 +443,22 @@ pub struct RepairOpt { #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] pub enum RepairWhat { - /// Only do a full sync of metadata tables + /// Do a full sync of metadata tables #[structopt(name = "tables", version = garage_version())] Tables, - /// Only repair (resync/rebalance) the set of stored blocks + /// Repair (resync/rebalance) the set of stored blocks #[structopt(name = "blocks", version = garage_version())] Blocks, - /// Only redo the propagation of object deletions to the version table (slow) + /// Repropagate object deletions to the version table #[structopt(name = "versions", version = garage_version())] Versions, - /// Only redo the propagation of object deletions to the multipart upload table (slow) + /// Repropagate object deletions to the multipart upload table #[structopt(name = "mpu", version = garage_version())] MultipartUploads, - /// Only redo the propagation of version deletions to the block ref table (extremely slow) + /// Repropagate version deletions to the block ref table #[structopt(name = "block_refs", version = garage_version())] BlockRefs, - /// Verify integrity of all blocks on disc (extremely slow, i/o intensive) + /// Verify integrity of all blocks on disc #[structopt(name = "scrub", version = garage_version())] Scrub { #[structopt(subcommand)] diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 22a3442d..d87f9eab 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -12,8 +12,8 @@ use garage_block::manager::BlockResyncErrorInfo; use garage_model::bucket_table::*; use garage_model::key_table::*; -use garage_model::s3::mpu_table::MultipartUpload; -use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; +use garage_model::s3::mpu_table::{self, MultipartUpload}; +use garage_model::s3::object_table; use garage_model::s3::version_table::*; use crate::cli::structs::WorkerListOpt; @@ -136,6 +136,7 @@ pub fn print_bucket_info( bucket: &Bucket, relevant_keys: &HashMap, counters: &HashMap, + mpu_counters: &HashMap, ) { let key_name = |k| { relevant_keys @@ -149,7 +150,7 @@ pub fn print_bucket_info( Deletable::Deleted => println!("Bucket is deleted."), Deletable::Present(p) => { let size = - bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64); + 
bytesize::ByteSize::b(*counters.get(object_table::BYTES).unwrap_or(&0) as u64); println!( "\nSize: {} ({})", size.to_string_as(true), @@ -157,14 +158,22 @@ pub fn print_bucket_info( ); println!( "Objects: {}", - counters.get(OBJECTS).cloned().unwrap_or_default() + *counters.get(object_table::OBJECTS).unwrap_or(&0) + ); + println!( + "Unfinished uploads (multipart and non-multipart): {}", + *counters.get(object_table::UNFINISHED_UPLOADS).unwrap_or(&0) ); println!( "Unfinished multipart uploads: {}", - counters - .get(UNFINISHED_UPLOADS) - .cloned() - .unwrap_or_default() + *mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0) + ); + let mpu_size = + bytesize::ByteSize::b(*mpu_counters.get(mpu_table::BYTES).unwrap_or(&0) as u64); + println!( + "Size of unfinished multipart uploads: {} ({})", + mpu_size.to_string_as(true), + mpu_size.to_string_as(false), ); println!("\nWebsite access: {}", p.website_config.get().is_some()); -- cgit v1.2.3 From 412ab77b0815f165539fe41713c0155a9878672f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 19:44:01 +0200 Subject: comments and clippy lint fixes --- src/garage/admin/block.rs | 2 +- src/garage/repair/online.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'src/garage') diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs index 6c2649e0..c4a45738 100644 --- a/src/garage/admin/block.rs +++ b/src/garage/admin/block.rs @@ -153,7 +153,7 @@ impl AdminRpcHandler { let (bucket_id, key, ov_id) = match &version.backlink { VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid), VersionBacklink::MultipartUpload { upload_id } => { - if let Some(mut mpu) = self.garage.mpu_table.get(&upload_id, &EmptyKey).await? { + if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { if !mpu.deleted.get() { mpu.parts.clear(); mpu.deleted.set(); diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 9de6166d..abfaf9f9 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -170,7 +170,7 @@ impl TableRepair for RepairVersions { let ref_exists = match &version.backlink { VersionBacklink::Object { bucket_id, key } => garage .object_table - .get(&bucket_id, &key) + .get(bucket_id, key) .await? .map(|o| { o.versions().iter().any(|x| { @@ -180,7 +180,7 @@ impl TableRepair for RepairVersions { .unwrap_or(false), VersionBacklink::MultipartUpload { upload_id } => garage .mpu_table - .get(&upload_id, &EmptyKey) + .get(upload_id, &EmptyKey) .await? 
.map(|u| !u.deleted.get()) .unwrap_or(false), -- cgit v1.2.3 From c14d3735e5514c395a691a2ab4bb93aef57035e2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 9 May 2023 13:02:39 +0200 Subject: Add test for multipart uploads and fix part renumbering --- src/garage/tests/s3/multipart.rs | 192 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 188 insertions(+), 4 deletions(-) (limited to 'src/garage') diff --git a/src/garage/tests/s3/multipart.rs b/src/garage/tests/s3/multipart.rs index ee1373cc..8ae6b66e 100644 --- a/src/garage/tests/s3/multipart.rs +++ b/src/garage/tests/s3/multipart.rs @@ -5,6 +5,190 @@ use aws_sdk_s3::types::ByteStream; const SZ_5MB: usize = 5 * 1024 * 1024; const SZ_10MB: usize = 10 * 1024 * 1024; +#[tokio::test] +async fn test_multipart_upload() { + let ctx = common::context(); + let bucket = ctx.create_bucket("testmpu"); + + let u1 = vec![0x11; SZ_5MB]; + let u2 = vec![0x22; SZ_5MB]; + let u3 = vec![0x33; SZ_5MB]; + let u4 = vec![0x44; SZ_5MB]; + let u5 = vec![0x55; SZ_5MB]; + + let up = ctx + .client + .create_multipart_upload() + .bucket(&bucket) + .key("a") + .send() + .await + .unwrap(); + assert!(up.upload_id.is_some()); + + let uid = up.upload_id.as_ref().unwrap(); + + let p3 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(3) + .body(ByteStream::from(u3.clone())) + .send() + .await + .unwrap(); + + let _p1 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(1) + .body(ByteStream::from(u1)) + .send() + .await + .unwrap(); + + let _p4 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(4) + .body(ByteStream::from(u4)) + .send() + .await + .unwrap(); + + let p1bis = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(1) + .body(ByteStream::from(u2.clone())) + .send() + .await + .unwrap(); + + let p6 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(6) + .body(ByteStream::from(u5.clone())) + .send() + .await + .unwrap(); + + { + let r = ctx + .client + .list_parts() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .send() + .await + .unwrap(); + assert_eq!(r.parts.unwrap().len(), 4); + } + + let cmp = CompletedMultipartUpload::builder() + .parts( + CompletedPart::builder() + .part_number(1) + .e_tag(p1bis.e_tag.unwrap()) + .build(), + ) + .parts( + CompletedPart::builder() + .part_number(3) + .e_tag(p3.e_tag.unwrap()) + .build(), + ) + .parts( + CompletedPart::builder() + .part_number(6) + .e_tag(p6.e_tag.unwrap()) + .build(), + ) + .build(); + + ctx.client + .complete_multipart_upload() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .multipart_upload(cmp) + .send() + .await + .unwrap(); + + // The multipart upload must not appear anymore + assert!(ctx + .client + .list_parts() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .send() + .await + .is_err()); + + { + // The object must appear as a regular object + let r = ctx + .client + .head_object() + .bucket(&bucket) + .key("a") + .send() + .await + .unwrap(); + + assert_eq!(r.content_length, (SZ_5MB * 3) as i64); + } + + { + let o = ctx + .client + .get_object() + .bucket(&bucket) + .key("a") + .send() + .await + .unwrap(); + + assert_bytes_eq!(o.body, &[&u2[..], &u3[..], &u5[..]].concat()); + } + + { + for (part_number, data) in [(1, &u2), (2, &u3), (3, &u5)] { + let o = ctx + .client + .get_object() + .bucket(&bucket) + .key("a") + .part_number(part_number) + .send() + 
.await + .unwrap(); + + eprintln!("get_object with part_number = {}", part_number); + assert_eq!(o.content_length, SZ_5MB as i64); + assert_bytes_eq!(o.body, data); + } + } +} + #[tokio::test] async fn test_uploadlistpart() { let ctx = common::context(); @@ -112,13 +296,13 @@ async fn test_uploadlistpart() { assert_eq!(fp.size, SZ_5MB as i64); assert_eq!(ps[1].part_number, 2); - let fp = &ps[1]; - assert!(fp.last_modified.is_some()); + let sp = &ps[1]; + assert!(sp.last_modified.is_some()); assert_eq!( - fp.e_tag.as_ref().unwrap(), + sp.e_tag.as_ref().unwrap(), "\"3366bb9dcf710d6801b5926467d02e19\"" ); - assert_eq!(fp.size, SZ_5MB as i64); + assert_eq!(sp.size, SZ_5MB as i64); } { -- cgit v1.2.3 From 7126f3e1d1fa7238788c23d01ec9f67b132e5f50 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Jun 2023 15:56:48 +0200 Subject: garage key import: add checks and `--yes` CLI flag (fix #278) --- src/garage/admin/key.rs | 15 +++++++++++++++ src/garage/cli/structs.rs | 4 ++++ 2 files changed, 19 insertions(+) (limited to 'src/garage') diff --git a/src/garage/admin/key.rs b/src/garage/admin/key.rs index a4064539..8a1c02af 100644 --- a/src/garage/admin/key.rs +++ b/src/garage/admin/key.rs @@ -118,10 +118,25 @@ impl AdminRpcHandler { } async fn handle_import_key(&self, query: &KeyImportOpt) -> Result { + if !query.yes { + return Err(Error::BadRequest("This command is intended to re-import keys that were previously generated by Garage. If you want to create a new key, use `garage key new` instead. Add the --yes flag if you really want to re-import a key.".to_string())); + } + + if query.key_id.len() != 26 + || &query.key_id[..2] != "GK" + || hex::decode(&query.key_id[2..]).is_err() + { + return Err(Error::BadRequest(format!("The specified key ID is not a valid Garage key ID (starts with `GK`, followed by 12 hex-encoded bytes)"))); + } + if query.secret_key.len() != 64 || hex::decode(&query.secret_key).is_err() { + return Err(Error::BadRequest(format!("The specified secret key is not a valid Garage secret key (composed of 32 hex-encoded bytes)"))); + } + let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?; if prev_key.is_some() { return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. 
Sorry.", query.key_id))); } + let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name); self.garage.key_table.insert(&imported_key).await?; diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 5dc99a0d..2547fb8d 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -408,6 +408,10 @@ pub struct KeyImportOpt { /// Key name #[structopt(short = "n", default_value = "Imported key")] pub name: String, + + /// Confirm key import + #[structopt(long = "yes")] + pub yes: bool, } #[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] -- cgit v1.2.3 From 7895f99d3afc6e97f62f52abe06a6ee8d0f0617f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Jun 2023 16:56:15 +0200 Subject: admin and cli: hide secret keys unless asked --- src/garage/admin/key.rs | 9 +++++++-- src/garage/cli/structs.rs | 7 +++++-- 2 files changed, 12 insertions(+), 4 deletions(-) (limited to 'src/garage') diff --git a/src/garage/admin/key.rs b/src/garage/admin/key.rs index 8a1c02af..908986fa 100644 --- a/src/garage/admin/key.rs +++ b/src/garage/admin/key.rs @@ -41,12 +41,17 @@ impl AdminRpcHandler { Ok(AdminRpc::KeyList(key_ids)) } - async fn handle_key_info(&self, query: &KeyOpt) -> Result { - let key = self + async fn handle_key_info(&self, query: &KeyInfoOpt) -> Result { + let mut key = self .garage .key_helper() .get_existing_matching_key(&query.key_pattern) .await?; + + if !query.show_secret { + key.state.as_option_mut().unwrap().secret_key = "(redacted)".into(); + } + self.key_info_result(key).await } diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 2547fb8d..05d2ea31 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -328,7 +328,7 @@ pub enum KeyOperation { /// Get key info #[structopt(name = "info", version = garage_version())] - Info(KeyOpt), + Info(KeyInfoOpt), /// Create new key #[structopt(name = "create", version = garage_version())] @@ -356,9 +356,12 @@ pub enum KeyOperation { } #[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct KeyOpt { +pub struct KeyInfoOpt { /// ID or name of the key pub key_pattern: String, + /// Whether to display the secret key + #[structopt(long = "show-secret")] + pub show_secret: bool, } #[derive(Serialize, Deserialize, StructOpt, Debug)] -- cgit v1.2.3 From 8ef42c9609bcefc642cc9739acb921dffba49b89 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Jun 2023 17:13:41 +0200 Subject: admin docs: reformatting, key admin: add check --- src/garage/admin/key.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) (limited to 'src/garage') diff --git a/src/garage/admin/key.rs b/src/garage/admin/key.rs index 908986fa..1c92670c 100644 --- a/src/garage/admin/key.rs +++ b/src/garage/admin/key.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use garage_table::*; -use garage_model::helper::error::Error; +use garage_model::helper::error::*; use garage_model::key_table::*; use crate::cli::*; @@ -127,22 +127,13 @@ impl AdminRpcHandler { return Err(Error::BadRequest("This command is intended to re-import keys that were previously generated by Garage. If you want to create a new key, use `garage key new` instead. 
Add the --yes flag if you really want to re-import a key.".to_string())); } - if query.key_id.len() != 26 - || &query.key_id[..2] != "GK" - || hex::decode(&query.key_id[2..]).is_err() - { - return Err(Error::BadRequest(format!("The specified key ID is not a valid Garage key ID (starts with `GK`, followed by 12 hex-encoded bytes)"))); - } - if query.secret_key.len() != 64 || hex::decode(&query.secret_key).is_err() { - return Err(Error::BadRequest(format!("The specified secret key is not a valid Garage secret key (composed of 32 hex-encoded bytes)"))); - } - let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?; if prev_key.is_some() { return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id))); } - let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name); + let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name) + .ok_or_bad_request("Invalid key format")?; self.garage.key_table.insert(&imported_key).await?; self.key_info_result(imported_key).await -- cgit v1.2.3 From 6b008b5bd3843bb236f94a1b4472de11f5755f04 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Sep 2023 13:44:11 +0200 Subject: block manager: add rebalance operation to rebalance multi-hdd setups --- src/garage/cli/structs.rs | 3 +++ src/garage/repair/online.rs | 6 ++++++ 2 files changed, 9 insertions(+) (limited to 'src/garage') diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 9ca4a059..fd37a24e 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -471,6 +471,9 @@ pub enum RepairWhat { #[structopt(subcommand)] cmd: ScrubCmd, }, + /// Rebalance data blocks among storage locations + #[structopt(name = "rebalance", version = garage_version())] + Rebalance, } #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index abfaf9f9..9e4de873 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -70,6 +70,12 @@ pub async fn launch_online_repair( info!("Sending command to scrub worker: {:?}", cmd); garage.block_manager.send_scrub_command(cmd).await?; } + RepairWhat::Rebalance => { + info!("Rebalancing the stored blocks among storage locations"); + bg.spawn_worker(garage_block::repair::RebalanceWorker::new( + garage.block_manager.clone(), + )); + } } Ok(()) } -- cgit v1.2.3 From 2e229d44303bfafa22aaf0d4aa299021a937220e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 12 Sep 2023 17:24:51 +0200 Subject: new layout: improve output display --- src/garage/cli/layout.rs | 89 +++++++++++++++++++++--------------------------- 1 file changed, 38 insertions(+), 51 deletions(-) (limited to 'src/garage') diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 3932f115..9bb90309 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -174,16 +174,12 @@ pub async fn cmd_show_layout( let layout = fetch_layout(rpc_cli, rpc_host).await?; println!("==== CURRENT CLUSTER LAYOUT ===="); - if !print_cluster_layout(&layout) { - println!("No nodes currently have a role in the cluster."); - println!("See `garage status` to view available nodes."); - } + print_cluster_layout(&layout, "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes."); println!(); println!("Current cluster layout version: {}", layout.version); let has_role_changes = 
print_staging_role_changes(&layout); - let has_param_changes = print_staging_parameters_changes(&layout); - if has_role_changes || has_param_changes { + if has_role_changes { let v = layout.version; let res_apply = layout.apply_staged_changes(Some(v + 1)); @@ -193,9 +189,7 @@ pub async fn cmd_show_layout( Ok((layout, msg)) => { println!(); println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ===="); - if !print_cluster_layout(&layout) { - println!("No nodes have a role in the new layout."); - } + print_cluster_layout(&layout, "No nodes have a role in the new layout."); println!(); for line in msg.iter() { @@ -326,7 +320,7 @@ pub async fn send_layout( Ok(()) } -pub fn print_cluster_layout(layout: &ClusterLayout) -> bool { +pub fn print_cluster_layout(layout: &ClusterLayout, empty_msg: &str) { let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()]; for (id, _, role) in layout.roles.items().iter() { let role = match &role.0 { @@ -356,61 +350,54 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool { )); }; } - println!(); - println!("Parameters of the layout computation:"); - println!("Zone redundancy: {}", layout.parameters.zone_redundancy); - println!(); - if table.len() == 1 { - false - } else { + if table.len() > 1 { format_table(table); - true - } -} - -pub fn print_staging_parameters_changes(layout: &ClusterLayout) -> bool { - let has_changes = *layout.staging_parameters.get() != layout.parameters; - if has_changes { - println!(); - println!("==== NEW LAYOUT PARAMETERS ===="); - println!( - "Zone redundancy: {}", - layout.staging_parameters.get().zone_redundancy - ); - println!(); + } else { + println!("{}", empty_msg); } - has_changes + println!(); + println!("Zone redundancy: {}", layout.parameters.zone_redundancy); } pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool { - let has_changes = layout + let has_role_changes = layout .staging_roles .items() .iter() .any(|(k, _, v)| layout.roles.get(k) != Some(v)); + let has_layout_changes = *layout.staging_parameters.get() != layout.parameters; - if has_changes { + if has_role_changes || has_layout_changes { println!(); println!("==== STAGED ROLE CHANGES ===="); - let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; - for (id, _, role) in layout.staging_roles.items().iter() { - if layout.roles.get(id) == Some(role) { - continue; - } - if let Some(role) = &role.0 { - let tags = role.tags.join(","); - table.push(format!( - "{:?}\t{}\t{}\t{}", - id, - tags, - role.zone, - role.capacity_string() - )); - } else { - table.push(format!("{:?}\tREMOVED", id)); + if has_role_changes { + let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; + for (id, _, role) in layout.staging_roles.items().iter() { + if layout.roles.get(id) == Some(role) { + continue; + } + if let Some(role) = &role.0 { + let tags = role.tags.join(","); + table.push(format!( + "{:?}\t{}\t{}\t{}", + id, + tags, + role.zone, + role.capacity_string() + )); + } else { + table.push(format!("{:?}\tREMOVED", id)); + } } + format_table(table); + println!(); + } + if has_layout_changes { + println!( + "Zone redundancy: {}", + layout.staging_parameters.get().zone_redundancy + ); } - format_table(table); true } else { false -- cgit v1.2.3 From 015ccb39aa511c72d0c899713a828491871da3e7 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 18 Sep 2023 11:57:36 +0200 Subject: new layout: make zone_redundancy optionnal (if not set, is maximum) --- src/garage/cli/layout.rs | 37 ++++++++++++++++++++++--------------- 
src/garage/cli/structs.rs | 4 ++-- 2 files changed, 24 insertions(+), 17 deletions(-) (limited to 'src/garage') diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 9bb90309..557549e2 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -261,28 +261,35 @@ pub async fn cmd_config_layout( let mut did_something = false; match config_opt.redundancy { None => (), - Some(r) => { - if r > layout.replication_factor { - println!( - "The zone redundancy must be smaller or equal to the \ - replication factor ({}).", - layout.replication_factor - ); - } else if r < 1 { - println!("The zone redundancy must be at least 1."); - } else { - layout - .staging_parameters - .update(LayoutParameters { zone_redundancy: r }); - println!("The new zone redundancy has been saved ({}).", r); + Some(r_str) => { + let r = r_str + .parse::() + .ok_or_message("invalid zone redundancy value")?; + if let ZoneRedundancy::AtLeast(r_int) = r { + if r_int > layout.replication_factor { + return Err(Error::Message(format!( + "The zone redundancy must be smaller or equal to the \ + replication factor ({}).", + layout.replication_factor + ))); + } else if r_int < 1 { + return Err(Error::Message( + "The zone redundancy must be at least 1.".into(), + )); + } } + + layout + .staging_parameters + .update(LayoutParameters { zone_redundancy: r }); + println!("The new zone redundancy has been saved ({}).", r); did_something = true; } } if !did_something { return Err(Error::Message( - "Please specify an action for `garage layout config` to do".into(), + "Please specify an action for `garage layout config`".into(), )); } diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index fd37a24e..c4ebeb1a 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -143,9 +143,9 @@ pub struct RemoveRoleOpt { #[derive(StructOpt, Debug)] pub struct ConfigLayoutOpt { - /// Zone redundancy parameter + /// Zone redundancy parameter ('none'/'max' or integer) #[structopt(short = "r", long = "redundancy")] - pub(crate) redundancy: Option, + pub(crate) redundancy: Option, } #[derive(StructOpt, Debug)] -- cgit v1.2.3 From 749b4865d0a26c600fef79ab0456c827faafb9e8 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 18 Sep 2023 12:07:45 +0200 Subject: new layout: improve display and fix comments --- src/garage/cli/layout.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/garage') diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 557549e2..ce2b11e0 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -282,7 +282,7 @@ pub async fn cmd_config_layout( layout .staging_parameters .update(LayoutParameters { zone_redundancy: r }); - println!("The new zone redundancy has been saved ({}).", r); + println!("The zone redundancy parameter has been set to '{}'.", r); did_something = true; } } @@ -359,11 +359,11 @@ pub fn print_cluster_layout(layout: &ClusterLayout, empty_msg: &str) { } if table.len() > 1 { format_table(table); + println!(); + println!("Zone redundancy: {}", layout.parameters.zone_redundancy); } else { println!("{}", empty_msg); } - println!(); - println!("Zone redundancy: {}", layout.parameters.zone_redundancy); } pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool { -- cgit v1.2.3 From f97168f80567f43e15cf236092703e6ae5d8dc2e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Sep 2023 15:32:25 +0200 Subject: garage_db: refactor transactions and add on_commit mechanism --- 
src/garage/tests/common/garage.rs | 2 +- src/garage/tests/k2v/item.rs | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) (limited to 'src/garage') diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs index cb1b7ea5..d1f0867a 100644 --- a/src/garage/tests/common/garage.rs +++ b/src/garage/tests/common/garage.rs @@ -52,7 +52,7 @@ impl Instance { r#" metadata_dir = "{path}/meta" data_dir = "{path}/data" -db_engine = "sled" +db_engine = "lmdb" replication_mode = "1" diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs index 25d9cce4..20add889 100644 --- a/src/garage/tests/k2v/item.rs +++ b/src/garage/tests/k2v/item.rs @@ -44,6 +44,7 @@ async fn test_items_and_indices() { let content = format!("{}: hello world", sk).into_bytes(); let content2 = format!("{}: hello universe", sk).into_bytes(); let content3 = format!("{}: concurrent value", sk).into_bytes(); + eprintln!("test iteration {}: {}", i, sk); // Put initially, no causality token let res = ctx @@ -89,7 +90,7 @@ async fn test_items_and_indices() { assert_eq!(res_body, content); // ReadIndex -- now there should be some stuff - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let res = ctx .k2v .request @@ -158,7 +159,7 @@ async fn test_items_and_indices() { assert_eq!(res_body, content2); // ReadIndex -- now there should be some stuff - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let res = ctx .k2v .request @@ -230,7 +231,7 @@ async fn test_items_and_indices() { ); // ReadIndex -- now there should be some stuff - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let res = ctx .k2v .request @@ -299,7 +300,7 @@ async fn test_items_and_indices() { assert_eq!(res.status(), StatusCode::NO_CONTENT); // ReadIndex -- now there should be some stuff - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let res = ctx .k2v .request -- cgit v1.2.3 From 897cbf2c27e4477dde51eee5b73fa3267b4c7098 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 27 Sep 2023 13:12:41 +0200 Subject: actually update rmp-serde to 1.1.2 for both garage and netapp dependency (fix #629) --- src/garage/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/garage') diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 88729aaf..dab2fa3e 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -53,7 +53,7 @@ futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -netapp = "0.5" +netapp = "0.10" opentelemetry = { version = "0.17", features = [ "rt-tokio" ] } opentelemetry-prometheus = { version = "0.10", optional = true } -- cgit v1.2.3 From 920dec393a80fae5112bc9dd082b8635316d92c3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 4 Oct 2023 10:44:42 +0200 Subject: cli: more precise doc comment --- src/garage/cli/structs.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/garage') diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index c4ebeb1a..651df42e 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -453,7 +453,7 @@ pub enum RepairWhat { /// Do a full sync of metadata tables #[structopt(name = "tables", version = garage_version())] Tables, - 
/// Repair (resync/rebalance) the set of stored blocks + /// Repair (resync/rebalance) the set of stored blocks in the cluster #[structopt(name = "blocks", version = garage_version())] Blocks, /// Repropagate object deletions to the version table @@ -471,7 +471,7 @@ pub enum RepairWhat { #[structopt(subcommand)] cmd: ScrubCmd, }, - /// Rebalance data blocks among storage locations + /// Rebalance data blocks among HDDs on individual nodes #[structopt(name = "rebalance", version = garage_version())] Rebalance, } -- cgit v1.2.3 From 3d7892477d69cfd0a8d52653e3eb777194c38f3e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Oct 2023 14:06:25 +0200 Subject: convert_db: fix build --- src/garage/cli/convert_db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/garage') diff --git a/src/garage/cli/convert_db.rs b/src/garage/cli/convert_db.rs index ffb5c44c..3c6ce69c 100644 --- a/src/garage/cli/convert_db.rs +++ b/src/garage/cli/convert_db.rs @@ -57,7 +57,7 @@ fn open_db(path: PathBuf, engine: String) -> Result { env_builder.max_dbs(100); env_builder.map_size(map_size); unsafe { - env_builder.flag(heed::flags::Flags::MdbNoMetaSync); + env_builder.flag(lmdb_adapter::heed::flags::Flags::MdbNoMetaSync); } let db = env_builder.open(&path)?; Ok(lmdb_adapter::LmdbDb::init(db)) -- cgit v1.2.3 From 952c9570c494468643353ee1ae9052b510353665 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Oct 2023 14:08:11 +0200 Subject: bump version to v0.9.0 --- src/garage/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/garage') diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index dab2fa3e..7c3a79cb 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage" -version = "0.8.4" +version = "0.9.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" -- cgit v1.2.3
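Note on the corrected ListParts semantics tested above: a part listing is complete when is_truncated is false, and next_part_number_marker names the last part returned on the current page. A minimal pagination loop in the style of the test suite, assuming the same ctx, bucket and uid as the surrounding tests (an illustrative sketch, not part of the patches):

    let mut marker: Option<String> = None;
    let mut all_parts = Vec::new();
    loop {
        // Request one part per page to exercise pagination, as the tests do
        let mut req = ctx
            .client
            .list_parts()
            .bucket(&bucket)
            .key("a")
            .upload_id(uid)
            .max_parts(1);
        if let Some(m) = &marker {
            req = req.part_number_marker(m);
        }
        let r = req.send().await.unwrap();
        all_parts.extend(r.parts.unwrap_or_default());
        if !r.is_truncated {
            // Corrected behavior: the final page reports is_truncated = false
            break;
        }
        marker = r.next_part_number_marker;
    }
    // Parts come back sorted by their (renumbered) part number
    assert!(all_parts.windows(2).all(|w| w[0].part_number < w[1].part_number));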
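Note on the zone redundancy parameter: the CLI patches above call r_str.parse::<ZoneRedundancy>() and match on ZoneRedundancy::AtLeast, but the type itself lives in Garage's layout code outside src/garage and does not appear in this log. A minimal sketch of the shape those patches imply ('none'/'max' or an integer); the names and derives here are assumptions, not the actual definition:

    use std::fmt;
    use std::str::FromStr;

    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub enum ZoneRedundancy {
        // Partition copies must be spread over at least this many zones
        AtLeast(usize),
        // Spread copies over as many zones as possible (the default when unset)
        Maximum,
    }

    impl FromStr for ZoneRedundancy {
        type Err = &'static str;
        fn from_str(s: &str) -> Result<Self, Self::Err> {
            match s {
                "none" | "max" => Ok(ZoneRedundancy::Maximum),
                x => x
                    .parse::<usize>()
                    .map(ZoneRedundancy::AtLeast)
                    .map_err(|_| "zone redundancy must be 'none', 'max' or an integer"),
            }
        }
    }

    impl fmt::Display for ZoneRedundancy {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                ZoneRedundancy::Maximum => write!(f, "maximum"),
                ZoneRedundancy::AtLeast(n) => write!(f, "{}", n),
            }
        }
    }

With a definition of this shape, `garage layout config -r none` and `-r max` both stage the maximum-spread behavior, while `-r 2` stages ZoneRedundancy::AtLeast(2), which cmd_config_layout then bounds-checks against the replication factor before staging.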