aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-07-07 17:15:53 +0200
committerAlex Auvolat <alex@adnab.me>2020-07-07 17:15:53 +0200
commitf22ecb60a8e1848de95e1bd3104b0ceec7058f0c (patch)
tree1e51db048f17295a070aae4025bfa16036a3c0fd /src
parent3b0b11085e0501afcc16f6a75632a4d425146158 (diff)
downloadgarage-f22ecb60a8e1848de95e1bd3104b0ceec7058f0c.tar.gz
garage-f22ecb60a8e1848de95e1bd3104b0ceec7058f0c.zip
Update to Hyper 0.13.6 that accepts non-Sync streams in wrap_stream.
Simplifies code and makes it possible to publish on crates.io
Diffstat (limited to 'src')
-rw-r--r--src/api/Cargo.toml2
-rw-r--r--src/api/api_server.rs11
-rw-r--r--src/api/http_util.rs88
-rw-r--r--src/api/lib.rs1
-rw-r--r--src/api/s3_copy.rs8
-rw-r--r--src/api/s3_delete.rs11
-rw-r--r--src/api/s3_get.rs24
-rw-r--r--src/api/s3_list.rs7
-rw-r--r--src/api/s3_put.rs23
9 files changed, 40 insertions, 135 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 410adbb3..f9cd32b2 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -30,7 +30,7 @@ futures-util = "0.3"
tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
http = "0.2"
-hyper = "0.13"
+hyper = "^0.13.6"
url = "2.1"
httpdate = "0.3"
percent-encoding = "2.1.0"
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index 699dc5c4..92a9f2a6 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -11,7 +11,6 @@ use garage_util::error::Error;
use garage_model::garage::Garage;
-use crate::http_util::*;
use crate::signature::check_signature;
use crate::s3_copy::*;
@@ -50,7 +49,7 @@ async fn handler(
garage: Arc<Garage>,
req: Request<Body>,
addr: SocketAddr,
-) -> Result<Response<BodyType>, Error> {
+) -> Result<Response<Body>, Error> {
info!("{} {} {}", addr, req.method(), req.uri());
debug!("{:?}", req);
match handler_inner(garage, req).await {
@@ -59,7 +58,7 @@ async fn handler(
Ok(x)
}
Err(e) => {
- let body: BodyType = Box::new(BytesBody::from(format!("{}\n", e)));
+ let body: Body = Body::from(format!("{}\n", e));
let mut http_error = Response::new(body);
*http_error.status_mut() = e.http_status_code();
warn!("Response: error {}, {}", e.http_status_code(), e);
@@ -71,7 +70,7 @@ async fn handler(
async fn handler_inner(
garage: Arc<Garage>,
req: Request<Body>,
-) -> Result<Response<BodyType>, Error> {
+) -> Result<Response<Body>, Error> {
let path = req.uri().path().to_string();
let path = percent_encoding::percent_decode_str(&path).decode_utf8()?;
@@ -180,7 +179,7 @@ async fn handler_inner(
std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?)
.unwrap_or("<invalid utf8>")
);
- let empty_body: BodyType = Box::new(BytesBody::from(vec![]));
+ let empty_body: Body = Body::from(vec![]);
let response = Response::builder()
.header("Location", format!("/{}", bucket))
.body(empty_body)
@@ -189,7 +188,7 @@ async fn handler_inner(
}
&Method::HEAD => {
// HeadBucket
- let empty_body: BodyType = Box::new(BytesBody::from(vec![]));
+ let empty_body: Body = Body::from(vec![]);
let response = Response::builder().body(empty_body).unwrap();
Ok(response)
}
diff --git a/src/api/http_util.rs b/src/api/http_util.rs
deleted file mode 100644
index 8a8cf9d8..00000000
--- a/src/api/http_util.rs
+++ /dev/null
@@ -1,88 +0,0 @@
-use core::pin::Pin;
-use core::task::{Context, Poll};
-
-use futures::ready;
-use futures::stream::*;
-use hyper::body::{Bytes, HttpBody};
-
-use garage_util::error::Error;
-
-pub type BodyType = Box<dyn HttpBody<Data = Bytes, Error = Error> + Send + Unpin>;
-
-type StreamType = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
-
-pub struct StreamBody {
- stream: StreamType,
-}
-
-impl StreamBody {
- pub fn new(stream: StreamType) -> Self {
- Self { stream }
- }
-}
-
-impl HttpBody for StreamBody {
- type Data = Bytes;
- type Error = Error;
-
- fn poll_data(
- mut self: Pin<&mut Self>,
- cx: &mut Context,
- ) -> Poll<Option<Result<Bytes, Self::Error>>> {
- match ready!(self.stream.as_mut().poll_next(cx)) {
- Some(res) => Poll::Ready(Some(res)),
- None => Poll::Ready(None),
- }
- }
-
- fn poll_trailers(
- self: Pin<&mut Self>,
- _cx: &mut Context,
- ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> {
- Poll::Ready(Ok(None))
- }
-}
-
-pub struct BytesBody {
- bytes: Option<Bytes>,
-}
-
-impl BytesBody {
- pub fn new(bytes: Bytes) -> Self {
- Self { bytes: Some(bytes) }
- }
-}
-
-impl HttpBody for BytesBody {
- type Data = Bytes;
- type Error = Error;
-
- fn poll_data(
- mut self: Pin<&mut Self>,
- _cx: &mut Context,
- ) -> Poll<Option<Result<Bytes, Self::Error>>> {
- Poll::Ready(self.bytes.take().map(Ok))
- }
-
- fn poll_trailers(
- self: Pin<&mut Self>,
- _cx: &mut Context,
- ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> {
- Poll::Ready(Ok(None))
- }
-}
-
-impl From<String> for BytesBody {
- fn from(x: String) -> BytesBody {
- Self::new(Bytes::from(x))
- }
-}
-impl From<Vec<u8>> for BytesBody {
- fn from(x: Vec<u8>) -> BytesBody {
- Self::new(Bytes::from(x))
- }
-}
-
-pub fn empty_body() -> BodyType {
- Box::new(BytesBody::from(vec![]))
-}
diff --git a/src/api/lib.rs b/src/api/lib.rs
index 5f872941..df2fd045 100644
--- a/src/api/lib.rs
+++ b/src/api/lib.rs
@@ -2,7 +2,6 @@
extern crate log;
pub mod encoding;
-pub mod http_util;
pub mod api_server;
pub mod signature;
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs
index 6a8d8f87..0af1b13a 100644
--- a/src/api/s3_copy.rs
+++ b/src/api/s3_copy.rs
@@ -2,7 +2,7 @@ use std::fmt::Write;
use std::sync::Arc;
use chrono::{SecondsFormat, Utc};
-use hyper::Response;
+use hyper::{Body, Response};
use garage_table::*;
use garage_util::data::*;
@@ -13,15 +13,13 @@ use garage_model::garage::Garage;
use garage_model::object_table::*;
use garage_model::version_table::*;
-use crate::http_util::*;
-
pub async fn handle_copy(
garage: Arc<Garage>,
dest_bucket: &str,
dest_key: &str,
source_bucket: &str,
source_key: &str,
-) -> Result<Response<BodyType>, Error> {
+) -> Result<Response<Body>, Error> {
let source_object = match garage
.object_table
.get(&source_bucket.to_string(), &source_key.to_string())
@@ -116,5 +114,5 @@ pub async fn handle_copy(
writeln!(&mut xml, "\t<LastModified>{}</LastModified>", last_modified).unwrap();
writeln!(&mut xml, "</CopyObjectResult>").unwrap();
- Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes()))))
+ Ok(Response::new(Body::from(xml.into_bytes())))
}
diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs
index 60714b0c..91e5e20d 100644
--- a/src/api/s3_delete.rs
+++ b/src/api/s3_delete.rs
@@ -10,7 +10,6 @@ use garage_model::garage::Garage;
use garage_model::object_table::*;
use crate::encoding::*;
-use crate::http_util::*;
async fn handle_delete_internal(
garage: &Garage,
@@ -70,13 +69,13 @@ pub async fn handle_delete(
garage: Arc<Garage>,
bucket: &str,
key: &str,
-) -> Result<Response<BodyType>, Error> {
+) -> Result<Response<Body>, Error> {
let (_deleted_version, delete_marker_version) =
handle_delete_internal(&garage, bucket, key).await?;
Ok(Response::builder()
.header("x-amz-version-id", hex::encode(delete_marker_version))
- .body(empty_body())
+ .body(Body::from(vec![]))
.unwrap())
}
@@ -84,7 +83,7 @@ pub async fn handle_delete_objects(
garage: Arc<Garage>,
bucket: &str,
req: Request<Body>,
-) -> Result<Response<BodyType>, Error> {
+) -> Result<Response<Body>, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?;
let cmd_xml = roxmltree::Document::parse(&std::str::from_utf8(&body)?)?;
let cmd = parse_delete_objects_xml(&cmd_xml)
@@ -130,9 +129,9 @@ pub async fn handle_delete_objects(
writeln!(&mut retxml, "</DeleteObjectsOutput>").unwrap();
- Ok(Response::new(Box::new(BytesBody::from(
+ Ok(Response::new(Body::from(
retxml.into_bytes(),
- ))))
+ )))
}
struct DeleteRequest {
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs
index 63200ca3..5f2151b5 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3_get.rs
@@ -12,8 +12,6 @@ use garage_table::EmptyKey;
use garage_model::garage::Garage;
use garage_model::object_table::*;
-use crate::http_util::*;
-
fn object_headers(version: &ObjectVersion) -> http::response::Builder {
let date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
let date_str = httpdate::fmt_http_date(date);
@@ -29,7 +27,7 @@ pub async fn handle_head(
garage: Arc<Garage>,
bucket: &str,
key: &str,
-) -> Result<Response<BodyType>, Error> {
+) -> Result<Response<Body>, Error> {
let object = match garage
.object_table
.get(&bucket.to_string(), &key.to_string())
@@ -50,7 +48,7 @@ pub async fn handle_head(
None => return Err(Error::NotFound),
};
- let body: BodyType = Box::new(BytesBody::from(vec![]));
+ let body: Body = Body::from(vec![]);
let response = object_headers(&version)
.status(StatusCode::OK)
.body(body)
@@ -63,7 +61,7 @@ pub async fn handle_get(
req: &Request<Body>,
bucket: &str,
key: &str,
-) -> Result<Response<BodyType>, Error> {
+) -> Result<Response<Body>, Error> {
let object = match garage
.object_table
.get(&bucket.to_string(), &key.to_string())
@@ -111,7 +109,7 @@ pub async fn handle_get(
))),
ObjectVersionData::DeleteMarker => Err(Error::NotFound),
ObjectVersionData::Inline(bytes) => {
- let body: BodyType = Box::new(BytesBody::from(bytes.to_vec()));
+ let body: Body = Body::from(bytes.to_vec());
Ok(resp_builder.body(body)?)
}
ObjectVersionData::FirstBlock(first_block_hash) => {
@@ -147,7 +145,8 @@ pub async fn handle_get(
}
})
.buffered(2);
- let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream)));
+ //let body: Body = Box::new(StreamBody::new(Box::pin(body_stream)));
+ let body = hyper::body::Body::wrap_stream(body_stream);
Ok(resp_builder.body(body)?)
}
}
@@ -158,7 +157,7 @@ pub async fn handle_get_range(
version: &ObjectVersion,
begin: u64,
end: u64,
-) -> Result<Response<BodyType>, Error> {
+) -> Result<Response<Body>, Error> {
if end > version.size {
return Err(Error::BadRequest(format!("Range not included in file")));
}
@@ -177,9 +176,9 @@ pub async fn handle_get_range(
ObjectVersionData::DeleteMarker => Err(Error::NotFound),
ObjectVersionData::Inline(bytes) => {
if end as usize <= bytes.len() {
- let body: BodyType = Box::new(BytesBody::from(
+ let body: Body = Body::from(
bytes[begin as usize..end as usize].to_vec(),
- ));
+ );
Ok(resp_builder.body(body)?)
} else {
Err(Error::Message(format!("Internal error: requested range not present in inline bytes when it should have been")))
@@ -214,13 +213,14 @@ pub async fn handle_get_range(
} else {
end - block.offset
};
- Ok(Bytes::from(
+ Result::<Bytes,Error>::Ok(Bytes::from(
data[start_in_block as usize..end_in_block as usize].to_vec(),
))
}
})
.buffered(2);
- let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream)));
+ //let body: Body = Box::new(StreamBody::new(Box::pin(body_stream)));
+ let body = hyper::body::Body::wrap_stream(body_stream);
Ok(resp_builder.body(body)?)
}
}
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
index c4fbf6f2..1f0eccf5 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3_list.rs
@@ -3,14 +3,13 @@ use std::fmt::Write;
use std::sync::Arc;
use chrono::{DateTime, NaiveDateTime, SecondsFormat, Utc};
-use hyper::Response;
+use hyper::{Body, Response};
use garage_util::error::Error;
use garage_model::garage::Garage;
use crate::encoding::*;
-use crate::http_util::*;
#[derive(Debug)]
struct ListResultInfo {
@@ -26,7 +25,7 @@ pub async fn handle_list(
prefix: &str,
marker: Option<&str>,
urlencode_resp: bool,
-) -> Result<Response<BodyType>, Error> {
+) -> Result<Response<Body>, Error> {
let mut result_keys = BTreeMap::<String, ListResultInfo>::new();
let mut result_common_prefixes = BTreeSet::<String>::new();
@@ -141,5 +140,5 @@ pub async fn handle_list(
writeln!(&mut xml, "</ListBucketResult>").unwrap();
println!("{}", xml);
- Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes()))))
+ Ok(Response::new(Body::from(xml.into_bytes())))
}
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index bddfa444..c5d0a31c 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -16,14 +16,13 @@ use garage_model::object_table::*;
use garage_model::version_table::*;
use crate::encoding::*;
-use crate::http_util::*;
pub async fn handle_put(
garage: Arc<Garage>,
req: Request<Body>,
bucket: &str,
key: &str,
-) -> Result<Response<BodyType>, Error> {
+) -> Result<Response<Body>, Error> {
let version_uuid = gen_uuid();
let mime_type = get_mime_type(&req)?;
let body = req.into_body();
@@ -195,10 +194,10 @@ impl BodyChunker {
}
}
-pub fn put_response(version_uuid: UUID) -> Response<BodyType> {
+pub fn put_response(version_uuid: UUID) -> Response<Body> {
Response::builder()
.header("x-amz-version-id", hex::encode(version_uuid))
- .body(empty_body())
+ .body(Body::from(vec![]))
.unwrap()
}
@@ -207,7 +206,7 @@ pub async fn handle_create_multipart_upload(
req: &Request<Body>,
bucket: &str,
key: &str,
-) -> Result<Response<BodyType>, Error> {
+) -> Result<Response<Body>, Error> {
let version_uuid = gen_uuid();
let mime_type = get_mime_type(req)?;
@@ -239,7 +238,7 @@ pub async fn handle_create_multipart_upload(
.unwrap();
writeln!(&mut xml, "</InitiateMultipartUploadResult>").unwrap();
- Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes()))))
+ Ok(Response::new(Body::from(xml.into_bytes())))
}
pub async fn handle_put_part(
@@ -249,7 +248,7 @@ pub async fn handle_put_part(
key: &str,
part_number_str: &str,
upload_id: &str,
-) -> Result<Response<BodyType>, Error> {
+) -> Result<Response<Body>, Error> {
// Check parameters
let part_number = part_number_str
.parse::<u64>()
@@ -299,7 +298,7 @@ pub async fn handle_put_part(
)
.await?;
- Ok(Response::new(Box::new(BytesBody::from(vec![]))))
+ Ok(Response::new(Body::from(vec![])))
}
pub async fn handle_complete_multipart_upload(
@@ -308,7 +307,7 @@ pub async fn handle_complete_multipart_upload(
bucket: &str,
key: &str,
upload_id: &str,
-) -> Result<Response<BodyType>, Error> {
+) -> Result<Response<Body>, Error> {
let version_uuid =
uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
@@ -374,7 +373,7 @@ pub async fn handle_complete_multipart_upload(
writeln!(&mut xml, "\t<Key>{}</Key>", xml_escape(&key)).unwrap();
writeln!(&mut xml, "</CompleteMultipartUploadResult>").unwrap();
- Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes()))))
+ Ok(Response::new(Body::from(xml.into_bytes())))
}
pub async fn handle_abort_multipart_upload(
@@ -382,7 +381,7 @@ pub async fn handle_abort_multipart_upload(
bucket: &str,
key: &str,
upload_id: &str,
-) -> Result<Response<BodyType>, Error> {
+) -> Result<Response<Body>, Error> {
let version_uuid =
uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
@@ -412,7 +411,7 @@ pub async fn handle_abort_multipart_upload(
let final_object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
- Ok(Response::new(Box::new(BytesBody::from(vec![]))))
+ Ok(Response::new(Body::from(vec![])))
}
fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {