diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-10 22:26:48 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-10 22:26:48 +0200 |
commit | a50f07dfdc5b8b60deb79e0724ee140f122228b0 (patch) | |
tree | 769b6d42e59e856c42e69875cab64ec07ad02d4e | |
parent | 3477864142ed09c36abea1111937b829fb41c8a4 (diff) | |
download | garage-a50f07dfdc5b8b60deb79e0724ee140f122228b0.tar.gz garage-a50f07dfdc5b8b60deb79e0724ee140f122228b0.zip |
Refactor
-rw-r--r-- | src/api_server.rs | 75 | ||||
-rw-r--r-- | src/http_util.rs | 82 | ||||
-rw-r--r-- | src/main.rs | 1 |
3 files changed, 87 insertions, 71 deletions
diff --git a/src/api_server.rs b/src/api_server.rs index fec89e93..024da7d5 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -1,12 +1,8 @@ -use core::pin::Pin; -use core::task::{Context, Poll}; - use std::collections::VecDeque; use std::net::SocketAddr; use std::sync::Arc; use futures::future::Future; -use futures::ready; use futures::stream::*; use hyper::body::{Bytes, HttpBody}; use hyper::server::conn::AddrStream; @@ -16,6 +12,7 @@ use hyper::{Body, Method, Request, Response, Server, StatusCode}; use crate::data; use crate::data::*; use crate::error::Error; +use crate::http_util::*; use crate::proto::*; use crate::rpc_client::*; use crate::server::Garage; @@ -311,9 +308,9 @@ async fn handle_get( } }) .buffered(2); - let body: BodyType = Box::new(NonSyncStreamBody { - stream: Box::pin(body_stream), - }); + let body: BodyType = Box::new(StreamBody::new( + Box::pin(body_stream), + )); Ok(resp_builder.body(body)?) } } @@ -344,67 +341,3 @@ async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> { } Err(Error::Message(format!("No valid blocks returned"))) } - -pub struct NonSyncStreamBody { - pub stream: Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, -} - -impl HttpBody for NonSyncStreamBody { - type Data = Bytes; - type Error = Error; - - fn poll_data( - mut self: Pin<&mut Self>, - cx: &mut Context, - ) -> Poll<Option<Result<Bytes, Self::Error>>> { - match ready!(self.stream.as_mut().poll_next(cx)) { - Some(res) => Poll::Ready(Some(res)), - None => Poll::Ready(None), - } - } - - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context, - ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> { - Poll::Ready(Ok(None)) - } -} - -pub struct BytesBody { - pub bytes: Option<Bytes>, -} - -impl HttpBody for BytesBody { - type Data = Bytes; - type Error = Error; - - fn poll_data( - mut self: Pin<&mut Self>, - _cx: &mut Context, - ) -> Poll<Option<Result<Bytes, Self::Error>>> { - Poll::Ready(self.bytes.take().map(Ok)) - } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context, - ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> { - Poll::Ready(Ok(None)) - } -} - -impl From<String> for BytesBody { - fn from(x: String) -> BytesBody { - BytesBody { - bytes: Some(Bytes::from(x.into_bytes())), - } - } -} -impl From<Vec<u8>> for BytesBody { - fn from(x: Vec<u8>) -> BytesBody { - BytesBody { - bytes: Some(Bytes::from(x)), - } - } -} diff --git a/src/http_util.rs b/src/http_util.rs new file mode 100644 index 00000000..24e64c36 --- /dev/null +++ b/src/http_util.rs @@ -0,0 +1,82 @@ +use core::pin::Pin; +use core::task::{Context, Poll}; + +use futures::ready; +use futures::stream::*; +use hyper::body::{Bytes, HttpBody}; + +use crate::error::Error; + +type StreamType = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>; + +pub struct StreamBody { + stream: StreamType, +} + +impl StreamBody { + pub fn new(stream: StreamType) -> Self { + Self{stream} + } +} + +impl HttpBody for StreamBody { + type Data = Bytes; + type Error = Error; + + fn poll_data( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<Option<Result<Bytes, Self::Error>>> { + match ready!(self.stream.as_mut().poll_next(cx)) { + Some(res) => Poll::Ready(Some(res)), + None => Poll::Ready(None), + } + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> { + Poll::Ready(Ok(None)) + } +} + +pub struct BytesBody { + bytes: Option<Bytes>, +} + +impl BytesBody { + pub fn new(bytes: Bytes) -> Self { + Self{bytes: Some(bytes)} + } +} + +impl HttpBody for BytesBody { + type Data = Bytes; + type Error = Error; + + fn poll_data( + mut self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll<Option<Result<Bytes, Self::Error>>> { + Poll::Ready(self.bytes.take().map(Ok)) + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> { + Poll::Ready(Ok(None)) + } +} + +impl From<String> for BytesBody { + fn from(x: String) -> BytesBody { + Self::new(Bytes::from(x)) + } +} +impl From<Vec<u8>> for BytesBody { + fn from(x: Vec<u8>) -> BytesBody { + Self::new(Bytes::from(x)) + } +} diff --git a/src/main.rs b/src/main.rs index 69fb2863..7c2af45b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ mod object_table; mod version_table; mod api_server; +mod http_util; mod rpc_client; mod rpc_server; mod server; |