diff options
Diffstat (limited to 'src/api_server.rs')
-rw-r--r-- | src/api_server.rs | 281 |
1 files changed, 202 insertions, 79 deletions
diff --git a/src/api_server.rs b/src/api_server.rs index a5e6e322..fec89e93 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -1,26 +1,35 @@ -use std::sync::Arc; -use std::net::SocketAddr; +use core::pin::Pin; +use core::task::{Context, Poll}; + use std::collections::VecDeque; +use std::net::SocketAddr; +use std::sync::Arc; +use futures::future::Future; +use futures::ready; use futures::stream::*; -use hyper::service::{make_service_fn, service_fn}; +use hyper::body::{Bytes, HttpBody}; use hyper::server::conn::AddrStream; +use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; -use hyper::body::Bytes; -use futures::future::Future; -use crate::error::Error; -use crate::data::*; use crate::data; +use crate::data::*; +use crate::error::Error; use crate::proto::*; use crate::rpc_client::*; use crate::server::Garage; use crate::table::EmptySortKey; -pub async fn run_api_server(garage: Arc<Garage>, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> { - let addr = ([0, 0, 0, 0], garage.system.config.api_port).into(); +type BodyType = Box<dyn HttpBody<Data = Bytes, Error = Error> + Send + Unpin>; + +pub async fn run_api_server( + garage: Arc<Garage>, + shutdown_signal: impl Future<Output = ()>, +) -> Result<(), hyper::Error> { + let addr = ([0, 0, 0, 0], garage.system.config.api_port).into(); - let service = make_service_fn(|conn: &AddrStream| { + let service = make_service_fn(|conn: &AddrStream| { let garage = garage.clone(); let client_addr = conn.remote_addr(); async move { @@ -31,59 +40,72 @@ pub async fn run_api_server(garage: Arc<Garage>, shutdown_signal: impl Future<Ou } }); - let server = Server::bind(&addr).serve(service); + let server = Server::bind(&addr).serve(service); let graceful = server.with_graceful_shutdown(shutdown_signal); - println!("API server listening on http://{}", addr); + println!("API server listening on http://{}", addr); graceful.await } -async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> { +async fn handler( + garage: Arc<Garage>, + req: Request<Body>, + addr: SocketAddr, +) -> Result<Response<BodyType>, Error> { match handler_inner(garage, req, addr).await { Ok(x) => Ok(x), Err(e) => { - let mut http_error = Response::new(Body::from(format!("{}\n", e))); + let body: BodyType = Box::new(BytesBody::from(format!("{}\n", e))); + let mut http_error = Response::new(body); *http_error.status_mut() = e.http_status_code(); Ok(http_error) } } } -async fn handler_inner(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> { +async fn handler_inner( + garage: Arc<Garage>, + req: Request<Body>, + addr: SocketAddr, +) -> Result<Response<BodyType>, Error> { eprintln!("{} {} {}", addr, req.method(), req.uri()); - let bucket = req.headers() + let bucket = req + .headers() .get(hyper::header::HOST) .map(|x| x.to_str().map_err(Error::from)) .unwrap_or(Err(Error::BadRequest(format!("Host: header missing"))))? .to_lowercase(); let key = req.uri().path().to_string(); - match req.method() { - &Method::GET => { - Ok(handle_get(garage, &bucket, &key).await?) - } + match req.method() { + &Method::GET => Ok(handle_get(garage, &bucket, &key).await?), &Method::PUT => { - let mime_type = req.headers() + let mime_type = req + .headers() .get(hyper::header::CONTENT_TYPE) .map(|x| x.to_str()) .unwrap_or(Ok("blob"))? .to_string(); - let version_uuid = handle_put(garage, &mime_type, &bucket, &key, req.into_body()).await?; - Ok(Response::new(Body::from( - format!("{:?}\n", version_uuid), - ))) + let version_uuid = + handle_put(garage, &mime_type, &bucket, &key, req.into_body()).await?; + Ok(Response::new(Box::new(BytesBody::from(format!( + "{:?}\n", + version_uuid + ))))) } - _ => Err(Error::BadRequest(format!("Invalid method"))), - } + _ => Err(Error::BadRequest(format!("Invalid method"))), + } } -async fn handle_put(garage: Arc<Garage>, - mime_type: &str, - bucket: &str, key: &str, body: Body) - -> Result<UUID, Error> -{ +async fn handle_put( + garage: Arc<Garage>, + mime_type: &str, + bucket: &str, + key: &str, + body: Body, +) -> Result<UUID, Error> { let version_uuid = gen_uuid(); let mut chunker = BodyChunker::new(body, garage.system.config.block_size); @@ -97,7 +119,7 @@ async fn handle_put(garage: Arc<Garage>, key: key.into(), versions: Vec::new(), }; - object.versions.push(Box::new(ObjectVersion{ + object.versions.push(Box::new(ObjectVersion { uuid: version_uuid.clone(), timestamp: now_msec(), mime_type: mime_type.to_string(), @@ -110,7 +132,7 @@ async fn handle_put(garage: Arc<Garage>, object.versions[0].data = ObjectVersionData::Inline(first_block); object.versions[0].is_complete = true; garage.object_table.insert(&object).await?; - return Ok(version_uuid) + return Ok(version_uuid); } let version = Version { @@ -126,15 +148,22 @@ async fn handle_put(garage: Arc<Garage>, garage.object_table.insert(&object).await?; let mut next_offset = first_block.len(); - let mut put_curr_version_block = put_version_block(garage.clone(), &version, 0, first_block_hash.clone()); + let mut put_curr_version_block = + put_version_block(garage.clone(), &version, 0, first_block_hash.clone()); let mut put_curr_block = put_block(garage.clone(), first_block_hash, first_block); loop { - let (_, _, next_block) = futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; + let (_, _, next_block) = + futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; if let Some(block) = next_block { let block_hash = hash(&block[..]); let block_len = block.len(); - put_curr_version_block = put_version_block(garage.clone(), &version, next_offset as u64, block_hash.clone()); + put_curr_version_block = put_version_block( + garage.clone(), + &version, + next_offset as u64, + block_hash.clone(), + ); put_curr_block = put_block(garage.clone(), block_hash, block); next_offset += block_len; } else { @@ -150,27 +179,33 @@ async fn handle_put(garage: Arc<Garage>, Ok(version_uuid) } -async fn put_version_block(garage: Arc<Garage>, version: &Version, offset: u64, hash: Hash) -> Result<(), Error> { +async fn put_version_block( + garage: Arc<Garage>, + version: &Version, + offset: u64, + hash: Hash, +) -> Result<(), Error> { let mut version = version.clone(); - version.blocks.push(VersionBlock{ - offset, - hash, - }); + version.blocks.push(VersionBlock { offset, hash }); garage.version_table.insert(&version).await?; Ok(()) } async fn put_block(garage: Arc<Garage>, hash: Hash, data: Vec<u8>) -> Result<(), Error> { - let who = garage.system.members.read().await + let who = garage + .system + .members + .read() + .await .walk_ring(&hash, garage.system.config.meta_replication_factor); - rpc_try_call_many(garage.system.clone(), - &who[..], - &Message::PutBlock(PutBlockMessage{ - hash, - data, - }), - (garage.system.config.meta_replication_factor+1)/2, - DEFAULT_TIMEOUT).await?; + rpc_try_call_many( + garage.system.clone(), + &who[..], + &Message::PutBlock(PutBlockMessage { hash, data }), + (garage.system.config.meta_replication_factor + 1) / 2, + DEFAULT_TIMEOUT, + ) + .await?; Ok(()) } @@ -183,7 +218,7 @@ struct BodyChunker { impl BodyChunker { fn new(body: Body, block_size: usize) -> Self { - Self{ + Self { body, read_all: false, block_size, @@ -203,26 +238,36 @@ impl BodyChunker { if self.buf.len() == 0 { Ok(None) } else if self.buf.len() <= self.block_size { - let block = self.buf.drain(..) - .collect::<Vec<u8>>(); + let block = self.buf.drain(..).collect::<Vec<u8>>(); Ok(Some(block)) } else { - let block = self.buf.drain(..self.block_size) - .collect::<Vec<u8>>(); + let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>(); Ok(Some(block)) } } } -async fn handle_get(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<Response<Body>, Error> { - let mut object = match garage.object_table.get(&bucket.to_string(), &key.to_string()).await? { +async fn handle_get( + garage: Arc<Garage>, + bucket: &str, + key: &str, +) -> Result<Response<BodyType>, Error> { + let mut object = match garage + .object_table + .get(&bucket.to_string(), &key.to_string()) + .await? + { None => return Err(Error::NotFound), - Some(o) => o + Some(o) => o, }; - let last_v = match object.versions.drain(..) - .rev().filter(|v| v.is_complete) - .next() { + let last_v = match object + .versions + .drain(..) + .rev() + .filter(|v| v.is_complete) + .next() + { Some(v) => v, None => return Err(Error::NotFound), }; @@ -234,7 +279,8 @@ async fn handle_get(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<Resp match last_v.data { ObjectVersionData::DeleteMarker => Err(Error::NotFound), ObjectVersionData::Inline(bytes) => { - Ok(resp_builder.body(bytes.into())?) + let body: BodyType = Box::new(BytesBody::from(bytes)); + Ok(resp_builder.body(body)?) } ObjectVersionData::FirstBlock(first_block_hash) => { let read_first_block = get_block(garage.clone(), &first_block_hash); @@ -246,42 +292,119 @@ async fn handle_get(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<Resp None => return Err(Error::NotFound), }; - let mut blocks = version.blocks.iter() + let mut blocks = version + .blocks + .iter() .map(|vb| (vb.hash.clone(), None)) .collect::<Vec<_>>(); blocks[0].1 = Some(first_block); - let block_futures = blocks.drain(..) - .map(move |(hash, data_opt)| async { + let body_stream = futures::stream::iter(blocks) + .map(move |(hash, data_opt)| { + let garage = garage.clone(); + async move { if let Some(data) = data_opt { - Ok(data) + Ok(Bytes::from(data)) } else { - get_block(garage.clone(), &hash).await - .map_err(|e| format!("{}", e)) + get_block(garage.clone(), &hash).await.map(Bytes::from) } - }); - let body_stream = futures::stream::iter(block_futures).buffered(2); - let body = Body::wrap_stream(body_stream); + } + }) + .buffered(2); + let body: BodyType = Box::new(NonSyncStreamBody { + stream: Box::pin(body_stream), + }); Ok(resp_builder.body(body)?) } } } async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> { - let who = garage.system.members.read().await + let who = garage + .system + .members + .read() + .await .walk_ring(&hash, garage.system.config.meta_replication_factor); - let resps = rpc_try_call_many(garage.system.clone(), - &who[..], - &Message::GetBlock(hash.clone()), - 1, - DEFAULT_TIMEOUT).await?; + let resps = rpc_try_call_many( + garage.system.clone(), + &who[..], + &Message::GetBlock(hash.clone()), + 1, + DEFAULT_TIMEOUT, + ) + .await?; for resp in resps { if let Message::PutBlock(pbm) = resp { if data::hash(&pbm.data) == *hash { - return Ok(pbm.data) + return Ok(pbm.data); } } } Err(Error::Message(format!("No valid blocks returned"))) } + +pub struct NonSyncStreamBody { + pub stream: Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, +} + +impl HttpBody for NonSyncStreamBody { + type Data = Bytes; + type Error = Error; + + fn poll_data( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<Option<Result<Bytes, Self::Error>>> { + match ready!(self.stream.as_mut().poll_next(cx)) { + Some(res) => Poll::Ready(Some(res)), + None => Poll::Ready(None), + } + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> { + Poll::Ready(Ok(None)) + } +} + +pub struct BytesBody { + pub bytes: Option<Bytes>, +} + +impl HttpBody for BytesBody { + type Data = Bytes; + type Error = Error; + + fn poll_data( + mut self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll<Option<Result<Bytes, Self::Error>>> { + Poll::Ready(self.bytes.take().map(Ok)) + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> { + Poll::Ready(Ok(None)) + } +} + +impl From<String> for BytesBody { + fn from(x: String) -> BytesBody { + BytesBody { + bytes: Some(Bytes::from(x.into_bytes())), + } + } +} +impl From<Vec<u8>> for BytesBody { + fn from(x: Vec<u8>) -> BytesBody { + BytesBody { + bytes: Some(Bytes::from(x)), + } + } +} |