From 82e75c0e296c74c374f3d40feeb1aadcb58398f0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 3 May 2023 12:02:59 +0200 Subject: Adapt S3 API code to use new multipart upload models - Create and PutPart - completemultipartupload - upload part copy - list_parts --- src/api/s3/copy.rs | 121 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 72 insertions(+), 49 deletions(-) (limited to 'src/api/s3/copy.rs') diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 7eb6459d..68b4f0c9 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -2,7 +2,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use futures::{stream, stream::Stream, StreamExt, TryFutureExt}; +use futures::{stream, stream::Stream, StreamExt}; use md5::{Digest as Md5Digest, Md5}; use bytes::Bytes; @@ -18,12 +18,14 @@ use garage_util::time::*; use garage_model::garage::Garage; use garage_model::key_table::Key; use garage_model::s3::block_ref_table::*; +use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use crate::helpers::parse_bucket_key; use crate::s3::error::*; -use crate::s3::put::{decode_upload_id, get_headers}; +use crate::s3::multipart; +use crate::s3::put::get_headers; use crate::s3::xml::{self as s3_xml, xmlns_tag}; pub async fn handle_copy( @@ -92,7 +94,10 @@ pub async fn handle_copy( let tmp_dest_object_version = ObjectVersion { uuid: new_uuid, timestamp: new_timestamp, - state: ObjectVersionState::Uploading(new_meta.headers.clone()), + state: ObjectVersionState::Uploading { + headers: new_meta.headers.clone(), + multipart: false, + }, }; let tmp_dest_object = Object::new( dest_bucket_id, @@ -105,8 +110,14 @@ pub async fn handle_copy( // this means that the BlockRef entries linked to this version cannot be // marked as deleted (they are marked as deleted only if the Version // doesn't exist or is marked as deleted). - let mut dest_version = - Version::new(new_uuid, dest_bucket_id, dest_key.to_string(), false); + let mut dest_version = Version::new( + new_uuid, + VersionBacklink::Object { + bucket_id: dest_bucket_id, + key: dest_key.to_string(), + }, + false, + ); garage.version_table.insert(&dest_version).await?; // Fill in block list for version and insert block refs @@ -179,17 +190,13 @@ pub async fn handle_upload_part_copy( ) -> Result, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; - let dest_version_uuid = decode_upload_id(upload_id)?; + let dest_upload_id = multipart::decode_upload_id(upload_id)?; let dest_key = dest_key.to_string(); - let (source_object, dest_object) = futures::try_join!( + let (source_object, (_, _, mut dest_mpu)) = futures::try_join!( get_copy_source(&garage, api_key, req), - garage - .object_table - .get(&dest_bucket_id, &dest_key) - .map_err(Error::from), + multipart::get_upload(&garage, &dest_bucket_id, &dest_key, &dest_upload_id) )?; - let dest_object = dest_object.ok_or(Error::NoSuchKey)?; let (source_object_version, source_version_data, source_version_meta) = extract_source_info(&source_object)?; @@ -217,15 +224,6 @@ pub async fn handle_upload_part_copy( }, }; - // Check destination version is indeed in uploading state - if !dest_object - .versions() - .iter() - .any(|v| v.uuid == dest_version_uuid && v.is_uploading()) - { - return Err(Error::NoSuchUpload); - } - // Check source version is not inlined match source_version_data { ObjectVersionData::DeleteMarker => unreachable!(), @@ -242,23 +240,11 @@ pub async fn handle_upload_part_copy( // Fetch source versin with its block list, // and destination version to check part hasn't yet been uploaded - let (source_version, dest_version) = futures::try_join!( - garage - .version_table - .get(&source_object_version.uuid, &EmptyKey), - garage.version_table.get(&dest_version_uuid, &EmptyKey), - )?; - let source_version = source_version.ok_or(Error::NoSuchKey)?; - - // Check this part number hasn't yet been uploaded - if let Some(dv) = dest_version { - if dv.has_part_number(part_number) { - return Err(Error::bad_request(format!( - "Part number {} has already been uploaded", - part_number - ))); - } - } + let source_version = garage + .version_table + .get(&source_object_version.uuid, &EmptyKey) + .await? + .ok_or(Error::NoSuchKey)?; // We want to reuse blocks from the source version as much as possible. // However, we still need to get the data from these blocks @@ -299,6 +285,33 @@ pub async fn handle_upload_part_copy( current_offset = block_end; } + // Calculate the identity of destination part: timestamp, version id + let dest_version_id = gen_uuid(); + let dest_mpu_part_key = MpuPartKey { + part_number, + timestamp: dest_mpu.next_timestamp(part_number), + }; + + // Create the uploaded part + dest_mpu.parts.clear(); + dest_mpu.parts.put( + dest_mpu_part_key, + MpuPart { + version: dest_version_id, + etag: None, + size: None, + }, + ); + garage.mpu_table.insert(&dest_mpu).await?; + + let mut dest_version = Version::new( + dest_version_id, + VersionBacklink::MultipartUpload { + upload_id: dest_upload_id, + }, + false, + ); + // Now, actually copy the blocks let mut md5hasher = Md5::new(); @@ -348,8 +361,8 @@ pub async fn handle_upload_part_copy( let must_upload = existing_block_hash.is_none(); let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..])); - let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); - version.blocks.put( + dest_version.blocks.clear(); + dest_version.blocks.put( VersionBlockKey { part_number, offset: current_offset, @@ -363,7 +376,7 @@ pub async fn handle_upload_part_copy( let block_ref = BlockRef { block: final_hash, - version: dest_version_uuid, + version: dest_version_id, deleted: false.into(), }; @@ -378,23 +391,33 @@ pub async fn handle_upload_part_copy( Ok(()) } }, - // Thing 2: we need to insert the block in the version - garage.version_table.insert(&version), - // Thing 3: we need to add a block reference - garage.block_ref_table.insert(&block_ref), + async { + // Thing 2: we need to insert the block in the version + garage.version_table.insert(&dest_version).await?; + // Thing 3: we need to add a block reference + garage.block_ref_table.insert(&block_ref).await + }, // Thing 4: we need to prefetch the next block defragmenter.next(), )?; - next_block = res.3; + next_block = res.2; } + assert_eq!(current_offset, source_range.length); + let data_md5sum = md5hasher.finalize(); let etag = hex::encode(data_md5sum); // Put the part's ETag in the Versiontable - let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); - version.parts_etags.put(part_number, etag.clone()); - garage.version_table.insert(&version).await?; + dest_mpu.parts.put( + dest_mpu_part_key, + MpuPart { + version: dest_version_id, + etag: Some(etag.clone()), + size: Some(current_offset), + }, + ); + garage.mpu_table.insert(&dest_mpu).await?; // LGTM let resp_xml = s3_xml::to_xml_with_header(&CopyPartResult { -- cgit v1.2.3