diff options
author | Alex Auvolat <lx@deuxfleurs.fr> | 2025-02-17 19:54:25 +0100 |
---|---|---|
committer | Alex Auvolat <lx@deuxfleurs.fr> | 2025-02-17 19:54:25 +0100 |
commit | 658541d812103662be88ad6d3d1c0fdf1a948862 (patch) | |
tree | 89919eada94167b94174e4eb2aaa48e5a2f46c6f /src/api/s3/put.rs | |
parent | c5df820e2c2b4bff5e239b8e99f07178b98b3f5a (diff) | |
download | garage-658541d812103662be88ad6d3d1c0fdf1a948862.tar.gz garage-658541d812103662be88ad6d3d1c0fdf1a948862.zip |
api: use checksumming in api_common::signature for put/putpart
Diffstat (limited to 'src/api/s3/put.rs')
-rw-r--r-- | src/api/s3/put.rs | 44 |
1 files changed, 33 insertions, 11 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 551c3b76..24f888bc 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -31,6 +31,7 @@ use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_api_common::helpers::*; +use garage_api_common::signature::body::StreamingChecksumReceiver; use garage_api_common::signature::checksum::*; use crate::api_server::{ReqBody, ResBody}; @@ -49,6 +50,7 @@ pub(crate) struct SaveStreamResult { pub(crate) enum ChecksumMode<'a> { Verify(&'a ExpectedChecksums), + VerifyFrom(StreamingChecksumReceiver), Calculate(Option<ChecksumAlgorithm>), } @@ -56,7 +58,6 @@ pub async fn handle_put( ctx: ReqCtx, req: Request<ReqBody>, key: &String, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { // Retrieve interesting headers from request let headers = get_headers(req.headers())?; @@ -67,7 +68,7 @@ pub async fn handle_put( Some(x) => Some(x.to_str()?.to_string()), None => None, }, - sha256: content_sha256, + sha256: None, extra: request_checksum_value(req.headers())?, }; @@ -79,9 +80,14 @@ pub async fn handle_put( // Determine whether object should be encrypted, and if so the key let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; - let (stream, checksums) = req.into_body().streaming_with_checksums(true); + let mut req_body = req.into_body(); + req_body.add_expected_checksums(expected_checksums.clone()); + if !encryption.is_encrypted() { + req_body.add_md5(); + } + + let (stream, checksums) = req_body.streaming_with_checksums(); let stream = stream.map_err(Error::from); - // TODO checksums let res = save_stream( &ctx, @@ -89,7 +95,7 @@ pub async fn handle_put( encryption, stream, key, - ChecksumMode::Verify(&expected_checksums), + ChecksumMode::VerifyFrom(checksums), ) .await?; @@ -125,10 +131,15 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let version_uuid = gen_uuid(); let version_timestamp = next_timestamp(existing_object.as_ref()); - let mut checksummer = match checksum_mode { + let mut checksummer = match &checksum_mode { ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()), ChecksumMode::Calculate(algo) => { - Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(algo) + Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(*algo) + } + ChecksumMode::VerifyFrom(_) => { + // Checksums are calculated by the garage_api_common::signature module + // so here we can just have an empty checksummer that does nothing + Checksummer::new() } }; @@ -136,7 +147,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // as "inline data". We can then return immediately. if first_block.len() < INLINE_THRESHOLD { checksummer.update(&first_block); - let checksums = checksummer.finalize(); + let mut checksums = checksummer.finalize(); match checksum_mode { ChecksumMode::Verify(expected) => { @@ -145,6 +156,12 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ChecksumMode::Calculate(algo) => { meta.checksum = checksums.extract(algo); } + ChecksumMode::VerifyFrom(checksummer) => { + drop(chunker); + checksums = checksummer + .await + .ok_or_internal_error("checksum calculation")??; + } }; let size = first_block.len() as u64; @@ -216,13 +233,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data - let (total_size, checksums, first_block_hash) = read_and_put_blocks( + let (total_size, mut checksums, first_block_hash) = read_and_put_blocks( ctx, &version, encryption, 1, first_block, - &mut chunker, + chunker, checksummer, ) .await?; @@ -235,6 +252,11 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ChecksumMode::Calculate(algo) => { meta.checksum = checksums.extract(algo); } + ChecksumMode::VerifyFrom(checksummer) => { + checksums = checksummer + .await + .ok_or_internal_error("checksum calculation")??; + } }; // Verify quotas are respsected @@ -335,7 +357,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + encryption: EncryptionParams, part_number: u64, first_block: Bytes, - chunker: &mut StreamChunker<S>, + mut chunker: StreamChunker<S>, checksummer: Checksummer, ) -> Result<(u64, Checksums, Hash), Error> { let tracer = opentelemetry::global::tracer("garage"); |