aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-10 22:26:48 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-10 22:26:48 +0200
commita50f07dfdc5b8b60deb79e0724ee140f122228b0 (patch)
tree769b6d42e59e856c42e69875cab64ec07ad02d4e /src
parent3477864142ed09c36abea1111937b829fb41c8a4 (diff)
downloadgarage-a50f07dfdc5b8b60deb79e0724ee140f122228b0.tar.gz
garage-a50f07dfdc5b8b60deb79e0724ee140f122228b0.zip
Refactor
Diffstat (limited to 'src')
-rw-r--r--src/api_server.rs75
-rw-r--r--src/http_util.rs82
-rw-r--r--src/main.rs1
3 files changed, 87 insertions, 71 deletions
diff --git a/src/api_server.rs b/src/api_server.rs
index fec89e93..024da7d5 100644
--- a/src/api_server.rs
+++ b/src/api_server.rs
@@ -1,12 +1,8 @@
-use core::pin::Pin;
-use core::task::{Context, Poll};
-
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::Arc;
use futures::future::Future;
-use futures::ready;
use futures::stream::*;
use hyper::body::{Bytes, HttpBody};
use hyper::server::conn::AddrStream;
@@ -16,6 +12,7 @@ use hyper::{Body, Method, Request, Response, Server, StatusCode};
use crate::data;
use crate::data::*;
use crate::error::Error;
+use crate::http_util::*;
use crate::proto::*;
use crate::rpc_client::*;
use crate::server::Garage;
@@ -311,9 +308,9 @@ async fn handle_get(
}
})
.buffered(2);
- let body: BodyType = Box::new(NonSyncStreamBody {
- stream: Box::pin(body_stream),
- });
+ let body: BodyType = Box::new(StreamBody::new(
+ Box::pin(body_stream),
+ ));
Ok(resp_builder.body(body)?)
}
}
@@ -344,67 +341,3 @@ async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> {
}
Err(Error::Message(format!("No valid blocks returned")))
}
-
-pub struct NonSyncStreamBody {
- pub stream: Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>,
-}
-
-impl HttpBody for NonSyncStreamBody {
- type Data = Bytes;
- type Error = Error;
-
- fn poll_data(
- mut self: Pin<&mut Self>,
- cx: &mut Context,
- ) -> Poll<Option<Result<Bytes, Self::Error>>> {
- match ready!(self.stream.as_mut().poll_next(cx)) {
- Some(res) => Poll::Ready(Some(res)),
- None => Poll::Ready(None),
- }
- }
-
- fn poll_trailers(
- self: Pin<&mut Self>,
- cx: &mut Context,
- ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> {
- Poll::Ready(Ok(None))
- }
-}
-
-pub struct BytesBody {
- pub bytes: Option<Bytes>,
-}
-
-impl HttpBody for BytesBody {
- type Data = Bytes;
- type Error = Error;
-
- fn poll_data(
- mut self: Pin<&mut Self>,
- _cx: &mut Context,
- ) -> Poll<Option<Result<Bytes, Self::Error>>> {
- Poll::Ready(self.bytes.take().map(Ok))
- }
-
- fn poll_trailers(
- self: Pin<&mut Self>,
- _cx: &mut Context,
- ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> {
- Poll::Ready(Ok(None))
- }
-}
-
-impl From<String> for BytesBody {
- fn from(x: String) -> BytesBody {
- BytesBody {
- bytes: Some(Bytes::from(x.into_bytes())),
- }
- }
-}
-impl From<Vec<u8>> for BytesBody {
- fn from(x: Vec<u8>) -> BytesBody {
- BytesBody {
- bytes: Some(Bytes::from(x)),
- }
- }
-}
diff --git a/src/http_util.rs b/src/http_util.rs
new file mode 100644
index 00000000..24e64c36
--- /dev/null
+++ b/src/http_util.rs
@@ -0,0 +1,82 @@
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+use futures::ready;
+use futures::stream::*;
+use hyper::body::{Bytes, HttpBody};
+
+use crate::error::Error;
+
+type StreamType = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
+
+pub struct StreamBody {
+ stream: StreamType,
+}
+
+impl StreamBody {
+ pub fn new(stream: StreamType) -> Self {
+ Self{stream}
+ }
+}
+
+impl HttpBody for StreamBody {
+ type Data = Bytes;
+ type Error = Error;
+
+ fn poll_data(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Option<Result<Bytes, Self::Error>>> {
+ match ready!(self.stream.as_mut().poll_next(cx)) {
+ Some(res) => Poll::Ready(Some(res)),
+ None => Poll::Ready(None),
+ }
+ }
+
+ fn poll_trailers(
+ self: Pin<&mut Self>,
+ _cx: &mut Context,
+ ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> {
+ Poll::Ready(Ok(None))
+ }
+}
+
+pub struct BytesBody {
+ bytes: Option<Bytes>,
+}
+
+impl BytesBody {
+ pub fn new(bytes: Bytes) -> Self {
+ Self{bytes: Some(bytes)}
+ }
+}
+
+impl HttpBody for BytesBody {
+ type Data = Bytes;
+ type Error = Error;
+
+ fn poll_data(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context,
+ ) -> Poll<Option<Result<Bytes, Self::Error>>> {
+ Poll::Ready(self.bytes.take().map(Ok))
+ }
+
+ fn poll_trailers(
+ self: Pin<&mut Self>,
+ _cx: &mut Context,
+ ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> {
+ Poll::Ready(Ok(None))
+ }
+}
+
+impl From<String> for BytesBody {
+ fn from(x: String) -> BytesBody {
+ Self::new(Bytes::from(x))
+ }
+}
+impl From<Vec<u8>> for BytesBody {
+ fn from(x: Vec<u8>) -> BytesBody {
+ Self::new(Bytes::from(x))
+ }
+}
diff --git a/src/main.rs b/src/main.rs
index 69fb2863..7c2af45b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -10,6 +10,7 @@ mod object_table;
mod version_table;
mod api_server;
+mod http_util;
mod rpc_client;
mod rpc_server;
mod server;