aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/get.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3/get.rs')
-rw-r--r--src/api/s3/get.rs83
1 files changed, 61 insertions, 22 deletions
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 0d18e775..3f85eecb 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -25,6 +25,7 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::ResBody;
+use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
@@ -42,6 +43,7 @@ pub struct GetObjectOverrides {
fn object_headers(
version: &ObjectVersion,
version_meta: &ObjectVersionMeta,
+ headers: &ObjectVersionHeaders,
) -> http::response::Builder {
debug!("Version meta: {:?}", version_meta);
@@ -49,7 +51,7 @@ fn object_headers(
let date_str = httpdate::fmt_http_date(date);
let mut resp = Response::builder()
- .header(CONTENT_TYPE, version_meta.headers.content_type.to_string())
+ .header(CONTENT_TYPE, headers.content_type.to_string())
.header(LAST_MODIFIED, date_str)
.header(ACCEPT_RANGES, "bytes".to_string());
@@ -57,7 +59,7 @@ fn object_headers(
resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag));
}
- for (k, v) in version_meta.headers.other.iter() {
+ for (k, v) in headers.other.iter() {
resp = resp.header(k, v.to_string());
}
@@ -165,13 +167,16 @@ pub async fn handle_head(
return Ok(cached);
}
+ let (_enc, headers) =
+ EncryptionParams::check_decrypt_for_get(&garage, req, &version_meta.encryption)?;
+
if let Some(pn) = part_number {
match version_data {
ObjectVersionData::Inline(_, bytes) => {
if pn != 1 {
return Err(Error::InvalidPart);
}
- Ok(object_headers(object_version, version_meta)
+ Ok(object_headers(object_version, version_meta, &headers)
.header(CONTENT_LENGTH, format!("{}", bytes.len()))
.header(
CONTENT_RANGE,
@@ -191,7 +196,7 @@ pub async fn handle_head(
let (part_offset, part_end) =
calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?;
- Ok(object_headers(object_version, version_meta)
+ Ok(object_headers(object_version, version_meta, &headers)
.header(CONTENT_LENGTH, format!("{}", part_end - part_offset))
.header(
CONTENT_RANGE,
@@ -209,7 +214,7 @@ pub async fn handle_head(
_ => unreachable!(),
}
} else {
- Ok(object_headers(object_version, version_meta)
+ Ok(object_headers(object_version, version_meta, &headers)
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
.status(StatusCode::OK)
.body(empty_body())?)
@@ -252,23 +257,41 @@ pub async fn handle_get(
return Ok(cached);
}
+ let (enc, headers) =
+ EncryptionParams::check_decrypt_for_get(&garage, req, &last_v_meta.encryption)?;
+
match (part_number, parse_range_header(req, last_v_meta.size)?) {
(Some(_), Some(_)) => Err(Error::bad_request(
"Cannot specify both partNumber and Range header",
)),
- (Some(pn), None) => handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await,
+ (Some(pn), None) => {
+ handle_get_part(garage, last_v, last_v_data, last_v_meta, enc, &headers, pn).await
+ }
(None, Some(range)) => {
handle_get_range(
garage,
last_v,
last_v_data,
last_v_meta,
+ enc,
+ &headers,
range.start,
range.start + range.length,
)
.await
}
- (None, None) => handle_get_full(garage, last_v, last_v_data, last_v_meta, overrides).await,
+ (None, None) => {
+ handle_get_full(
+ garage,
+ last_v,
+ last_v_data,
+ last_v_meta,
+ enc,
+ &headers,
+ overrides,
+ )
+ .await
+ }
}
}
@@ -277,9 +300,11 @@ async fn handle_get_full(
version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
+ encryption: EncryptionParams,
+ headers: &ObjectVersionHeaders,
overrides: GetObjectOverrides,
) -> Result<Response<ResBody>, Error> {
- let mut resp_builder = object_headers(version, version_meta)
+ let mut resp_builder = object_headers(version, version_meta, &headers)
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
.status(StatusCode::OK);
getobject_override_headers(overrides, &mut resp_builder)?;
@@ -303,19 +328,26 @@ async fn handle_get_full(
garage2.version_table.get(&version_uuid, &EmptyKey).await
});
- let stream_block_0 = garage
- .block_manager
- .rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0)))
+ let stream_block_0 = encryption
+ .get_and_decrypt_block(
+ &garage,
+ &first_block_hash,
+ Some(order_stream.order(0)),
+ )
.await?;
+
tx.send(stream_block_0)
.await
.ok_or_message("channel closed")?;
let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?;
for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) {
- let stream_block_i = garage
- .block_manager
- .rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64)))
+ let stream_block_i = encryption
+ .get_and_decrypt_block(
+ &garage,
+ &vb.hash,
+ Some(order_stream.order(i as u64)),
+ )
.await?;
tx.send(stream_block_i)
.await
@@ -344,13 +376,15 @@ async fn handle_get_range(
version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
+ encryption: EncryptionParams,
+ headers: &ObjectVersionHeaders,
begin: u64,
end: u64,
) -> Result<Response<ResBody>, Error> {
// Here we do not use getobject_override_headers because we don't
// want to add any overridden headers (those should not be added
// when returning PARTIAL_CONTENT)
- let resp_builder = object_headers(version, version_meta)
+ let resp_builder = object_headers(version, version_meta, headers)
.header(CONTENT_LENGTH, format!("{}", end - begin))
.header(
CONTENT_RANGE,
@@ -377,7 +411,8 @@ async fn handle_get_range(
.await?
.ok_or(Error::NoSuchKey)?;
- let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
+ let body =
+ body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end);
Ok(resp_builder.body(body)?)
}
}
@@ -388,11 +423,13 @@ async fn handle_get_part(
object_version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
+ encryption: EncryptionParams,
+ headers: &ObjectVersionHeaders,
part_number: u64,
) -> Result<Response<ResBody>, Error> {
// Same as for get_range, no getobject_override_headers
let resp_builder =
- object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT);
+ object_headers(object_version, version_meta, headers).status(StatusCode::PARTIAL_CONTENT);
match version_data {
ObjectVersionData::Inline(_, bytes) => {
@@ -418,7 +455,8 @@ async fn handle_get_part(
let (begin, end) =
calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?;
- let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
+ let body =
+ body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end);
Ok(resp_builder
.header(CONTENT_LENGTH, format!("{}", end - begin))
@@ -473,6 +511,7 @@ fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> {
fn body_from_blocks_range(
garage: Arc<Garage>,
+ encryption: EncryptionParams,
all_blocks: &[(VersionBlockKey, VersionBlock)],
begin: u64,
end: u64,
@@ -504,10 +543,10 @@ fn body_from_blocks_range(
match async {
let garage = garage.clone();
for (i, (block, block_offset)) in blocks.iter().enumerate() {
- let block_stream = garage
- .block_manager
- .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
- .await?
+ let block_stream = encryption
+ .get_and_decrypt_block(&garage, &block.hash, Some(order_stream.order(i as u64)))
+ .await?;
+ let block_stream = block_stream
.scan(*block_offset, move |chunk_offset, chunk| {
let r = match chunk {
Ok(chunk_bytes) => {