aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/multipart.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3/multipart.rs')
-rw-r--r--src/api/s3/multipart.rs33
1 files changed, 18 insertions, 15 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 6b786318..b9d15b21 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use futures::prelude::*;
-use hyper::body::Body;
use hyper::{Request, Response};
use md5::{Digest as Md5Digest, Md5};
@@ -17,6 +16,8 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::put::*;
use crate::s3::xml as s3_xml;
@@ -26,11 +27,11 @@ use crate::signature::verify_signed_content;
pub async fn handle_create_multipart_upload(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
bucket_name: &str,
bucket_id: Uuid,
key: &String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
let upload_id = gen_uuid();
@@ -65,18 +66,18 @@ pub async fn handle_create_multipart_upload(
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(Body::from(xml.into_bytes())))
+ Ok(Response::new(string_body(xml)))
}
pub async fn handle_put_part(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_id: Uuid,
key: &str,
part_number: u64,
upload_id: &str,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let upload_id = decode_upload_id(upload_id)?;
let content_md5 = match req.headers().get("content-md5") {
@@ -87,8 +88,8 @@ pub async fn handle_put_part(
// Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string();
- let body = req.into_body().map_err(Error::from);
- let mut chunker = StreamChunker::new(body, garage.config.block_size);
+ let stream = body_stream(req.into_body());
+ let mut chunker = StreamChunker::new(stream, garage.config.block_size);
let ((_, _, mut mpu), first_block) = futures::try_join!(
get_upload(&garage, &bucket_id, &key, &upload_id),
@@ -172,7 +173,7 @@ pub async fn handle_put_part(
let response = Response::builder()
.header("ETag", format!("\"{}\"", data_md5sum_hex))
- .body(Body::empty())
+ .body(empty_body())
.unwrap();
Ok(response)
}
@@ -210,14 +211,16 @@ impl Drop for InterruptedCleanup {
pub async fn handle_complete_multipart_upload(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_name: &str,
bucket: &Bucket,
key: &str,
upload_id: &str,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = http_body_util::BodyExt::collect(req.into_body())
+ .await?
+ .to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -365,7 +368,7 @@ pub async fn handle_complete_multipart_upload(
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(Body::from(xml.into_bytes())))
+ Ok(Response::new(string_body(xml)))
}
pub async fn handle_abort_multipart_upload(
@@ -373,7 +376,7 @@ pub async fn handle_abort_multipart_upload(
bucket_id: Uuid,
key: &str,
upload_id: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let upload_id = decode_upload_id(upload_id)?;
let (_, mut object_version, _) =
@@ -383,7 +386,7 @@ pub async fn handle_abort_multipart_upload(
let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
- Ok(Response::new(Body::from(vec![])))
+ Ok(Response::new(empty_body()))
}
// ======== helpers ============