aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/copy.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3/copy.rs')
-rw-r--r--src/api/s3/copy.rs373
1 files changed, 288 insertions, 85 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 3c2bd483..411a6917 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -1,17 +1,18 @@
use std::pin::Pin;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
-use futures::{stream, stream::Stream, StreamExt};
-use md5::{Digest as Md5Digest, Md5};
+use futures::{stream, stream::Stream, StreamExt, TryStreamExt};
use bytes::Bytes;
use hyper::{Request, Response};
use serde::Serialize;
use garage_net::bytes_buf::BytesBuf;
+use garage_net::stream::read_stream_to_end;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
use garage_util::data::*;
+use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_model::s3::block_ref_table::*;
@@ -21,11 +22,16 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody};
+use crate::s3::checksum::*;
+use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
+use crate::s3::get::full_object_byte_stream;
use crate::s3::multipart;
-use crate::s3::put::get_headers;
+use crate::s3::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult};
use crate::s3::xml::{self as s3_xml, xmlns_tag};
+// -------- CopyObject ---------
+
pub async fn handle_copy(
ctx: ReqCtx,
req: &Request<ReqBody>,
@@ -33,13 +39,9 @@ pub async fn handle_copy(
) -> Result<Response<ResBody>, Error> {
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
- let source_object = get_copy_source(&ctx, req).await?;
+ let checksum_algorithm = request_checksum_algorithm(req.headers())?;
- let ReqCtx {
- garage,
- bucket_id: dest_bucket_id,
- ..
- } = ctx;
+ let source_object = get_copy_source(&ctx, req).await?;
let (source_version, source_version_data, source_version_meta) =
extract_source_info(&source_object)?;
@@ -47,26 +49,150 @@ pub async fn handle_copy(
// Check precondition, e.g. x-amz-copy-source-if-match
copy_precondition.check(source_version, &source_version_meta.etag)?;
+ // Determine encryption parameters
+ let (source_encryption, source_object_meta_inner) =
+ EncryptionParams::check_decrypt_for_copy_source(
+ &ctx.garage,
+ req.headers(),
+ &source_version_meta.encryption,
+ )?;
+ let dest_encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
+
+ // Extract source checksum info before source_object_meta_inner is consumed
+ let source_checksum = source_object_meta_inner.checksum;
+ let source_checksum_algorithm = source_checksum.map(|x| x.algorithm());
+
+ // If source object has a checksum, the destination object must as well.
+ // The x-amz-checksum-algorihtm header allows to change that algorithm,
+ // but if it is absent, we must use the same as before
+ let checksum_algorithm = checksum_algorithm.or(source_checksum_algorithm);
+
+ // Determine metadata of destination object
+ let was_multipart = source_version_meta.etag.contains('-');
+ let dest_object_meta = ObjectVersionMetaInner {
+ headers: match req.headers().get("x-amz-metadata-directive") {
+ Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => {
+ get_headers(req.headers())?
+ }
+ _ => source_object_meta_inner.into_owned().headers,
+ },
+ checksum: source_checksum,
+ };
+
+ // Do actual object copying
+ //
+ // In any of the following scenarios, we need to read the whole object
+ // data and re-write it again:
+ //
+ // - the data needs to be decrypted or encrypted
+ // - the requested checksum algorithm requires us to recompute a checksum
+ // - the original object was a multipart upload and a checksum algorithm
+ // is defined (AWS specifies that in this case, we must recompute the
+ // checksum from scratch as if this was a single big object and not
+ // a multipart object, as the checksums are not computed in the same way)
+ //
+ // In other cases, we can just copy the metadata and reference the same blocks.
+ //
+ // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
+
+ let must_recopy = !EncryptionParams::is_same(&source_encryption, &dest_encryption)
+ || source_checksum_algorithm != checksum_algorithm
+ || (was_multipart && checksum_algorithm.is_some());
+
+ let res = if !must_recopy {
+ // In most cases, we can just copy the metadata and link blocks of the
+ // old object from the new object.
+ handle_copy_metaonly(
+ ctx,
+ dest_key,
+ dest_object_meta,
+ dest_encryption,
+ source_version,
+ source_version_data,
+ source_version_meta,
+ )
+ .await?
+ } else {
+ let expected_checksum = ExpectedChecksums {
+ md5: None,
+ sha256: None,
+ extra: source_checksum,
+ };
+ let checksum_mode = if was_multipart || source_checksum_algorithm != checksum_algorithm {
+ ChecksumMode::Calculate(checksum_algorithm)
+ } else {
+ ChecksumMode::Verify(&expected_checksum)
+ };
+ // If source and dest encryption use different keys,
+ // we must decrypt content and re-encrypt, so rewrite all data blocks.
+ handle_copy_reencrypt(
+ ctx,
+ dest_key,
+ dest_object_meta,
+ dest_encryption,
+ source_version,
+ source_version_data,
+ source_encryption,
+ checksum_mode,
+ )
+ .await?
+ };
+
+ let last_modified = msec_to_rfc3339(res.version_timestamp);
+ let result = CopyObjectResult {
+ last_modified: s3_xml::Value(last_modified),
+ etag: s3_xml::Value(format!("\"{}\"", res.etag)),
+ };
+ let xml = s3_xml::to_xml_with_header(&result)?;
+
+ let mut resp = Response::builder()
+ .header("Content-Type", "application/xml")
+ .header("x-amz-version-id", hex::encode(res.version_uuid))
+ .header(
+ "x-amz-copy-source-version-id",
+ hex::encode(source_version.uuid),
+ );
+ dest_encryption.add_response_headers(&mut resp);
+ Ok(resp.body(string_body(xml))?)
+}
+
+async fn handle_copy_metaonly(
+ ctx: ReqCtx,
+ dest_key: &str,
+ dest_object_meta: ObjectVersionMetaInner,
+ dest_encryption: EncryptionParams,
+ source_version: &ObjectVersion,
+ source_version_data: &ObjectVersionData,
+ source_version_meta: &ObjectVersionMeta,
+) -> Result<SaveStreamResult, Error> {
+ let ReqCtx {
+ garage,
+ bucket_id: dest_bucket_id,
+ ..
+ } = ctx;
+
// Generate parameters for copied object
let new_uuid = gen_uuid();
let new_timestamp = now_msec();
- // Implement x-amz-metadata-directive: REPLACE
- let new_meta = match req.headers().get("x-amz-metadata-directive") {
- Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
- headers: get_headers(req.headers())?,
- size: source_version_meta.size,
- etag: source_version_meta.etag.clone(),
- },
- _ => source_version_meta.clone(),
+ let new_meta = ObjectVersionMeta {
+ encryption: dest_encryption.encrypt_meta(dest_object_meta)?,
+ size: source_version_meta.size,
+ etag: source_version_meta.etag.clone(),
};
- let etag = new_meta.etag.to_string();
+ let res = SaveStreamResult {
+ version_uuid: new_uuid,
+ version_timestamp: new_timestamp,
+ etag: new_meta.etag.clone(),
+ };
// Save object copy
match source_version_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, bytes) => {
+ // bytes is either plaintext before&after or encrypted with the
+ // same keys, so it's ok to just copy it as is
let dest_object_version = ObjectVersion {
uuid: new_uuid,
timestamp: new_timestamp,
@@ -97,7 +223,8 @@ pub async fn handle_copy(
uuid: new_uuid,
timestamp: new_timestamp,
state: ObjectVersionState::Uploading {
- headers: new_meta.headers.clone(),
+ encryption: new_meta.encryption.clone(),
+ checksum_algorithm: None,
multipart: false,
},
};
@@ -164,23 +291,42 @@ pub async fn handle_copy(
}
}
- let last_modified = msec_to_rfc3339(new_timestamp);
- let result = CopyObjectResult {
- last_modified: s3_xml::Value(last_modified),
- etag: s3_xml::Value(format!("\"{}\"", etag)),
- };
- let xml = s3_xml::to_xml_with_header(&result)?;
+ Ok(res)
+}
- Ok(Response::builder()
- .header("Content-Type", "application/xml")
- .header("x-amz-version-id", hex::encode(new_uuid))
- .header(
- "x-amz-copy-source-version-id",
- hex::encode(source_version.uuid),
- )
- .body(string_body(xml))?)
+async fn handle_copy_reencrypt(
+ ctx: ReqCtx,
+ dest_key: &str,
+ dest_object_meta: ObjectVersionMetaInner,
+ dest_encryption: EncryptionParams,
+ source_version: &ObjectVersion,
+ source_version_data: &ObjectVersionData,
+ source_encryption: EncryptionParams,
+ checksum_mode: ChecksumMode<'_>,
+) -> Result<SaveStreamResult, Error> {
+ // basically we will read the source data (decrypt if necessary)
+ // and save that in a new object (encrypt if necessary),
+ // by combining the code used in getobject and putobject
+ let source_stream = full_object_byte_stream(
+ ctx.garage.clone(),
+ source_version,
+ source_version_data,
+ source_encryption,
+ );
+
+ save_stream(
+ &ctx,
+ dest_object_meta,
+ dest_encryption,
+ source_stream.map_err(|e| Error::from(GarageError::from(e))),
+ &dest_key.to_string(),
+ checksum_mode,
+ )
+ .await
}
+// -------- UploadPartCopy ---------
+
pub async fn handle_upload_part_copy(
ctx: ReqCtx,
req: &Request<ReqBody>,
@@ -193,7 +339,7 @@ pub async fn handle_upload_part_copy(
let dest_upload_id = multipart::decode_upload_id(upload_id)?;
let dest_key = dest_key.to_string();
- let (source_object, (_, _, mut dest_mpu)) = futures::try_join!(
+ let (source_object, (_, dest_version, mut dest_mpu)) = futures::try_join!(
get_copy_source(&ctx, req),
multipart::get_upload(&ctx, &dest_key, &dest_upload_id)
)?;
@@ -206,6 +352,24 @@ pub async fn handle_upload_part_copy(
// Check precondition on source, e.g. x-amz-copy-source-if-match
copy_precondition.check(source_object_version, &source_version_meta.etag)?;
+ // Determine encryption parameters
+ let (source_encryption, _) = EncryptionParams::check_decrypt_for_copy_source(
+ &garage,
+ req.headers(),
+ &source_version_meta.encryption,
+ )?;
+ let (dest_object_encryption, dest_object_checksum_algorithm) = match dest_version.state {
+ ObjectVersionState::Uploading {
+ encryption,
+ checksum_algorithm,
+ ..
+ } => (encryption, checksum_algorithm),
+ _ => unreachable!(),
+ };
+ let (dest_encryption, _) =
+ EncryptionParams::check_decrypt(&garage, req.headers(), &dest_object_encryption)?;
+ let same_encryption = EncryptionParams::is_same(&source_encryption, &dest_encryption);
+
// Check source range is valid
let source_range = match req.headers().get("x-amz-copy-source-range") {
Some(range) => {
@@ -227,21 +391,16 @@ pub async fn handle_upload_part_copy(
};
// Check source version is not inlined
- match source_version_data {
- ObjectVersionData::DeleteMarker => unreachable!(),
- ObjectVersionData::Inline(_meta, _bytes) => {
- // This is only for small files, we don't bother handling this.
- // (in AWS UploadPartCopy works for parts at least 5MB which
- // is never the case of an inline object)
- return Err(Error::bad_request(
- "Source object is too small (minimum part size is 5Mb)",
- ));
- }
- ObjectVersionData::FirstBlock(_meta, _first_block_hash) => (),
- };
+ if matches!(source_version_data, ObjectVersionData::Inline(_, _)) {
+ // This is only for small files, we don't bother handling this.
+ // (in AWS UploadPartCopy works for parts at least 5MB which
+ // is never the case of an inline object)
+ return Err(Error::bad_request(
+ "Source object is too small (minimum part size is 5Mb)",
+ ));
+ }
- // Fetch source versin with its block list,
- // and destination version to check part hasn't yet been uploaded
+ // Fetch source version with its block list
let source_version = garage
.version_table
.get(&source_object_version.uuid, &EmptyKey)
@@ -251,7 +410,9 @@ pub async fn handle_upload_part_copy(
// We want to reuse blocks from the source version as much as possible.
// However, we still need to get the data from these blocks
// because we need to know it to calculate the MD5sum of the part
- // which is used as its ETag.
+ // which is used as its ETag. For encrypted sources or destinations,
+ // we must always read(+decrypt) and then write(+encrypt), so we
+ // can never reuse data blocks as is.
// First, calculate what blocks we want to keep,
// and the subrange of the block to take, if the bounds of the
@@ -300,7 +461,9 @@ pub async fn handle_upload_part_copy(
dest_mpu_part_key,
MpuPart {
version: dest_version_id,
+ // These are all filled in later (bottom of this function)
etag: None,
+ checksum: None,
size: None,
},
);
@@ -313,32 +476,55 @@ pub async fn handle_upload_part_copy(
},
false,
);
+ // write an empty version now to be the parent of the block_ref entries
+ garage.version_table.insert(&dest_version).await?;
// Now, actually copy the blocks
- let mut md5hasher = Md5::new();
+ let mut checksummer = Checksummer::init(&Default::default(), !dest_encryption.is_encrypted())
+ .add(dest_object_checksum_algorithm);
// First, create a stream that is able to read the source blocks
// and extract the subrange if necessary.
// The second returned value is an Option<Hash>, that is Some
// if and only if the block returned is a block that already existed
- // in the Garage data store (thus we don't need to save it again).
+ // in the Garage data store and can be reused as-is instead of having
+ // to save it again. This excludes encrypted source blocks that we had
+ // to decrypt.
let garage2 = garage.clone();
let order_stream = OrderTag::stream();
let source_blocks = stream::iter(blocks_to_copy)
.enumerate()
- .flat_map(|(i, (block_hash, range_to_copy))| {
+ .map(|(i, (block_hash, range_to_copy))| {
let garage3 = garage2.clone();
- stream::once(async move {
- let data = garage3
- .block_manager
- .rpc_get_block(&block_hash, Some(order_stream.order(i as u64)))
+ async move {
+ let stream = source_encryption
+ .get_block(&garage3, &block_hash, Some(order_stream.order(i as u64)))
.await?;
+ let data = read_stream_to_end(stream).await?.into_bytes();
+ // For each item, we return a tuple of:
+ // 1. the full data block (decrypted)
+ // 2. an Option<Hash> that indicates the hash of the block in the block store,
+ // only if it can be re-used as-is in the copied object
match range_to_copy {
- Some(r) => Ok((data.slice(r), None)),
- None => Ok((data, Some(block_hash))),
+ Some(r) => {
+ // If we are taking a subslice of the data, we cannot reuse the block as-is
+ Ok((data.slice(r), None))
+ }
+ None if same_encryption => {
+ // If the data is unencrypted before & after, or if we are using
+ // the same encryption key, we can reuse the stored block, no need
+ // to re-send it to storage nodes.
+ Ok((data, Some(block_hash)))
+ }
+ None => {
+ // If we are decrypting / (re)encrypting with different keys,
+ // we cannot reuse the block as-is
+ Ok((data, None))
+ }
}
- })
+ }
})
+ .buffered(2)
.peekable();
// The defragmenter is a custom stream (defined below) that concatenates
@@ -346,22 +532,39 @@ pub async fn handle_upload_part_copy(
// It returns a series of (Vec<u8>, Option<Hash>).
// When it is done, it returns an empty vec.
// Same as the previous iterator, the Option is Some(_) if and only if
- // it's an existing block of the Garage data store.
+ // it's an existing block of the Garage data store that can be reused.
let mut defragmenter = Defragmenter::new(garage.config.block_size, Box::pin(source_blocks));
let mut current_offset = 0;
let mut next_block = defragmenter.next().await?;
+ // TODO this could be optimized similarly to read_and_put_blocks
+ // low priority because uploadpartcopy is rarely used
loop {
let (data, existing_block_hash) = next_block;
if data.is_empty() {
break;
}
- md5hasher.update(&data[..]);
-
- let must_upload = existing_block_hash.is_none();
- let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..]));
+ let data_len = data.len() as u64;
+
+ let (checksummer_updated, (data_to_upload, final_hash)) =
+ tokio::task::spawn_blocking(move || {
+ checksummer.update(&data[..]);
+
+ let tup = match existing_block_hash {
+ Some(hash) if same_encryption => (None, hash),
+ _ => {
+ let data_enc = dest_encryption.encrypt_block(data)?;
+ let hash = blake2sum(&data_enc);
+ (Some(data_enc), hash)
+ }
+ };
+ Ok::<_, Error>((checksummer, tup))
+ })
+ .await
+ .unwrap()?;
+ checksummer = checksummer_updated;
dest_version.blocks.clear();
dest_version.blocks.put(
@@ -371,10 +574,10 @@ pub async fn handle_upload_part_copy(
},
VersionBlock {
hash: final_hash,
- size: data.len() as u64,
+ size: data_len,
},
);
- current_offset += data.len() as u64;
+ current_offset += data_len;
let block_ref = BlockRef {
block: final_hash,
@@ -382,36 +585,34 @@ pub async fn handle_upload_part_copy(
deleted: false.into(),
};
- let garage2 = garage.clone();
- let res = futures::try_join!(
+ let (_, _, _, next) = futures::try_join!(
// Thing 1: if the block is not exactly a block that existed before,
// we need to insert that data as a new block.
- async move {
- if must_upload {
- garage2
+ async {
+ if let Some(final_data) = data_to_upload {
+ garage
.block_manager
- .rpc_put_block(final_hash, data, None)
+ .rpc_put_block(final_hash, final_data, dest_encryption.is_encrypted(), None)
.await
} else {
Ok(())
}
},
- async {
- // Thing 2: we need to insert the block in the version
- garage.version_table.insert(&dest_version).await?;
- // Thing 3: we need to add a block reference
- garage.block_ref_table.insert(&block_ref).await
- },
- // Thing 4: we need to prefetch the next block
+ // Thing 2: we need to insert the block in the version
+ garage.version_table.insert(&dest_version),
+ // Thing 3: we need to add a block reference
+ garage.block_ref_table.insert(&block_ref),
+ // Thing 4: we need to read the next block
defragmenter.next(),
)?;
- next_block = res.2;
+ next_block = next;
}
assert_eq!(current_offset, source_range.length);
- let data_md5sum = md5hasher.finalize();
- let etag = hex::encode(data_md5sum);
+ let checksums = checksummer.finalize();
+ let etag = dest_encryption.etag_from_md5(&checksums.md5);
+ let checksum = checksums.extract(dest_object_checksum_algorithm);
// Put the part's ETag in the Versiontable
dest_mpu.parts.put(
@@ -419,6 +620,7 @@ pub async fn handle_upload_part_copy(
MpuPart {
version: dest_version_id,
etag: Some(etag.clone()),
+ checksum,
size: Some(current_offset),
},
);
@@ -431,13 +633,14 @@ pub async fn handle_upload_part_copy(
last_modified: s3_xml::Value(msec_to_rfc3339(source_object_version.timestamp)),
})?;
- Ok(Response::builder()
+ let mut resp = Response::builder()
.header("Content-Type", "application/xml")
.header(
"x-amz-copy-source-version-id",
hex::encode(source_object_version.uuid),
- )
- .body(string_body(resp_xml))?)
+ );
+ dest_encryption.add_response_headers(&mut resp);
+ Ok(resp.body(string_body(resp_xml))?)
}
async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, Error> {