diff options
Diffstat (limited to 'src/api/s3/get.rs')
-rw-r--r-- | src/api/s3/get.rs | 83 |
1 files changed, 61 insertions, 22 deletions
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 0d18e775..3f85eecb 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -25,6 +25,7 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::ResBody; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; @@ -42,6 +43,7 @@ pub struct GetObjectOverrides { fn object_headers( version: &ObjectVersion, version_meta: &ObjectVersionMeta, + headers: &ObjectVersionHeaders, ) -> http::response::Builder { debug!("Version meta: {:?}", version_meta); @@ -49,7 +51,7 @@ fn object_headers( let date_str = httpdate::fmt_http_date(date); let mut resp = Response::builder() - .header(CONTENT_TYPE, version_meta.headers.content_type.to_string()) + .header(CONTENT_TYPE, headers.content_type.to_string()) .header(LAST_MODIFIED, date_str) .header(ACCEPT_RANGES, "bytes".to_string()); @@ -57,7 +59,7 @@ fn object_headers( resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag)); } - for (k, v) in version_meta.headers.other.iter() { + for (k, v) in headers.other.iter() { resp = resp.header(k, v.to_string()); } @@ -165,13 +167,16 @@ pub async fn handle_head( return Ok(cached); } + let (_enc, headers) = + EncryptionParams::check_decrypt_for_get(&garage, req, &version_meta.encryption)?; + if let Some(pn) = part_number { match version_data { ObjectVersionData::Inline(_, bytes) => { if pn != 1 { return Err(Error::InvalidPart); } - Ok(object_headers(object_version, version_meta) + Ok(object_headers(object_version, version_meta, &headers) .header(CONTENT_LENGTH, format!("{}", bytes.len())) .header( CONTENT_RANGE, @@ -191,7 +196,7 @@ pub async fn handle_head( let (part_offset, part_end) = calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?; - Ok(object_headers(object_version, version_meta) + Ok(object_headers(object_version, version_meta, &headers) .header(CONTENT_LENGTH, format!("{}", part_end - part_offset)) .header( CONTENT_RANGE, @@ -209,7 +214,7 @@ pub async fn handle_head( _ => unreachable!(), } } else { - Ok(object_headers(object_version, version_meta) + Ok(object_headers(object_version, version_meta, &headers) .header(CONTENT_LENGTH, format!("{}", version_meta.size)) .status(StatusCode::OK) .body(empty_body())?) @@ -252,23 +257,41 @@ pub async fn handle_get( return Ok(cached); } + let (enc, headers) = + EncryptionParams::check_decrypt_for_get(&garage, req, &last_v_meta.encryption)?; + match (part_number, parse_range_header(req, last_v_meta.size)?) { (Some(_), Some(_)) => Err(Error::bad_request( "Cannot specify both partNumber and Range header", )), - (Some(pn), None) => handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await, + (Some(pn), None) => { + handle_get_part(garage, last_v, last_v_data, last_v_meta, enc, &headers, pn).await + } (None, Some(range)) => { handle_get_range( garage, last_v, last_v_data, last_v_meta, + enc, + &headers, range.start, range.start + range.length, ) .await } - (None, None) => handle_get_full(garage, last_v, last_v_data, last_v_meta, overrides).await, + (None, None) => { + handle_get_full( + garage, + last_v, + last_v_data, + last_v_meta, + enc, + &headers, + overrides, + ) + .await + } } } @@ -277,9 +300,11 @@ async fn handle_get_full( version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, + encryption: EncryptionParams, + headers: &ObjectVersionHeaders, overrides: GetObjectOverrides, ) -> Result<Response<ResBody>, Error> { - let mut resp_builder = object_headers(version, version_meta) + let mut resp_builder = object_headers(version, version_meta, &headers) .header(CONTENT_LENGTH, format!("{}", version_meta.size)) .status(StatusCode::OK); getobject_override_headers(overrides, &mut resp_builder)?; @@ -303,19 +328,26 @@ async fn handle_get_full( garage2.version_table.get(&version_uuid, &EmptyKey).await }); - let stream_block_0 = garage - .block_manager - .rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0))) + let stream_block_0 = encryption + .get_and_decrypt_block( + &garage, + &first_block_hash, + Some(order_stream.order(0)), + ) .await?; + tx.send(stream_block_0) .await .ok_or_message("channel closed")?; let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?; for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) { - let stream_block_i = garage - .block_manager - .rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64))) + let stream_block_i = encryption + .get_and_decrypt_block( + &garage, + &vb.hash, + Some(order_stream.order(i as u64)), + ) .await?; tx.send(stream_block_i) .await @@ -344,13 +376,15 @@ async fn handle_get_range( version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, + encryption: EncryptionParams, + headers: &ObjectVersionHeaders, begin: u64, end: u64, ) -> Result<Response<ResBody>, Error> { // Here we do not use getobject_override_headers because we don't // want to add any overridden headers (those should not be added // when returning PARTIAL_CONTENT) - let resp_builder = object_headers(version, version_meta) + let resp_builder = object_headers(version, version_meta, headers) .header(CONTENT_LENGTH, format!("{}", end - begin)) .header( CONTENT_RANGE, @@ -377,7 +411,8 @@ async fn handle_get_range( .await? .ok_or(Error::NoSuchKey)?; - let body = body_from_blocks_range(garage, version.blocks.items(), begin, end); + let body = + body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end); Ok(resp_builder.body(body)?) } } @@ -388,11 +423,13 @@ async fn handle_get_part( object_version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, + encryption: EncryptionParams, + headers: &ObjectVersionHeaders, part_number: u64, ) -> Result<Response<ResBody>, Error> { // Same as for get_range, no getobject_override_headers let resp_builder = - object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT); + object_headers(object_version, version_meta, headers).status(StatusCode::PARTIAL_CONTENT); match version_data { ObjectVersionData::Inline(_, bytes) => { @@ -418,7 +455,8 @@ async fn handle_get_part( let (begin, end) = calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?; - let body = body_from_blocks_range(garage, version.blocks.items(), begin, end); + let body = + body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end); Ok(resp_builder .header(CONTENT_LENGTH, format!("{}", end - begin)) @@ -473,6 +511,7 @@ fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> { fn body_from_blocks_range( garage: Arc<Garage>, + encryption: EncryptionParams, all_blocks: &[(VersionBlockKey, VersionBlock)], begin: u64, end: u64, @@ -504,10 +543,10 @@ fn body_from_blocks_range( match async { let garage = garage.clone(); for (i, (block, block_offset)) in blocks.iter().enumerate() { - let block_stream = garage - .block_manager - .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) - .await? + let block_stream = encryption + .get_and_decrypt_block(&garage, &block.hash, Some(order_stream.order(i as u64))) + .await?; + let block_stream = block_stream .scan(*block_offset, move |chunk_offset, chunk| { let r = match chunk { Ok(chunk_bytes) => { |