diff options
Diffstat (limited to 'src/api/s3/multipart.rs')
-rw-r--r-- | src/api/s3/multipart.rs | 199 |
1 files changed, 161 insertions, 38 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 1d5aeb26..3db3e8aa 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -1,9 +1,10 @@ use std::collections::HashMap; +use std::convert::TryInto; use std::sync::Arc; +use base64::prelude::*; use futures::prelude::*; use hyper::{Request, Response}; -use md5::{Digest as Md5Digest, Md5}; use garage_table::*; use garage_util::data::*; @@ -16,6 +17,8 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::checksum::*; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; use crate::s3::put::*; use crate::s3::xml as s3_xml; @@ -40,6 +43,16 @@ pub async fn handle_create_multipart_upload( let timestamp = next_timestamp(existing_object.as_ref()); let headers = get_headers(req.headers())?; + let meta = ObjectVersionMetaInner { + headers, + checksum: None, + }; + + // Determine whether object should be encrypted, and if so the key + let encryption = EncryptionParams::new_from_headers(&garage, req.headers())?; + let object_encryption = encryption.encrypt_meta(meta)?; + + let checksum_algorithm = request_checksum_algorithm(req.headers())?; // Create object in object table let object_version = ObjectVersion { @@ -47,7 +60,8 @@ pub async fn handle_create_multipart_upload( timestamp, state: ObjectVersionState::Uploading { multipart: true, - headers, + encryption: object_encryption, + checksum_algorithm, }, }; let object = Object::new(*bucket_id, key.to_string(), vec![object_version]); @@ -68,7 +82,9 @@ pub async fn handle_create_multipart_upload( }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(string_body(xml))) + let mut resp = Response::builder(); + encryption.add_response_headers(&mut resp); + Ok(resp.body(string_body(xml))?) } pub async fn handle_put_part( @@ -83,20 +99,37 @@ pub async fn handle_put_part( let upload_id = decode_upload_id(upload_id)?; - let content_md5 = match req.headers().get("content-md5") { - Some(x) => Some(x.to_str()?.to_string()), - None => None, + let expected_checksums = ExpectedChecksums { + md5: match req.headers().get("content-md5") { + Some(x) => Some(x.to_str()?.to_string()), + None => None, + }, + sha256: content_sha256, + extra: request_checksum_value(req.headers())?, }; // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let stream = body_stream(req.into_body()); + let (req_head, req_body) = req.into_parts(); + let stream = body_stream(req_body); let mut chunker = StreamChunker::new(stream, garage.config.block_size); - let ((_, _, mut mpu), first_block) = + let ((_, object_version, mut mpu), first_block) = futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?; + // Check encryption params + let (object_encryption, checksum_algorithm) = match object_version.state { + ObjectVersionState::Uploading { + encryption, + checksum_algorithm, + .. + } => (encryption, checksum_algorithm), + _ => unreachable!(), + }; + let (encryption, _) = + EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?; + // Check object is valid and part can be accepted let first_block = first_block.ok_or_bad_request("Empty body")?; @@ -122,7 +155,9 @@ pub async fn handle_put_part( mpu_part_key, MpuPart { version: version_uuid, + // all these are filled in later, at the end of this function etag: None, + checksum: None, size: None, }, ); @@ -136,24 +171,31 @@ pub async fn handle_put_part( garage.version_table.insert(&version).await?; // Copy data to version - let (total_size, data_md5sum, data_sha256sum, _) = - read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?; + let checksummer = + Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm); + let (total_size, checksums, _) = read_and_put_blocks( + &ctx, + &version, + encryption, + part_number, + first_block, + &mut chunker, + checksummer, + ) + .await?; // Verify that checksums map - ensure_checksum_matches( - data_md5sum.as_slice(), - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; + checksums.verify(&expected_checksums)?; // Store part etag in version - let data_md5sum_hex = hex::encode(data_md5sum); + let etag = encryption.etag_from_md5(&checksums.md5); + mpu.parts.put( mpu_part_key, MpuPart { version: version_uuid, - etag: Some(data_md5sum_hex.clone()), + etag: Some(etag.clone()), + checksum: checksums.extract(checksum_algorithm), size: Some(total_size), }, ); @@ -163,11 +205,10 @@ pub async fn handle_put_part( // We won't have to clean up on drop. interrupted_cleanup.cancel(); - let response = Response::builder() - .header("ETag", format!("\"{}\"", data_md5sum_hex)) - .body(empty_body()) - .unwrap(); - Ok(response) + let mut resp = Response::builder().header("ETag", format!("\"{}\"", etag)); + encryption.add_response_headers(&mut resp); + let resp = add_checksum_response_headers(&expected_checksums.extra, resp); + Ok(resp.body(empty_body())?) } struct InterruptedCleanup(Option<InterruptedCleanupInner>); @@ -214,10 +255,11 @@ pub async fn handle_complete_multipart_upload( bucket_name, .. } = &ctx; + let (req_head, req_body) = req.into_parts(); + + let expected_checksum = request_checksum_value(&req_head.headers)?; - let body = http_body_util::BodyExt::collect(req.into_body()) - .await? - .to_bytes(); + let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -241,8 +283,12 @@ pub async fn handle_complete_multipart_upload( return Err(Error::bad_request("No data was uploaded")); } - let headers = match object_version.state { - ObjectVersionState::Uploading { headers, .. } => headers, + let (object_encryption, checksum_algorithm) = match object_version.state { + ObjectVersionState::Uploading { + encryption, + checksum_algorithm, + .. + } => (encryption, checksum_algorithm), _ => unreachable!(), }; @@ -270,6 +316,13 @@ pub async fn handle_complete_multipart_upload( for req_part in body_list_of_parts.iter() { match have_parts.get(&req_part.part_number) { Some(part) if part.etag.as_ref() == Some(&req_part.etag) && part.size.is_some() => { + // alternative version: if req_part.checksum.is_some() && part.checksum != req_part.checksum { + if part.checksum != req_part.checksum { + return Err(Error::InvalidDigest(format!( + "Invalid checksum for part {}: in request = {:?}, uploaded part = {:?}", + req_part.part_number, req_part.checksum, part.checksum + ))); + } parts.push(*part) } _ => return Err(Error::InvalidPart), @@ -317,18 +370,23 @@ pub async fn handle_complete_multipart_upload( }); garage.block_ref_table.insert_many(block_refs).await?; - // Calculate etag of final object + // Calculate checksum and etag of final object // To understand how etags are calculated, read more here: + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html // https://teppen.io/2018/06/23/aws_s3_etags/ - let mut etag_md5_hasher = Md5::new(); + let mut checksummer = MultipartChecksummer::init(checksum_algorithm); for part in parts.iter() { - etag_md5_hasher.update(part.etag.as_ref().unwrap().as_bytes()); + checksummer.update(part.etag.as_ref().unwrap(), part.checksum)?; } - let etag = format!( - "{}-{}", - hex::encode(etag_md5_hasher.finalize()), - parts.len() - ); + let (checksum_md5, checksum_extra) = checksummer.finalize(); + + if expected_checksum.is_some() && checksum_extra != expected_checksum { + return Err(Error::InvalidDigest( + "Failed to validate x-amz-checksum-*".into(), + )); + } + + let etag = format!("{}-{}", hex::encode(&checksum_md5[..]), parts.len()); // Calculate total size of final object let total_size = parts.iter().map(|x| x.size.unwrap()).sum(); @@ -341,10 +399,24 @@ pub async fn handle_complete_multipart_upload( return Err(e); } + // If there is a checksum algorithm, update metadata with checksum + let object_encryption = match checksum_algorithm { + None => object_encryption, + Some(_) => { + let (encryption, meta) = + EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?; + let new_meta = ObjectVersionMetaInner { + headers: meta.into_owned().headers, + checksum: checksum_extra, + }; + encryption.encrypt_meta(new_meta)? + } + }; + // Write final object version object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { - headers, + encryption: object_encryption, size: total_size, etag: etag.clone(), }, @@ -361,10 +433,28 @@ pub async fn handle_complete_multipart_upload( bucket: s3_xml::Value(bucket_name.to_string()), key: s3_xml::Value(key), etag: s3_xml::Value(format!("\"{}\"", etag)), + checksum_crc32: match &checksum_extra { + Some(ChecksumValue::Crc32(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, + checksum_crc32c: match &checksum_extra { + Some(ChecksumValue::Crc32c(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, + checksum_sha1: match &checksum_extra { + Some(ChecksumValue::Sha1(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, + checksum_sha256: match &checksum_extra { + Some(ChecksumValue::Sha256(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))), + _ => None, + }, }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(string_body(xml))) + let resp = Response::builder(); + let resp = add_checksum_response_headers(&expected_checksum, resp); + Ok(resp.body(string_body(xml))?) } pub async fn handle_abort_multipart_upload( @@ -433,6 +523,7 @@ pub fn decode_upload_id(id: &str) -> Result<Uuid, Error> { struct CompleteMultipartUploadPart { etag: String, part_number: u64, + checksum: Option<ChecksumValue>, } fn parse_complete_multipart_upload_body( @@ -458,9 +549,41 @@ fn parse_complete_multipart_upload_body( .children() .find(|e| e.has_tag_name("PartNumber"))? .text()?; + let checksum = if let Some(crc32) = + item.children().find(|e| e.has_tag_name("ChecksumCRC32")) + { + Some(ChecksumValue::Crc32( + BASE64_STANDARD.decode(crc32.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else if let Some(crc32c) = item.children().find(|e| e.has_tag_name("ChecksumCRC32C")) + { + Some(ChecksumValue::Crc32c( + BASE64_STANDARD.decode(crc32c.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else if let Some(sha1) = item.children().find(|e| e.has_tag_name("ChecksumSHA1")) { + Some(ChecksumValue::Sha1( + BASE64_STANDARD.decode(sha1.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else if let Some(sha256) = item.children().find(|e| e.has_tag_name("ChecksumSHA256")) + { + Some(ChecksumValue::Sha256( + BASE64_STANDARD.decode(sha256.text()?).ok()?[..] + .try_into() + .ok()?, + )) + } else { + None + }; parts.push(CompleteMultipartUploadPart { etag: etag.trim_matches('"').to_string(), part_number: part_number.parse().ok()?, + checksum, }); } else { return None; |