diff options
Diffstat (limited to 'src/api/s3/multipart.rs')
-rw-r--r-- | src/api/s3/multipart.rs | 159 |
1 files changed, 130 insertions, 29 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index fcc5769f..3db3e8aa 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -1,9 +1,10 @@ use std::collections::HashMap; +use std::convert::TryInto; use std::sync::Arc; +use base64::prelude::*; use futures::prelude::*; use hyper::{Request, Response}; -use md5::{Digest as Md5Digest, Md5}; use garage_table::*; use garage_util::data::*; @@ -16,6 +17,7 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::checksum::*; use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; use crate::s3::put::*; @@ -41,10 +43,16 @@ pub async fn handle_create_multipart_upload( let timestamp = next_timestamp(existing_object.as_ref()); let headers = get_headers(req.headers())?; + let meta = ObjectVersionMetaInner { + headers, + checksum: None, + }; // Determine whether object should be encrypted, and if so the key let encryption = EncryptionParams::new_from_headers(&garage, req.headers())?; - let object_encryption = encryption.encrypt_headers(headers)?; + let object_encryption = encryption.encrypt_meta(meta)?; + + let checksum_algorithm = request_checksum_algorithm(req.headers())?; // Create object in object table let object_version = ObjectVersion { @@ -53,6 +61,7 @@ pub async fn handle_create_multipart_upload( state: ObjectVersionState::Uploading { multipart: true, encryption: object_encryption, + checksum_algorithm, }, }; let object = Object::new(*bucket_id, key.to_string(), vec![object_version]); @@ -90,9 +99,13 @@ pub async fn handle_put_part( let upload_id = decode_upload_id(upload_id)?; - let content_md5 = match req.headers().get("content-md5") { - Some(x) => Some(x.to_str()?.to_string()), - None => None, + let expected_checksums = ExpectedChecksums { + md5: match req.headers().get("content-md5") { + Some(x) => Some(x.to_str()?.to_string()), + None => None, + }, + sha256: content_sha256, + extra: request_checksum_value(req.headers())?, }; // Read first chuck, and at the same time try to get object to see if it exists @@ -106,8 +119,12 @@ pub async fn handle_put_part( futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?; // Check encryption params - let object_encryption = match object_version.state { - ObjectVersionState::Uploading { encryption, .. } => encryption, + let (object_encryption, checksum_algorithm) = match object_version.state { + ObjectVersionState::Uploading { + encryption, + checksum_algorithm, + .. + } => (encryption, checksum_algorithm), _ => unreachable!(), }; let (encryption, _) = @@ -138,7 +155,9 @@ pub async fn handle_put_part( mpu_part_key, MpuPart { version: version_uuid, + // all these are filled in later, at the end of this function etag: None, + checksum: None, size: None, }, ); @@ -152,32 +171,31 @@ pub async fn handle_put_part( garage.version_table.insert(&version).await?; // Copy data to version - let (total_size, data_md5sum, data_sha256sum, _) = read_and_put_blocks( + let checksummer = + Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm); + let (total_size, checksums, _) = read_and_put_blocks( &ctx, &version, encryption, part_number, first_block, &mut chunker, + checksummer, ) .await?; // Verify that checksums map - ensure_checksum_matches( - &data_md5sum, - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; + checksums.verify(&expected_checksums)?; // Store part etag in version - let etag = encryption.etag_from_md5(&data_md5sum); + let etag = encryption.etag_from_md5(&checksums.md5); mpu.parts.put( mpu_part_key, MpuPart { version: version_uuid, etag: Some(etag.clone()), + checksum: checksums.extract(checksum_algorithm), size: Some(total_size), }, ); @@ -189,6 +207,7 @@ pub async fn handle_put_part( let mut resp = Response::builder().header("ETag", format!("\"{}\"", etag)); encryption.add_response_headers(&mut resp); + let resp = add_checksum_response_headers(&expected_checksums.extra, resp); Ok(resp.body(empty_body())?) } @@ -236,10 +255,11 @@ pub async fn handle_complete_multipart_upload( bucket_name, .. } = &ctx; + let (req_head, req_body) = req.into_parts(); - let body = http_body_util::BodyExt::collect(req.into_body()) - .await? - .to_bytes(); + let expected_checksum = request_checksum_value(&req_head.headers)?; + + let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -263,8 +283,12 @@ pub async fn handle_complete_multipart_upload( return Err(Error::bad_request("No data was uploaded")); } - let object_encryption = match object_version.state { - ObjectVersionState::Uploading { encryption, .. } => encryption, + let (object_encryption, checksum_algorithm) = match object_version.state { + ObjectVersionState::Uploading { + encryption, + checksum_algorithm, + .. + } => (encryption, checksum_algorithm), _ => unreachable!(), }; @@ -292,6 +316,13 @@ pub async fn handle_complete_multipart_upload( for req_part in body_list_of_parts.iter() { match have_parts.get(&req_part.part_number) { Some(part) if part.etag.as_ref() == Some(&req_part.etag) && part.size.is_some() => { + // alternative version: if req_part.checksum.is_some() && part.checksum != req_part.checksum { + if part.checksum != req_part.checksum { + return Err(Error::InvalidDigest(format!( + "Invalid checksum for part {}: in request = {:?}, uploaded part = {:?}", + req_part.part_number, req_part.checksum, part.checksum + ))); + } parts.push(*part) } _ => return Err(Error::InvalidPart), @@ -339,18 +370,23 @@ pub async fn handle_complete_multipart_upload( }); garage.block_ref_table.insert_many(block_refs).await?; - // Calculate etag of final object + // Calculate checksum and etag of final object // To understand how etags are calculated, read more here: + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html // https://teppen.io/2018/06/23/aws_s3_etags/ - let mut etag_md5_hasher = Md5::new(); + let mut checksummer = MultipartChecksummer::init(checksum_algorithm); for part in parts.iter() { - etag_md5_hasher.update(part.etag.as_ref().unwrap().as_bytes()); + checksummer.update(part.etag.as_ref().unwrap(), part.checksum)?; + } + let (checksum_md5, checksum_extra) = checksummer.finalize(); + + if expected_checksum.is_some() && checksum_extra != expected_checksum { + return Err(Error::InvalidDigest( + "Failed to validate x-amz-checksum-*".into(), + )); } - let etag = format!( - "{}-{}", - hex::encode(etag_md5_hasher.finalize()), - parts.len() - ); + + let etag = format!("{}-{}", hex::encode(&checksum_md5[..]), parts.len()); // Calculate total size of final object let total_size = parts.iter().map(|x| x.size.unwrap()).sum(); @@ -363,6 +399,20 @@ pub async fn handle_complete_multipart_upload( return Err(e); } + // If there is a checksum algorithm, update metadata with checksum + let object_encryption = match checksum_algorithm { + None => object_encryption, + Some(_) => { + let (encryption, meta) = + EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?; + let new_meta = ObjectVersionMetaInner { + headers: meta.into_owned().headers, + checksum: checksum_extra, + }; + encryption.encrypt_meta(new_meta)? + } + }; + // Write final object version object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { @@ -383,10 +433,28 @@ pub async fn handle_complete_multipart_upload( bucket: s3_xml::Value(bucket_name.to_string()), key: s3_xml::Value(key), etag: s3_xml::Value(format!("\"{}\"", etag)), + checksum_crc32: match &checksum_extra { + Some(ChecksumValue::Crc32(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, + checksum_crc32c: match &checksum_extra { + Some(ChecksumValue::Crc32c(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, + checksum_sha1: match &checksum_extra { + Some(ChecksumValue::Sha1(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, + checksum_sha256: match &checksum_extra { + Some(ChecksumValue::Sha256(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(string_body(xml))) + let resp = Response::builder(); + let resp = add_checksum_response_headers(&expected_checksum, resp); + Ok(resp.body(string_body(xml))?) } pub async fn handle_abort_multipart_upload( @@ -455,6 +523,7 @@ pub fn decode_upload_id(id: &str) -> Result<Uuid, Error> { struct CompleteMultipartUploadPart { etag: String, part_number: u64, + checksum: Option<ChecksumValue>, } fn parse_complete_multipart_upload_body( @@ -480,9 +549,41 @@ fn parse_complete_multipart_upload_body( .children() .find(|e| e.has_tag_name("PartNumber"))? .text()?; + let checksum = if let Some(crc32) = + item.children().find(|e| e.has_tag_name("ChecksumCRC32")) + { + Some(ChecksumValue::Crc32( + BASE64_STANDARD.decode(crc32.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else if let Some(crc32c) = item.children().find(|e| e.has_tag_name("ChecksumCRC32C")) + { + Some(ChecksumValue::Crc32c( + BASE64_STANDARD.decode(crc32c.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else if let Some(sha1) = item.children().find(|e| e.has_tag_name("ChecksumSHA1")) { + Some(ChecksumValue::Sha1( + BASE64_STANDARD.decode(sha1.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else if let Some(sha256) = item.children().find(|e| e.has_tag_name("ChecksumSHA256")) + { + Some(ChecksumValue::Sha256( + BASE64_STANDARD.decode(sha256.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else { + None + }; parts.push(CompleteMultipartUploadPart { etag: etag.trim_matches('"').to_string(), part_number: part_number.parse().ok()?, + checksum, }); } else { return None; |