diff options
author | Alex <alex@adnab.me> | 2024-03-07 15:21:37 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2024-03-07 15:21:37 +0000 |
commit | 2fd13c7d135949a83ed52ed81672ac7e1956f134 (patch) | |
tree | 7c0eb4ebf106c3e624426bd6569b1c1bb4d20e01 /src/api/s3/get.rs | |
parent | fe2dc5d51c206c21ab15d9cc93fa1d1c52d95c46 (diff) | |
parent | 3fcb54e3cf62cdc9ed84751e1f0522ff553ea63c (diff) | |
download | garage-2fd13c7d135949a83ed52ed81672ac7e1956f134.tar.gz garage-2fd13c7d135949a83ed52ed81672ac7e1956f134.zip |
Merge pull request 'SSE-C encryption' (#730) from sse-c into next-0.10
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/730
Diffstat (limited to 'src/api/s3/get.rs')
-rw-r--r-- | src/api/s3/get.rs | 216 |
1 files changed, 151 insertions, 65 deletions
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index ed996fb1..ec300ab7 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -1,10 +1,12 @@ //! Function related to GET and HEAD requests +use std::collections::BTreeMap; use std::convert::TryInto; use std::sync::Arc; use std::time::{Duration, UNIX_EPOCH}; +use bytes::Bytes; use futures::future; -use futures::stream::{self, StreamExt}; +use futures::stream::{self, Stream, StreamExt}; use http::header::{ ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH, @@ -25,6 +27,7 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::ResBody; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; @@ -42,6 +45,8 @@ pub struct GetObjectOverrides { fn object_headers( version: &ObjectVersion, version_meta: &ObjectVersionMeta, + headers: &ObjectVersionHeaders, + encryption: EncryptionParams, ) -> http::response::Builder { debug!("Version meta: {:?}", version_meta); @@ -49,7 +54,6 @@ fn object_headers( let date_str = httpdate::fmt_http_date(date); let mut resp = Response::builder() - .header(CONTENT_TYPE, version_meta.headers.content_type.to_string()) .header(LAST_MODIFIED, date_str) .header(ACCEPT_RANGES, "bytes".to_string()); @@ -57,10 +61,27 @@ fn object_headers( resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag)); } - for (k, v) in version_meta.headers.other.iter() { - resp = resp.header(k, v.to_string()); + // When metadata is retrieved through the REST API, Amazon S3 combines headers that + // have the same name (ignoring case) into a comma-delimited list. + // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html + let mut headers_by_name = BTreeMap::new(); + for (name, value) in headers.0.iter() { + match headers_by_name.get_mut(name) { + None => { + headers_by_name.insert(name, vec![value.as_str()]); + } + Some(headers) => { + headers.push(value.as_str()); + } + } + } + + for (name, values) in headers_by_name { + resp = resp.header(name, values.join(",")); } + encryption.add_response_headers(&mut resp); + resp } @@ -175,21 +196,27 @@ pub async fn handle_head_without_ctx( return Ok(cached); } + let (encryption, headers) = + EncryptionParams::check_decrypt(&garage, req.headers(), &version_meta.encryption)?; + if let Some(pn) = part_number { match version_data { - ObjectVersionData::Inline(_, bytes) => { + ObjectVersionData::Inline(_, _) => { if pn != 1 { return Err(Error::InvalidPart); } - Ok(object_headers(object_version, version_meta) - .header(CONTENT_LENGTH, format!("{}", bytes.len())) - .header( - CONTENT_RANGE, - format!("bytes 0-{}/{}", bytes.len() - 1, bytes.len()), - ) - .header(X_AMZ_MP_PARTS_COUNT, "1") - .status(StatusCode::PARTIAL_CONTENT) - .body(empty_body())?) + let bytes_len = version_meta.size; + Ok( + object_headers(object_version, version_meta, &headers, encryption) + .header(CONTENT_LENGTH, format!("{}", bytes_len)) + .header( + CONTENT_RANGE, + format!("bytes 0-{}/{}", bytes_len - 1, bytes_len), + ) + .header(X_AMZ_MP_PARTS_COUNT, "1") + .status(StatusCode::PARTIAL_CONTENT) + .body(empty_body())?, + ) } ObjectVersionData::FirstBlock(_, _) => { let version = garage @@ -201,28 +228,32 @@ pub async fn handle_head_without_ctx( let (part_offset, part_end) = calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?; - Ok(object_headers(object_version, version_meta) - .header(CONTENT_LENGTH, format!("{}", part_end - part_offset)) - .header( - CONTENT_RANGE, - format!( - "bytes {}-{}/{}", - part_offset, - part_end - 1, - version_meta.size - ), - ) - .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?)) - .status(StatusCode::PARTIAL_CONTENT) - .body(empty_body())?) + Ok( + object_headers(object_version, version_meta, &headers, encryption) + .header(CONTENT_LENGTH, format!("{}", part_end - part_offset)) + .header( + CONTENT_RANGE, + format!( + "bytes {}-{}/{}", + part_offset, + part_end - 1, + version_meta.size + ), + ) + .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?)) + .status(StatusCode::PARTIAL_CONTENT) + .body(empty_body())?, + ) } _ => unreachable!(), } } else { - Ok(object_headers(object_version, version_meta) - .header(CONTENT_LENGTH, format!("{}", version_meta.size)) - .status(StatusCode::OK) - .body(empty_body())?) + Ok( + object_headers(object_version, version_meta, &headers, encryption) + .header(CONTENT_LENGTH, format!("{}", version_meta.size)) + .status(StatusCode::OK) + .body(empty_body())?, + ) } } @@ -273,23 +304,41 @@ pub async fn handle_get_without_ctx( return Ok(cached); } + let (enc, headers) = + EncryptionParams::check_decrypt(&garage, req.headers(), &last_v_meta.encryption)?; + match (part_number, parse_range_header(req, last_v_meta.size)?) { (Some(_), Some(_)) => Err(Error::bad_request( "Cannot specify both partNumber and Range header", )), - (Some(pn), None) => handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await, + (Some(pn), None) => { + handle_get_part(garage, last_v, last_v_data, last_v_meta, enc, &headers, pn).await + } (None, Some(range)) => { handle_get_range( garage, last_v, last_v_data, last_v_meta, + enc, + &headers, range.start, range.start + range.length, ) .await } - (None, None) => handle_get_full(garage, last_v, last_v_data, last_v_meta, overrides).await, + (None, None) => { + handle_get_full( + garage, + last_v, + last_v_data, + last_v_meta, + enc, + &headers, + overrides, + ) + .await + } } } @@ -298,17 +347,36 @@ async fn handle_get_full( version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, + encryption: EncryptionParams, + headers: &ObjectVersionHeaders, overrides: GetObjectOverrides, ) -> Result<Response<ResBody>, Error> { - let mut resp_builder = object_headers(version, version_meta) + let mut resp_builder = object_headers(version, version_meta, &headers, encryption) .header(CONTENT_LENGTH, format!("{}", version_meta.size)) .status(StatusCode::OK); getobject_override_headers(overrides, &mut resp_builder)?; + let stream = full_object_byte_stream(garage, version, version_data, encryption); + + Ok(resp_builder.body(response_body_from_stream(stream))?) +} + +pub fn full_object_byte_stream( + garage: Arc<Garage>, + version: &ObjectVersion, + version_data: &ObjectVersionData, + encryption: EncryptionParams, +) -> ByteStream { match &version_data { ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_, bytes) => { - Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) + let bytes = bytes.to_vec(); + Box::pin(futures::stream::once(async move { + encryption + .decrypt_blob(&bytes) + .map(|x| Bytes::from(x.to_vec())) + .map_err(std_error_from_read_error) + })) } ObjectVersionData::FirstBlock(_, first_block_hash) => { let (tx, rx) = mpsc::channel::<ByteStream>(2); @@ -324,19 +392,18 @@ async fn handle_get_full( garage2.version_table.get(&version_uuid, &EmptyKey).await }); - let stream_block_0 = garage - .block_manager - .rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0))) + let stream_block_0 = encryption + .get_block(&garage, &first_block_hash, Some(order_stream.order(0))) .await?; + tx.send(stream_block_0) .await .ok_or_message("channel closed")?; let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?; for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) { - let stream_block_i = garage - .block_manager - .rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64))) + let stream_block_i = encryption + .get_block(&garage, &vb.hash, Some(order_stream.order(i as u64))) .await?; tx.send(stream_block_i) .await @@ -354,8 +421,7 @@ async fn handle_get_full( } }); - let body = response_body_from_block_stream(rx); - Ok(resp_builder.body(body)?) + Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx).flatten()) } } } @@ -365,13 +431,15 @@ async fn handle_get_range( version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, + encryption: EncryptionParams, + headers: &ObjectVersionHeaders, begin: u64, end: u64, ) -> Result<Response<ResBody>, Error> { // Here we do not use getobject_override_headers because we don't // want to add any overridden headers (those should not be added // when returning PARTIAL_CONTENT) - let resp_builder = object_headers(version, version_meta) + let resp_builder = object_headers(version, version_meta, headers, encryption) .header(CONTENT_LENGTH, format!("{}", end - begin)) .header( CONTENT_RANGE, @@ -382,6 +450,7 @@ async fn handle_get_range( match &version_data { ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_meta, bytes) => { + let bytes = encryption.decrypt_blob(&bytes)?; if end as usize <= bytes.len() { let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into()); Ok(resp_builder.body(body)?) @@ -398,7 +467,8 @@ async fn handle_get_range( .await? .ok_or(Error::NoSuchKey)?; - let body = body_from_blocks_range(garage, version.blocks.items(), begin, end); + let body = + body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end); Ok(resp_builder.body(body)?) } } @@ -409,17 +479,21 @@ async fn handle_get_part( object_version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, + encryption: EncryptionParams, + headers: &ObjectVersionHeaders, part_number: u64, ) -> Result<Response<ResBody>, Error> { // Same as for get_range, no getobject_override_headers - let resp_builder = - object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT); + let resp_builder = object_headers(object_version, version_meta, headers, encryption) + .status(StatusCode::PARTIAL_CONTENT); match version_data { ObjectVersionData::Inline(_, bytes) => { if part_number != 1 { return Err(Error::InvalidPart); } + let bytes = encryption.decrypt_blob(&bytes)?; + assert_eq!(bytes.len() as u64, version_meta.size); Ok(resp_builder .header(CONTENT_LENGTH, format!("{}", bytes.len())) .header( @@ -427,7 +501,7 @@ async fn handle_get_part( format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()), ) .header(X_AMZ_MP_PARTS_COUNT, "1") - .body(bytes_body(bytes.to_vec().into()))?) + .body(bytes_body(bytes.into_owned().into()))?) } ObjectVersionData::FirstBlock(_, _) => { let version = garage @@ -439,7 +513,8 @@ async fn handle_get_part( let (begin, end) = calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?; - let body = body_from_blocks_range(garage, version.blocks.items(), begin, end); + let body = + body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end); Ok(resp_builder .header(CONTENT_LENGTH, format!("{}", end - begin)) @@ -494,6 +569,7 @@ fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> { fn body_from_blocks_range( garage: Arc<Garage>, + encryption: EncryptionParams, all_blocks: &[(VersionBlockKey, VersionBlock)], begin: u64, end: u64, @@ -523,12 +599,11 @@ fn body_from_blocks_range( tokio::spawn(async move { match async { - let garage = garage.clone(); for (i, (block, block_offset)) in blocks.iter().enumerate() { - let block_stream = garage - .block_manager - .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) - .await? + let block_stream = encryption + .get_block(&garage, &block.hash, Some(order_stream.order(i as u64))) + .await?; + let block_stream = block_stream .scan(*block_offset, move |chunk_offset, chunk| { let r = match chunk { Ok(chunk_bytes) => { @@ -588,19 +663,30 @@ fn body_from_blocks_range( } fn response_body_from_block_stream(rx: mpsc::Receiver<ByteStream>) -> ResBody { - let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) - .flatten() - .map(|x| { - x.map(hyper::body::Frame::data) - .map_err(|e| Error::from(garage_util::error::Error::from(e))) - }); + let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten(); + response_body_from_stream(body_stream) +} + +fn response_body_from_stream<S>(stream: S) -> ResBody +where + S: Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static, +{ + let body_stream = stream.map(|x| { + x.map(hyper::body::Frame::data) + .map_err(|e| Error::from(garage_util::error::Error::from(e))) + }); ResBody::new(http_body_util::StreamBody::new(body_stream)) } fn error_stream_item<E: std::fmt::Display>(e: E) -> ByteStream { - let err = std::io::Error::new( + Box::pin(stream::once(future::ready(Err(std_error_from_read_error( + e, + ))))) +} + +fn std_error_from_read_error<E: std::fmt::Display>(e: E) -> std::io::Error { + std::io::Error::new( std::io::ErrorKind::Other, - format!("Error while getting object data: {}", e), - ); - Box::pin(stream::once(future::ready(Err(err)))) + format!("Error while reading object data: {}", e), + ) } |