aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/multipart.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3/multipart.rs')
-rw-r--r--src/api/s3/multipart.rs38
1 files changed, 22 insertions, 16 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 96c4d044..4aa27eaf 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -2,8 +2,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use futures::{prelude::*, TryStreamExt};
-use hyper::body::Body;
-use hyper::{body::HttpBody, Request, Response};
+use http_body_util::BodyStream;
+use hyper::{Request, Response};
use md5::{Digest as Md5Digest, Md5};
use garage_table::*;
@@ -17,6 +17,8 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::put::*;
use crate::s3::xml as s3_xml;
@@ -26,11 +28,11 @@ use crate::signature::verify_signed_content;
pub async fn handle_create_multipart_upload(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
bucket_name: &str,
bucket_id: Uuid,
key: &String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
let upload_id = gen_uuid();
@@ -65,18 +67,18 @@ pub async fn handle_create_multipart_upload(
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(Body::from(xml.into_bytes())))
+ Ok(Response::new(string_body(xml)))
}
pub async fn handle_put_part(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_id: Uuid,
key: &str,
part_number: u64,
upload_id: &str,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let upload_id = decode_upload_id(upload_id)?;
let content_md5 = match req.headers().get("content-md5") {
@@ -87,8 +89,10 @@ pub async fn handle_put_part(
// Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string();
- let body = TryStreamExt::map_err(req.into_body(), Error::from);
- let mut chunker = StreamChunker::new(body, garage.config.block_size);
+ let body_stream = BodyStream::new(req.into_body())
+ .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap
+ .map_err(Error::from);
+ let mut chunker = StreamChunker::new(body_stream, garage.config.block_size);
let ((_, _, mut mpu), first_block) = futures::try_join!(
get_upload(&garage, &bucket_id, &key, &upload_id),
@@ -172,7 +176,7 @@ pub async fn handle_put_part(
let response = Response::builder()
.header("ETag", format!("\"{}\"", data_md5sum_hex))
- .body(Body::empty())
+ .body(empty_body())
.unwrap();
Ok(response)
}
@@ -210,14 +214,16 @@ impl Drop for InterruptedCleanup {
pub async fn handle_complete_multipart_upload(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_name: &str,
bucket: &Bucket,
key: &str,
upload_id: &str,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = HttpBody::collect(req.into_body()).await?.to_bytes();
+) -> Result<Response<ResBody>, Error> {
+ let body = http_body_util::BodyExt::collect(req.into_body())
+ .await?
+ .to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -365,7 +371,7 @@ pub async fn handle_complete_multipart_upload(
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(Body::from(xml.into_bytes())))
+ Ok(Response::new(string_body(xml)))
}
pub async fn handle_abort_multipart_upload(
@@ -373,7 +379,7 @@ pub async fn handle_abort_multipart_upload(
bucket_id: Uuid,
key: &str,
upload_id: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let upload_id = decode_upload_id(upload_id)?;
let (_, mut object_version, _) =
@@ -383,7 +389,7 @@ pub async fn handle_abort_multipart_upload(
let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
- Ok(Response::new(Body::from(vec![])))
+ Ok(Response::new(empty_body()))
}
// ======== helpers ============