diff options
Diffstat (limited to 'src/api/s3/multipart.rs')
-rw-r--r-- | src/api/s3/multipart.rs | 56 |
1 files changed, 39 insertions, 17 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 1d5aeb26..fcc5769f 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -16,6 +16,7 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; use crate::s3::put::*; use crate::s3::xml as s3_xml; @@ -41,13 +42,17 @@ pub async fn handle_create_multipart_upload( let headers = get_headers(req.headers())?; + // Determine whether object should be encrypted, and if so the key + let encryption = EncryptionParams::new_from_headers(&garage, req.headers())?; + let object_encryption = encryption.encrypt_headers(headers)?; + // Create object in object table let object_version = ObjectVersion { uuid: upload_id, timestamp, state: ObjectVersionState::Uploading { multipart: true, - headers, + encryption: object_encryption, }, }; let object = Object::new(*bucket_id, key.to_string(), vec![object_version]); @@ -68,7 +73,9 @@ pub async fn handle_create_multipart_upload( }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(string_body(xml))) + let mut resp = Response::builder(); + encryption.add_response_headers(&mut resp); + Ok(resp.body(string_body(xml))?) } pub async fn handle_put_part( @@ -91,12 +98,21 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let stream = body_stream(req.into_body()); + let (req_head, req_body) = req.into_parts(); + let stream = body_stream(req_body); let mut chunker = StreamChunker::new(stream, garage.config.block_size); - let ((_, _, mut mpu), first_block) = + let ((_, object_version, mut mpu), first_block) = futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?; + // Check encryption params + let object_encryption = match object_version.state { + ObjectVersionState::Uploading { encryption, .. } => encryption, + _ => unreachable!(), + }; + let (encryption, _) = + EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?; + // Check object is valid and part can be accepted let first_block = first_block.ok_or_bad_request("Empty body")?; @@ -136,24 +152,32 @@ pub async fn handle_put_part( garage.version_table.insert(&version).await?; // Copy data to version - let (total_size, data_md5sum, data_sha256sum, _) = - read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?; + let (total_size, data_md5sum, data_sha256sum, _) = read_and_put_blocks( + &ctx, + &version, + encryption, + part_number, + first_block, + &mut chunker, + ) + .await?; // Verify that checksums map ensure_checksum_matches( - data_md5sum.as_slice(), + &data_md5sum, data_sha256sum, content_md5.as_deref(), content_sha256, )?; // Store part etag in version - let data_md5sum_hex = hex::encode(data_md5sum); + let etag = encryption.etag_from_md5(&data_md5sum); + mpu.parts.put( mpu_part_key, MpuPart { version: version_uuid, - etag: Some(data_md5sum_hex.clone()), + etag: Some(etag.clone()), size: Some(total_size), }, ); @@ -163,11 +187,9 @@ pub async fn handle_put_part( // We won't have to clean up on drop. interrupted_cleanup.cancel(); - let response = Response::builder() - .header("ETag", format!("\"{}\"", data_md5sum_hex)) - .body(empty_body()) - .unwrap(); - Ok(response) + let mut resp = Response::builder().header("ETag", format!("\"{}\"", etag)); + encryption.add_response_headers(&mut resp); + Ok(resp.body(empty_body())?) } struct InterruptedCleanup(Option<InterruptedCleanupInner>); @@ -241,8 +263,8 @@ pub async fn handle_complete_multipart_upload( return Err(Error::bad_request("No data was uploaded")); } - let headers = match object_version.state { - ObjectVersionState::Uploading { headers, .. } => headers, + let object_encryption = match object_version.state { + ObjectVersionState::Uploading { encryption, .. } => encryption, _ => unreachable!(), }; @@ -344,7 +366,7 @@ pub async fn handle_complete_multipart_upload( // Write final object version object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { - headers, + encryption: object_encryption, size: total_size, etag: etag.clone(), }, |