aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/Cargo.toml2
-rw-r--r--src/garage/admin/bucket.rs4
-rw-r--r--src/garage/admin/mod.rs30
-rw-r--r--src/garage/cli/cmd.rs189
-rw-r--r--src/garage/cli/layout.rs230
-rw-r--r--src/garage/cli/structs.rs24
-rw-r--r--src/garage/cli/util.rs4
-rw-r--r--src/garage/secrets.rs6
-rw-r--r--src/garage/tests/common/ext/process.rs44
-rw-r--r--src/garage/tests/common/garage.rs4
-rw-r--r--src/garage/tests/s3/mod.rs1
-rw-r--r--src/garage/tests/s3/ssec.rs455
12 files changed, 830 insertions, 163 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index b022049c..00ecb35e 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage"
-version = "0.9.3"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs
index 803b55bd..6b1190f8 100644
--- a/src/garage/admin/bucket.rs
+++ b/src/garage/admin/bucket.rs
@@ -70,7 +70,7 @@ impl AdminRpcHandler {
.table
.get(&bucket_id, &EmptyKey)
.await?
- .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+ .map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
.unwrap_or_default();
let mpu_counters = self
@@ -79,7 +79,7 @@ impl AdminRpcHandler {
.table
.get(&bucket_id, &EmptyKey)
.await?
- .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+ .map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
.unwrap_or_default();
let mut relevant_keys = HashMap::new();
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index b6f9c426..de7851e1 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -18,7 +18,7 @@ use garage_util::error::Error as GarageError;
use garage_table::replication::*;
use garage_table::*;
-use garage_rpc::ring::PARTITION_BITS;
+use garage_rpc::layout::PARTITION_BITS;
use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo;
@@ -126,8 +126,8 @@ impl AdminRpcHandler {
opt_to_send.all_nodes = false;
let mut failures = vec![];
- let ring = self.garage.system.ring.borrow().clone();
- for node in ring.layout.node_ids().iter() {
+ let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ for node in all_nodes.iter() {
let node = (*node).into();
let resp = self
.endpoint
@@ -163,9 +163,9 @@ impl AdminRpcHandler {
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
if opt.all_nodes {
let mut ret = String::new();
- let ring = self.garage.system.ring.borrow().clone();
+ let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
- for node in ring.layout.node_ids().iter() {
+ for node in all_nodes.iter() {
let mut opt = opt.clone();
opt.all_nodes = false;
opt.skip_global = true;
@@ -274,11 +274,11 @@ impl AdminRpcHandler {
fn gather_cluster_stats(&self) -> String {
let mut ret = String::new();
- // Gather storage node and free space statistics
- let layout = &self.garage.system.ring.borrow().layout;
+ // Gather storage node and free space statistics for current nodes
+ let layout = &self.garage.system.cluster_layout();
let mut node_partition_count = HashMap::<Uuid, u64>::new();
- for short_id in layout.ring_assignment_data.iter() {
- let id = layout.node_id_vec[*short_id as usize];
+ for short_id in layout.current().ring_assignment_data.iter() {
+ let id = layout.current().node_id_vec[*short_id as usize];
*node_partition_count.entry(id).or_default() += 1;
}
let node_info = self
@@ -293,8 +293,8 @@ impl AdminRpcHandler {
for (id, parts) in node_partition_count.iter() {
let info = node_info.get(id);
let status = info.map(|x| &x.status);
- let role = layout.roles.get(id).and_then(|x| x.0.as_ref());
- let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?");
+ let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
+ let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?");
let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
let capacity = role
.map(|x| x.capacity_string())
@@ -440,8 +440,8 @@ impl AdminRpcHandler {
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
- let ring = self.garage.system.ring.borrow().clone();
- for node in ring.layout.node_ids().iter() {
+ let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ for node in all_nodes.iter() {
let node = (*node).into();
match self
.endpoint
@@ -488,8 +488,8 @@ impl AdminRpcHandler {
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
- let ring = self.garage.system.ring.borrow().clone();
- for node in ring.layout.node_ids().iter() {
+ let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ for node in all_nodes.iter() {
let node = (*node).into();
match self
.endpoint
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 48359614..fb6dface 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -1,4 +1,4 @@
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
use std::time::Duration;
use format_table::format_table;
@@ -49,50 +49,61 @@ pub async fn cli_command_dispatch(
}
pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
- let status = match rpc_cli
- .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
- .await??
- {
- SystemRpc::ReturnKnownNodes(nodes) => nodes,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
+ let status = fetch_status(rpc_cli, rpc_host).await?;
let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== HEALTHY NODES ====");
let mut healthy_nodes =
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
for adv in status.iter().filter(|adv| adv.is_up) {
- match layout.roles.get(&adv.id) {
- Some(NodeRoleV(Some(cfg))) => {
- let data_avail = match &adv.status.data_disk_avail {
- _ if cfg.capacity.is_none() => "N/A".into(),
- Some((avail, total)) => {
- let pct = (*avail as f64) / (*total as f64) * 100.;
- let avail = bytesize::ByteSize::b(*avail);
- format!("{} ({:.1}%)", avail, pct)
- }
- None => "?".into(),
- };
+ let host = adv.status.hostname.as_deref().unwrap_or("?");
+ if let Some(NodeRoleV(Some(cfg))) = layout.current().roles.get(&adv.id) {
+ let data_avail = match &adv.status.data_disk_avail {
+ _ if cfg.capacity.is_none() => "N/A".into(),
+ Some((avail, total)) => {
+ let pct = (*avail as f64) / (*total as f64) * 100.;
+ let avail = bytesize::ByteSize::b(*avail);
+ format!("{} ({:.1}%)", avail, pct)
+ }
+ None => "?".into(),
+ };
+ healthy_nodes.push(format!(
+ "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
+ id = adv.id,
+ host = host,
+ addr = adv.addr,
+ tags = cfg.tags.join(","),
+ zone = cfg.zone,
+ capacity = cfg.capacity_string(),
+ data_avail = data_avail,
+ ));
+ } else {
+ let prev_role = layout
+ .versions
+ .iter()
+ .rev()
+ .find_map(|x| match x.roles.get(&adv.id) {
+ Some(NodeRoleV(Some(cfg))) => Some(cfg),
+ _ => None,
+ });
+ if let Some(cfg) = prev_role {
healthy_nodes.push(format!(
- "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
+ "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\tdraining metadata...",
id = adv.id,
- host = adv.status.hostname,
+ host = host,
addr = adv.addr,
tags = cfg.tags.join(","),
zone = cfg.zone,
- capacity = cfg.capacity_string(),
- data_avail = data_avail,
));
- }
- _ => {
- let new_role = match layout.staging_roles.get(&adv.id) {
- Some(NodeRoleV(Some(_))) => "(pending)",
+ } else {
+ let new_role = match layout.staging.get().roles.get(&adv.id) {
+ Some(NodeRoleV(Some(_))) => "pending...",
_ => "NO ROLE ASSIGNED",
};
healthy_nodes.push(format!(
- "{id:?}\t{h}\t{addr}\t{new_role}",
+ "{id:?}\t{h}\t{addr}\t\t\t{new_role}",
id = adv.id,
- h = adv.status.hostname,
+ h = host,
addr = adv.addr,
new_role = new_role,
));
@@ -101,51 +112,76 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
}
format_table(healthy_nodes);
- let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>();
- let failure_case_1 = status
- .iter()
- .any(|adv| !adv.is_up && matches!(layout.roles.get(&adv.id), Some(NodeRoleV(Some(_)))));
- let failure_case_2 = layout
- .roles
- .items()
+ // Determine which nodes are unhealthy and print that to stdout
+ let status_map = status
.iter()
- .any(|(id, _, v)| !status_keys.contains(id) && v.0.is_some());
- if failure_case_1 || failure_case_2 {
- println!("\n==== FAILED NODES ====");
- let mut failed_nodes =
- vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
- for adv in status.iter().filter(|adv| !adv.is_up) {
- if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) {
- let tf = timeago::Formatter::new();
- failed_nodes.push(format!(
- "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
- id = adv.id,
- host = adv.status.hostname,
- addr = adv.addr,
- tags = cfg.tags.join(","),
- zone = cfg.zone,
- capacity = cfg.capacity_string(),
- last_seen = adv
- .last_seen_secs_ago
- .map(|s| tf.convert(Duration::from_secs(s)))
- .unwrap_or_else(|| "never seen".into()),
- ));
+ .map(|adv| (adv.id, adv))
+ .collect::<HashMap<_, _>>();
+
+ let tf = timeago::Formatter::new();
+ let mut drain_msg = false;
+ let mut failed_nodes =
+ vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
+ let mut listed = HashSet::new();
+ for ver in layout.versions.iter().rev() {
+ for (node, _, role) in ver.roles.items().iter() {
+ let cfg = match role {
+ NodeRoleV(Some(role)) if role.capacity.is_some() => role,
+ _ => continue,
+ };
+
+ if listed.contains(node) {
+ continue;
}
- }
- for (id, _, role_v) in layout.roles.items().iter() {
- if let NodeRoleV(Some(cfg)) = role_v {
- if !status_keys.contains(id) {
- failed_nodes.push(format!(
- "{id:?}\t??\t??\t[{tags}]\t{zone}\t{capacity}\tnever seen",
- id = id,
- tags = cfg.tags.join(","),
- zone = cfg.zone,
- capacity = cfg.capacity_string(),
- ));
- }
+ listed.insert(*node);
+
+ let adv = status_map.get(node);
+ if adv.map(|x| x.is_up).unwrap_or(false) {
+ continue;
}
+
+ // Node is in a layout version, is not a gateway node, and is not up:
+ // it is in a failed state, add proper line to the output
+ let (host, addr, last_seen) = match adv {
+ Some(adv) => (
+ adv.status.hostname.as_deref().unwrap_or("?"),
+ adv.addr.to_string(),
+ adv.last_seen_secs_ago
+ .map(|s| tf.convert(Duration::from_secs(s)))
+ .unwrap_or_else(|| "never seen".into()),
+ ),
+ None => ("??", "??".into(), "never seen".into()),
+ };
+ let capacity = if ver.version == layout.current().version {
+ cfg.capacity_string()
+ } else {
+ drain_msg = true;
+ "draining metadata...".to_string()
+ };
+ failed_nodes.push(format!(
+ "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
+ id = node,
+ host = host,
+ addr = addr,
+ tags = cfg.tags.join(","),
+ zone = cfg.zone,
+ capacity = capacity,
+ last_seen = last_seen,
+ ));
}
+ }
+
+ if failed_nodes.len() > 1 {
+ println!("\n==== FAILED NODES ====");
format_table(failed_nodes);
+ if drain_msg {
+ println!();
+ println!("Your cluster is expecting to drain data from nodes that are currently unavailable.");
+ println!("If these nodes are definitely dead, please review the layout history with");
+ println!(
+ "`garage layout history` and use `garage layout skip-dead-nodes` to force progress."
+ );
+ }
}
if print_staging_role_changes(&layout) {
@@ -226,3 +262,18 @@ pub async fn cmd_admin(
}
Ok(())
}
+
+// ---- utility ----
+
+pub async fn fetch_status(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+) -> Result<Vec<KnownNodeInfo>, Error> {
+ match rpc_cli
+ .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .await??
+ {
+ SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes),
+ resp => Err(Error::unexpected_rpc_message(resp)),
+ }
+}
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index ce2b11e0..f76e33c5 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -32,6 +32,10 @@ pub async fn cli_layout_command_dispatch(
LayoutOperation::Config(config_opt) => {
cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await
}
+ LayoutOperation::History => cmd_layout_history(system_rpc_endpoint, rpc_host).await,
+ LayoutOperation::SkipDeadNodes(assume_sync_opt) => {
+ cmd_layout_skip_dead_nodes(system_rpc_endpoint, rpc_host, assume_sync_opt).await
+ }
}
}
@@ -49,6 +53,7 @@ pub async fn cmd_assign_role(
};
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+ let all_nodes = layout.get_all_nodes();
let added_nodes = args
.node_ids
@@ -58,21 +63,23 @@ pub async fn cmd_assign_role(
status
.iter()
.map(|adv| adv.id)
- .chain(layout.node_ids().iter().cloned()),
+ .chain(all_nodes.iter().cloned()),
node_id,
)
})
.collect::<Result<Vec<_>, _>>()?;
- let mut roles = layout.roles.clone();
- roles.merge(&layout.staging_roles);
+ let mut roles = layout.current().roles.clone();
+ roles.merge(&layout.staging.get().roles);
for replaced in args.replace.iter() {
- let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?;
+ let replaced_node = find_matching_node(all_nodes.iter().cloned(), replaced)?;
match roles.get(&replaced_node) {
Some(NodeRoleV(Some(_))) => {
layout
- .staging_roles
+ .staging
+ .get_mut()
+ .roles
.merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
}
_ => {
@@ -130,7 +137,9 @@ pub async fn cmd_assign_role(
};
layout
- .staging_roles
+ .staging
+ .get_mut()
+ .roles
.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
}
@@ -149,14 +158,16 @@ pub async fn cmd_remove_role(
) -> Result<(), Error> {
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
- let mut roles = layout.roles.clone();
- roles.merge(&layout.staging_roles);
+ let mut roles = layout.current().roles.clone();
+ roles.merge(&layout.staging.get().roles);
let deleted_node =
find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?;
layout
- .staging_roles
+ .staging
+ .get_mut()
+ .roles
.merge(&roles.update_mutator(deleted_node, NodeRoleV(None)));
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -174,13 +185,16 @@ pub async fn cmd_show_layout(
let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== CURRENT CLUSTER LAYOUT ====");
- print_cluster_layout(&layout, "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes.");
+ print_cluster_layout(layout.current(), "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes.");
println!();
- println!("Current cluster layout version: {}", layout.version);
+ println!(
+ "Current cluster layout version: {}",
+ layout.current().version
+ );
let has_role_changes = print_staging_role_changes(&layout);
if has_role_changes {
- let v = layout.version;
+ let v = layout.current().version;
let res_apply = layout.apply_staged_changes(Some(v + 1));
// this will print the stats of what partitions
@@ -189,7 +203,7 @@ pub async fn cmd_show_layout(
Ok((layout, msg)) => {
println!();
println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
- print_cluster_layout(&layout, "No nodes have a role in the new layout.");
+ print_cluster_layout(layout.current(), "No nodes have a role in the new layout.");
println!();
for line in msg.iter() {
@@ -199,16 +213,12 @@ pub async fn cmd_show_layout(
println!();
println!(" garage layout apply --version {}", v + 1);
println!();
- println!(
- "You can also revert all proposed changes with: garage layout revert --version {}",
- v + 1)
+ println!("You can also revert all proposed changes with: garage layout revert");
}
Err(e) => {
println!("Error while trying to compute the assignment: {}", e);
println!("This new layout cannot yet be applied.");
- println!(
- "You can also revert all proposed changes with: garage layout revert --version {}",
- v + 1)
+ println!("You can also revert all proposed changes with: garage layout revert");
}
}
}
@@ -241,9 +251,15 @@ pub async fn cmd_revert_layout(
rpc_host: NodeID,
revert_opt: RevertLayoutOpt,
) -> Result<(), Error> {
+ if !revert_opt.yes {
+ return Err(Error::Message(
+ "Please add the --yes flag to run the layout revert operation".into(),
+ ));
+ }
+
let layout = fetch_layout(rpc_cli, rpc_host).await?;
- let layout = layout.revert_staged_changes(revert_opt.version)?;
+ let layout = layout.revert_staged_changes()?;
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -266,11 +282,11 @@ pub async fn cmd_config_layout(
.parse::<ZoneRedundancy>()
.ok_or_message("invalid zone redundancy value")?;
if let ZoneRedundancy::AtLeast(r_int) = r {
- if r_int > layout.replication_factor {
+ if r_int > layout.current().replication_factor {
return Err(Error::Message(format!(
"The zone redundancy must be smaller or equal to the \
replication factor ({}).",
- layout.replication_factor
+ layout.current().replication_factor
)));
} else if r_int < 1 {
return Err(Error::Message(
@@ -280,7 +296,9 @@ pub async fn cmd_config_layout(
}
layout
- .staging_parameters
+ .staging
+ .get_mut()
+ .parameters
.update(LayoutParameters { zone_redundancy: r });
println!("The zone redundancy parameter has been set to '{}'.", r);
did_something = true;
@@ -297,25 +315,166 @@ pub async fn cmd_config_layout(
Ok(())
}
+pub async fn cmd_layout_history(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+) -> Result<(), Error> {
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
+ let min_stored = layout.min_stored();
+
+ println!("==== LAYOUT HISTORY ====");
+ let mut table = vec!["Version\tStatus\tStorage nodes\tGateway nodes".to_string()];
+ for ver in layout
+ .versions
+ .iter()
+ .rev()
+ .chain(layout.old_versions.iter().rev())
+ {
+ let status = if ver.version == layout.current().version {
+ "current"
+ } else if ver.version >= min_stored {
+ "draining"
+ } else {
+ "historical"
+ };
+ table.push(format!(
+ "#{}\t{}\t{}\t{}",
+ ver.version,
+ status,
+ ver.roles
+ .items()
+ .iter()
+ .filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_some()))
+ .count(),
+ ver.roles
+ .items()
+ .iter()
+ .filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_none()))
+ .count(),
+ ));
+ }
+ format_table(table);
+ println!();
+
+ if layout.versions.len() > 1 {
+ println!("==== UPDATE TRACKERS ====");
+ println!("Several layout versions are currently live in the version, and data is being migrated.");
+ println!(
+ "This is the internal data that Garage stores to know which nodes have what data."
+ );
+ println!();
+ let mut table = vec!["Node\tAck\tSync\tSync_ack".to_string()];
+ let all_nodes = layout.get_all_nodes();
+ for node in all_nodes.iter() {
+ table.push(format!(
+ "{:?}\t#{}\t#{}\t#{}",
+ node,
+ layout.update_trackers.ack_map.get(node, min_stored),
+ layout.update_trackers.sync_map.get(node, min_stored),
+ layout.update_trackers.sync_ack_map.get(node, min_stored),
+ ));
+ }
+ table[1..].sort();
+ format_table(table);
+
+ println!();
+ println!(
+ "If some nodes are not catching up to the latest layout version in the update trackers,"
+ );
+ println!("it might be because they are offline or unable to complete a sync successfully.");
+ println!(
+ "You may force progress using `garage layout skip-dead-nodes --version {}`",
+ layout.current().version
+ );
+ } else {
+ println!("Your cluster is currently in a stable state with a single live layout version.");
+ println!("No metadata migration is in progress. Note that the migration of data blocks is not tracked,");
+ println!(
+ "so you might want to keep old nodes online until their data directories become empty."
+ );
+ }
+
+ Ok(())
+}
+
+pub async fn cmd_layout_skip_dead_nodes(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ opt: SkipDeadNodesOpt,
+) -> Result<(), Error> {
+ let status = fetch_status(rpc_cli, rpc_host).await?;
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ if layout.versions.len() == 1 {
+ return Err(Error::Message(
+ "This command cannot be called when there is only one live cluster layout version"
+ .into(),
+ ));
+ }
+
+ let min_v = layout.min_stored();
+ if opt.version <= min_v || opt.version > layout.current().version {
+ return Err(Error::Message(format!(
+ "Invalid version, you may use the following version numbers: {}",
+ (min_v + 1..=layout.current().version)
+ .map(|x| x.to_string())
+ .collect::<Vec<_>>()
+ .join(" ")
+ )));
+ }
+
+ let all_nodes = layout.get_all_nodes();
+ let mut did_something = false;
+ for node in all_nodes.iter() {
+ if status.iter().any(|x| x.id == *node && x.is_up) {
+ continue;
+ }
+
+ if layout.update_trackers.ack_map.set_max(*node, opt.version) {
+ println!("Increased the ACK tracker for node {:?}", node);
+ did_something = true;
+ }
+
+ if opt.allow_missing_data {
+ if layout.update_trackers.sync_map.set_max(*node, opt.version) {
+ println!("Increased the SYNC tracker for node {:?}", node);
+ did_something = true;
+ }
+ }
+ }
+
+ if did_something {
+ send_layout(rpc_cli, rpc_host, layout).await?;
+ println!("Success.");
+ Ok(())
+ } else if !opt.allow_missing_data {
+ Err(Error::Message("Nothing was done, try passing the `--allow-missing-data` flag to force progress even when not enough nodes can complete a metadata sync.".into()))
+ } else {
+ Err(Error::Message(
+ "Sorry, there is nothing I can do for you. Please wait patiently. If you ask for help, please send the output of the `garage layout history` command.".into(),
+ ))
+ }
+}
+
// --- utility ---
pub async fn fetch_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
-) -> Result<ClusterLayout, Error> {
+) -> Result<LayoutHistory, Error> {
match rpc_cli
.call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL)
.await??
{
SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
- resp => Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+ resp => Err(Error::unexpected_rpc_message(resp)),
}
}
pub async fn send_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
- layout: ClusterLayout,
+ layout: LayoutHistory,
) -> Result<(), Error> {
rpc_cli
.call(
@@ -327,7 +486,7 @@ pub async fn send_layout(
Ok(())
}
-pub fn print_cluster_layout(layout: &ClusterLayout, empty_msg: &str) {
+pub fn print_cluster_layout(layout: &LayoutVersion, empty_msg: &str) {
let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()];
for (id, _, role) in layout.roles.items().iter() {
let role = match &role.0 {
@@ -366,21 +525,22 @@ pub fn print_cluster_layout(layout: &ClusterLayout, empty_msg: &str) {
}
}
-pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
- let has_role_changes = layout
- .staging_roles
+pub fn print_staging_role_changes(layout: &LayoutHistory) -> bool {
+ let staging = layout.staging.get();
+ let has_role_changes = staging
+ .roles
.items()
.iter()
- .any(|(k, _, v)| layout.roles.get(k) != Some(v));
- let has_layout_changes = *layout.staging_parameters.get() != layout.parameters;
+ .any(|(k, _, v)| layout.current().roles.get(k) != Some(v));
+ let has_layout_changes = *staging.parameters.get() != layout.current().parameters;
if has_role_changes || has_layout_changes {
println!();
println!("==== STAGED ROLE CHANGES ====");
if has_role_changes {
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
- for (id, _, role) in layout.staging_roles.items().iter() {
- if layout.roles.get(id) == Some(role) {
+ for (id, _, role) in staging.roles.items().iter() {
+ if layout.current().roles.get(id) == Some(role) {
continue;
}
if let Some(role) = &role.0 {
@@ -402,7 +562,7 @@ pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
if has_layout_changes {
println!(
"Zone redundancy: {}",
- layout.staging_parameters.get().zone_redundancy
+ staging.parameters.get().zone_redundancy
);
}
true
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index be4d5bd6..40e47ee1 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -114,6 +114,14 @@ pub enum LayoutOperation {
/// Revert staged changes to cluster layout
#[structopt(name = "revert", version = garage_version())]
Revert(RevertLayoutOpt),
+
+ /// View the history of layouts in the cluster
+ #[structopt(name = "history", version = garage_version())]
+ History,
+
+ /// Skip dead nodes when awaiting for a new layout version to be synchronized
+ #[structopt(name = "skip-dead-nodes", version = garage_version())]
+ SkipDeadNodes(SkipDeadNodesOpt),
}
#[derive(StructOpt, Debug)]
@@ -166,9 +174,21 @@ pub struct ApplyLayoutOpt {
#[derive(StructOpt, Debug)]
pub struct RevertLayoutOpt {
- /// Version number of old configuration to which to revert
+ /// The revert operation will not be ran unless this flag is added
+ #[structopt(long = "yes")]
+ pub(crate) yes: bool,
+}
+
+#[derive(StructOpt, Debug)]
+pub struct SkipDeadNodesOpt {
+ /// Version number of the layout to assume is currently up-to-date.
+ /// This will generally be the current layout version.
#[structopt(long = "version")]
- pub(crate) version: Option<u64>,
+ pub(crate) version: u64,
+ /// Allow the skip even if a quorum of ndoes could not be found for
+ /// the data among the remaining nodes
+ #[structopt(long = "allow-missing-data")]
+ pub(crate) allow_missing_data: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 2232d395..0511e2b1 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -450,6 +450,8 @@ pub fn print_block_info(
if refcount != nondeleted_count {
println!();
- println!("Warning: refcount does not match number of non-deleted versions");
+ println!(
+ "Warning: refcount does not match number of non-deleted versions (see issue #644)."
+ );
}
}
diff --git a/src/garage/secrets.rs b/src/garage/secrets.rs
index c3d704aa..8d2ff475 100644
--- a/src/garage/secrets.rs
+++ b/src/garage/secrets.rs
@@ -163,7 +163,7 @@ mod tests {
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
- replication_mode = "3"
+ replication_factor = 3
rpc_bind_addr = "[::]:3901"
rpc_secret_file = "{}"
@@ -185,7 +185,7 @@ mod tests {
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
- replication_mode = "3"
+ replication_factor = 3
rpc_bind_addr = "[::]:3901"
rpc_secret_file = "{}"
allow_world_readable_secrets = true
@@ -296,7 +296,7 @@ mod tests {
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
- replication_mode = "3"
+ replication_factor = 3
rpc_bind_addr = "[::]:3901"
rpc_secret= "dummy"
rpc_secret_file = "dummy"
diff --git a/src/garage/tests/common/ext/process.rs b/src/garage/tests/common/ext/process.rs
index ba533b6c..8e20bf7c 100644
--- a/src/garage/tests/common/ext/process.rs
+++ b/src/garage/tests/common/ext/process.rs
@@ -14,42 +14,20 @@ impl CommandExt for process::Command {
}
fn expect_success_status(&mut self, msg: &str) -> process::ExitStatus {
- let status = self.status().expect(msg);
- status.expect_success(msg);
- status
+ self.expect_success_output(msg).status
}
fn expect_success_output(&mut self, msg: &str) -> process::Output {
let output = self.output().expect(msg);
- output.expect_success(msg);
- output
- }
-}
-
-pub trait OutputExt {
- fn expect_success(&self, msg: &str);
-}
-
-impl OutputExt for process::Output {
- fn expect_success(&self, msg: &str) {
- self.status.expect_success(msg)
- }
-}
-
-pub trait ExitStatusExt {
- fn expect_success(&self, msg: &str);
-}
-
-impl ExitStatusExt for process::ExitStatus {
- fn expect_success(&self, msg: &str) {
- if !self.success() {
- match self.code() {
- Some(code) => panic!(
- "Command exited with code {code}: {msg}",
- code = code,
- msg = msg
- ),
- None => panic!("Command exited with signal: {msg}", msg = msg),
- }
+ if !output.status.success() {
+ panic!(
+ "{}: command {:?} exited with error {:?}\nSTDOUT: {}\nSTDERR: {}",
+ msg,
+ self,
+ output.status.code(),
+ String::from_utf8_lossy(&output.stdout),
+ String::from_utf8_lossy(&output.stderr)
+ );
}
+ output
}
}
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index d1f0867a..f1c1efc8 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -54,7 +54,7 @@ metadata_dir = "{path}/meta"
data_dir = "{path}/data"
db_engine = "lmdb"
-replication_mode = "1"
+replication_factor = 1
rpc_bind_addr = "127.0.0.1:{rpc_port}"
rpc_public_addr = "127.0.0.1:{rpc_port}"
@@ -96,7 +96,7 @@ api_bind_addr = "127.0.0.1:{admin_port}"
.arg("server")
.stdout(stdout)
.stderr(stderr)
- .env("RUST_LOG", "garage=info,garage_api=trace")
+ .env("RUST_LOG", "garage=debug,garage_api=trace")
.spawn()
.expect("Could not start garage");
diff --git a/src/garage/tests/s3/mod.rs b/src/garage/tests/s3/mod.rs
index 4ebc4914..e75b1397 100644
--- a/src/garage/tests/s3/mod.rs
+++ b/src/garage/tests/s3/mod.rs
@@ -3,5 +3,6 @@ mod multipart;
mod objects;
mod presigned;
mod simple;
+mod ssec;
mod streaming_signature;
mod website;
diff --git a/src/garage/tests/s3/ssec.rs b/src/garage/tests/s3/ssec.rs
new file mode 100644
index 00000000..d8f11950
--- /dev/null
+++ b/src/garage/tests/s3/ssec.rs
@@ -0,0 +1,455 @@
+use crate::common::{self, Context};
+use aws_sdk_s3::primitives::ByteStream;
+use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
+
+const SSEC_KEY: &str = "u8zCfnEyt5Imo/krN+sxA1DQXxLWtPJavU6T6gOVj1Y=";
+const SSEC_KEY_MD5: &str = "jMGbs3GyZkYjJUP6q5jA7g==";
+const SSEC_KEY2: &str = "XkYVk4Z3vVDO2yJaUqCAEZX6lL10voMxtV06d8my/eU=";
+const SSEC_KEY2_MD5: &str = "kedo2ab8J1MCjHwJuLTJHw==";
+
+const SZ_2MB: usize = 2 * 1024 * 1024;
+
+#[tokio::test]
+async fn test_ssec_object() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("sse-c");
+
+ let bytes1 = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz".to_vec();
+ let bytes2 = (0..400000)
+ .map(|x| ((x * 3792) % 256) as u8)
+ .collect::<Vec<u8>>();
+
+ for data in vec![bytes1, bytes2] {
+ let stream = ByteStream::new(data.clone().into());
+
+ // Write encrypted object
+ let r = ctx
+ .client
+ .put_object()
+ .bucket(&bucket)
+ .key("testobj")
+ .sse_customer_algorithm("AES256")
+ .sse_customer_key(SSEC_KEY)
+ .sse_customer_key_md5(SSEC_KEY_MD5)
+ .body(stream)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(r.sse_customer_algorithm, Some("AES256".into()));
+ assert_eq!(r.sse_customer_key_md5, Some(SSEC_KEY_MD5.into()));
+
+ test_read_encrypted(
+ &ctx,
+ &bucket,
+ "testobj",
+ &data,
+ SSEC_KEY,
+ SSEC_KEY_MD5,
+ SSEC_KEY2,
+ SSEC_KEY2_MD5,
+ )
+ .await;
+
+ // Test copy from encrypted to non-encrypted
+ let r = ctx
+ .client
+ .copy_object()
+ .bucket(&bucket)
+ .key("test-copy-enc-dec")
+ .copy_source(format!("{}/{}", bucket, "testobj"))
+ .copy_source_sse_customer_algorithm("AES256")
+ .copy_source_sse_customer_key(SSEC_KEY)
+ .copy_source_sse_customer_key_md5(SSEC_KEY_MD5)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(r.sse_customer_algorithm, None);
+ assert_eq!(r.sse_customer_key_md5, None);
+
+ // Test read decrypted file
+ let r = ctx
+ .client
+ .get_object()
+ .bucket(&bucket)
+ .key("test-copy-enc-dec")
+ .send()
+ .await
+ .unwrap();
+ assert_bytes_eq!(r.body, &data);
+ assert_eq!(r.sse_customer_algorithm, None);
+ assert_eq!(r.sse_customer_key_md5, None);
+
+ // Test copy from non-encrypted to encrypted
+ let r = ctx
+ .client
+ .copy_object()
+ .bucket(&bucket)
+ .key("test-copy-enc-dec-enc")
+ .copy_source(format!("{}/test-copy-enc-dec", bucket))
+ .sse_customer_algorithm("AES256")
+ .sse_customer_key(SSEC_KEY2)
+ .sse_customer_key_md5(SSEC_KEY2_MD5)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(r.sse_customer_algorithm, Some("AES256".into()));
+ assert_eq!(r.sse_customer_key_md5, Some(SSEC_KEY2_MD5.into()));
+
+ test_read_encrypted(
+ &ctx,
+ &bucket,
+ "test-copy-enc-dec-enc",
+ &data,
+ SSEC_KEY2,
+ SSEC_KEY2_MD5,
+ SSEC_KEY,
+ SSEC_KEY_MD5,
+ )
+ .await;
+
+ // Test copy from encrypted to encrypted with different keys
+ let r = ctx
+ .client
+ .copy_object()
+ .bucket(&bucket)
+ .key("test-copy-enc-enc")
+ .copy_source(format!("{}/{}", bucket, "testobj"))
+ .copy_source_sse_customer_algorithm("AES256")
+ .copy_source_sse_customer_key(SSEC_KEY)
+ .copy_source_sse_customer_key_md5(SSEC_KEY_MD5)
+ .sse_customer_algorithm("AES256")
+ .sse_customer_key(SSEC_KEY2)
+ .sse_customer_key_md5(SSEC_KEY2_MD5)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(r.sse_customer_algorithm, Some("AES256".into()));
+ assert_eq!(r.sse_customer_key_md5, Some(SSEC_KEY2_MD5.into()));
+ test_read_encrypted(
+ &ctx,
+ &bucket,
+ "test-copy-enc-enc",
+ &data,
+ SSEC_KEY2,
+ SSEC_KEY2_MD5,
+ SSEC_KEY,
+ SSEC_KEY_MD5,
+ )
+ .await;
+
+ // Test copy from encrypted to encrypted with the same key
+ let r = ctx
+ .client
+ .copy_object()
+ .bucket(&bucket)
+ .key("test-copy-enc-enc-same")
+ .copy_source(format!("{}/{}", bucket, "testobj"))
+ .copy_source_sse_customer_algorithm("AES256")
+ .copy_source_sse_customer_key(SSEC_KEY)
+ .copy_source_sse_customer_key_md5(SSEC_KEY_MD5)
+ .sse_customer_algorithm("AES256")
+ .sse_customer_key(SSEC_KEY)
+ .sse_customer_key_md5(SSEC_KEY_MD5)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(r.sse_customer_algorithm, Some("AES256".into()));
+ assert_eq!(r.sse_customer_key_md5, Some(SSEC_KEY_MD5.into()));
+ test_read_encrypted(
+ &ctx,
+ &bucket,
+ "test-copy-enc-enc-same",
+ &data,
+ SSEC_KEY,
+ SSEC_KEY_MD5,
+ SSEC_KEY2,
+ SSEC_KEY2_MD5,
+ )
+ .await;
+ }
+}
+
+#[tokio::test]
+async fn test_multipart_upload() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-ssec-mpu");
+
+ let u1 = vec![0x11; SZ_2MB];
+ let u2 = vec![0x22; SZ_2MB];
+ let u3 = vec![0x33; SZ_2MB];
+ let all = [&u1[..], &u2[..], &u3[..]].concat();
+
+ // Test simple encrypted mpu
+ {
+ let up = ctx
+ .client
+ .create_multipart_upload()
+ .bucket(&bucket)
+ .key("a")
+ .sse_customer_algorithm("AES256")
+ .sse_customer_key(SSEC_KEY)
+ .sse_customer_key_md5(SSEC_KEY_MD5)
+ .send()
+ .await
+ .unwrap();
+ assert!(up.upload_id.is_some());
+ assert_eq!(up.sse_customer_algorithm, Some("AES256".into()));
+ assert_eq!(up.sse_customer_key_md5, Some(SSEC_KEY_MD5.into()));
+
+ let uid = up.upload_id.as_ref().unwrap();
+
+ let mut etags = vec![];
+ for (i, part) in vec![&u1, &u2, &u3].into_iter().enumerate() {
+ let pu = ctx
+ .client
+ .upload_part()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .part_number((i + 1) as i32)
+ .sse_customer_algorithm("AES256")
+ .sse_customer_key(SSEC_KEY)
+ .sse_customer_key_md5(SSEC_KEY_MD5)
+ .body(ByteStream::from(part.to_vec()))
+ .send()
+ .await
+ .unwrap();
+ etags.push(pu.e_tag.unwrap());
+ }
+
+ let mut cmp = CompletedMultipartUpload::builder();
+ for (i, etag) in etags.into_iter().enumerate() {
+ cmp = cmp.parts(
+ CompletedPart::builder()
+ .part_number((i + 1) as i32)
+ .e_tag(etag)
+ .build(),
+ );
+ }
+
+ ctx.client
+ .complete_multipart_upload()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .multipart_upload(cmp.build())
+ .send()
+ .await
+ .unwrap();
+
+ test_read_encrypted(
+ &ctx,
+ &bucket,
+ "a",
+ &all,
+ SSEC_KEY,
+ SSEC_KEY_MD5,
+ SSEC_KEY2,
+ SSEC_KEY2_MD5,
+ )
+ .await;
+ }
+
+ // Test upload part copy from first object
+ {
+ // (setup) Upload a single part object
+ ctx.client
+ .put_object()
+ .bucket(&bucket)
+ .key("b")
+ .body(ByteStream::from(u1.clone()))
+ .sse_customer_algorithm("AES256")
+ .sse_customer_key(SSEC_KEY2)
+ .sse_customer_key_md5(SSEC_KEY2_MD5)
+ .send()
+ .await
+ .unwrap();
+
+ let up = ctx
+ .client
+ .create_multipart_upload()
+ .bucket(&bucket)
+ .key("target")
+ .sse_customer_algorithm("AES256")
+ .sse_customer_key(SSEC_KEY2)
+ .sse_customer_key_md5(SSEC_KEY2_MD5)
+ .send()
+ .await
+ .unwrap();
+ let uid = up.upload_id.as_ref().unwrap();
+
+ let p1 = ctx
+ .client
+ .upload_part()
+ .bucket(&bucket)
+ .key("target")
+ .upload_id(uid)
+ .part_number(1)
+ .sse_customer_algorithm("AES256")
+ .sse_customer_key(SSEC_KEY2)
+ .sse_customer_key_md5(SSEC_KEY2_MD5)
+ .body(ByteStream::from(u3.clone()))
+ .send()
+ .await
+ .unwrap();
+
+ let p2 = ctx
+ .client
+ .upload_part_copy()
+ .bucket(&bucket)
+ .key("target")
+ .upload_id(uid)
+ .part_number(2)
+ .copy_source(format!("{}/a", bucket))
+ .copy_source_range("bytes=500-550000")
+ .copy_source_sse_customer_algorithm("AES256")
+ .copy_source_sse_customer_key(SSEC_KEY)
+ .copy_source_sse_customer_key_md5(SSEC_KEY_MD5)
+ .sse_customer_algorithm("AES256")
+ .sse_customer_key(SSEC_KEY2)
+ .sse_customer_key_md5(SSEC_KEY2_MD5)
+ .send()
+ .await
+ .unwrap();
+
+ let p3 = ctx
+ .client
+ .upload_part()
+ .bucket(&bucket)
+ .key("target")
+ .upload_id(uid)
+ .part_number(3)
+ .sse_customer_algorithm("AES256")
+ .sse_customer_key(SSEC_KEY2)
+ .sse_customer_key_md5(SSEC_KEY2_MD5)
+ .body(ByteStream::from(u2.clone()))
+ .send()
+ .await
+ .unwrap();
+
+ let p4 = ctx
+ .client
+ .upload_part_copy()
+ .bucket(&bucket)
+ .key("target")
+ .upload_id(uid)
+ .part_number(4)
+ .copy_source(format!("{}/b", bucket))
+ .copy_source_range("bytes=1500-20500")
+ .copy_source_sse_customer_algorithm("AES256")
+ .copy_source_sse_customer_key(SSEC_KEY2)
+ .copy_source_sse_customer_key_md5(SSEC_KEY2_MD5)
+ .sse_customer_algorithm("AES256")
+ .sse_customer_key(SSEC_KEY2)
+ .sse_customer_key_md5(SSEC_KEY2_MD5)
+ .send()
+ .await
+ .unwrap();
+
+ let cmp = CompletedMultipartUpload::builder()
+ .parts(
+ CompletedPart::builder()
+ .part_number(1)
+ .e_tag(p1.e_tag.unwrap())
+ .build(),
+ )
+ .parts(
+ CompletedPart::builder()
+ .part_number(2)
+ .e_tag(p2.copy_part_result.unwrap().e_tag.unwrap())
+ .build(),
+ )
+ .parts(
+ CompletedPart::builder()
+ .part_number(3)
+ .e_tag(p3.e_tag.unwrap())
+ .build(),
+ )
+ .parts(
+ CompletedPart::builder()
+ .part_number(4)
+ .e_tag(p4.copy_part_result.unwrap().e_tag.unwrap())
+ .build(),
+ )
+ .build();
+
+ ctx.client
+ .complete_multipart_upload()
+ .bucket(&bucket)
+ .key("target")
+ .upload_id(uid)
+ .multipart_upload(cmp)
+ .send()
+ .await
+ .unwrap();
+
+ // (check) Get object
+ let expected = [&u3[..], &all[500..550001], &u2[..], &u1[1500..20501]].concat();
+ test_read_encrypted(
+ &ctx,
+ &bucket,
+ "target",
+ &expected,
+ SSEC_KEY2,
+ SSEC_KEY2_MD5,
+ SSEC_KEY,
+ SSEC_KEY_MD5,
+ )
+ .await;
+ }
+}
+
+async fn test_read_encrypted(
+ ctx: &Context,
+ bucket: &str,
+ obj_key: &str,
+ expected_data: &[u8],
+ enc_key: &str,
+ enc_key_md5: &str,
+ wrong_enc_key: &str,
+ wrong_enc_key_md5: &str,
+) {
+ // Test read encrypted without key
+ let o = ctx
+ .client
+ .get_object()
+ .bucket(bucket)
+ .key(obj_key)
+ .send()
+ .await;
+ assert!(
+ o.is_err(),
+ "encrypted file could be read without encryption key"
+ );
+
+ // Test read encrypted with wrong key
+ let o = ctx
+ .client
+ .get_object()
+ .bucket(bucket)
+ .key(obj_key)
+ .sse_customer_key(wrong_enc_key)
+ .sse_customer_key_md5(wrong_enc_key_md5)
+ .send()
+ .await;
+ assert!(
+ o.is_err(),
+ "encrypted file could be read with incorrect encryption key"
+ );
+
+ // Test read encrypted with correct key
+ let o = ctx
+ .client
+ .get_object()
+ .bucket(bucket)
+ .key(obj_key)
+ .sse_customer_algorithm("AES256")
+ .sse_customer_key(enc_key)
+ .sse_customer_key_md5(enc_key_md5)
+ .send()
+ .await
+ .unwrap();
+ assert_bytes_eq!(o.body, expected_data);
+ assert_eq!(o.sse_customer_algorithm, Some("AES256".into()));
+ assert_eq!(o.sse_customer_key_md5, Some(enc_key_md5.to_string()));
+}