aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3
diff options
context:
space:
mode:
authorAlex Auvolat <lx@deuxfleurs.fr>2025-02-17 19:54:25 +0100
committerAlex Auvolat <lx@deuxfleurs.fr>2025-02-17 19:54:25 +0100
commit658541d812103662be88ad6d3d1c0fdf1a948862 (patch)
tree89919eada94167b94174e4eb2aaa48e5a2f46c6f /src/api/s3
parentc5df820e2c2b4bff5e239b8e99f07178b98b3f5a (diff)
downloadgarage-658541d812103662be88ad6d3d1c0fdf1a948862.tar.gz
garage-658541d812103662be88ad6d3d1c0fdf1a948862.zip
api: use checksumming in api_common::signature for put/putpart
Diffstat (limited to 'src/api/s3')
-rw-r--r--src/api/s3/api_server.rs11
-rw-r--r--src/api/s3/multipart.rs27
-rw-r--r--src/api/s3/put.rs44
3 files changed, 52 insertions, 30 deletions
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index fe6545cc..e26c2b65 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -15,7 +15,7 @@ use garage_model::key_table::Key;
use garage_api_common::cors::*;
use garage_api_common::generic_server::*;
use garage_api_common::helpers::*;
-use garage_api_common::signature::{verify_request, ContentSha256Header};
+use garage_api_common::signature::verify_request;
use crate::bucket::*;
use crate::copy::*;
@@ -124,11 +124,6 @@ impl ApiHandler for S3ApiServer {
let verified_request = verify_request(&garage, req, "s3").await?;
let req = verified_request.request;
let api_key = verified_request.access_key;
- let content_sha256 = match verified_request.content_sha256_header {
- ContentSha256Header::Sha256Checksum(h) => Some(h),
- // TODO take into account streaming/trailer checksums, etc.
- _ => None,
- };
let bucket_name = match bucket_name {
None => {
@@ -205,14 +200,14 @@ impl ApiHandler for S3ApiServer {
key,
part_number,
upload_id,
- } => handle_put_part(ctx, req, &key, part_number, &upload_id, content_sha256).await,
+ } => handle_put_part(ctx, req, &key, part_number, &upload_id).await,
Endpoint::CopyObject { key } => handle_copy(ctx, &req, &key).await,
Endpoint::UploadPartCopy {
key,
part_number,
upload_id,
} => handle_upload_part_copy(ctx, &req, &key, part_number, &upload_id).await,
- Endpoint::PutObject { key } => handle_put(ctx, req, &key, content_sha256).await,
+ Endpoint::PutObject { key } => handle_put(ctx, req, &key).await,
Endpoint::AbortMultipartUpload { key, upload_id } => {
handle_abort_multipart_upload(ctx, &key, &upload_id).await
}
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index f381d670..59a469d1 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -94,7 +94,6 @@ pub async fn handle_put_part(
key: &str,
part_number: u64,
upload_id: &str,
- content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx { garage, .. } = &ctx;
@@ -105,18 +104,23 @@ pub async fn handle_put_part(
Some(x) => Some(x.to_str()?.to_string()),
None => None,
},
- sha256: content_sha256,
+ sha256: None,
extra: request_checksum_value(req.headers())?,
};
// Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string();
- let (req_head, req_body) = req.into_parts();
+ let (req_head, mut req_body) = req.into_parts();
+
+ req_body.add_expected_checksums(expected_checksums.clone());
+ // TODO: avoid parsing encryption headers twice...
+ if !EncryptionParams::new_from_headers(&garage, &req_head.headers)?.is_encrypted() {
+ req_body.add_md5();
+ }
- let (stream, checksums) = req_body.streaming_with_checksums(true);
+ let (stream, stream_checksums) = req_body.streaming_with_checksums();
let stream = stream.map_err(Error::from);
- // TODO checksums
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
@@ -176,21 +180,22 @@ pub async fn handle_put_part(
garage.version_table.insert(&version).await?;
// Copy data to version
- let checksummer =
- Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm);
- let (total_size, checksums, _) = read_and_put_blocks(
+ // TODO don't duplicate checksums
+ let (total_size, _, _) = read_and_put_blocks(
&ctx,
&version,
encryption,
part_number,
first_block,
- &mut chunker,
- checksummer,
+ chunker,
+ Checksummer::new(),
)
.await?;
// Verify that checksums map
- checksums.verify(&expected_checksums)?;
+ let checksums = stream_checksums
+ .await
+ .ok_or_internal_error("checksum calculation")??;
// Store part etag in version
let etag = encryption.etag_from_md5(&checksums.md5);
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 551c3b76..24f888bc 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -31,6 +31,7 @@ use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
use garage_api_common::helpers::*;
+use garage_api_common::signature::body::StreamingChecksumReceiver;
use garage_api_common::signature::checksum::*;
use crate::api_server::{ReqBody, ResBody};
@@ -49,6 +50,7 @@ pub(crate) struct SaveStreamResult {
pub(crate) enum ChecksumMode<'a> {
Verify(&'a ExpectedChecksums),
+ VerifyFrom(StreamingChecksumReceiver),
Calculate(Option<ChecksumAlgorithm>),
}
@@ -56,7 +58,6 @@ pub async fn handle_put(
ctx: ReqCtx,
req: Request<ReqBody>,
key: &String,
- content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
// Retrieve interesting headers from request
let headers = get_headers(req.headers())?;
@@ -67,7 +68,7 @@ pub async fn handle_put(
Some(x) => Some(x.to_str()?.to_string()),
None => None,
},
- sha256: content_sha256,
+ sha256: None,
extra: request_checksum_value(req.headers())?,
};
@@ -79,9 +80,14 @@ pub async fn handle_put(
// Determine whether object should be encrypted, and if so the key
let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
- let (stream, checksums) = req.into_body().streaming_with_checksums(true);
+ let mut req_body = req.into_body();
+ req_body.add_expected_checksums(expected_checksums.clone());
+ if !encryption.is_encrypted() {
+ req_body.add_md5();
+ }
+
+ let (stream, checksums) = req_body.streaming_with_checksums();
let stream = stream.map_err(Error::from);
- // TODO checksums
let res = save_stream(
&ctx,
@@ -89,7 +95,7 @@ pub async fn handle_put(
encryption,
stream,
key,
- ChecksumMode::Verify(&expected_checksums),
+ ChecksumMode::VerifyFrom(checksums),
)
.await?;
@@ -125,10 +131,15 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let version_uuid = gen_uuid();
let version_timestamp = next_timestamp(existing_object.as_ref());
- let mut checksummer = match checksum_mode {
+ let mut checksummer = match &checksum_mode {
ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()),
ChecksumMode::Calculate(algo) => {
- Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(algo)
+ Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(*algo)
+ }
+ ChecksumMode::VerifyFrom(_) => {
+ // Checksums are calculated by the garage_api_common::signature module
+ // so here we can just have an empty checksummer that does nothing
+ Checksummer::new()
}
};
@@ -136,7 +147,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// as "inline data". We can then return immediately.
if first_block.len() < INLINE_THRESHOLD {
checksummer.update(&first_block);
- let checksums = checksummer.finalize();
+ let mut checksums = checksummer.finalize();
match checksum_mode {
ChecksumMode::Verify(expected) => {
@@ -145,6 +156,12 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
ChecksumMode::Calculate(algo) => {
meta.checksum = checksums.extract(algo);
}
+ ChecksumMode::VerifyFrom(checksummer) => {
+ drop(chunker);
+ checksums = checksummer
+ .await
+ .ok_or_internal_error("checksum calculation")??;
+ }
};
let size = first_block.len() as u64;
@@ -216,13 +233,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage.version_table.insert(&version).await?;
// Transfer data
- let (total_size, checksums, first_block_hash) = read_and_put_blocks(
+ let (total_size, mut checksums, first_block_hash) = read_and_put_blocks(
ctx,
&version,
encryption,
1,
first_block,
- &mut chunker,
+ chunker,
checksummer,
)
.await?;
@@ -235,6 +252,11 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
ChecksumMode::Calculate(algo) => {
meta.checksum = checksums.extract(algo);
}
+ ChecksumMode::VerifyFrom(checksummer) => {
+ checksums = checksummer
+ .await
+ .ok_or_internal_error("checksum calculation")??;
+ }
};
// Verify quotas are respsected
@@ -335,7 +357,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
encryption: EncryptionParams,
part_number: u64,
first_block: Bytes,
- chunker: &mut StreamChunker<S>,
+ mut chunker: StreamChunker<S>,
checksummer: Checksummer,
) -> Result<(u64, Checksums, Hash), Error> {
let tracer = opentelemetry::global::tracer("garage");