diff options
Diffstat (limited to 'src/api/s3/multipart.rs')
-rw-r--r-- | src/api/s3/multipart.rs | 33 |
1 files changed, 18 insertions, 15 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 6b786318..b9d15b21 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::sync::Arc; use futures::prelude::*; -use hyper::body::Body; use hyper::{Request, Response}; use md5::{Digest as Md5Digest, Md5}; @@ -17,6 +16,8 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::put::*; use crate::s3::xml as s3_xml; @@ -26,11 +27,11 @@ use crate::signature::verify_signed_content; pub async fn handle_create_multipart_upload( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<ReqBody>, bucket_name: &str, bucket_id: Uuid, key: &String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let existing_object = garage.object_table.get(&bucket_id, &key).await?; let upload_id = gen_uuid(); @@ -65,18 +66,18 @@ pub async fn handle_create_multipart_upload( }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(Body::from(xml.into_bytes()))) + Ok(Response::new(string_body(xml))) } pub async fn handle_put_part( garage: Arc<Garage>, - req: Request<Body>, + req: Request<ReqBody>, bucket_id: Uuid, key: &str, part_number: u64, upload_id: &str, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let upload_id = decode_upload_id(upload_id)?; let content_md5 = match req.headers().get("content-md5") { @@ -87,8 +88,8 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let body = req.into_body().map_err(Error::from); - let mut chunker = StreamChunker::new(body, garage.config.block_size); + let stream = body_stream(req.into_body()); + let mut chunker = StreamChunker::new(stream, garage.config.block_size); let ((_, _, mut mpu), first_block) = futures::try_join!( get_upload(&garage, &bucket_id, &key, &upload_id), @@ -172,7 +173,7 @@ pub async fn handle_put_part( let response = Response::builder() .header("ETag", format!("\"{}\"", data_md5sum_hex)) - .body(Body::empty()) + .body(empty_body()) .unwrap(); Ok(response) } @@ -210,14 +211,16 @@ impl Drop for InterruptedCleanup { pub async fn handle_complete_multipart_upload( garage: Arc<Garage>, - req: Request<Body>, + req: Request<ReqBody>, bucket_name: &str, bucket: &Bucket, key: &str, upload_id: &str, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; +) -> Result<Response<ResBody>, Error> { + let body = http_body_util::BodyExt::collect(req.into_body()) + .await? + .to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -365,7 +368,7 @@ pub async fn handle_complete_multipart_upload( }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(Body::from(xml.into_bytes()))) + Ok(Response::new(string_body(xml))) } pub async fn handle_abort_multipart_upload( @@ -373,7 +376,7 @@ pub async fn handle_abort_multipart_upload( bucket_id: Uuid, key: &str, upload_id: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let upload_id = decode_upload_id(upload_id)?; let (_, mut object_version, _) = @@ -383,7 +386,7 @@ pub async fn handle_abort_multipart_upload( let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]); garage.object_table.insert(&final_object).await?; - Ok(Response::new(Body::from(vec![]))) + Ok(Response::new(empty_body())) } // ======== helpers ============ |