diff options
Diffstat (limited to 'src/api/s3_get.rs')
-rw-r--r-- | src/api/s3_get.rs | 131 |
1 files changed, 131 insertions, 0 deletions
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs new file mode 100644 index 00000000..b933f549 --- /dev/null +++ b/src/api/s3_get.rs @@ -0,0 +1,131 @@ +use std::sync::Arc; +use std::time::{Duration, UNIX_EPOCH}; + +use futures::stream::*; +use hyper::body::Bytes; +use hyper::{Response, StatusCode}; + +use garage_util::error::Error; + +use garage_table::EmptyKey; + +use garage_core::garage::Garage; +use garage_core::object_table::*; + +use crate::api_server::BodyType; +use crate::http_util::*; + +fn object_headers(version: &ObjectVersion) -> http::response::Builder { + let date = UNIX_EPOCH + Duration::from_millis(version.timestamp); + let date_str = httpdate::fmt_http_date(date); + + Response::builder() + .header("Content-Type", version.mime_type.to_string()) + .header("Content-Length", format!("{}", version.size)) + .header("Last-Modified", date_str) +} + +pub async fn handle_head( + garage: Arc<Garage>, + bucket: &str, + key: &str, +) -> Result<Response<BodyType>, Error> { + let object = match garage + .object_table + .get(&bucket.to_string(), &key.to_string()) + .await? + { + None => return Err(Error::NotFound), + Some(o) => o, + }; + + let version = match object + .versions() + .iter() + .rev() + .filter(|v| v.is_complete && v.data != ObjectVersionData::DeleteMarker) + .next() + { + Some(v) => v, + None => return Err(Error::NotFound), + }; + + let body: BodyType = Box::new(BytesBody::from(vec![])); + let response = object_headers(&version) + .status(StatusCode::OK) + .body(body) + .unwrap(); + Ok(response) +} + +pub async fn handle_get( + garage: Arc<Garage>, + bucket: &str, + key: &str, +) -> Result<Response<BodyType>, Error> { + let object = match garage + .object_table + .get(&bucket.to_string(), &key.to_string()) + .await? + { + None => return Err(Error::NotFound), + Some(o) => o, + }; + + let last_v = match object + .versions() + .iter() + .rev() + .filter(|v| v.is_complete) + .next() + { + Some(v) => v, + None => return Err(Error::NotFound), + }; + + let resp_builder = object_headers(&last_v).status(StatusCode::OK); + + match &last_v.data { + ObjectVersionData::DeleteMarker => Err(Error::NotFound), + ObjectVersionData::Inline(bytes) => { + let body: BodyType = Box::new(BytesBody::from(bytes.to_vec())); + Ok(resp_builder.body(body)?) + } + ObjectVersionData::FirstBlock(first_block_hash) => { + let read_first_block = garage.block_manager.rpc_get_block(&first_block_hash); + let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); + + let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; + let version = match version { + Some(v) => v, + None => return Err(Error::NotFound), + }; + + let mut blocks = version + .blocks() + .iter() + .map(|vb| (vb.hash, None)) + .collect::<Vec<_>>(); + blocks[0].1 = Some(first_block); + + let body_stream = futures::stream::iter(blocks) + .map(move |(hash, data_opt)| { + let garage = garage.clone(); + async move { + if let Some(data) = data_opt { + Ok(Bytes::from(data)) + } else { + garage + .block_manager + .rpc_get_block(&hash) + .await + .map(Bytes::from) + } + } + }) + .buffered(2); + let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream))); + Ok(resp_builder.body(body)?) + } + } +} |