diff options
author | Alex Auvolat <alex@adnab.me> | 2020-07-07 17:15:53 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-07-07 17:15:53 +0200 |
commit | f22ecb60a8e1848de95e1bd3104b0ceec7058f0c (patch) | |
tree | 1e51db048f17295a070aae4025bfa16036a3c0fd /src | |
parent | 3b0b11085e0501afcc16f6a75632a4d425146158 (diff) | |
download | garage-f22ecb60a8e1848de95e1bd3104b0ceec7058f0c.tar.gz garage-f22ecb60a8e1848de95e1bd3104b0ceec7058f0c.zip |
Update to Hyper 0.13.6 that accepts non-Sync streams in wrap_stream.
Simplifies code and makes it possible to publish on crates.io
Diffstat (limited to 'src')
-rw-r--r-- | src/api/Cargo.toml | 2 | ||||
-rw-r--r-- | src/api/api_server.rs | 11 | ||||
-rw-r--r-- | src/api/http_util.rs | 88 | ||||
-rw-r--r-- | src/api/lib.rs | 1 | ||||
-rw-r--r-- | src/api/s3_copy.rs | 8 | ||||
-rw-r--r-- | src/api/s3_delete.rs | 11 | ||||
-rw-r--r-- | src/api/s3_get.rs | 24 | ||||
-rw-r--r-- | src/api/s3_list.rs | 7 | ||||
-rw-r--r-- | src/api/s3_put.rs | 23 |
9 files changed, 40 insertions, 135 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 410adbb3..f9cd32b2 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -30,7 +30,7 @@ futures-util = "0.3" tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } http = "0.2" -hyper = "0.13" +hyper = "^0.13.6" url = "2.1" httpdate = "0.3" percent-encoding = "2.1.0" diff --git a/src/api/api_server.rs b/src/api/api_server.rs index 699dc5c4..92a9f2a6 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -11,7 +11,6 @@ use garage_util::error::Error; use garage_model::garage::Garage; -use crate::http_util::*; use crate::signature::check_signature; use crate::s3_copy::*; @@ -50,7 +49,7 @@ async fn handler( garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr, -) -> Result<Response<BodyType>, Error> { +) -> Result<Response<Body>, Error> { info!("{} {} {}", addr, req.method(), req.uri()); debug!("{:?}", req); match handler_inner(garage, req).await { @@ -59,7 +58,7 @@ async fn handler( Ok(x) } Err(e) => { - let body: BodyType = Box::new(BytesBody::from(format!("{}\n", e))); + let body: Body = Body::from(format!("{}\n", e)); let mut http_error = Response::new(body); *http_error.status_mut() = e.http_status_code(); warn!("Response: error {}, {}", e.http_status_code(), e); @@ -71,7 +70,7 @@ async fn handler( async fn handler_inner( garage: Arc<Garage>, req: Request<Body>, -) -> Result<Response<BodyType>, Error> { +) -> Result<Response<Body>, Error> { let path = req.uri().path().to_string(); let path = percent_encoding::percent_decode_str(&path).decode_utf8()?; @@ -180,7 +179,7 @@ async fn handler_inner( std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?) .unwrap_or("<invalid utf8>") ); - let empty_body: BodyType = Box::new(BytesBody::from(vec![])); + let empty_body: Body = Body::from(vec![]); let response = Response::builder() .header("Location", format!("/{}", bucket)) .body(empty_body) @@ -189,7 +188,7 @@ async fn handler_inner( } &Method::HEAD => { // HeadBucket - let empty_body: BodyType = Box::new(BytesBody::from(vec![])); + let empty_body: Body = Body::from(vec![]); let response = Response::builder().body(empty_body).unwrap(); Ok(response) } diff --git a/src/api/http_util.rs b/src/api/http_util.rs deleted file mode 100644 index 8a8cf9d8..00000000 --- a/src/api/http_util.rs +++ /dev/null @@ -1,88 +0,0 @@ -use core::pin::Pin; -use core::task::{Context, Poll}; - -use futures::ready; -use futures::stream::*; -use hyper::body::{Bytes, HttpBody}; - -use garage_util::error::Error; - -pub type BodyType = Box<dyn HttpBody<Data = Bytes, Error = Error> + Send + Unpin>; - -type StreamType = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>; - -pub struct StreamBody { - stream: StreamType, -} - -impl StreamBody { - pub fn new(stream: StreamType) -> Self { - Self { stream } - } -} - -impl HttpBody for StreamBody { - type Data = Bytes; - type Error = Error; - - fn poll_data( - mut self: Pin<&mut Self>, - cx: &mut Context, - ) -> Poll<Option<Result<Bytes, Self::Error>>> { - match ready!(self.stream.as_mut().poll_next(cx)) { - Some(res) => Poll::Ready(Some(res)), - None => Poll::Ready(None), - } - } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context, - ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> { - Poll::Ready(Ok(None)) - } -} - -pub struct BytesBody { - bytes: Option<Bytes>, -} - -impl BytesBody { - pub fn new(bytes: Bytes) -> Self { - Self { bytes: Some(bytes) } - } -} - -impl HttpBody for BytesBody { - type Data = Bytes; - type Error = Error; - - fn poll_data( - mut self: Pin<&mut Self>, - _cx: &mut Context, - ) -> Poll<Option<Result<Bytes, Self::Error>>> { - Poll::Ready(self.bytes.take().map(Ok)) - } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context, - ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> { - Poll::Ready(Ok(None)) - } -} - -impl From<String> for BytesBody { - fn from(x: String) -> BytesBody { - Self::new(Bytes::from(x)) - } -} -impl From<Vec<u8>> for BytesBody { - fn from(x: Vec<u8>) -> BytesBody { - Self::new(Bytes::from(x)) - } -} - -pub fn empty_body() -> BodyType { - Box::new(BytesBody::from(vec![])) -} diff --git a/src/api/lib.rs b/src/api/lib.rs index 5f872941..df2fd045 100644 --- a/src/api/lib.rs +++ b/src/api/lib.rs @@ -2,7 +2,6 @@ extern crate log; pub mod encoding; -pub mod http_util; pub mod api_server; pub mod signature; diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs index 6a8d8f87..0af1b13a 100644 --- a/src/api/s3_copy.rs +++ b/src/api/s3_copy.rs @@ -2,7 +2,7 @@ use std::fmt::Write; use std::sync::Arc; use chrono::{SecondsFormat, Utc}; -use hyper::Response; +use hyper::{Body, Response}; use garage_table::*; use garage_util::data::*; @@ -13,15 +13,13 @@ use garage_model::garage::Garage; use garage_model::object_table::*; use garage_model::version_table::*; -use crate::http_util::*; - pub async fn handle_copy( garage: Arc<Garage>, dest_bucket: &str, dest_key: &str, source_bucket: &str, source_key: &str, -) -> Result<Response<BodyType>, Error> { +) -> Result<Response<Body>, Error> { let source_object = match garage .object_table .get(&source_bucket.to_string(), &source_key.to_string()) @@ -116,5 +114,5 @@ pub async fn handle_copy( writeln!(&mut xml, "\t<LastModified>{}</LastModified>", last_modified).unwrap(); writeln!(&mut xml, "</CopyObjectResult>").unwrap(); - Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes())))) + Ok(Response::new(Body::from(xml.into_bytes()))) } diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs index 60714b0c..91e5e20d 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3_delete.rs @@ -10,7 +10,6 @@ use garage_model::garage::Garage; use garage_model::object_table::*; use crate::encoding::*; -use crate::http_util::*; async fn handle_delete_internal( garage: &Garage, @@ -70,13 +69,13 @@ pub async fn handle_delete( garage: Arc<Garage>, bucket: &str, key: &str, -) -> Result<Response<BodyType>, Error> { +) -> Result<Response<Body>, Error> { let (_deleted_version, delete_marker_version) = handle_delete_internal(&garage, bucket, key).await?; Ok(Response::builder() .header("x-amz-version-id", hex::encode(delete_marker_version)) - .body(empty_body()) + .body(Body::from(vec![])) .unwrap()) } @@ -84,7 +83,7 @@ pub async fn handle_delete_objects( garage: Arc<Garage>, bucket: &str, req: Request<Body>, -) -> Result<Response<BodyType>, Error> { +) -> Result<Response<Body>, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; let cmd_xml = roxmltree::Document::parse(&std::str::from_utf8(&body)?)?; let cmd = parse_delete_objects_xml(&cmd_xml) @@ -130,9 +129,9 @@ pub async fn handle_delete_objects( writeln!(&mut retxml, "</DeleteObjectsOutput>").unwrap(); - Ok(Response::new(Box::new(BytesBody::from( + Ok(Response::new(Body::from( retxml.into_bytes(), - )))) + ))) } struct DeleteRequest { diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs index 63200ca3..5f2151b5 100644 --- a/src/api/s3_get.rs +++ b/src/api/s3_get.rs @@ -12,8 +12,6 @@ use garage_table::EmptyKey; use garage_model::garage::Garage; use garage_model::object_table::*; -use crate::http_util::*; - fn object_headers(version: &ObjectVersion) -> http::response::Builder { let date = UNIX_EPOCH + Duration::from_millis(version.timestamp); let date_str = httpdate::fmt_http_date(date); @@ -29,7 +27,7 @@ pub async fn handle_head( garage: Arc<Garage>, bucket: &str, key: &str, -) -> Result<Response<BodyType>, Error> { +) -> Result<Response<Body>, Error> { let object = match garage .object_table .get(&bucket.to_string(), &key.to_string()) @@ -50,7 +48,7 @@ pub async fn handle_head( None => return Err(Error::NotFound), }; - let body: BodyType = Box::new(BytesBody::from(vec![])); + let body: Body = Body::from(vec![]); let response = object_headers(&version) .status(StatusCode::OK) .body(body) @@ -63,7 +61,7 @@ pub async fn handle_get( req: &Request<Body>, bucket: &str, key: &str, -) -> Result<Response<BodyType>, Error> { +) -> Result<Response<Body>, Error> { let object = match garage .object_table .get(&bucket.to_string(), &key.to_string()) @@ -111,7 +109,7 @@ pub async fn handle_get( ))), ObjectVersionData::DeleteMarker => Err(Error::NotFound), ObjectVersionData::Inline(bytes) => { - let body: BodyType = Box::new(BytesBody::from(bytes.to_vec())); + let body: Body = Body::from(bytes.to_vec()); Ok(resp_builder.body(body)?) } ObjectVersionData::FirstBlock(first_block_hash) => { @@ -147,7 +145,8 @@ pub async fn handle_get( } }) .buffered(2); - let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream))); + //let body: Body = Box::new(StreamBody::new(Box::pin(body_stream))); + let body = hyper::body::Body::wrap_stream(body_stream); Ok(resp_builder.body(body)?) } } @@ -158,7 +157,7 @@ pub async fn handle_get_range( version: &ObjectVersion, begin: u64, end: u64, -) -> Result<Response<BodyType>, Error> { +) -> Result<Response<Body>, Error> { if end > version.size { return Err(Error::BadRequest(format!("Range not included in file"))); } @@ -177,9 +176,9 @@ pub async fn handle_get_range( ObjectVersionData::DeleteMarker => Err(Error::NotFound), ObjectVersionData::Inline(bytes) => { if end as usize <= bytes.len() { - let body: BodyType = Box::new(BytesBody::from( + let body: Body = Body::from( bytes[begin as usize..end as usize].to_vec(), - )); + ); Ok(resp_builder.body(body)?) } else { Err(Error::Message(format!("Internal error: requested range not present in inline bytes when it should have been"))) @@ -214,13 +213,14 @@ pub async fn handle_get_range( } else { end - block.offset }; - Ok(Bytes::from( + Result::<Bytes,Error>::Ok(Bytes::from( data[start_in_block as usize..end_in_block as usize].to_vec(), )) } }) .buffered(2); - let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream))); + //let body: Body = Box::new(StreamBody::new(Box::pin(body_stream))); + let body = hyper::body::Body::wrap_stream(body_stream); Ok(resp_builder.body(body)?) } } diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index c4fbf6f2..1f0eccf5 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -3,14 +3,13 @@ use std::fmt::Write; use std::sync::Arc; use chrono::{DateTime, NaiveDateTime, SecondsFormat, Utc}; -use hyper::Response; +use hyper::{Body, Response}; use garage_util::error::Error; use garage_model::garage::Garage; use crate::encoding::*; -use crate::http_util::*; #[derive(Debug)] struct ListResultInfo { @@ -26,7 +25,7 @@ pub async fn handle_list( prefix: &str, marker: Option<&str>, urlencode_resp: bool, -) -> Result<Response<BodyType>, Error> { +) -> Result<Response<Body>, Error> { let mut result_keys = BTreeMap::<String, ListResultInfo>::new(); let mut result_common_prefixes = BTreeSet::<String>::new(); @@ -141,5 +140,5 @@ pub async fn handle_list( writeln!(&mut xml, "</ListBucketResult>").unwrap(); println!("{}", xml); - Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes())))) + Ok(Response::new(Body::from(xml.into_bytes()))) } diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index bddfa444..c5d0a31c 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -16,14 +16,13 @@ use garage_model::object_table::*; use garage_model::version_table::*; use crate::encoding::*; -use crate::http_util::*; pub async fn handle_put( garage: Arc<Garage>, req: Request<Body>, bucket: &str, key: &str, -) -> Result<Response<BodyType>, Error> { +) -> Result<Response<Body>, Error> { let version_uuid = gen_uuid(); let mime_type = get_mime_type(&req)?; let body = req.into_body(); @@ -195,10 +194,10 @@ impl BodyChunker { } } -pub fn put_response(version_uuid: UUID) -> Response<BodyType> { +pub fn put_response(version_uuid: UUID) -> Response<Body> { Response::builder() .header("x-amz-version-id", hex::encode(version_uuid)) - .body(empty_body()) + .body(Body::from(vec![])) .unwrap() } @@ -207,7 +206,7 @@ pub async fn handle_create_multipart_upload( req: &Request<Body>, bucket: &str, key: &str, -) -> Result<Response<BodyType>, Error> { +) -> Result<Response<Body>, Error> { let version_uuid = gen_uuid(); let mime_type = get_mime_type(req)?; @@ -239,7 +238,7 @@ pub async fn handle_create_multipart_upload( .unwrap(); writeln!(&mut xml, "</InitiateMultipartUploadResult>").unwrap(); - Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes())))) + Ok(Response::new(Body::from(xml.into_bytes()))) } pub async fn handle_put_part( @@ -249,7 +248,7 @@ pub async fn handle_put_part( key: &str, part_number_str: &str, upload_id: &str, -) -> Result<Response<BodyType>, Error> { +) -> Result<Response<Body>, Error> { // Check parameters let part_number = part_number_str .parse::<u64>() @@ -299,7 +298,7 @@ pub async fn handle_put_part( ) .await?; - Ok(Response::new(Box::new(BytesBody::from(vec![])))) + Ok(Response::new(Body::from(vec![]))) } pub async fn handle_complete_multipart_upload( @@ -308,7 +307,7 @@ pub async fn handle_complete_multipart_upload( bucket: &str, key: &str, upload_id: &str, -) -> Result<Response<BodyType>, Error> { +) -> Result<Response<Body>, Error> { let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; @@ -374,7 +373,7 @@ pub async fn handle_complete_multipart_upload( writeln!(&mut xml, "\t<Key>{}</Key>", xml_escape(&key)).unwrap(); writeln!(&mut xml, "</CompleteMultipartUploadResult>").unwrap(); - Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes())))) + Ok(Response::new(Body::from(xml.into_bytes()))) } pub async fn handle_abort_multipart_upload( @@ -382,7 +381,7 @@ pub async fn handle_abort_multipart_upload( bucket: &str, key: &str, upload_id: &str, -) -> Result<Response<BodyType>, Error> { +) -> Result<Response<Body>, Error> { let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; @@ -412,7 +411,7 @@ pub async fn handle_abort_multipart_upload( let final_object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]); garage.object_table.insert(&final_object).await?; - Ok(Response::new(Box::new(BytesBody::from(vec![])))) + Ok(Response::new(Body::from(vec![]))) } fn get_mime_type(req: &Request<Body>) -> Result<String, Error> { |