aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-02-07 15:25:49 +0100
committerAlex Auvolat <alex@adnab.me>2024-02-07 15:32:51 +0100
commite011941964b1c1e0b90f85014d166d64a83ae8e2 (patch)
tree5c5cef9af72d48dd7347922341e43f0013380c60 /src
parent53746b59e525ff5f518ed59d7831b05e2732785d (diff)
downloadgarage-e011941964b1c1e0b90f85014d166d64a83ae8e2.tar.gz
garage-e011941964b1c1e0b90f85014d166d64a83ae8e2.zip
[dep-upgrade-202402] refactor use of BodyStream
Diffstat (limited to 'src')
-rw-r--r--src/api/helpers.rs23
-rw-r--r--src/api/s3/multipart.rs9
-rw-r--r--src/api/s3/post_object.rs9
-rw-r--r--src/api/s3/put.rs14
-rw-r--r--src/api/signature/streaming.rs8
5 files changed, 37 insertions, 26 deletions
diff --git a/src/api/helpers.rs b/src/api/helpers.rs
index ba7b1599..5f488912 100644
--- a/src/api/helpers.rs
+++ b/src/api/helpers.rs
@@ -1,7 +1,12 @@
use std::convert::Infallible;
+use futures::{Stream, StreamExt, TryStreamExt};
+
use http_body_util::{BodyExt, Full as FullBody};
-use hyper::{body::Body, Request, Response};
+use hyper::{
+ body::{Body, Bytes},
+ Request, Response,
+};
use idna::domain_to_unicode;
use serde::{Deserialize, Serialize};
@@ -187,6 +192,22 @@ where
.unwrap())
}
+pub fn body_stream<B, E>(body: B) -> impl Stream<Item = Result<Bytes, E>>
+where
+ B: Body<Data = Bytes>,
+ <B as Body>::Error: Into<E>,
+ E: From<Error>,
+{
+ let stream = http_body_util::BodyStream::new(body);
+ let stream = TryStreamExt::map_err(stream, Into::into);
+ stream.map(|x| {
+ x.and_then(|f| {
+ f.into_data()
+ .map_err(|_| E::from(Error::bad_request("non-data frame")))
+ })
+ })
+}
+
pub fn is_default<T: Default + PartialEq>(v: &T) -> bool {
*v == T::default()
}
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 4aa27eaf..b9d15b21 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -1,8 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;
-use futures::{prelude::*, TryStreamExt};
-use http_body_util::BodyStream;
+use futures::prelude::*;
use hyper::{Request, Response};
use md5::{Digest as Md5Digest, Md5};
@@ -89,10 +88,8 @@ pub async fn handle_put_part(
// Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string();
- let body_stream = BodyStream::new(req.into_body())
- .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap
- .map_err(Error::from);
- let mut chunker = StreamChunker::new(body_stream, garage.config.block_size);
+ let stream = body_stream(req.into_body());
+ let mut chunker = StreamChunker::new(stream, garage.config.block_size);
let ((_, _, mut mpu), first_block) = futures::try_join!(
get_upload(&garage, &bucket_id, &key, &upload_id),
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index e9732dc4..bca8d6c6 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -7,8 +7,7 @@ use std::task::{Context, Poll};
use base64::prelude::*;
use bytes::Bytes;
use chrono::{DateTime, Duration, Utc};
-use futures::{Stream, StreamExt, TryStreamExt};
-use http_body_util::BodyStream;
+use futures::{Stream, StreamExt};
use hyper::header::{self, HeaderMap, HeaderName, HeaderValue};
use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
use multer::{Constraints, Multipart, SizeLimit};
@@ -45,10 +44,8 @@ pub async fn handle_post_object(
);
let (head, body) = req.into_parts();
- let body_stream = BodyStream::new(body)
- .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap
- .map_err(Error::from);
- let mut multipart = Multipart::with_constraints(body_stream, boundary, constraints);
+ let stream = body_stream::<_, Error>(body);
+ let mut multipart = Multipart::with_constraints(stream, boundary, constraints);
let mut params = HeaderMap::new();
let field = loop {
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 3d43eee8..17424862 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -4,13 +4,13 @@ use std::sync::Arc;
use base64::prelude::*;
use futures::prelude::*;
use futures::try_join;
-use http_body_util::BodyStream;
-use hyper::body::Bytes;
-use hyper::header::{HeaderMap, HeaderValue};
-use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
+use hyper::body::{Body, Bytes};
+use hyper::header::{HeaderMap, HeaderValue};
+use hyper::{Request, Response};
+
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
Context,
@@ -51,14 +51,12 @@ pub async fn handle_put(
None => None,
};
- let body_stream = BodyStream::new(req.into_body())
- .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap
- .map_err(Error::from);
+ let stream = body_stream(req.into_body());
save_stream(
garage,
headers,
- body_stream,
+ stream,
bucket,
key,
content_md5,
diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs
index ea5a64e2..39147ca0 100644
--- a/src/api/signature/streaming.rs
+++ b/src/api/signature/streaming.rs
@@ -5,7 +5,7 @@ use futures::prelude::*;
use futures::task;
use garage_model::key_table::Key;
use hmac::Mac;
-use http_body_util::{BodyStream, StreamBody};
+use http_body_util::StreamBody;
use hyper::body::{Bytes, Incoming as IncomingBody};
use hyper::Request;
@@ -51,11 +51,9 @@ pub fn parse_streaming_body(
.ok_or_internal_error("Unable to build signing HMAC")?;
Ok(req.map(move |body| {
- let body_stream = BodyStream::new(body)
- .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap
- .map_err(Error::from);
+ let stream = body_stream::<_, Error>(body);
let signed_payload_stream =
- SignedPayloadStream::new(body_stream, signing_hmac, date, &scope, signature)
+ SignedPayloadStream::new(stream, signing_hmac, date, &scope, signature)
.map(|x| x.map(hyper::body::Frame::data))
.map_err(Error::from);
ReqBody::new(StreamBody::new(signed_payload_stream))