diff options
-rw-r--r-- | src/api/s3_get.rs | 208 |
1 files changed, 117 insertions, 91 deletions
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs index 86b58d54..7f647e15 100644 --- a/src/api/s3_get.rs +++ b/src/api/s3_get.rs @@ -19,6 +19,8 @@ use garage_model::version_table::*; use crate::error::*; +const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; + fn object_headers( version: &ObjectVersion, version_meta: &ObjectVersionMeta, @@ -120,39 +122,49 @@ pub async fn handle_head( } if let Some(pn) = part_number { - if let ObjectVersionData::Inline(_, _) = version_data { - // Not a multipart upload - return Err(Error::BadRequest( - "Cannot process part_number argument: not a multipart upload".into(), - )); - } - - let version = garage - .version_table - .get(&object_version.uuid, &EmptyKey) - .await? - .ok_or(Error::NoSuchKey)?; - if !version.has_part_number(pn) { - return Err(Error::BadRequest(format!( - "Part number {} does not exist", - pn - ))); + match version_data { + ObjectVersionData::Inline(_, bytes) => { + if pn != 1 { + return Err(Error::InvalidPart); + } + Ok(object_headers(object_version, version_meta) + .header(CONTENT_LENGTH, format!("{}", bytes.len())) + .header( + CONTENT_RANGE, + format!("bytes 0-{}/{}", bytes.len() - 1, bytes.len()), + ) + .header(X_AMZ_MP_PARTS_COUNT, "1") + .status(StatusCode::PARTIAL_CONTENT) + .body(Body::empty())?) + } + ObjectVersionData::FirstBlock(_, _) => { + let version = garage + .version_table + .get(&object_version.uuid, &EmptyKey) + .await? + .ok_or(Error::NoSuchKey)?; + + let (part_offset, part_end) = + calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?; + let n_parts = version.parts_etags.items().len(); + + Ok(object_headers(object_version, version_meta) + .header(CONTENT_LENGTH, format!("{}", part_end - part_offset)) + .header( + CONTENT_RANGE, + format!( + "bytes {}-{}/{}", + part_offset, + part_end - 1, + version_meta.size + ), + ) + .header(X_AMZ_MP_PARTS_COUNT, format!("{}", n_parts)) + .status(StatusCode::PARTIAL_CONTENT) + .body(Body::empty())?) + } + _ => unreachable!(), } - - let part_size: u64 = version - .blocks - .items() - .iter() - .filter(|(k, _)| k.part_number == pn) - .map(|(_, b)| b.size) - .sum(); - let n_parts = version.parts_etags.items().len(); - - Ok(object_headers(object_version, version_meta) - .header(CONTENT_LENGTH, format!("{}", part_size)) - .header("x-amz-mp-parts-count", format!("{}", n_parts)) - .status(StatusCode::OK) - .body(Body::empty())?) } else { Ok(object_headers(object_version, version_meta) .header(CONTENT_LENGTH, format!("{}", version_meta.size)) @@ -196,22 +208,27 @@ pub async fn handle_get( return Ok(cached); } - if let Some(pn) = part_number { - return handle_get_part(garage, req, last_v, last_v_data, last_v_meta, pn).await; - } - - // No part_number specified, it's a normal get object - - if let Some(range) = parse_range_header(req, last_v_meta.size)? { - return handle_get_range( - garage, - last_v, - last_v_data, - last_v_meta, - range.start, - range.start + range.length, - ) - .await; + match (part_number, parse_range_header(req, last_v_meta.size)?) { + (Some(_), Some(_)) => { + return Err(Error::BadRequest( + "Cannot specify both partNumber and Range header".into(), + )); + } + (Some(pn), None) => { + return handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await; + } + (None, Some(range)) => { + return handle_get_range( + garage, + last_v, + last_v_data, + last_v_meta, + range.start, + range.start + range.length, + ) + .await; + } + (None, None) => (), } let resp_builder = object_headers(last_v, last_v_meta) @@ -305,58 +322,51 @@ async fn handle_get_range( async fn handle_get_part( garage: Arc<Garage>, - req: &Request<Body>, object_version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, part_number: u64, ) -> Result<Response<Body>, Error> { - let version = if let ObjectVersionData::FirstBlock(_, _) = version_data { - garage - .version_table - .get(&object_version.uuid, &EmptyKey) - .await? - .ok_or(Error::NoSuchKey)? - } else { - return Err(Error::BadRequest( - "Cannot handle part_number: not a multipart upload.".into(), - )); - }; + let resp_builder = + object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT); - let blocks = version - .blocks - .items() - .iter() - .filter(|(k, _)| k.part_number == part_number) - .cloned() - .collect::<Vec<_>>(); - - if blocks.is_empty() { - return Err(Error::BadRequest(format!("No such part: {}", part_number))); - } - - let part_size = blocks.iter().map(|(_, b)| b.size).sum(); + match version_data { + ObjectVersionData::Inline(_, bytes) => { + if part_number != 1 { + return Err(Error::InvalidPart); + } + Ok(resp_builder + .header(CONTENT_LENGTH, format!("{}", bytes.len())) + .header( + CONTENT_RANGE, + format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()), + ) + .header(X_AMZ_MP_PARTS_COUNT, "1") + .body(Body::from(bytes.to_vec()))?) + } + ObjectVersionData::FirstBlock(_, _) => { + let version = garage + .version_table + .get(&object_version.uuid, &EmptyKey) + .await? + .ok_or(Error::NoSuchKey)?; - if let Some(r) = parse_range_header(req, part_size)? { - let range_begin = r.start; - let range_end = r.start + r.length; - let body = body_from_blocks_range(garage, &blocks[..], range_begin, range_end); + let (begin, end) = + calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?; + let n_parts = version.parts_etags.items().len(); - Ok(object_headers(object_version, version_meta) - .header(CONTENT_LENGTH, format!("{}", range_end - range_begin)) - .header( - CONTENT_RANGE, - format!("bytes {}-{}/{}", range_begin, range_end - 1, part_size), - ) - .status(StatusCode::PARTIAL_CONTENT) - .body(body)?) - } else { - let body = body_from_blocks_range(garage, &blocks[..], 0, part_size); + let body = body_from_blocks_range(garage, version.blocks.items(), begin, end); - Ok(object_headers(object_version, version_meta) - .header(CONTENT_LENGTH, format!("{}", part_size)) - .status(StatusCode::OK) - .body(body)?) + Ok(resp_builder + .header(CONTENT_LENGTH, format!("{}", end - begin)) + .header( + CONTENT_RANGE, + format!("bytes {}-{}/{}", begin, end - 1, version_meta.size), + ) + .header(X_AMZ_MP_PARTS_COUNT, format!("{}", n_parts)) + .body(body)?) + } + _ => unreachable!(), } } @@ -382,6 +392,22 @@ fn parse_range_header( Ok(range) } +fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> { + let mut offset = 0; + for (i, (bk, bv)) in v.blocks.items().iter().enumerate() { + if bk.part_number == part_number { + let size: u64 = v.blocks.items()[i..] + .iter() + .take_while(|(k, _)| k.part_number == part_number) + .map(|(_, v)| v.size) + .sum(); + return Some((offset, offset + size)); + } + offset += bv.size; + } + None +} + fn body_from_blocks_range( garage: Arc<Garage>, all_blocks: &[(VersionBlockKey, VersionBlock)], |