aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/multipart.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3/multipart.rs')
-rw-r--r--src/api/s3/multipart.rs68
1 files changed, 39 insertions, 29 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 5959bcd6..1d5aeb26 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -8,7 +8,6 @@ use md5::{Digest as Md5Digest, Md5};
use garage_table::*;
use garage_util::data::*;
-use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
use garage_model::s3::block_ref_table::*;
use garage_model::s3::mpu_table::*;
@@ -25,12 +24,16 @@ use crate::signature::verify_signed_content;
// ----
pub async fn handle_create_multipart_upload(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: &Request<ReqBody>,
- bucket_name: &str,
- bucket_id: Uuid,
key: &String,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage,
+ bucket_id,
+ bucket_name,
+ ..
+ } = &ctx;
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
let upload_id = gen_uuid();
@@ -47,13 +50,13 @@ pub async fn handle_create_multipart_upload(
headers,
},
};
- let object = Object::new(bucket_id, key.to_string(), vec![object_version]);
+ let object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&object).await?;
// Create multipart upload in mpu table
// This multipart upload will hold references to uploaded parts
// (which are entries in the Version table)
- let mpu = MultipartUpload::new(upload_id, timestamp, bucket_id, key.into(), false);
+ let mpu = MultipartUpload::new(upload_id, timestamp, *bucket_id, key.into(), false);
garage.mpu_table.insert(&mpu).await?;
// Send success response
@@ -69,14 +72,15 @@ pub async fn handle_create_multipart_upload(
}
pub async fn handle_put_part(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: Request<ReqBody>,
- bucket_id: Uuid,
key: &str,
part_number: u64,
upload_id: &str,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx { garage, .. } = &ctx;
+
let upload_id = decode_upload_id(upload_id)?;
let content_md5 = match req.headers().get("content-md5") {
@@ -90,10 +94,8 @@ pub async fn handle_put_part(
let stream = body_stream(req.into_body());
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
- let ((_, _, mut mpu), first_block) = futures::try_join!(
- get_upload(&garage, &bucket_id, &key, &upload_id),
- chunker.next(),
- )?;
+ let ((_, _, mut mpu), first_block) =
+ futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;
// Check object is valid and part can be accepted
let first_block = first_block.ok_or_bad_request("Empty body")?;
@@ -135,7 +137,7 @@ pub async fn handle_put_part(
// Copy data to version
let (total_size, data_md5sum, data_sha256sum, _) =
- read_and_put_blocks(&garage, &version, part_number, first_block, &mut chunker).await?;
+ read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?;
// Verify that checksums map
ensure_checksum_matches(
@@ -200,14 +202,19 @@ impl Drop for InterruptedCleanup {
}
pub async fn handle_complete_multipart_upload(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: Request<ReqBody>,
- bucket_name: &str,
- bucket: &Bucket,
key: &str,
upload_id: &str,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage,
+ bucket_id,
+ bucket_name,
+ ..
+ } = &ctx;
+
let body = http_body_util::BodyExt::collect(req.into_body())
.await?
.to_bytes();
@@ -228,8 +235,7 @@ pub async fn handle_complete_multipart_upload(
// Get object and multipart upload
let key = key.to_string();
- let (object, mut object_version, mpu) =
- get_upload(&garage, &bucket.id, &key, &upload_id).await?;
+ let (object, mut object_version, mpu) = get_upload(&ctx, &key, &upload_id).await?;
if mpu.parts.is_empty() {
return Err(Error::bad_request("No data was uploaded"));
@@ -283,7 +289,7 @@ pub async fn handle_complete_multipart_upload(
let mut final_version = Version::new(
upload_id,
VersionBacklink::Object {
- bucket_id: bucket.id,
+ bucket_id: *bucket_id,
key: key.to_string(),
},
false,
@@ -327,9 +333,9 @@ pub async fn handle_complete_multipart_upload(
// Calculate total size of final object
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
- if let Err(e) = check_quotas(&garage, bucket, total_size, Some(&object)).await {
+ if let Err(e) = check_quotas(&ctx, total_size, Some(&object)).await {
object_version.state = ObjectVersionState::Aborted;
- let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
+ let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
return Err(e);
@@ -345,7 +351,7 @@ pub async fn handle_complete_multipart_upload(
final_version.blocks.items()[0].1.hash,
));
- let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
+ let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
// Send response saying ok we're done
@@ -362,18 +368,20 @@ pub async fn handle_complete_multipart_upload(
}
pub async fn handle_abort_multipart_upload(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
key: &str,
upload_id: &str,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
+
let upload_id = decode_upload_id(upload_id)?;
- let (_, mut object_version, _) =
- get_upload(&garage, &bucket_id, &key.to_string(), &upload_id).await?;
+ let (_, mut object_version, _) = get_upload(&ctx, &key.to_string(), &upload_id).await?;
object_version.state = ObjectVersionState::Aborted;
- let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]);
+ let final_object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
Ok(Response::new(empty_body()))
@@ -383,11 +391,13 @@ pub async fn handle_abort_multipart_upload(
#[allow(clippy::ptr_arg)]
pub(crate) async fn get_upload(
- garage: &Garage,
- bucket_id: &Uuid,
+ ctx: &ReqCtx,
key: &String,
upload_id: &Uuid,
) -> Result<(Object, ObjectVersion, MultipartUpload), Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = ctx;
let (object, mpu) = futures::try_join!(
garage.object_table.get(bucket_id, key).map_err(Error::from),
garage