diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api/Cargo.toml | 3 | ||||
-rw-r--r-- | src/api/s3/api_server.rs | 2 | ||||
-rw-r--r-- | src/api/s3/checksum.rs | 406 | ||||
-rw-r--r-- | src/api/s3/copy.rs | 132 | ||||
-rw-r--r-- | src/api/s3/encryption.rs | 44 | ||||
-rw-r--r-- | src/api/s3/error.rs | 6 | ||||
-rw-r--r-- | src/api/s3/get.rs | 147 | ||||
-rw-r--r-- | src/api/s3/list.rs | 77 | ||||
-rw-r--r-- | src/api/s3/mod.rs | 1 | ||||
-rw-r--r-- | src/api/s3/multipart.rs | 159 | ||||
-rw-r--r-- | src/api/s3/post_object.rs | 28 | ||||
-rw-r--r-- | src/api/s3/put.rs | 221 | ||||
-rw-r--r-- | src/api/s3/xml.rs | 33 | ||||
-rw-r--r-- | src/garage/Cargo.toml | 1 | ||||
-rw-r--r-- | src/garage/tests/s3/multipart.rs | 158 | ||||
-rw-r--r-- | src/model/s3/mpu_table.rs | 9 | ||||
-rw-r--r-- | src/model/s3/object_table.rs | 53 | ||||
-rw-r--r-- | src/util/async_hash.rs | 61 | ||||
-rw-r--r-- | src/util/lib.rs | 1 |
19 files changed, 1209 insertions, 333 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index bcf6a537..1b87496c 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -28,6 +28,8 @@ async-trait.workspace = true base64.workspace = true bytes.workspace = true chrono.workspace = true +crc32fast.workspace = true +crc32c.workspace = true crypto-common.workspace = true err-derive.workspace = true hex.workspace = true @@ -37,6 +39,7 @@ tracing.workspace = true md-5.workspace = true nom.workspace = true pin-project.workspace = true +sha1.workspace = true sha2.workspace = true futures.workspace = true diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 1ed30996..1737af33 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -325,7 +325,7 @@ impl ApiHandler for S3ApiServer { part_number_marker: part_number_marker.map(|p| p.min(10000)), max_parts: max_parts.unwrap_or(1000).clamp(1, 1000), }; - handle_list_parts(ctx, &query).await + handle_list_parts(ctx, req, &query).await } Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await, Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await, diff --git a/src/api/s3/checksum.rs b/src/api/s3/checksum.rs new file mode 100644 index 00000000..c9dc001c --- /dev/null +++ b/src/api/s3/checksum.rs @@ -0,0 +1,406 @@ +use std::convert::{TryFrom, TryInto}; +use std::hash::Hasher; + +use base64::prelude::*; +use crc32c::Crc32cHasher as Crc32c; +use crc32fast::Hasher as Crc32; +use md5::{Digest, Md5}; +use sha1::Sha1; +use sha2::Sha256; + +use http::{HeaderMap, HeaderName, HeaderValue}; + +use garage_util::data::*; +use garage_util::error::OkOrMessage; + +use garage_model::s3::object_table::*; + +use crate::s3::error::*; + +pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName = + HeaderName::from_static("x-amz-checksum-algorithm"); +pub const X_AMZ_CHECKSUM_MODE: HeaderName = HeaderName::from_static("x-amz-checksum-mode"); +pub const X_AMZ_CHECKSUM_CRC32: HeaderName = HeaderName::from_static("x-amz-checksum-crc32"); +pub const X_AMZ_CHECKSUM_CRC32C: HeaderName = HeaderName::from_static("x-amz-checksum-crc32c"); +pub const X_AMZ_CHECKSUM_SHA1: HeaderName = HeaderName::from_static("x-amz-checksum-sha1"); +pub const X_AMZ_CHECKSUM_SHA256: HeaderName = HeaderName::from_static("x-amz-checksum-sha256"); + +pub type Crc32Checksum = [u8; 4]; +pub type Crc32cChecksum = [u8; 4]; +pub type Md5Checksum = [u8; 16]; +pub type Sha1Checksum = [u8; 20]; +pub type Sha256Checksum = [u8; 32]; + +#[derive(Debug, Default)] +pub(crate) struct ExpectedChecksums { + // base64-encoded md5 (content-md5 header) + pub md5: Option<String>, + // content_sha256 (as a Hash / FixedBytes32) + pub sha256: Option<Hash>, + // extra x-amz-checksum-* header + pub extra: Option<ChecksumValue>, +} + +pub(crate) struct Checksummer { + pub crc32: Option<Crc32>, + pub crc32c: Option<Crc32c>, + pub md5: Option<Md5>, + pub sha1: Option<Sha1>, + pub sha256: Option<Sha256>, +} + +#[derive(Default)] +pub(crate) struct Checksums { + pub crc32: Option<Crc32Checksum>, + pub crc32c: Option<Crc32cChecksum>, + pub md5: Option<Md5Checksum>, + pub sha1: Option<Sha1Checksum>, + pub sha256: Option<Sha256Checksum>, +} + +impl Checksummer { + pub(crate) fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self { + let mut ret = Self { + crc32: None, + crc32c: None, + md5: None, + sha1: None, + sha256: None, + }; + + if expected.md5.is_some() || require_md5 { + ret.md5 = Some(Md5::new()); + } + if expected.sha256.is_some() || matches!(&expected.extra, Some(ChecksumValue::Sha256(_))) { + ret.sha256 = Some(Sha256::new()); + } + if matches!(&expected.extra, Some(ChecksumValue::Crc32(_))) { + ret.crc32 = Some(Crc32::new()); + } + if matches!(&expected.extra, Some(ChecksumValue::Crc32c(_))) { + ret.crc32c = Some(Crc32c::default()); + } + if matches!(&expected.extra, Some(ChecksumValue::Sha1(_))) { + ret.sha1 = Some(Sha1::new()); + } + ret + } + + pub(crate) fn add(mut self, algo: Option<ChecksumAlgorithm>) -> Self { + match algo { + Some(ChecksumAlgorithm::Crc32) => { + self.crc32 = Some(Crc32::new()); + } + Some(ChecksumAlgorithm::Crc32c) => { + self.crc32c = Some(Crc32c::default()); + } + Some(ChecksumAlgorithm::Sha1) => { + self.sha1 = Some(Sha1::new()); + } + Some(ChecksumAlgorithm::Sha256) => { + self.sha256 = Some(Sha256::new()); + } + None => (), + } + self + } + + pub(crate) fn update(&mut self, bytes: &[u8]) { + if let Some(crc32) = &mut self.crc32 { + crc32.update(bytes); + } + if let Some(crc32c) = &mut self.crc32c { + crc32c.write(bytes); + } + if let Some(md5) = &mut self.md5 { + md5.update(bytes); + } + if let Some(sha1) = &mut self.sha1 { + sha1.update(bytes); + } + if let Some(sha256) = &mut self.sha256 { + sha256.update(bytes); + } + } + + pub(crate) fn finalize(self) -> Checksums { + Checksums { + crc32: self.crc32.map(|x| u32::to_be_bytes(x.finalize())), + crc32c: self + .crc32c + .map(|x| u32::to_be_bytes(u32::try_from(x.finish()).unwrap())), + md5: self.md5.map(|x| x.finalize()[..].try_into().unwrap()), + sha1: self.sha1.map(|x| x.finalize()[..].try_into().unwrap()), + sha256: self.sha256.map(|x| x.finalize()[..].try_into().unwrap()), + } + } +} + +impl Checksums { + pub fn verify(&self, expected: &ExpectedChecksums) -> Result<(), Error> { + if let Some(expected_md5) = &expected.md5 { + match self.md5 { + Some(md5) if BASE64_STANDARD.encode(&md5) == expected_md5.trim_matches('"') => (), + _ => { + return Err(Error::InvalidDigest( + "MD5 checksum verification failed (from content-md5)".into(), + )) + } + } + } + if let Some(expected_sha256) = &expected.sha256 { + match self.sha256 { + Some(sha256) if &sha256[..] == expected_sha256.as_slice() => (), + _ => { + return Err(Error::InvalidDigest( + "SHA256 checksum verification failed (from x-amz-content-sha256)".into(), + )) + } + } + } + if let Some(extra) = expected.extra { + let algo = extra.algorithm(); + if self.extract(Some(algo)) != Some(extra) { + return Err(Error::InvalidDigest(format!( + "Failed to validate checksum for algorithm {:?}", + algo + ))); + } + } + Ok(()) + } + + pub fn extract(&self, algo: Option<ChecksumAlgorithm>) -> Option<ChecksumValue> { + match algo { + None => None, + Some(ChecksumAlgorithm::Crc32) => Some(ChecksumValue::Crc32(self.crc32.unwrap())), + Some(ChecksumAlgorithm::Crc32c) => Some(ChecksumValue::Crc32c(self.crc32c.unwrap())), + Some(ChecksumAlgorithm::Sha1) => Some(ChecksumValue::Sha1(self.sha1.unwrap())), + Some(ChecksumAlgorithm::Sha256) => Some(ChecksumValue::Sha256(self.sha256.unwrap())), + } + } +} + +// ---- + +#[derive(Default)] +pub(crate) struct MultipartChecksummer { + pub md5: Md5, + pub extra: Option<MultipartExtraChecksummer>, +} + +pub(crate) enum MultipartExtraChecksummer { + Crc32(Crc32), + Crc32c(Crc32c), + Sha1(Sha1), + Sha256(Sha256), +} + +impl MultipartChecksummer { + pub(crate) fn init(algo: Option<ChecksumAlgorithm>) -> Self { + Self { + md5: Md5::new(), + extra: match algo { + None => None, + Some(ChecksumAlgorithm::Crc32) => { + Some(MultipartExtraChecksummer::Crc32(Crc32::new())) + } + Some(ChecksumAlgorithm::Crc32c) => { + Some(MultipartExtraChecksummer::Crc32c(Crc32c::default())) + } + Some(ChecksumAlgorithm::Sha1) => Some(MultipartExtraChecksummer::Sha1(Sha1::new())), + Some(ChecksumAlgorithm::Sha256) => { + Some(MultipartExtraChecksummer::Sha256(Sha256::new())) + } + }, + } + } + + pub(crate) fn update( + &mut self, + etag: &str, + checksum: Option<ChecksumValue>, + ) -> Result<(), Error> { + self.md5 + .update(&hex::decode(&etag).ok_or_message("invalid etag hex")?); + match (&mut self.extra, checksum) { + (None, _) => (), + ( + Some(MultipartExtraChecksummer::Crc32(ref mut crc32)), + Some(ChecksumValue::Crc32(x)), + ) => { + crc32.update(&x); + } + ( + Some(MultipartExtraChecksummer::Crc32c(ref mut crc32c)), + Some(ChecksumValue::Crc32c(x)), + ) => { + crc32c.write(&x); + } + (Some(MultipartExtraChecksummer::Sha1(ref mut sha1)), Some(ChecksumValue::Sha1(x))) => { + sha1.update(&x); + } + ( + Some(MultipartExtraChecksummer::Sha256(ref mut sha256)), + Some(ChecksumValue::Sha256(x)), + ) => { + sha256.update(&x); + } + (Some(_), b) => { + return Err(Error::internal_error(format!( + "part checksum was not computed correctly, got: {:?}", + b + ))) + } + } + Ok(()) + } + + pub(crate) fn finalize(self) -> (Md5Checksum, Option<ChecksumValue>) { + let md5 = self.md5.finalize()[..].try_into().unwrap(); + let extra = match self.extra { + None => None, + Some(MultipartExtraChecksummer::Crc32(crc32)) => { + Some(ChecksumValue::Crc32(u32::to_be_bytes(crc32.finalize()))) + } + Some(MultipartExtraChecksummer::Crc32c(crc32c)) => Some(ChecksumValue::Crc32c( + u32::to_be_bytes(u32::try_from(crc32c.finish()).unwrap()), + )), + Some(MultipartExtraChecksummer::Sha1(sha1)) => { + Some(ChecksumValue::Sha1(sha1.finalize()[..].try_into().unwrap())) + } + Some(MultipartExtraChecksummer::Sha256(sha256)) => Some(ChecksumValue::Sha256( + sha256.finalize()[..].try_into().unwrap(), + )), + }; + (md5, extra) + } +} + +// ---- + +/// Extract the value of the x-amz-checksum-algorithm header +pub(crate) fn request_checksum_algorithm( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumAlgorithm>, Error> { + match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { + None => Ok(None), + Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)), + Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)), + Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)), + Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)), + _ => Err(Error::bad_request("invalid checksum algorithm")), + } +} + +/// Extract the value of any of the x-amz-checksum-* headers +pub(crate) fn request_checksum_value( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumValue>, Error> { + let mut ret = vec![]; + + if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) { + let crc32 = BASE64_STANDARD + .decode(&crc32_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; + ret.push(ChecksumValue::Crc32(crc32)) + } + if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) { + let crc32c = BASE64_STANDARD + .decode(&crc32c_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; + ret.push(ChecksumValue::Crc32c(crc32c)) + } + if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) { + let sha1 = BASE64_STANDARD + .decode(&sha1_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; + ret.push(ChecksumValue::Sha1(sha1)) + } + if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) { + let sha256 = BASE64_STANDARD + .decode(&sha256_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; + ret.push(ChecksumValue::Sha256(sha256)) + } + + if ret.len() > 1 { + return Err(Error::bad_request( + "multiple x-amz-checksum-* headers given", + )); + } + Ok(ret.pop()) +} + +/// Checks for the presense of x-amz-checksum-algorithm +/// if so extract the corrseponding x-amz-checksum-* value +pub(crate) fn request_checksum_algorithm_value( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumValue>, Error> { + match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { + Some(x) if x == "CRC32" => { + let crc32 = headers + .get(X_AMZ_CHECKSUM_CRC32) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; + Ok(Some(ChecksumValue::Crc32(crc32))) + } + Some(x) if x == "CRC32C" => { + let crc32c = headers + .get(X_AMZ_CHECKSUM_CRC32C) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; + Ok(Some(ChecksumValue::Crc32c(crc32c))) + } + Some(x) if x == "SHA1" => { + let sha1 = headers + .get(X_AMZ_CHECKSUM_SHA1) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; + Ok(Some(ChecksumValue::Sha1(sha1))) + } + Some(x) if x == "SHA256" => { + let sha256 = headers + .get(X_AMZ_CHECKSUM_SHA256) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; + Ok(Some(ChecksumValue::Sha256(sha256))) + } + Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")), + None => Ok(None), + } +} + +pub(crate) fn add_checksum_response_headers( + checksum: &Option<ChecksumValue>, + mut resp: http::response::Builder, +) -> http::response::Builder { + match checksum { + Some(ChecksumValue::Crc32(crc32)) => { + resp = resp.header(X_AMZ_CHECKSUM_CRC32, BASE64_STANDARD.encode(&crc32)); + } + Some(ChecksumValue::Crc32c(crc32c)) => { + resp = resp.header(X_AMZ_CHECKSUM_CRC32C, BASE64_STANDARD.encode(&crc32c)); + } + Some(ChecksumValue::Sha1(sha1)) => { + resp = resp.header(X_AMZ_CHECKSUM_SHA1, BASE64_STANDARD.encode(&sha1)); + } + Some(ChecksumValue::Sha256(sha256)) => { + resp = resp.header(X_AMZ_CHECKSUM_SHA256, BASE64_STANDARD.encode(&sha256)); + } + None => (), + } + resp +} diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 2b29ec6d..411a6917 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -2,7 +2,6 @@ use std::pin::Pin; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use futures::{stream, stream::Stream, StreamExt, TryStreamExt}; -use md5::{Digest as Md5Digest, Md5}; use bytes::Bytes; use hyper::{Request, Response}; @@ -23,11 +22,12 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::checksum::*; use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; use crate::s3::get::full_object_byte_stream; use crate::s3::multipart; -use crate::s3::put::{get_headers, save_stream, SaveStreamResult}; +use crate::s3::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult}; use crate::s3::xml::{self as s3_xml, xmlns_tag}; // -------- CopyObject --------- @@ -39,6 +39,8 @@ pub async fn handle_copy( ) -> Result<Response<ResBody>, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; + let checksum_algorithm = request_checksum_algorithm(req.headers())?; + let source_object = get_copy_source(&ctx, req).await?; let (source_version, source_version_data, source_version_meta) = @@ -48,7 +50,7 @@ pub async fn handle_copy( copy_precondition.check(source_version, &source_version_meta.etag)?; // Determine encryption parameters - let (source_encryption, source_object_headers) = + let (source_encryption, source_object_meta_inner) = EncryptionParams::check_decrypt_for_copy_source( &ctx.garage, req.headers(), @@ -56,23 +58,54 @@ pub async fn handle_copy( )?; let dest_encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; - // Determine headers of destination object - let dest_object_headers = match req.headers().get("x-amz-metadata-directive") { - Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => { - get_headers(req.headers())? - } - _ => source_object_headers.into_owned(), + // Extract source checksum info before source_object_meta_inner is consumed + let source_checksum = source_object_meta_inner.checksum; + let source_checksum_algorithm = source_checksum.map(|x| x.algorithm()); + + // If source object has a checksum, the destination object must as well. + // The x-amz-checksum-algorihtm header allows to change that algorithm, + // but if it is absent, we must use the same as before + let checksum_algorithm = checksum_algorithm.or(source_checksum_algorithm); + + // Determine metadata of destination object + let was_multipart = source_version_meta.etag.contains('-'); + let dest_object_meta = ObjectVersionMetaInner { + headers: match req.headers().get("x-amz-metadata-directive") { + Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => { + get_headers(req.headers())? + } + _ => source_object_meta_inner.into_owned().headers, + }, + checksum: source_checksum, }; // Do actual object copying - let res = if EncryptionParams::is_same(&source_encryption, &dest_encryption) { - // If source and dest are both unencrypted, or if the encryption keys - // are the same, we can just copy the metadata and link blocks of the + // + // In any of the following scenarios, we need to read the whole object + // data and re-write it again: + // + // - the data needs to be decrypted or encrypted + // - the requested checksum algorithm requires us to recompute a checksum + // - the original object was a multipart upload and a checksum algorithm + // is defined (AWS specifies that in this case, we must recompute the + // checksum from scratch as if this was a single big object and not + // a multipart object, as the checksums are not computed in the same way) + // + // In other cases, we can just copy the metadata and reference the same blocks. + // + // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + + let must_recopy = !EncryptionParams::is_same(&source_encryption, &dest_encryption) + || source_checksum_algorithm != checksum_algorithm + || (was_multipart && checksum_algorithm.is_some()); + + let res = if !must_recopy { + // In most cases, we can just copy the metadata and link blocks of the // old object from the new object. handle_copy_metaonly( ctx, dest_key, - dest_object_headers, + dest_object_meta, dest_encryption, source_version, source_version_data, @@ -80,16 +113,27 @@ pub async fn handle_copy( ) .await? } else { + let expected_checksum = ExpectedChecksums { + md5: None, + sha256: None, + extra: source_checksum, + }; + let checksum_mode = if was_multipart || source_checksum_algorithm != checksum_algorithm { + ChecksumMode::Calculate(checksum_algorithm) + } else { + ChecksumMode::Verify(&expected_checksum) + }; // If source and dest encryption use different keys, // we must decrypt content and re-encrypt, so rewrite all data blocks. handle_copy_reencrypt( ctx, dest_key, - dest_object_headers, + dest_object_meta, dest_encryption, source_version, source_version_data, source_encryption, + checksum_mode, ) .await? }; @@ -115,7 +159,7 @@ pub async fn handle_copy( async fn handle_copy_metaonly( ctx: ReqCtx, dest_key: &str, - dest_object_headers: ObjectVersionHeaders, + dest_object_meta: ObjectVersionMetaInner, dest_encryption: EncryptionParams, source_version: &ObjectVersion, source_version_data: &ObjectVersionData, @@ -132,7 +176,7 @@ async fn handle_copy_metaonly( let new_timestamp = now_msec(); let new_meta = ObjectVersionMeta { - encryption: dest_encryption.encrypt_headers(dest_object_headers)?, + encryption: dest_encryption.encrypt_meta(dest_object_meta)?, size: source_version_meta.size, etag: source_version_meta.etag.clone(), }; @@ -180,6 +224,7 @@ async fn handle_copy_metaonly( timestamp: new_timestamp, state: ObjectVersionState::Uploading { encryption: new_meta.encryption.clone(), + checksum_algorithm: None, multipart: false, }, }; @@ -252,11 +297,12 @@ async fn handle_copy_metaonly( async fn handle_copy_reencrypt( ctx: ReqCtx, dest_key: &str, - dest_object_headers: ObjectVersionHeaders, + dest_object_meta: ObjectVersionMetaInner, dest_encryption: EncryptionParams, source_version: &ObjectVersion, source_version_data: &ObjectVersionData, source_encryption: EncryptionParams, + checksum_mode: ChecksumMode<'_>, ) -> Result<SaveStreamResult, Error> { // basically we will read the source data (decrypt if necessary) // and save that in a new object (encrypt if necessary), @@ -270,12 +316,11 @@ async fn handle_copy_reencrypt( save_stream( &ctx, - dest_object_headers, + dest_object_meta, dest_encryption, source_stream.map_err(|e| Error::from(GarageError::from(e))), &dest_key.to_string(), - None, - None, + checksum_mode, ) .await } @@ -313,8 +358,12 @@ pub async fn handle_upload_part_copy( req.headers(), &source_version_meta.encryption, )?; - let dest_object_encryption = match dest_version.state { - ObjectVersionState::Uploading { encryption, .. } => encryption, + let (dest_object_encryption, dest_object_checksum_algorithm) = match dest_version.state { + ObjectVersionState::Uploading { + encryption, + checksum_algorithm, + .. + } => (encryption, checksum_algorithm), _ => unreachable!(), }; let (dest_encryption, _) = @@ -412,7 +461,9 @@ pub async fn handle_upload_part_copy( dest_mpu_part_key, MpuPart { version: dest_version_id, + // These are all filled in later (bottom of this function) etag: None, + checksum: None, size: None, }, ); @@ -429,7 +480,8 @@ pub async fn handle_upload_part_copy( garage.version_table.insert(&dest_version).await?; // Now, actually copy the blocks - let mut md5hasher = Md5::new(); + let mut checksummer = Checksummer::init(&Default::default(), !dest_encryption.is_encrypted()) + .add(dest_object_checksum_algorithm); // First, create a stream that is able to read the source blocks // and extract the subrange if necessary. @@ -495,18 +547,24 @@ pub async fn handle_upload_part_copy( } let data_len = data.len() as u64; - md5hasher.update(&data[..]); - - let (final_data, must_upload, final_hash) = match existing_block_hash { - Some(hash) if same_encryption => (data, false, hash), - _ => tokio::task::spawn_blocking(move || { - let data_enc = dest_encryption.encrypt_block(data)?; - let hash = blake2sum(&data_enc); - Ok::<_, Error>((data_enc, true, hash)) + + let (checksummer_updated, (data_to_upload, final_hash)) = + tokio::task::spawn_blocking(move || { + checksummer.update(&data[..]); + + let tup = match existing_block_hash { + Some(hash) if same_encryption => (None, hash), + _ => { + let data_enc = dest_encryption.encrypt_block(data)?; + let hash = blake2sum(&data_enc); + (Some(data_enc), hash) + } + }; + Ok::<_, Error>((checksummer, tup)) }) .await - .unwrap()?, - }; + .unwrap()?; + checksummer = checksummer_updated; dest_version.blocks.clear(); dest_version.blocks.put( @@ -531,7 +589,7 @@ pub async fn handle_upload_part_copy( // Thing 1: if the block is not exactly a block that existed before, // we need to insert that data as a new block. async { - if must_upload { + if let Some(final_data) = data_to_upload { garage .block_manager .rpc_put_block(final_hash, final_data, dest_encryption.is_encrypted(), None) @@ -552,8 +610,9 @@ pub async fn handle_upload_part_copy( assert_eq!(current_offset, source_range.length); - let data_md5sum = md5hasher.finalize(); - let etag = dest_encryption.etag_from_md5(&data_md5sum); + let checksums = checksummer.finalize(); + let etag = dest_encryption.etag_from_md5(&checksums.md5); + let checksum = checksums.extract(dest_object_checksum_algorithm); // Put the part's ETag in the Versiontable dest_mpu.parts.put( @@ -561,6 +620,7 @@ pub async fn handle_upload_part_copy( MpuPart { version: dest_version_id, etag: Some(etag.clone()), + checksum, size: Some(current_offset), }, ); diff --git a/src/api/s3/encryption.rs b/src/api/s3/encryption.rs index 2b105e90..2e6ed65c 100644 --- a/src/api/s3/encryption.rs +++ b/src/api/s3/encryption.rs @@ -26,9 +26,10 @@ use garage_util::error::Error as GarageError; use garage_util::migrate::Migrate; use garage_model::garage::Garage; -use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionHeaders}; +use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionMetaInner}; use crate::common_error::*; +use crate::s3::checksum::Md5Checksum; use crate::s3::error::Error; const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName = @@ -124,7 +125,7 @@ impl EncryptionParams { garage: &Garage, headers: &HeaderMap, obj_enc: &'a ObjectVersionEncryption, - ) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> { + ) -> Result<(Self, Cow<'a, ObjectVersionMetaInner>), Error> { let key = parse_request_headers( headers, &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM, @@ -138,7 +139,7 @@ impl EncryptionParams { garage: &Garage, headers: &HeaderMap, obj_enc: &'a ObjectVersionEncryption, - ) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> { + ) -> Result<(Self, Cow<'a, ObjectVersionMetaInner>), Error> { let key = parse_request_headers( headers, &X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM, @@ -152,14 +153,11 @@ impl EncryptionParams { garage: &Garage, key: Option<(Key<Aes256Gcm>, Md5Output)>, obj_enc: &'a ObjectVersionEncryption, - ) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> { + ) -> Result<(Self, Cow<'a, ObjectVersionMetaInner>), Error> { match (key, &obj_enc) { ( Some((client_key, client_key_md5)), - ObjectVersionEncryption::SseC { - headers, - compressed, - }, + ObjectVersionEncryption::SseC { inner, compressed }, ) => { let enc = Self::SseC { client_key, @@ -170,13 +168,13 @@ impl EncryptionParams { None }, }; - let plaintext = enc.decrypt_blob(&headers)?; - let headers = ObjectVersionHeaders::decode(&plaintext) - .ok_or_internal_error("Could not decode encrypted headers")?; - Ok((enc, Cow::Owned(headers))) + let plaintext = enc.decrypt_blob(&inner)?; + let inner = ObjectVersionMetaInner::decode(&plaintext) + .ok_or_internal_error("Could not decode encrypted metadata")?; + Ok((enc, Cow::Owned(inner))) } - (None, ObjectVersionEncryption::Plaintext { headers }) => { - Ok((Self::Plaintext, Cow::Borrowed(headers))) + (None, ObjectVersionEncryption::Plaintext { inner }) => { + Ok((Self::Plaintext, Cow::Borrowed(inner))) } (_, ObjectVersionEncryption::SseC { .. }) => { Err(Error::bad_request("Object is encrypted")) @@ -188,29 +186,31 @@ impl EncryptionParams { } } - pub fn encrypt_headers( + pub fn encrypt_meta( &self, - h: ObjectVersionHeaders, + meta: ObjectVersionMetaInner, ) -> Result<ObjectVersionEncryption, Error> { match self { Self::SseC { compression_level, .. } => { - let plaintext = h.encode().map_err(GarageError::from)?; + let plaintext = meta.encode().map_err(GarageError::from)?; let ciphertext = self.encrypt_blob(&plaintext)?; Ok(ObjectVersionEncryption::SseC { - headers: ciphertext.into_owned(), + inner: ciphertext.into_owned(), compressed: compression_level.is_some(), }) } - Self::Plaintext => Ok(ObjectVersionEncryption::Plaintext { headers: h }), + Self::Plaintext => Ok(ObjectVersionEncryption::Plaintext { inner: meta }), } } // ---- generating object Etag values ---- - pub fn etag_from_md5(&self, md5sum: &[u8]) -> String { + pub fn etag_from_md5(&self, md5sum: &Option<Md5Checksum>) -> String { match self { - Self::Plaintext => hex::encode(md5sum), + Self::Plaintext => md5sum + .map(|x| hex::encode(&x[..])) + .expect("md5 digest should have been computed"), Self::SseC { .. } => { // AWS specifies that for encrypted objects, the Etag is not // the md5sum of the data, but doesn't say what it is. @@ -224,7 +224,7 @@ impl EncryptionParams { // ---- generic function for encrypting / decrypting blobs ---- // Prepends a randomly-generated nonce to the encrypted value. - // This is used for encrypting object headers and inlined data for small objects. + // This is used for encrypting object metadata and inlined data for small objects. // This does not compress anything. pub fn encrypt_blob<'a>(&self, blob: &'a [u8]) -> Result<Cow<'a, [u8]>, Error> { diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index 5cb5d04e..2855e0b3 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -69,6 +69,10 @@ pub enum Error { #[error(display = "Invalid encryption algorithm: {:?}, should be AES256", _0)] InvalidEncryptionAlgorithm(String), + /// The client sent invalid XML data + #[error(display = "Invalid digest: {}", _0)] + InvalidDigest(String), + /// The client sent a request for an action not supported by garage #[error(display = "Unimplemented action: {}", _0)] NotImplemented(String), @@ -129,6 +133,7 @@ impl Error { Error::NotImplemented(_) => "NotImplemented", Error::InvalidXml(_) => "MalformedXML", Error::InvalidRange(_) => "InvalidRange", + Error::InvalidDigest(_) => "InvalidDigest", Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest", Error::InvalidEncryptionAlgorithm(_) => "InvalidEncryptionAlgorithmError", } @@ -148,6 +153,7 @@ impl ApiError for Error { | Error::InvalidPart | Error::InvalidPartOrder | Error::EntityTooSmall + | Error::InvalidDigest(_) | Error::InvalidEncryptionAlgorithm(_) | Error::InvalidXml(_) | Error::InvalidUtf8Str(_) diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index ec300ab7..f5d3cf11 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -27,6 +27,7 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::ResBody; +use crate::s3::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE}; use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; @@ -45,8 +46,9 @@ pub struct GetObjectOverrides { fn object_headers( version: &ObjectVersion, version_meta: &ObjectVersionMeta, - headers: &ObjectVersionHeaders, + meta_inner: &ObjectVersionMetaInner, encryption: EncryptionParams, + checksum_mode: ChecksumMode, ) -> http::response::Builder { debug!("Version meta: {:?}", version_meta); @@ -65,7 +67,7 @@ fn object_headers( // have the same name (ignoring case) into a comma-delimited list. // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html let mut headers_by_name = BTreeMap::new(); - for (name, value) in headers.0.iter() { + for (name, value) in meta_inner.headers.iter() { match headers_by_name.get_mut(name) { None => { headers_by_name.insert(name, vec![value.as_str()]); @@ -80,6 +82,10 @@ fn object_headers( resp = resp.header(name, values.join(",")); } + if checksum_mode.enabled { + resp = add_checksum_response_headers(&meta_inner.checksum, resp); + } + encryption.add_response_headers(&mut resp); resp @@ -199,6 +205,8 @@ pub async fn handle_head_without_ctx( let (encryption, headers) = EncryptionParams::check_decrypt(&garage, req.headers(), &version_meta.encryption)?; + let checksum_mode = checksum_mode(&req); + if let Some(pn) = part_number { match version_data { ObjectVersionData::Inline(_, _) => { @@ -206,17 +214,21 @@ pub async fn handle_head_without_ctx( return Err(Error::InvalidPart); } let bytes_len = version_meta.size; - Ok( - object_headers(object_version, version_meta, &headers, encryption) - .header(CONTENT_LENGTH, format!("{}", bytes_len)) - .header( - CONTENT_RANGE, - format!("bytes 0-{}/{}", bytes_len - 1, bytes_len), - ) - .header(X_AMZ_MP_PARTS_COUNT, "1") - .status(StatusCode::PARTIAL_CONTENT) - .body(empty_body())?, + Ok(object_headers( + object_version, + version_meta, + &headers, + encryption, + checksum_mode, + ) + .header(CONTENT_LENGTH, format!("{}", bytes_len)) + .header( + CONTENT_RANGE, + format!("bytes 0-{}/{}", bytes_len - 1, bytes_len), ) + .header(X_AMZ_MP_PARTS_COUNT, "1") + .status(StatusCode::PARTIAL_CONTENT) + .body(empty_body())?) } ObjectVersionData::FirstBlock(_, _) => { let version = garage @@ -228,32 +240,40 @@ pub async fn handle_head_without_ctx( let (part_offset, part_end) = calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?; - Ok( - object_headers(object_version, version_meta, &headers, encryption) - .header(CONTENT_LENGTH, format!("{}", part_end - part_offset)) - .header( - CONTENT_RANGE, - format!( - "bytes {}-{}/{}", - part_offset, - part_end - 1, - version_meta.size - ), - ) - .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?)) - .status(StatusCode::PARTIAL_CONTENT) - .body(empty_body())?, + Ok(object_headers( + object_version, + version_meta, + &headers, + encryption, + checksum_mode, ) + .header(CONTENT_LENGTH, format!("{}", part_end - part_offset)) + .header( + CONTENT_RANGE, + format!( + "bytes {}-{}/{}", + part_offset, + part_end - 1, + version_meta.size + ), + ) + .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?)) + .status(StatusCode::PARTIAL_CONTENT) + .body(empty_body())?) } _ => unreachable!(), } } else { - Ok( - object_headers(object_version, version_meta, &headers, encryption) - .header(CONTENT_LENGTH, format!("{}", version_meta.size)) - .status(StatusCode::OK) - .body(empty_body())?, + Ok(object_headers( + object_version, + version_meta, + &headers, + encryption, + checksum_mode, ) + .header(CONTENT_LENGTH, format!("{}", version_meta.size)) + .status(StatusCode::OK) + .body(empty_body())?) } } @@ -307,12 +327,24 @@ pub async fn handle_get_without_ctx( let (enc, headers) = EncryptionParams::check_decrypt(&garage, req.headers(), &last_v_meta.encryption)?; + let checksum_mode = checksum_mode(&req); + match (part_number, parse_range_header(req, last_v_meta.size)?) { (Some(_), Some(_)) => Err(Error::bad_request( "Cannot specify both partNumber and Range header", )), (Some(pn), None) => { - handle_get_part(garage, last_v, last_v_data, last_v_meta, enc, &headers, pn).await + handle_get_part( + garage, + last_v, + last_v_data, + last_v_meta, + enc, + &headers, + pn, + checksum_mode, + ) + .await } (None, Some(range)) => { handle_get_range( @@ -324,6 +356,7 @@ pub async fn handle_get_without_ctx( &headers, range.start, range.start + range.length, + checksum_mode, ) .await } @@ -336,6 +369,7 @@ pub async fn handle_get_without_ctx( enc, &headers, overrides, + checksum_mode, ) .await } @@ -348,12 +382,19 @@ async fn handle_get_full( version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, encryption: EncryptionParams, - headers: &ObjectVersionHeaders, + meta_inner: &ObjectVersionMetaInner, overrides: GetObjectOverrides, + checksum_mode: ChecksumMode, ) -> Result<Response<ResBody>, Error> { - let mut resp_builder = object_headers(version, version_meta, &headers, encryption) - .header(CONTENT_LENGTH, format!("{}", version_meta.size)) - .status(StatusCode::OK); + let mut resp_builder = object_headers( + version, + version_meta, + &meta_inner, + encryption, + checksum_mode, + ) + .header(CONTENT_LENGTH, format!("{}", version_meta.size)) + .status(StatusCode::OK); getobject_override_headers(overrides, &mut resp_builder)?; let stream = full_object_byte_stream(garage, version, version_data, encryption); @@ -432,14 +473,15 @@ async fn handle_get_range( version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, encryption: EncryptionParams, - headers: &ObjectVersionHeaders, + meta_inner: &ObjectVersionMetaInner, begin: u64, end: u64, + checksum_mode: ChecksumMode, ) -> Result<Response<ResBody>, Error> { // Here we do not use getobject_override_headers because we don't // want to add any overridden headers (those should not be added // when returning PARTIAL_CONTENT) - let resp_builder = object_headers(version, version_meta, headers, encryption) + let resp_builder = object_headers(version, version_meta, meta_inner, encryption, checksum_mode) .header(CONTENT_LENGTH, format!("{}", end - begin)) .header( CONTENT_RANGE, @@ -480,12 +522,19 @@ async fn handle_get_part( version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, encryption: EncryptionParams, - headers: &ObjectVersionHeaders, + meta_inner: &ObjectVersionMetaInner, part_number: u64, + checksum_mode: ChecksumMode, ) -> Result<Response<ResBody>, Error> { // Same as for get_range, no getobject_override_headers - let resp_builder = object_headers(object_version, version_meta, headers, encryption) - .status(StatusCode::PARTIAL_CONTENT); + let resp_builder = object_headers( + object_version, + version_meta, + meta_inner, + encryption, + checksum_mode, + ) + .status(StatusCode::PARTIAL_CONTENT); match version_data { ObjectVersionData::Inline(_, bytes) => { @@ -567,6 +616,20 @@ fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> { None } +struct ChecksumMode { + enabled: bool, +} + +fn checksum_mode(req: &Request<impl Body>) -> ChecksumMode { + ChecksumMode { + enabled: req + .headers() + .get(X_AMZ_CHECKSUM_MODE) + .map(|x| x == "ENABLED") + .unwrap_or(false), + } +} + fn body_from_blocks_range( garage: Arc<Garage>, encryption: EncryptionParams, diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 1678f1fa..648bace2 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -2,7 +2,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::iter::{Iterator, Peekable}; use base64::prelude::*; -use hyper::Response; +use hyper::{Request, Response}; use garage_util::data::*; use garage_util::error::Error as GarageError; @@ -15,7 +15,8 @@ use garage_table::EnumerationOrder; use crate::encoding::*; use crate::helpers::*; -use crate::s3::api_server::ResBody; +use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; use crate::s3::multipart as s3_multipart; use crate::s3::xml as s3_xml; @@ -271,13 +272,21 @@ pub async fn handle_list_multipart_upload( pub async fn handle_list_parts( ctx: ReqCtx, + req: Request<ReqBody>, query: &ListPartsQuery, ) -> Result<Response<ResBody>, Error> { debug!("ListParts {:?}", query); let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?; - let (_, _, mpu) = s3_multipart::get_upload(&ctx, &query.key, &upload_id).await?; + let (_, object_version, mpu) = s3_multipart::get_upload(&ctx, &query.key, &upload_id).await?; + + let object_encryption = match object_version.state { + ObjectVersionState::Uploading { encryption, .. } => encryption, + _ => unreachable!(), + }; + let encryption_res = + EncryptionParams::check_decrypt(&ctx.garage, req.headers(), &object_encryption); let (info, next) = fetch_part_info(query, &mpu)?; @@ -296,11 +305,40 @@ pub async fn handle_list_parts( is_truncated: s3_xml::Value(format!("{}", next.is_some())), parts: info .iter() - .map(|part| s3_xml::PartItem { - etag: s3_xml::Value(format!("\"{}\"", part.etag)), - last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)), - part_number: s3_xml::IntValue(part.part_number as i64), - size: s3_xml::IntValue(part.size as i64), + .map(|part| { + // hide checksum if object is encrypted and the decryption + // keys are not provided + let checksum = part.checksum.filter(|_| encryption_res.is_ok()); + s3_xml::PartItem { + etag: s3_xml::Value(format!("\"{}\"", part.etag)), + last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)), + part_number: s3_xml::IntValue(part.part_number as i64), + size: s3_xml::IntValue(part.size as i64), + checksum_crc32: match &checksum { + Some(ChecksumValue::Crc32(x)) => { + Some(s3_xml::Value(BASE64_STANDARD.encode(&x))) + } + _ => None, + }, + checksum_crc32c: match &checksum { + Some(ChecksumValue::Crc32c(x)) => { + Some(s3_xml::Value(BASE64_STANDARD.encode(&x))) + } + _ => None, + }, + checksum_sha1: match &checksum { + Some(ChecksumValue::Sha1(x)) => { + Some(s3_xml::Value(BASE64_STANDARD.encode(&x))) + } + _ => None, + }, + checksum_sha256: match &checksum { + Some(ChecksumValue::Sha256(x)) => { + Some(s3_xml::Value(BASE64_STANDARD.encode(&x))) + } + _ => None, + }, + } }) .collect(), @@ -346,6 +384,7 @@ struct PartInfo<'a> { timestamp: u64, part_number: u64, size: u64, + checksum: Option<ChecksumValue>, } enum ExtractionResult { @@ -486,6 +525,7 @@ fn fetch_part_info<'a>( timestamp: pk.timestamp, etag, size, + checksum: p.checksum, }; match parts.last_mut() { Some(lastpart) if lastpart.part_number == pk.part_number => { @@ -945,8 +985,12 @@ mod tests { state: ObjectVersionState::Uploading { multipart: true, encryption: ObjectVersionEncryption::Plaintext { - headers: ObjectVersionHeaders(vec![]), + inner: ObjectVersionMetaInner { + headers: vec![], + checksum: None, + }, }, + checksum_algorithm: None, }, } } @@ -1135,6 +1179,7 @@ mod tests { version: uuid, size: Some(3), etag: Some("etag1".into()), + checksum: None, }, ), ( @@ -1146,6 +1191,7 @@ mod tests { version: uuid, size: None, etag: None, + checksum: None, }, ), ( @@ -1157,6 +1203,7 @@ mod tests { version: uuid, size: Some(10), etag: Some("etag2".into()), + checksum: None, }, ), ( @@ -1168,6 +1215,7 @@ mod tests { version: uuid, size: Some(7), etag: Some("etag3".into()), + checksum: None, }, ), ( @@ -1179,6 +1227,7 @@ mod tests { version: uuid, size: Some(5), etag: Some("etag4".into()), + checksum: None, }, ), ]; @@ -1217,12 +1266,14 @@ mod tests { etag: "etag1", timestamp: TS, part_number: 1, - size: 3 + size: 3, + checksum: None, }, PartInfo { etag: "etag2", timestamp: TS, part_number: 3, + checksum: None, size: 10 }, ] @@ -1238,12 +1289,14 @@ mod tests { PartInfo { etag: "etag3", timestamp: TS, + checksum: None, part_number: 5, size: 7 }, PartInfo { etag: "etag4", timestamp: TS, + checksum: None, part_number: 8, size: 5 }, @@ -1267,24 +1320,28 @@ mod tests { PartInfo { etag: "etag1", timestamp: TS, + checksum: None, part_number: 1, size: 3 }, PartInfo { etag: "etag2", timestamp: TS, + checksum: None, part_number: 3, size: 10 }, PartInfo { etag: "etag3", timestamp: TS, + checksum: None, part_number: 5, size: 7 }, PartInfo { etag: "etag4", timestamp: TS, + checksum: None, part_number: 8, size: 5 }, diff --git a/src/api/s3/mod.rs b/src/api/s3/mod.rs index 1eb95d40..b9bb1a6f 100644 --- a/src/api/s3/mod.rs +++ b/src/api/s3/mod.rs @@ -13,6 +13,7 @@ mod post_object; mod put; mod website; +mod checksum; mod encryption; mod router; pub mod xml; diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index fcc5769f..3db3e8aa 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -1,9 +1,10 @@ use std::collections::HashMap; +use std::convert::TryInto; use std::sync::Arc; +use base64::prelude::*; use futures::prelude::*; use hyper::{Request, Response}; -use md5::{Digest as Md5Digest, Md5}; use garage_table::*; use garage_util::data::*; @@ -16,6 +17,7 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::checksum::*; use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; use crate::s3::put::*; @@ -41,10 +43,16 @@ pub async fn handle_create_multipart_upload( let timestamp = next_timestamp(existing_object.as_ref()); let headers = get_headers(req.headers())?; + let meta = ObjectVersionMetaInner { + headers, + checksum: None, + }; // Determine whether object should be encrypted, and if so the key let encryption = EncryptionParams::new_from_headers(&garage, req.headers())?; - let object_encryption = encryption.encrypt_headers(headers)?; + let object_encryption = encryption.encrypt_meta(meta)?; + + let checksum_algorithm = request_checksum_algorithm(req.headers())?; // Create object in object table let object_version = ObjectVersion { @@ -53,6 +61,7 @@ pub async fn handle_create_multipart_upload( state: ObjectVersionState::Uploading { multipart: true, encryption: object_encryption, + checksum_algorithm, }, }; let object = Object::new(*bucket_id, key.to_string(), vec![object_version]); @@ -90,9 +99,13 @@ pub async fn handle_put_part( let upload_id = decode_upload_id(upload_id)?; - let content_md5 = match req.headers().get("content-md5") { - Some(x) => Some(x.to_str()?.to_string()), - None => None, + let expected_checksums = ExpectedChecksums { + md5: match req.headers().get("content-md5") { + Some(x) => Some(x.to_str()?.to_string()), + None => None, + }, + sha256: content_sha256, + extra: request_checksum_value(req.headers())?, }; // Read first chuck, and at the same time try to get object to see if it exists @@ -106,8 +119,12 @@ pub async fn handle_put_part( futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?; // Check encryption params - let object_encryption = match object_version.state { - ObjectVersionState::Uploading { encryption, .. } => encryption, + let (object_encryption, checksum_algorithm) = match object_version.state { + ObjectVersionState::Uploading { + encryption, + checksum_algorithm, + .. + } => (encryption, checksum_algorithm), _ => unreachable!(), }; let (encryption, _) = @@ -138,7 +155,9 @@ pub async fn handle_put_part( mpu_part_key, MpuPart { version: version_uuid, + // all these are filled in later, at the end of this function etag: None, + checksum: None, size: None, }, ); @@ -152,32 +171,31 @@ pub async fn handle_put_part( garage.version_table.insert(&version).await?; // Copy data to version - let (total_size, data_md5sum, data_sha256sum, _) = read_and_put_blocks( + let checksummer = + Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm); + let (total_size, checksums, _) = read_and_put_blocks( &ctx, &version, encryption, part_number, first_block, &mut chunker, + checksummer, ) .await?; // Verify that checksums map - ensure_checksum_matches( - &data_md5sum, - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; + checksums.verify(&expected_checksums)?; // Store part etag in version - let etag = encryption.etag_from_md5(&data_md5sum); + let etag = encryption.etag_from_md5(&checksums.md5); mpu.parts.put( mpu_part_key, MpuPart { version: version_uuid, etag: Some(etag.clone()), + checksum: checksums.extract(checksum_algorithm), size: Some(total_size), }, ); @@ -189,6 +207,7 @@ pub async fn handle_put_part( let mut resp = Response::builder().header("ETag", format!("\"{}\"", etag)); encryption.add_response_headers(&mut resp); + let resp = add_checksum_response_headers(&expected_checksums.extra, resp); Ok(resp.body(empty_body())?) } @@ -236,10 +255,11 @@ pub async fn handle_complete_multipart_upload( bucket_name, .. } = &ctx; + let (req_head, req_body) = req.into_parts(); - let body = http_body_util::BodyExt::collect(req.into_body()) - .await? - .to_bytes(); + let expected_checksum = request_checksum_value(&req_head.headers)?; + + let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -263,8 +283,12 @@ pub async fn handle_complete_multipart_upload( return Err(Error::bad_request("No data was uploaded")); } - let object_encryption = match object_version.state { - ObjectVersionState::Uploading { encryption, .. } => encryption, + let (object_encryption, checksum_algorithm) = match object_version.state { + ObjectVersionState::Uploading { + encryption, + checksum_algorithm, + .. + } => (encryption, checksum_algorithm), _ => unreachable!(), }; @@ -292,6 +316,13 @@ pub async fn handle_complete_multipart_upload( for req_part in body_list_of_parts.iter() { match have_parts.get(&req_part.part_number) { Some(part) if part.etag.as_ref() == Some(&req_part.etag) && part.size.is_some() => { + // alternative version: if req_part.checksum.is_some() && part.checksum != req_part.checksum { + if part.checksum != req_part.checksum { + return Err(Error::InvalidDigest(format!( + "Invalid checksum for part {}: in request = {:?}, uploaded part = {:?}", + req_part.part_number, req_part.checksum, part.checksum + ))); + } parts.push(*part) } _ => return Err(Error::InvalidPart), @@ -339,18 +370,23 @@ pub async fn handle_complete_multipart_upload( }); garage.block_ref_table.insert_many(block_refs).await?; - // Calculate etag of final object + // Calculate checksum and etag of final object // To understand how etags are calculated, read more here: + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html // https://teppen.io/2018/06/23/aws_s3_etags/ - let mut etag_md5_hasher = Md5::new(); + let mut checksummer = MultipartChecksummer::init(checksum_algorithm); for part in parts.iter() { - etag_md5_hasher.update(part.etag.as_ref().unwrap().as_bytes()); + checksummer.update(part.etag.as_ref().unwrap(), part.checksum)?; + } + let (checksum_md5, checksum_extra) = checksummer.finalize(); + + if expected_checksum.is_some() && checksum_extra != expected_checksum { + return Err(Error::InvalidDigest( + "Failed to validate x-amz-checksum-*".into(), + )); } - let etag = format!( - "{}-{}", - hex::encode(etag_md5_hasher.finalize()), - parts.len() - ); + + let etag = format!("{}-{}", hex::encode(&checksum_md5[..]), parts.len()); // Calculate total size of final object let total_size = parts.iter().map(|x| x.size.unwrap()).sum(); @@ -363,6 +399,20 @@ pub async fn handle_complete_multipart_upload( return Err(e); } + // If there is a checksum algorithm, update metadata with checksum + let object_encryption = match checksum_algorithm { + None => object_encryption, + Some(_) => { + let (encryption, meta) = + EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?; + let new_meta = ObjectVersionMetaInner { + headers: meta.into_owned().headers, + checksum: checksum_extra, + }; + encryption.encrypt_meta(new_meta)? + } + }; + // Write final object version object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { @@ -383,10 +433,28 @@ pub async fn handle_complete_multipart_upload( bucket: s3_xml::Value(bucket_name.to_string()), key: s3_xml::Value(key), etag: s3_xml::Value(format!("\"{}\"", etag)), + checksum_crc32: match &checksum_extra { + Some(ChecksumValue::Crc32(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, + checksum_crc32c: match &checksum_extra { + Some(ChecksumValue::Crc32c(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, + checksum_sha1: match &checksum_extra { + Some(ChecksumValue::Sha1(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, + checksum_sha256: match &checksum_extra { + Some(ChecksumValue::Sha256(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(string_body(xml))) + let resp = Response::builder(); + let resp = add_checksum_response_headers(&expected_checksum, resp); + Ok(resp.body(string_body(xml))?) } pub async fn handle_abort_multipart_upload( @@ -455,6 +523,7 @@ pub fn decode_upload_id(id: &str) -> Result<Uuid, Error> { struct CompleteMultipartUploadPart { etag: String, part_number: u64, + checksum: Option<ChecksumValue>, } fn parse_complete_multipart_upload_body( @@ -480,9 +549,41 @@ fn parse_complete_multipart_upload_body( .children() .find(|e| e.has_tag_name("PartNumber"))? .text()?; + let checksum = if let Some(crc32) = + item.children().find(|e| e.has_tag_name("ChecksumCRC32")) + { + Some(ChecksumValue::Crc32( + BASE64_STANDARD.decode(crc32.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else if let Some(crc32c) = item.children().find(|e| e.has_tag_name("ChecksumCRC32C")) + { + Some(ChecksumValue::Crc32c( + BASE64_STANDARD.decode(crc32c.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else if let Some(sha1) = item.children().find(|e| e.has_tag_name("ChecksumSHA1")) { + Some(ChecksumValue::Sha1( + BASE64_STANDARD.decode(sha1.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else if let Some(sha256) = item.children().find(|e| e.has_tag_name("ChecksumSHA256")) + { + Some(ChecksumValue::Sha256( + BASE64_STANDARD.decode(sha256.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else { + None + }; parts.push(CompleteMultipartUploadPart { etag: etag.trim_matches('"').to_string(), part_number: part_number.parse().ok()?, + checksum, }); } else { return None; diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index 7c4219a7..2c106b3b 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -14,13 +14,15 @@ use multer::{Constraints, Multipart, SizeLimit}; use serde::Deserialize; use garage_model::garage::Garage; +use garage_model::s3::object_table::*; use crate::helpers::*; use crate::s3::api_server::ResBody; +use crate::s3::checksum::*; use crate::s3::cors::*; use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; -use crate::s3::put::{get_headers, save_stream}; +use crate::s3::put::{get_headers, save_stream, ChecksumMode}; use crate::s3::xml as s3_xml; use crate::signature::payload::{verify_v4, Authorization}; @@ -98,10 +100,6 @@ pub async fn handle_post_object( .ok_or_bad_request("No policy was provided")? .to_str()?; let authorization = Authorization::parse_form(¶ms)?; - let content_md5 = params - .get("content-md5") - .map(HeaderValue::to_str) - .transpose()?; let key = if key.contains("${filename}") { // if no filename is provided, don't replace. This matches the behavior of AWS. @@ -226,6 +224,21 @@ pub async fn handle_post_object( let headers = get_headers(¶ms)?; + let expected_checksums = ExpectedChecksums { + md5: params + .get("content-md5") + .map(HeaderValue::to_str) + .transpose()? + .map(str::to_string), + sha256: None, + extra: request_checksum_algorithm_value(¶ms)?, + }; + + let meta = ObjectVersionMetaInner { + headers, + checksum: expected_checksums.extra, + }; + let encryption = EncryptionParams::new_from_headers(&garage, ¶ms)?; let stream = file_field.map(|r| r.map_err(Into::into)); @@ -239,12 +252,11 @@ pub async fn handle_post_object( let res = save_stream( &ctx, - headers, + meta, encryption, StreamLimiter::new(stream, conditions.content_length), &key, - content_md5.map(str::to_string), - None, + ChecksumMode::Verify(&expected_checksums), ) .await?; diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 941e4122..1e3b1b44 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -1,12 +1,9 @@ use std::collections::HashMap; use std::sync::Arc; -use base64::prelude::*; use futures::prelude::*; use futures::stream::FuturesOrdered; use futures::try_join; -use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; -use sha2::Sha256; use tokio::sync::mpsc; @@ -22,7 +19,6 @@ use opentelemetry::{ use garage_net::bytes_buf::BytesBuf; use garage_rpc::rpc_helper::OrderTag; use garage_table::*; -use garage_util::async_hash::*; use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_util::time::*; @@ -36,16 +32,22 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::checksum::*; use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; const PUT_BLOCKS_MAX_PARALLEL: usize = 3; -pub struct SaveStreamResult { - pub version_uuid: Uuid, - pub version_timestamp: u64, +pub(crate) struct SaveStreamResult { + pub(crate) version_uuid: Uuid, + pub(crate) version_timestamp: u64, /// Etag WITHOUT THE QUOTES (just the hex value) - pub etag: String, + pub(crate) etag: String, +} + +pub(crate) enum ChecksumMode<'a> { + Verify(&'a ExpectedChecksums), + Calculate(Option<ChecksumAlgorithm>), } pub async fn handle_put( @@ -58,24 +60,32 @@ pub async fn handle_put( let headers = get_headers(req.headers())?; debug!("Object headers: {:?}", headers); - // Determine whether object should be encrypted, and if so the key - let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; + let expected_checksums = ExpectedChecksums { + md5: match req.headers().get("content-md5") { + Some(x) => Some(x.to_str()?.to_string()), + None => None, + }, + sha256: content_sha256, + extra: request_checksum_value(req.headers())?, + }; - let content_md5 = match req.headers().get("content-md5") { - Some(x) => Some(x.to_str()?.to_string()), - None => None, + let meta = ObjectVersionMetaInner { + headers, + checksum: expected_checksums.extra, }; + // Determine whether object should be encrypted, and if so the key + let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; + let stream = body_stream(req.into_body()); let res = save_stream( &ctx, - headers, + meta, encryption, stream, key, - content_md5, - content_sha256, + ChecksumMode::Verify(&expected_checksums), ) .await?; @@ -83,17 +93,17 @@ pub async fn handle_put( .header("x-amz-version-id", hex::encode(res.version_uuid)) .header("ETag", format!("\"{}\"", res.etag)); encryption.add_response_headers(&mut resp); + let resp = add_checksum_response_headers(&expected_checksums.extra, resp); Ok(resp.body(empty_body())?) } pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ctx: &ReqCtx, - headers: ObjectVersionHeaders, + mut meta: ObjectVersionMetaInner, encryption: EncryptionParams, body: S, key: &String, - content_md5: Option<String>, - content_sha256: Option<FixedBytes32>, + checksum_mode: ChecksumMode<'_>, ) -> Result<SaveStreamResult, Error> { let ReqCtx { garage, bucket_id, .. @@ -107,32 +117,36 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let first_block = first_block_opt.unwrap_or_default(); - let object_encryption = encryption.encrypt_headers(headers)?; - // Generate identity of new version let version_uuid = gen_uuid(); let version_timestamp = next_timestamp(existing_object.as_ref()); + let mut checksummer = match checksum_mode { + ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()), + ChecksumMode::Calculate(algo) => { + Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(algo) + } + }; + // If body is small enough, store it directly in the object table // as "inline data". We can then return immediately. if first_block.len() < INLINE_THRESHOLD { - let mut md5sum = Md5::new(); - md5sum.update(&first_block[..]); - let data_md5sum = md5sum.finalize(); - - let data_sha256sum = sha256sum(&first_block[..]); + checksummer.update(&first_block); + let checksums = checksummer.finalize(); - ensure_checksum_matches( - &data_md5sum, - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; + match checksum_mode { + ChecksumMode::Verify(expected) => { + checksums.verify(&expected)?; + } + ChecksumMode::Calculate(algo) => { + meta.checksum = checksums.extract(algo); + } + }; let size = first_block.len() as u64; check_quotas(ctx, size, existing_object.as_ref()).await?; - let etag = encryption.etag_from_md5(&data_md5sum); + let etag = encryption.etag_from_md5(&checksums.md5); let inline_data = encryption.encrypt_blob(&first_block)?.to_vec(); let object_version = ObjectVersion { @@ -140,7 +154,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( timestamp: version_timestamp, state: ObjectVersionState::Complete(ObjectVersionData::Inline( ObjectVersionMeta { - encryption: object_encryption, + encryption: encryption.encrypt_meta(meta)?, size, etag: etag.clone(), }, @@ -175,7 +189,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( uuid: version_uuid, timestamp: version_timestamp, state: ObjectVersionState::Uploading { - encryption: object_encryption.clone(), + encryption: encryption.encrypt_meta(meta.clone())?, + checksum_algorithm: None, // don't care; overwritten later multipart: false, }, }; @@ -196,25 +211,37 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ); garage.version_table.insert(&version).await?; - // Transfer data and verify checksum - let (total_size, data_md5sum, data_sha256sum, first_block_hash) = - read_and_put_blocks(ctx, &version, encryption, 1, first_block, &mut chunker).await?; + // Transfer data + let (total_size, checksums, first_block_hash) = read_and_put_blocks( + ctx, + &version, + encryption, + 1, + first_block, + &mut chunker, + checksummer, + ) + .await?; - ensure_checksum_matches( - &data_md5sum, - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; + // Verify checksums are ok / add calculated checksum to metadata + match checksum_mode { + ChecksumMode::Verify(expected) => { + checksums.verify(&expected)?; + } + ChecksumMode::Calculate(algo) => { + meta.checksum = checksums.extract(algo); + } + }; + // Verify quotas are respsected check_quotas(ctx, total_size, existing_object.as_ref()).await?; // Save final object state, marked as Complete - let etag = encryption.etag_from_md5(&data_md5sum); + let etag = encryption.etag_from_md5(&checksums.md5); object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { - encryption: object_encryption, + encryption: encryption.encrypt_meta(meta)?, size: total_size, etag: etag.clone(), }, @@ -234,33 +261,6 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( }) } -/// Validate MD5 sum against content-md5 header -/// and sha256sum against signed content-sha256 -pub(crate) fn ensure_checksum_matches( - data_md5sum: &[u8], - data_sha256sum: garage_util::data::FixedBytes32, - content_md5: Option<&str>, - content_sha256: Option<garage_util::data::FixedBytes32>, -) -> Result<(), Error> { - if let Some(expected_sha256) = content_sha256 { - if expected_sha256 != data_sha256sum { - return Err(Error::bad_request( - "Unable to validate x-amz-content-sha256", - )); - } else { - trace!("Successfully validated x-amz-content-sha256"); - } - } - if let Some(expected_md5) = content_md5 { - if expected_md5.trim_matches('"') != BASE64_STANDARD.encode(data_md5sum) { - return Err(Error::bad_request("Unable to validate content-md5")); - } else { - trace!("Successfully validated content-md5"); - } - } - Ok(()) -} - /// Check that inserting this object with this size doesn't exceed bucket quotas pub(crate) async fn check_quotas( ctx: &ReqCtx, @@ -332,7 +332,8 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + part_number: u64, first_block: Bytes, chunker: &mut StreamChunker<S>, -) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash, Hash), Error> { + checksummer: Checksummer, +) -> Result<(u64, Checksums, Hash), Error> { let tracer = opentelemetry::global::tracer("garage"); let (block_tx, mut block_rx) = mpsc::channel::<Result<Bytes, Error>>(2); @@ -360,20 +361,20 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + let (block_tx2, mut block_rx2) = mpsc::channel::<Result<Bytes, Error>>(1); let hash_stream = async { - let md5hasher = AsyncHasher::<Md5>::new(); - let sha256hasher = AsyncHasher::<Sha256>::new(); + let mut checksummer = checksummer; while let Some(next) = block_rx.recv().await { match next { Ok(block) => { block_tx2.send(Ok(block.clone())).await?; - futures::future::join( - md5hasher.update(block.clone()), - sha256hasher.update(block.clone()), - ) + checksummer = tokio::task::spawn_blocking(move || { + checksummer.update(&block); + checksummer + }) .with_context(Context::current_with_span( tracer.start("Hash block (md5, sha256)"), )) - .await; + .await + .unwrap() } Err(e) => { block_tx2.send(Err(e)).await?; @@ -382,10 +383,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + } } drop(block_tx2); - Ok::<_, mpsc::error::SendError<_>>(futures::join!( - md5hasher.finalize(), - sha256hasher.finalize() - )) + Ok::<_, mpsc::error::SendError<_>>(checksummer) }; let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, u64, Hash), Error>>(1); @@ -395,33 +393,28 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + match next { Ok(block) => { let unencrypted_len = block.len() as u64; - let block = if encryption.is_encrypted() { - let res = - tokio::task::spawn_blocking(move || encryption.encrypt_block(block)) - .with_context(Context::current_with_span( - tracer.start("Encrypt block"), - )) - .await - .unwrap(); - match res { - Ok(b) => b, - Err(e) => { - block_tx3.send(Err(e)).await?; - break; + let res = tokio::task::spawn_blocking(move || { + let block = encryption.encrypt_block(block)?; + let hash = blake2sum(&block); + Ok((block, hash)) + }) + .with_context(Context::current_with_span( + tracer.start("Encrypt and hash (blake2) block"), + )) + .await + .unwrap(); + match res { + Ok((block, hash)) => { + if first_block_hash.is_none() { + first_block_hash = Some(hash); } + block_tx3.send(Ok((block, unencrypted_len, hash))).await?; + } + Err(e) => { + block_tx3.send(Err(e)).await?; + break; } - } else { - block - }; - let hash = async_blake2sum(block.clone()) - .with_context(Context::current_with_span( - tracer.start("Hash block (blake2)"), - )) - .await; - if first_block_hash.is_none() { - first_block_hash = Some(hash); } - block_tx3.send(Ok((block, unencrypted_len, hash))).await?; } Err(e) => { block_tx3.send(Err(e)).await?; @@ -493,12 +486,10 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + let total_size = final_result?; // unwrap here is ok, because if hasher failed, it is because something failed // later in the pipeline which already caused a return at the ? on previous line - let (data_md5sum, data_sha256sum) = stream_hash_result.unwrap(); let first_block_hash = block_hash_result.unwrap(); + let checksums = stream_hash_result.unwrap().finalize(); - let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap(); - - Ok((total_size, data_md5sum, data_sha256sum, first_block_hash)) + Ok((total_size, checksums, first_block_hash)) } async fn put_block_and_meta( @@ -609,7 +600,7 @@ impl Drop for InterruptedCleanup { // ============ helpers ============ -pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVersionHeaders, Error> { +pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<HeaderList, Error> { let mut ret = Vec::new(); // Preserve standard headers @@ -637,7 +628,7 @@ pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVers } } - Ok(ObjectVersionHeaders(ret)) + Ok(ret) } pub(crate) fn next_timestamp(existing_object: Option<&Object>) -> u64 { diff --git a/src/api/s3/xml.rs b/src/api/s3/xml.rs index 06f11288..1e569ade 100644 --- a/src/api/s3/xml.rs +++ b/src/api/s3/xml.rs @@ -131,6 +131,14 @@ pub struct CompleteMultipartUploadResult { pub key: Value, #[serde(rename = "ETag")] pub etag: Value, + #[serde(rename = "ChecksumCRC32")] + pub checksum_crc32: Option<Value>, + #[serde(rename = "ChecksumCRC32C")] + pub checksum_crc32c: Option<Value>, + #[serde(rename = "ChecksumSHA1")] + pub checksum_sha1: Option<Value>, + #[serde(rename = "ChecksumSHA256")] + pub checksum_sha256: Option<Value>, } #[derive(Debug, Serialize, PartialEq, Eq)] @@ -197,6 +205,14 @@ pub struct PartItem { pub part_number: IntValue, #[serde(rename = "Size")] pub size: IntValue, + #[serde(rename = "ChecksumCRC32")] + pub checksum_crc32: Option<Value>, + #[serde(rename = "ChecksumCRC32C")] + pub checksum_crc32c: Option<Value>, + #[serde(rename = "ChecksumSHA1")] + pub checksum_sha1: Option<Value>, + #[serde(rename = "ChecksumSHA256")] + pub checksum_sha256: Option<Value>, } #[derive(Debug, Serialize, PartialEq, Eq)] @@ -500,6 +516,10 @@ mod tests { bucket: Value("mybucket".to_string()), key: Value("a/plop".to_string()), etag: Value("\"3858f62230ac3c915f300c664312c11f-9\"".to_string()), + checksum_crc32: None, + checksum_crc32c: None, + checksum_sha1: Some(Value("ZJAnHyG8PeKz9tI8UTcHrJos39A=".into())), + checksum_sha256: None, }; assert_eq!( to_xml_with_header(&result)?, @@ -509,6 +529,7 @@ mod tests { <Bucket>mybucket</Bucket>\ <Key>a/plop</Key>\ <ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>\ + <ChecksumSHA1>ZJAnHyG8PeKz9tI8UTcHrJos39A=</ChecksumSHA1>\ </CompleteMultipartUploadResult>" ); Ok(()) @@ -780,12 +801,22 @@ mod tests { last_modified: Value("2010-11-10T20:48:34.000Z".to_string()), part_number: IntValue(2), size: IntValue(10485760), + checksum_crc32: None, + checksum_crc32c: None, + checksum_sha256: Some(Value( + "5RQ3A5uk0w7ojNjvegohch4JRBBGN/cLhsNrPzfv/hA=".into(), + )), + checksum_sha1: None, }, PartItem { etag: Value("\"aaaa18db4cc2f85cedef654fccc4a4x8\"".to_string()), last_modified: Value("2010-11-10T20:48:33.000Z".to_string()), part_number: IntValue(3), size: IntValue(10485760), + checksum_sha256: None, + checksum_crc32c: None, + checksum_crc32: Some(Value("ZJAnHyG8=".into())), + checksum_sha1: None, }, ], initiator: Initiator { @@ -820,12 +851,14 @@ mod tests { <LastModified>2010-11-10T20:48:34.000Z</LastModified>\ <PartNumber>2</PartNumber>\ <Size>10485760</Size>\ + <ChecksumSHA256>5RQ3A5uk0w7ojNjvegohch4JRBBGN/cLhsNrPzfv/hA=</ChecksumSHA256>\ </Part>\ <Part>\ <ETag>"aaaa18db4cc2f85cedef654fccc4a4x8"</ETag>\ <LastModified>2010-11-10T20:48:33.000Z</LastModified>\ <PartNumber>3</PartNumber>\ <Size>10485760</Size>\ + <ChecksumCRC32>ZJAnHyG8=</ChecksumCRC32>\ </Part>\ <Initiator>\ <DisplayName>umat-user-11116a31-17b5-4fb7-9df5-b288870f11xx</DisplayName>\ diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 53449a1c..17da68f8 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -42,6 +42,7 @@ tracing.workspace = true tracing-subscriber.workspace = true rand.workspace = true async-trait.workspace = true +sha1.workspace = true sodiumoxide.workspace = true structopt.workspace = true git-version.workspace = true diff --git a/src/garage/tests/s3/multipart.rs b/src/garage/tests/s3/multipart.rs index 51c9df74..cc424f59 100644 --- a/src/garage/tests/s3/multipart.rs +++ b/src/garage/tests/s3/multipart.rs @@ -1,6 +1,7 @@ use crate::common; use aws_sdk_s3::primitives::ByteStream; -use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart}; +use aws_sdk_s3::types::{ChecksumAlgorithm, CompletedMultipartUpload, CompletedPart}; +use base64::prelude::*; const SZ_5MB: usize = 5 * 1024 * 1024; const SZ_10MB: usize = 10 * 1024 * 1024; @@ -190,6 +191,153 @@ async fn test_multipart_upload() { } #[tokio::test] +async fn test_multipart_with_checksum() { + let ctx = common::context(); + let bucket = ctx.create_bucket("testmpu-cksum"); + + let u1 = vec![0x11; SZ_5MB]; + let u2 = vec![0x22; SZ_5MB]; + let u3 = vec![0x33; SZ_5MB]; + + let ck1 = calculate_sha1(&u1); + let ck2 = calculate_sha1(&u2); + let ck3 = calculate_sha1(&u3); + + let up = ctx + .client + .create_multipart_upload() + .bucket(&bucket) + .checksum_algorithm(ChecksumAlgorithm::Sha1) + .key("a") + .send() + .await + .unwrap(); + assert!(up.upload_id.is_some()); + + let uid = up.upload_id.as_ref().unwrap(); + + let p1 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(1) + .checksum_sha1(&ck1) + .body(ByteStream::from(u1.clone())) + .send() + .await + .unwrap(); + + // wrong checksum value should return an error + let err1 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(2) + .checksum_sha1(&ck1) + .body(ByteStream::from(u2.clone())) + .send() + .await; + assert!(err1.is_err()); + + let p2 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(2) + .checksum_sha1(&ck2) + .body(ByteStream::from(u2)) + .send() + .await + .unwrap(); + + let p3 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(3) + .checksum_sha1(&ck3) + .body(ByteStream::from(u3.clone())) + .send() + .await + .unwrap(); + + { + let r = ctx + .client + .list_parts() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .send() + .await + .unwrap(); + let parts = r.parts.unwrap(); + assert_eq!(parts.len(), 3); + assert!(parts[0].checksum_crc32.is_none()); + assert!(parts[0].checksum_crc32_c.is_none()); + assert!(parts[0].checksum_sha256.is_none()); + assert_eq!(parts[0].checksum_sha1.as_deref().unwrap(), ck1); + assert_eq!(parts[1].checksum_sha1.as_deref().unwrap(), ck2); + assert_eq!(parts[2].checksum_sha1.as_deref().unwrap(), ck3); + } + + let cmp = CompletedMultipartUpload::builder() + .parts( + CompletedPart::builder() + .part_number(1) + .checksum_sha1(&ck1) + .e_tag(p1.e_tag.unwrap()) + .build(), + ) + .parts( + CompletedPart::builder() + .part_number(2) + .checksum_sha1(&ck2) + .e_tag(p2.e_tag.unwrap()) + .build(), + ) + .parts( + CompletedPart::builder() + .part_number(3) + .checksum_sha1(&ck3) + .e_tag(p3.e_tag.unwrap()) + .build(), + ) + .build(); + + let expected_checksum = calculate_sha1( + &vec![ + BASE64_STANDARD.decode(&ck1).unwrap(), + BASE64_STANDARD.decode(&ck2).unwrap(), + BASE64_STANDARD.decode(&ck3).unwrap(), + ] + .concat(), + ); + + let res = ctx + .client + .complete_multipart_upload() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .checksum_sha1(expected_checksum.clone()) + .multipart_upload(cmp) + .send() + .await + .unwrap(); + + assert_eq!(res.checksum_sha1, Some(expected_checksum)); +} + +#[tokio::test] async fn test_uploadlistpart() { let ctx = common::context(); let bucket = ctx.create_bucket("uploadpart"); @@ -624,3 +772,11 @@ async fn test_uploadpartcopy() { assert_eq!(real_obj.len(), exp_obj.len()); assert_eq!(real_obj, exp_obj); } + +fn calculate_sha1(bytes: &[u8]) -> String { + use sha1::{Digest, Sha1}; + + let mut hasher = Sha1::new(); + hasher.update(bytes); + BASE64_STANDARD.encode(&hasher.finalize()[..]) +} diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs index 238cbf11..c9f79caf 100644 --- a/src/model/s3/mpu_table.rs +++ b/src/model/s3/mpu_table.rs @@ -17,6 +17,7 @@ pub const PARTS: &str = "parts"; pub const BYTES: &str = "bytes"; mod v09 { + use crate::s3::object_table::ChecksumValue; use garage_util::crdt; use garage_util::data::Uuid; use serde::{Deserialize, Serialize}; @@ -61,6 +62,9 @@ mod v09 { pub version: Uuid, /// ETag of the content of this part (known only once done uploading) pub etag: Option<String>, + /// Checksum requested by x-amz-checksum-algorithm + #[serde(default)] + pub checksum: Option<ChecksumValue>, /// Size of this part (known only once done uploading) pub size: Option<u64>, } @@ -155,6 +159,11 @@ impl Crdt for MpuPart { (Some(x), Some(y)) if x < y => other.size, (x, _) => x, }; + self.checksum = match (self.checksum.take(), &other.checksum) { + (None, Some(_)) => other.checksum.clone(), + (Some(x), Some(y)) if x < *y => other.checksum.clone(), + (x, _) => x, + }; } } diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index eedb9615..b2f25803 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -208,6 +208,8 @@ mod v010 { Uploading { /// Indicates whether this is a multipart upload multipart: bool, + /// Checksum algorithm to use + checksum_algorithm: Option<ChecksumAlgorithm>, /// Encryption params + headers to be included in the final object encryption: ObjectVersionEncryption, }, @@ -247,10 +249,10 @@ mod v010 { #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionEncryption { SseC { - /// Encrypted serialized ObjectVersionHeaders struct. + /// Encrypted serialized ObjectVersionInner struct. /// This is never compressed, just encrypted using AES256-GCM. #[serde(with = "serde_bytes")] - headers: Vec<u8>, + inner: Vec<u8>, /// Whether data blocks are compressed in addition to being encrypted /// (compression happens before encryption, whereas for non-encrypted /// objects, compression is handled at the level of the block manager) @@ -258,13 +260,35 @@ mod v010 { }, Plaintext { /// Plain-text headers - headers: ObjectVersionHeaders, + inner: ObjectVersionMetaInner, }, } /// Vector of headers, as tuples of the format (header name, header value) #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] - pub struct ObjectVersionHeaders(pub Vec<(String, String)>); + pub struct ObjectVersionMetaInner { + pub headers: HeaderList, + pub checksum: Option<ChecksumValue>, + } + + pub type HeaderList = Vec<(String, String)>; + + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] + pub enum ChecksumAlgorithm { + Crc32, + Crc32c, + Sha1, + Sha256, + } + + /// Checksum value for x-amz-checksum-algorithm + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] + pub enum ChecksumValue { + Crc32(#[serde(with = "serde_bytes")] [u8; 4]), + Crc32c(#[serde(with = "serde_bytes")] [u8; 4]), + Sha1(#[serde(with = "serde_bytes")] [u8; 20]), + Sha256(#[serde(with = "serde_bytes")] [u8; 32]), + } impl garage_util::migrate::Migrate for Object { const VERSION_MARKER: &'static [u8] = b"G010s3ob"; @@ -288,6 +312,7 @@ mod v010 { v09::ObjectVersionState::Uploading { multipart, headers } => { ObjectVersionState::Uploading { multipart, + checksum_algorithm: None, encryption: migrate_headers(headers), } } @@ -331,15 +356,18 @@ mod v010 { } ObjectVersionEncryption::Plaintext { - headers: ObjectVersionHeaders(new_headers), + inner: ObjectVersionMetaInner { + headers: new_headers, + checksum: None, + }, } } // Since ObjectVersionHeaders can now be serialized independently, for the // purpose of being encrypted, we need it to support migrations on its own // as well. - impl garage_util::migrate::InitialFormat for ObjectVersionHeaders { - const VERSION_MARKER: &'static [u8] = b"G010s3oh"; + impl garage_util::migrate::InitialFormat for ObjectVersionMetaInner { + const VERSION_MARKER: &'static [u8] = b"G010s3om"; } } @@ -454,6 +482,17 @@ impl Entry<Uuid, String> for Object { } } +impl ChecksumValue { + pub fn algorithm(&self) -> ChecksumAlgorithm { + match self { + ChecksumValue::Crc32(_) => ChecksumAlgorithm::Crc32, + ChecksumValue::Crc32c(_) => ChecksumAlgorithm::Crc32c, + ChecksumValue::Sha1(_) => ChecksumAlgorithm::Sha1, + ChecksumValue::Sha256(_) => ChecksumAlgorithm::Sha256, + } + } +} + impl Crdt for Object { fn merge(&mut self, other: &Self) { // Merge versions from other into here diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs deleted file mode 100644 index 5631ea6b..00000000 --- a/src/util/async_hash.rs +++ /dev/null @@ -1,61 +0,0 @@ -use bytes::Bytes; -use digest::Digest; - -use tokio::sync::mpsc; -use tokio::task::JoinHandle; - -use crate::data::*; - -/// Compute the sha256 of a slice, -/// spawning on a tokio thread for CPU-intensive processing -/// The argument has to be an owned Bytes, as it is moved out to a new thread. -pub async fn async_sha256sum(data: Bytes) -> Hash { - tokio::task::spawn_blocking(move || sha256sum(&data)) - .await - .unwrap() -} - -/// Compute the blake2sum of a slice, -/// spawning on a tokio thread for CPU-intensive processing. -/// The argument has to be an owned Bytes, as it is moved out to a new thread. -pub async fn async_blake2sum(data: Bytes) -> Hash { - tokio::task::spawn_blocking(move || blake2sum(&data)) - .await - .unwrap() -} - -// ---- - -pub struct AsyncHasher<D: Digest> { - sendblk: mpsc::Sender<Bytes>, - task: JoinHandle<digest::Output<D>>, -} - -impl<D: Digest> AsyncHasher<D> { - pub fn new() -> Self { - let (sendblk, mut recvblk) = mpsc::channel::<Bytes>(1); - let task = tokio::task::spawn_blocking(move || { - let mut digest = D::new(); - while let Some(blk) = recvblk.blocking_recv() { - digest.update(&blk[..]); - } - digest.finalize() - }); - Self { sendblk, task } - } - - pub async fn update(&self, b: Bytes) { - self.sendblk.send(b).await.unwrap(); - } - - pub async fn finalize(self) -> digest::Output<D> { - drop(self.sendblk); - self.task.await.unwrap() - } -} - -impl<D: Digest> Default for AsyncHasher<D> { - fn default() -> Self { - Self::new() - } -} diff --git a/src/util/lib.rs b/src/util/lib.rs index 7df77959..8b035ff0 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -3,7 +3,6 @@ #[macro_use] extern crate tracing; -pub mod async_hash; pub mod background; pub mod config; pub mod crdt; |