diff options
Diffstat (limited to 'src/api/s3')
-rw-r--r-- | src/api/s3/multipart.rs | 9 | ||||
-rw-r--r-- | src/api/s3/post_object.rs | 9 | ||||
-rw-r--r-- | src/api/s3/put.rs | 14 |
3 files changed, 12 insertions, 20 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 4aa27eaf..b9d15b21 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -1,8 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use futures::{prelude::*, TryStreamExt}; -use http_body_util::BodyStream; +use futures::prelude::*; use hyper::{Request, Response}; use md5::{Digest as Md5Digest, Md5}; @@ -89,10 +88,8 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let body_stream = BodyStream::new(req.into_body()) - .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap - .map_err(Error::from); - let mut chunker = StreamChunker::new(body_stream, garage.config.block_size); + let stream = body_stream(req.into_body()); + let mut chunker = StreamChunker::new(stream, garage.config.block_size); let ((_, _, mut mpu), first_block) = futures::try_join!( get_upload(&garage, &bucket_id, &key, &upload_id), diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index e9732dc4..bca8d6c6 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -7,8 +7,7 @@ use std::task::{Context, Poll}; use base64::prelude::*; use bytes::Bytes; use chrono::{DateTime, Duration, Utc}; -use futures::{Stream, StreamExt, TryStreamExt}; -use http_body_util::BodyStream; +use futures::{Stream, StreamExt}; use hyper::header::{self, HeaderMap, HeaderName, HeaderValue}; use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use multer::{Constraints, Multipart, SizeLimit}; @@ -45,10 +44,8 @@ pub async fn handle_post_object( ); let (head, body) = req.into_parts(); - let body_stream = BodyStream::new(body) - .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap - .map_err(Error::from); - let mut multipart = Multipart::with_constraints(body_stream, boundary, constraints); + let stream = body_stream::<_, Error>(body); + let mut multipart = Multipart::with_constraints(stream, boundary, constraints); let mut params = HeaderMap::new(); let field = loop { diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 3d43eee8..17424862 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -4,13 +4,13 @@ use std::sync::Arc; use base64::prelude::*; use futures::prelude::*; use futures::try_join; -use http_body_util::BodyStream; -use hyper::body::Bytes; -use hyper::header::{HeaderMap, HeaderValue}; -use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; +use hyper::body::{Body, Bytes}; +use hyper::header::{HeaderMap, HeaderValue}; +use hyper::{Request, Response}; + use opentelemetry::{ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, Context, @@ -51,14 +51,12 @@ pub async fn handle_put( None => None, }; - let body_stream = BodyStream::new(req.into_body()) - .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap - .map_err(Error::from); + let stream = body_stream(req.into_body()); save_stream( garage, headers, - body_stream, + stream, bucket, key, content_md5, |