aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3_get.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3_get.rs')
-rw-r--r--src/api/s3_get.rs96
1 files changed, 95 insertions, 1 deletions
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs
index 75478e46..3ed0f914 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3_get.rs
@@ -3,7 +3,7 @@ use std::time::{Duration, UNIX_EPOCH};
use futures::stream::*;
use hyper::body::Bytes;
-use hyper::{Response, StatusCode};
+use hyper::{Body, Request, Response, StatusCode};
use garage_util::error::Error;
@@ -22,6 +22,7 @@ fn object_headers(version: &ObjectVersion) -> http::response::Builder {
.header("Content-Type", version.mime_type.to_string())
.header("Content-Length", format!("{}", version.size))
.header("Last-Modified", date_str)
+ .header("Accept-Ranges", format!("bytes"))
}
pub async fn handle_head(
@@ -59,6 +60,7 @@ pub async fn handle_head(
pub async fn handle_get(
garage: Arc<Garage>,
+ req: &Request<Body>,
bucket: &str,
key: &str,
) -> Result<Response<BodyType>, Error> {
@@ -82,6 +84,25 @@ pub async fn handle_get(
None => return Err(Error::NotFound),
};
+ let range = match req.headers().get("range") {
+ Some(range) => {
+ let range_str = range
+ .to_str()
+ .map_err(|e| Error::BadRequest(format!("Invalid range header: {}", e)))?;
+ let mut ranges = http_range::HttpRange::parse(range_str, last_v.size)
+ .map_err(|_e| Error::BadRequest(format!("Invalid range")))?;
+ if ranges.len() > 1 {
+ return Err(Error::BadRequest(format!("Multiple ranges not supported")));
+ } else {
+ ranges.pop()
+ }
+ }
+ None => None,
+ };
+ if let Some(range) = range {
+ return handle_get_range(garage, last_v, range.start, range.start + range.length).await;
+ }
+
let resp_builder = object_headers(&last_v).status(StatusCode::OK);
match &last_v.data {
@@ -131,3 +152,76 @@ pub async fn handle_get(
}
}
}
+
+pub async fn handle_get_range(
+ garage: Arc<Garage>,
+ version: &ObjectVersion,
+ begin: u64,
+ end: u64,
+) -> Result<Response<BodyType>, Error> {
+ if end > version.size {
+ return Err(Error::BadRequest(format!("Range not included in file")));
+ }
+
+ let resp_builder = object_headers(&version)
+ .header(
+ "Content-Range",
+ format!("bytes {}-{}/{}", begin, end, version.size),
+ )
+ .status(StatusCode::PARTIAL_CONTENT);
+
+ match &version.data {
+ ObjectVersionData::Uploading => Err(Error::Message(format!(
+ "Version is_complete() but data is stil Uploading (internal error)"
+ ))),
+ ObjectVersionData::DeleteMarker => Err(Error::NotFound),
+ ObjectVersionData::Inline(bytes) => {
+ if end as usize <= bytes.len() {
+ let body: BodyType = Box::new(BytesBody::from(
+ bytes[begin as usize..end as usize].to_vec(),
+ ));
+ Ok(resp_builder.body(body)?)
+ } else {
+ Err(Error::Message(format!("Internal error: requested range not present in inline bytes when it should have been")))
+ }
+ }
+ ObjectVersionData::FirstBlock(_first_block_hash) => {
+ let version = garage.version_table.get(&version.uuid, &EmptyKey).await?;
+ let version = match version {
+ Some(v) => v,
+ None => return Err(Error::NotFound),
+ };
+
+ let blocks = version
+ .blocks()
+ .iter()
+ .cloned()
+ .filter(|block| block.offset + block.size > begin && block.offset < end)
+ .collect::<Vec<_>>();
+
+ let body_stream = futures::stream::iter(blocks)
+ .map(move |block| {
+ let garage = garage.clone();
+ async move {
+ let data = garage.block_manager.rpc_get_block(&block.hash).await?;
+ let start_in_block = if block.offset > begin {
+ 0
+ } else {
+ begin - block.offset
+ };
+ let end_in_block = if block.offset + block.size < end {
+ block.size
+ } else {
+ end - block.offset
+ };
+ Ok(Bytes::from(
+ data[start_in_block as usize..end_in_block as usize].to_vec(),
+ ))
+ }
+ })
+ .buffered(2);
+ let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream)));
+ Ok(resp_builder.body(body)?)
+ }
+ }
+}