diff options
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/api_server.rs | 360 | ||||
-rw-r--r-- | src/api/http_util.rs | 82 | ||||
-rw-r--r-- | src/api/mod.rs | 2 |
3 files changed, 444 insertions, 0 deletions
diff --git a/src/api/api_server.rs b/src/api/api_server.rs new file mode 100644 index 00000000..a80b2ea2 --- /dev/null +++ b/src/api/api_server.rs @@ -0,0 +1,360 @@ +use std::collections::VecDeque; +use std::net::SocketAddr; +use std::sync::Arc; + +use futures::future::Future; +use futures::stream::*; +use hyper::body::{Bytes, HttpBody}; +use hyper::server::conn::AddrStream; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Method, Request, Response, Server, StatusCode}; + +use crate::data::*; +use crate::error::Error; +use crate::server::Garage; + +use crate::table::EmptyKey; + +use crate::store::block::INLINE_THRESHOLD; +use crate::store::block_ref_table::*; +use crate::store::object_table::*; +use crate::store::version_table::*; + +use crate::api::http_util::*; + +type BodyType = Box<dyn HttpBody<Data = Bytes, Error = Error> + Send + Unpin>; + +pub async fn run_api_server( + garage: Arc<Garage>, + shutdown_signal: impl Future<Output = ()>, +) -> Result<(), Error> { + let addr = &garage.config.api_bind_addr; + + let service = make_service_fn(|conn: &AddrStream| { + let garage = garage.clone(); + let client_addr = conn.remote_addr(); + async move { + Ok::<_, Error>(service_fn(move |req: Request<Body>| { + let garage = garage.clone(); + handler(garage, req, client_addr) + })) + } + }); + + let server = Server::bind(&addr).serve(service); + + let graceful = server.with_graceful_shutdown(shutdown_signal); + info!("API server listening on http://{}", addr); + + graceful.await?; + Ok(()) +} + +async fn handler( + garage: Arc<Garage>, + req: Request<Body>, + addr: SocketAddr, +) -> Result<Response<BodyType>, Error> { + match handler_inner(garage, req, addr).await { + Ok(x) => Ok(x), + Err(e) => { + let body: BodyType = Box::new(BytesBody::from(format!("{}\n", e))); + let mut http_error = Response::new(body); + *http_error.status_mut() = e.http_status_code(); + Ok(http_error) + } + } +} + +async fn handler_inner( + garage: Arc<Garage>, + req: Request<Body>, + addr: SocketAddr, +) -> Result<Response<BodyType>, Error> { + info!("{} {} {}", addr, req.method(), req.uri()); + + let bucket = req + .headers() + .get(hyper::header::HOST) + .map(|x| x.to_str().map_err(Error::from)) + .unwrap_or(Err(Error::BadRequest(format!("Host: header missing"))))? + .to_lowercase(); + let key = req.uri().path().to_string(); + + match req.method() { + &Method::GET => Ok(handle_get(garage, &bucket, &key).await?), + &Method::PUT => { + let mime_type = req + .headers() + .get(hyper::header::CONTENT_TYPE) + .map(|x| x.to_str()) + .unwrap_or(Ok("blob"))? + .to_string(); + let version_uuid = + handle_put(garage, &mime_type, &bucket, &key, req.into_body()).await?; + let response = format!("{}\n", hex::encode(version_uuid,)); + Ok(Response::new(Box::new(BytesBody::from(response)))) + } + &Method::DELETE => { + let version_uuid = handle_delete(garage, &bucket, &key).await?; + let response = format!("{}\n", hex::encode(version_uuid,)); + Ok(Response::new(Box::new(BytesBody::from(response)))) + } + _ => Err(Error::BadRequest(format!("Invalid method"))), + } +} + +async fn handle_put( + garage: Arc<Garage>, + mime_type: &str, + bucket: &str, + key: &str, + body: Body, +) -> Result<UUID, Error> { + let version_uuid = gen_uuid(); + + let mut chunker = BodyChunker::new(body, garage.config.block_size); + let first_block = match chunker.next().await? { + Some(x) => x, + None => return Err(Error::BadRequest(format!("Empty body"))), + }; + + let mut object = Object { + bucket: bucket.into(), + key: key.into(), + versions: Vec::new(), + }; + object.versions.push(Box::new(ObjectVersion { + uuid: version_uuid, + timestamp: now_msec(), + mime_type: mime_type.to_string(), + size: first_block.len() as u64, + is_complete: false, + data: ObjectVersionData::DeleteMarker, + })); + + if first_block.len() < INLINE_THRESHOLD { + object.versions[0].data = ObjectVersionData::Inline(first_block); + object.versions[0].is_complete = true; + garage.object_table.insert(&object).await?; + return Ok(version_uuid); + } + + let version = Version { + uuid: version_uuid, + deleted: false, + blocks: Vec::new(), + bucket: bucket.into(), + key: key.into(), + }; + + let first_block_hash = hash(&first_block[..]); + object.versions[0].data = ObjectVersionData::FirstBlock(first_block_hash); + garage.object_table.insert(&object).await?; + + let mut next_offset = first_block.len(); + let mut put_curr_version_block = put_block_meta(garage.clone(), &version, 0, first_block_hash); + let mut put_curr_block = garage + .block_manager + .rpc_put_block(first_block_hash, first_block); + + loop { + let (_, _, next_block) = + futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; + if let Some(block) = next_block { + let block_hash = hash(&block[..]); + let block_len = block.len(); + put_curr_version_block = + put_block_meta(garage.clone(), &version, next_offset as u64, block_hash); + put_curr_block = garage.block_manager.rpc_put_block(block_hash, block); + next_offset += block_len; + } else { + break; + } + } + + // TODO: if at any step we have an error, we should undo everything we did + + object.versions[0].is_complete = true; + object.versions[0].size = next_offset as u64; + garage.object_table.insert(&object).await?; + Ok(version_uuid) +} + +async fn put_block_meta( + garage: Arc<Garage>, + version: &Version, + offset: u64, + hash: Hash, +) -> Result<(), Error> { + let mut version = version.clone(); + version.blocks.push(VersionBlock { offset, hash: hash }); + + let block_ref = BlockRef { + block: hash, + version: version.uuid, + deleted: false, + }; + + futures::try_join!( + garage.version_table.insert(&version), + garage.block_ref_table.insert(&block_ref), + )?; + Ok(()) +} + +struct BodyChunker { + body: Body, + read_all: bool, + block_size: usize, + buf: VecDeque<u8>, +} + +impl BodyChunker { + fn new(body: Body, block_size: usize) -> Self { + Self { + body, + read_all: false, + block_size, + buf: VecDeque::new(), + } + } + async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> { + while !self.read_all && self.buf.len() < self.block_size { + if let Some(block) = self.body.next().await { + let bytes = block?; + trace!("Body next: {} bytes", bytes.len()); + self.buf.extend(&bytes[..]); + } else { + self.read_all = true; + } + } + if self.buf.len() == 0 { + Ok(None) + } else if self.buf.len() <= self.block_size { + let block = self.buf.drain(..).collect::<Vec<u8>>(); + Ok(Some(block)) + } else { + let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>(); + Ok(Some(block)) + } + } +} + +async fn handle_delete(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<UUID, Error> { + let exists = match garage + .object_table + .get(&bucket.to_string(), &key.to_string()) + .await? + { + None => false, + Some(o) => { + let mut has_active_version = false; + for v in o.versions.iter() { + if v.data != ObjectVersionData::DeleteMarker { + has_active_version = true; + break; + } + } + has_active_version + } + }; + + if !exists { + // No need to delete + return Ok([0u8; 32].into()); + } + + let version_uuid = gen_uuid(); + + let mut object = Object { + bucket: bucket.into(), + key: key.into(), + versions: Vec::new(), + }; + object.versions.push(Box::new(ObjectVersion { + uuid: version_uuid, + timestamp: now_msec(), + mime_type: "application/x-delete-marker".into(), + size: 0, + is_complete: true, + data: ObjectVersionData::DeleteMarker, + })); + + garage.object_table.insert(&object).await?; + return Ok(version_uuid); +} + +async fn handle_get( + garage: Arc<Garage>, + bucket: &str, + key: &str, +) -> Result<Response<BodyType>, Error> { + let mut object = match garage + .object_table + .get(&bucket.to_string(), &key.to_string()) + .await? + { + None => return Err(Error::NotFound), + Some(o) => o, + }; + + let last_v = match object + .versions + .drain(..) + .rev() + .filter(|v| v.is_complete) + .next() + { + Some(v) => v, + None => return Err(Error::NotFound), + }; + + let resp_builder = Response::builder() + .header("Content-Type", last_v.mime_type) + .status(StatusCode::OK); + + match last_v.data { + ObjectVersionData::DeleteMarker => Err(Error::NotFound), + ObjectVersionData::Inline(bytes) => { + let body: BodyType = Box::new(BytesBody::from(bytes)); + Ok(resp_builder.body(body)?) + } + ObjectVersionData::FirstBlock(first_block_hash) => { + let read_first_block = garage.block_manager.rpc_get_block(&first_block_hash); + let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); + + let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; + let version = match version { + Some(v) => v, + None => return Err(Error::NotFound), + }; + + let mut blocks = version + .blocks + .iter() + .map(|vb| (vb.hash, None)) + .collect::<Vec<_>>(); + blocks[0].1 = Some(first_block); + + let body_stream = futures::stream::iter(blocks) + .map(move |(hash, data_opt)| { + let garage = garage.clone(); + async move { + if let Some(data) = data_opt { + Ok(Bytes::from(data)) + } else { + garage + .block_manager + .rpc_get_block(&hash) + .await + .map(Bytes::from) + } + } + }) + .buffered(2); + let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream))); + Ok(resp_builder.body(body)?) + } + } +} diff --git a/src/api/http_util.rs b/src/api/http_util.rs new file mode 100644 index 00000000..228448f0 --- /dev/null +++ b/src/api/http_util.rs @@ -0,0 +1,82 @@ +use core::pin::Pin; +use core::task::{Context, Poll}; + +use futures::ready; +use futures::stream::*; +use hyper::body::{Bytes, HttpBody}; + +use crate::error::Error; + +type StreamType = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>; + +pub struct StreamBody { + stream: StreamType, +} + +impl StreamBody { + pub fn new(stream: StreamType) -> Self { + Self { stream } + } +} + +impl HttpBody for StreamBody { + type Data = Bytes; + type Error = Error; + + fn poll_data( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<Option<Result<Bytes, Self::Error>>> { + match ready!(self.stream.as_mut().poll_next(cx)) { + Some(res) => Poll::Ready(Some(res)), + None => Poll::Ready(None), + } + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> { + Poll::Ready(Ok(None)) + } +} + +pub struct BytesBody { + bytes: Option<Bytes>, +} + +impl BytesBody { + pub fn new(bytes: Bytes) -> Self { + Self { bytes: Some(bytes) } + } +} + +impl HttpBody for BytesBody { + type Data = Bytes; + type Error = Error; + + fn poll_data( + mut self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll<Option<Result<Bytes, Self::Error>>> { + Poll::Ready(self.bytes.take().map(Ok)) + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> { + Poll::Ready(Ok(None)) + } +} + +impl From<String> for BytesBody { + fn from(x: String) -> BytesBody { + Self::new(Bytes::from(x)) + } +} +impl From<Vec<u8>> for BytesBody { + fn from(x: Vec<u8>) -> BytesBody { + Self::new(Bytes::from(x)) + } +} diff --git a/src/api/mod.rs b/src/api/mod.rs new file mode 100644 index 00000000..8e62d1e7 --- /dev/null +++ b/src/api/mod.rs @@ -0,0 +1,2 @@ +pub mod api_server; +pub mod http_util; |