aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/multipart.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3/multipart.rs')
-rw-r--r--src/api/s3/multipart.rs199
1 files changed, 161 insertions, 38 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 1d5aeb26..3db3e8aa 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -1,9 +1,10 @@
use std::collections::HashMap;
+use std::convert::TryInto;
use std::sync::Arc;
+use base64::prelude::*;
use futures::prelude::*;
use hyper::{Request, Response};
-use md5::{Digest as Md5Digest, Md5};
use garage_table::*;
use garage_util::data::*;
@@ -16,6 +17,8 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody};
+use crate::s3::checksum::*;
+use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
use crate::s3::put::*;
use crate::s3::xml as s3_xml;
@@ -40,6 +43,16 @@ pub async fn handle_create_multipart_upload(
let timestamp = next_timestamp(existing_object.as_ref());
let headers = get_headers(req.headers())?;
+ let meta = ObjectVersionMetaInner {
+ headers,
+ checksum: None,
+ };
+
+ // Determine whether object should be encrypted, and if so the key
+ let encryption = EncryptionParams::new_from_headers(&garage, req.headers())?;
+ let object_encryption = encryption.encrypt_meta(meta)?;
+
+ let checksum_algorithm = request_checksum_algorithm(req.headers())?;
// Create object in object table
let object_version = ObjectVersion {
@@ -47,7 +60,8 @@ pub async fn handle_create_multipart_upload(
timestamp,
state: ObjectVersionState::Uploading {
multipart: true,
- headers,
+ encryption: object_encryption,
+ checksum_algorithm,
},
};
let object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
@@ -68,7 +82,9 @@ pub async fn handle_create_multipart_upload(
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(string_body(xml)))
+ let mut resp = Response::builder();
+ encryption.add_response_headers(&mut resp);
+ Ok(resp.body(string_body(xml))?)
}
pub async fn handle_put_part(
@@ -83,20 +99,37 @@ pub async fn handle_put_part(
let upload_id = decode_upload_id(upload_id)?;
- let content_md5 = match req.headers().get("content-md5") {
- Some(x) => Some(x.to_str()?.to_string()),
- None => None,
+ let expected_checksums = ExpectedChecksums {
+ md5: match req.headers().get("content-md5") {
+ Some(x) => Some(x.to_str()?.to_string()),
+ None => None,
+ },
+ sha256: content_sha256,
+ extra: request_checksum_value(req.headers())?,
};
// Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string();
- let stream = body_stream(req.into_body());
+ let (req_head, req_body) = req.into_parts();
+ let stream = body_stream(req_body);
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
- let ((_, _, mut mpu), first_block) =
+ let ((_, object_version, mut mpu), first_block) =
futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;
+ // Check encryption params
+ let (object_encryption, checksum_algorithm) = match object_version.state {
+ ObjectVersionState::Uploading {
+ encryption,
+ checksum_algorithm,
+ ..
+ } => (encryption, checksum_algorithm),
+ _ => unreachable!(),
+ };
+ let (encryption, _) =
+ EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?;
+
// Check object is valid and part can be accepted
let first_block = first_block.ok_or_bad_request("Empty body")?;
@@ -122,7 +155,9 @@ pub async fn handle_put_part(
mpu_part_key,
MpuPart {
version: version_uuid,
+ // all these are filled in later, at the end of this function
etag: None,
+ checksum: None,
size: None,
},
);
@@ -136,24 +171,31 @@ pub async fn handle_put_part(
garage.version_table.insert(&version).await?;
// Copy data to version
- let (total_size, data_md5sum, data_sha256sum, _) =
- read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?;
+ let checksummer =
+ Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm);
+ let (total_size, checksums, _) = read_and_put_blocks(
+ &ctx,
+ &version,
+ encryption,
+ part_number,
+ first_block,
+ &mut chunker,
+ checksummer,
+ )
+ .await?;
// Verify that checksums map
- ensure_checksum_matches(
- data_md5sum.as_slice(),
- data_sha256sum,
- content_md5.as_deref(),
- content_sha256,
- )?;
+ checksums.verify(&expected_checksums)?;
// Store part etag in version
- let data_md5sum_hex = hex::encode(data_md5sum);
+ let etag = encryption.etag_from_md5(&checksums.md5);
+
mpu.parts.put(
mpu_part_key,
MpuPart {
version: version_uuid,
- etag: Some(data_md5sum_hex.clone()),
+ etag: Some(etag.clone()),
+ checksum: checksums.extract(checksum_algorithm),
size: Some(total_size),
},
);
@@ -163,11 +205,10 @@ pub async fn handle_put_part(
// We won't have to clean up on drop.
interrupted_cleanup.cancel();
- let response = Response::builder()
- .header("ETag", format!("\"{}\"", data_md5sum_hex))
- .body(empty_body())
- .unwrap();
- Ok(response)
+ let mut resp = Response::builder().header("ETag", format!("\"{}\"", etag));
+ encryption.add_response_headers(&mut resp);
+ let resp = add_checksum_response_headers(&expected_checksums.extra, resp);
+ Ok(resp.body(empty_body())?)
}
struct InterruptedCleanup(Option<InterruptedCleanupInner>);
@@ -214,10 +255,11 @@ pub async fn handle_complete_multipart_upload(
bucket_name,
..
} = &ctx;
+ let (req_head, req_body) = req.into_parts();
+
+ let expected_checksum = request_checksum_value(&req_head.headers)?;
- let body = http_body_util::BodyExt::collect(req.into_body())
- .await?
- .to_bytes();
+ let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -241,8 +283,12 @@ pub async fn handle_complete_multipart_upload(
return Err(Error::bad_request("No data was uploaded"));
}
- let headers = match object_version.state {
- ObjectVersionState::Uploading { headers, .. } => headers,
+ let (object_encryption, checksum_algorithm) = match object_version.state {
+ ObjectVersionState::Uploading {
+ encryption,
+ checksum_algorithm,
+ ..
+ } => (encryption, checksum_algorithm),
_ => unreachable!(),
};
@@ -270,6 +316,13 @@ pub async fn handle_complete_multipart_upload(
for req_part in body_list_of_parts.iter() {
match have_parts.get(&req_part.part_number) {
Some(part) if part.etag.as_ref() == Some(&req_part.etag) && part.size.is_some() => {
+ // alternative version: if req_part.checksum.is_some() && part.checksum != req_part.checksum {
+ if part.checksum != req_part.checksum {
+ return Err(Error::InvalidDigest(format!(
+ "Invalid checksum for part {}: in request = {:?}, uploaded part = {:?}",
+ req_part.part_number, req_part.checksum, part.checksum
+ )));
+ }
parts.push(*part)
}
_ => return Err(Error::InvalidPart),
@@ -317,18 +370,23 @@ pub async fn handle_complete_multipart_upload(
});
garage.block_ref_table.insert_many(block_refs).await?;
- // Calculate etag of final object
+ // Calculate checksum and etag of final object
// To understand how etags are calculated, read more here:
+ // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
// https://teppen.io/2018/06/23/aws_s3_etags/
- let mut etag_md5_hasher = Md5::new();
+ let mut checksummer = MultipartChecksummer::init(checksum_algorithm);
for part in parts.iter() {
- etag_md5_hasher.update(part.etag.as_ref().unwrap().as_bytes());
+ checksummer.update(part.etag.as_ref().unwrap(), part.checksum)?;
}
- let etag = format!(
- "{}-{}",
- hex::encode(etag_md5_hasher.finalize()),
- parts.len()
- );
+ let (checksum_md5, checksum_extra) = checksummer.finalize();
+
+ if expected_checksum.is_some() && checksum_extra != expected_checksum {
+ return Err(Error::InvalidDigest(
+ "Failed to validate x-amz-checksum-*".into(),
+ ));
+ }
+
+ let etag = format!("{}-{}", hex::encode(&checksum_md5[..]), parts.len());
// Calculate total size of final object
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
@@ -341,10 +399,24 @@ pub async fn handle_complete_multipart_upload(
return Err(e);
}
+ // If there is a checksum algorithm, update metadata with checksum
+ let object_encryption = match checksum_algorithm {
+ None => object_encryption,
+ Some(_) => {
+ let (encryption, meta) =
+ EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?;
+ let new_meta = ObjectVersionMetaInner {
+ headers: meta.into_owned().headers,
+ checksum: checksum_extra,
+ };
+ encryption.encrypt_meta(new_meta)?
+ }
+ };
+
// Write final object version
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
- headers,
+ encryption: object_encryption,
size: total_size,
etag: etag.clone(),
},
@@ -361,10 +433,28 @@ pub async fn handle_complete_multipart_upload(
bucket: s3_xml::Value(bucket_name.to_string()),
key: s3_xml::Value(key),
etag: s3_xml::Value(format!("\"{}\"", etag)),
+ checksum_crc32: match &checksum_extra {
+ Some(ChecksumValue::Crc32(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
+ _ => None,
+ },
+ checksum_crc32c: match &checksum_extra {
+ Some(ChecksumValue::Crc32c(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
+ _ => None,
+ },
+ checksum_sha1: match &checksum_extra {
+ Some(ChecksumValue::Sha1(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
+ _ => None,
+ },
+ checksum_sha256: match &checksum_extra {
+ Some(ChecksumValue::Sha256(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
+ _ => None,
+ },
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(string_body(xml)))
+ let resp = Response::builder();
+ let resp = add_checksum_response_headers(&expected_checksum, resp);
+ Ok(resp.body(string_body(xml))?)
}
pub async fn handle_abort_multipart_upload(
@@ -433,6 +523,7 @@ pub fn decode_upload_id(id: &str) -> Result<Uuid, Error> {
struct CompleteMultipartUploadPart {
etag: String,
part_number: u64,
+ checksum: Option<ChecksumValue>,
}
fn parse_complete_multipart_upload_body(
@@ -458,9 +549,41 @@ fn parse_complete_multipart_upload_body(
.children()
.find(|e| e.has_tag_name("PartNumber"))?
.text()?;
+ let checksum = if let Some(crc32) =
+ item.children().find(|e| e.has_tag_name("ChecksumCRC32"))
+ {
+ Some(ChecksumValue::Crc32(
+ BASE64_STANDARD.decode(crc32.text()?).ok()?[..]
+ .try_into()
+ .ok()?,
+ ))
+ } else if let Some(crc32c) = item.children().find(|e| e.has_tag_name("ChecksumCRC32C"))
+ {
+ Some(ChecksumValue::Crc32c(
+ BASE64_STANDARD.decode(crc32c.text()?).ok()?[..]
+ .try_into()
+ .ok()?,
+ ))
+ } else if let Some(sha1) = item.children().find(|e| e.has_tag_name("ChecksumSHA1")) {
+ Some(ChecksumValue::Sha1(
+ BASE64_STANDARD.decode(sha1.text()?).ok()?[..]
+ .try_into()
+ .ok()?,
+ ))
+ } else if let Some(sha256) = item.children().find(|e| e.has_tag_name("ChecksumSHA256"))
+ {
+ Some(ChecksumValue::Sha256(
+ BASE64_STANDARD.decode(sha256.text()?).ok()?[..]
+ .try_into()
+ .ok()?,
+ ))
+ } else {
+ None
+ };
parts.push(CompleteMultipartUploadPart {
etag: etag.trim_matches('"').to_string(),
part_number: part_number.parse().ok()?,
+ checksum,
});
} else {
return None;