diff options
Diffstat (limited to 'src/api/s3_get.rs')
-rw-r--r-- | src/api/s3_get.rs | 96 |
1 files changed, 95 insertions, 1 deletions
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs index 75478e46..3ed0f914 100644 --- a/src/api/s3_get.rs +++ b/src/api/s3_get.rs @@ -3,7 +3,7 @@ use std::time::{Duration, UNIX_EPOCH}; use futures::stream::*; use hyper::body::Bytes; -use hyper::{Response, StatusCode}; +use hyper::{Body, Request, Response, StatusCode}; use garage_util::error::Error; @@ -22,6 +22,7 @@ fn object_headers(version: &ObjectVersion) -> http::response::Builder { .header("Content-Type", version.mime_type.to_string()) .header("Content-Length", format!("{}", version.size)) .header("Last-Modified", date_str) + .header("Accept-Ranges", format!("bytes")) } pub async fn handle_head( @@ -59,6 +60,7 @@ pub async fn handle_head( pub async fn handle_get( garage: Arc<Garage>, + req: &Request<Body>, bucket: &str, key: &str, ) -> Result<Response<BodyType>, Error> { @@ -82,6 +84,25 @@ pub async fn handle_get( None => return Err(Error::NotFound), }; + let range = match req.headers().get("range") { + Some(range) => { + let range_str = range + .to_str() + .map_err(|e| Error::BadRequest(format!("Invalid range header: {}", e)))?; + let mut ranges = http_range::HttpRange::parse(range_str, last_v.size) + .map_err(|_e| Error::BadRequest(format!("Invalid range")))?; + if ranges.len() > 1 { + return Err(Error::BadRequest(format!("Multiple ranges not supported"))); + } else { + ranges.pop() + } + } + None => None, + }; + if let Some(range) = range { + return handle_get_range(garage, last_v, range.start, range.start + range.length).await; + } + let resp_builder = object_headers(&last_v).status(StatusCode::OK); match &last_v.data { @@ -131,3 +152,76 @@ pub async fn handle_get( } } } + +pub async fn handle_get_range( + garage: Arc<Garage>, + version: &ObjectVersion, + begin: u64, + end: u64, +) -> Result<Response<BodyType>, Error> { + if end > version.size { + return Err(Error::BadRequest(format!("Range not included in file"))); + } + + let resp_builder = object_headers(&version) + .header( + "Content-Range", + format!("bytes {}-{}/{}", begin, end, version.size), + ) + .status(StatusCode::PARTIAL_CONTENT); + + match &version.data { + ObjectVersionData::Uploading => Err(Error::Message(format!( + "Version is_complete() but data is stil Uploading (internal error)" + ))), + ObjectVersionData::DeleteMarker => Err(Error::NotFound), + ObjectVersionData::Inline(bytes) => { + if end as usize <= bytes.len() { + let body: BodyType = Box::new(BytesBody::from( + bytes[begin as usize..end as usize].to_vec(), + )); + Ok(resp_builder.body(body)?) + } else { + Err(Error::Message(format!("Internal error: requested range not present in inline bytes when it should have been"))) + } + } + ObjectVersionData::FirstBlock(_first_block_hash) => { + let version = garage.version_table.get(&version.uuid, &EmptyKey).await?; + let version = match version { + Some(v) => v, + None => return Err(Error::NotFound), + }; + + let blocks = version + .blocks() + .iter() + .cloned() + .filter(|block| block.offset + block.size > begin && block.offset < end) + .collect::<Vec<_>>(); + + let body_stream = futures::stream::iter(blocks) + .map(move |block| { + let garage = garage.clone(); + async move { + let data = garage.block_manager.rpc_get_block(&block.hash).await?; + let start_in_block = if block.offset > begin { + 0 + } else { + begin - block.offset + }; + let end_in_block = if block.offset + block.size < end { + block.size + } else { + end - block.offset + }; + Ok(Bytes::from( + data[start_in_block as usize..end_in_block as usize].to_vec(), + )) + } + }) + .buffered(2); + let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream))); + Ok(resp_builder.body(body)?) + } + } +} |