aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/multipart.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3/multipart.rs')
-rw-r--r--src/api/s3/multipart.rs56
1 files changed, 39 insertions, 17 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 1d5aeb26..fcc5769f 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -16,6 +16,7 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody};
+use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
use crate::s3::put::*;
use crate::s3::xml as s3_xml;
@@ -41,13 +42,17 @@ pub async fn handle_create_multipart_upload(
let headers = get_headers(req.headers())?;
+ // Determine whether object should be encrypted, and if so the key
+ let encryption = EncryptionParams::new_from_headers(&garage, req.headers())?;
+ let object_encryption = encryption.encrypt_headers(headers)?;
+
// Create object in object table
let object_version = ObjectVersion {
uuid: upload_id,
timestamp,
state: ObjectVersionState::Uploading {
multipart: true,
- headers,
+ encryption: object_encryption,
},
};
let object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
@@ -68,7 +73,9 @@ pub async fn handle_create_multipart_upload(
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(string_body(xml)))
+ let mut resp = Response::builder();
+ encryption.add_response_headers(&mut resp);
+ Ok(resp.body(string_body(xml))?)
}
pub async fn handle_put_part(
@@ -91,12 +98,21 @@ pub async fn handle_put_part(
// Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string();
- let stream = body_stream(req.into_body());
+ let (req_head, req_body) = req.into_parts();
+ let stream = body_stream(req_body);
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
- let ((_, _, mut mpu), first_block) =
+ let ((_, object_version, mut mpu), first_block) =
futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;
+ // Check encryption params
+ let object_encryption = match object_version.state {
+ ObjectVersionState::Uploading { encryption, .. } => encryption,
+ _ => unreachable!(),
+ };
+ let (encryption, _) =
+ EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?;
+
// Check object is valid and part can be accepted
let first_block = first_block.ok_or_bad_request("Empty body")?;
@@ -136,24 +152,32 @@ pub async fn handle_put_part(
garage.version_table.insert(&version).await?;
// Copy data to version
- let (total_size, data_md5sum, data_sha256sum, _) =
- read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?;
+ let (total_size, data_md5sum, data_sha256sum, _) = read_and_put_blocks(
+ &ctx,
+ &version,
+ encryption,
+ part_number,
+ first_block,
+ &mut chunker,
+ )
+ .await?;
// Verify that checksums map
ensure_checksum_matches(
- data_md5sum.as_slice(),
+ &data_md5sum,
data_sha256sum,
content_md5.as_deref(),
content_sha256,
)?;
// Store part etag in version
- let data_md5sum_hex = hex::encode(data_md5sum);
+ let etag = encryption.etag_from_md5(&data_md5sum);
+
mpu.parts.put(
mpu_part_key,
MpuPart {
version: version_uuid,
- etag: Some(data_md5sum_hex.clone()),
+ etag: Some(etag.clone()),
size: Some(total_size),
},
);
@@ -163,11 +187,9 @@ pub async fn handle_put_part(
// We won't have to clean up on drop.
interrupted_cleanup.cancel();
- let response = Response::builder()
- .header("ETag", format!("\"{}\"", data_md5sum_hex))
- .body(empty_body())
- .unwrap();
- Ok(response)
+ let mut resp = Response::builder().header("ETag", format!("\"{}\"", etag));
+ encryption.add_response_headers(&mut resp);
+ Ok(resp.body(empty_body())?)
}
struct InterruptedCleanup(Option<InterruptedCleanupInner>);
@@ -241,8 +263,8 @@ pub async fn handle_complete_multipart_upload(
return Err(Error::bad_request("No data was uploaded"));
}
- let headers = match object_version.state {
- ObjectVersionState::Uploading { headers, .. } => headers,
+ let object_encryption = match object_version.state {
+ ObjectVersionState::Uploading { encryption, .. } => encryption,
_ => unreachable!(),
};
@@ -344,7 +366,7 @@ pub async fn handle_complete_multipart_upload(
// Write final object version
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
- headers,
+ encryption: object_encryption,
size: total_size,
etag: etag.clone(),
},