diff options
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/Cargo.toml | 8 | ||||
-rw-r--r-- | src/api/admin/api_server.rs | 2 | ||||
-rw-r--r-- | src/api/admin/bucket.rs | 4 | ||||
-rw-r--r-- | src/api/admin/cluster.rs | 176 | ||||
-rw-r--r-- | src/api/common_error.rs | 8 | ||||
-rw-r--r-- | src/api/k2v/index.rs | 13 | ||||
-rw-r--r-- | src/api/s3/api_server.rs | 2 | ||||
-rw-r--r-- | src/api/s3/checksum.rs | 406 | ||||
-rw-r--r-- | src/api/s3/copy.rs | 373 | ||||
-rw-r--r-- | src/api/s3/encryption.rs | 595 | ||||
-rw-r--r-- | src/api/s3/error.rs | 12 | ||||
-rw-r--r-- | src/api/s3/get.rs | 283 | ||||
-rw-r--r-- | src/api/s3/list.rs | 80 | ||||
-rw-r--r-- | src/api/s3/mod.rs | 2 | ||||
-rw-r--r-- | src/api/s3/multipart.rs | 199 | ||||
-rw-r--r-- | src/api/s3/post_object.rs | 60 | ||||
-rw-r--r-- | src/api/s3/put.rs | 327 | ||||
-rw-r--r-- | src/api/s3/xml.rs | 33 |
18 files changed, 2153 insertions, 430 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 317031a7..1b87496c 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_api" -version = "0.9.3" +version = "0.10.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -21,11 +21,15 @@ garage_net.workspace = true garage_util.workspace = true garage_rpc.workspace = true +aes-gcm.workspace = true argon2.workspace = true +async-compression.workspace = true async-trait.workspace = true base64.workspace = true bytes.workspace = true chrono.workspace = true +crc32fast.workspace = true +crc32c.workspace = true crypto-common.workspace = true err-derive.workspace = true hex.workspace = true @@ -35,12 +39,14 @@ tracing.workspace = true md-5.workspace = true nom.workspace = true pin-project.workspace = true +sha1.workspace = true sha2.workspace = true futures.workspace = true futures-util.workspace = true tokio.workspace = true tokio-stream.workspace = true +tokio-util.workspace = true form_urlencoded.workspace = true http.workspace = true diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 265639c4..0e4565bb 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -276,7 +276,7 @@ impl ApiHandler for AdminApiServer { Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await, Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await, Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await, - Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await, + Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage).await, // Keys Endpoint::ListKeys => handle_list_keys(&self.garage).await, Endpoint::GetKeyInfo { diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index cfe8a6c4..ac3cba00 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -123,7 +123,7 @@ async fn bucket_info_results( .table .get(&bucket_id, &EmptyKey) .await? - .map(|x| x.filtered_values(&garage.system.ring.borrow())) + .map(|x| x.filtered_values(&garage.system.cluster_layout())) .unwrap_or_default(); let mpu_counters = garage @@ -131,7 +131,7 @@ async fn bucket_info_results( .table .get(&bucket_id, &EmptyKey) .await? - .map(|x| x.filtered_values(&garage.system.ring.borrow())) + .map(|x| x.filtered_values(&garage.system.cluster_layout())) .unwrap_or_default(); let mut relevant_keys = HashMap::new(); diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 3876c608..8c9cb1e5 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; @@ -16,25 +17,95 @@ use crate::admin::error::*; use crate::helpers::{json_ok_response, parse_json_body}; pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { + let layout = garage.system.cluster_layout(); + let mut nodes = garage + .system + .get_known_nodes() + .into_iter() + .map(|i| { + ( + i.id, + NodeResp { + id: hex::encode(i.id), + addr: i.addr, + hostname: i.status.hostname, + is_up: i.is_up, + last_seen_secs_ago: i.last_seen_secs_ago, + data_partition: i + .status + .data_disk_avail + .map(|(avail, total)| FreeSpaceResp { + available: avail, + total, + }), + metadata_partition: i.status.meta_disk_avail.map(|(avail, total)| { + FreeSpaceResp { + available: avail, + total, + } + }), + ..Default::default() + }, + ) + }) + .collect::<HashMap<_, _>>(); + + for (id, _, role) in layout.current().roles.items().iter() { + if let layout::NodeRoleV(Some(r)) = role { + let role = NodeRoleResp { + id: hex::encode(id), + zone: r.zone.to_string(), + capacity: r.capacity, + tags: r.tags.clone(), + }; + match nodes.get_mut(id) { + None => { + nodes.insert( + *id, + NodeResp { + id: hex::encode(id), + role: Some(role), + ..Default::default() + }, + ); + } + Some(n) => { + if n.role.is_none() { + n.role = Some(role); + } + } + } + } + } + + for ver in layout.versions.iter().rev().skip(1) { + for (id, _, role) in ver.roles.items().iter() { + if let layout::NodeRoleV(Some(r)) = role { + if !nodes.contains_key(id) && r.capacity.is_some() { + nodes.insert( + *id, + NodeResp { + id: hex::encode(id), + draining: true, + ..Default::default() + }, + ); + } + } + } + } + + let mut nodes = nodes.into_values().collect::<Vec<_>>(); + nodes.sort_by(|x, y| x.id.cmp(&y.id)); + let res = GetClusterStatusResponse { node: hex::encode(garage.system.id), garage_version: garage_util::version::garage_version(), garage_features: garage_util::version::garage_features(), rust_version: garage_util::version::rust_version(), db_engine: garage.db.engine(), - known_nodes: garage - .system - .get_known_nodes() - .into_iter() - .map(|i| KnownNodeResp { - id: hex::encode(i.id), - addr: i.addr, - is_up: i.is_up, - last_seen_secs_ago: i.last_seen_secs_ago, - hostname: i.status.hostname, - }) - .collect(), - layout: format_cluster_layout(&garage.system.get_cluster_layout()), + layout_version: layout.current().version, + nodes, }; Ok(json_ok_response(&res)?) @@ -85,13 +156,14 @@ pub async fn handle_connect_cluster_nodes( } pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { - let res = format_cluster_layout(&garage.system.get_cluster_layout()); + let res = format_cluster_layout(&garage.system.cluster_layout()); Ok(json_ok_response(&res)?) } -fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResponse { +fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResponse { let roles = layout + .current() .roles .items() .iter() @@ -105,10 +177,12 @@ fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResp .collect::<Vec<_>>(); let staged_role_changes = layout - .staging_roles + .staging + .get() + .roles .items() .iter() - .filter(|(k, _, v)| layout.roles.get(k) != Some(v)) + .filter(|(k, _, v)| layout.current().roles.get(k) != Some(v)) .map(|(k, _, v)| match &v.0 { None => NodeRoleChange { id: hex::encode(k), @@ -126,7 +200,7 @@ fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResp .collect::<Vec<_>>(); GetClusterLayoutResponse { - version: layout.version, + version: layout.current().version, roles, staged_role_changes, } @@ -155,8 +229,8 @@ struct GetClusterStatusResponse { garage_features: Option<&'static [&'static str]>, rust_version: &'static str, db_engine: String, - known_nodes: Vec<KnownNodeResp>, - layout: GetClusterLayoutResponse, + layout_version: u64, + nodes: Vec<NodeResp>, } #[derive(Serialize)] @@ -190,14 +264,27 @@ struct NodeRoleResp { tags: Vec<String>, } -#[derive(Serialize)] +#[derive(Serialize, Default)] +#[serde(rename_all = "camelCase")] +struct FreeSpaceResp { + available: u64, + total: u64, +} + +#[derive(Serialize, Default)] #[serde(rename_all = "camelCase")] -struct KnownNodeResp { +struct NodeResp { id: String, - addr: SocketAddr, + role: Option<NodeRoleResp>, + addr: Option<SocketAddr>, + hostname: Option<String>, is_up: bool, last_seen_secs_ago: Option<u64>, - hostname: String, + draining: bool, + #[serde(skip_serializing_if = "Option::is_none")] + data_partition: Option<FreeSpaceResp>, + #[serde(skip_serializing_if = "Option::is_none")] + metadata_partition: Option<FreeSpaceResp>, } // ---- update functions ---- @@ -208,10 +295,10 @@ pub async fn handle_update_cluster_layout( ) -> Result<Response<ResBody>, Error> { let updates = parse_json_body::<UpdateClusterLayoutRequest, _, Error>(req).await?; - let mut layout = garage.system.get_cluster_layout(); + let mut layout = garage.system.cluster_layout().clone(); - let mut roles = layout.roles.clone(); - roles.merge(&layout.staging_roles); + let mut roles = layout.current().roles.clone(); + roles.merge(&layout.staging.get().roles); for change in updates { let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?; @@ -232,11 +319,17 @@ pub async fn handle_update_cluster_layout( }; layout - .staging_roles + .staging + .get_mut() + .roles .merge(&roles.update_mutator(node, layout::NodeRoleV(new_role))); } - garage.system.update_cluster_layout(&layout).await?; + garage + .system + .layout_manager + .update_cluster_layout(&layout) + .await?; let res = format_cluster_layout(&layout); Ok(json_ok_response(&res)?) @@ -246,12 +339,16 @@ pub async fn handle_apply_cluster_layout( garage: &Arc<Garage>, req: Request<IncomingBody>, ) -> Result<Response<ResBody>, Error> { - let param = parse_json_body::<ApplyRevertLayoutRequest, _, Error>(req).await?; + let param = parse_json_body::<ApplyLayoutRequest, _, Error>(req).await?; - let layout = garage.system.get_cluster_layout(); + let layout = garage.system.cluster_layout().clone(); let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; - garage.system.update_cluster_layout(&layout).await?; + garage + .system + .layout_manager + .update_cluster_layout(&layout) + .await?; let res = ApplyClusterLayoutResponse { message: msg, @@ -262,13 +359,14 @@ pub async fn handle_apply_cluster_layout( pub async fn handle_revert_cluster_layout( garage: &Arc<Garage>, - req: Request<IncomingBody>, ) -> Result<Response<ResBody>, Error> { - let param = parse_json_body::<ApplyRevertLayoutRequest, _, Error>(req).await?; - - let layout = garage.system.get_cluster_layout(); - let layout = layout.revert_staged_changes(Some(param.version))?; - garage.system.update_cluster_layout(&layout).await?; + let layout = garage.system.cluster_layout().clone(); + let layout = layout.revert_staged_changes()?; + garage + .system + .layout_manager + .update_cluster_layout(&layout) + .await?; let res = format_cluster_layout(&layout); Ok(json_ok_response(&res)?) @@ -280,7 +378,7 @@ type UpdateClusterLayoutRequest = Vec<NodeRoleChange>; #[derive(Deserialize)] #[serde(rename_all = "camelCase")] -struct ApplyRevertLayoutRequest { +struct ApplyLayoutRequest { version: u64, } diff --git a/src/api/common_error.rs b/src/api/common_error.rs index 4381f227..c47555d4 100644 --- a/src/api/common_error.rs +++ b/src/api/common_error.rs @@ -59,9 +59,7 @@ impl CommonError { pub fn http_status_code(&self) -> StatusCode { match self { CommonError::InternalError( - GarageError::Timeout - | GarageError::RemoteError(_) - | GarageError::Quorum(_, _, _, _), + GarageError::Timeout | GarageError::RemoteError(_) | GarageError::Quorum(..), ) => StatusCode::SERVICE_UNAVAILABLE, CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => { StatusCode::INTERNAL_SERVER_ERROR @@ -80,9 +78,7 @@ impl CommonError { match self { CommonError::Forbidden(_) => "AccessDenied", CommonError::InternalError( - GarageError::Timeout - | GarageError::RemoteError(_) - | GarageError::Quorum(_, _, _, _), + GarageError::Timeout | GarageError::RemoteError(_) | GarageError::Quorum(..), ) => "ServiceUnavailable", CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => { "InternalError" diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index 822bec44..e3397238 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -1,9 +1,6 @@ -use std::sync::Arc; - use hyper::Response; use serde::Serialize; -use garage_rpc::ring::Ring; use garage_table::util::*; use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; @@ -27,7 +24,11 @@ pub async fn handle_read_index( let reverse = reverse.unwrap_or(false); - let ring: Arc<Ring> = garage.system.ring.borrow().clone(); + let node_id_vec = garage + .system + .cluster_layout() + .all_nongateway_nodes() + .to_vec(); let (partition_keys, more, next_start) = read_range( &garage.k2v.counter_table.table, @@ -36,7 +37,7 @@ pub async fn handle_read_index( &start, &end, limit, - Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())), + Some((DeletedFilter::NotDeleted, node_id_vec)), EnumerationOrder::from_reverse(reverse), ) .await?; @@ -55,7 +56,7 @@ pub async fn handle_read_index( partition_keys: partition_keys .into_iter() .map(|part| { - let vals = part.filtered_values(&ring); + let vals = part.filtered_values(&garage.system.cluster_layout()); ReadIndexResponseEntry { pk: part.sk, entries: *vals.get(&s_entries).unwrap_or(&0), diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 1ed30996..1737af33 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -325,7 +325,7 @@ impl ApiHandler for S3ApiServer { part_number_marker: part_number_marker.map(|p| p.min(10000)), max_parts: max_parts.unwrap_or(1000).clamp(1, 1000), }; - handle_list_parts(ctx, &query).await + handle_list_parts(ctx, req, &query).await } Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await, Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await, diff --git a/src/api/s3/checksum.rs b/src/api/s3/checksum.rs new file mode 100644 index 00000000..c9dc001c --- /dev/null +++ b/src/api/s3/checksum.rs @@ -0,0 +1,406 @@ +use std::convert::{TryFrom, TryInto}; +use std::hash::Hasher; + +use base64::prelude::*; +use crc32c::Crc32cHasher as Crc32c; +use crc32fast::Hasher as Crc32; +use md5::{Digest, Md5}; +use sha1::Sha1; +use sha2::Sha256; + +use http::{HeaderMap, HeaderName, HeaderValue}; + +use garage_util::data::*; +use garage_util::error::OkOrMessage; + +use garage_model::s3::object_table::*; + +use crate::s3::error::*; + +pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName = + HeaderName::from_static("x-amz-checksum-algorithm"); +pub const X_AMZ_CHECKSUM_MODE: HeaderName = HeaderName::from_static("x-amz-checksum-mode"); +pub const X_AMZ_CHECKSUM_CRC32: HeaderName = HeaderName::from_static("x-amz-checksum-crc32"); +pub const X_AMZ_CHECKSUM_CRC32C: HeaderName = HeaderName::from_static("x-amz-checksum-crc32c"); +pub const X_AMZ_CHECKSUM_SHA1: HeaderName = HeaderName::from_static("x-amz-checksum-sha1"); +pub const X_AMZ_CHECKSUM_SHA256: HeaderName = HeaderName::from_static("x-amz-checksum-sha256"); + +pub type Crc32Checksum = [u8; 4]; +pub type Crc32cChecksum = [u8; 4]; +pub type Md5Checksum = [u8; 16]; +pub type Sha1Checksum = [u8; 20]; +pub type Sha256Checksum = [u8; 32]; + +#[derive(Debug, Default)] +pub(crate) struct ExpectedChecksums { + // base64-encoded md5 (content-md5 header) + pub md5: Option<String>, + // content_sha256 (as a Hash / FixedBytes32) + pub sha256: Option<Hash>, + // extra x-amz-checksum-* header + pub extra: Option<ChecksumValue>, +} + +pub(crate) struct Checksummer { + pub crc32: Option<Crc32>, + pub crc32c: Option<Crc32c>, + pub md5: Option<Md5>, + pub sha1: Option<Sha1>, + pub sha256: Option<Sha256>, +} + +#[derive(Default)] +pub(crate) struct Checksums { + pub crc32: Option<Crc32Checksum>, + pub crc32c: Option<Crc32cChecksum>, + pub md5: Option<Md5Checksum>, + pub sha1: Option<Sha1Checksum>, + pub sha256: Option<Sha256Checksum>, +} + +impl Checksummer { + pub(crate) fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self { + let mut ret = Self { + crc32: None, + crc32c: None, + md5: None, + sha1: None, + sha256: None, + }; + + if expected.md5.is_some() || require_md5 { + ret.md5 = Some(Md5::new()); + } + if expected.sha256.is_some() || matches!(&expected.extra, Some(ChecksumValue::Sha256(_))) { + ret.sha256 = Some(Sha256::new()); + } + if matches!(&expected.extra, Some(ChecksumValue::Crc32(_))) { + ret.crc32 = Some(Crc32::new()); + } + if matches!(&expected.extra, Some(ChecksumValue::Crc32c(_))) { + ret.crc32c = Some(Crc32c::default()); + } + if matches!(&expected.extra, Some(ChecksumValue::Sha1(_))) { + ret.sha1 = Some(Sha1::new()); + } + ret + } + + pub(crate) fn add(mut self, algo: Option<ChecksumAlgorithm>) -> Self { + match algo { + Some(ChecksumAlgorithm::Crc32) => { + self.crc32 = Some(Crc32::new()); + } + Some(ChecksumAlgorithm::Crc32c) => { + self.crc32c = Some(Crc32c::default()); + } + Some(ChecksumAlgorithm::Sha1) => { + self.sha1 = Some(Sha1::new()); + } + Some(ChecksumAlgorithm::Sha256) => { + self.sha256 = Some(Sha256::new()); + } + None => (), + } + self + } + + pub(crate) fn update(&mut self, bytes: &[u8]) { + if let Some(crc32) = &mut self.crc32 { + crc32.update(bytes); + } + if let Some(crc32c) = &mut self.crc32c { + crc32c.write(bytes); + } + if let Some(md5) = &mut self.md5 { + md5.update(bytes); + } + if let Some(sha1) = &mut self.sha1 { + sha1.update(bytes); + } + if let Some(sha256) = &mut self.sha256 { + sha256.update(bytes); + } + } + + pub(crate) fn finalize(self) -> Checksums { + Checksums { + crc32: self.crc32.map(|x| u32::to_be_bytes(x.finalize())), + crc32c: self + .crc32c + .map(|x| u32::to_be_bytes(u32::try_from(x.finish()).unwrap())), + md5: self.md5.map(|x| x.finalize()[..].try_into().unwrap()), + sha1: self.sha1.map(|x| x.finalize()[..].try_into().unwrap()), + sha256: self.sha256.map(|x| x.finalize()[..].try_into().unwrap()), + } + } +} + +impl Checksums { + pub fn verify(&self, expected: &ExpectedChecksums) -> Result<(), Error> { + if let Some(expected_md5) = &expected.md5 { + match self.md5 { + Some(md5) if BASE64_STANDARD.encode(&md5) == expected_md5.trim_matches('"') => (), + _ => { + return Err(Error::InvalidDigest( + "MD5 checksum verification failed (from content-md5)".into(), + )) + } + } + } + if let Some(expected_sha256) = &expected.sha256 { + match self.sha256 { + Some(sha256) if &sha256[..] == expected_sha256.as_slice() => (), + _ => { + return Err(Error::InvalidDigest( + "SHA256 checksum verification failed (from x-amz-content-sha256)".into(), + )) + } + } + } + if let Some(extra) = expected.extra { + let algo = extra.algorithm(); + if self.extract(Some(algo)) != Some(extra) { + return Err(Error::InvalidDigest(format!( + "Failed to validate checksum for algorithm {:?}", + algo + ))); + } + } + Ok(()) + } + + pub fn extract(&self, algo: Option<ChecksumAlgorithm>) -> Option<ChecksumValue> { + match algo { + None => None, + Some(ChecksumAlgorithm::Crc32) => Some(ChecksumValue::Crc32(self.crc32.unwrap())), + Some(ChecksumAlgorithm::Crc32c) => Some(ChecksumValue::Crc32c(self.crc32c.unwrap())), + Some(ChecksumAlgorithm::Sha1) => Some(ChecksumValue::Sha1(self.sha1.unwrap())), + Some(ChecksumAlgorithm::Sha256) => Some(ChecksumValue::Sha256(self.sha256.unwrap())), + } + } +} + +// ---- + +#[derive(Default)] +pub(crate) struct MultipartChecksummer { + pub md5: Md5, + pub extra: Option<MultipartExtraChecksummer>, +} + +pub(crate) enum MultipartExtraChecksummer { + Crc32(Crc32), + Crc32c(Crc32c), + Sha1(Sha1), + Sha256(Sha256), +} + +impl MultipartChecksummer { + pub(crate) fn init(algo: Option<ChecksumAlgorithm>) -> Self { + Self { + md5: Md5::new(), + extra: match algo { + None => None, + Some(ChecksumAlgorithm::Crc32) => { + Some(MultipartExtraChecksummer::Crc32(Crc32::new())) + } + Some(ChecksumAlgorithm::Crc32c) => { + Some(MultipartExtraChecksummer::Crc32c(Crc32c::default())) + } + Some(ChecksumAlgorithm::Sha1) => Some(MultipartExtraChecksummer::Sha1(Sha1::new())), + Some(ChecksumAlgorithm::Sha256) => { + Some(MultipartExtraChecksummer::Sha256(Sha256::new())) + } + }, + } + } + + pub(crate) fn update( + &mut self, + etag: &str, + checksum: Option<ChecksumValue>, + ) -> Result<(), Error> { + self.md5 + .update(&hex::decode(&etag).ok_or_message("invalid etag hex")?); + match (&mut self.extra, checksum) { + (None, _) => (), + ( + Some(MultipartExtraChecksummer::Crc32(ref mut crc32)), + Some(ChecksumValue::Crc32(x)), + ) => { + crc32.update(&x); + } + ( + Some(MultipartExtraChecksummer::Crc32c(ref mut crc32c)), + Some(ChecksumValue::Crc32c(x)), + ) => { + crc32c.write(&x); + } + (Some(MultipartExtraChecksummer::Sha1(ref mut sha1)), Some(ChecksumValue::Sha1(x))) => { + sha1.update(&x); + } + ( + Some(MultipartExtraChecksummer::Sha256(ref mut sha256)), + Some(ChecksumValue::Sha256(x)), + ) => { + sha256.update(&x); + } + (Some(_), b) => { + return Err(Error::internal_error(format!( + "part checksum was not computed correctly, got: {:?}", + b + ))) + } + } + Ok(()) + } + + pub(crate) fn finalize(self) -> (Md5Checksum, Option<ChecksumValue>) { + let md5 = self.md5.finalize()[..].try_into().unwrap(); + let extra = match self.extra { + None => None, + Some(MultipartExtraChecksummer::Crc32(crc32)) => { + Some(ChecksumValue::Crc32(u32::to_be_bytes(crc32.finalize()))) + } + Some(MultipartExtraChecksummer::Crc32c(crc32c)) => Some(ChecksumValue::Crc32c( + u32::to_be_bytes(u32::try_from(crc32c.finish()).unwrap()), + )), + Some(MultipartExtraChecksummer::Sha1(sha1)) => { + Some(ChecksumValue::Sha1(sha1.finalize()[..].try_into().unwrap())) + } + Some(MultipartExtraChecksummer::Sha256(sha256)) => Some(ChecksumValue::Sha256( + sha256.finalize()[..].try_into().unwrap(), + )), + }; + (md5, extra) + } +} + +// ---- + +/// Extract the value of the x-amz-checksum-algorithm header +pub(crate) fn request_checksum_algorithm( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumAlgorithm>, Error> { + match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { + None => Ok(None), + Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)), + Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)), + Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)), + Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)), + _ => Err(Error::bad_request("invalid checksum algorithm")), + } +} + +/// Extract the value of any of the x-amz-checksum-* headers +pub(crate) fn request_checksum_value( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumValue>, Error> { + let mut ret = vec![]; + + if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) { + let crc32 = BASE64_STANDARD + .decode(&crc32_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; + ret.push(ChecksumValue::Crc32(crc32)) + } + if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) { + let crc32c = BASE64_STANDARD + .decode(&crc32c_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; + ret.push(ChecksumValue::Crc32c(crc32c)) + } + if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) { + let sha1 = BASE64_STANDARD + .decode(&sha1_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; + ret.push(ChecksumValue::Sha1(sha1)) + } + if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) { + let sha256 = BASE64_STANDARD + .decode(&sha256_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; + ret.push(ChecksumValue::Sha256(sha256)) + } + + if ret.len() > 1 { + return Err(Error::bad_request( + "multiple x-amz-checksum-* headers given", + )); + } + Ok(ret.pop()) +} + +/// Checks for the presense of x-amz-checksum-algorithm +/// if so extract the corrseponding x-amz-checksum-* value +pub(crate) fn request_checksum_algorithm_value( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumValue>, Error> { + match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { + Some(x) if x == "CRC32" => { + let crc32 = headers + .get(X_AMZ_CHECKSUM_CRC32) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; + Ok(Some(ChecksumValue::Crc32(crc32))) + } + Some(x) if x == "CRC32C" => { + let crc32c = headers + .get(X_AMZ_CHECKSUM_CRC32C) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; + Ok(Some(ChecksumValue::Crc32c(crc32c))) + } + Some(x) if x == "SHA1" => { + let sha1 = headers + .get(X_AMZ_CHECKSUM_SHA1) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; + Ok(Some(ChecksumValue::Sha1(sha1))) + } + Some(x) if x == "SHA256" => { + let sha256 = headers + .get(X_AMZ_CHECKSUM_SHA256) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; + Ok(Some(ChecksumValue::Sha256(sha256))) + } + Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")), + None => Ok(None), + } +} + +pub(crate) fn add_checksum_response_headers( + checksum: &Option<ChecksumValue>, + mut resp: http::response::Builder, +) -> http::response::Builder { + match checksum { + Some(ChecksumValue::Crc32(crc32)) => { + resp = resp.header(X_AMZ_CHECKSUM_CRC32, BASE64_STANDARD.encode(&crc32)); + } + Some(ChecksumValue::Crc32c(crc32c)) => { + resp = resp.header(X_AMZ_CHECKSUM_CRC32C, BASE64_STANDARD.encode(&crc32c)); + } + Some(ChecksumValue::Sha1(sha1)) => { + resp = resp.header(X_AMZ_CHECKSUM_SHA1, BASE64_STANDARD.encode(&sha1)); + } + Some(ChecksumValue::Sha256(sha256)) => { + resp = resp.header(X_AMZ_CHECKSUM_SHA256, BASE64_STANDARD.encode(&sha256)); + } + None => (), + } + resp +} diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 3c2bd483..411a6917 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -1,17 +1,18 @@ use std::pin::Pin; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use futures::{stream, stream::Stream, StreamExt}; -use md5::{Digest as Md5Digest, Md5}; +use futures::{stream, stream::Stream, StreamExt, TryStreamExt}; use bytes::Bytes; use hyper::{Request, Response}; use serde::Serialize; use garage_net::bytes_buf::BytesBuf; +use garage_net::stream::read_stream_to_end; use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::data::*; +use garage_util::error::Error as GarageError; use garage_util::time::*; use garage_model::s3::block_ref_table::*; @@ -21,11 +22,16 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::checksum::*; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; +use crate::s3::get::full_object_byte_stream; use crate::s3::multipart; -use crate::s3::put::get_headers; +use crate::s3::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult}; use crate::s3::xml::{self as s3_xml, xmlns_tag}; +// -------- CopyObject --------- + pub async fn handle_copy( ctx: ReqCtx, req: &Request<ReqBody>, @@ -33,13 +39,9 @@ pub async fn handle_copy( ) -> Result<Response<ResBody>, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; - let source_object = get_copy_source(&ctx, req).await?; + let checksum_algorithm = request_checksum_algorithm(req.headers())?; - let ReqCtx { - garage, - bucket_id: dest_bucket_id, - .. - } = ctx; + let source_object = get_copy_source(&ctx, req).await?; let (source_version, source_version_data, source_version_meta) = extract_source_info(&source_object)?; @@ -47,26 +49,150 @@ pub async fn handle_copy( // Check precondition, e.g. x-amz-copy-source-if-match copy_precondition.check(source_version, &source_version_meta.etag)?; + // Determine encryption parameters + let (source_encryption, source_object_meta_inner) = + EncryptionParams::check_decrypt_for_copy_source( + &ctx.garage, + req.headers(), + &source_version_meta.encryption, + )?; + let dest_encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; + + // Extract source checksum info before source_object_meta_inner is consumed + let source_checksum = source_object_meta_inner.checksum; + let source_checksum_algorithm = source_checksum.map(|x| x.algorithm()); + + // If source object has a checksum, the destination object must as well. + // The x-amz-checksum-algorihtm header allows to change that algorithm, + // but if it is absent, we must use the same as before + let checksum_algorithm = checksum_algorithm.or(source_checksum_algorithm); + + // Determine metadata of destination object + let was_multipart = source_version_meta.etag.contains('-'); + let dest_object_meta = ObjectVersionMetaInner { + headers: match req.headers().get("x-amz-metadata-directive") { + Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => { + get_headers(req.headers())? + } + _ => source_object_meta_inner.into_owned().headers, + }, + checksum: source_checksum, + }; + + // Do actual object copying + // + // In any of the following scenarios, we need to read the whole object + // data and re-write it again: + // + // - the data needs to be decrypted or encrypted + // - the requested checksum algorithm requires us to recompute a checksum + // - the original object was a multipart upload and a checksum algorithm + // is defined (AWS specifies that in this case, we must recompute the + // checksum from scratch as if this was a single big object and not + // a multipart object, as the checksums are not computed in the same way) + // + // In other cases, we can just copy the metadata and reference the same blocks. + // + // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + + let must_recopy = !EncryptionParams::is_same(&source_encryption, &dest_encryption) + || source_checksum_algorithm != checksum_algorithm + || (was_multipart && checksum_algorithm.is_some()); + + let res = if !must_recopy { + // In most cases, we can just copy the metadata and link blocks of the + // old object from the new object. + handle_copy_metaonly( + ctx, + dest_key, + dest_object_meta, + dest_encryption, + source_version, + source_version_data, + source_version_meta, + ) + .await? + } else { + let expected_checksum = ExpectedChecksums { + md5: None, + sha256: None, + extra: source_checksum, + }; + let checksum_mode = if was_multipart || source_checksum_algorithm != checksum_algorithm { + ChecksumMode::Calculate(checksum_algorithm) + } else { + ChecksumMode::Verify(&expected_checksum) + }; + // If source and dest encryption use different keys, + // we must decrypt content and re-encrypt, so rewrite all data blocks. + handle_copy_reencrypt( + ctx, + dest_key, + dest_object_meta, + dest_encryption, + source_version, + source_version_data, + source_encryption, + checksum_mode, + ) + .await? + }; + + let last_modified = msec_to_rfc3339(res.version_timestamp); + let result = CopyObjectResult { + last_modified: s3_xml::Value(last_modified), + etag: s3_xml::Value(format!("\"{}\"", res.etag)), + }; + let xml = s3_xml::to_xml_with_header(&result)?; + + let mut resp = Response::builder() + .header("Content-Type", "application/xml") + .header("x-amz-version-id", hex::encode(res.version_uuid)) + .header( + "x-amz-copy-source-version-id", + hex::encode(source_version.uuid), + ); + dest_encryption.add_response_headers(&mut resp); + Ok(resp.body(string_body(xml))?) +} + +async fn handle_copy_metaonly( + ctx: ReqCtx, + dest_key: &str, + dest_object_meta: ObjectVersionMetaInner, + dest_encryption: EncryptionParams, + source_version: &ObjectVersion, + source_version_data: &ObjectVersionData, + source_version_meta: &ObjectVersionMeta, +) -> Result<SaveStreamResult, Error> { + let ReqCtx { + garage, + bucket_id: dest_bucket_id, + .. + } = ctx; + // Generate parameters for copied object let new_uuid = gen_uuid(); let new_timestamp = now_msec(); - // Implement x-amz-metadata-directive: REPLACE - let new_meta = match req.headers().get("x-amz-metadata-directive") { - Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta { - headers: get_headers(req.headers())?, - size: source_version_meta.size, - etag: source_version_meta.etag.clone(), - }, - _ => source_version_meta.clone(), + let new_meta = ObjectVersionMeta { + encryption: dest_encryption.encrypt_meta(dest_object_meta)?, + size: source_version_meta.size, + etag: source_version_meta.etag.clone(), }; - let etag = new_meta.etag.to_string(); + let res = SaveStreamResult { + version_uuid: new_uuid, + version_timestamp: new_timestamp, + etag: new_meta.etag.clone(), + }; // Save object copy match source_version_data { ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_meta, bytes) => { + // bytes is either plaintext before&after or encrypted with the + // same keys, so it's ok to just copy it as is let dest_object_version = ObjectVersion { uuid: new_uuid, timestamp: new_timestamp, @@ -97,7 +223,8 @@ pub async fn handle_copy( uuid: new_uuid, timestamp: new_timestamp, state: ObjectVersionState::Uploading { - headers: new_meta.headers.clone(), + encryption: new_meta.encryption.clone(), + checksum_algorithm: None, multipart: false, }, }; @@ -164,23 +291,42 @@ pub async fn handle_copy( } } - let last_modified = msec_to_rfc3339(new_timestamp); - let result = CopyObjectResult { - last_modified: s3_xml::Value(last_modified), - etag: s3_xml::Value(format!("\"{}\"", etag)), - }; - let xml = s3_xml::to_xml_with_header(&result)?; + Ok(res) +} - Ok(Response::builder() - .header("Content-Type", "application/xml") - .header("x-amz-version-id", hex::encode(new_uuid)) - .header( - "x-amz-copy-source-version-id", - hex::encode(source_version.uuid), - ) - .body(string_body(xml))?) +async fn handle_copy_reencrypt( + ctx: ReqCtx, + dest_key: &str, + dest_object_meta: ObjectVersionMetaInner, + dest_encryption: EncryptionParams, + source_version: &ObjectVersion, + source_version_data: &ObjectVersionData, + source_encryption: EncryptionParams, + checksum_mode: ChecksumMode<'_>, +) -> Result<SaveStreamResult, Error> { + // basically we will read the source data (decrypt if necessary) + // and save that in a new object (encrypt if necessary), + // by combining the code used in getobject and putobject + let source_stream = full_object_byte_stream( + ctx.garage.clone(), + source_version, + source_version_data, + source_encryption, + ); + + save_stream( + &ctx, + dest_object_meta, + dest_encryption, + source_stream.map_err(|e| Error::from(GarageError::from(e))), + &dest_key.to_string(), + checksum_mode, + ) + .await } +// -------- UploadPartCopy --------- + pub async fn handle_upload_part_copy( ctx: ReqCtx, req: &Request<ReqBody>, @@ -193,7 +339,7 @@ pub async fn handle_upload_part_copy( let dest_upload_id = multipart::decode_upload_id(upload_id)?; let dest_key = dest_key.to_string(); - let (source_object, (_, _, mut dest_mpu)) = futures::try_join!( + let (source_object, (_, dest_version, mut dest_mpu)) = futures::try_join!( get_copy_source(&ctx, req), multipart::get_upload(&ctx, &dest_key, &dest_upload_id) )?; @@ -206,6 +352,24 @@ pub async fn handle_upload_part_copy( // Check precondition on source, e.g. x-amz-copy-source-if-match copy_precondition.check(source_object_version, &source_version_meta.etag)?; + // Determine encryption parameters + let (source_encryption, _) = EncryptionParams::check_decrypt_for_copy_source( + &garage, + req.headers(), + &source_version_meta.encryption, + )?; + let (dest_object_encryption, dest_object_checksum_algorithm) = match dest_version.state { + ObjectVersionState::Uploading { + encryption, + checksum_algorithm, + .. + } => (encryption, checksum_algorithm), + _ => unreachable!(), + }; + let (dest_encryption, _) = + EncryptionParams::check_decrypt(&garage, req.headers(), &dest_object_encryption)?; + let same_encryption = EncryptionParams::is_same(&source_encryption, &dest_encryption); + // Check source range is valid let source_range = match req.headers().get("x-amz-copy-source-range") { Some(range) => { @@ -227,21 +391,16 @@ pub async fn handle_upload_part_copy( }; // Check source version is not inlined - match source_version_data { - ObjectVersionData::DeleteMarker => unreachable!(), - ObjectVersionData::Inline(_meta, _bytes) => { - // This is only for small files, we don't bother handling this. - // (in AWS UploadPartCopy works for parts at least 5MB which - // is never the case of an inline object) - return Err(Error::bad_request( - "Source object is too small (minimum part size is 5Mb)", - )); - } - ObjectVersionData::FirstBlock(_meta, _first_block_hash) => (), - }; + if matches!(source_version_data, ObjectVersionData::Inline(_, _)) { + // This is only for small files, we don't bother handling this. + // (in AWS UploadPartCopy works for parts at least 5MB which + // is never the case of an inline object) + return Err(Error::bad_request( + "Source object is too small (minimum part size is 5Mb)", + )); + } - // Fetch source versin with its block list, - // and destination version to check part hasn't yet been uploaded + // Fetch source version with its block list let source_version = garage .version_table .get(&source_object_version.uuid, &EmptyKey) @@ -251,7 +410,9 @@ pub async fn handle_upload_part_copy( // We want to reuse blocks from the source version as much as possible. // However, we still need to get the data from these blocks // because we need to know it to calculate the MD5sum of the part - // which is used as its ETag. + // which is used as its ETag. For encrypted sources or destinations, + // we must always read(+decrypt) and then write(+encrypt), so we + // can never reuse data blocks as is. // First, calculate what blocks we want to keep, // and the subrange of the block to take, if the bounds of the @@ -300,7 +461,9 @@ pub async fn handle_upload_part_copy( dest_mpu_part_key, MpuPart { version: dest_version_id, + // These are all filled in later (bottom of this function) etag: None, + checksum: None, size: None, }, ); @@ -313,32 +476,55 @@ pub async fn handle_upload_part_copy( }, false, ); + // write an empty version now to be the parent of the block_ref entries + garage.version_table.insert(&dest_version).await?; // Now, actually copy the blocks - let mut md5hasher = Md5::new(); + let mut checksummer = Checksummer::init(&Default::default(), !dest_encryption.is_encrypted()) + .add(dest_object_checksum_algorithm); // First, create a stream that is able to read the source blocks // and extract the subrange if necessary. // The second returned value is an Option<Hash>, that is Some // if and only if the block returned is a block that already existed - // in the Garage data store (thus we don't need to save it again). + // in the Garage data store and can be reused as-is instead of having + // to save it again. This excludes encrypted source blocks that we had + // to decrypt. let garage2 = garage.clone(); let order_stream = OrderTag::stream(); let source_blocks = stream::iter(blocks_to_copy) .enumerate() - .flat_map(|(i, (block_hash, range_to_copy))| { + .map(|(i, (block_hash, range_to_copy))| { let garage3 = garage2.clone(); - stream::once(async move { - let data = garage3 - .block_manager - .rpc_get_block(&block_hash, Some(order_stream.order(i as u64))) + async move { + let stream = source_encryption + .get_block(&garage3, &block_hash, Some(order_stream.order(i as u64))) .await?; + let data = read_stream_to_end(stream).await?.into_bytes(); + // For each item, we return a tuple of: + // 1. the full data block (decrypted) + // 2. an Option<Hash> that indicates the hash of the block in the block store, + // only if it can be re-used as-is in the copied object match range_to_copy { - Some(r) => Ok((data.slice(r), None)), - None => Ok((data, Some(block_hash))), + Some(r) => { + // If we are taking a subslice of the data, we cannot reuse the block as-is + Ok((data.slice(r), None)) + } + None if same_encryption => { + // If the data is unencrypted before & after, or if we are using + // the same encryption key, we can reuse the stored block, no need + // to re-send it to storage nodes. + Ok((data, Some(block_hash))) + } + None => { + // If we are decrypting / (re)encrypting with different keys, + // we cannot reuse the block as-is + Ok((data, None)) + } } - }) + } }) + .buffered(2) .peekable(); // The defragmenter is a custom stream (defined below) that concatenates @@ -346,22 +532,39 @@ pub async fn handle_upload_part_copy( // It returns a series of (Vec<u8>, Option<Hash>). // When it is done, it returns an empty vec. // Same as the previous iterator, the Option is Some(_) if and only if - // it's an existing block of the Garage data store. + // it's an existing block of the Garage data store that can be reused. let mut defragmenter = Defragmenter::new(garage.config.block_size, Box::pin(source_blocks)); let mut current_offset = 0; let mut next_block = defragmenter.next().await?; + // TODO this could be optimized similarly to read_and_put_blocks + // low priority because uploadpartcopy is rarely used loop { let (data, existing_block_hash) = next_block; if data.is_empty() { break; } - md5hasher.update(&data[..]); - - let must_upload = existing_block_hash.is_none(); - let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..])); + let data_len = data.len() as u64; + + let (checksummer_updated, (data_to_upload, final_hash)) = + tokio::task::spawn_blocking(move || { + checksummer.update(&data[..]); + + let tup = match existing_block_hash { + Some(hash) if same_encryption => (None, hash), + _ => { + let data_enc = dest_encryption.encrypt_block(data)?; + let hash = blake2sum(&data_enc); + (Some(data_enc), hash) + } + }; + Ok::<_, Error>((checksummer, tup)) + }) + .await + .unwrap()?; + checksummer = checksummer_updated; dest_version.blocks.clear(); dest_version.blocks.put( @@ -371,10 +574,10 @@ pub async fn handle_upload_part_copy( }, VersionBlock { hash: final_hash, - size: data.len() as u64, + size: data_len, }, ); - current_offset += data.len() as u64; + current_offset += data_len; let block_ref = BlockRef { block: final_hash, @@ -382,36 +585,34 @@ pub async fn handle_upload_part_copy( deleted: false.into(), }; - let garage2 = garage.clone(); - let res = futures::try_join!( + let (_, _, _, next) = futures::try_join!( // Thing 1: if the block is not exactly a block that existed before, // we need to insert that data as a new block. - async move { - if must_upload { - garage2 + async { + if let Some(final_data) = data_to_upload { + garage .block_manager - .rpc_put_block(final_hash, data, None) + .rpc_put_block(final_hash, final_data, dest_encryption.is_encrypted(), None) .await } else { Ok(()) } }, - async { - // Thing 2: we need to insert the block in the version - garage.version_table.insert(&dest_version).await?; - // Thing 3: we need to add a block reference - garage.block_ref_table.insert(&block_ref).await - }, - // Thing 4: we need to prefetch the next block + // Thing 2: we need to insert the block in the version + garage.version_table.insert(&dest_version), + // Thing 3: we need to add a block reference + garage.block_ref_table.insert(&block_ref), + // Thing 4: we need to read the next block defragmenter.next(), )?; - next_block = res.2; + next_block = next; } assert_eq!(current_offset, source_range.length); - let data_md5sum = md5hasher.finalize(); - let etag = hex::encode(data_md5sum); + let checksums = checksummer.finalize(); + let etag = dest_encryption.etag_from_md5(&checksums.md5); + let checksum = checksums.extract(dest_object_checksum_algorithm); // Put the part's ETag in the Versiontable dest_mpu.parts.put( @@ -419,6 +620,7 @@ pub async fn handle_upload_part_copy( MpuPart { version: dest_version_id, etag: Some(etag.clone()), + checksum, size: Some(current_offset), }, ); @@ -431,13 +633,14 @@ pub async fn handle_upload_part_copy( last_modified: s3_xml::Value(msec_to_rfc3339(source_object_version.timestamp)), })?; - Ok(Response::builder() + let mut resp = Response::builder() .header("Content-Type", "application/xml") .header( "x-amz-copy-source-version-id", hex::encode(source_object_version.uuid), - ) - .body(string_body(resp_xml))?) + ); + dest_encryption.add_response_headers(&mut resp); + Ok(resp.body(string_body(resp_xml))?) } async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, Error> { diff --git a/src/api/s3/encryption.rs b/src/api/s3/encryption.rs new file mode 100644 index 00000000..2e6ed65c --- /dev/null +++ b/src/api/s3/encryption.rs @@ -0,0 +1,595 @@ +use std::borrow::Cow; +use std::convert::TryInto; +use std::pin::Pin; + +use aes_gcm::{ + aead::stream::{DecryptorLE31, EncryptorLE31, StreamLE31}, + aead::{Aead, AeadCore, KeyInit, OsRng}, + aes::cipher::crypto_common::rand_core::RngCore, + aes::cipher::typenum::Unsigned, + Aes256Gcm, Key, Nonce, +}; +use base64::prelude::*; +use bytes::Bytes; + +use futures::stream::Stream; +use futures::task; +use tokio::io::BufReader; + +use http::header::{HeaderMap, HeaderName, HeaderValue}; + +use garage_net::bytes_buf::BytesBuf; +use garage_net::stream::{stream_asyncread, ByteStream}; +use garage_rpc::rpc_helper::OrderTag; +use garage_util::data::Hash; +use garage_util::error::Error as GarageError; +use garage_util::migrate::Migrate; + +use garage_model::garage::Garage; +use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionMetaInner}; + +use crate::common_error::*; +use crate::s3::checksum::Md5Checksum; +use crate::s3::error::Error; + +const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName = + HeaderName::from_static("x-amz-server-side-encryption-customer-algorithm"); +const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: HeaderName = + HeaderName::from_static("x-amz-server-side-encryption-customer-key"); +const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: HeaderName = + HeaderName::from_static("x-amz-server-side-encryption-customer-key-md5"); + +const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName = + HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-algorithm"); +const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: HeaderName = + HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-key"); +const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: HeaderName = + HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-key-md5"); + +const CUSTOMER_ALGORITHM_AES256: &[u8] = b"AES256"; + +type Md5Output = md5::digest::Output<md5::Md5Core>; + +type StreamNonceSize = aes_gcm::aead::stream::NonceSize<Aes256Gcm, StreamLE31<Aes256Gcm>>; + +// Data blocks are encrypted by smaller chunks of size 4096 bytes, +// so that data can be streamed when reading. +// This size has to be known and has to be constant, or data won't be +// readable anymore. DO NOT CHANGE THIS VALUE. +const STREAM_ENC_PLAIN_CHUNK_SIZE: usize = 0x1000; // 4096 bytes +const STREAM_ENC_CYPER_CHUNK_SIZE: usize = STREAM_ENC_PLAIN_CHUNK_SIZE + 16; + +#[derive(Clone, Copy)] +pub enum EncryptionParams { + Plaintext, + SseC { + client_key: Key<Aes256Gcm>, + client_key_md5: Md5Output, + compression_level: Option<i32>, + }, +} + +impl EncryptionParams { + pub fn is_encrypted(&self) -> bool { + !matches!(self, Self::Plaintext) + } + + pub fn is_same(a: &Self, b: &Self) -> bool { + let relevant_info = |x: &Self| match x { + Self::Plaintext => None, + Self::SseC { + client_key, + compression_level, + .. + } => Some((*client_key, compression_level.is_some())), + }; + relevant_info(a) == relevant_info(b) + } + + pub fn new_from_headers( + garage: &Garage, + headers: &HeaderMap, + ) -> Result<EncryptionParams, Error> { + let key = parse_request_headers( + headers, + &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM, + &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY, + &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5, + )?; + match key { + Some((client_key, client_key_md5)) => Ok(EncryptionParams::SseC { + client_key, + client_key_md5, + compression_level: garage.config.compression_level, + }), + None => Ok(EncryptionParams::Plaintext), + } + } + + pub fn add_response_headers(&self, resp: &mut http::response::Builder) { + if let Self::SseC { client_key_md5, .. } = self { + let md5 = BASE64_STANDARD.encode(&client_key_md5); + + resp.headers_mut().unwrap().insert( + X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM, + HeaderValue::from_bytes(CUSTOMER_ALGORITHM_AES256).unwrap(), + ); + resp.headers_mut().unwrap().insert( + X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5, + HeaderValue::from_bytes(md5.as_bytes()).unwrap(), + ); + } + } + + pub fn check_decrypt<'a>( + garage: &Garage, + headers: &HeaderMap, + obj_enc: &'a ObjectVersionEncryption, + ) -> Result<(Self, Cow<'a, ObjectVersionMetaInner>), Error> { + let key = parse_request_headers( + headers, + &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM, + &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY, + &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5, + )?; + Self::check_decrypt_common(garage, key, obj_enc) + } + + pub fn check_decrypt_for_copy_source<'a>( + garage: &Garage, + headers: &HeaderMap, + obj_enc: &'a ObjectVersionEncryption, + ) -> Result<(Self, Cow<'a, ObjectVersionMetaInner>), Error> { + let key = parse_request_headers( + headers, + &X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM, + &X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY, + &X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5, + )?; + Self::check_decrypt_common(garage, key, obj_enc) + } + + fn check_decrypt_common<'a>( + garage: &Garage, + key: Option<(Key<Aes256Gcm>, Md5Output)>, + obj_enc: &'a ObjectVersionEncryption, + ) -> Result<(Self, Cow<'a, ObjectVersionMetaInner>), Error> { + match (key, &obj_enc) { + ( + Some((client_key, client_key_md5)), + ObjectVersionEncryption::SseC { inner, compressed }, + ) => { + let enc = Self::SseC { + client_key, + client_key_md5, + compression_level: if *compressed { + Some(garage.config.compression_level.unwrap_or(1)) + } else { + None + }, + }; + let plaintext = enc.decrypt_blob(&inner)?; + let inner = ObjectVersionMetaInner::decode(&plaintext) + .ok_or_internal_error("Could not decode encrypted metadata")?; + Ok((enc, Cow::Owned(inner))) + } + (None, ObjectVersionEncryption::Plaintext { inner }) => { + Ok((Self::Plaintext, Cow::Borrowed(inner))) + } + (_, ObjectVersionEncryption::SseC { .. }) => { + Err(Error::bad_request("Object is encrypted")) + } + (Some(_), _) => { + // TODO: should this be an OK scenario? + Err(Error::bad_request("Trying to decrypt a plaintext object")) + } + } + } + + pub fn encrypt_meta( + &self, + meta: ObjectVersionMetaInner, + ) -> Result<ObjectVersionEncryption, Error> { + match self { + Self::SseC { + compression_level, .. + } => { + let plaintext = meta.encode().map_err(GarageError::from)?; + let ciphertext = self.encrypt_blob(&plaintext)?; + Ok(ObjectVersionEncryption::SseC { + inner: ciphertext.into_owned(), + compressed: compression_level.is_some(), + }) + } + Self::Plaintext => Ok(ObjectVersionEncryption::Plaintext { inner: meta }), + } + } + + // ---- generating object Etag values ---- + pub fn etag_from_md5(&self, md5sum: &Option<Md5Checksum>) -> String { + match self { + Self::Plaintext => md5sum + .map(|x| hex::encode(&x[..])) + .expect("md5 digest should have been computed"), + Self::SseC { .. } => { + // AWS specifies that for encrypted objects, the Etag is not + // the md5sum of the data, but doesn't say what it is. + // So we just put some random bytes. + let mut random = [0u8; 16]; + OsRng.fill_bytes(&mut random); + hex::encode(&random) + } + } + } + + // ---- generic function for encrypting / decrypting blobs ---- + // Prepends a randomly-generated nonce to the encrypted value. + // This is used for encrypting object metadata and inlined data for small objects. + // This does not compress anything. + + pub fn encrypt_blob<'a>(&self, blob: &'a [u8]) -> Result<Cow<'a, [u8]>, Error> { + match self { + Self::SseC { client_key, .. } => { + let cipher = Aes256Gcm::new(&client_key); + let nonce = Aes256Gcm::generate_nonce(&mut OsRng); + let ciphertext = cipher + .encrypt(&nonce, blob) + .ok_or_internal_error("Encryption failed")?; + Ok(Cow::Owned([nonce.to_vec(), ciphertext].concat())) + } + Self::Plaintext => Ok(Cow::Borrowed(blob)), + } + } + + pub fn decrypt_blob<'a>(&self, blob: &'a [u8]) -> Result<Cow<'a, [u8]>, Error> { + match self { + Self::SseC { client_key, .. } => { + let cipher = Aes256Gcm::new(&client_key); + let nonce_size = <Aes256Gcm as AeadCore>::NonceSize::to_usize(); + let nonce = Nonce::from_slice( + blob.get(..nonce_size) + .ok_or_internal_error("invalid encrypted data")?, + ); + let plaintext = cipher + .decrypt(nonce, &blob[nonce_size..]) + .ok_or_bad_request( + "Invalid encryption key, could not decrypt object metadata.", + )?; + Ok(Cow::Owned(plaintext)) + } + Self::Plaintext => Ok(Cow::Borrowed(blob)), + } + } + + // ---- function for encrypting / decrypting byte streams ---- + + /// Get a data block from the storage node, and decrypt+decompress it + /// if necessary. If object is plaintext, just get it without any processing. + pub async fn get_block( + &self, + garage: &Garage, + hash: &Hash, + order: Option<OrderTag>, + ) -> Result<ByteStream, GarageError> { + let raw_block = garage + .block_manager + .rpc_get_block_streaming(hash, order) + .await?; + Ok(self.decrypt_block_stream(raw_block)) + } + + pub fn decrypt_block_stream(&self, stream: ByteStream) -> ByteStream { + match self { + Self::Plaintext => stream, + Self::SseC { + client_key, + compression_level, + .. + } => { + let plaintext = DecryptStream::new(stream, *client_key); + if compression_level.is_some() { + let reader = stream_asyncread(Box::pin(plaintext)); + let reader = BufReader::new(reader); + let reader = async_compression::tokio::bufread::ZstdDecoder::new(reader); + Box::pin(tokio_util::io::ReaderStream::new(reader)) + } else { + Box::pin(plaintext) + } + } + } + } + + /// Encrypt a data block if encryption is set, for use before + /// putting the data blocks into storage + pub fn encrypt_block(&self, block: Bytes) -> Result<Bytes, Error> { + match self { + Self::Plaintext => Ok(block), + Self::SseC { + client_key, + compression_level, + .. + } => { + let block = if let Some(level) = compression_level { + Cow::Owned( + garage_block::zstd_encode(block.as_ref(), *level) + .ok_or_internal_error("failed to compress data block")?, + ) + } else { + Cow::Borrowed(block.as_ref()) + }; + + let mut ret = Vec::with_capacity(block.len() + 32 + block.len() / 64); + + let mut nonce: Nonce<StreamNonceSize> = Default::default(); + OsRng.fill_bytes(&mut nonce); + ret.extend_from_slice(nonce.as_slice()); + + let mut cipher = EncryptorLE31::<Aes256Gcm>::new(&client_key, &nonce); + let mut iter = block.chunks(STREAM_ENC_PLAIN_CHUNK_SIZE).peekable(); + + if iter.peek().is_none() { + // Empty stream: we encrypt an empty last chunk + let chunk_enc = cipher + .encrypt_last(&[][..]) + .ok_or_internal_error("failed to encrypt chunk")?; + ret.extend_from_slice(&chunk_enc); + } else { + loop { + let chunk = iter.next().unwrap(); + if iter.peek().is_some() { + let chunk_enc = cipher + .encrypt_next(chunk) + .ok_or_internal_error("failed to encrypt chunk")?; + assert_eq!(chunk.len(), STREAM_ENC_PLAIN_CHUNK_SIZE); + assert_eq!(chunk_enc.len(), STREAM_ENC_CYPER_CHUNK_SIZE); + ret.extend_from_slice(&chunk_enc); + } else { + // use encrypt_last for the last chunk + let chunk_enc = cipher + .encrypt_last(chunk) + .ok_or_internal_error("failed to encrypt chunk")?; + ret.extend_from_slice(&chunk_enc); + break; + } + } + } + + Ok(ret.into()) + } + } + } +} + +fn parse_request_headers( + headers: &HeaderMap, + alg_header: &HeaderName, + key_header: &HeaderName, + md5_header: &HeaderName, +) -> Result<Option<(Key<Aes256Gcm>, Md5Output)>, Error> { + let alg = headers.get(alg_header).map(HeaderValue::as_bytes); + let key = headers.get(key_header).map(HeaderValue::as_bytes); + let md5 = headers.get(md5_header).map(HeaderValue::as_bytes); + + match alg { + Some(CUSTOMER_ALGORITHM_AES256) => { + use md5::{Digest, Md5}; + + let key_b64 = + key.ok_or_bad_request("Missing server-side-encryption-customer-key header")?; + let key_bytes: [u8; 32] = BASE64_STANDARD + .decode(&key_b64) + .ok_or_bad_request( + "Invalid server-side-encryption-customer-key header: invalid base64", + )? + .try_into() + .ok() + .ok_or_bad_request( + "Invalid server-side-encryption-customer-key header: invalid length", + )?; + + let md5_b64 = + md5.ok_or_bad_request("Missing server-side-encryption-customer-key-md5 header")?; + let md5_bytes = BASE64_STANDARD.decode(&md5_b64).ok_or_bad_request( + "Invalid server-side-encryption-customer-key-md5 header: invalid bass64", + )?; + + let mut hasher = Md5::new(); + hasher.update(&key_bytes[..]); + let our_md5 = hasher.finalize(); + if our_md5.as_slice() != md5_bytes.as_slice() { + return Err(Error::bad_request( + "Server-side encryption client key MD5 checksum does not match", + )); + } + + Ok(Some((key_bytes.into(), our_md5))) + } + Some(alg) => Err(Error::InvalidEncryptionAlgorithm( + String::from_utf8_lossy(alg).into_owned(), + )), + None => { + if key.is_some() || md5.is_some() { + Err(Error::bad_request( + "Unexpected server-side-encryption-customer-key{,-md5} header(s)", + )) + } else { + Ok(None) + } + } + } +} + +// ---- encrypt & decrypt streams ---- + +#[pin_project::pin_project] +struct DecryptStream { + #[pin] + stream: ByteStream, + done_reading: bool, + buf: BytesBuf, + key: Key<Aes256Gcm>, + state: DecryptStreamState, +} + +enum DecryptStreamState { + Starting, + Running(DecryptorLE31<Aes256Gcm>), + Done, +} + +impl DecryptStream { + fn new(stream: ByteStream, key: Key<Aes256Gcm>) -> Self { + Self { + stream, + done_reading: false, + buf: BytesBuf::new(), + key, + state: DecryptStreamState::Starting, + } + } +} + +impl Stream for DecryptStream { + type Item = Result<Bytes, std::io::Error>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll<Option<Self::Item>> { + use std::task::Poll; + + let mut this = self.project(); + + // The first bytes of the stream should contain the starting nonce. + // If we don't have a Running state, it means that we haven't + // yet read the nonce. + while matches!(this.state, DecryptStreamState::Starting) { + let nonce_size = StreamNonceSize::to_usize(); + if let Some(nonce) = this.buf.take_exact(nonce_size) { + let nonce = Nonce::from_slice(nonce.as_ref()); + *this.state = DecryptStreamState::Running(DecryptorLE31::new(&this.key, nonce)); + break; + } + + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(e))); + } + None => { + return Poll::Ready(Some(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "Decrypt: unexpected EOF, could not read nonce", + )))); + } + } + } + + // Read at least one byte more than the encrypted chunk size + // (if possible), so that we know if we are decrypting the + // last chunk or not. + while !*this.done_reading && this.buf.len() <= STREAM_ENC_CYPER_CHUNK_SIZE { + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(e))); + } + None => { + *this.done_reading = true; + break; + } + } + } + + if matches!(this.state, DecryptStreamState::Done) { + if !this.buf.is_empty() { + return Poll::Ready(Some(Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Decrypt: unexpected bytes after last encrypted chunk", + )))); + } + return Poll::Ready(None); + } + + let res = if this.buf.len() > STREAM_ENC_CYPER_CHUNK_SIZE { + // we have strictly more bytes than the encrypted chunk size, + // so we know this is not the last + let DecryptStreamState::Running(ref mut cipher) = this.state else { + unreachable!() + }; + let chunk = this.buf.take_exact(STREAM_ENC_CYPER_CHUNK_SIZE).unwrap(); + let chunk_dec = cipher.decrypt_next(chunk.as_ref()); + if let Ok(c) = &chunk_dec { + assert_eq!(c.len(), STREAM_ENC_PLAIN_CHUNK_SIZE); + } + chunk_dec + } else { + // We have one encrypted chunk size or less, even though we tried + // to read more, so this is the last chunk. Decrypt using the + // appropriate decrypt_last() function that then destroys the cipher. + let state = std::mem::replace(this.state, DecryptStreamState::Done); + let DecryptStreamState::Running(cipher) = state else { + unreachable!() + }; + let chunk = this.buf.take_all(); + cipher.decrypt_last(chunk.as_ref()) + }; + + match res { + Ok(bytes) if bytes.is_empty() => Poll::Ready(None), + Ok(bytes) => Poll::Ready(Some(Ok(bytes.into()))), + Err(_) => Poll::Ready(Some(Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Decryption failed", + )))), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use futures::stream::StreamExt; + use garage_net::stream::read_stream_to_end; + + fn stream() -> ByteStream { + Box::pin( + futures::stream::iter(16usize..1024) + .map(|i| Ok(Bytes::from(vec![(i % 256) as u8; (i * 37) % 1024]))), + ) + } + + async fn test_block_enc(compression_level: Option<i32>) { + let enc = EncryptionParams::SseC { + client_key: Aes256Gcm::generate_key(&mut OsRng), + client_key_md5: Default::default(), // not needed + compression_level, + }; + + let block_plain = read_stream_to_end(stream()).await.unwrap().into_bytes(); + + let block_enc = enc.encrypt_block(block_plain.clone()).unwrap(); + + let block_dec = + enc.decrypt_block_stream(Box::pin(futures::stream::once(async { Ok(block_enc) }))); + let block_dec = read_stream_to_end(block_dec).await.unwrap().into_bytes(); + + assert_eq!(block_plain, block_dec); + assert!(block_dec.len() > 128000); + } + + #[tokio::test] + async fn test_encrypt_block() { + test_block_enc(None).await + } + + #[tokio::test] + async fn test_encrypt_block_compressed() { + test_block_enc(Some(1)).await + } +} diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index f86c19a6..2855e0b3 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -65,6 +65,14 @@ pub enum Error { #[error(display = "Invalid HTTP range: {:?}", _0)] InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)), + /// The client sent a range header with invalid value + #[error(display = "Invalid encryption algorithm: {:?}, should be AES256", _0)] + InvalidEncryptionAlgorithm(String), + + /// The client sent invalid XML data + #[error(display = "Invalid digest: {}", _0)] + InvalidDigest(String), + /// The client sent a request for an action not supported by garage #[error(display = "Unimplemented action: {}", _0)] NotImplemented(String), @@ -125,7 +133,9 @@ impl Error { Error::NotImplemented(_) => "NotImplemented", Error::InvalidXml(_) => "MalformedXML", Error::InvalidRange(_) => "InvalidRange", + Error::InvalidDigest(_) => "InvalidDigest", Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest", + Error::InvalidEncryptionAlgorithm(_) => "InvalidEncryptionAlgorithmError", } } } @@ -143,6 +153,8 @@ impl ApiError for Error { | Error::InvalidPart | Error::InvalidPartOrder | Error::EntityTooSmall + | Error::InvalidDigest(_) + | Error::InvalidEncryptionAlgorithm(_) | Error::InvalidXml(_) | Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => StatusCode::BAD_REQUEST, diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index ed996fb1..f5d3cf11 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -1,10 +1,12 @@ //! Function related to GET and HEAD requests +use std::collections::BTreeMap; use std::convert::TryInto; use std::sync::Arc; use std::time::{Duration, UNIX_EPOCH}; +use bytes::Bytes; use futures::future; -use futures::stream::{self, StreamExt}; +use futures::stream::{self, Stream, StreamExt}; use http::header::{ ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH, @@ -25,6 +27,8 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::ResBody; +use crate::s3::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE}; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; @@ -42,6 +46,9 @@ pub struct GetObjectOverrides { fn object_headers( version: &ObjectVersion, version_meta: &ObjectVersionMeta, + meta_inner: &ObjectVersionMetaInner, + encryption: EncryptionParams, + checksum_mode: ChecksumMode, ) -> http::response::Builder { debug!("Version meta: {:?}", version_meta); @@ -49,7 +56,6 @@ fn object_headers( let date_str = httpdate::fmt_http_date(date); let mut resp = Response::builder() - .header(CONTENT_TYPE, version_meta.headers.content_type.to_string()) .header(LAST_MODIFIED, date_str) .header(ACCEPT_RANGES, "bytes".to_string()); @@ -57,10 +63,31 @@ fn object_headers( resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag)); } - for (k, v) in version_meta.headers.other.iter() { - resp = resp.header(k, v.to_string()); + // When metadata is retrieved through the REST API, Amazon S3 combines headers that + // have the same name (ignoring case) into a comma-delimited list. + // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html + let mut headers_by_name = BTreeMap::new(); + for (name, value) in meta_inner.headers.iter() { + match headers_by_name.get_mut(name) { + None => { + headers_by_name.insert(name, vec![value.as_str()]); + } + Some(headers) => { + headers.push(value.as_str()); + } + } + } + + for (name, values) in headers_by_name { + resp = resp.header(name, values.join(",")); + } + + if checksum_mode.enabled { + resp = add_checksum_response_headers(&meta_inner.checksum, resp); } + encryption.add_response_headers(&mut resp); + resp } @@ -175,21 +202,33 @@ pub async fn handle_head_without_ctx( return Ok(cached); } + let (encryption, headers) = + EncryptionParams::check_decrypt(&garage, req.headers(), &version_meta.encryption)?; + + let checksum_mode = checksum_mode(&req); + if let Some(pn) = part_number { match version_data { - ObjectVersionData::Inline(_, bytes) => { + ObjectVersionData::Inline(_, _) => { if pn != 1 { return Err(Error::InvalidPart); } - Ok(object_headers(object_version, version_meta) - .header(CONTENT_LENGTH, format!("{}", bytes.len())) - .header( - CONTENT_RANGE, - format!("bytes 0-{}/{}", bytes.len() - 1, bytes.len()), - ) - .header(X_AMZ_MP_PARTS_COUNT, "1") - .status(StatusCode::PARTIAL_CONTENT) - .body(empty_body())?) + let bytes_len = version_meta.size; + Ok(object_headers( + object_version, + version_meta, + &headers, + encryption, + checksum_mode, + ) + .header(CONTENT_LENGTH, format!("{}", bytes_len)) + .header( + CONTENT_RANGE, + format!("bytes 0-{}/{}", bytes_len - 1, bytes_len), + ) + .header(X_AMZ_MP_PARTS_COUNT, "1") + .status(StatusCode::PARTIAL_CONTENT) + .body(empty_body())?) } ObjectVersionData::FirstBlock(_, _) => { let version = garage @@ -201,28 +240,40 @@ pub async fn handle_head_without_ctx( let (part_offset, part_end) = calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?; - Ok(object_headers(object_version, version_meta) - .header(CONTENT_LENGTH, format!("{}", part_end - part_offset)) - .header( - CONTENT_RANGE, - format!( - "bytes {}-{}/{}", - part_offset, - part_end - 1, - version_meta.size - ), - ) - .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?)) - .status(StatusCode::PARTIAL_CONTENT) - .body(empty_body())?) + Ok(object_headers( + object_version, + version_meta, + &headers, + encryption, + checksum_mode, + ) + .header(CONTENT_LENGTH, format!("{}", part_end - part_offset)) + .header( + CONTENT_RANGE, + format!( + "bytes {}-{}/{}", + part_offset, + part_end - 1, + version_meta.size + ), + ) + .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?)) + .status(StatusCode::PARTIAL_CONTENT) + .body(empty_body())?) } _ => unreachable!(), } } else { - Ok(object_headers(object_version, version_meta) - .header(CONTENT_LENGTH, format!("{}", version_meta.size)) - .status(StatusCode::OK) - .body(empty_body())?) + Ok(object_headers( + object_version, + version_meta, + &headers, + encryption, + checksum_mode, + ) + .header(CONTENT_LENGTH, format!("{}", version_meta.size)) + .status(StatusCode::OK) + .body(empty_body())?) } } @@ -273,23 +324,55 @@ pub async fn handle_get_without_ctx( return Ok(cached); } + let (enc, headers) = + EncryptionParams::check_decrypt(&garage, req.headers(), &last_v_meta.encryption)?; + + let checksum_mode = checksum_mode(&req); + match (part_number, parse_range_header(req, last_v_meta.size)?) { (Some(_), Some(_)) => Err(Error::bad_request( "Cannot specify both partNumber and Range header", )), - (Some(pn), None) => handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await, + (Some(pn), None) => { + handle_get_part( + garage, + last_v, + last_v_data, + last_v_meta, + enc, + &headers, + pn, + checksum_mode, + ) + .await + } (None, Some(range)) => { handle_get_range( garage, last_v, last_v_data, last_v_meta, + enc, + &headers, range.start, range.start + range.length, + checksum_mode, + ) + .await + } + (None, None) => { + handle_get_full( + garage, + last_v, + last_v_data, + last_v_meta, + enc, + &headers, + overrides, + checksum_mode, ) .await } - (None, None) => handle_get_full(garage, last_v, last_v_data, last_v_meta, overrides).await, } } @@ -298,17 +381,43 @@ async fn handle_get_full( version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, + encryption: EncryptionParams, + meta_inner: &ObjectVersionMetaInner, overrides: GetObjectOverrides, + checksum_mode: ChecksumMode, ) -> Result<Response<ResBody>, Error> { - let mut resp_builder = object_headers(version, version_meta) - .header(CONTENT_LENGTH, format!("{}", version_meta.size)) - .status(StatusCode::OK); + let mut resp_builder = object_headers( + version, + version_meta, + &meta_inner, + encryption, + checksum_mode, + ) + .header(CONTENT_LENGTH, format!("{}", version_meta.size)) + .status(StatusCode::OK); getobject_override_headers(overrides, &mut resp_builder)?; + let stream = full_object_byte_stream(garage, version, version_data, encryption); + + Ok(resp_builder.body(response_body_from_stream(stream))?) +} + +pub fn full_object_byte_stream( + garage: Arc<Garage>, + version: &ObjectVersion, + version_data: &ObjectVersionData, + encryption: EncryptionParams, +) -> ByteStream { match &version_data { ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_, bytes) => { - Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) + let bytes = bytes.to_vec(); + Box::pin(futures::stream::once(async move { + encryption + .decrypt_blob(&bytes) + .map(|x| Bytes::from(x.to_vec())) + .map_err(std_error_from_read_error) + })) } ObjectVersionData::FirstBlock(_, first_block_hash) => { let (tx, rx) = mpsc::channel::<ByteStream>(2); @@ -324,19 +433,18 @@ async fn handle_get_full( garage2.version_table.get(&version_uuid, &EmptyKey).await }); - let stream_block_0 = garage - .block_manager - .rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0))) + let stream_block_0 = encryption + .get_block(&garage, &first_block_hash, Some(order_stream.order(0))) .await?; + tx.send(stream_block_0) .await .ok_or_message("channel closed")?; let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?; for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) { - let stream_block_i = garage - .block_manager - .rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64))) + let stream_block_i = encryption + .get_block(&garage, &vb.hash, Some(order_stream.order(i as u64))) .await?; tx.send(stream_block_i) .await @@ -354,8 +462,7 @@ async fn handle_get_full( } }); - let body = response_body_from_block_stream(rx); - Ok(resp_builder.body(body)?) + Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx).flatten()) } } } @@ -365,13 +472,16 @@ async fn handle_get_range( version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, + encryption: EncryptionParams, + meta_inner: &ObjectVersionMetaInner, begin: u64, end: u64, + checksum_mode: ChecksumMode, ) -> Result<Response<ResBody>, Error> { // Here we do not use getobject_override_headers because we don't // want to add any overridden headers (those should not be added // when returning PARTIAL_CONTENT) - let resp_builder = object_headers(version, version_meta) + let resp_builder = object_headers(version, version_meta, meta_inner, encryption, checksum_mode) .header(CONTENT_LENGTH, format!("{}", end - begin)) .header( CONTENT_RANGE, @@ -382,6 +492,7 @@ async fn handle_get_range( match &version_data { ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_meta, bytes) => { + let bytes = encryption.decrypt_blob(&bytes)?; if end as usize <= bytes.len() { let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into()); Ok(resp_builder.body(body)?) @@ -398,7 +509,8 @@ async fn handle_get_range( .await? .ok_or(Error::NoSuchKey)?; - let body = body_from_blocks_range(garage, version.blocks.items(), begin, end); + let body = + body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end); Ok(resp_builder.body(body)?) } } @@ -409,17 +521,28 @@ async fn handle_get_part( object_version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, + encryption: EncryptionParams, + meta_inner: &ObjectVersionMetaInner, part_number: u64, + checksum_mode: ChecksumMode, ) -> Result<Response<ResBody>, Error> { // Same as for get_range, no getobject_override_headers - let resp_builder = - object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT); + let resp_builder = object_headers( + object_version, + version_meta, + meta_inner, + encryption, + checksum_mode, + ) + .status(StatusCode::PARTIAL_CONTENT); match version_data { ObjectVersionData::Inline(_, bytes) => { if part_number != 1 { return Err(Error::InvalidPart); } + let bytes = encryption.decrypt_blob(&bytes)?; + assert_eq!(bytes.len() as u64, version_meta.size); Ok(resp_builder .header(CONTENT_LENGTH, format!("{}", bytes.len())) .header( @@ -427,7 +550,7 @@ async fn handle_get_part( format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()), ) .header(X_AMZ_MP_PARTS_COUNT, "1") - .body(bytes_body(bytes.to_vec().into()))?) + .body(bytes_body(bytes.into_owned().into()))?) } ObjectVersionData::FirstBlock(_, _) => { let version = garage @@ -439,7 +562,8 @@ async fn handle_get_part( let (begin, end) = calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?; - let body = body_from_blocks_range(garage, version.blocks.items(), begin, end); + let body = + body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end); Ok(resp_builder .header(CONTENT_LENGTH, format!("{}", end - begin)) @@ -492,8 +616,23 @@ fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> { None } +struct ChecksumMode { + enabled: bool, +} + +fn checksum_mode(req: &Request<impl Body>) -> ChecksumMode { + ChecksumMode { + enabled: req + .headers() + .get(X_AMZ_CHECKSUM_MODE) + .map(|x| x == "ENABLED") + .unwrap_or(false), + } +} + fn body_from_blocks_range( garage: Arc<Garage>, + encryption: EncryptionParams, all_blocks: &[(VersionBlockKey, VersionBlock)], begin: u64, end: u64, @@ -523,12 +662,11 @@ fn body_from_blocks_range( tokio::spawn(async move { match async { - let garage = garage.clone(); for (i, (block, block_offset)) in blocks.iter().enumerate() { - let block_stream = garage - .block_manager - .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) - .await? + let block_stream = encryption + .get_block(&garage, &block.hash, Some(order_stream.order(i as u64))) + .await?; + let block_stream = block_stream .scan(*block_offset, move |chunk_offset, chunk| { let r = match chunk { Ok(chunk_bytes) => { @@ -588,19 +726,30 @@ fn body_from_blocks_range( } fn response_body_from_block_stream(rx: mpsc::Receiver<ByteStream>) -> ResBody { - let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) - .flatten() - .map(|x| { - x.map(hyper::body::Frame::data) - .map_err(|e| Error::from(garage_util::error::Error::from(e))) - }); + let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten(); + response_body_from_stream(body_stream) +} + +fn response_body_from_stream<S>(stream: S) -> ResBody +where + S: Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static, +{ + let body_stream = stream.map(|x| { + x.map(hyper::body::Frame::data) + .map_err(|e| Error::from(garage_util::error::Error::from(e))) + }); ResBody::new(http_body_util::StreamBody::new(body_stream)) } fn error_stream_item<E: std::fmt::Display>(e: E) -> ByteStream { - let err = std::io::Error::new( + Box::pin(stream::once(future::ready(Err(std_error_from_read_error( + e, + ))))) +} + +fn std_error_from_read_error<E: std::fmt::Display>(e: E) -> std::io::Error { + std::io::Error::new( std::io::ErrorKind::Other, - format!("Error while getting object data: {}", e), - ); - Box::pin(stream::once(future::ready(Err(err)))) + format!("Error while reading object data: {}", e), + ) } diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 302c03f4..648bace2 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -2,7 +2,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::iter::{Iterator, Peekable}; use base64::prelude::*; -use hyper::Response; +use hyper::{Request, Response}; use garage_util::data::*; use garage_util::error::Error as GarageError; @@ -15,7 +15,8 @@ use garage_table::EnumerationOrder; use crate::encoding::*; use crate::helpers::*; -use crate::s3::api_server::ResBody; +use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; use crate::s3::multipart as s3_multipart; use crate::s3::xml as s3_xml; @@ -271,13 +272,21 @@ pub async fn handle_list_multipart_upload( pub async fn handle_list_parts( ctx: ReqCtx, + req: Request<ReqBody>, query: &ListPartsQuery, ) -> Result<Response<ResBody>, Error> { debug!("ListParts {:?}", query); let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?; - let (_, _, mpu) = s3_multipart::get_upload(&ctx, &query.key, &upload_id).await?; + let (_, object_version, mpu) = s3_multipart::get_upload(&ctx, &query.key, &upload_id).await?; + + let object_encryption = match object_version.state { + ObjectVersionState::Uploading { encryption, .. } => encryption, + _ => unreachable!(), + }; + let encryption_res = + EncryptionParams::check_decrypt(&ctx.garage, req.headers(), &object_encryption); let (info, next) = fetch_part_info(query, &mpu)?; @@ -296,11 +305,40 @@ pub async fn handle_list_parts( is_truncated: s3_xml::Value(format!("{}", next.is_some())), parts: info .iter() - .map(|part| s3_xml::PartItem { - etag: s3_xml::Value(format!("\"{}\"", part.etag)), - last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)), - part_number: s3_xml::IntValue(part.part_number as i64), - size: s3_xml::IntValue(part.size as i64), + .map(|part| { + // hide checksum if object is encrypted and the decryption + // keys are not provided + let checksum = part.checksum.filter(|_| encryption_res.is_ok()); + s3_xml::PartItem { + etag: s3_xml::Value(format!("\"{}\"", part.etag)), + last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)), + part_number: s3_xml::IntValue(part.part_number as i64), + size: s3_xml::IntValue(part.size as i64), + checksum_crc32: match &checksum { + Some(ChecksumValue::Crc32(x)) => { + Some(s3_xml::Value(BASE64_STANDARD.encode(&x))) + } + _ => None, + }, + checksum_crc32c: match &checksum { + Some(ChecksumValue::Crc32c(x)) => { + Some(s3_xml::Value(BASE64_STANDARD.encode(&x))) + } + _ => None, + }, + checksum_sha1: match &checksum { + Some(ChecksumValue::Sha1(x)) => { + Some(s3_xml::Value(BASE64_STANDARD.encode(&x))) + } + _ => None, + }, + checksum_sha256: match &checksum { + Some(ChecksumValue::Sha256(x)) => { + Some(s3_xml::Value(BASE64_STANDARD.encode(&x))) + } + _ => None, + }, + } }) .collect(), @@ -346,6 +384,7 @@ struct PartInfo<'a> { timestamp: u64, part_number: u64, size: u64, + checksum: Option<ChecksumValue>, } enum ExtractionResult { @@ -486,6 +525,7 @@ fn fetch_part_info<'a>( timestamp: pk.timestamp, etag, size, + checksum: p.checksum, }; match parts.last_mut() { Some(lastpart) if lastpart.part_number == pk.part_number => { @@ -944,10 +984,13 @@ mod tests { timestamp: TS, state: ObjectVersionState::Uploading { multipart: true, - headers: ObjectVersionHeaders { - content_type: "text/plain".to_string(), - other: BTreeMap::<String, String>::new(), + encryption: ObjectVersionEncryption::Plaintext { + inner: ObjectVersionMetaInner { + headers: vec![], + checksum: None, + }, }, + checksum_algorithm: None, }, } } @@ -1136,6 +1179,7 @@ mod tests { version: uuid, size: Some(3), etag: Some("etag1".into()), + checksum: None, }, ), ( @@ -1147,6 +1191,7 @@ mod tests { version: uuid, size: None, etag: None, + checksum: None, }, ), ( @@ -1158,6 +1203,7 @@ mod tests { version: uuid, size: Some(10), etag: Some("etag2".into()), + checksum: None, }, ), ( @@ -1169,6 +1215,7 @@ mod tests { version: uuid, size: Some(7), etag: Some("etag3".into()), + checksum: None, }, ), ( @@ -1180,6 +1227,7 @@ mod tests { version: uuid, size: Some(5), etag: Some("etag4".into()), + checksum: None, }, ), ]; @@ -1218,12 +1266,14 @@ mod tests { etag: "etag1", timestamp: TS, part_number: 1, - size: 3 + size: 3, + checksum: None, }, PartInfo { etag: "etag2", timestamp: TS, part_number: 3, + checksum: None, size: 10 }, ] @@ -1239,12 +1289,14 @@ mod tests { PartInfo { etag: "etag3", timestamp: TS, + checksum: None, part_number: 5, size: 7 }, PartInfo { etag: "etag4", timestamp: TS, + checksum: None, part_number: 8, size: 5 }, @@ -1268,24 +1320,28 @@ mod tests { PartInfo { etag: "etag1", timestamp: TS, + checksum: None, part_number: 1, size: 3 }, PartInfo { etag: "etag2", timestamp: TS, + checksum: None, part_number: 3, size: 10 }, PartInfo { etag: "etag3", timestamp: TS, + checksum: None, part_number: 5, size: 7 }, PartInfo { etag: "etag4", timestamp: TS, + checksum: None, part_number: 8, size: 5 }, diff --git a/src/api/s3/mod.rs b/src/api/s3/mod.rs index cbdb94ab..b9bb1a6f 100644 --- a/src/api/s3/mod.rs +++ b/src/api/s3/mod.rs @@ -13,5 +13,7 @@ mod post_object; mod put; mod website; +mod checksum; +mod encryption; mod router; pub mod xml; diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 1d5aeb26..3db3e8aa 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -1,9 +1,10 @@ use std::collections::HashMap; +use std::convert::TryInto; use std::sync::Arc; +use base64::prelude::*; use futures::prelude::*; use hyper::{Request, Response}; -use md5::{Digest as Md5Digest, Md5}; use garage_table::*; use garage_util::data::*; @@ -16,6 +17,8 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::checksum::*; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; use crate::s3::put::*; use crate::s3::xml as s3_xml; @@ -40,6 +43,16 @@ pub async fn handle_create_multipart_upload( let timestamp = next_timestamp(existing_object.as_ref()); let headers = get_headers(req.headers())?; + let meta = ObjectVersionMetaInner { + headers, + checksum: None, + }; + + // Determine whether object should be encrypted, and if so the key + let encryption = EncryptionParams::new_from_headers(&garage, req.headers())?; + let object_encryption = encryption.encrypt_meta(meta)?; + + let checksum_algorithm = request_checksum_algorithm(req.headers())?; // Create object in object table let object_version = ObjectVersion { @@ -47,7 +60,8 @@ pub async fn handle_create_multipart_upload( timestamp, state: ObjectVersionState::Uploading { multipart: true, - headers, + encryption: object_encryption, + checksum_algorithm, }, }; let object = Object::new(*bucket_id, key.to_string(), vec![object_version]); @@ -68,7 +82,9 @@ pub async fn handle_create_multipart_upload( }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(string_body(xml))) + let mut resp = Response::builder(); + encryption.add_response_headers(&mut resp); + Ok(resp.body(string_body(xml))?) } pub async fn handle_put_part( @@ -83,20 +99,37 @@ pub async fn handle_put_part( let upload_id = decode_upload_id(upload_id)?; - let content_md5 = match req.headers().get("content-md5") { - Some(x) => Some(x.to_str()?.to_string()), - None => None, + let expected_checksums = ExpectedChecksums { + md5: match req.headers().get("content-md5") { + Some(x) => Some(x.to_str()?.to_string()), + None => None, + }, + sha256: content_sha256, + extra: request_checksum_value(req.headers())?, }; // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let stream = body_stream(req.into_body()); + let (req_head, req_body) = req.into_parts(); + let stream = body_stream(req_body); let mut chunker = StreamChunker::new(stream, garage.config.block_size); - let ((_, _, mut mpu), first_block) = + let ((_, object_version, mut mpu), first_block) = futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?; + // Check encryption params + let (object_encryption, checksum_algorithm) = match object_version.state { + ObjectVersionState::Uploading { + encryption, + checksum_algorithm, + .. + } => (encryption, checksum_algorithm), + _ => unreachable!(), + }; + let (encryption, _) = + EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?; + // Check object is valid and part can be accepted let first_block = first_block.ok_or_bad_request("Empty body")?; @@ -122,7 +155,9 @@ pub async fn handle_put_part( mpu_part_key, MpuPart { version: version_uuid, + // all these are filled in later, at the end of this function etag: None, + checksum: None, size: None, }, ); @@ -136,24 +171,31 @@ pub async fn handle_put_part( garage.version_table.insert(&version).await?; // Copy data to version - let (total_size, data_md5sum, data_sha256sum, _) = - read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?; + let checksummer = + Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm); + let (total_size, checksums, _) = read_and_put_blocks( + &ctx, + &version, + encryption, + part_number, + first_block, + &mut chunker, + checksummer, + ) + .await?; // Verify that checksums map - ensure_checksum_matches( - data_md5sum.as_slice(), - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; + checksums.verify(&expected_checksums)?; // Store part etag in version - let data_md5sum_hex = hex::encode(data_md5sum); + let etag = encryption.etag_from_md5(&checksums.md5); + mpu.parts.put( mpu_part_key, MpuPart { version: version_uuid, - etag: Some(data_md5sum_hex.clone()), + etag: Some(etag.clone()), + checksum: checksums.extract(checksum_algorithm), size: Some(total_size), }, ); @@ -163,11 +205,10 @@ pub async fn handle_put_part( // We won't have to clean up on drop. interrupted_cleanup.cancel(); - let response = Response::builder() - .header("ETag", format!("\"{}\"", data_md5sum_hex)) - .body(empty_body()) - .unwrap(); - Ok(response) + let mut resp = Response::builder().header("ETag", format!("\"{}\"", etag)); + encryption.add_response_headers(&mut resp); + let resp = add_checksum_response_headers(&expected_checksums.extra, resp); + Ok(resp.body(empty_body())?) } struct InterruptedCleanup(Option<InterruptedCleanupInner>); @@ -214,10 +255,11 @@ pub async fn handle_complete_multipart_upload( bucket_name, .. } = &ctx; + let (req_head, req_body) = req.into_parts(); + + let expected_checksum = request_checksum_value(&req_head.headers)?; - let body = http_body_util::BodyExt::collect(req.into_body()) - .await? - .to_bytes(); + let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -241,8 +283,12 @@ pub async fn handle_complete_multipart_upload( return Err(Error::bad_request("No data was uploaded")); } - let headers = match object_version.state { - ObjectVersionState::Uploading { headers, .. } => headers, + let (object_encryption, checksum_algorithm) = match object_version.state { + ObjectVersionState::Uploading { + encryption, + checksum_algorithm, + .. + } => (encryption, checksum_algorithm), _ => unreachable!(), }; @@ -270,6 +316,13 @@ pub async fn handle_complete_multipart_upload( for req_part in body_list_of_parts.iter() { match have_parts.get(&req_part.part_number) { Some(part) if part.etag.as_ref() == Some(&req_part.etag) && part.size.is_some() => { + // alternative version: if req_part.checksum.is_some() && part.checksum != req_part.checksum { + if part.checksum != req_part.checksum { + return Err(Error::InvalidDigest(format!( + "Invalid checksum for part {}: in request = {:?}, uploaded part = {:?}", + req_part.part_number, req_part.checksum, part.checksum + ))); + } parts.push(*part) } _ => return Err(Error::InvalidPart), @@ -317,18 +370,23 @@ pub async fn handle_complete_multipart_upload( }); garage.block_ref_table.insert_many(block_refs).await?; - // Calculate etag of final object + // Calculate checksum and etag of final object // To understand how etags are calculated, read more here: + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html // https://teppen.io/2018/06/23/aws_s3_etags/ - let mut etag_md5_hasher = Md5::new(); + let mut checksummer = MultipartChecksummer::init(checksum_algorithm); for part in parts.iter() { - etag_md5_hasher.update(part.etag.as_ref().unwrap().as_bytes()); + checksummer.update(part.etag.as_ref().unwrap(), part.checksum)?; } - let etag = format!( - "{}-{}", - hex::encode(etag_md5_hasher.finalize()), - parts.len() - ); + let (checksum_md5, checksum_extra) = checksummer.finalize(); + + if expected_checksum.is_some() && checksum_extra != expected_checksum { + return Err(Error::InvalidDigest( + "Failed to validate x-amz-checksum-*".into(), + )); + } + + let etag = format!("{}-{}", hex::encode(&checksum_md5[..]), parts.len()); // Calculate total size of final object let total_size = parts.iter().map(|x| x.size.unwrap()).sum(); @@ -341,10 +399,24 @@ pub async fn handle_complete_multipart_upload( return Err(e); } + // If there is a checksum algorithm, update metadata with checksum + let object_encryption = match checksum_algorithm { + None => object_encryption, + Some(_) => { + let (encryption, meta) = + EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?; + let new_meta = ObjectVersionMetaInner { + headers: meta.into_owned().headers, + checksum: checksum_extra, + }; + encryption.encrypt_meta(new_meta)? + } + }; + // Write final object version object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { - headers, + encryption: object_encryption, size: total_size, etag: etag.clone(), }, @@ -361,10 +433,28 @@ pub async fn handle_complete_multipart_upload( bucket: s3_xml::Value(bucket_name.to_string()), key: s3_xml::Value(key), etag: s3_xml::Value(format!("\"{}\"", etag)), + checksum_crc32: match &checksum_extra { + Some(ChecksumValue::Crc32(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, + checksum_crc32c: match &checksum_extra { + Some(ChecksumValue::Crc32c(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, + checksum_sha1: match &checksum_extra { + Some(ChecksumValue::Sha1(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, + checksum_sha256: match &checksum_extra { + Some(ChecksumValue::Sha256(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(string_body(xml))) + let resp = Response::builder(); + let resp = add_checksum_response_headers(&expected_checksum, resp); + Ok(resp.body(string_body(xml))?) } pub async fn handle_abort_multipart_upload( @@ -433,6 +523,7 @@ pub fn decode_upload_id(id: &str) -> Result<Uuid, Error> { struct CompleteMultipartUploadPart { etag: String, part_number: u64, + checksum: Option<ChecksumValue>, } fn parse_complete_multipart_upload_body( @@ -458,9 +549,41 @@ fn parse_complete_multipart_upload_body( .children() .find(|e| e.has_tag_name("PartNumber"))? .text()?; + let checksum = if let Some(crc32) = + item.children().find(|e| e.has_tag_name("ChecksumCRC32")) + { + Some(ChecksumValue::Crc32( + BASE64_STANDARD.decode(crc32.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else if let Some(crc32c) = item.children().find(|e| e.has_tag_name("ChecksumCRC32C")) + { + Some(ChecksumValue::Crc32c( + BASE64_STANDARD.decode(crc32c.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else if let Some(sha1) = item.children().find(|e| e.has_tag_name("ChecksumSHA1")) { + Some(ChecksumValue::Sha1( + BASE64_STANDARD.decode(sha1.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else if let Some(sha256) = item.children().find(|e| e.has_tag_name("ChecksumSHA256")) + { + Some(ChecksumValue::Sha256( + BASE64_STANDARD.decode(sha256.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else { + None + }; parts.push(CompleteMultipartUploadPart { etag: etag.trim_matches('"').to_string(), part_number: part_number.parse().ok()?, + checksum, }); } else { return None; diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index 66f8174c..2c106b3b 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -14,12 +14,15 @@ use multer::{Constraints, Multipart, SizeLimit}; use serde::Deserialize; use garage_model::garage::Garage; +use garage_model::s3::object_table::*; use crate::helpers::*; use crate::s3::api_server::ResBody; +use crate::s3::checksum::*; use crate::s3::cors::*; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; -use crate::s3::put::{get_headers, save_stream}; +use crate::s3::put::{get_headers, save_stream, ChecksumMode}; use crate::s3::xml as s3_xml; use crate::signature::payload::{verify_v4, Authorization}; @@ -48,13 +51,17 @@ pub async fn handle_post_object( let mut multipart = Multipart::with_constraints(stream, boundary, constraints); let mut params = HeaderMap::new(); - let field = loop { + let file_field = loop { let field = if let Some(field) = multipart.next_field().await? { field } else { return Err(Error::bad_request("Request did not contain a file")); }; - let name: HeaderName = if let Some(Ok(name)) = field.name().map(TryInto::try_into) { + let name: HeaderName = if let Some(Ok(name)) = field + .name() + .map(str::to_ascii_lowercase) + .map(TryInto::try_into) + { name } else { continue; @@ -96,7 +103,7 @@ pub async fn handle_post_object( let key = if key.contains("${filename}") { // if no filename is provided, don't replace. This matches the behavior of AWS. - if let Some(filename) = field.file_name() { + if let Some(filename) = file_field.file_name() { key.replace("${filename}", filename) } else { key.to_owned() @@ -143,9 +150,8 @@ pub async fn handle_post_object( let mut conditions = decoded_policy.into_conditions()?; for (param_key, value) in params.iter() { - let mut param_key = param_key.to_string(); - param_key.make_ascii_lowercase(); - match param_key.as_str() { + let param_key = param_key.as_str(); + match param_key { "policy" | "x-amz-signature" => (), // this is always accepted, as it's required to validate other fields "content-type" => { let conds = conditions.params.remove("content-type").ok_or_else(|| { @@ -190,7 +196,7 @@ pub async fn handle_post_object( // how aws seems to behave. continue; } - let conds = conditions.params.remove(¶m_key).ok_or_else(|| { + let conds = conditions.params.remove(param_key).ok_or_else(|| { Error::bad_request(format!("Key '{}' is not allowed in policy", param_key)) })?; for cond in conds { @@ -218,8 +224,24 @@ pub async fn handle_post_object( let headers = get_headers(¶ms)?; - let stream = field.map(|r| r.map_err(Into::into)); + let expected_checksums = ExpectedChecksums { + md5: params + .get("content-md5") + .map(HeaderValue::to_str) + .transpose()? + .map(str::to_string), + sha256: None, + extra: request_checksum_algorithm_value(¶ms)?, + }; + + let meta = ObjectVersionMetaInner { + headers, + checksum: expected_checksums.extra, + }; + + let encryption = EncryptionParams::new_from_headers(&garage, ¶ms)?; + let stream = file_field.map(|r| r.map_err(Into::into)); let ctx = ReqCtx { garage, bucket_id, @@ -228,17 +250,17 @@ pub async fn handle_post_object( api_key, }; - let (_, md5) = save_stream( + let res = save_stream( &ctx, - headers, + meta, + encryption, StreamLimiter::new(stream, conditions.content_length), &key, - None, - None, + ChecksumMode::Verify(&expected_checksums), ) .await?; - let etag = format!("\"{}\"", md5); + let etag = format!("\"{}\"", res.etag); let mut resp = if let Some(mut target) = params .get("success_action_redirect") @@ -252,11 +274,12 @@ pub async fn handle_post_object( .append_pair("key", &key) .append_pair("etag", &etag); let target = target.to_string(); - Response::builder() + let mut resp = Response::builder() .status(StatusCode::SEE_OTHER) .header(header::LOCATION, target.clone()) - .header(header::ETAG, etag) - .body(string_body(target))? + .header(header::ETAG, etag); + encryption.add_response_headers(&mut resp); + resp.body(string_body(target))? } else { let path = head .uri @@ -283,9 +306,10 @@ pub async fn handle_post_object( .get("success_action_status") .and_then(|h| h.to_str().ok()) .unwrap_or("204"); - let builder = Response::builder() + let mut builder = Response::builder() .header(header::LOCATION, location.clone()) .header(header::ETAG, etag.clone()); + encryption.add_response_headers(&mut builder); match action { "200" => builder.status(StatusCode::OK).body(empty_body())?, "201" => { diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 685cca80..1e3b1b44 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -1,12 +1,9 @@ -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::sync::Arc; -use base64::prelude::*; use futures::prelude::*; use futures::stream::FuturesOrdered; use futures::try_join; -use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; -use sha2::Sha256; use tokio::sync::mpsc; @@ -22,7 +19,6 @@ use opentelemetry::{ use garage_net::bytes_buf::BytesBuf; use garage_rpc::rpc_helper::OrderTag; use garage_table::*; -use garage_util::async_hash::*; use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_util::time::*; @@ -36,10 +32,24 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::checksum::*; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; const PUT_BLOCKS_MAX_PARALLEL: usize = 3; +pub(crate) struct SaveStreamResult { + pub(crate) version_uuid: Uuid, + pub(crate) version_timestamp: u64, + /// Etag WITHOUT THE QUOTES (just the hex value) + pub(crate) etag: String, +} + +pub(crate) enum ChecksumMode<'a> { + Verify(&'a ExpectedChecksums), + Calculate(Option<ChecksumAlgorithm>), +} + pub async fn handle_put( ctx: ReqCtx, req: Request<ReqBody>, @@ -50,26 +60,51 @@ pub async fn handle_put( let headers = get_headers(req.headers())?; debug!("Object headers: {:?}", headers); - let content_md5 = match req.headers().get("content-md5") { - Some(x) => Some(x.to_str()?.to_string()), - None => None, + let expected_checksums = ExpectedChecksums { + md5: match req.headers().get("content-md5") { + Some(x) => Some(x.to_str()?.to_string()), + None => None, + }, + sha256: content_sha256, + extra: request_checksum_value(req.headers())?, + }; + + let meta = ObjectVersionMetaInner { + headers, + checksum: expected_checksums.extra, }; + // Determine whether object should be encrypted, and if so the key + let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; + let stream = body_stream(req.into_body()); - save_stream(&ctx, headers, stream, key, content_md5, content_sha256) - .await - .map(|(uuid, md5)| put_response(uuid, md5)) + let res = save_stream( + &ctx, + meta, + encryption, + stream, + key, + ChecksumMode::Verify(&expected_checksums), + ) + .await?; + + let mut resp = Response::builder() + .header("x-amz-version-id", hex::encode(res.version_uuid)) + .header("ETag", format!("\"{}\"", res.etag)); + encryption.add_response_headers(&mut resp); + let resp = add_checksum_response_headers(&expected_checksums.extra, resp); + Ok(resp.body(empty_body())?) } pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ctx: &ReqCtx, - headers: ObjectVersionHeaders, + mut meta: ObjectVersionMetaInner, + encryption: EncryptionParams, body: S, key: &String, - content_md5: Option<String>, - content_sha256: Option<FixedBytes32>, -) -> Result<(Uuid, String), Error> { + checksum_mode: ChecksumMode<'_>, +) -> Result<SaveStreamResult, Error> { let ReqCtx { garage, bucket_id, .. } = ctx; @@ -86,43 +121,55 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let version_uuid = gen_uuid(); let version_timestamp = next_timestamp(existing_object.as_ref()); + let mut checksummer = match checksum_mode { + ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()), + ChecksumMode::Calculate(algo) => { + Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(algo) + } + }; + // If body is small enough, store it directly in the object table // as "inline data". We can then return immediately. if first_block.len() < INLINE_THRESHOLD { - let mut md5sum = Md5::new(); - md5sum.update(&first_block[..]); - let data_md5sum = md5sum.finalize(); - let data_md5sum_hex = hex::encode(data_md5sum); + checksummer.update(&first_block); + let checksums = checksummer.finalize(); - let data_sha256sum = sha256sum(&first_block[..]); - let size = first_block.len() as u64; - - ensure_checksum_matches( - data_md5sum.as_slice(), - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; + match checksum_mode { + ChecksumMode::Verify(expected) => { + checksums.verify(&expected)?; + } + ChecksumMode::Calculate(algo) => { + meta.checksum = checksums.extract(algo); + } + }; + let size = first_block.len() as u64; check_quotas(ctx, size, existing_object.as_ref()).await?; + let etag = encryption.etag_from_md5(&checksums.md5); + let inline_data = encryption.encrypt_blob(&first_block)?.to_vec(); + let object_version = ObjectVersion { uuid: version_uuid, timestamp: version_timestamp, state: ObjectVersionState::Complete(ObjectVersionData::Inline( ObjectVersionMeta { - headers, + encryption: encryption.encrypt_meta(meta)?, size, - etag: data_md5sum_hex.clone(), + etag: etag.clone(), }, - first_block.to_vec(), + inline_data, )), }; let object = Object::new(*bucket_id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; - return Ok((version_uuid, data_md5sum_hex)); + return Ok(SaveStreamResult { + version_uuid, + version_timestamp, + etag, + }); } // The following consists in many steps that can each fail. @@ -142,7 +189,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( uuid: version_uuid, timestamp: version_timestamp, state: ObjectVersionState::Uploading { - headers: headers.clone(), + encryption: encryption.encrypt_meta(meta.clone())?, + checksum_algorithm: None, // don't care; overwritten later multipart: false, }, }; @@ -163,26 +211,39 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ); garage.version_table.insert(&version).await?; - // Transfer data and verify checksum - let (total_size, data_md5sum, data_sha256sum, first_block_hash) = - read_and_put_blocks(ctx, &version, 1, first_block, &mut chunker).await?; - - ensure_checksum_matches( - data_md5sum.as_slice(), - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; + // Transfer data + let (total_size, checksums, first_block_hash) = read_and_put_blocks( + ctx, + &version, + encryption, + 1, + first_block, + &mut chunker, + checksummer, + ) + .await?; + + // Verify checksums are ok / add calculated checksum to metadata + match checksum_mode { + ChecksumMode::Verify(expected) => { + checksums.verify(&expected)?; + } + ChecksumMode::Calculate(algo) => { + meta.checksum = checksums.extract(algo); + } + }; + // Verify quotas are respsected check_quotas(ctx, total_size, existing_object.as_ref()).await?; // Save final object state, marked as Complete - let md5sum_hex = hex::encode(data_md5sum); + let etag = encryption.etag_from_md5(&checksums.md5); + object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { - headers, + encryption: encryption.encrypt_meta(meta)?, size: total_size, - etag: md5sum_hex.clone(), + etag: etag.clone(), }, first_block_hash, )); @@ -193,34 +254,11 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // We won't have to clean up on drop. interrupted_cleanup.cancel(); - Ok((version_uuid, md5sum_hex)) -} - -/// Validate MD5 sum against content-md5 header -/// and sha256sum against signed content-sha256 -pub(crate) fn ensure_checksum_matches( - data_md5sum: &[u8], - data_sha256sum: garage_util::data::FixedBytes32, - content_md5: Option<&str>, - content_sha256: Option<garage_util::data::FixedBytes32>, -) -> Result<(), Error> { - if let Some(expected_sha256) = content_sha256 { - if expected_sha256 != data_sha256sum { - return Err(Error::bad_request( - "Unable to validate x-amz-content-sha256", - )); - } else { - trace!("Successfully validated x-amz-content-sha256"); - } - } - if let Some(expected_md5) = content_md5 { - if expected_md5.trim_matches('"') != BASE64_STANDARD.encode(data_md5sum) { - return Err(Error::bad_request("Unable to validate content-md5")); - } else { - trace!("Successfully validated content-md5"); - } - } - Ok(()) + Ok(SaveStreamResult { + version_uuid, + version_timestamp, + etag, + }) } /// Check that inserting this object with this size doesn't exceed bucket quotas @@ -248,7 +286,7 @@ pub(crate) async fn check_quotas( .await?; let counters = counters - .map(|x| x.filtered_values(&garage.system.ring.borrow())) + .map(|x| x.filtered_values(&garage.system.cluster_layout())) .unwrap_or_default(); let (prev_cnt_obj, prev_cnt_size) = match prev_object { @@ -290,10 +328,12 @@ pub(crate) async fn check_quotas( pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ctx: &ReqCtx, version: &Version, + encryption: EncryptionParams, part_number: u64, first_block: Bytes, chunker: &mut StreamChunker<S>, -) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash, Hash), Error> { + checksummer: Checksummer, +) -> Result<(u64, Checksums, Hash), Error> { let tracer = opentelemetry::global::tracer("garage"); let (block_tx, mut block_rx) = mpsc::channel::<Result<Bytes, Error>>(2); @@ -321,20 +361,20 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + let (block_tx2, mut block_rx2) = mpsc::channel::<Result<Bytes, Error>>(1); let hash_stream = async { - let md5hasher = AsyncHasher::<Md5>::new(); - let sha256hasher = AsyncHasher::<Sha256>::new(); + let mut checksummer = checksummer; while let Some(next) = block_rx.recv().await { match next { Ok(block) => { block_tx2.send(Ok(block.clone())).await?; - futures::future::join( - md5hasher.update(block.clone()), - sha256hasher.update(block.clone()), - ) + checksummer = tokio::task::spawn_blocking(move || { + checksummer.update(&block); + checksummer + }) .with_context(Context::current_with_span( tracer.start("Hash block (md5, sha256)"), )) - .await; + .await + .unwrap() } Err(e) => { block_tx2.send(Err(e)).await?; @@ -343,27 +383,38 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + } } drop(block_tx2); - Ok::<_, mpsc::error::SendError<_>>(futures::join!( - md5hasher.finalize(), - sha256hasher.finalize() - )) + Ok::<_, mpsc::error::SendError<_>>(checksummer) }; - let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, Hash), Error>>(1); - let hash_blocks = async { + let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, u64, Hash), Error>>(1); + let encrypt_hash_blocks = async { let mut first_block_hash = None; while let Some(next) = block_rx2.recv().await { match next { Ok(block) => { - let hash = async_blake2sum(block.clone()) - .with_context(Context::current_with_span( - tracer.start("Hash block (blake2)"), - )) - .await; - if first_block_hash.is_none() { - first_block_hash = Some(hash); + let unencrypted_len = block.len() as u64; + let res = tokio::task::spawn_blocking(move || { + let block = encryption.encrypt_block(block)?; + let hash = blake2sum(&block); + Ok((block, hash)) + }) + .with_context(Context::current_with_span( + tracer.start("Encrypt and hash (blake2) block"), + )) + .await + .unwrap(); + match res { + Ok((block, hash)) => { + if first_block_hash.is_none() { + first_block_hash = Some(hash); + } + block_tx3.send(Ok((block, unencrypted_len, hash))).await?; + } + Err(e) => { + block_tx3.send(Err(e)).await?; + break; + } } - block_tx3.send(Ok((block, hash))).await?; } Err(e) => { block_tx3.send(Err(e)).await?; @@ -398,7 +449,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + block_rx3.recv().await } }; - let (block, hash) = tokio::select! { + let (block, unencrypted_len, hash) = tokio::select! { result = write_futs_next => { result?; continue; @@ -410,17 +461,18 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + }; // For next block to be written: count its size and spawn future to write it - let offset = written_bytes; - written_bytes += block.len() as u64; write_futs.push_back(put_block_and_meta( ctx, version, part_number, - offset, + written_bytes, hash, block, + unencrypted_len, + encryption.is_encrypted(), order_stream.order(written_bytes), )); + written_bytes += unencrypted_len; } while let Some(res) = write_futs.next().await { res?; @@ -429,17 +481,15 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + }; let (_, stream_hash_result, block_hash_result, final_result) = - futures::join!(read_blocks, hash_stream, hash_blocks, put_blocks); + futures::join!(read_blocks, hash_stream, encrypt_hash_blocks, put_blocks); let total_size = final_result?; // unwrap here is ok, because if hasher failed, it is because something failed // later in the pipeline which already caused a return at the ? on previous line - let (data_md5sum, data_sha256sum) = stream_hash_result.unwrap(); let first_block_hash = block_hash_result.unwrap(); + let checksums = stream_hash_result.unwrap().finalize(); - let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap(); - - Ok((total_size, data_md5sum, data_sha256sum, first_block_hash)) + Ok((total_size, checksums, first_block_hash)) } async fn put_block_and_meta( @@ -449,6 +499,8 @@ async fn put_block_and_meta( offset: u64, hash: Hash, block: Bytes, + size: u64, + is_encrypted: bool, order_tag: OrderTag, ) -> Result<(), GarageError> { let ReqCtx { garage, .. } = ctx; @@ -459,10 +511,7 @@ async fn put_block_and_meta( part_number, offset, }, - VersionBlock { - hash, - size: block.len() as u64, - }, + VersionBlock { hash, size }, ); let block_ref = BlockRef { @@ -474,7 +523,7 @@ async fn put_block_and_meta( futures::try_join!( garage .block_manager - .rpc_put_block(hash, block, Some(order_tag)), + .rpc_put_block(hash, block, is_encrypted, Some(order_tag)), garage.version_table.insert(&version), garage.block_ref_table.insert(&block_ref), )?; @@ -517,14 +566,6 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { } } -pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<ResBody> { - Response::builder() - .header("x-amz-version-id", hex::encode(version_uuid)) - .header("ETag", format!("\"{}\"", md5sum_hex)) - .body(empty_body()) - .unwrap() -} - struct InterruptedCleanup(Option<InterruptedCleanupInner>); struct InterruptedCleanupInner { garage: Arc<Garage>, @@ -559,57 +600,35 @@ impl Drop for InterruptedCleanup { // ============ helpers ============ -pub(crate) fn get_mime_type(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> { - Ok(headers - .get(hyper::header::CONTENT_TYPE) - .map(|x| x.to_str()) - .unwrap_or(Ok("blob"))? - .to_string()) -} - -pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVersionHeaders, Error> { - let content_type = get_mime_type(headers)?; - let mut other = BTreeMap::new(); +pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<HeaderList, Error> { + let mut ret = Vec::new(); // Preserve standard headers let standard_header = vec![ + hyper::header::CONTENT_TYPE, hyper::header::CACHE_CONTROL, hyper::header::CONTENT_DISPOSITION, hyper::header::CONTENT_ENCODING, hyper::header::CONTENT_LANGUAGE, hyper::header::EXPIRES, ]; - for h in standard_header.iter() { - if let Some(v) = headers.get(h) { - match v.to_str() { - Ok(v_str) => { - other.insert(h.to_string(), v_str.to_string()); - } - Err(e) => { - warn!("Discarding header {}, error in .to_str(): {}", h, e); - } - } + for name in standard_header.iter() { + if let Some(value) = headers.get(name) { + ret.push((name.to_string(), value.to_str()?.to_string())); } } // Preserve x-amz-meta- headers - for (k, v) in headers.iter() { - if k.as_str().starts_with("x-amz-meta-") { - match std::str::from_utf8(v.as_bytes()) { - Ok(v_str) => { - other.insert(k.to_string(), v_str.to_string()); - } - Err(e) => { - warn!("Discarding header {}, error in .to_str(): {}", k, e); - } - } + for (name, value) in headers.iter() { + if name.as_str().starts_with("x-amz-meta-") { + ret.push(( + name.to_string(), + std::str::from_utf8(value.as_bytes())?.to_string(), + )); } } - Ok(ObjectVersionHeaders { - content_type, - other, - }) + Ok(ret) } pub(crate) fn next_timestamp(existing_object: Option<&Object>) -> u64 { diff --git a/src/api/s3/xml.rs b/src/api/s3/xml.rs index 06f11288..1e569ade 100644 --- a/src/api/s3/xml.rs +++ b/src/api/s3/xml.rs @@ -131,6 +131,14 @@ pub struct CompleteMultipartUploadResult { pub key: Value, #[serde(rename = "ETag")] pub etag: Value, + #[serde(rename = "ChecksumCRC32")] + pub checksum_crc32: Option<Value>, + #[serde(rename = "ChecksumCRC32C")] + pub checksum_crc32c: Option<Value>, + #[serde(rename = "ChecksumSHA1")] + pub checksum_sha1: Option<Value>, + #[serde(rename = "ChecksumSHA256")] + pub checksum_sha256: Option<Value>, } #[derive(Debug, Serialize, PartialEq, Eq)] @@ -197,6 +205,14 @@ pub struct PartItem { pub part_number: IntValue, #[serde(rename = "Size")] pub size: IntValue, + #[serde(rename = "ChecksumCRC32")] + pub checksum_crc32: Option<Value>, + #[serde(rename = "ChecksumCRC32C")] + pub checksum_crc32c: Option<Value>, + #[serde(rename = "ChecksumSHA1")] + pub checksum_sha1: Option<Value>, + #[serde(rename = "ChecksumSHA256")] + pub checksum_sha256: Option<Value>, } #[derive(Debug, Serialize, PartialEq, Eq)] @@ -500,6 +516,10 @@ mod tests { bucket: Value("mybucket".to_string()), key: Value("a/plop".to_string()), etag: Value("\"3858f62230ac3c915f300c664312c11f-9\"".to_string()), + checksum_crc32: None, + checksum_crc32c: None, + checksum_sha1: Some(Value("ZJAnHyG8PeKz9tI8UTcHrJos39A=".into())), + checksum_sha256: None, }; assert_eq!( to_xml_with_header(&result)?, @@ -509,6 +529,7 @@ mod tests { <Bucket>mybucket</Bucket>\ <Key>a/plop</Key>\ <ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>\ + <ChecksumSHA1>ZJAnHyG8PeKz9tI8UTcHrJos39A=</ChecksumSHA1>\ </CompleteMultipartUploadResult>" ); Ok(()) @@ -780,12 +801,22 @@ mod tests { last_modified: Value("2010-11-10T20:48:34.000Z".to_string()), part_number: IntValue(2), size: IntValue(10485760), + checksum_crc32: None, + checksum_crc32c: None, + checksum_sha256: Some(Value( + "5RQ3A5uk0w7ojNjvegohch4JRBBGN/cLhsNrPzfv/hA=".into(), + )), + checksum_sha1: None, }, PartItem { etag: Value("\"aaaa18db4cc2f85cedef654fccc4a4x8\"".to_string()), last_modified: Value("2010-11-10T20:48:33.000Z".to_string()), part_number: IntValue(3), size: IntValue(10485760), + checksum_sha256: None, + checksum_crc32c: None, + checksum_crc32: Some(Value("ZJAnHyG8=".into())), + checksum_sha1: None, }, ], initiator: Initiator { @@ -820,12 +851,14 @@ mod tests { <LastModified>2010-11-10T20:48:34.000Z</LastModified>\ <PartNumber>2</PartNumber>\ <Size>10485760</Size>\ + <ChecksumSHA256>5RQ3A5uk0w7ojNjvegohch4JRBBGN/cLhsNrPzfv/hA=</ChecksumSHA256>\ </Part>\ <Part>\ <ETag>"aaaa18db4cc2f85cedef654fccc4a4x8"</ETag>\ <LastModified>2010-11-10T20:48:33.000Z</LastModified>\ <PartNumber>3</PartNumber>\ <Size>10485760</Size>\ + <ChecksumCRC32>ZJAnHyG8=</ChecksumCRC32>\ </Part>\ <Initiator>\ <DisplayName>umat-user-11116a31-17b5-4fb7-9df5-b288870f11xx</DisplayName>\ |