diff options
Diffstat (limited to 'src/api/s3/multipart.rs')
-rw-r--r-- | src/api/s3/multipart.rs | 68 |
1 files changed, 39 insertions, 29 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 5959bcd6..1d5aeb26 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -8,7 +8,6 @@ use md5::{Digest as Md5Digest, Md5}; use garage_table::*; use garage_util::data::*; -use garage_model::bucket_table::Bucket; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; use garage_model::s3::mpu_table::*; @@ -25,12 +24,16 @@ use crate::signature::verify_signed_content; // ---- pub async fn handle_create_multipart_upload( - garage: Arc<Garage>, + ctx: ReqCtx, req: &Request<ReqBody>, - bucket_name: &str, - bucket_id: Uuid, key: &String, ) -> Result<Response<ResBody>, Error> { + let ReqCtx { + garage, + bucket_id, + bucket_name, + .. + } = &ctx; let existing_object = garage.object_table.get(&bucket_id, &key).await?; let upload_id = gen_uuid(); @@ -47,13 +50,13 @@ pub async fn handle_create_multipart_upload( headers, }, }; - let object = Object::new(bucket_id, key.to_string(), vec![object_version]); + let object = Object::new(*bucket_id, key.to_string(), vec![object_version]); garage.object_table.insert(&object).await?; // Create multipart upload in mpu table // This multipart upload will hold references to uploaded parts // (which are entries in the Version table) - let mpu = MultipartUpload::new(upload_id, timestamp, bucket_id, key.into(), false); + let mpu = MultipartUpload::new(upload_id, timestamp, *bucket_id, key.into(), false); garage.mpu_table.insert(&mpu).await?; // Send success response @@ -69,14 +72,15 @@ pub async fn handle_create_multipart_upload( } pub async fn handle_put_part( - garage: Arc<Garage>, + ctx: ReqCtx, req: Request<ReqBody>, - bucket_id: Uuid, key: &str, part_number: u64, upload_id: &str, content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { + let ReqCtx { garage, .. } = &ctx; + let upload_id = decode_upload_id(upload_id)?; let content_md5 = match req.headers().get("content-md5") { @@ -90,10 +94,8 @@ pub async fn handle_put_part( let stream = body_stream(req.into_body()); let mut chunker = StreamChunker::new(stream, garage.config.block_size); - let ((_, _, mut mpu), first_block) = futures::try_join!( - get_upload(&garage, &bucket_id, &key, &upload_id), - chunker.next(), - )?; + let ((_, _, mut mpu), first_block) = + futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?; // Check object is valid and part can be accepted let first_block = first_block.ok_or_bad_request("Empty body")?; @@ -135,7 +137,7 @@ pub async fn handle_put_part( // Copy data to version let (total_size, data_md5sum, data_sha256sum, _) = - read_and_put_blocks(&garage, &version, part_number, first_block, &mut chunker).await?; + read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?; // Verify that checksums map ensure_checksum_matches( @@ -200,14 +202,19 @@ impl Drop for InterruptedCleanup { } pub async fn handle_complete_multipart_upload( - garage: Arc<Garage>, + ctx: ReqCtx, req: Request<ReqBody>, - bucket_name: &str, - bucket: &Bucket, key: &str, upload_id: &str, content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { + let ReqCtx { + garage, + bucket_id, + bucket_name, + .. + } = &ctx; + let body = http_body_util::BodyExt::collect(req.into_body()) .await? .to_bytes(); @@ -228,8 +235,7 @@ pub async fn handle_complete_multipart_upload( // Get object and multipart upload let key = key.to_string(); - let (object, mut object_version, mpu) = - get_upload(&garage, &bucket.id, &key, &upload_id).await?; + let (object, mut object_version, mpu) = get_upload(&ctx, &key, &upload_id).await?; if mpu.parts.is_empty() { return Err(Error::bad_request("No data was uploaded")); @@ -283,7 +289,7 @@ pub async fn handle_complete_multipart_upload( let mut final_version = Version::new( upload_id, VersionBacklink::Object { - bucket_id: bucket.id, + bucket_id: *bucket_id, key: key.to_string(), }, false, @@ -327,9 +333,9 @@ pub async fn handle_complete_multipart_upload( // Calculate total size of final object let total_size = parts.iter().map(|x| x.size.unwrap()).sum(); - if let Err(e) = check_quotas(&garage, bucket, total_size, Some(&object)).await { + if let Err(e) = check_quotas(&ctx, total_size, Some(&object)).await { object_version.state = ObjectVersionState::Aborted; - let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); + let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]); garage.object_table.insert(&final_object).await?; return Err(e); @@ -345,7 +351,7 @@ pub async fn handle_complete_multipart_upload( final_version.blocks.items()[0].1.hash, )); - let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); + let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]); garage.object_table.insert(&final_object).await?; // Send response saying ok we're done @@ -362,18 +368,20 @@ pub async fn handle_complete_multipart_upload( } pub async fn handle_abort_multipart_upload( - garage: Arc<Garage>, - bucket_id: Uuid, + ctx: ReqCtx, key: &str, upload_id: &str, ) -> Result<Response<ResBody>, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; + let upload_id = decode_upload_id(upload_id)?; - let (_, mut object_version, _) = - get_upload(&garage, &bucket_id, &key.to_string(), &upload_id).await?; + let (_, mut object_version, _) = get_upload(&ctx, &key.to_string(), &upload_id).await?; object_version.state = ObjectVersionState::Aborted; - let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]); + let final_object = Object::new(*bucket_id, key.to_string(), vec![object_version]); garage.object_table.insert(&final_object).await?; Ok(Response::new(empty_body())) @@ -383,11 +391,13 @@ pub async fn handle_abort_multipart_upload( #[allow(clippy::ptr_arg)] pub(crate) async fn get_upload( - garage: &Garage, - bucket_id: &Uuid, + ctx: &ReqCtx, key: &String, upload_id: &Uuid, ) -> Result<(Object, ObjectVersion, MultipartUpload), Error> { + let ReqCtx { + garage, bucket_id, .. + } = ctx; let (object, mpu) = futures::try_join!( garage.object_table.get(bucket_id, key).map_err(Error::from), garage |