author     Alex Auvolat <alex@adnab.me>  2022-01-13 15:29:39 +0100
committer  Alex Auvolat <alex@adnab.me>  2022-01-24 21:04:42 +0100
commit     338b1b83eec2ff641cc8387417442697b4138966 (patch)
tree       7a1523ddc7e11efc0d006fa63cbc09ee1a1f16fb
parent     6dab836f3a5646af4a06afa52338702d82c6eb9d (diff)
download   garage-338b1b83eec2ff641cc8387417442697b4138966.tar.gz
           garage-338b1b83eec2ff641cc8387417442697b4138966.zip
Implement part_number for GetObject
-rw-r--r--  src/api/s3_get.rs  219
1 file changed, 146 insertions(+), 73 deletions(-)
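The patch below removes the old NotImplemented guard and serves a single part of a multipart upload when the GetObject request carries a partNumber query parameter; when a Range header is also present, the range is interpreted relative to that part and answered with 206 Partial Content and a matching Content-Range. A minimal client-side sketch of the new code path, assuming the aws-sdk-s3 crate and a Client already configured with credentials and the Garage endpoint (the bucket and key names are hypothetical):

use aws_sdk_s3::Client;

// Sketch only: fetch part 2 of a multipart object. Before this commit Garage
// answered such a request with NotImplemented; with it, only that part's bytes
// are returned and Content-Length reflects the size of the part.
async fn fetch_part(client: &Client) -> Result<(), aws_sdk_s3::Error> {
    let resp = client
        .get_object()
        .bucket("my-bucket")   // hypothetical bucket
        .key("large-object")   // hypothetical key
        .part_number(2)
        .send()
        .await?;
    println!("part 2 length: {:?}", resp.content_length());
    Ok(())
}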
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs
index fdb36231..86b58d54 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3_get.rs
@@ -5,7 +5,7 @@ use std::time::{Duration, UNIX_EPOCH};
use futures::stream::*;
use http::header::{
ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
- IF_NONE_MATCH, LAST_MODIFIED,
+ IF_NONE_MATCH, LAST_MODIFIED, RANGE,
};
use hyper::body::Bytes;
use hyper::{Body, Request, Response, StatusCode};
@@ -15,6 +15,7 @@ use garage_util::data::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;
+use garage_model::version_table::*;
use crate::error::*;
@@ -168,12 +169,6 @@ pub async fn handle_get(
key: &str,
part_number: Option<u64>,
) -> Result<Response<Body>, Error> {
- if part_number.is_some() {
- return Err(Error::NotImplemented(
- "part_number not supported for GetObject".into(),
- ));
- }
-
let object = garage
.object_table
.get(&bucket_id, &key.to_string())
@@ -201,22 +196,13 @@ pub async fn handle_get(
return Ok(cached);
}
- let range = match req.headers().get("range") {
- Some(range) => {
- let range_str = range.to_str()?;
- let mut ranges = http_range::HttpRange::parse(range_str, last_v_meta.size)
- .map_err(|e| (e, last_v_meta.size))?;
- if ranges.len() > 1 {
- // garage does not support multi-range requests yet, so we respond with the entire
- // object when multiple ranges are requested
- None
- } else {
- ranges.pop()
- }
- }
- None => None,
- };
- if let Some(range) = range {
+ if let Some(pn) = part_number {
+ return handle_get_part(garage, req, last_v, last_v_data, last_v_meta, pn).await;
+ }
+
+ // No part_number specified, it's a normal get object
+
+ if let Some(range) = parse_range_header(req, last_v_meta.size)? {
return handle_get_range(
garage,
last_v,
@@ -305,58 +291,145 @@ async fn handle_get_range(
}
}
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
- let version = garage.version_table.get(&version.uuid, &EmptyKey).await?;
- let version = match version {
- Some(v) => v,
- None => return Err(Error::NoSuchKey),
- };
-
- // We will store here the list of blocks that have an intersection with the requested
- // range, as well as their "true offset", which is their actual offset in the complete
- // file (whereas block.offset designates the offset of the block WITHIN THE PART
- // block.part_number, which is not the same in the case of a multipart upload)
- let mut blocks = Vec::with_capacity(std::cmp::min(
- version.blocks.len(),
- 4 + ((end - begin) / std::cmp::max(version.blocks.items()[0].1.size as u64, 1024))
- as usize,
- ));
- let mut true_offset = 0;
- for (_, b) in version.blocks.items().iter() {
- if true_offset >= end {
- break;
- }
- // Keep only blocks that have an intersection with the requested range
- if true_offset < end && true_offset + b.size > begin {
- blocks.push((*b, true_offset));
- }
- true_offset += b.size;
- }
+ let version = garage
+ .version_table
+ .get(&version.uuid, &EmptyKey)
+ .await?
+ .ok_or(Error::NoSuchKey)?;
- let body_stream = futures::stream::iter(blocks)
- .map(move |(block, true_offset)| {
- let garage = garage.clone();
- async move {
- let data = garage.block_manager.rpc_get_block(&block.hash).await?;
- let data = Bytes::from(data);
- let start_in_block = if true_offset > begin {
- 0
- } else {
- begin - true_offset
- };
- let end_in_block = if true_offset + block.size < end {
- block.size
- } else {
- end - true_offset
- };
- Result::<Bytes, Error>::Ok(
- data.slice(start_in_block as usize..end_in_block as usize),
- )
- }
- })
- .buffered(2);
-
- let body = hyper::body::Body::wrap_stream(body_stream);
+ let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
Ok(resp_builder.body(body)?)
}
}
}
+
+async fn handle_get_part(
+ garage: Arc<Garage>,
+ req: &Request<Body>,
+ object_version: &ObjectVersion,
+ version_data: &ObjectVersionData,
+ version_meta: &ObjectVersionMeta,
+ part_number: u64,
+) -> Result<Response<Body>, Error> {
+ let version = if let ObjectVersionData::FirstBlock(_, _) = version_data {
+ garage
+ .version_table
+ .get(&object_version.uuid, &EmptyKey)
+ .await?
+ .ok_or(Error::NoSuchKey)?
+ } else {
+ return Err(Error::BadRequest(
+ "Cannot handle part_number: not a multipart upload.".into(),
+ ));
+ };
+
+ let blocks = version
+ .blocks
+ .items()
+ .iter()
+ .filter(|(k, _)| k.part_number == part_number)
+ .cloned()
+ .collect::<Vec<_>>();
+
+ if blocks.is_empty() {
+ return Err(Error::BadRequest(format!("No such part: {}", part_number)));
+ }
+
+ let part_size = blocks.iter().map(|(_, b)| b.size).sum();
+
+ if let Some(r) = parse_range_header(req, part_size)? {
+ let range_begin = r.start;
+ let range_end = r.start + r.length;
+ let body = body_from_blocks_range(garage, &blocks[..], range_begin, range_end);
+
+ Ok(object_headers(object_version, version_meta)
+ .header(CONTENT_LENGTH, format!("{}", range_end - range_begin))
+ .header(
+ CONTENT_RANGE,
+ format!("bytes {}-{}/{}", range_begin, range_end - 1, part_size),
+ )
+ .status(StatusCode::PARTIAL_CONTENT)
+ .body(body)?)
+ } else {
+ let body = body_from_blocks_range(garage, &blocks[..], 0, part_size);
+
+ Ok(object_headers(object_version, version_meta)
+ .header(CONTENT_LENGTH, format!("{}", part_size))
+ .status(StatusCode::OK)
+ .body(body)?)
+ }
+}
+
+fn parse_range_header(
+ req: &Request<Body>,
+ total_size: u64,
+) -> Result<Option<http_range::HttpRange>, Error> {
+ let range = match req.headers().get(RANGE) {
+ Some(range) => {
+ let range_str = range.to_str()?;
+ let mut ranges =
+ http_range::HttpRange::parse(range_str, total_size).map_err(|e| (e, total_size))?;
+ if ranges.len() > 1 {
+ // garage does not support multi-range requests yet, so we respond with the entire
+ // object when multiple ranges are requested
+ None
+ } else {
+ ranges.pop()
+ }
+ }
+ None => None,
+ };
+ Ok(range)
+}
+
+fn body_from_blocks_range(
+ garage: Arc<Garage>,
+ all_blocks: &[(VersionBlockKey, VersionBlock)],
+ begin: u64,
+ end: u64,
+) -> Body {
+ // We will store here the list of blocks that have an intersection with the requested
+ // range, as well as their "true offset", which is their actual offset in the complete
+ // file (whereas block.offset designates the offset of the block WITHIN THE PART
+ // block.part_number, which is not the same in the case of a multipart upload)
+ let mut blocks: Vec<(VersionBlock, u64)> = Vec::with_capacity(std::cmp::min(
+ all_blocks.len(),
+ 4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize,
+ ));
+ let mut true_offset = 0;
+ for (_, b) in all_blocks.iter() {
+ if true_offset >= end {
+ break;
+ }
+ // Keep only blocks that have an intersection with the requested range
+ if true_offset < end && true_offset + b.size > begin {
+ blocks.push((*b, true_offset));
+ }
+ true_offset += b.size;
+ }
+
+ let body_stream = futures::stream::iter(blocks)
+ .map(move |(block, true_offset)| {
+ let garage = garage.clone();
+ async move {
+ let data = garage.block_manager.rpc_get_block(&block.hash).await?;
+ let data = Bytes::from(data);
+ let start_in_block = if true_offset > begin {
+ 0
+ } else {
+ begin - true_offset
+ };
+ let end_in_block = if true_offset + block.size < end {
+ block.size
+ } else {
+ end - true_offset
+ };
+ Result::<Bytes, Error>::Ok(
+ data.slice(start_in_block as usize..end_in_block as usize),
+ )
+ }
+ })
+ .buffered(2);
+
+ hyper::body::Body::wrap_stream(body_stream)
+}
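
For readers skimming the patch, the core of body_from_blocks_range is the "true offset" walk that keeps only the blocks overlapping the requested byte range. The capacity hint (4 + (end - begin) / max(first_block_size, 1024)) only pre-sizes the vector; the selection itself is the linear scan shown here as a standalone sketch, with hypothetical block sizes and plain tuples in place of VersionBlockKey/VersionBlock:

/// Standalone version of the block-selection walk from body_from_blocks_range:
/// given block sizes laid out back to back, return (index, true_offset) for
/// every block that intersects the half-open range [begin, end).
fn blocks_in_range(sizes: &[u64], begin: u64, end: u64) -> Vec<(usize, u64)> {
    let mut out = Vec::new();
    let mut true_offset = 0;
    for (i, &size) in sizes.iter().enumerate() {
        if true_offset >= end {
            break;
        }
        // Same test as in the patch: keep blocks overlapping the requested range.
        if true_offset < end && true_offset + size > begin {
            out.push((i, true_offset));
        }
        true_offset += size;
    }
    out
}

fn main() {
    // Three 1 MiB blocks; a range starting inside block 0 and ending just
    // inside block 1 selects blocks 0 and 1, at true offsets 0 and 1 MiB.
    let mib: u64 = 1024 * 1024;
    assert_eq!(
        blocks_in_range(&[mib, mib, mib], mib / 2, mib + 1),
        vec![(0, 0), (1, mib)]
    );
    println!("ok");
}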