Diffstat (limited to 'src/api')
-rw-r--r--  src/api/Cargo.toml             8
-rw-r--r--  src/api/admin/api_server.rs    2
-rw-r--r--  src/api/admin/bucket.rs        4
-rw-r--r--  src/api/admin/cluster.rs     176
-rw-r--r--  src/api/common_error.rs        8
-rw-r--r--  src/api/k2v/index.rs          13
-rw-r--r--  src/api/s3/api_server.rs       2
-rw-r--r--  src/api/s3/checksum.rs       406
-rw-r--r--  src/api/s3/copy.rs           373
-rw-r--r--  src/api/s3/encryption.rs     595
-rw-r--r--  src/api/s3/error.rs           12
-rw-r--r--  src/api/s3/get.rs            283
-rw-r--r--  src/api/s3/list.rs            80
-rw-r--r--  src/api/s3/mod.rs              2
-rw-r--r--  src/api/s3/multipart.rs      199
-rw-r--r--  src/api/s3/post_object.rs     60
-rw-r--r--  src/api/s3/put.rs            327
-rw-r--r--  src/api/s3/xml.rs             33
18 files changed, 2153 insertions, 430 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 317031a7..1b87496c 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_api"
-version = "0.9.3"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -21,11 +21,15 @@ garage_net.workspace = true
garage_util.workspace = true
garage_rpc.workspace = true
+aes-gcm.workspace = true
argon2.workspace = true
+async-compression.workspace = true
async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
chrono.workspace = true
+crc32fast.workspace = true
+crc32c.workspace = true
crypto-common.workspace = true
err-derive.workspace = true
hex.workspace = true
@@ -35,12 +39,14 @@ tracing.workspace = true
md-5.workspace = true
nom.workspace = true
pin-project.workspace = true
+sha1.workspace = true
sha2.workspace = true
futures.workspace = true
futures-util.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
+tokio-util.workspace = true
form_urlencoded.workspace = true
http.workspace = true
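
The new dependencies above back the features introduced in this commit: aes-gcm for SSE-C object encryption, async-compression and tokio-util for streaming zstd, and crc32fast, crc32c and sha1 for the x-amz-checksum-* algorithms. The two CRC crates expose different interfaces, which is why checksum.rs drives one through update()/finalize() and the other through the std::hash::Hasher trait. A minimal sketch of that difference (illustrative only, using both crates' documented APIs):

```rust
use std::hash::Hasher; // needed for crc32c::Crc32cHasher::{write, finish}

fn crc_demo(data: &[u8]) -> ([u8; 4], [u8; 4]) {
    // crc32fast: dedicated hasher type, finalize() returns the u32 directly
    let mut crc32 = crc32fast::Hasher::new();
    crc32.update(data);
    let crc32 = u32::to_be_bytes(crc32.finalize());

    // crc32c: implements std::hash::Hasher, so finish() returns a u64
    // that must be narrowed back down to the 32-bit checksum
    let mut crc32c = crc32c::Crc32cHasher::default();
    crc32c.write(data);
    let crc32c = u32::to_be_bytes(u32::try_from(crc32c.finish()).unwrap());

    (crc32, crc32c)
}
```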
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 265639c4..0e4565bb 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -276,7 +276,7 @@ impl ApiHandler for AdminApiServer {
Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await,
Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await,
Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await,
- Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await,
+ Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage).await,
// Keys
Endpoint::ListKeys => handle_list_keys(&self.garage).await,
Endpoint::GetKeyInfo {
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
index cfe8a6c4..ac3cba00 100644
--- a/src/api/admin/bucket.rs
+++ b/src/api/admin/bucket.rs
@@ -123,7 +123,7 @@ async fn bucket_info_results(
.table
.get(&bucket_id, &EmptyKey)
.await?
- .map(|x| x.filtered_values(&garage.system.ring.borrow()))
+ .map(|x| x.filtered_values(&garage.system.cluster_layout()))
.unwrap_or_default();
let mpu_counters = garage
@@ -131,7 +131,7 @@ async fn bucket_info_results(
.table
.get(&bucket_id, &EmptyKey)
.await?
- .map(|x| x.filtered_values(&garage.system.ring.borrow()))
+ .map(|x| x.filtered_values(&garage.system.cluster_layout()))
.unwrap_or_default();
let mut relevant_keys = HashMap::new();
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index 3876c608..8c9cb1e5 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -1,3 +1,4 @@
+use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -16,25 +17,95 @@ use crate::admin::error::*;
use crate::helpers::{json_ok_response, parse_json_body};
pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
+ let layout = garage.system.cluster_layout();
+ let mut nodes = garage
+ .system
+ .get_known_nodes()
+ .into_iter()
+ .map(|i| {
+ (
+ i.id,
+ NodeResp {
+ id: hex::encode(i.id),
+ addr: i.addr,
+ hostname: i.status.hostname,
+ is_up: i.is_up,
+ last_seen_secs_ago: i.last_seen_secs_ago,
+ data_partition: i
+ .status
+ .data_disk_avail
+ .map(|(avail, total)| FreeSpaceResp {
+ available: avail,
+ total,
+ }),
+ metadata_partition: i.status.meta_disk_avail.map(|(avail, total)| {
+ FreeSpaceResp {
+ available: avail,
+ total,
+ }
+ }),
+ ..Default::default()
+ },
+ )
+ })
+ .collect::<HashMap<_, _>>();
+
+ for (id, _, role) in layout.current().roles.items().iter() {
+ if let layout::NodeRoleV(Some(r)) = role {
+ let role = NodeRoleResp {
+ id: hex::encode(id),
+ zone: r.zone.to_string(),
+ capacity: r.capacity,
+ tags: r.tags.clone(),
+ };
+ match nodes.get_mut(id) {
+ None => {
+ nodes.insert(
+ *id,
+ NodeResp {
+ id: hex::encode(id),
+ role: Some(role),
+ ..Default::default()
+ },
+ );
+ }
+ Some(n) => {
+ if n.role.is_none() {
+ n.role = Some(role);
+ }
+ }
+ }
+ }
+ }
+
+ for ver in layout.versions.iter().rev().skip(1) {
+ for (id, _, role) in ver.roles.items().iter() {
+ if let layout::NodeRoleV(Some(r)) = role {
+ if !nodes.contains_key(id) && r.capacity.is_some() {
+ nodes.insert(
+ *id,
+ NodeResp {
+ id: hex::encode(id),
+ draining: true,
+ ..Default::default()
+ },
+ );
+ }
+ }
+ }
+ }
+
+ let mut nodes = nodes.into_values().collect::<Vec<_>>();
+ nodes.sort_by(|x, y| x.id.cmp(&y.id));
+
let res = GetClusterStatusResponse {
node: hex::encode(garage.system.id),
garage_version: garage_util::version::garage_version(),
garage_features: garage_util::version::garage_features(),
rust_version: garage_util::version::rust_version(),
db_engine: garage.db.engine(),
- known_nodes: garage
- .system
- .get_known_nodes()
- .into_iter()
- .map(|i| KnownNodeResp {
- id: hex::encode(i.id),
- addr: i.addr,
- is_up: i.is_up,
- last_seen_secs_ago: i.last_seen_secs_ago,
- hostname: i.status.hostname,
- })
- .collect(),
- layout: format_cluster_layout(&garage.system.get_cluster_layout()),
+ layout_version: layout.current().version,
+ nodes,
};
Ok(json_ok_response(&res)?)
@@ -85,13 +156,14 @@ pub async fn handle_connect_cluster_nodes(
}
pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
- let res = format_cluster_layout(&garage.system.get_cluster_layout());
+ let res = format_cluster_layout(&garage.system.cluster_layout());
Ok(json_ok_response(&res)?)
}
-fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResponse {
+fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResponse {
let roles = layout
+ .current()
.roles
.items()
.iter()
@@ -105,10 +177,12 @@ fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResp
.collect::<Vec<_>>();
let staged_role_changes = layout
- .staging_roles
+ .staging
+ .get()
+ .roles
.items()
.iter()
- .filter(|(k, _, v)| layout.roles.get(k) != Some(v))
+ .filter(|(k, _, v)| layout.current().roles.get(k) != Some(v))
.map(|(k, _, v)| match &v.0 {
None => NodeRoleChange {
id: hex::encode(k),
@@ -126,7 +200,7 @@ fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResp
.collect::<Vec<_>>();
GetClusterLayoutResponse {
- version: layout.version,
+ version: layout.current().version,
roles,
staged_role_changes,
}
@@ -155,8 +229,8 @@ struct GetClusterStatusResponse {
garage_features: Option<&'static [&'static str]>,
rust_version: &'static str,
db_engine: String,
- known_nodes: Vec<KnownNodeResp>,
- layout: GetClusterLayoutResponse,
+ layout_version: u64,
+ nodes: Vec<NodeResp>,
}
#[derive(Serialize)]
@@ -190,14 +264,27 @@ struct NodeRoleResp {
tags: Vec<String>,
}
-#[derive(Serialize)]
+#[derive(Serialize, Default)]
+#[serde(rename_all = "camelCase")]
+struct FreeSpaceResp {
+ available: u64,
+ total: u64,
+}
+
+#[derive(Serialize, Default)]
#[serde(rename_all = "camelCase")]
-struct KnownNodeResp {
+struct NodeResp {
id: String,
- addr: SocketAddr,
+ role: Option<NodeRoleResp>,
+ addr: Option<SocketAddr>,
+ hostname: Option<String>,
is_up: bool,
last_seen_secs_ago: Option<u64>,
- hostname: String,
+ draining: bool,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ data_partition: Option<FreeSpaceResp>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ metadata_partition: Option<FreeSpaceResp>,
}
// ---- update functions ----
@@ -208,10 +295,10 @@ pub async fn handle_update_cluster_layout(
) -> Result<Response<ResBody>, Error> {
let updates = parse_json_body::<UpdateClusterLayoutRequest, _, Error>(req).await?;
- let mut layout = garage.system.get_cluster_layout();
+ let mut layout = garage.system.cluster_layout().clone();
- let mut roles = layout.roles.clone();
- roles.merge(&layout.staging_roles);
+ let mut roles = layout.current().roles.clone();
+ roles.merge(&layout.staging.get().roles);
for change in updates {
let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?;
@@ -232,11 +319,17 @@ pub async fn handle_update_cluster_layout(
};
layout
- .staging_roles
+ .staging
+ .get_mut()
+ .roles
.merge(&roles.update_mutator(node, layout::NodeRoleV(new_role)));
}
- garage.system.update_cluster_layout(&layout).await?;
+ garage
+ .system
+ .layout_manager
+ .update_cluster_layout(&layout)
+ .await?;
let res = format_cluster_layout(&layout);
Ok(json_ok_response(&res)?)
@@ -246,12 +339,16 @@ pub async fn handle_apply_cluster_layout(
garage: &Arc<Garage>,
req: Request<IncomingBody>,
) -> Result<Response<ResBody>, Error> {
- let param = parse_json_body::<ApplyRevertLayoutRequest, _, Error>(req).await?;
+ let param = parse_json_body::<ApplyLayoutRequest, _, Error>(req).await?;
- let layout = garage.system.get_cluster_layout();
+ let layout = garage.system.cluster_layout().clone();
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
- garage.system.update_cluster_layout(&layout).await?;
+ garage
+ .system
+ .layout_manager
+ .update_cluster_layout(&layout)
+ .await?;
let res = ApplyClusterLayoutResponse {
message: msg,
@@ -262,13 +359,14 @@ pub async fn handle_apply_cluster_layout(
pub async fn handle_revert_cluster_layout(
garage: &Arc<Garage>,
- req: Request<IncomingBody>,
) -> Result<Response<ResBody>, Error> {
- let param = parse_json_body::<ApplyRevertLayoutRequest, _, Error>(req).await?;
-
- let layout = garage.system.get_cluster_layout();
- let layout = layout.revert_staged_changes(Some(param.version))?;
- garage.system.update_cluster_layout(&layout).await?;
+ let layout = garage.system.cluster_layout().clone();
+ let layout = layout.revert_staged_changes()?;
+ garage
+ .system
+ .layout_manager
+ .update_cluster_layout(&layout)
+ .await?;
let res = format_cluster_layout(&layout);
Ok(json_ok_response(&res)?)
@@ -280,7 +378,7 @@ type UpdateClusterLayoutRequest = Vec<NodeRoleChange>;
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
-struct ApplyRevertLayoutRequest {
+struct ApplyLayoutRequest {
version: u64,
}
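
The reworked GetClusterStatusResponse replaces the separate known_nodes and layout fields with a layout_version plus a single nodes array. Thanks to the serde attributes above, entries serialize in camelCase and omit the partition stats entirely when they are unknown. A self-contained sketch (mirror types, not Garage's actual structs) of the resulting JSON shape:

```rust
use serde::Serialize;

#[derive(Serialize, Default)]
#[serde(rename_all = "camelCase")]
struct NodeRespSketch {
    id: String,
    is_up: bool,
    last_seen_secs_ago: Option<u64>,
    draining: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    data_partition: Option<u64>,
}

fn main() {
    let node = NodeRespSketch {
        id: "ec79480e".into(),
        is_up: true,
        ..Default::default()
    };
    // Prints: {"id":"ec79480e","isUp":true,"lastSeenSecsAgo":null,"draining":false}
    // dataPartition is dropped entirely because it is None.
    println!("{}", serde_json::to_string(&node).unwrap());
}
```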
diff --git a/src/api/common_error.rs b/src/api/common_error.rs
index 4381f227..c47555d4 100644
--- a/src/api/common_error.rs
+++ b/src/api/common_error.rs
@@ -59,9 +59,7 @@ impl CommonError {
pub fn http_status_code(&self) -> StatusCode {
match self {
CommonError::InternalError(
- GarageError::Timeout
- | GarageError::RemoteError(_)
- | GarageError::Quorum(_, _, _, _),
+ GarageError::Timeout | GarageError::RemoteError(_) | GarageError::Quorum(..),
) => StatusCode::SERVICE_UNAVAILABLE,
CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => {
StatusCode::INTERNAL_SERVER_ERROR
@@ -80,9 +78,7 @@ impl CommonError {
match self {
CommonError::Forbidden(_) => "AccessDenied",
CommonError::InternalError(
- GarageError::Timeout
- | GarageError::RemoteError(_)
- | GarageError::Quorum(_, _, _, _),
+ GarageError::Timeout | GarageError::RemoteError(_) | GarageError::Quorum(..),
) => "ServiceUnavailable",
CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => {
"InternalError"
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index 822bec44..e3397238 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -1,9 +1,6 @@
-use std::sync::Arc;
-
use hyper::Response;
use serde::Serialize;
-use garage_rpc::ring::Ring;
use garage_table::util::*;
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
@@ -27,7 +24,11 @@ pub async fn handle_read_index(
let reverse = reverse.unwrap_or(false);
- let ring: Arc<Ring> = garage.system.ring.borrow().clone();
+ let node_id_vec = garage
+ .system
+ .cluster_layout()
+ .all_nongateway_nodes()
+ .to_vec();
let (partition_keys, more, next_start) = read_range(
&garage.k2v.counter_table.table,
@@ -36,7 +37,7 @@ pub async fn handle_read_index(
&start,
&end,
limit,
- Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
+ Some((DeletedFilter::NotDeleted, node_id_vec)),
EnumerationOrder::from_reverse(reverse),
)
.await?;
@@ -55,7 +56,7 @@ pub async fn handle_read_index(
partition_keys: partition_keys
.into_iter()
.map(|part| {
- let vals = part.filtered_values(&ring);
+ let vals = part.filtered_values(&garage.system.cluster_layout());
ReadIndexResponseEntry {
pk: part.sk,
entries: *vals.get(&s_entries).unwrap_or(&0),
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index 1ed30996..1737af33 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -325,7 +325,7 @@ impl ApiHandler for S3ApiServer {
part_number_marker: part_number_marker.map(|p| p.min(10000)),
max_parts: max_parts.unwrap_or(1000).clamp(1, 1000),
};
- handle_list_parts(ctx, &query).await
+ handle_list_parts(ctx, req, &query).await
}
Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await,
Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await,
diff --git a/src/api/s3/checksum.rs b/src/api/s3/checksum.rs
new file mode 100644
index 00000000..c9dc001c
--- /dev/null
+++ b/src/api/s3/checksum.rs
@@ -0,0 +1,406 @@
+use std::convert::{TryFrom, TryInto};
+use std::hash::Hasher;
+
+use base64::prelude::*;
+use crc32c::Crc32cHasher as Crc32c;
+use crc32fast::Hasher as Crc32;
+use md5::{Digest, Md5};
+use sha1::Sha1;
+use sha2::Sha256;
+
+use http::{HeaderMap, HeaderName, HeaderValue};
+
+use garage_util::data::*;
+use garage_util::error::OkOrMessage;
+
+use garage_model::s3::object_table::*;
+
+use crate::s3::error::*;
+
+pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName =
+ HeaderName::from_static("x-amz-checksum-algorithm");
+pub const X_AMZ_CHECKSUM_MODE: HeaderName = HeaderName::from_static("x-amz-checksum-mode");
+pub const X_AMZ_CHECKSUM_CRC32: HeaderName = HeaderName::from_static("x-amz-checksum-crc32");
+pub const X_AMZ_CHECKSUM_CRC32C: HeaderName = HeaderName::from_static("x-amz-checksum-crc32c");
+pub const X_AMZ_CHECKSUM_SHA1: HeaderName = HeaderName::from_static("x-amz-checksum-sha1");
+pub const X_AMZ_CHECKSUM_SHA256: HeaderName = HeaderName::from_static("x-amz-checksum-sha256");
+
+pub type Crc32Checksum = [u8; 4];
+pub type Crc32cChecksum = [u8; 4];
+pub type Md5Checksum = [u8; 16];
+pub type Sha1Checksum = [u8; 20];
+pub type Sha256Checksum = [u8; 32];
+
+#[derive(Debug, Default)]
+pub(crate) struct ExpectedChecksums {
+ // base64-encoded md5 (content-md5 header)
+ pub md5: Option<String>,
+ // content_sha256 (as a Hash / FixedBytes32)
+ pub sha256: Option<Hash>,
+ // extra x-amz-checksum-* header
+ pub extra: Option<ChecksumValue>,
+}
+
+pub(crate) struct Checksummer {
+ pub crc32: Option<Crc32>,
+ pub crc32c: Option<Crc32c>,
+ pub md5: Option<Md5>,
+ pub sha1: Option<Sha1>,
+ pub sha256: Option<Sha256>,
+}
+
+#[derive(Default)]
+pub(crate) struct Checksums {
+ pub crc32: Option<Crc32Checksum>,
+ pub crc32c: Option<Crc32cChecksum>,
+ pub md5: Option<Md5Checksum>,
+ pub sha1: Option<Sha1Checksum>,
+ pub sha256: Option<Sha256Checksum>,
+}
+
+impl Checksummer {
+ pub(crate) fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self {
+ let mut ret = Self {
+ crc32: None,
+ crc32c: None,
+ md5: None,
+ sha1: None,
+ sha256: None,
+ };
+
+ if expected.md5.is_some() || require_md5 {
+ ret.md5 = Some(Md5::new());
+ }
+ if expected.sha256.is_some() || matches!(&expected.extra, Some(ChecksumValue::Sha256(_))) {
+ ret.sha256 = Some(Sha256::new());
+ }
+ if matches!(&expected.extra, Some(ChecksumValue::Crc32(_))) {
+ ret.crc32 = Some(Crc32::new());
+ }
+ if matches!(&expected.extra, Some(ChecksumValue::Crc32c(_))) {
+ ret.crc32c = Some(Crc32c::default());
+ }
+ if matches!(&expected.extra, Some(ChecksumValue::Sha1(_))) {
+ ret.sha1 = Some(Sha1::new());
+ }
+ ret
+ }
+
+ pub(crate) fn add(mut self, algo: Option<ChecksumAlgorithm>) -> Self {
+ match algo {
+ Some(ChecksumAlgorithm::Crc32) => {
+ self.crc32 = Some(Crc32::new());
+ }
+ Some(ChecksumAlgorithm::Crc32c) => {
+ self.crc32c = Some(Crc32c::default());
+ }
+ Some(ChecksumAlgorithm::Sha1) => {
+ self.sha1 = Some(Sha1::new());
+ }
+ Some(ChecksumAlgorithm::Sha256) => {
+ self.sha256 = Some(Sha256::new());
+ }
+ None => (),
+ }
+ self
+ }
+
+ pub(crate) fn update(&mut self, bytes: &[u8]) {
+ if let Some(crc32) = &mut self.crc32 {
+ crc32.update(bytes);
+ }
+ if let Some(crc32c) = &mut self.crc32c {
+ crc32c.write(bytes);
+ }
+ if let Some(md5) = &mut self.md5 {
+ md5.update(bytes);
+ }
+ if let Some(sha1) = &mut self.sha1 {
+ sha1.update(bytes);
+ }
+ if let Some(sha256) = &mut self.sha256 {
+ sha256.update(bytes);
+ }
+ }
+
+ pub(crate) fn finalize(self) -> Checksums {
+ Checksums {
+ crc32: self.crc32.map(|x| u32::to_be_bytes(x.finalize())),
+ crc32c: self
+ .crc32c
+ .map(|x| u32::to_be_bytes(u32::try_from(x.finish()).unwrap())),
+ md5: self.md5.map(|x| x.finalize()[..].try_into().unwrap()),
+ sha1: self.sha1.map(|x| x.finalize()[..].try_into().unwrap()),
+ sha256: self.sha256.map(|x| x.finalize()[..].try_into().unwrap()),
+ }
+ }
+}
+
+impl Checksums {
+ pub fn verify(&self, expected: &ExpectedChecksums) -> Result<(), Error> {
+ if let Some(expected_md5) = &expected.md5 {
+ match self.md5 {
+ Some(md5) if BASE64_STANDARD.encode(&md5) == expected_md5.trim_matches('"') => (),
+ _ => {
+ return Err(Error::InvalidDigest(
+ "MD5 checksum verification failed (from content-md5)".into(),
+ ))
+ }
+ }
+ }
+ if let Some(expected_sha256) = &expected.sha256 {
+ match self.sha256 {
+ Some(sha256) if &sha256[..] == expected_sha256.as_slice() => (),
+ _ => {
+ return Err(Error::InvalidDigest(
+ "SHA256 checksum verification failed (from x-amz-content-sha256)".into(),
+ ))
+ }
+ }
+ }
+ if let Some(extra) = expected.extra {
+ let algo = extra.algorithm();
+ if self.extract(Some(algo)) != Some(extra) {
+ return Err(Error::InvalidDigest(format!(
+ "Failed to validate checksum for algorithm {:?}",
+ algo
+ )));
+ }
+ }
+ Ok(())
+ }
+
+ pub fn extract(&self, algo: Option<ChecksumAlgorithm>) -> Option<ChecksumValue> {
+ match algo {
+ None => None,
+ Some(ChecksumAlgorithm::Crc32) => Some(ChecksumValue::Crc32(self.crc32.unwrap())),
+ Some(ChecksumAlgorithm::Crc32c) => Some(ChecksumValue::Crc32c(self.crc32c.unwrap())),
+ Some(ChecksumAlgorithm::Sha1) => Some(ChecksumValue::Sha1(self.sha1.unwrap())),
+ Some(ChecksumAlgorithm::Sha256) => Some(ChecksumValue::Sha256(self.sha256.unwrap())),
+ }
+ }
+}
+
+// ----
+
+#[derive(Default)]
+pub(crate) struct MultipartChecksummer {
+ pub md5: Md5,
+ pub extra: Option<MultipartExtraChecksummer>,
+}
+
+pub(crate) enum MultipartExtraChecksummer {
+ Crc32(Crc32),
+ Crc32c(Crc32c),
+ Sha1(Sha1),
+ Sha256(Sha256),
+}
+
+impl MultipartChecksummer {
+ pub(crate) fn init(algo: Option<ChecksumAlgorithm>) -> Self {
+ Self {
+ md5: Md5::new(),
+ extra: match algo {
+ None => None,
+ Some(ChecksumAlgorithm::Crc32) => {
+ Some(MultipartExtraChecksummer::Crc32(Crc32::new()))
+ }
+ Some(ChecksumAlgorithm::Crc32c) => {
+ Some(MultipartExtraChecksummer::Crc32c(Crc32c::default()))
+ }
+ Some(ChecksumAlgorithm::Sha1) => Some(MultipartExtraChecksummer::Sha1(Sha1::new())),
+ Some(ChecksumAlgorithm::Sha256) => {
+ Some(MultipartExtraChecksummer::Sha256(Sha256::new()))
+ }
+ },
+ }
+ }
+
+ pub(crate) fn update(
+ &mut self,
+ etag: &str,
+ checksum: Option<ChecksumValue>,
+ ) -> Result<(), Error> {
+ self.md5
+ .update(&hex::decode(&etag).ok_or_message("invalid etag hex")?);
+ match (&mut self.extra, checksum) {
+ (None, _) => (),
+ (
+ Some(MultipartExtraChecksummer::Crc32(ref mut crc32)),
+ Some(ChecksumValue::Crc32(x)),
+ ) => {
+ crc32.update(&x);
+ }
+ (
+ Some(MultipartExtraChecksummer::Crc32c(ref mut crc32c)),
+ Some(ChecksumValue::Crc32c(x)),
+ ) => {
+ crc32c.write(&x);
+ }
+ (Some(MultipartExtraChecksummer::Sha1(ref mut sha1)), Some(ChecksumValue::Sha1(x))) => {
+ sha1.update(&x);
+ }
+ (
+ Some(MultipartExtraChecksummer::Sha256(ref mut sha256)),
+ Some(ChecksumValue::Sha256(x)),
+ ) => {
+ sha256.update(&x);
+ }
+ (Some(_), b) => {
+ return Err(Error::internal_error(format!(
+ "part checksum was not computed correctly, got: {:?}",
+ b
+ )))
+ }
+ }
+ Ok(())
+ }
+
+ pub(crate) fn finalize(self) -> (Md5Checksum, Option<ChecksumValue>) {
+ let md5 = self.md5.finalize()[..].try_into().unwrap();
+ let extra = match self.extra {
+ None => None,
+ Some(MultipartExtraChecksummer::Crc32(crc32)) => {
+ Some(ChecksumValue::Crc32(u32::to_be_bytes(crc32.finalize())))
+ }
+ Some(MultipartExtraChecksummer::Crc32c(crc32c)) => Some(ChecksumValue::Crc32c(
+ u32::to_be_bytes(u32::try_from(crc32c.finish()).unwrap()),
+ )),
+ Some(MultipartExtraChecksummer::Sha1(sha1)) => {
+ Some(ChecksumValue::Sha1(sha1.finalize()[..].try_into().unwrap()))
+ }
+ Some(MultipartExtraChecksummer::Sha256(sha256)) => Some(ChecksumValue::Sha256(
+ sha256.finalize()[..].try_into().unwrap(),
+ )),
+ };
+ (md5, extra)
+ }
+}
+
+// ----
+
+/// Extract the value of the x-amz-checksum-algorithm header
+pub(crate) fn request_checksum_algorithm(
+ headers: &HeaderMap<HeaderValue>,
+) -> Result<Option<ChecksumAlgorithm>, Error> {
+ match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
+ None => Ok(None),
+ Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)),
+ Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)),
+ Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)),
+ Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)),
+ _ => Err(Error::bad_request("invalid checksum algorithm")),
+ }
+}
+
+/// Extract the value of any of the x-amz-checksum-* headers
+pub(crate) fn request_checksum_value(
+ headers: &HeaderMap<HeaderValue>,
+) -> Result<Option<ChecksumValue>, Error> {
+ let mut ret = vec![];
+
+ if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) {
+ let crc32 = BASE64_STANDARD
+ .decode(&crc32_str)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
+ ret.push(ChecksumValue::Crc32(crc32))
+ }
+ if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) {
+ let crc32c = BASE64_STANDARD
+ .decode(&crc32c_str)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
+ ret.push(ChecksumValue::Crc32c(crc32c))
+ }
+ if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) {
+ let sha1 = BASE64_STANDARD
+ .decode(&sha1_str)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
+ ret.push(ChecksumValue::Sha1(sha1))
+ }
+ if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) {
+ let sha256 = BASE64_STANDARD
+ .decode(&sha256_str)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
+ ret.push(ChecksumValue::Sha256(sha256))
+ }
+
+ if ret.len() > 1 {
+ return Err(Error::bad_request(
+ "multiple x-amz-checksum-* headers given",
+ ));
+ }
+ Ok(ret.pop())
+}
+
+/// Checks for the presence of the x-amz-checksum-algorithm header,
+/// and if so, extracts the corresponding x-amz-checksum-* value
+pub(crate) fn request_checksum_algorithm_value(
+ headers: &HeaderMap<HeaderValue>,
+) -> Result<Option<ChecksumValue>, Error> {
+ match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
+ Some(x) if x == "CRC32" => {
+ let crc32 = headers
+ .get(X_AMZ_CHECKSUM_CRC32)
+ .and_then(|x| BASE64_STANDARD.decode(&x).ok())
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
+ Ok(Some(ChecksumValue::Crc32(crc32)))
+ }
+ Some(x) if x == "CRC32C" => {
+ let crc32c = headers
+ .get(X_AMZ_CHECKSUM_CRC32C)
+ .and_then(|x| BASE64_STANDARD.decode(&x).ok())
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
+ Ok(Some(ChecksumValue::Crc32c(crc32c)))
+ }
+ Some(x) if x == "SHA1" => {
+ let sha1 = headers
+ .get(X_AMZ_CHECKSUM_SHA1)
+ .and_then(|x| BASE64_STANDARD.decode(&x).ok())
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
+ Ok(Some(ChecksumValue::Sha1(sha1)))
+ }
+ Some(x) if x == "SHA256" => {
+ let sha256 = headers
+ .get(X_AMZ_CHECKSUM_SHA256)
+ .and_then(|x| BASE64_STANDARD.decode(&x).ok())
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
+ Ok(Some(ChecksumValue::Sha256(sha256)))
+ }
+ Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")),
+ None => Ok(None),
+ }
+}
+
+pub(crate) fn add_checksum_response_headers(
+ checksum: &Option<ChecksumValue>,
+ mut resp: http::response::Builder,
+) -> http::response::Builder {
+ match checksum {
+ Some(ChecksumValue::Crc32(crc32)) => {
+ resp = resp.header(X_AMZ_CHECKSUM_CRC32, BASE64_STANDARD.encode(&crc32));
+ }
+ Some(ChecksumValue::Crc32c(crc32c)) => {
+ resp = resp.header(X_AMZ_CHECKSUM_CRC32C, BASE64_STANDARD.encode(&crc32c));
+ }
+ Some(ChecksumValue::Sha1(sha1)) => {
+ resp = resp.header(X_AMZ_CHECKSUM_SHA1, BASE64_STANDARD.encode(&sha1));
+ }
+ Some(ChecksumValue::Sha256(sha256)) => {
+ resp = resp.header(X_AMZ_CHECKSUM_SHA256, BASE64_STANDARD.encode(&sha256));
+ }
+ None => (),
+ }
+ resp
+}
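
Taken together, the types above form a small pipeline: build an ExpectedChecksums from the request headers, feed the body through a Checksummer, then verify everything at once. A hedged usage sketch (crate-internal, with placeholder inputs):

```rust
// Sketch of the intended flow inside a request handler; `headers` and
// `body_chunks` stand in for the request being handled.
fn verify_body(headers: &HeaderMap, body_chunks: &[&[u8]]) -> Result<(), Error> {
    let expected = ExpectedChecksums {
        md5: None,    // would come from a Content-MD5 header
        sha256: None, // would come from x-amz-content-sha256
        extra: request_checksum_value(headers)?, // any x-amz-checksum-* header
    };
    let mut checksummer = Checksummer::init(&expected, /* require_md5 */ true);
    for chunk in body_chunks {
        checksummer.update(chunk); // feeds every hasher that init() enabled
    }
    checksummer.finalize().verify(&expected) // InvalidDigest on any mismatch
}
```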
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 3c2bd483..411a6917 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -1,17 +1,18 @@
use std::pin::Pin;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
-use futures::{stream, stream::Stream, StreamExt};
-use md5::{Digest as Md5Digest, Md5};
+use futures::{stream, stream::Stream, StreamExt, TryStreamExt};
use bytes::Bytes;
use hyper::{Request, Response};
use serde::Serialize;
use garage_net::bytes_buf::BytesBuf;
+use garage_net::stream::read_stream_to_end;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
use garage_util::data::*;
+use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_model::s3::block_ref_table::*;
@@ -21,11 +22,16 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody};
+use crate::s3::checksum::*;
+use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
+use crate::s3::get::full_object_byte_stream;
use crate::s3::multipart;
-use crate::s3::put::get_headers;
+use crate::s3::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult};
use crate::s3::xml::{self as s3_xml, xmlns_tag};
+// -------- CopyObject ---------
+
pub async fn handle_copy(
ctx: ReqCtx,
req: &Request<ReqBody>,
@@ -33,13 +39,9 @@ pub async fn handle_copy(
) -> Result<Response<ResBody>, Error> {
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
- let source_object = get_copy_source(&ctx, req).await?;
+ let checksum_algorithm = request_checksum_algorithm(req.headers())?;
- let ReqCtx {
- garage,
- bucket_id: dest_bucket_id,
- ..
- } = ctx;
+ let source_object = get_copy_source(&ctx, req).await?;
let (source_version, source_version_data, source_version_meta) =
extract_source_info(&source_object)?;
@@ -47,26 +49,150 @@ pub async fn handle_copy(
// Check precondition, e.g. x-amz-copy-source-if-match
copy_precondition.check(source_version, &source_version_meta.etag)?;
+ // Determine encryption parameters
+ let (source_encryption, source_object_meta_inner) =
+ EncryptionParams::check_decrypt_for_copy_source(
+ &ctx.garage,
+ req.headers(),
+ &source_version_meta.encryption,
+ )?;
+ let dest_encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
+
+ // Extract source checksum info before source_object_meta_inner is consumed
+ let source_checksum = source_object_meta_inner.checksum;
+ let source_checksum_algorithm = source_checksum.map(|x| x.algorithm());
+
+ // If source object has a checksum, the destination object must as well.
+ // The x-amz-checksum-algorithm header allows changing the algorithm,
+ // but if it is absent, we must use the same one as before.
+ let checksum_algorithm = checksum_algorithm.or(source_checksum_algorithm);
+
+ // Determine metadata of destination object
+ let was_multipart = source_version_meta.etag.contains('-');
+ let dest_object_meta = ObjectVersionMetaInner {
+ headers: match req.headers().get("x-amz-metadata-directive") {
+ Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => {
+ get_headers(req.headers())?
+ }
+ _ => source_object_meta_inner.into_owned().headers,
+ },
+ checksum: source_checksum,
+ };
+
+ // Do actual object copying
+ //
+ // In any of the following scenarios, we need to read the whole object
+ // data and re-write it again:
+ //
+ // - the data needs to be decrypted or encrypted
+ // - the requested checksum algorithm requires us to recompute a checksum
+ // - the original object was a multipart upload and a checksum algorithm
+ // is defined (AWS specifies that in this case, we must recompute the
+ // checksum from scratch as if this was a single big object and not
+ // a multipart object, as the checksums are not computed in the same way)
+ //
+ // In other cases, we can just copy the metadata and reference the same blocks.
+ //
+ // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
+
+ let must_recopy = !EncryptionParams::is_same(&source_encryption, &dest_encryption)
+ || source_checksum_algorithm != checksum_algorithm
+ || (was_multipart && checksum_algorithm.is_some());
+
+ let res = if !must_recopy {
+ // In most cases, we can just copy the metadata and link blocks of the
+ // old object from the new object.
+ handle_copy_metaonly(
+ ctx,
+ dest_key,
+ dest_object_meta,
+ dest_encryption,
+ source_version,
+ source_version_data,
+ source_version_meta,
+ )
+ .await?
+ } else {
+ let expected_checksum = ExpectedChecksums {
+ md5: None,
+ sha256: None,
+ extra: source_checksum,
+ };
+ let checksum_mode = if was_multipart || source_checksum_algorithm != checksum_algorithm {
+ ChecksumMode::Calculate(checksum_algorithm)
+ } else {
+ ChecksumMode::Verify(&expected_checksum)
+ };
+ // If source and dest encryption use different keys,
+ // we must decrypt content and re-encrypt, so rewrite all data blocks.
+ handle_copy_reencrypt(
+ ctx,
+ dest_key,
+ dest_object_meta,
+ dest_encryption,
+ source_version,
+ source_version_data,
+ source_encryption,
+ checksum_mode,
+ )
+ .await?
+ };
+
+ let last_modified = msec_to_rfc3339(res.version_timestamp);
+ let result = CopyObjectResult {
+ last_modified: s3_xml::Value(last_modified),
+ etag: s3_xml::Value(format!("\"{}\"", res.etag)),
+ };
+ let xml = s3_xml::to_xml_with_header(&result)?;
+
+ let mut resp = Response::builder()
+ .header("Content-Type", "application/xml")
+ .header("x-amz-version-id", hex::encode(res.version_uuid))
+ .header(
+ "x-amz-copy-source-version-id",
+ hex::encode(source_version.uuid),
+ );
+ dest_encryption.add_response_headers(&mut resp);
+ Ok(resp.body(string_body(xml))?)
+}
+
+async fn handle_copy_metaonly(
+ ctx: ReqCtx,
+ dest_key: &str,
+ dest_object_meta: ObjectVersionMetaInner,
+ dest_encryption: EncryptionParams,
+ source_version: &ObjectVersion,
+ source_version_data: &ObjectVersionData,
+ source_version_meta: &ObjectVersionMeta,
+) -> Result<SaveStreamResult, Error> {
+ let ReqCtx {
+ garage,
+ bucket_id: dest_bucket_id,
+ ..
+ } = ctx;
+
// Generate parameters for copied object
let new_uuid = gen_uuid();
let new_timestamp = now_msec();
- // Implement x-amz-metadata-directive: REPLACE
- let new_meta = match req.headers().get("x-amz-metadata-directive") {
- Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
- headers: get_headers(req.headers())?,
- size: source_version_meta.size,
- etag: source_version_meta.etag.clone(),
- },
- _ => source_version_meta.clone(),
+ let new_meta = ObjectVersionMeta {
+ encryption: dest_encryption.encrypt_meta(dest_object_meta)?,
+ size: source_version_meta.size,
+ etag: source_version_meta.etag.clone(),
};
- let etag = new_meta.etag.to_string();
+ let res = SaveStreamResult {
+ version_uuid: new_uuid,
+ version_timestamp: new_timestamp,
+ etag: new_meta.etag.clone(),
+ };
// Save object copy
match source_version_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, bytes) => {
+ // bytes is either plaintext before&after or encrypted with the
+ // same keys, so it's ok to just copy it as is
let dest_object_version = ObjectVersion {
uuid: new_uuid,
timestamp: new_timestamp,
@@ -97,7 +223,8 @@ pub async fn handle_copy(
uuid: new_uuid,
timestamp: new_timestamp,
state: ObjectVersionState::Uploading {
- headers: new_meta.headers.clone(),
+ encryption: new_meta.encryption.clone(),
+ checksum_algorithm: None,
multipart: false,
},
};
@@ -164,23 +291,42 @@ pub async fn handle_copy(
}
}
- let last_modified = msec_to_rfc3339(new_timestamp);
- let result = CopyObjectResult {
- last_modified: s3_xml::Value(last_modified),
- etag: s3_xml::Value(format!("\"{}\"", etag)),
- };
- let xml = s3_xml::to_xml_with_header(&result)?;
+ Ok(res)
+}
- Ok(Response::builder()
- .header("Content-Type", "application/xml")
- .header("x-amz-version-id", hex::encode(new_uuid))
- .header(
- "x-amz-copy-source-version-id",
- hex::encode(source_version.uuid),
- )
- .body(string_body(xml))?)
+async fn handle_copy_reencrypt(
+ ctx: ReqCtx,
+ dest_key: &str,
+ dest_object_meta: ObjectVersionMetaInner,
+ dest_encryption: EncryptionParams,
+ source_version: &ObjectVersion,
+ source_version_data: &ObjectVersionData,
+ source_encryption: EncryptionParams,
+ checksum_mode: ChecksumMode<'_>,
+) -> Result<SaveStreamResult, Error> {
+ // basically we will read the source data (decrypt if necessary)
+ // and save that in a new object (encrypt if necessary),
+ // by combining the code used in getobject and putobject
+ let source_stream = full_object_byte_stream(
+ ctx.garage.clone(),
+ source_version,
+ source_version_data,
+ source_encryption,
+ );
+
+ save_stream(
+ &ctx,
+ dest_object_meta,
+ dest_encryption,
+ source_stream.map_err(|e| Error::from(GarageError::from(e))),
+ &dest_key.to_string(),
+ checksum_mode,
+ )
+ .await
}
+// -------- UploadPartCopy ---------
+
pub async fn handle_upload_part_copy(
ctx: ReqCtx,
req: &Request<ReqBody>,
@@ -193,7 +339,7 @@ pub async fn handle_upload_part_copy(
let dest_upload_id = multipart::decode_upload_id(upload_id)?;
let dest_key = dest_key.to_string();
- let (source_object, (_, _, mut dest_mpu)) = futures::try_join!(
+ let (source_object, (_, dest_version, mut dest_mpu)) = futures::try_join!(
get_copy_source(&ctx, req),
multipart::get_upload(&ctx, &dest_key, &dest_upload_id)
)?;
@@ -206,6 +352,24 @@ pub async fn handle_upload_part_copy(
// Check precondition on source, e.g. x-amz-copy-source-if-match
copy_precondition.check(source_object_version, &source_version_meta.etag)?;
+ // Determine encryption parameters
+ let (source_encryption, _) = EncryptionParams::check_decrypt_for_copy_source(
+ &garage,
+ req.headers(),
+ &source_version_meta.encryption,
+ )?;
+ let (dest_object_encryption, dest_object_checksum_algorithm) = match dest_version.state {
+ ObjectVersionState::Uploading {
+ encryption,
+ checksum_algorithm,
+ ..
+ } => (encryption, checksum_algorithm),
+ _ => unreachable!(),
+ };
+ let (dest_encryption, _) =
+ EncryptionParams::check_decrypt(&garage, req.headers(), &dest_object_encryption)?;
+ let same_encryption = EncryptionParams::is_same(&source_encryption, &dest_encryption);
+
// Check source range is valid
let source_range = match req.headers().get("x-amz-copy-source-range") {
Some(range) => {
@@ -227,21 +391,16 @@ pub async fn handle_upload_part_copy(
};
// Check source version is not inlined
- match source_version_data {
- ObjectVersionData::DeleteMarker => unreachable!(),
- ObjectVersionData::Inline(_meta, _bytes) => {
- // This is only for small files, we don't bother handling this.
- // (in AWS UploadPartCopy works for parts at least 5MB which
- // is never the case of an inline object)
- return Err(Error::bad_request(
- "Source object is too small (minimum part size is 5Mb)",
- ));
- }
- ObjectVersionData::FirstBlock(_meta, _first_block_hash) => (),
- };
+ if matches!(source_version_data, ObjectVersionData::Inline(_, _)) {
+ // This is only for small files, we don't bother handling this.
+ // (in AWS UploadPartCopy works for parts at least 5MB which
+ // is never the case of an inline object)
+ return Err(Error::bad_request(
+ "Source object is too small (minimum part size is 5Mb)",
+ ));
+ }
- // Fetch source versin with its block list,
- // and destination version to check part hasn't yet been uploaded
+ // Fetch source version with its block list
let source_version = garage
.version_table
.get(&source_object_version.uuid, &EmptyKey)
@@ -251,7 +410,9 @@ pub async fn handle_upload_part_copy(
// We want to reuse blocks from the source version as much as possible.
// However, we still need to get the data from these blocks
// because we need to know it to calculate the MD5sum of the part
- // which is used as its ETag.
+ // which is used as its ETag. For encrypted sources or destinations,
+ // we must always read(+decrypt) and then write(+encrypt), so we
+ // can never reuse data blocks as is.
// First, calculate what blocks we want to keep,
// and the subrange of the block to take, if the bounds of the
@@ -300,7 +461,9 @@ pub async fn handle_upload_part_copy(
dest_mpu_part_key,
MpuPart {
version: dest_version_id,
+ // These are all filled in later (bottom of this function)
etag: None,
+ checksum: None,
size: None,
},
);
@@ -313,32 +476,55 @@ pub async fn handle_upload_part_copy(
},
false,
);
+ // write an empty version now to be the parent of the block_ref entries
+ garage.version_table.insert(&dest_version).await?;
// Now, actually copy the blocks
- let mut md5hasher = Md5::new();
+ let mut checksummer = Checksummer::init(&Default::default(), !dest_encryption.is_encrypted())
+ .add(dest_object_checksum_algorithm);
// First, create a stream that is able to read the source blocks
// and extract the subrange if necessary.
// The second returned value is an Option<Hash>, that is Some
// if and only if the block returned is a block that already existed
- // in the Garage data store (thus we don't need to save it again).
+ // in the Garage data store and can be reused as-is instead of having
+ // to save it again. This excludes encrypted source blocks that we had
+ // to decrypt.
let garage2 = garage.clone();
let order_stream = OrderTag::stream();
let source_blocks = stream::iter(blocks_to_copy)
.enumerate()
- .flat_map(|(i, (block_hash, range_to_copy))| {
+ .map(|(i, (block_hash, range_to_copy))| {
let garage3 = garage2.clone();
- stream::once(async move {
- let data = garage3
- .block_manager
- .rpc_get_block(&block_hash, Some(order_stream.order(i as u64)))
+ async move {
+ let stream = source_encryption
+ .get_block(&garage3, &block_hash, Some(order_stream.order(i as u64)))
.await?;
+ let data = read_stream_to_end(stream).await?.into_bytes();
+ // For each item, we return a tuple of:
+ // 1. the full data block (decrypted)
+ // 2. an Option<Hash> that indicates the hash of the block in the block store,
+ // only if it can be re-used as-is in the copied object
match range_to_copy {
- Some(r) => Ok((data.slice(r), None)),
- None => Ok((data, Some(block_hash))),
+ Some(r) => {
+ // If we are taking a subslice of the data, we cannot reuse the block as-is
+ Ok((data.slice(r), None))
+ }
+ None if same_encryption => {
+ // If the data is unencrypted before & after, or if we are using
+ // the same encryption key, we can reuse the stored block, no need
+ // to re-send it to storage nodes.
+ Ok((data, Some(block_hash)))
+ }
+ None => {
+ // If we are decrypting / (re)encrypting with different keys,
+ // we cannot reuse the block as-is
+ Ok((data, None))
+ }
}
- })
+ }
})
+ .buffered(2)
.peekable();
// The defragmenter is a custom stream (defined below) that concatenates
@@ -346,22 +532,39 @@ pub async fn handle_upload_part_copy(
// It returns a series of (Vec<u8>, Option<Hash>).
// When it is done, it returns an empty vec.
// Same as the previous iterator, the Option is Some(_) if and only if
- // it's an existing block of the Garage data store.
+ // it's an existing block of the Garage data store that can be reused.
let mut defragmenter = Defragmenter::new(garage.config.block_size, Box::pin(source_blocks));
let mut current_offset = 0;
let mut next_block = defragmenter.next().await?;
+ // TODO this could be optimized similarly to read_and_put_blocks
+ // low priority because uploadpartcopy is rarely used
loop {
let (data, existing_block_hash) = next_block;
if data.is_empty() {
break;
}
- md5hasher.update(&data[..]);
-
- let must_upload = existing_block_hash.is_none();
- let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..]));
+ let data_len = data.len() as u64;
+
+ let (checksummer_updated, (data_to_upload, final_hash)) =
+ tokio::task::spawn_blocking(move || {
+ checksummer.update(&data[..]);
+
+ let tup = match existing_block_hash {
+ Some(hash) if same_encryption => (None, hash),
+ _ => {
+ let data_enc = dest_encryption.encrypt_block(data)?;
+ let hash = blake2sum(&data_enc);
+ (Some(data_enc), hash)
+ }
+ };
+ Ok::<_, Error>((checksummer, tup))
+ })
+ .await
+ .unwrap()?;
+ checksummer = checksummer_updated;
dest_version.blocks.clear();
dest_version.blocks.put(
@@ -371,10 +574,10 @@ pub async fn handle_upload_part_copy(
},
VersionBlock {
hash: final_hash,
- size: data.len() as u64,
+ size: data_len,
},
);
- current_offset += data.len() as u64;
+ current_offset += data_len;
let block_ref = BlockRef {
block: final_hash,
@@ -382,36 +585,34 @@ pub async fn handle_upload_part_copy(
deleted: false.into(),
};
- let garage2 = garage.clone();
- let res = futures::try_join!(
+ let (_, _, _, next) = futures::try_join!(
// Thing 1: if the block is not exactly a block that existed before,
// we need to insert that data as a new block.
- async move {
- if must_upload {
- garage2
+ async {
+ if let Some(final_data) = data_to_upload {
+ garage
.block_manager
- .rpc_put_block(final_hash, data, None)
+ .rpc_put_block(final_hash, final_data, dest_encryption.is_encrypted(), None)
.await
} else {
Ok(())
}
},
- async {
- // Thing 2: we need to insert the block in the version
- garage.version_table.insert(&dest_version).await?;
- // Thing 3: we need to add a block reference
- garage.block_ref_table.insert(&block_ref).await
- },
- // Thing 4: we need to prefetch the next block
+ // Thing 2: we need to insert the block in the version
+ garage.version_table.insert(&dest_version),
+ // Thing 3: we need to add a block reference
+ garage.block_ref_table.insert(&block_ref),
+ // Thing 4: we need to read the next block
defragmenter.next(),
)?;
- next_block = res.2;
+ next_block = next;
}
assert_eq!(current_offset, source_range.length);
- let data_md5sum = md5hasher.finalize();
- let etag = hex::encode(data_md5sum);
+ let checksums = checksummer.finalize();
+ let etag = dest_encryption.etag_from_md5(&checksums.md5);
+ let checksum = checksums.extract(dest_object_checksum_algorithm);
// Put the part's ETag in the Versiontable
dest_mpu.parts.put(
@@ -419,6 +620,7 @@ pub async fn handle_upload_part_copy(
MpuPart {
version: dest_version_id,
etag: Some(etag.clone()),
+ checksum,
size: Some(current_offset),
},
);
@@ -431,13 +633,14 @@ pub async fn handle_upload_part_copy(
last_modified: s3_xml::Value(msec_to_rfc3339(source_object_version.timestamp)),
})?;
- Ok(Response::builder()
+ let mut resp = Response::builder()
.header("Content-Type", "application/xml")
.header(
"x-amz-copy-source-version-id",
hex::encode(source_object_version.uuid),
- )
- .body(string_body(resp_xml))?)
+ );
+ dest_encryption.add_response_headers(&mut resp);
+ Ok(resp.body(string_body(resp_xml))?)
}
async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, Error> {
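
The long comment in handle_copy boils down to one predicate: source blocks can be referenced as-is only if nothing about their bytes or their checksum has to change. A condensed restatement of that gate (a sketch, not the actual Garage code):

```rust
fn can_reuse_blocks(
    same_encryption: bool,
    source_algo: Option<ChecksumAlgorithm>,
    dest_algo: Option<ChecksumAlgorithm>,
    was_multipart: bool,
) -> bool {
    same_encryption
        // recomputing a different checksum requires reading all the data
        && source_algo == dest_algo
        // AWS mandates recomputing multipart checksums as one big object
        && !(was_multipart && dest_algo.is_some())
}
```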
diff --git a/src/api/s3/encryption.rs b/src/api/s3/encryption.rs
new file mode 100644
index 00000000..2e6ed65c
--- /dev/null
+++ b/src/api/s3/encryption.rs
@@ -0,0 +1,595 @@
+use std::borrow::Cow;
+use std::convert::TryInto;
+use std::pin::Pin;
+
+use aes_gcm::{
+ aead::stream::{DecryptorLE31, EncryptorLE31, StreamLE31},
+ aead::{Aead, AeadCore, KeyInit, OsRng},
+ aes::cipher::crypto_common::rand_core::RngCore,
+ aes::cipher::typenum::Unsigned,
+ Aes256Gcm, Key, Nonce,
+};
+use base64::prelude::*;
+use bytes::Bytes;
+
+use futures::stream::Stream;
+use futures::task;
+use tokio::io::BufReader;
+
+use http::header::{HeaderMap, HeaderName, HeaderValue};
+
+use garage_net::bytes_buf::BytesBuf;
+use garage_net::stream::{stream_asyncread, ByteStream};
+use garage_rpc::rpc_helper::OrderTag;
+use garage_util::data::Hash;
+use garage_util::error::Error as GarageError;
+use garage_util::migrate::Migrate;
+
+use garage_model::garage::Garage;
+use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionMetaInner};
+
+use crate::common_error::*;
+use crate::s3::checksum::Md5Checksum;
+use crate::s3::error::Error;
+
+const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName =
+ HeaderName::from_static("x-amz-server-side-encryption-customer-algorithm");
+const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: HeaderName =
+ HeaderName::from_static("x-amz-server-side-encryption-customer-key");
+const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: HeaderName =
+ HeaderName::from_static("x-amz-server-side-encryption-customer-key-md5");
+
+const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName =
+ HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-algorithm");
+const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: HeaderName =
+ HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-key");
+const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: HeaderName =
+ HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-key-md5");
+
+const CUSTOMER_ALGORITHM_AES256: &[u8] = b"AES256";
+
+type Md5Output = md5::digest::Output<md5::Md5Core>;
+
+type StreamNonceSize = aes_gcm::aead::stream::NonceSize<Aes256Gcm, StreamLE31<Aes256Gcm>>;
+
+// Data blocks are encrypted by smaller chunks of size 4096 bytes,
+// so that data can be streamed when reading.
+// This size has to be known and has to be constant, or data won't be
+// readable anymore. DO NOT CHANGE THIS VALUE.
+const STREAM_ENC_PLAIN_CHUNK_SIZE: usize = 0x1000; // 4096 bytes
+const STREAM_ENC_CYPER_CHUNK_SIZE: usize = STREAM_ENC_PLAIN_CHUNK_SIZE + 16;
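+// Resulting ciphertext layout for one data block (see encrypt_block below),
+// assuming the 8-byte nonce that StreamLE31 leaves after its 4-byte overhead:
+//   [ stream nonce ][ N full chunks of 4096+16 bytes ][ last chunk, 16..=4112 bytes ]
+// The final chunk is sealed with encrypt_last(), so even an empty block
+// produces one (empty-plaintext) authenticated chunk.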
+
+#[derive(Clone, Copy)]
+pub enum EncryptionParams {
+ Plaintext,
+ SseC {
+ client_key: Key<Aes256Gcm>,
+ client_key_md5: Md5Output,
+ compression_level: Option<i32>,
+ },
+}
+
+impl EncryptionParams {
+ pub fn is_encrypted(&self) -> bool {
+ !matches!(self, Self::Plaintext)
+ }
+
+ pub fn is_same(a: &Self, b: &Self) -> bool {
+ let relevant_info = |x: &Self| match x {
+ Self::Plaintext => None,
+ Self::SseC {
+ client_key,
+ compression_level,
+ ..
+ } => Some((*client_key, compression_level.is_some())),
+ };
+ relevant_info(a) == relevant_info(b)
+ }
+
+ pub fn new_from_headers(
+ garage: &Garage,
+ headers: &HeaderMap,
+ ) -> Result<EncryptionParams, Error> {
+ let key = parse_request_headers(
+ headers,
+ &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
+ &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
+ &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
+ )?;
+ match key {
+ Some((client_key, client_key_md5)) => Ok(EncryptionParams::SseC {
+ client_key,
+ client_key_md5,
+ compression_level: garage.config.compression_level,
+ }),
+ None => Ok(EncryptionParams::Plaintext),
+ }
+ }
+
+ pub fn add_response_headers(&self, resp: &mut http::response::Builder) {
+ if let Self::SseC { client_key_md5, .. } = self {
+ let md5 = BASE64_STANDARD.encode(&client_key_md5);
+
+ resp.headers_mut().unwrap().insert(
+ X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
+ HeaderValue::from_bytes(CUSTOMER_ALGORITHM_AES256).unwrap(),
+ );
+ resp.headers_mut().unwrap().insert(
+ X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
+ HeaderValue::from_bytes(md5.as_bytes()).unwrap(),
+ );
+ }
+ }
+
+ pub fn check_decrypt<'a>(
+ garage: &Garage,
+ headers: &HeaderMap,
+ obj_enc: &'a ObjectVersionEncryption,
+ ) -> Result<(Self, Cow<'a, ObjectVersionMetaInner>), Error> {
+ let key = parse_request_headers(
+ headers,
+ &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
+ &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
+ &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
+ )?;
+ Self::check_decrypt_common(garage, key, obj_enc)
+ }
+
+ pub fn check_decrypt_for_copy_source<'a>(
+ garage: &Garage,
+ headers: &HeaderMap,
+ obj_enc: &'a ObjectVersionEncryption,
+ ) -> Result<(Self, Cow<'a, ObjectVersionMetaInner>), Error> {
+ let key = parse_request_headers(
+ headers,
+ &X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
+ &X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
+ &X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
+ )?;
+ Self::check_decrypt_common(garage, key, obj_enc)
+ }
+
+ fn check_decrypt_common<'a>(
+ garage: &Garage,
+ key: Option<(Key<Aes256Gcm>, Md5Output)>,
+ obj_enc: &'a ObjectVersionEncryption,
+ ) -> Result<(Self, Cow<'a, ObjectVersionMetaInner>), Error> {
+ match (key, &obj_enc) {
+ (
+ Some((client_key, client_key_md5)),
+ ObjectVersionEncryption::SseC { inner, compressed },
+ ) => {
+ let enc = Self::SseC {
+ client_key,
+ client_key_md5,
+ compression_level: if *compressed {
+ Some(garage.config.compression_level.unwrap_or(1))
+ } else {
+ None
+ },
+ };
+ let plaintext = enc.decrypt_blob(&inner)?;
+ let inner = ObjectVersionMetaInner::decode(&plaintext)
+ .ok_or_internal_error("Could not decode encrypted metadata")?;
+ Ok((enc, Cow::Owned(inner)))
+ }
+ (None, ObjectVersionEncryption::Plaintext { inner }) => {
+ Ok((Self::Plaintext, Cow::Borrowed(inner)))
+ }
+ (_, ObjectVersionEncryption::SseC { .. }) => {
+ Err(Error::bad_request("Object is encrypted"))
+ }
+ (Some(_), _) => {
+ // TODO: should this be an OK scenario?
+ Err(Error::bad_request("Trying to decrypt a plaintext object"))
+ }
+ }
+ }
+
+ pub fn encrypt_meta(
+ &self,
+ meta: ObjectVersionMetaInner,
+ ) -> Result<ObjectVersionEncryption, Error> {
+ match self {
+ Self::SseC {
+ compression_level, ..
+ } => {
+ let plaintext = meta.encode().map_err(GarageError::from)?;
+ let ciphertext = self.encrypt_blob(&plaintext)?;
+ Ok(ObjectVersionEncryption::SseC {
+ inner: ciphertext.into_owned(),
+ compressed: compression_level.is_some(),
+ })
+ }
+ Self::Plaintext => Ok(ObjectVersionEncryption::Plaintext { inner: meta }),
+ }
+ }
+
+ // ---- generating object Etag values ----
+ pub fn etag_from_md5(&self, md5sum: &Option<Md5Checksum>) -> String {
+ match self {
+ Self::Plaintext => md5sum
+ .map(|x| hex::encode(&x[..]))
+ .expect("md5 digest should have been computed"),
+ Self::SseC { .. } => {
+ // AWS specifies that for encrypted objects, the Etag is not
+ // the md5sum of the data, but doesn't say what it is.
+ // So we just put some random bytes.
+ let mut random = [0u8; 16];
+ OsRng.fill_bytes(&mut random);
+ hex::encode(&random)
+ }
+ }
+ }
+
+ // ---- generic function for encrypting / decrypting blobs ----
+ // Prepends a randomly-generated nonce to the encrypted value.
+ // This is used for encrypting object metadata and inlined data for small objects.
+ // This does not compress anything.
+
+ pub fn encrypt_blob<'a>(&self, blob: &'a [u8]) -> Result<Cow<'a, [u8]>, Error> {
+ match self {
+ Self::SseC { client_key, .. } => {
+ let cipher = Aes256Gcm::new(&client_key);
+ let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
+ let ciphertext = cipher
+ .encrypt(&nonce, blob)
+ .ok_or_internal_error("Encryption failed")?;
+ Ok(Cow::Owned([nonce.to_vec(), ciphertext].concat()))
+ }
+ Self::Plaintext => Ok(Cow::Borrowed(blob)),
+ }
+ }
+
+ pub fn decrypt_blob<'a>(&self, blob: &'a [u8]) -> Result<Cow<'a, [u8]>, Error> {
+ match self {
+ Self::SseC { client_key, .. } => {
+ let cipher = Aes256Gcm::new(&client_key);
+ let nonce_size = <Aes256Gcm as AeadCore>::NonceSize::to_usize();
+ let nonce = Nonce::from_slice(
+ blob.get(..nonce_size)
+ .ok_or_internal_error("invalid encrypted data")?,
+ );
+ let plaintext = cipher
+ .decrypt(nonce, &blob[nonce_size..])
+ .ok_or_bad_request(
+ "Invalid encryption key, could not decrypt object metadata.",
+ )?;
+ Ok(Cow::Owned(plaintext))
+ }
+ Self::Plaintext => Ok(Cow::Borrowed(blob)),
+ }
+ }
+
+ // ---- function for encrypting / decrypting byte streams ----
+
+ /// Get a data block from the storage node, and decrypt+decompress it
+ /// if necessary. If object is plaintext, just get it without any processing.
+ pub async fn get_block(
+ &self,
+ garage: &Garage,
+ hash: &Hash,
+ order: Option<OrderTag>,
+ ) -> Result<ByteStream, GarageError> {
+ let raw_block = garage
+ .block_manager
+ .rpc_get_block_streaming(hash, order)
+ .await?;
+ Ok(self.decrypt_block_stream(raw_block))
+ }
+
+ pub fn decrypt_block_stream(&self, stream: ByteStream) -> ByteStream {
+ match self {
+ Self::Plaintext => stream,
+ Self::SseC {
+ client_key,
+ compression_level,
+ ..
+ } => {
+ let plaintext = DecryptStream::new(stream, *client_key);
+ if compression_level.is_some() {
+ let reader = stream_asyncread(Box::pin(plaintext));
+ let reader = BufReader::new(reader);
+ let reader = async_compression::tokio::bufread::ZstdDecoder::new(reader);
+ Box::pin(tokio_util::io::ReaderStream::new(reader))
+ } else {
+ Box::pin(plaintext)
+ }
+ }
+ }
+ }
+
+ /// Encrypt a data block if encryption is set, for use before
+ /// putting the data blocks into storage
+ pub fn encrypt_block(&self, block: Bytes) -> Result<Bytes, Error> {
+ match self {
+ Self::Plaintext => Ok(block),
+ Self::SseC {
+ client_key,
+ compression_level,
+ ..
+ } => {
+ let block = if let Some(level) = compression_level {
+ Cow::Owned(
+ garage_block::zstd_encode(block.as_ref(), *level)
+ .ok_or_internal_error("failed to compress data block")?,
+ )
+ } else {
+ Cow::Borrowed(block.as_ref())
+ };
+
+ let mut ret = Vec::with_capacity(block.len() + 32 + block.len() / 64);
+
+ let mut nonce: Nonce<StreamNonceSize> = Default::default();
+ OsRng.fill_bytes(&mut nonce);
+ ret.extend_from_slice(nonce.as_slice());
+
+ let mut cipher = EncryptorLE31::<Aes256Gcm>::new(&client_key, &nonce);
+ let mut iter = block.chunks(STREAM_ENC_PLAIN_CHUNK_SIZE).peekable();
+
+ if iter.peek().is_none() {
+ // Empty stream: we encrypt an empty last chunk
+ let chunk_enc = cipher
+ .encrypt_last(&[][..])
+ .ok_or_internal_error("failed to encrypt chunk")?;
+ ret.extend_from_slice(&chunk_enc);
+ } else {
+ loop {
+ let chunk = iter.next().unwrap();
+ if iter.peek().is_some() {
+ let chunk_enc = cipher
+ .encrypt_next(chunk)
+ .ok_or_internal_error("failed to encrypt chunk")?;
+ assert_eq!(chunk.len(), STREAM_ENC_PLAIN_CHUNK_SIZE);
+ assert_eq!(chunk_enc.len(), STREAM_ENC_CYPER_CHUNK_SIZE);
+ ret.extend_from_slice(&chunk_enc);
+ } else {
+ // use encrypt_last for the last chunk
+ let chunk_enc = cipher
+ .encrypt_last(chunk)
+ .ok_or_internal_error("failed to encrypt chunk")?;
+ ret.extend_from_slice(&chunk_enc);
+ break;
+ }
+ }
+ }
+
+ Ok(ret.into())
+ }
+ }
+ }
+}
+
+fn parse_request_headers(
+ headers: &HeaderMap,
+ alg_header: &HeaderName,
+ key_header: &HeaderName,
+ md5_header: &HeaderName,
+) -> Result<Option<(Key<Aes256Gcm>, Md5Output)>, Error> {
+ let alg = headers.get(alg_header).map(HeaderValue::as_bytes);
+ let key = headers.get(key_header).map(HeaderValue::as_bytes);
+ let md5 = headers.get(md5_header).map(HeaderValue::as_bytes);
+
+ match alg {
+ Some(CUSTOMER_ALGORITHM_AES256) => {
+ use md5::{Digest, Md5};
+
+ let key_b64 =
+ key.ok_or_bad_request("Missing server-side-encryption-customer-key header")?;
+ let key_bytes: [u8; 32] = BASE64_STANDARD
+ .decode(&key_b64)
+ .ok_or_bad_request(
+ "Invalid server-side-encryption-customer-key header: invalid base64",
+ )?
+ .try_into()
+ .ok()
+ .ok_or_bad_request(
+ "Invalid server-side-encryption-customer-key header: invalid length",
+ )?;
+
+ let md5_b64 =
+ md5.ok_or_bad_request("Missing server-side-encryption-customer-key-md5 header")?;
+ let md5_bytes = BASE64_STANDARD.decode(&md5_b64).ok_or_bad_request(
+ "Invalid server-side-encryption-customer-key-md5 header: invalid bass64",
+ )?;
+
+ let mut hasher = Md5::new();
+ hasher.update(&key_bytes[..]);
+ let our_md5 = hasher.finalize();
+ if our_md5.as_slice() != md5_bytes.as_slice() {
+ return Err(Error::bad_request(
+ "Server-side encryption client key MD5 checksum does not match",
+ ));
+ }
+
+ Ok(Some((key_bytes.into(), our_md5)))
+ }
+ Some(alg) => Err(Error::InvalidEncryptionAlgorithm(
+ String::from_utf8_lossy(alg).into_owned(),
+ )),
+ None => {
+ if key.is_some() || md5.is_some() {
+ Err(Error::bad_request(
+ "Unexpected server-side-encryption-customer-key{,-md5} header(s)",
+ ))
+ } else {
+ Ok(None)
+ }
+ }
+ }
+}
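// Illustrative sketch, not part of this patch: how a client would derive the
// three SSE-C headers that parse_request_headers() consumes. Header names
// follow the AWS SSE-C convention assumed here; `base64`, `md-5` and `hex`
// are already workspace dependencies, and ssec_headers() is a made-up helper:
use base64::prelude::*;
use md5::{Digest, Md5};

fn ssec_headers(key: &[u8; 32]) -> [(&'static str, String); 3] {
    [
        ("x-amz-server-side-encryption-customer-algorithm", "AES256".into()),
        ("x-amz-server-side-encryption-customer-key", BASE64_STANDARD.encode(key)),
        ("x-amz-server-side-encryption-customer-key-md5", BASE64_STANDARD.encode(Md5::digest(key))),
    ]
}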
+
+// ---- encrypt & decrypt streams ----
+
+#[pin_project::pin_project]
+struct DecryptStream {
+ #[pin]
+ stream: ByteStream,
+ done_reading: bool,
+ buf: BytesBuf,
+ key: Key<Aes256Gcm>,
+ state: DecryptStreamState,
+}
+
+enum DecryptStreamState {
+ Starting,
+ Running(DecryptorLE31<Aes256Gcm>),
+ Done,
+}
+
+impl DecryptStream {
+ fn new(stream: ByteStream, key: Key<Aes256Gcm>) -> Self {
+ Self {
+ stream,
+ done_reading: false,
+ buf: BytesBuf::new(),
+ key,
+ state: DecryptStreamState::Starting,
+ }
+ }
+}
+
+impl Stream for DecryptStream {
+ type Item = Result<Bytes, std::io::Error>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> task::Poll<Option<Self::Item>> {
+ use std::task::Poll;
+
+ let mut this = self.project();
+
+ // The first bytes of the stream should contain the starting nonce.
+ // If we don't have a Running state, it means that we haven't
+ // yet read the nonce.
+ while matches!(this.state, DecryptStreamState::Starting) {
+ let nonce_size = StreamNonceSize::to_usize();
+ if let Some(nonce) = this.buf.take_exact(nonce_size) {
+ let nonce = Nonce::from_slice(nonce.as_ref());
+ *this.state = DecryptStreamState::Running(DecryptorLE31::new(&this.key, nonce));
+ break;
+ }
+
+ match futures::ready!(this.stream.as_mut().poll_next(cx)) {
+ Some(Ok(bytes)) => {
+ this.buf.extend(bytes);
+ }
+ Some(Err(e)) => {
+ return Poll::Ready(Some(Err(e)));
+ }
+ None => {
+ return Poll::Ready(Some(Err(std::io::Error::new(
+ std::io::ErrorKind::UnexpectedEof,
+ "Decrypt: unexpected EOF, could not read nonce",
+ ))));
+ }
+ }
+ }
+
+ // Read at least one byte more than the encrypted chunk size
+ // (if possible), so that we know if we are decrypting the
+ // last chunk or not.
+ while !*this.done_reading && this.buf.len() <= STREAM_ENC_CYPER_CHUNK_SIZE {
+ match futures::ready!(this.stream.as_mut().poll_next(cx)) {
+ Some(Ok(bytes)) => {
+ this.buf.extend(bytes);
+ }
+ Some(Err(e)) => {
+ return Poll::Ready(Some(Err(e)));
+ }
+ None => {
+ *this.done_reading = true;
+ break;
+ }
+ }
+ }
+
+ if matches!(this.state, DecryptStreamState::Done) {
+ if !this.buf.is_empty() {
+ return Poll::Ready(Some(Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "Decrypt: unexpected bytes after last encrypted chunk",
+ ))));
+ }
+ return Poll::Ready(None);
+ }
+
+ let res = if this.buf.len() > STREAM_ENC_CYPER_CHUNK_SIZE {
+ // we have strictly more bytes than the encrypted chunk size,
+ // so we know this is not the last
+ let DecryptStreamState::Running(ref mut cipher) = this.state else {
+ unreachable!()
+ };
+ let chunk = this.buf.take_exact(STREAM_ENC_CYPER_CHUNK_SIZE).unwrap();
+ let chunk_dec = cipher.decrypt_next(chunk.as_ref());
+ if let Ok(c) = &chunk_dec {
+ assert_eq!(c.len(), STREAM_ENC_PLAIN_CHUNK_SIZE);
+ }
+ chunk_dec
+ } else {
+ // We have at most one encrypted chunk's worth of data, even though we
+ // tried to read more, so this is the last chunk. Decrypt it using
+ // decrypt_last(), which consumes the cipher.
+ let state = std::mem::replace(this.state, DecryptStreamState::Done);
+ let DecryptStreamState::Running(cipher) = state else {
+ unreachable!()
+ };
+ let chunk = this.buf.take_all();
+ cipher.decrypt_last(chunk.as_ref())
+ };
+
+ match res {
+ Ok(bytes) if bytes.is_empty() => Poll::Ready(None),
+ Ok(bytes) => Poll::Ready(Some(Ok(bytes.into()))),
+ Err(_) => Poll::Ready(Some(Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "Decryption failed",
+ )))),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use futures::stream::StreamExt;
+ use garage_net::stream::read_stream_to_end;
+
+ fn stream() -> ByteStream {
+ Box::pin(
+ futures::stream::iter(16usize..1024)
+ .map(|i| Ok(Bytes::from(vec![(i % 256) as u8; (i * 37) % 1024]))),
+ )
+ }
+
+ async fn test_block_enc(compression_level: Option<i32>) {
+ let enc = EncryptionParams::SseC {
+ client_key: Aes256Gcm::generate_key(&mut OsRng),
+ client_key_md5: Default::default(), // not needed
+ compression_level,
+ };
+
+ let block_plain = read_stream_to_end(stream()).await.unwrap().into_bytes();
+
+ let block_enc = enc.encrypt_block(block_plain.clone()).unwrap();
+
+ let block_dec =
+ enc.decrypt_block_stream(Box::pin(futures::stream::once(async { Ok(block_enc) })));
+ let block_dec = read_stream_to_end(block_dec).await.unwrap().into_bytes();
+
+ assert_eq!(block_plain, block_dec);
+ assert!(block_dec.len() > 128000);
+ }
+
+ #[tokio::test]
+ async fn test_encrypt_block() {
+ test_block_enc(None).await
+ }
+
+ #[tokio::test]
+ async fn test_encrypt_block_compressed() {
+ test_block_enc(Some(1)).await
+ }
+}
diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs
index f86c19a6..2855e0b3 100644
--- a/src/api/s3/error.rs
+++ b/src/api/s3/error.rs
@@ -65,6 +65,14 @@ pub enum Error {
#[error(display = "Invalid HTTP range: {:?}", _0)]
InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)),
+ /// The client requested an encryption algorithm other than AES256
+ #[error(display = "Invalid encryption algorithm: {:?}, should be AES256", _0)]
+ InvalidEncryptionAlgorithm(String),
+
+ /// The client sent a digest (checksum) value that is invalid or does not match
+ #[error(display = "Invalid digest: {}", _0)]
+ InvalidDigest(String),
+
/// The client sent a request for an action not supported by garage
#[error(display = "Unimplemented action: {}", _0)]
NotImplemented(String),
@@ -125,7 +133,9 @@ impl Error {
Error::NotImplemented(_) => "NotImplemented",
Error::InvalidXml(_) => "MalformedXML",
Error::InvalidRange(_) => "InvalidRange",
+ Error::InvalidDigest(_) => "InvalidDigest",
Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest",
+ Error::InvalidEncryptionAlgorithm(_) => "InvalidEncryptionAlgorithmError",
}
}
}
@@ -143,6 +153,8 @@ impl ApiError for Error {
| Error::InvalidPart
| Error::InvalidPartOrder
| Error::EntityTooSmall
+ | Error::InvalidDigest(_)
+ | Error::InvalidEncryptionAlgorithm(_)
| Error::InvalidXml(_)
| Error::InvalidUtf8Str(_)
| Error::InvalidUtf8String(_) => StatusCode::BAD_REQUEST,
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index ed996fb1..f5d3cf11 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -1,10 +1,12 @@
//! Function related to GET and HEAD requests
+use std::collections::BTreeMap;
use std::convert::TryInto;
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
+use bytes::Bytes;
use futures::future;
-use futures::stream::{self, StreamExt};
+use futures::stream::{self, Stream, StreamExt};
use http::header::{
ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE,
CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH,
@@ -25,6 +27,8 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::ResBody;
+use crate::s3::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE};
+use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
@@ -42,6 +46,9 @@ pub struct GetObjectOverrides {
fn object_headers(
version: &ObjectVersion,
version_meta: &ObjectVersionMeta,
+ meta_inner: &ObjectVersionMetaInner,
+ encryption: EncryptionParams,
+ checksum_mode: ChecksumMode,
) -> http::response::Builder {
debug!("Version meta: {:?}", version_meta);
@@ -49,7 +56,6 @@ fn object_headers(
let date_str = httpdate::fmt_http_date(date);
let mut resp = Response::builder()
- .header(CONTENT_TYPE, version_meta.headers.content_type.to_string())
.header(LAST_MODIFIED, date_str)
.header(ACCEPT_RANGES, "bytes".to_string());
@@ -57,10 +63,31 @@ fn object_headers(
resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag));
}
- for (k, v) in version_meta.headers.other.iter() {
- resp = resp.header(k, v.to_string());
+ // When metadata is retrieved through the REST API, Amazon S3 combines headers that
+ // have the same name (ignoring case) into a comma-delimited list.
+ // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html
+ let mut headers_by_name = BTreeMap::new();
+ for (name, value) in meta_inner.headers.iter() {
+ match headers_by_name.get_mut(name) {
+ None => {
+ headers_by_name.insert(name, vec![value.as_str()]);
+ }
+ Some(headers) => {
+ headers.push(value.as_str());
+ }
+ }
+ }
+
+ for (name, values) in headers_by_name {
+ resp = resp.header(name, values.join(","));
+ }
+
+ if checksum_mode.enabled {
+ resp = add_checksum_response_headers(&meta_inner.checksum, resp);
}
+ encryption.add_response_headers(&mut resp);
+
resp
}
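// Illustrative sketch, not part of this patch: the comma-delimited combination
// performed above. If two values were stored for the same metadata header,
//   ("x-amz-meta-tag", "a") and ("x-amz-meta-tag", "b"),
// the response carries a single header:
//   x-amz-meta-tag: a,b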
@@ -175,21 +202,33 @@ pub async fn handle_head_without_ctx(
return Ok(cached);
}
+ let (encryption, headers) =
+ EncryptionParams::check_decrypt(&garage, req.headers(), &version_meta.encryption)?;
+
+ let checksum_mode = checksum_mode(&req);
+
if let Some(pn) = part_number {
match version_data {
- ObjectVersionData::Inline(_, bytes) => {
+ ObjectVersionData::Inline(_, _) => {
if pn != 1 {
return Err(Error::InvalidPart);
}
- Ok(object_headers(object_version, version_meta)
- .header(CONTENT_LENGTH, format!("{}", bytes.len()))
- .header(
- CONTENT_RANGE,
- format!("bytes 0-{}/{}", bytes.len() - 1, bytes.len()),
- )
- .header(X_AMZ_MP_PARTS_COUNT, "1")
- .status(StatusCode::PARTIAL_CONTENT)
- .body(empty_body())?)
+ let bytes_len = version_meta.size;
+ Ok(object_headers(
+ object_version,
+ version_meta,
+ &headers,
+ encryption,
+ checksum_mode,
+ )
+ .header(CONTENT_LENGTH, format!("{}", bytes_len))
+ .header(
+ CONTENT_RANGE,
+ format!("bytes 0-{}/{}", bytes_len - 1, bytes_len),
+ )
+ .header(X_AMZ_MP_PARTS_COUNT, "1")
+ .status(StatusCode::PARTIAL_CONTENT)
+ .body(empty_body())?)
}
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
@@ -201,28 +240,40 @@ pub async fn handle_head_without_ctx(
let (part_offset, part_end) =
calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?;
- Ok(object_headers(object_version, version_meta)
- .header(CONTENT_LENGTH, format!("{}", part_end - part_offset))
- .header(
- CONTENT_RANGE,
- format!(
- "bytes {}-{}/{}",
- part_offset,
- part_end - 1,
- version_meta.size
- ),
- )
- .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?))
- .status(StatusCode::PARTIAL_CONTENT)
- .body(empty_body())?)
+ Ok(object_headers(
+ object_version,
+ version_meta,
+ &headers,
+ encryption,
+ checksum_mode,
+ )
+ .header(CONTENT_LENGTH, format!("{}", part_end - part_offset))
+ .header(
+ CONTENT_RANGE,
+ format!(
+ "bytes {}-{}/{}",
+ part_offset,
+ part_end - 1,
+ version_meta.size
+ ),
+ )
+ .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?))
+ .status(StatusCode::PARTIAL_CONTENT)
+ .body(empty_body())?)
}
_ => unreachable!(),
}
} else {
- Ok(object_headers(object_version, version_meta)
- .header(CONTENT_LENGTH, format!("{}", version_meta.size))
- .status(StatusCode::OK)
- .body(empty_body())?)
+ Ok(object_headers(
+ object_version,
+ version_meta,
+ &headers,
+ encryption,
+ checksum_mode,
+ )
+ .header(CONTENT_LENGTH, format!("{}", version_meta.size))
+ .status(StatusCode::OK)
+ .body(empty_body())?)
}
}
@@ -273,23 +324,55 @@ pub async fn handle_get_without_ctx(
return Ok(cached);
}
+ let (enc, headers) =
+ EncryptionParams::check_decrypt(&garage, req.headers(), &last_v_meta.encryption)?;
+
+ let checksum_mode = checksum_mode(&req);
+
match (part_number, parse_range_header(req, last_v_meta.size)?) {
(Some(_), Some(_)) => Err(Error::bad_request(
"Cannot specify both partNumber and Range header",
)),
- (Some(pn), None) => handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await,
+ (Some(pn), None) => {
+ handle_get_part(
+ garage,
+ last_v,
+ last_v_data,
+ last_v_meta,
+ enc,
+ &headers,
+ pn,
+ checksum_mode,
+ )
+ .await
+ }
(None, Some(range)) => {
handle_get_range(
garage,
last_v,
last_v_data,
last_v_meta,
+ enc,
+ &headers,
range.start,
range.start + range.length,
+ checksum_mode,
+ )
+ .await
+ }
+ (None, None) => {
+ handle_get_full(
+ garage,
+ last_v,
+ last_v_data,
+ last_v_meta,
+ enc,
+ &headers,
+ overrides,
+ checksum_mode,
)
.await
}
- (None, None) => handle_get_full(garage, last_v, last_v_data, last_v_meta, overrides).await,
}
}
@@ -298,17 +381,43 @@ async fn handle_get_full(
version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
+ encryption: EncryptionParams,
+ meta_inner: &ObjectVersionMetaInner,
overrides: GetObjectOverrides,
+ checksum_mode: ChecksumMode,
) -> Result<Response<ResBody>, Error> {
- let mut resp_builder = object_headers(version, version_meta)
- .header(CONTENT_LENGTH, format!("{}", version_meta.size))
- .status(StatusCode::OK);
+ let mut resp_builder = object_headers(
+ version,
+ version_meta,
+ &meta_inner,
+ encryption,
+ checksum_mode,
+ )
+ .header(CONTENT_LENGTH, format!("{}", version_meta.size))
+ .status(StatusCode::OK);
getobject_override_headers(overrides, &mut resp_builder)?;
+ let stream = full_object_byte_stream(garage, version, version_data, encryption);
+
+ Ok(resp_builder.body(response_body_from_stream(stream))?)
+}
+
+pub fn full_object_byte_stream(
+ garage: Arc<Garage>,
+ version: &ObjectVersion,
+ version_data: &ObjectVersionData,
+ encryption: EncryptionParams,
+) -> ByteStream {
match &version_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_, bytes) => {
- Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
+ let bytes = bytes.to_vec();
+ Box::pin(futures::stream::once(async move {
+ encryption
+ .decrypt_blob(&bytes)
+ .map(|x| Bytes::from(x.to_vec()))
+ .map_err(std_error_from_read_error)
+ }))
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
let (tx, rx) = mpsc::channel::<ByteStream>(2);
@@ -324,19 +433,18 @@ async fn handle_get_full(
garage2.version_table.get(&version_uuid, &EmptyKey).await
});
- let stream_block_0 = garage
- .block_manager
- .rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0)))
+ let stream_block_0 = encryption
+ .get_block(&garage, &first_block_hash, Some(order_stream.order(0)))
.await?;
+
tx.send(stream_block_0)
.await
.ok_or_message("channel closed")?;
let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?;
for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) {
- let stream_block_i = garage
- .block_manager
- .rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64)))
+ let stream_block_i = encryption
+ .get_block(&garage, &vb.hash, Some(order_stream.order(i as u64)))
.await?;
tx.send(stream_block_i)
.await
@@ -354,8 +462,7 @@ async fn handle_get_full(
}
});
- let body = response_body_from_block_stream(rx);
- Ok(resp_builder.body(body)?)
+ Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx).flatten())
}
}
}
@@ -365,13 +472,16 @@ async fn handle_get_range(
version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
+ encryption: EncryptionParams,
+ meta_inner: &ObjectVersionMetaInner,
begin: u64,
end: u64,
+ checksum_mode: ChecksumMode,
) -> Result<Response<ResBody>, Error> {
// Here we do not use getobject_override_headers because we don't
// want to add any overridden headers (those should not be added
// when returning PARTIAL_CONTENT)
- let resp_builder = object_headers(version, version_meta)
+ let resp_builder = object_headers(version, version_meta, meta_inner, encryption, checksum_mode)
.header(CONTENT_LENGTH, format!("{}", end - begin))
.header(
CONTENT_RANGE,
@@ -382,6 +492,7 @@ async fn handle_get_range(
match &version_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, bytes) => {
+ let bytes = encryption.decrypt_blob(&bytes)?;
if end as usize <= bytes.len() {
let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into());
Ok(resp_builder.body(body)?)
@@ -398,7 +509,8 @@ async fn handle_get_range(
.await?
.ok_or(Error::NoSuchKey)?;
- let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
+ let body =
+ body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end);
Ok(resp_builder.body(body)?)
}
}
@@ -409,17 +521,28 @@ async fn handle_get_part(
object_version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
+ encryption: EncryptionParams,
+ meta_inner: &ObjectVersionMetaInner,
part_number: u64,
+ checksum_mode: ChecksumMode,
) -> Result<Response<ResBody>, Error> {
// Same as for get_range, no getobject_override_headers
- let resp_builder =
- object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT);
+ let resp_builder = object_headers(
+ object_version,
+ version_meta,
+ meta_inner,
+ encryption,
+ checksum_mode,
+ )
+ .status(StatusCode::PARTIAL_CONTENT);
match version_data {
ObjectVersionData::Inline(_, bytes) => {
if part_number != 1 {
return Err(Error::InvalidPart);
}
+ let bytes = encryption.decrypt_blob(&bytes)?;
+ assert_eq!(bytes.len() as u64, version_meta.size);
Ok(resp_builder
.header(CONTENT_LENGTH, format!("{}", bytes.len()))
.header(
@@ -427,7 +550,7 @@ async fn handle_get_part(
format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()),
)
.header(X_AMZ_MP_PARTS_COUNT, "1")
- .body(bytes_body(bytes.to_vec().into()))?)
+ .body(bytes_body(bytes.into_owned().into()))?)
}
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
@@ -439,7 +562,8 @@ async fn handle_get_part(
let (begin, end) =
calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?;
- let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
+ let body =
+ body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end);
Ok(resp_builder
.header(CONTENT_LENGTH, format!("{}", end - begin))
@@ -492,8 +616,23 @@ fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> {
None
}
+struct ChecksumMode {
+ enabled: bool,
+}
+
+fn checksum_mode(req: &Request<impl Body>) -> ChecksumMode {
+ ChecksumMode {
+ enabled: req
+ .headers()
+ .get(X_AMZ_CHECKSUM_MODE)
+ .map(|x| x == "ENABLED")
+ .unwrap_or(false),
+ }
+}
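// Illustrative sketch, not part of this patch: clients opt in to checksum
// response headers on GET/HEAD with the S3 header below; the comparison above
// is exact, so any other value (or the header's absence) leaves
// checksum_mode.enabled == false:
//
//   x-amz-checksum-mode: ENABLED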
+
fn body_from_blocks_range(
garage: Arc<Garage>,
+ encryption: EncryptionParams,
all_blocks: &[(VersionBlockKey, VersionBlock)],
begin: u64,
end: u64,
@@ -523,12 +662,11 @@ fn body_from_blocks_range(
tokio::spawn(async move {
match async {
- let garage = garage.clone();
for (i, (block, block_offset)) in blocks.iter().enumerate() {
- let block_stream = garage
- .block_manager
- .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
- .await?
+ let block_stream = encryption
+ .get_block(&garage, &block.hash, Some(order_stream.order(i as u64)))
+ .await?;
+ let block_stream = block_stream
.scan(*block_offset, move |chunk_offset, chunk| {
let r = match chunk {
Ok(chunk_bytes) => {
@@ -588,19 +726,30 @@ fn body_from_blocks_range(
}
fn response_body_from_block_stream(rx: mpsc::Receiver<ByteStream>) -> ResBody {
- let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
- .flatten()
- .map(|x| {
- x.map(hyper::body::Frame::data)
- .map_err(|e| Error::from(garage_util::error::Error::from(e)))
- });
+ let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten();
+ response_body_from_stream(body_stream)
+}
+
+fn response_body_from_stream<S>(stream: S) -> ResBody
+where
+ S: Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static,
+{
+ let body_stream = stream.map(|x| {
+ x.map(hyper::body::Frame::data)
+ .map_err(|e| Error::from(garage_util::error::Error::from(e)))
+ });
ResBody::new(http_body_util::StreamBody::new(body_stream))
}
fn error_stream_item<E: std::fmt::Display>(e: E) -> ByteStream {
- let err = std::io::Error::new(
+ Box::pin(stream::once(future::ready(Err(std_error_from_read_error(
+ e,
+ )))))
+}
+
+fn std_error_from_read_error<E: std::fmt::Display>(e: E) -> std::io::Error {
+ std::io::Error::new(
std::io::ErrorKind::Other,
- format!("Error while getting object data: {}", e),
- );
- Box::pin(stream::once(future::ready(Err(err))))
+ format!("Error while reading object data: {}", e),
+ )
}
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index 302c03f4..648bace2 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -2,7 +2,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::iter::{Iterator, Peekable};
use base64::prelude::*;
-use hyper::Response;
+use hyper::{Request, Response};
use garage_util::data::*;
use garage_util::error::Error as GarageError;
@@ -15,7 +15,8 @@ use garage_table::EnumerationOrder;
use crate::encoding::*;
use crate::helpers::*;
-use crate::s3::api_server::ResBody;
+use crate::s3::api_server::{ReqBody, ResBody};
+use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
use crate::s3::multipart as s3_multipart;
use crate::s3::xml as s3_xml;
@@ -271,13 +272,21 @@ pub async fn handle_list_multipart_upload(
pub async fn handle_list_parts(
ctx: ReqCtx,
+ req: Request<ReqBody>,
query: &ListPartsQuery,
) -> Result<Response<ResBody>, Error> {
debug!("ListParts {:?}", query);
let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?;
- let (_, _, mpu) = s3_multipart::get_upload(&ctx, &query.key, &upload_id).await?;
+ let (_, object_version, mpu) = s3_multipart::get_upload(&ctx, &query.key, &upload_id).await?;
+
+ let object_encryption = match object_version.state {
+ ObjectVersionState::Uploading { encryption, .. } => encryption,
+ _ => unreachable!(),
+ };
+ let encryption_res =
+ EncryptionParams::check_decrypt(&ctx.garage, req.headers(), &object_encryption);
let (info, next) = fetch_part_info(query, &mpu)?;
@@ -296,11 +305,40 @@ pub async fn handle_list_parts(
is_truncated: s3_xml::Value(format!("{}", next.is_some())),
parts: info
.iter()
- .map(|part| s3_xml::PartItem {
- etag: s3_xml::Value(format!("\"{}\"", part.etag)),
- last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)),
- part_number: s3_xml::IntValue(part.part_number as i64),
- size: s3_xml::IntValue(part.size as i64),
+ .map(|part| {
+ // hide checksum if object is encrypted and the decryption
+ // keys are not provided
+ let checksum = part.checksum.filter(|_| encryption_res.is_ok());
+ s3_xml::PartItem {
+ etag: s3_xml::Value(format!("\"{}\"", part.etag)),
+ last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)),
+ part_number: s3_xml::IntValue(part.part_number as i64),
+ size: s3_xml::IntValue(part.size as i64),
+ checksum_crc32: match &checksum {
+ Some(ChecksumValue::Crc32(x)) => {
+ Some(s3_xml::Value(BASE64_STANDARD.encode(&x)))
+ }
+ _ => None,
+ },
+ checksum_crc32c: match &checksum {
+ Some(ChecksumValue::Crc32c(x)) => {
+ Some(s3_xml::Value(BASE64_STANDARD.encode(&x)))
+ }
+ _ => None,
+ },
+ checksum_sha1: match &checksum {
+ Some(ChecksumValue::Sha1(x)) => {
+ Some(s3_xml::Value(BASE64_STANDARD.encode(&x)))
+ }
+ _ => None,
+ },
+ checksum_sha256: match &checksum {
+ Some(ChecksumValue::Sha256(x)) => {
+ Some(s3_xml::Value(BASE64_STANDARD.encode(&x)))
+ }
+ _ => None,
+ },
+ }
})
.collect(),
@@ -346,6 +384,7 @@ struct PartInfo<'a> {
timestamp: u64,
part_number: u64,
size: u64,
+ checksum: Option<ChecksumValue>,
}
enum ExtractionResult {
@@ -486,6 +525,7 @@ fn fetch_part_info<'a>(
timestamp: pk.timestamp,
etag,
size,
+ checksum: p.checksum,
};
match parts.last_mut() {
Some(lastpart) if lastpart.part_number == pk.part_number => {
@@ -944,10 +984,13 @@ mod tests {
timestamp: TS,
state: ObjectVersionState::Uploading {
multipart: true,
- headers: ObjectVersionHeaders {
- content_type: "text/plain".to_string(),
- other: BTreeMap::<String, String>::new(),
+ encryption: ObjectVersionEncryption::Plaintext {
+ inner: ObjectVersionMetaInner {
+ headers: vec![],
+ checksum: None,
+ },
},
+ checksum_algorithm: None,
},
}
}
@@ -1136,6 +1179,7 @@ mod tests {
version: uuid,
size: Some(3),
etag: Some("etag1".into()),
+ checksum: None,
},
),
(
@@ -1147,6 +1191,7 @@ mod tests {
version: uuid,
size: None,
etag: None,
+ checksum: None,
},
),
(
@@ -1158,6 +1203,7 @@ mod tests {
version: uuid,
size: Some(10),
etag: Some("etag2".into()),
+ checksum: None,
},
),
(
@@ -1169,6 +1215,7 @@ mod tests {
version: uuid,
size: Some(7),
etag: Some("etag3".into()),
+ checksum: None,
},
),
(
@@ -1180,6 +1227,7 @@ mod tests {
version: uuid,
size: Some(5),
etag: Some("etag4".into()),
+ checksum: None,
},
),
];
@@ -1218,12 +1266,14 @@ mod tests {
etag: "etag1",
timestamp: TS,
part_number: 1,
- size: 3
+ size: 3,
+ checksum: None,
},
PartInfo {
etag: "etag2",
timestamp: TS,
part_number: 3,
+ checksum: None,
size: 10
},
]
@@ -1239,12 +1289,14 @@ mod tests {
PartInfo {
etag: "etag3",
timestamp: TS,
+ checksum: None,
part_number: 5,
size: 7
},
PartInfo {
etag: "etag4",
timestamp: TS,
+ checksum: None,
part_number: 8,
size: 5
},
@@ -1268,24 +1320,28 @@ mod tests {
PartInfo {
etag: "etag1",
timestamp: TS,
+ checksum: None,
part_number: 1,
size: 3
},
PartInfo {
etag: "etag2",
timestamp: TS,
+ checksum: None,
part_number: 3,
size: 10
},
PartInfo {
etag: "etag3",
timestamp: TS,
+ checksum: None,
part_number: 5,
size: 7
},
PartInfo {
etag: "etag4",
timestamp: TS,
+ checksum: None,
part_number: 8,
size: 5
},
diff --git a/src/api/s3/mod.rs b/src/api/s3/mod.rs
index cbdb94ab..b9bb1a6f 100644
--- a/src/api/s3/mod.rs
+++ b/src/api/s3/mod.rs
@@ -13,5 +13,7 @@ mod post_object;
mod put;
mod website;
+mod checksum;
+mod encryption;
mod router;
pub mod xml;
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 1d5aeb26..3db3e8aa 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -1,9 +1,10 @@
use std::collections::HashMap;
+use std::convert::TryInto;
use std::sync::Arc;
+use base64::prelude::*;
use futures::prelude::*;
use hyper::{Request, Response};
-use md5::{Digest as Md5Digest, Md5};
use garage_table::*;
use garage_util::data::*;
@@ -16,6 +17,8 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody};
+use crate::s3::checksum::*;
+use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
use crate::s3::put::*;
use crate::s3::xml as s3_xml;
@@ -40,6 +43,16 @@ pub async fn handle_create_multipart_upload(
let timestamp = next_timestamp(existing_object.as_ref());
let headers = get_headers(req.headers())?;
+ let meta = ObjectVersionMetaInner {
+ headers,
+ checksum: None,
+ };
+
+ // Determine whether object should be encrypted, and if so the key
+ let encryption = EncryptionParams::new_from_headers(&garage, req.headers())?;
+ let object_encryption = encryption.encrypt_meta(meta)?;
+
+ let checksum_algorithm = request_checksum_algorithm(req.headers())?;
// Create object in object table
let object_version = ObjectVersion {
@@ -47,7 +60,8 @@ pub async fn handle_create_multipart_upload(
timestamp,
state: ObjectVersionState::Uploading {
multipart: true,
- headers,
+ encryption: object_encryption,
+ checksum_algorithm,
},
};
let object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
@@ -68,7 +82,9 @@ pub async fn handle_create_multipart_upload(
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(string_body(xml)))
+ let mut resp = Response::builder();
+ encryption.add_response_headers(&mut resp);
+ Ok(resp.body(string_body(xml))?)
}
pub async fn handle_put_part(
@@ -83,20 +99,37 @@ pub async fn handle_put_part(
let upload_id = decode_upload_id(upload_id)?;
- let content_md5 = match req.headers().get("content-md5") {
- Some(x) => Some(x.to_str()?.to_string()),
- None => None,
+ let expected_checksums = ExpectedChecksums {
+ md5: match req.headers().get("content-md5") {
+ Some(x) => Some(x.to_str()?.to_string()),
+ None => None,
+ },
+ sha256: content_sha256,
+ extra: request_checksum_value(req.headers())?,
};
// Read the first chunk, and at the same time try to get the object to see if it exists
let key = key.to_string();
- let stream = body_stream(req.into_body());
+ let (req_head, req_body) = req.into_parts();
+ let stream = body_stream(req_body);
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
- let ((_, _, mut mpu), first_block) =
+ let ((_, object_version, mut mpu), first_block) =
futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;
+ // Check encryption params
+ let (object_encryption, checksum_algorithm) = match object_version.state {
+ ObjectVersionState::Uploading {
+ encryption,
+ checksum_algorithm,
+ ..
+ } => (encryption, checksum_algorithm),
+ _ => unreachable!(),
+ };
+ let (encryption, _) =
+ EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?;
+
// Check object is valid and part can be accepted
let first_block = first_block.ok_or_bad_request("Empty body")?;
@@ -122,7 +155,9 @@ pub async fn handle_put_part(
mpu_part_key,
MpuPart {
version: version_uuid,
+ // all these are filled in later, at the end of this function
etag: None,
+ checksum: None,
size: None,
},
);
@@ -136,24 +171,31 @@ pub async fn handle_put_part(
garage.version_table.insert(&version).await?;
// Copy data to version
- let (total_size, data_md5sum, data_sha256sum, _) =
- read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?;
+ let checksummer =
+ Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm);
+ let (total_size, checksums, _) = read_and_put_blocks(
+ &ctx,
+ &version,
+ encryption,
+ part_number,
+ first_block,
+ &mut chunker,
+ checksummer,
+ )
+ .await?;
// Verify that checksums match
- ensure_checksum_matches(
- data_md5sum.as_slice(),
- data_sha256sum,
- content_md5.as_deref(),
- content_sha256,
- )?;
+ checksums.verify(&expected_checksums)?;
// Store part etag in version
- let data_md5sum_hex = hex::encode(data_md5sum);
+ let etag = encryption.etag_from_md5(&checksums.md5);
+
mpu.parts.put(
mpu_part_key,
MpuPart {
version: version_uuid,
- etag: Some(data_md5sum_hex.clone()),
+ etag: Some(etag.clone()),
+ checksum: checksums.extract(checksum_algorithm),
size: Some(total_size),
},
);
@@ -163,11 +205,10 @@ pub async fn handle_put_part(
// We won't have to clean up on drop.
interrupted_cleanup.cancel();
- let response = Response::builder()
- .header("ETag", format!("\"{}\"", data_md5sum_hex))
- .body(empty_body())
- .unwrap();
- Ok(response)
+ let mut resp = Response::builder().header("ETag", format!("\"{}\"", etag));
+ encryption.add_response_headers(&mut resp);
+ let resp = add_checksum_response_headers(&expected_checksums.extra, resp);
+ Ok(resp.body(empty_body())?)
}
struct InterruptedCleanup(Option<InterruptedCleanupInner>);
@@ -214,10 +255,11 @@ pub async fn handle_complete_multipart_upload(
bucket_name,
..
} = &ctx;
+ let (req_head, req_body) = req.into_parts();
+
+ let expected_checksum = request_checksum_value(&req_head.headers)?;
- let body = http_body_util::BodyExt::collect(req.into_body())
- .await?
- .to_bytes();
+ let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -241,8 +283,12 @@ pub async fn handle_complete_multipart_upload(
return Err(Error::bad_request("No data was uploaded"));
}
- let headers = match object_version.state {
- ObjectVersionState::Uploading { headers, .. } => headers,
+ let (object_encryption, checksum_algorithm) = match object_version.state {
+ ObjectVersionState::Uploading {
+ encryption,
+ checksum_algorithm,
+ ..
+ } => (encryption, checksum_algorithm),
_ => unreachable!(),
};
@@ -270,6 +316,13 @@ pub async fn handle_complete_multipart_upload(
for req_part in body_list_of_parts.iter() {
match have_parts.get(&req_part.part_number) {
Some(part) if part.etag.as_ref() == Some(&req_part.etag) && part.size.is_some() => {
+ // alternative version: if req_part.checksum.is_some() && part.checksum != req_part.checksum {
+ if part.checksum != req_part.checksum {
+ return Err(Error::InvalidDigest(format!(
+ "Invalid checksum for part {}: in request = {:?}, uploaded part = {:?}",
+ req_part.part_number, req_part.checksum, part.checksum
+ )));
+ }
parts.push(*part)
}
_ => return Err(Error::InvalidPart),
@@ -317,18 +370,23 @@ pub async fn handle_complete_multipart_upload(
});
garage.block_ref_table.insert_many(block_refs).await?;
- // Calculate etag of final object
+ // Calculate checksum and etag of final object
// To understand how etags are calculated, read more here:
+ // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
// https://teppen.io/2018/06/23/aws_s3_etags/
- let mut etag_md5_hasher = Md5::new();
+ let mut checksummer = MultipartChecksummer::init(checksum_algorithm);
for part in parts.iter() {
- etag_md5_hasher.update(part.etag.as_ref().unwrap().as_bytes());
+ checksummer.update(part.etag.as_ref().unwrap(), part.checksum)?;
}
- let etag = format!(
- "{}-{}",
- hex::encode(etag_md5_hasher.finalize()),
- parts.len()
- );
+ let (checksum_md5, checksum_extra) = checksummer.finalize();
+
+ if expected_checksum.is_some() && checksum_extra != expected_checksum {
+ return Err(Error::InvalidDigest(
+ "Failed to validate x-amz-checksum-*".into(),
+ ));
+ }
+
+ let etag = format!("{}-{}", hex::encode(&checksum_md5[..]), parts.len());
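// Illustrative sketch, not part of this patch: the "-N" ETag scheme the links
// above describe, reimplemented standalone. Per the AWS convention, the final
// ETag is the MD5 of the concatenated binary part digests, suffixed with the
// part count (MultipartChecksummer is this patch's actual implementation):
use md5::{Digest, Md5};

fn multipart_etag(part_md5s: &[[u8; 16]]) -> String {
    let mut h = Md5::new();
    for d in part_md5s {
        h.update(d); // concatenate the raw 16-byte digests
    }
    format!("{}-{}", hex::encode(h.finalize()), part_md5s.len())
}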
// Calculate total size of final object
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
@@ -341,10 +399,24 @@ pub async fn handle_complete_multipart_upload(
return Err(e);
}
+ // If there is a checksum algorithm, update metadata with checksum
+ let object_encryption = match checksum_algorithm {
+ None => object_encryption,
+ Some(_) => {
+ let (encryption, meta) =
+ EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?;
+ let new_meta = ObjectVersionMetaInner {
+ headers: meta.into_owned().headers,
+ checksum: checksum_extra,
+ };
+ encryption.encrypt_meta(new_meta)?
+ }
+ };
+
// Write final object version
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
- headers,
+ encryption: object_encryption,
size: total_size,
etag: etag.clone(),
},
@@ -361,10 +433,28 @@ pub async fn handle_complete_multipart_upload(
bucket: s3_xml::Value(bucket_name.to_string()),
key: s3_xml::Value(key),
etag: s3_xml::Value(format!("\"{}\"", etag)),
+ checksum_crc32: match &checksum_extra {
+ Some(ChecksumValue::Crc32(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
+ _ => None,
+ },
+ checksum_crc32c: match &checksum_extra {
+ Some(ChecksumValue::Crc32c(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
+ _ => None,
+ },
+ checksum_sha1: match &checksum_extra {
+ Some(ChecksumValue::Sha1(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
+ _ => None,
+ },
+ checksum_sha256: match &checksum_extra {
+ Some(ChecksumValue::Sha256(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
+ _ => None,
+ },
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(string_body(xml)))
+ let resp = Response::builder();
+ let resp = add_checksum_response_headers(&expected_checksum, resp);
+ Ok(resp.body(string_body(xml))?)
}
pub async fn handle_abort_multipart_upload(
@@ -433,6 +523,7 @@ pub fn decode_upload_id(id: &str) -> Result<Uuid, Error> {
struct CompleteMultipartUploadPart {
etag: String,
part_number: u64,
+ checksum: Option<ChecksumValue>,
}
fn parse_complete_multipart_upload_body(
@@ -458,9 +549,41 @@ fn parse_complete_multipart_upload_body(
.children()
.find(|e| e.has_tag_name("PartNumber"))?
.text()?;
+ let checksum = if let Some(crc32) =
+ item.children().find(|e| e.has_tag_name("ChecksumCRC32"))
+ {
+ Some(ChecksumValue::Crc32(
+ BASE64_STANDARD.decode(crc32.text()?).ok()?[..]
+ .try_into()
+ .ok()?,
+ ))
+ } else if let Some(crc32c) = item.children().find(|e| e.has_tag_name("ChecksumCRC32C"))
+ {
+ Some(ChecksumValue::Crc32c(
+ BASE64_STANDARD.decode(crc32c.text()?).ok()?[..]
+ .try_into()
+ .ok()?,
+ ))
+ } else if let Some(sha1) = item.children().find(|e| e.has_tag_name("ChecksumSHA1")) {
+ Some(ChecksumValue::Sha1(
+ BASE64_STANDARD.decode(sha1.text()?).ok()?[..]
+ .try_into()
+ .ok()?,
+ ))
+ } else if let Some(sha256) = item.children().find(|e| e.has_tag_name("ChecksumSHA256"))
+ {
+ Some(ChecksumValue::Sha256(
+ BASE64_STANDARD.decode(sha256.text()?).ok()?[..]
+ .try_into()
+ .ok()?,
+ ))
+ } else {
+ None
+ };
parts.push(CompleteMultipartUploadPart {
etag: etag.trim_matches('"').to_string(),
part_number: part_number.parse().ok()?,
+ checksum,
});
} else {
return None;
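// Illustrative sketch, not part of this patch: the request body shape accepted
// by parse_complete_multipart_upload_body(), with the optional per-part
// checksum element introduced here (SHA-256 shown; the value is made up):
//
// <CompleteMultipartUpload>
//   <Part>
//     <PartNumber>1</PartNumber>
//     <ETag>"3858f62230ac3c915f300c664312c11f"</ETag>
//     <ChecksumSHA256>5RQ3A5uk0w7ojNjvegohch4JRBBGN/cLhsNrPzfv/hA=</ChecksumSHA256>
//   </Part>
// </CompleteMultipartUpload>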
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index 66f8174c..2c106b3b 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -14,12 +14,15 @@ use multer::{Constraints, Multipart, SizeLimit};
use serde::Deserialize;
use garage_model::garage::Garage;
+use garage_model::s3::object_table::*;
use crate::helpers::*;
use crate::s3::api_server::ResBody;
+use crate::s3::checksum::*;
use crate::s3::cors::*;
+use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
-use crate::s3::put::{get_headers, save_stream};
+use crate::s3::put::{get_headers, save_stream, ChecksumMode};
use crate::s3::xml as s3_xml;
use crate::signature::payload::{verify_v4, Authorization};
@@ -48,13 +51,17 @@ pub async fn handle_post_object(
let mut multipart = Multipart::with_constraints(stream, boundary, constraints);
let mut params = HeaderMap::new();
- let field = loop {
+ let file_field = loop {
let field = if let Some(field) = multipart.next_field().await? {
field
} else {
return Err(Error::bad_request("Request did not contain a file"));
};
- let name: HeaderName = if let Some(Ok(name)) = field.name().map(TryInto::try_into) {
+ let name: HeaderName = if let Some(Ok(name)) = field
+ .name()
+ .map(str::to_ascii_lowercase)
+ .map(TryInto::try_into)
+ {
name
} else {
continue;
@@ -96,7 +103,7 @@ pub async fn handle_post_object(
let key = if key.contains("${filename}") {
// if no filename is provided, don't replace. This matches the behavior of AWS.
- if let Some(filename) = field.file_name() {
+ if let Some(filename) = file_field.file_name() {
key.replace("${filename}", filename)
} else {
key.to_owned()
@@ -143,9 +150,8 @@ pub async fn handle_post_object(
let mut conditions = decoded_policy.into_conditions()?;
for (param_key, value) in params.iter() {
- let mut param_key = param_key.to_string();
- param_key.make_ascii_lowercase();
- match param_key.as_str() {
+ let param_key = param_key.as_str();
+ match param_key {
"policy" | "x-amz-signature" => (), // this is always accepted, as it's required to validate other fields
"content-type" => {
let conds = conditions.params.remove("content-type").ok_or_else(|| {
@@ -190,7 +196,7 @@ pub async fn handle_post_object(
// how aws seems to behave.
continue;
}
- let conds = conditions.params.remove(&param_key).ok_or_else(|| {
+ let conds = conditions.params.remove(param_key).ok_or_else(|| {
Error::bad_request(format!("Key '{}' is not allowed in policy", param_key))
})?;
for cond in conds {
@@ -218,8 +224,24 @@ pub async fn handle_post_object(
let headers = get_headers(&params)?;
- let stream = field.map(|r| r.map_err(Into::into));
+ let expected_checksums = ExpectedChecksums {
+ md5: params
+ .get("content-md5")
+ .map(HeaderValue::to_str)
+ .transpose()?
+ .map(str::to_string),
+ sha256: None,
+ extra: request_checksum_algorithm_value(&params)?,
+ };
+
+ let meta = ObjectVersionMetaInner {
+ headers,
+ checksum: expected_checksums.extra,
+ };
+
+ let encryption = EncryptionParams::new_from_headers(&garage, &params)?;
+ let stream = file_field.map(|r| r.map_err(Into::into));
let ctx = ReqCtx {
garage,
bucket_id,
@@ -228,17 +250,17 @@ pub async fn handle_post_object(
api_key,
};
- let (_, md5) = save_stream(
+ let res = save_stream(
&ctx,
- headers,
+ meta,
+ encryption,
StreamLimiter::new(stream, conditions.content_length),
&key,
- None,
- None,
+ ChecksumMode::Verify(&expected_checksums),
)
.await?;
- let etag = format!("\"{}\"", md5);
+ let etag = format!("\"{}\"", res.etag);
let mut resp = if let Some(mut target) = params
.get("success_action_redirect")
@@ -252,11 +274,12 @@ pub async fn handle_post_object(
.append_pair("key", &key)
.append_pair("etag", &etag);
let target = target.to_string();
- Response::builder()
+ let mut resp = Response::builder()
.status(StatusCode::SEE_OTHER)
.header(header::LOCATION, target.clone())
- .header(header::ETAG, etag)
- .body(string_body(target))?
+ .header(header::ETAG, etag);
+ encryption.add_response_headers(&mut resp);
+ resp.body(string_body(target))?
} else {
let path = head
.uri
@@ -283,9 +306,10 @@ pub async fn handle_post_object(
.get("success_action_status")
.and_then(|h| h.to_str().ok())
.unwrap_or("204");
- let builder = Response::builder()
+ let mut builder = Response::builder()
.header(header::LOCATION, location.clone())
.header(header::ETAG, etag.clone());
+ encryption.add_response_headers(&mut builder);
match action {
"200" => builder.status(StatusCode::OK).body(empty_body())?,
"201" => {
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 685cca80..1e3b1b44 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -1,12 +1,9 @@
-use std::collections::{BTreeMap, HashMap};
+use std::collections::HashMap;
use std::sync::Arc;
-use base64::prelude::*;
use futures::prelude::*;
use futures::stream::FuturesOrdered;
use futures::try_join;
-use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
-use sha2::Sha256;
use tokio::sync::mpsc;
@@ -22,7 +19,6 @@ use opentelemetry::{
use garage_net::bytes_buf::BytesBuf;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
-use garage_util::async_hash::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
@@ -36,10 +32,24 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody};
+use crate::s3::checksum::*;
+use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
+pub(crate) struct SaveStreamResult {
+ pub(crate) version_uuid: Uuid,
+ pub(crate) version_timestamp: u64,
+ /// Etag WITHOUT THE QUOTES (just the hex value)
+ pub(crate) etag: String,
+}
+
+pub(crate) enum ChecksumMode<'a> {
+ Verify(&'a ExpectedChecksums),
+ Calculate(Option<ChecksumAlgorithm>),
+}
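// Illustrative sketch, not part of this patch: the two ways save_stream() is
// driven. Client-facing PUT paths verify the checksums announced in the
// request, while internal rewrites (e.g. object copy) only compute one:
//
//   save_stream(&ctx, meta, encryption, body, &key,
//       ChecksumMode::Verify(&expected_checksums)).await?;
//   save_stream(&ctx, meta, encryption, body, &key,
//       ChecksumMode::Calculate(checksum_algorithm)).await?;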
+
pub async fn handle_put(
ctx: ReqCtx,
req: Request<ReqBody>,
@@ -50,26 +60,51 @@ pub async fn handle_put(
let headers = get_headers(req.headers())?;
debug!("Object headers: {:?}", headers);
- let content_md5 = match req.headers().get("content-md5") {
- Some(x) => Some(x.to_str()?.to_string()),
- None => None,
+ let expected_checksums = ExpectedChecksums {
+ md5: match req.headers().get("content-md5") {
+ Some(x) => Some(x.to_str()?.to_string()),
+ None => None,
+ },
+ sha256: content_sha256,
+ extra: request_checksum_value(req.headers())?,
+ };
+
+ let meta = ObjectVersionMetaInner {
+ headers,
+ checksum: expected_checksums.extra,
};
+ // Determine whether object should be encrypted, and if so the key
+ let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
+
let stream = body_stream(req.into_body());
- save_stream(&ctx, headers, stream, key, content_md5, content_sha256)
- .await
- .map(|(uuid, md5)| put_response(uuid, md5))
+ let res = save_stream(
+ &ctx,
+ meta,
+ encryption,
+ stream,
+ key,
+ ChecksumMode::Verify(&expected_checksums),
+ )
+ .await?;
+
+ let mut resp = Response::builder()
+ .header("x-amz-version-id", hex::encode(res.version_uuid))
+ .header("ETag", format!("\"{}\"", res.etag));
+ encryption.add_response_headers(&mut resp);
+ let resp = add_checksum_response_headers(&expected_checksums.extra, resp);
+ Ok(resp.body(empty_body())?)
}
pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
ctx: &ReqCtx,
- headers: ObjectVersionHeaders,
+ mut meta: ObjectVersionMetaInner,
+ encryption: EncryptionParams,
body: S,
key: &String,
- content_md5: Option<String>,
- content_sha256: Option<FixedBytes32>,
-) -> Result<(Uuid, String), Error> {
+ checksum_mode: ChecksumMode<'_>,
+) -> Result<SaveStreamResult, Error> {
let ReqCtx {
garage, bucket_id, ..
} = ctx;
@@ -86,43 +121,55 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let version_uuid = gen_uuid();
let version_timestamp = next_timestamp(existing_object.as_ref());
+ let mut checksummer = match checksum_mode {
+ ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()),
+ ChecksumMode::Calculate(algo) => {
+ Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(algo)
+ }
+ };
+
// If body is small enough, store it directly in the object table
// as "inline data". We can then return immediately.
if first_block.len() < INLINE_THRESHOLD {
- let mut md5sum = Md5::new();
- md5sum.update(&first_block[..]);
- let data_md5sum = md5sum.finalize();
- let data_md5sum_hex = hex::encode(data_md5sum);
+ checksummer.update(&first_block);
+ let checksums = checksummer.finalize();
- let data_sha256sum = sha256sum(&first_block[..]);
- let size = first_block.len() as u64;
-
- ensure_checksum_matches(
- data_md5sum.as_slice(),
- data_sha256sum,
- content_md5.as_deref(),
- content_sha256,
- )?;
+ match checksum_mode {
+ ChecksumMode::Verify(expected) => {
+ checksums.verify(&expected)?;
+ }
+ ChecksumMode::Calculate(algo) => {
+ meta.checksum = checksums.extract(algo);
+ }
+ };
+ let size = first_block.len() as u64;
check_quotas(ctx, size, existing_object.as_ref()).await?;
+ let etag = encryption.etag_from_md5(&checksums.md5);
+ let inline_data = encryption.encrypt_blob(&first_block)?.to_vec();
+
let object_version = ObjectVersion {
uuid: version_uuid,
timestamp: version_timestamp,
state: ObjectVersionState::Complete(ObjectVersionData::Inline(
ObjectVersionMeta {
- headers,
+ encryption: encryption.encrypt_meta(meta)?,
size,
- etag: data_md5sum_hex.clone(),
+ etag: etag.clone(),
},
- first_block.to_vec(),
+ inline_data,
)),
};
let object = Object::new(*bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
- return Ok((version_uuid, data_md5sum_hex));
+ return Ok(SaveStreamResult {
+ version_uuid,
+ version_timestamp,
+ etag,
+ });
}
// The following consists in many steps that can each fail.
@@ -142,7 +189,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
uuid: version_uuid,
timestamp: version_timestamp,
state: ObjectVersionState::Uploading {
- headers: headers.clone(),
+ encryption: encryption.encrypt_meta(meta.clone())?,
+ checksum_algorithm: None, // don't care; overwritten later
multipart: false,
},
};
@@ -163,26 +211,39 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
);
garage.version_table.insert(&version).await?;
- // Transfer data and verify checksum
- let (total_size, data_md5sum, data_sha256sum, first_block_hash) =
- read_and_put_blocks(ctx, &version, 1, first_block, &mut chunker).await?;
-
- ensure_checksum_matches(
- data_md5sum.as_slice(),
- data_sha256sum,
- content_md5.as_deref(),
- content_sha256,
- )?;
+ // Transfer data
+ let (total_size, checksums, first_block_hash) = read_and_put_blocks(
+ ctx,
+ &version,
+ encryption,
+ 1,
+ first_block,
+ &mut chunker,
+ checksummer,
+ )
+ .await?;
+
+ // Verify checksums are ok / add calculated checksum to metadata
+ match checksum_mode {
+ ChecksumMode::Verify(expected) => {
+ checksums.verify(&expected)?;
+ }
+ ChecksumMode::Calculate(algo) => {
+ meta.checksum = checksums.extract(algo);
+ }
+ };
+ // Verify quotas are respected
check_quotas(ctx, total_size, existing_object.as_ref()).await?;
// Save final object state, marked as Complete
- let md5sum_hex = hex::encode(data_md5sum);
+ let etag = encryption.etag_from_md5(&checksums.md5);
+
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
- headers,
+ encryption: encryption.encrypt_meta(meta)?,
size: total_size,
- etag: md5sum_hex.clone(),
+ etag: etag.clone(),
},
first_block_hash,
));
@@ -193,34 +254,11 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// We won't have to clean up on drop.
interrupted_cleanup.cancel();
- Ok((version_uuid, md5sum_hex))
-}
-
-/// Validate MD5 sum against content-md5 header
-/// and sha256sum against signed content-sha256
-pub(crate) fn ensure_checksum_matches(
- data_md5sum: &[u8],
- data_sha256sum: garage_util::data::FixedBytes32,
- content_md5: Option<&str>,
- content_sha256: Option<garage_util::data::FixedBytes32>,
-) -> Result<(), Error> {
- if let Some(expected_sha256) = content_sha256 {
- if expected_sha256 != data_sha256sum {
- return Err(Error::bad_request(
- "Unable to validate x-amz-content-sha256",
- ));
- } else {
- trace!("Successfully validated x-amz-content-sha256");
- }
- }
- if let Some(expected_md5) = content_md5 {
- if expected_md5.trim_matches('"') != BASE64_STANDARD.encode(data_md5sum) {
- return Err(Error::bad_request("Unable to validate content-md5"));
- } else {
- trace!("Successfully validated content-md5");
- }
- }
- Ok(())
+ Ok(SaveStreamResult {
+ version_uuid,
+ version_timestamp,
+ etag,
+ })
}
/// Check that inserting this object with this size doesn't exceed bucket quotas
@@ -248,7 +286,7 @@ pub(crate) async fn check_quotas(
.await?;
let counters = counters
- .map(|x| x.filtered_values(&garage.system.ring.borrow()))
+ .map(|x| x.filtered_values(&garage.system.cluster_layout()))
.unwrap_or_default();
let (prev_cnt_obj, prev_cnt_size) = match prev_object {
@@ -290,10 +328,12 @@ pub(crate) async fn check_quotas(
pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
ctx: &ReqCtx,
version: &Version,
+ encryption: EncryptionParams,
part_number: u64,
first_block: Bytes,
chunker: &mut StreamChunker<S>,
-) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash, Hash), Error> {
+ checksummer: Checksummer,
+) -> Result<(u64, Checksums, Hash), Error> {
let tracer = opentelemetry::global::tracer("garage");
let (block_tx, mut block_rx) = mpsc::channel::<Result<Bytes, Error>>(2);
@@ -321,20 +361,20 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
let (block_tx2, mut block_rx2) = mpsc::channel::<Result<Bytes, Error>>(1);
let hash_stream = async {
- let md5hasher = AsyncHasher::<Md5>::new();
- let sha256hasher = AsyncHasher::<Sha256>::new();
+ let mut checksummer = checksummer;
while let Some(next) = block_rx.recv().await {
match next {
Ok(block) => {
block_tx2.send(Ok(block.clone())).await?;
- futures::future::join(
- md5hasher.update(block.clone()),
- sha256hasher.update(block.clone()),
- )
+ checksummer = tokio::task::spawn_blocking(move || {
+ checksummer.update(&block);
+ checksummer
+ })
.with_context(Context::current_with_span(
tracer.start("Hash block (md5, sha256)"),
))
- .await;
+ .await
+ .unwrap()
}
Err(e) => {
block_tx2.send(Err(e)).await?;
@@ -343,27 +383,38 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
}
}
drop(block_tx2);
- Ok::<_, mpsc::error::SendError<_>>(futures::join!(
- md5hasher.finalize(),
- sha256hasher.finalize()
- ))
+ Ok::<_, mpsc::error::SendError<_>>(checksummer)
};
- let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, Hash), Error>>(1);
- let hash_blocks = async {
+ let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, u64, Hash), Error>>(1);
+ let encrypt_hash_blocks = async {
let mut first_block_hash = None;
while let Some(next) = block_rx2.recv().await {
match next {
Ok(block) => {
- let hash = async_blake2sum(block.clone())
- .with_context(Context::current_with_span(
- tracer.start("Hash block (blake2)"),
- ))
- .await;
- if first_block_hash.is_none() {
- first_block_hash = Some(hash);
+ let unencrypted_len = block.len() as u64;
+ let res = tokio::task::spawn_blocking(move || {
+ let block = encryption.encrypt_block(block)?;
+ let hash = blake2sum(&block);
+ Ok((block, hash))
+ })
+ .with_context(Context::current_with_span(
+ tracer.start("Encrypt and hash (blake2) block"),
+ ))
+ .await
+ .unwrap();
+ match res {
+ Ok((block, hash)) => {
+ if first_block_hash.is_none() {
+ first_block_hash = Some(hash);
+ }
+ block_tx3.send(Ok((block, unencrypted_len, hash))).await?;
+ }
+ Err(e) => {
+ block_tx3.send(Err(e)).await?;
+ break;
+ }
}
- block_tx3.send(Ok((block, hash))).await?;
}
Err(e) => {
block_tx3.send(Err(e)).await?;
@@ -398,7 +449,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
block_rx3.recv().await
}
};
- let (block, hash) = tokio::select! {
+ let (block, unencrypted_len, hash) = tokio::select! {
result = write_futs_next => {
result?;
continue;
@@ -410,17 +461,18 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
};
// For next block to be written: count its size and spawn future to write it
- let offset = written_bytes;
- written_bytes += block.len() as u64;
write_futs.push_back(put_block_and_meta(
ctx,
version,
part_number,
- offset,
+ written_bytes,
hash,
block,
+ unencrypted_len,
+ encryption.is_encrypted(),
order_stream.order(written_bytes),
));
+ written_bytes += unencrypted_len;
}
while let Some(res) = write_futs.next().await {
res?;
@@ -429,17 +481,15 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
};
let (_, stream_hash_result, block_hash_result, final_result) =
- futures::join!(read_blocks, hash_stream, hash_blocks, put_blocks);
+ futures::join!(read_blocks, hash_stream, encrypt_hash_blocks, put_blocks);
let total_size = final_result?;
// unwrap here is ok, because if the hasher failed, it is because something
// failed later in the pipeline, which already caused a return at the ? on the
// previous line
- let (data_md5sum, data_sha256sum) = stream_hash_result.unwrap();
let first_block_hash = block_hash_result.unwrap();
+ let checksums = stream_hash_result.unwrap().finalize();
- let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
-
- Ok((total_size, data_md5sum, data_sha256sum, first_block_hash))
+ Ok((total_size, checksums, first_block_hash))
}
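// Illustrative sketch, not part of this patch: the pipeline joined above, with
// bounded mpsc channels providing backpressure between the four stages:
//
//   read_blocks --block_rx--> hash_stream (plaintext checksums)
//               --block_rx2--> encrypt_hash_blocks (encrypt + blake2 hash)
//               --block_rx3--> put_blocks (up to PUT_BLOCKS_MAX_PARALLEL writes)
//
// User-visible checksums are computed over the plaintext, while the blake2
// hash that identifies a stored block covers the (possibly encrypted) bytes
// actually written to storage.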
async fn put_block_and_meta(
@@ -449,6 +499,8 @@ async fn put_block_and_meta(
offset: u64,
hash: Hash,
block: Bytes,
+ size: u64,
+ is_encrypted: bool,
order_tag: OrderTag,
) -> Result<(), GarageError> {
let ReqCtx { garage, .. } = ctx;
@@ -459,10 +511,7 @@ async fn put_block_and_meta(
part_number,
offset,
},
- VersionBlock {
- hash,
- size: block.len() as u64,
- },
+ VersionBlock { hash, size },
);
let block_ref = BlockRef {
@@ -474,7 +523,7 @@ async fn put_block_and_meta(
futures::try_join!(
garage
.block_manager
- .rpc_put_block(hash, block, Some(order_tag)),
+ .rpc_put_block(hash, block, is_encrypted, Some(order_tag)),
garage.version_table.insert(&version),
garage.block_ref_table.insert(&block_ref),
)?;
@@ -517,14 +566,6 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
}
}
-pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<ResBody> {
- Response::builder()
- .header("x-amz-version-id", hex::encode(version_uuid))
- .header("ETag", format!("\"{}\"", md5sum_hex))
- .body(empty_body())
- .unwrap()
-}
-
struct InterruptedCleanup(Option<InterruptedCleanupInner>);
struct InterruptedCleanupInner {
garage: Arc<Garage>,
@@ -559,57 +600,35 @@ impl Drop for InterruptedCleanup {
// ============ helpers ============
-pub(crate) fn get_mime_type(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> {
- Ok(headers
- .get(hyper::header::CONTENT_TYPE)
- .map(|x| x.to_str())
- .unwrap_or(Ok("blob"))?
- .to_string())
-}
-
-pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVersionHeaders, Error> {
- let content_type = get_mime_type(headers)?;
- let mut other = BTreeMap::new();
+pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<HeaderList, Error> {
+ let mut ret = Vec::new();
// Preserve standard headers
let standard_header = vec![
+ hyper::header::CONTENT_TYPE,
hyper::header::CACHE_CONTROL,
hyper::header::CONTENT_DISPOSITION,
hyper::header::CONTENT_ENCODING,
hyper::header::CONTENT_LANGUAGE,
hyper::header::EXPIRES,
];
- for h in standard_header.iter() {
- if let Some(v) = headers.get(h) {
- match v.to_str() {
- Ok(v_str) => {
- other.insert(h.to_string(), v_str.to_string());
- }
- Err(e) => {
- warn!("Discarding header {}, error in .to_str(): {}", h, e);
- }
- }
+ for name in standard_header.iter() {
+ if let Some(value) = headers.get(name) {
+ ret.push((name.to_string(), value.to_str()?.to_string()));
}
}
// Preserve x-amz-meta- headers
- for (k, v) in headers.iter() {
- if k.as_str().starts_with("x-amz-meta-") {
- match std::str::from_utf8(v.as_bytes()) {
- Ok(v_str) => {
- other.insert(k.to_string(), v_str.to_string());
- }
- Err(e) => {
- warn!("Discarding header {}, error in .to_str(): {}", k, e);
- }
- }
+ for (name, value) in headers.iter() {
+ if name.as_str().starts_with("x-amz-meta-") {
+ ret.push((
+ name.to_string(),
+ std::str::from_utf8(value.as_bytes())?.to_string(),
+ ));
}
}
- Ok(ObjectVersionHeaders {
- content_type,
- other,
- })
+ Ok(ret)
}
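// Illustrative sketch, not part of this patch: with headers now kept as an
// ordered Vec of (name, value) pairs instead of a BTreeMap, a request carrying
//   Content-Type: text/plain
//   x-amz-meta-color: blue
// yields ret == vec![("content-type".into(), "text/plain".into()),
//                    ("x-amz-meta-color".into(), "blue".into())]
// (HeaderName's Display form is always lowercase).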
pub(crate) fn next_timestamp(existing_object: Option<&Object>) -> u64 {
diff --git a/src/api/s3/xml.rs b/src/api/s3/xml.rs
index 06f11288..1e569ade 100644
--- a/src/api/s3/xml.rs
+++ b/src/api/s3/xml.rs
@@ -131,6 +131,14 @@ pub struct CompleteMultipartUploadResult {
pub key: Value,
#[serde(rename = "ETag")]
pub etag: Value,
+ #[serde(rename = "ChecksumCRC32")]
+ pub checksum_crc32: Option<Value>,
+ #[serde(rename = "ChecksumCRC32C")]
+ pub checksum_crc32c: Option<Value>,
+ #[serde(rename = "ChecksumSHA1")]
+ pub checksum_sha1: Option<Value>,
+ #[serde(rename = "ChecksumSHA256")]
+ pub checksum_sha256: Option<Value>,
}
#[derive(Debug, Serialize, PartialEq, Eq)]
@@ -197,6 +205,14 @@ pub struct PartItem {
pub part_number: IntValue,
#[serde(rename = "Size")]
pub size: IntValue,
+ #[serde(rename = "ChecksumCRC32")]
+ pub checksum_crc32: Option<Value>,
+ #[serde(rename = "ChecksumCRC32C")]
+ pub checksum_crc32c: Option<Value>,
+ #[serde(rename = "ChecksumSHA1")]
+ pub checksum_sha1: Option<Value>,
+ #[serde(rename = "ChecksumSHA256")]
+ pub checksum_sha256: Option<Value>,
}
#[derive(Debug, Serialize, PartialEq, Eq)]
@@ -500,6 +516,10 @@ mod tests {
bucket: Value("mybucket".to_string()),
key: Value("a/plop".to_string()),
etag: Value("\"3858f62230ac3c915f300c664312c11f-9\"".to_string()),
+ checksum_crc32: None,
+ checksum_crc32c: None,
+ checksum_sha1: Some(Value("ZJAnHyG8PeKz9tI8UTcHrJos39A=".into())),
+ checksum_sha256: None,
};
assert_eq!(
to_xml_with_header(&result)?,
@@ -509,6 +529,7 @@ mod tests {
<Bucket>mybucket</Bucket>\
<Key>a/plop</Key>\
<ETag>&quot;3858f62230ac3c915f300c664312c11f-9&quot;</ETag>\
+ <ChecksumSHA1>ZJAnHyG8PeKz9tI8UTcHrJos39A=</ChecksumSHA1>\
</CompleteMultipartUploadResult>"
);
Ok(())
@@ -780,12 +801,22 @@ mod tests {
last_modified: Value("2010-11-10T20:48:34.000Z".to_string()),
part_number: IntValue(2),
size: IntValue(10485760),
+ checksum_crc32: None,
+ checksum_crc32c: None,
+ checksum_sha256: Some(Value(
+ "5RQ3A5uk0w7ojNjvegohch4JRBBGN/cLhsNrPzfv/hA=".into(),
+ )),
+ checksum_sha1: None,
},
PartItem {
etag: Value("\"aaaa18db4cc2f85cedef654fccc4a4x8\"".to_string()),
last_modified: Value("2010-11-10T20:48:33.000Z".to_string()),
part_number: IntValue(3),
size: IntValue(10485760),
+ checksum_sha256: None,
+ checksum_crc32c: None,
+ checksum_crc32: Some(Value("ZJAnHyG8=".into())),
+ checksum_sha1: None,
},
],
initiator: Initiator {
@@ -820,12 +851,14 @@ mod tests {
<LastModified>2010-11-10T20:48:34.000Z</LastModified>\
<PartNumber>2</PartNumber>\
<Size>10485760</Size>\
+ <ChecksumSHA256>5RQ3A5uk0w7ojNjvegohch4JRBBGN/cLhsNrPzfv/hA=</ChecksumSHA256>\
</Part>\
<Part>\
<ETag>&quot;aaaa18db4cc2f85cedef654fccc4a4x8&quot;</ETag>\
<LastModified>2010-11-10T20:48:33.000Z</LastModified>\
<PartNumber>3</PartNumber>\
<Size>10485760</Size>\
+ <ChecksumCRC32>ZJAnHyG8=</ChecksumCRC32>\
</Part>\
<Initiator>\
<DisplayName>umat-user-11116a31-17b5-4fb7-9df5-b288870f11xx</DisplayName>\