aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-01-25 11:29:21 +0100
committerAlex Auvolat <alex@adnab.me>2022-01-25 12:25:23 +0100
commitacdf893362196125729cd5ec7a6feb7fe5668e9a (patch)
tree3a892af4fb9f670a23622fa6a580b77a83980d01
parent338b1b83eec2ff641cc8387417442697b4138966 (diff)
downloadgarage-acdf893362196125729cd5ec7a6feb7fe5668e9a.tar.gz
garage-acdf893362196125729cd5ec7a6feb7fe5668e9a.zip
Fix partnumber
-rw-r--r--src/api/s3_get.rs208
1 files changed, 117 insertions, 91 deletions
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs
index 86b58d54..7f647e15 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3_get.rs
@@ -19,6 +19,8 @@ use garage_model::version_table::*;
use crate::error::*;
+const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
+
fn object_headers(
version: &ObjectVersion,
version_meta: &ObjectVersionMeta,
@@ -120,39 +122,49 @@ pub async fn handle_head(
}
if let Some(pn) = part_number {
- if let ObjectVersionData::Inline(_, _) = version_data {
- // Not a multipart upload
- return Err(Error::BadRequest(
- "Cannot process part_number argument: not a multipart upload".into(),
- ));
- }
-
- let version = garage
- .version_table
- .get(&object_version.uuid, &EmptyKey)
- .await?
- .ok_or(Error::NoSuchKey)?;
- if !version.has_part_number(pn) {
- return Err(Error::BadRequest(format!(
- "Part number {} does not exist",
- pn
- )));
+ match version_data {
+ ObjectVersionData::Inline(_, bytes) => {
+ if pn != 1 {
+ return Err(Error::InvalidPart);
+ }
+ Ok(object_headers(object_version, version_meta)
+ .header(CONTENT_LENGTH, format!("{}", bytes.len()))
+ .header(
+ CONTENT_RANGE,
+ format!("bytes 0-{}/{}", bytes.len() - 1, bytes.len()),
+ )
+ .header(X_AMZ_MP_PARTS_COUNT, "1")
+ .status(StatusCode::PARTIAL_CONTENT)
+ .body(Body::empty())?)
+ }
+ ObjectVersionData::FirstBlock(_, _) => {
+ let version = garage
+ .version_table
+ .get(&object_version.uuid, &EmptyKey)
+ .await?
+ .ok_or(Error::NoSuchKey)?;
+
+ let (part_offset, part_end) =
+ calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?;
+ let n_parts = version.parts_etags.items().len();
+
+ Ok(object_headers(object_version, version_meta)
+ .header(CONTENT_LENGTH, format!("{}", part_end - part_offset))
+ .header(
+ CONTENT_RANGE,
+ format!(
+ "bytes {}-{}/{}",
+ part_offset,
+ part_end - 1,
+ version_meta.size
+ ),
+ )
+ .header(X_AMZ_MP_PARTS_COUNT, format!("{}", n_parts))
+ .status(StatusCode::PARTIAL_CONTENT)
+ .body(Body::empty())?)
+ }
+ _ => unreachable!(),
}
-
- let part_size: u64 = version
- .blocks
- .items()
- .iter()
- .filter(|(k, _)| k.part_number == pn)
- .map(|(_, b)| b.size)
- .sum();
- let n_parts = version.parts_etags.items().len();
-
- Ok(object_headers(object_version, version_meta)
- .header(CONTENT_LENGTH, format!("{}", part_size))
- .header("x-amz-mp-parts-count", format!("{}", n_parts))
- .status(StatusCode::OK)
- .body(Body::empty())?)
} else {
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
@@ -196,22 +208,27 @@ pub async fn handle_get(
return Ok(cached);
}
- if let Some(pn) = part_number {
- return handle_get_part(garage, req, last_v, last_v_data, last_v_meta, pn).await;
- }
-
- // No part_number specified, it's a normal get object
-
- if let Some(range) = parse_range_header(req, last_v_meta.size)? {
- return handle_get_range(
- garage,
- last_v,
- last_v_data,
- last_v_meta,
- range.start,
- range.start + range.length,
- )
- .await;
+ match (part_number, parse_range_header(req, last_v_meta.size)?) {
+ (Some(_), Some(_)) => {
+ return Err(Error::BadRequest(
+ "Cannot specify both partNumber and Range header".into(),
+ ));
+ }
+ (Some(pn), None) => {
+ return handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await;
+ }
+ (None, Some(range)) => {
+ return handle_get_range(
+ garage,
+ last_v,
+ last_v_data,
+ last_v_meta,
+ range.start,
+ range.start + range.length,
+ )
+ .await;
+ }
+ (None, None) => (),
}
let resp_builder = object_headers(last_v, last_v_meta)
@@ -305,58 +322,51 @@ async fn handle_get_range(
async fn handle_get_part(
garage: Arc<Garage>,
- req: &Request<Body>,
object_version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
part_number: u64,
) -> Result<Response<Body>, Error> {
- let version = if let ObjectVersionData::FirstBlock(_, _) = version_data {
- garage
- .version_table
- .get(&object_version.uuid, &EmptyKey)
- .await?
- .ok_or(Error::NoSuchKey)?
- } else {
- return Err(Error::BadRequest(
- "Cannot handle part_number: not a multipart upload.".into(),
- ));
- };
+ let resp_builder =
+ object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT);
- let blocks = version
- .blocks
- .items()
- .iter()
- .filter(|(k, _)| k.part_number == part_number)
- .cloned()
- .collect::<Vec<_>>();
-
- if blocks.is_empty() {
- return Err(Error::BadRequest(format!("No such part: {}", part_number)));
- }
-
- let part_size = blocks.iter().map(|(_, b)| b.size).sum();
+ match version_data {
+ ObjectVersionData::Inline(_, bytes) => {
+ if part_number != 1 {
+ return Err(Error::InvalidPart);
+ }
+ Ok(resp_builder
+ .header(CONTENT_LENGTH, format!("{}", bytes.len()))
+ .header(
+ CONTENT_RANGE,
+ format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()),
+ )
+ .header(X_AMZ_MP_PARTS_COUNT, "1")
+ .body(Body::from(bytes.to_vec()))?)
+ }
+ ObjectVersionData::FirstBlock(_, _) => {
+ let version = garage
+ .version_table
+ .get(&object_version.uuid, &EmptyKey)
+ .await?
+ .ok_or(Error::NoSuchKey)?;
- if let Some(r) = parse_range_header(req, part_size)? {
- let range_begin = r.start;
- let range_end = r.start + r.length;
- let body = body_from_blocks_range(garage, &blocks[..], range_begin, range_end);
+ let (begin, end) =
+ calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?;
+ let n_parts = version.parts_etags.items().len();
- Ok(object_headers(object_version, version_meta)
- .header(CONTENT_LENGTH, format!("{}", range_end - range_begin))
- .header(
- CONTENT_RANGE,
- format!("bytes {}-{}/{}", range_begin, range_end - 1, part_size),
- )
- .status(StatusCode::PARTIAL_CONTENT)
- .body(body)?)
- } else {
- let body = body_from_blocks_range(garage, &blocks[..], 0, part_size);
+ let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
- Ok(object_headers(object_version, version_meta)
- .header(CONTENT_LENGTH, format!("{}", part_size))
- .status(StatusCode::OK)
- .body(body)?)
+ Ok(resp_builder
+ .header(CONTENT_LENGTH, format!("{}", end - begin))
+ .header(
+ CONTENT_RANGE,
+ format!("bytes {}-{}/{}", begin, end - 1, version_meta.size),
+ )
+ .header(X_AMZ_MP_PARTS_COUNT, format!("{}", n_parts))
+ .body(body)?)
+ }
+ _ => unreachable!(),
}
}
@@ -382,6 +392,22 @@ fn parse_range_header(
Ok(range)
}
+fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> {
+ let mut offset = 0;
+ for (i, (bk, bv)) in v.blocks.items().iter().enumerate() {
+ if bk.part_number == part_number {
+ let size: u64 = v.blocks.items()[i..]
+ .iter()
+ .take_while(|(k, _)| k.part_number == part_number)
+ .map(|(_, v)| v.size)
+ .sum();
+ return Some((offset, offset + size));
+ }
+ offset += bv.size;
+ }
+ None
+}
+
fn body_from_blocks_range(
garage: Arc<Garage>,
all_blocks: &[(VersionBlockKey, VersionBlock)],