diff options
author | Alex Auvolat <lx@deuxfleurs.fr> | 2025-02-17 19:54:25 +0100 |
---|---|---|
committer | Alex Auvolat <lx@deuxfleurs.fr> | 2025-02-17 19:54:25 +0100 |
commit | 658541d812103662be88ad6d3d1c0fdf1a948862 (patch) | |
tree | 89919eada94167b94174e4eb2aaa48e5a2f46c6f /src/api/s3 | |
parent | c5df820e2c2b4bff5e239b8e99f07178b98b3f5a (diff) | |
download | garage-658541d812103662be88ad6d3d1c0fdf1a948862.tar.gz garage-658541d812103662be88ad6d3d1c0fdf1a948862.zip |
api: use checksumming in api_common::signature for put/putpart
Diffstat (limited to 'src/api/s3')
-rw-r--r-- | src/api/s3/api_server.rs | 11 | ||||
-rw-r--r-- | src/api/s3/multipart.rs | 27 | ||||
-rw-r--r-- | src/api/s3/put.rs | 44 |
3 files changed, 52 insertions, 30 deletions
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index fe6545cc..e26c2b65 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -15,7 +15,7 @@ use garage_model::key_table::Key; use garage_api_common::cors::*; use garage_api_common::generic_server::*; use garage_api_common::helpers::*; -use garage_api_common::signature::{verify_request, ContentSha256Header}; +use garage_api_common::signature::verify_request; use crate::bucket::*; use crate::copy::*; @@ -124,11 +124,6 @@ impl ApiHandler for S3ApiServer { let verified_request = verify_request(&garage, req, "s3").await?; let req = verified_request.request; let api_key = verified_request.access_key; - let content_sha256 = match verified_request.content_sha256_header { - ContentSha256Header::Sha256Checksum(h) => Some(h), - // TODO take into account streaming/trailer checksums, etc. - _ => None, - }; let bucket_name = match bucket_name { None => { @@ -205,14 +200,14 @@ impl ApiHandler for S3ApiServer { key, part_number, upload_id, - } => handle_put_part(ctx, req, &key, part_number, &upload_id, content_sha256).await, + } => handle_put_part(ctx, req, &key, part_number, &upload_id).await, Endpoint::CopyObject { key } => handle_copy(ctx, &req, &key).await, Endpoint::UploadPartCopy { key, part_number, upload_id, } => handle_upload_part_copy(ctx, &req, &key, part_number, &upload_id).await, - Endpoint::PutObject { key } => handle_put(ctx, req, &key, content_sha256).await, + Endpoint::PutObject { key } => handle_put(ctx, req, &key).await, Endpoint::AbortMultipartUpload { key, upload_id } => { handle_abort_multipart_upload(ctx, &key, &upload_id).await } diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index f381d670..59a469d1 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -94,7 +94,6 @@ pub async fn handle_put_part( key: &str, part_number: u64, upload_id: &str, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, .. } = &ctx; @@ -105,18 +104,23 @@ pub async fn handle_put_part( Some(x) => Some(x.to_str()?.to_string()), None => None, }, - sha256: content_sha256, + sha256: None, extra: request_checksum_value(req.headers())?, }; // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let (req_head, req_body) = req.into_parts(); + let (req_head, mut req_body) = req.into_parts(); + + req_body.add_expected_checksums(expected_checksums.clone()); + // TODO: avoid parsing encryption headers twice... + if !EncryptionParams::new_from_headers(&garage, &req_head.headers)?.is_encrypted() { + req_body.add_md5(); + } - let (stream, checksums) = req_body.streaming_with_checksums(true); + let (stream, stream_checksums) = req_body.streaming_with_checksums(); let stream = stream.map_err(Error::from); - // TODO checksums let mut chunker = StreamChunker::new(stream, garage.config.block_size); @@ -176,21 +180,22 @@ pub async fn handle_put_part( garage.version_table.insert(&version).await?; // Copy data to version - let checksummer = - Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm); - let (total_size, checksums, _) = read_and_put_blocks( + // TODO don't duplicate checksums + let (total_size, _, _) = read_and_put_blocks( &ctx, &version, encryption, part_number, first_block, - &mut chunker, - checksummer, + chunker, + Checksummer::new(), ) .await?; // Verify that checksums map - checksums.verify(&expected_checksums)?; + let checksums = stream_checksums + .await + .ok_or_internal_error("checksum calculation")??; // Store part etag in version let etag = encryption.etag_from_md5(&checksums.md5); diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 551c3b76..24f888bc 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -31,6 +31,7 @@ use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_api_common::helpers::*; +use garage_api_common::signature::body::StreamingChecksumReceiver; use garage_api_common::signature::checksum::*; use crate::api_server::{ReqBody, ResBody}; @@ -49,6 +50,7 @@ pub(crate) struct SaveStreamResult { pub(crate) enum ChecksumMode<'a> { Verify(&'a ExpectedChecksums), + VerifyFrom(StreamingChecksumReceiver), Calculate(Option<ChecksumAlgorithm>), } @@ -56,7 +58,6 @@ pub async fn handle_put( ctx: ReqCtx, req: Request<ReqBody>, key: &String, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { // Retrieve interesting headers from request let headers = get_headers(req.headers())?; @@ -67,7 +68,7 @@ pub async fn handle_put( Some(x) => Some(x.to_str()?.to_string()), None => None, }, - sha256: content_sha256, + sha256: None, extra: request_checksum_value(req.headers())?, }; @@ -79,9 +80,14 @@ pub async fn handle_put( // Determine whether object should be encrypted, and if so the key let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; - let (stream, checksums) = req.into_body().streaming_with_checksums(true); + let mut req_body = req.into_body(); + req_body.add_expected_checksums(expected_checksums.clone()); + if !encryption.is_encrypted() { + req_body.add_md5(); + } + + let (stream, checksums) = req_body.streaming_with_checksums(); let stream = stream.map_err(Error::from); - // TODO checksums let res = save_stream( &ctx, @@ -89,7 +95,7 @@ pub async fn handle_put( encryption, stream, key, - ChecksumMode::Verify(&expected_checksums), + ChecksumMode::VerifyFrom(checksums), ) .await?; @@ -125,10 +131,15 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let version_uuid = gen_uuid(); let version_timestamp = next_timestamp(existing_object.as_ref()); - let mut checksummer = match checksum_mode { + let mut checksummer = match &checksum_mode { ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()), ChecksumMode::Calculate(algo) => { - Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(algo) + Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(*algo) + } + ChecksumMode::VerifyFrom(_) => { + // Checksums are calculated by the garage_api_common::signature module + // so here we can just have an empty checksummer that does nothing + Checksummer::new() } }; @@ -136,7 +147,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // as "inline data". We can then return immediately. if first_block.len() < INLINE_THRESHOLD { checksummer.update(&first_block); - let checksums = checksummer.finalize(); + let mut checksums = checksummer.finalize(); match checksum_mode { ChecksumMode::Verify(expected) => { @@ -145,6 +156,12 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ChecksumMode::Calculate(algo) => { meta.checksum = checksums.extract(algo); } + ChecksumMode::VerifyFrom(checksummer) => { + drop(chunker); + checksums = checksummer + .await + .ok_or_internal_error("checksum calculation")??; + } }; let size = first_block.len() as u64; @@ -216,13 +233,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data - let (total_size, checksums, first_block_hash) = read_and_put_blocks( + let (total_size, mut checksums, first_block_hash) = read_and_put_blocks( ctx, &version, encryption, 1, first_block, - &mut chunker, + chunker, checksummer, ) .await?; @@ -235,6 +252,11 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ChecksumMode::Calculate(algo) => { meta.checksum = checksums.extract(algo); } + ChecksumMode::VerifyFrom(checksummer) => { + checksums = checksummer + .await + .ok_or_internal_error("checksum calculation")??; + } }; // Verify quotas are respsected @@ -335,7 +357,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + encryption: EncryptionParams, part_number: u64, first_block: Bytes, - chunker: &mut StreamChunker<S>, + mut chunker: StreamChunker<S>, checksummer: Checksummer, ) -> Result<(u64, Checksums, Hash), Error> { let tracer = opentelemetry::global::tracer("garage"); |