aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/copy.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-03-21 14:06:59 +0100
committerAlex Auvolat <alex@adnab.me>2024-03-26 15:01:34 +0100
commit74949c69cbf1a8222b6d10a02fcf5fe139ccb560 (patch)
tree54128ccf24e8598c52c2a5366ad0b509bf1c296e /src/api/s3/copy.rs
parent7e0107c47db71e8da13990c9111ebde8cbf60d8f (diff)
downloadgarage-74949c69cbf1a8222b6d10a02fcf5fe139ccb560.tar.gz
garage-74949c69cbf1a8222b6d10a02fcf5fe139ccb560.zip
[s3-checksum] implement x-amz-checksum-* headers
Diffstat (limited to 'src/api/s3/copy.rs')
-rw-r--r--src/api/s3/copy.rs132
1 files changed, 96 insertions, 36 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 2b29ec6d..411a6917 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -2,7 +2,6 @@ use std::pin::Pin;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::{stream, stream::Stream, StreamExt, TryStreamExt};
-use md5::{Digest as Md5Digest, Md5};
use bytes::Bytes;
use hyper::{Request, Response};
@@ -23,11 +22,12 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody};
+use crate::s3::checksum::*;
use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
use crate::s3::get::full_object_byte_stream;
use crate::s3::multipart;
-use crate::s3::put::{get_headers, save_stream, SaveStreamResult};
+use crate::s3::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult};
use crate::s3::xml::{self as s3_xml, xmlns_tag};
// -------- CopyObject ---------
@@ -39,6 +39,8 @@ pub async fn handle_copy(
) -> Result<Response<ResBody>, Error> {
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
+ let checksum_algorithm = request_checksum_algorithm(req.headers())?;
+
let source_object = get_copy_source(&ctx, req).await?;
let (source_version, source_version_data, source_version_meta) =
@@ -48,7 +50,7 @@ pub async fn handle_copy(
copy_precondition.check(source_version, &source_version_meta.etag)?;
// Determine encryption parameters
- let (source_encryption, source_object_headers) =
+ let (source_encryption, source_object_meta_inner) =
EncryptionParams::check_decrypt_for_copy_source(
&ctx.garage,
req.headers(),
@@ -56,23 +58,54 @@ pub async fn handle_copy(
)?;
let dest_encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
- // Determine headers of destination object
- let dest_object_headers = match req.headers().get("x-amz-metadata-directive") {
- Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => {
- get_headers(req.headers())?
- }
- _ => source_object_headers.into_owned(),
+ // Extract source checksum info before source_object_meta_inner is consumed
+ let source_checksum = source_object_meta_inner.checksum;
+ let source_checksum_algorithm = source_checksum.map(|x| x.algorithm());
+
+ // If source object has a checksum, the destination object must as well.
+ // The x-amz-checksum-algorihtm header allows to change that algorithm,
+ // but if it is absent, we must use the same as before
+ let checksum_algorithm = checksum_algorithm.or(source_checksum_algorithm);
+
+ // Determine metadata of destination object
+ let was_multipart = source_version_meta.etag.contains('-');
+ let dest_object_meta = ObjectVersionMetaInner {
+ headers: match req.headers().get("x-amz-metadata-directive") {
+ Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => {
+ get_headers(req.headers())?
+ }
+ _ => source_object_meta_inner.into_owned().headers,
+ },
+ checksum: source_checksum,
};
// Do actual object copying
- let res = if EncryptionParams::is_same(&source_encryption, &dest_encryption) {
- // If source and dest are both unencrypted, or if the encryption keys
- // are the same, we can just copy the metadata and link blocks of the
+ //
+ // In any of the following scenarios, we need to read the whole object
+ // data and re-write it again:
+ //
+ // - the data needs to be decrypted or encrypted
+ // - the requested checksum algorithm requires us to recompute a checksum
+ // - the original object was a multipart upload and a checksum algorithm
+ // is defined (AWS specifies that in this case, we must recompute the
+ // checksum from scratch as if this was a single big object and not
+ // a multipart object, as the checksums are not computed in the same way)
+ //
+ // In other cases, we can just copy the metadata and reference the same blocks.
+ //
+ // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
+
+ let must_recopy = !EncryptionParams::is_same(&source_encryption, &dest_encryption)
+ || source_checksum_algorithm != checksum_algorithm
+ || (was_multipart && checksum_algorithm.is_some());
+
+ let res = if !must_recopy {
+ // In most cases, we can just copy the metadata and link blocks of the
// old object from the new object.
handle_copy_metaonly(
ctx,
dest_key,
- dest_object_headers,
+ dest_object_meta,
dest_encryption,
source_version,
source_version_data,
@@ -80,16 +113,27 @@ pub async fn handle_copy(
)
.await?
} else {
+ let expected_checksum = ExpectedChecksums {
+ md5: None,
+ sha256: None,
+ extra: source_checksum,
+ };
+ let checksum_mode = if was_multipart || source_checksum_algorithm != checksum_algorithm {
+ ChecksumMode::Calculate(checksum_algorithm)
+ } else {
+ ChecksumMode::Verify(&expected_checksum)
+ };
// If source and dest encryption use different keys,
// we must decrypt content and re-encrypt, so rewrite all data blocks.
handle_copy_reencrypt(
ctx,
dest_key,
- dest_object_headers,
+ dest_object_meta,
dest_encryption,
source_version,
source_version_data,
source_encryption,
+ checksum_mode,
)
.await?
};
@@ -115,7 +159,7 @@ pub async fn handle_copy(
async fn handle_copy_metaonly(
ctx: ReqCtx,
dest_key: &str,
- dest_object_headers: ObjectVersionHeaders,
+ dest_object_meta: ObjectVersionMetaInner,
dest_encryption: EncryptionParams,
source_version: &ObjectVersion,
source_version_data: &ObjectVersionData,
@@ -132,7 +176,7 @@ async fn handle_copy_metaonly(
let new_timestamp = now_msec();
let new_meta = ObjectVersionMeta {
- encryption: dest_encryption.encrypt_headers(dest_object_headers)?,
+ encryption: dest_encryption.encrypt_meta(dest_object_meta)?,
size: source_version_meta.size,
etag: source_version_meta.etag.clone(),
};
@@ -180,6 +224,7 @@ async fn handle_copy_metaonly(
timestamp: new_timestamp,
state: ObjectVersionState::Uploading {
encryption: new_meta.encryption.clone(),
+ checksum_algorithm: None,
multipart: false,
},
};
@@ -252,11 +297,12 @@ async fn handle_copy_metaonly(
async fn handle_copy_reencrypt(
ctx: ReqCtx,
dest_key: &str,
- dest_object_headers: ObjectVersionHeaders,
+ dest_object_meta: ObjectVersionMetaInner,
dest_encryption: EncryptionParams,
source_version: &ObjectVersion,
source_version_data: &ObjectVersionData,
source_encryption: EncryptionParams,
+ checksum_mode: ChecksumMode<'_>,
) -> Result<SaveStreamResult, Error> {
// basically we will read the source data (decrypt if necessary)
// and save that in a new object (encrypt if necessary),
@@ -270,12 +316,11 @@ async fn handle_copy_reencrypt(
save_stream(
&ctx,
- dest_object_headers,
+ dest_object_meta,
dest_encryption,
source_stream.map_err(|e| Error::from(GarageError::from(e))),
&dest_key.to_string(),
- None,
- None,
+ checksum_mode,
)
.await
}
@@ -313,8 +358,12 @@ pub async fn handle_upload_part_copy(
req.headers(),
&source_version_meta.encryption,
)?;
- let dest_object_encryption = match dest_version.state {
- ObjectVersionState::Uploading { encryption, .. } => encryption,
+ let (dest_object_encryption, dest_object_checksum_algorithm) = match dest_version.state {
+ ObjectVersionState::Uploading {
+ encryption,
+ checksum_algorithm,
+ ..
+ } => (encryption, checksum_algorithm),
_ => unreachable!(),
};
let (dest_encryption, _) =
@@ -412,7 +461,9 @@ pub async fn handle_upload_part_copy(
dest_mpu_part_key,
MpuPart {
version: dest_version_id,
+ // These are all filled in later (bottom of this function)
etag: None,
+ checksum: None,
size: None,
},
);
@@ -429,7 +480,8 @@ pub async fn handle_upload_part_copy(
garage.version_table.insert(&dest_version).await?;
// Now, actually copy the blocks
- let mut md5hasher = Md5::new();
+ let mut checksummer = Checksummer::init(&Default::default(), !dest_encryption.is_encrypted())
+ .add(dest_object_checksum_algorithm);
// First, create a stream that is able to read the source blocks
// and extract the subrange if necessary.
@@ -495,18 +547,24 @@ pub async fn handle_upload_part_copy(
}
let data_len = data.len() as u64;
- md5hasher.update(&data[..]);
-
- let (final_data, must_upload, final_hash) = match existing_block_hash {
- Some(hash) if same_encryption => (data, false, hash),
- _ => tokio::task::spawn_blocking(move || {
- let data_enc = dest_encryption.encrypt_block(data)?;
- let hash = blake2sum(&data_enc);
- Ok::<_, Error>((data_enc, true, hash))
+
+ let (checksummer_updated, (data_to_upload, final_hash)) =
+ tokio::task::spawn_blocking(move || {
+ checksummer.update(&data[..]);
+
+ let tup = match existing_block_hash {
+ Some(hash) if same_encryption => (None, hash),
+ _ => {
+ let data_enc = dest_encryption.encrypt_block(data)?;
+ let hash = blake2sum(&data_enc);
+ (Some(data_enc), hash)
+ }
+ };
+ Ok::<_, Error>((checksummer, tup))
})
.await
- .unwrap()?,
- };
+ .unwrap()?;
+ checksummer = checksummer_updated;
dest_version.blocks.clear();
dest_version.blocks.put(
@@ -531,7 +589,7 @@ pub async fn handle_upload_part_copy(
// Thing 1: if the block is not exactly a block that existed before,
// we need to insert that data as a new block.
async {
- if must_upload {
+ if let Some(final_data) = data_to_upload {
garage
.block_manager
.rpc_put_block(final_hash, final_data, dest_encryption.is_encrypted(), None)
@@ -552,8 +610,9 @@ pub async fn handle_upload_part_copy(
assert_eq!(current_offset, source_range.length);
- let data_md5sum = md5hasher.finalize();
- let etag = dest_encryption.etag_from_md5(&data_md5sum);
+ let checksums = checksummer.finalize();
+ let etag = dest_encryption.etag_from_md5(&checksums.md5);
+ let checksum = checksums.extract(dest_object_checksum_algorithm);
// Put the part's ETag in the Versiontable
dest_mpu.parts.put(
@@ -561,6 +620,7 @@ pub async fn handle_upload_part_copy(
MpuPart {
version: dest_version_id,
etag: Some(etag.clone()),
+ checksum,
size: Some(current_offset),
},
);