aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/multipart.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-03-21 14:06:59 +0100
committerAlex Auvolat <alex@adnab.me>2024-03-26 14:56:47 +0100
commit8fbc75059a018b288ae828c948285aedb31f8489 (patch)
treebfeafe807c4524d0d5ce63c56f3003f5827984f1 /src/api/s3/multipart.rs
parent7e0107c47db71e8da13990c9111ebde8cbf60d8f (diff)
downloadgarage-s3-checksum.tar.gz
garage-s3-checksum.zip
[s3-checksum] implement x-amz-checksum-* headerss3-checksum
Diffstat (limited to 'src/api/s3/multipart.rs')
-rw-r--r--src/api/s3/multipart.rs159
1 files changed, 130 insertions, 29 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index fcc5769f..3db3e8aa 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -1,9 +1,10 @@
use std::collections::HashMap;
+use std::convert::TryInto;
use std::sync::Arc;
+use base64::prelude::*;
use futures::prelude::*;
use hyper::{Request, Response};
-use md5::{Digest as Md5Digest, Md5};
use garage_table::*;
use garage_util::data::*;
@@ -16,6 +17,7 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody};
+use crate::s3::checksum::*;
use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
use crate::s3::put::*;
@@ -41,10 +43,16 @@ pub async fn handle_create_multipart_upload(
let timestamp = next_timestamp(existing_object.as_ref());
let headers = get_headers(req.headers())?;
+ let meta = ObjectVersionMetaInner {
+ headers,
+ checksum: None,
+ };
// Determine whether object should be encrypted, and if so the key
let encryption = EncryptionParams::new_from_headers(&garage, req.headers())?;
- let object_encryption = encryption.encrypt_headers(headers)?;
+ let object_encryption = encryption.encrypt_meta(meta)?;
+
+ let checksum_algorithm = request_checksum_algorithm(req.headers())?;
// Create object in object table
let object_version = ObjectVersion {
@@ -53,6 +61,7 @@ pub async fn handle_create_multipart_upload(
state: ObjectVersionState::Uploading {
multipart: true,
encryption: object_encryption,
+ checksum_algorithm,
},
};
let object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
@@ -90,9 +99,13 @@ pub async fn handle_put_part(
let upload_id = decode_upload_id(upload_id)?;
- let content_md5 = match req.headers().get("content-md5") {
- Some(x) => Some(x.to_str()?.to_string()),
- None => None,
+ let expected_checksums = ExpectedChecksums {
+ md5: match req.headers().get("content-md5") {
+ Some(x) => Some(x.to_str()?.to_string()),
+ None => None,
+ },
+ sha256: content_sha256,
+ extra: request_checksum_value(req.headers())?,
};
// Read first chuck, and at the same time try to get object to see if it exists
@@ -106,8 +119,12 @@ pub async fn handle_put_part(
futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;
// Check encryption params
- let object_encryption = match object_version.state {
- ObjectVersionState::Uploading { encryption, .. } => encryption,
+ let (object_encryption, checksum_algorithm) = match object_version.state {
+ ObjectVersionState::Uploading {
+ encryption,
+ checksum_algorithm,
+ ..
+ } => (encryption, checksum_algorithm),
_ => unreachable!(),
};
let (encryption, _) =
@@ -138,7 +155,9 @@ pub async fn handle_put_part(
mpu_part_key,
MpuPart {
version: version_uuid,
+ // all these are filled in later, at the end of this function
etag: None,
+ checksum: None,
size: None,
},
);
@@ -152,32 +171,31 @@ pub async fn handle_put_part(
garage.version_table.insert(&version).await?;
// Copy data to version
- let (total_size, data_md5sum, data_sha256sum, _) = read_and_put_blocks(
+ let checksummer =
+ Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm);
+ let (total_size, checksums, _) = read_and_put_blocks(
&ctx,
&version,
encryption,
part_number,
first_block,
&mut chunker,
+ checksummer,
)
.await?;
// Verify that checksums map
- ensure_checksum_matches(
- &data_md5sum,
- data_sha256sum,
- content_md5.as_deref(),
- content_sha256,
- )?;
+ checksums.verify(&expected_checksums)?;
// Store part etag in version
- let etag = encryption.etag_from_md5(&data_md5sum);
+ let etag = encryption.etag_from_md5(&checksums.md5);
mpu.parts.put(
mpu_part_key,
MpuPart {
version: version_uuid,
etag: Some(etag.clone()),
+ checksum: checksums.extract(checksum_algorithm),
size: Some(total_size),
},
);
@@ -189,6 +207,7 @@ pub async fn handle_put_part(
let mut resp = Response::builder().header("ETag", format!("\"{}\"", etag));
encryption.add_response_headers(&mut resp);
+ let resp = add_checksum_response_headers(&expected_checksums.extra, resp);
Ok(resp.body(empty_body())?)
}
@@ -236,10 +255,11 @@ pub async fn handle_complete_multipart_upload(
bucket_name,
..
} = &ctx;
+ let (req_head, req_body) = req.into_parts();
- let body = http_body_util::BodyExt::collect(req.into_body())
- .await?
- .to_bytes();
+ let expected_checksum = request_checksum_value(&req_head.headers)?;
+
+ let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -263,8 +283,12 @@ pub async fn handle_complete_multipart_upload(
return Err(Error::bad_request("No data was uploaded"));
}
- let object_encryption = match object_version.state {
- ObjectVersionState::Uploading { encryption, .. } => encryption,
+ let (object_encryption, checksum_algorithm) = match object_version.state {
+ ObjectVersionState::Uploading {
+ encryption,
+ checksum_algorithm,
+ ..
+ } => (encryption, checksum_algorithm),
_ => unreachable!(),
};
@@ -292,6 +316,13 @@ pub async fn handle_complete_multipart_upload(
for req_part in body_list_of_parts.iter() {
match have_parts.get(&req_part.part_number) {
Some(part) if part.etag.as_ref() == Some(&req_part.etag) && part.size.is_some() => {
+ // alternative version: if req_part.checksum.is_some() && part.checksum != req_part.checksum {
+ if part.checksum != req_part.checksum {
+ return Err(Error::InvalidDigest(format!(
+ "Invalid checksum for part {}: in request = {:?}, uploaded part = {:?}",
+ req_part.part_number, req_part.checksum, part.checksum
+ )));
+ }
parts.push(*part)
}
_ => return Err(Error::InvalidPart),
@@ -339,18 +370,23 @@ pub async fn handle_complete_multipart_upload(
});
garage.block_ref_table.insert_many(block_refs).await?;
- // Calculate etag of final object
+ // Calculate checksum and etag of final object
// To understand how etags are calculated, read more here:
+ // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
// https://teppen.io/2018/06/23/aws_s3_etags/
- let mut etag_md5_hasher = Md5::new();
+ let mut checksummer = MultipartChecksummer::init(checksum_algorithm);
for part in parts.iter() {
- etag_md5_hasher.update(part.etag.as_ref().unwrap().as_bytes());
+ checksummer.update(part.etag.as_ref().unwrap(), part.checksum)?;
+ }
+ let (checksum_md5, checksum_extra) = checksummer.finalize();
+
+ if expected_checksum.is_some() && checksum_extra != expected_checksum {
+ return Err(Error::InvalidDigest(
+ "Failed to validate x-amz-checksum-*".into(),
+ ));
}
- let etag = format!(
- "{}-{}",
- hex::encode(etag_md5_hasher.finalize()),
- parts.len()
- );
+
+ let etag = format!("{}-{}", hex::encode(&checksum_md5[..]), parts.len());
// Calculate total size of final object
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
@@ -363,6 +399,20 @@ pub async fn handle_complete_multipart_upload(
return Err(e);
}
+ // If there is a checksum algorithm, update metadata with checksum
+ let object_encryption = match checksum_algorithm {
+ None => object_encryption,
+ Some(_) => {
+ let (encryption, meta) =
+ EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?;
+ let new_meta = ObjectVersionMetaInner {
+ headers: meta.into_owned().headers,
+ checksum: checksum_extra,
+ };
+ encryption.encrypt_meta(new_meta)?
+ }
+ };
+
// Write final object version
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
@@ -383,10 +433,28 @@ pub async fn handle_complete_multipart_upload(
bucket: s3_xml::Value(bucket_name.to_string()),
key: s3_xml::Value(key),
etag: s3_xml::Value(format!("\"{}\"", etag)),
+ checksum_crc32: match &checksum_extra {
+ Some(ChecksumValue::Crc32(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
+ _ => None,
+ },
+ checksum_crc32c: match &checksum_extra {
+ Some(ChecksumValue::Crc32c(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
+ _ => None,
+ },
+ checksum_sha1: match &checksum_extra {
+ Some(ChecksumValue::Sha1(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
+ _ => None,
+ },
+ checksum_sha256: match &checksum_extra {
+ Some(ChecksumValue::Sha256(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
+ _ => None,
+ },
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(string_body(xml)))
+ let resp = Response::builder();
+ let resp = add_checksum_response_headers(&expected_checksum, resp);
+ Ok(resp.body(string_body(xml))?)
}
pub async fn handle_abort_multipart_upload(
@@ -455,6 +523,7 @@ pub fn decode_upload_id(id: &str) -> Result<Uuid, Error> {
struct CompleteMultipartUploadPart {
etag: String,
part_number: u64,
+ checksum: Option<ChecksumValue>,
}
fn parse_complete_multipart_upload_body(
@@ -480,9 +549,41 @@ fn parse_complete_multipart_upload_body(
.children()
.find(|e| e.has_tag_name("PartNumber"))?
.text()?;
+ let checksum = if let Some(crc32) =
+ item.children().find(|e| e.has_tag_name("ChecksumCRC32"))
+ {
+ Some(ChecksumValue::Crc32(
+ BASE64_STANDARD.decode(crc32.text()?).ok()?[..]
+ .try_into()
+ .ok()?,
+ ))
+ } else if let Some(crc32c) = item.children().find(|e| e.has_tag_name("ChecksumCRC32C"))
+ {
+ Some(ChecksumValue::Crc32c(
+ BASE64_STANDARD.decode(crc32c.text()?).ok()?[..]
+ .try_into()
+ .ok()?,
+ ))
+ } else if let Some(sha1) = item.children().find(|e| e.has_tag_name("ChecksumSHA1")) {
+ Some(ChecksumValue::Sha1(
+ BASE64_STANDARD.decode(sha1.text()?).ok()?[..]
+ .try_into()
+ .ok()?,
+ ))
+ } else if let Some(sha256) = item.children().find(|e| e.has_tag_name("ChecksumSHA256"))
+ {
+ Some(ChecksumValue::Sha256(
+ BASE64_STANDARD.decode(sha256.text()?).ok()?[..]
+ .try_into()
+ .ok()?,
+ ))
+ } else {
+ None
+ };
parts.push(CompleteMultipartUploadPart {
etag: etag.trim_matches('"').to_string(),
part_number: part_number.parse().ok()?,
+ checksum,
});
} else {
return None;