aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/get.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3/get.rs')
-rw-r--r--src/api/s3/get.rs283
1 files changed, 216 insertions, 67 deletions
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index ed996fb1..f5d3cf11 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -1,10 +1,12 @@
//! Function related to GET and HEAD requests
+use std::collections::BTreeMap;
use std::convert::TryInto;
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
+use bytes::Bytes;
use futures::future;
-use futures::stream::{self, StreamExt};
+use futures::stream::{self, Stream, StreamExt};
use http::header::{
ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE,
CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH,
@@ -25,6 +27,8 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::ResBody;
+use crate::s3::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE};
+use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
@@ -42,6 +46,9 @@ pub struct GetObjectOverrides {
fn object_headers(
version: &ObjectVersion,
version_meta: &ObjectVersionMeta,
+ meta_inner: &ObjectVersionMetaInner,
+ encryption: EncryptionParams,
+ checksum_mode: ChecksumMode,
) -> http::response::Builder {
debug!("Version meta: {:?}", version_meta);
@@ -49,7 +56,6 @@ fn object_headers(
let date_str = httpdate::fmt_http_date(date);
let mut resp = Response::builder()
- .header(CONTENT_TYPE, version_meta.headers.content_type.to_string())
.header(LAST_MODIFIED, date_str)
.header(ACCEPT_RANGES, "bytes".to_string());
@@ -57,10 +63,31 @@ fn object_headers(
resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag));
}
- for (k, v) in version_meta.headers.other.iter() {
- resp = resp.header(k, v.to_string());
+ // When metadata is retrieved through the REST API, Amazon S3 combines headers that
+ // have the same name (ignoring case) into a comma-delimited list.
+ // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html
+ let mut headers_by_name = BTreeMap::new();
+ for (name, value) in meta_inner.headers.iter() {
+ match headers_by_name.get_mut(name) {
+ None => {
+ headers_by_name.insert(name, vec![value.as_str()]);
+ }
+ Some(headers) => {
+ headers.push(value.as_str());
+ }
+ }
+ }
+
+ for (name, values) in headers_by_name {
+ resp = resp.header(name, values.join(","));
+ }
+
+ if checksum_mode.enabled {
+ resp = add_checksum_response_headers(&meta_inner.checksum, resp);
}
+ encryption.add_response_headers(&mut resp);
+
resp
}
@@ -175,21 +202,33 @@ pub async fn handle_head_without_ctx(
return Ok(cached);
}
+ let (encryption, headers) =
+ EncryptionParams::check_decrypt(&garage, req.headers(), &version_meta.encryption)?;
+
+ let checksum_mode = checksum_mode(&req);
+
if let Some(pn) = part_number {
match version_data {
- ObjectVersionData::Inline(_, bytes) => {
+ ObjectVersionData::Inline(_, _) => {
if pn != 1 {
return Err(Error::InvalidPart);
}
- Ok(object_headers(object_version, version_meta)
- .header(CONTENT_LENGTH, format!("{}", bytes.len()))
- .header(
- CONTENT_RANGE,
- format!("bytes 0-{}/{}", bytes.len() - 1, bytes.len()),
- )
- .header(X_AMZ_MP_PARTS_COUNT, "1")
- .status(StatusCode::PARTIAL_CONTENT)
- .body(empty_body())?)
+ let bytes_len = version_meta.size;
+ Ok(object_headers(
+ object_version,
+ version_meta,
+ &headers,
+ encryption,
+ checksum_mode,
+ )
+ .header(CONTENT_LENGTH, format!("{}", bytes_len))
+ .header(
+ CONTENT_RANGE,
+ format!("bytes 0-{}/{}", bytes_len - 1, bytes_len),
+ )
+ .header(X_AMZ_MP_PARTS_COUNT, "1")
+ .status(StatusCode::PARTIAL_CONTENT)
+ .body(empty_body())?)
}
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
@@ -201,28 +240,40 @@ pub async fn handle_head_without_ctx(
let (part_offset, part_end) =
calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?;
- Ok(object_headers(object_version, version_meta)
- .header(CONTENT_LENGTH, format!("{}", part_end - part_offset))
- .header(
- CONTENT_RANGE,
- format!(
- "bytes {}-{}/{}",
- part_offset,
- part_end - 1,
- version_meta.size
- ),
- )
- .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?))
- .status(StatusCode::PARTIAL_CONTENT)
- .body(empty_body())?)
+ Ok(object_headers(
+ object_version,
+ version_meta,
+ &headers,
+ encryption,
+ checksum_mode,
+ )
+ .header(CONTENT_LENGTH, format!("{}", part_end - part_offset))
+ .header(
+ CONTENT_RANGE,
+ format!(
+ "bytes {}-{}/{}",
+ part_offset,
+ part_end - 1,
+ version_meta.size
+ ),
+ )
+ .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?))
+ .status(StatusCode::PARTIAL_CONTENT)
+ .body(empty_body())?)
}
_ => unreachable!(),
}
} else {
- Ok(object_headers(object_version, version_meta)
- .header(CONTENT_LENGTH, format!("{}", version_meta.size))
- .status(StatusCode::OK)
- .body(empty_body())?)
+ Ok(object_headers(
+ object_version,
+ version_meta,
+ &headers,
+ encryption,
+ checksum_mode,
+ )
+ .header(CONTENT_LENGTH, format!("{}", version_meta.size))
+ .status(StatusCode::OK)
+ .body(empty_body())?)
}
}
@@ -273,23 +324,55 @@ pub async fn handle_get_without_ctx(
return Ok(cached);
}
+ let (enc, headers) =
+ EncryptionParams::check_decrypt(&garage, req.headers(), &last_v_meta.encryption)?;
+
+ let checksum_mode = checksum_mode(&req);
+
match (part_number, parse_range_header(req, last_v_meta.size)?) {
(Some(_), Some(_)) => Err(Error::bad_request(
"Cannot specify both partNumber and Range header",
)),
- (Some(pn), None) => handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await,
+ (Some(pn), None) => {
+ handle_get_part(
+ garage,
+ last_v,
+ last_v_data,
+ last_v_meta,
+ enc,
+ &headers,
+ pn,
+ checksum_mode,
+ )
+ .await
+ }
(None, Some(range)) => {
handle_get_range(
garage,
last_v,
last_v_data,
last_v_meta,
+ enc,
+ &headers,
range.start,
range.start + range.length,
+ checksum_mode,
+ )
+ .await
+ }
+ (None, None) => {
+ handle_get_full(
+ garage,
+ last_v,
+ last_v_data,
+ last_v_meta,
+ enc,
+ &headers,
+ overrides,
+ checksum_mode,
)
.await
}
- (None, None) => handle_get_full(garage, last_v, last_v_data, last_v_meta, overrides).await,
}
}
@@ -298,17 +381,43 @@ async fn handle_get_full(
version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
+ encryption: EncryptionParams,
+ meta_inner: &ObjectVersionMetaInner,
overrides: GetObjectOverrides,
+ checksum_mode: ChecksumMode,
) -> Result<Response<ResBody>, Error> {
- let mut resp_builder = object_headers(version, version_meta)
- .header(CONTENT_LENGTH, format!("{}", version_meta.size))
- .status(StatusCode::OK);
+ let mut resp_builder = object_headers(
+ version,
+ version_meta,
+ &meta_inner,
+ encryption,
+ checksum_mode,
+ )
+ .header(CONTENT_LENGTH, format!("{}", version_meta.size))
+ .status(StatusCode::OK);
getobject_override_headers(overrides, &mut resp_builder)?;
+ let stream = full_object_byte_stream(garage, version, version_data, encryption);
+
+ Ok(resp_builder.body(response_body_from_stream(stream))?)
+}
+
+pub fn full_object_byte_stream(
+ garage: Arc<Garage>,
+ version: &ObjectVersion,
+ version_data: &ObjectVersionData,
+ encryption: EncryptionParams,
+) -> ByteStream {
match &version_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_, bytes) => {
- Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
+ let bytes = bytes.to_vec();
+ Box::pin(futures::stream::once(async move {
+ encryption
+ .decrypt_blob(&bytes)
+ .map(|x| Bytes::from(x.to_vec()))
+ .map_err(std_error_from_read_error)
+ }))
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
let (tx, rx) = mpsc::channel::<ByteStream>(2);
@@ -324,19 +433,18 @@ async fn handle_get_full(
garage2.version_table.get(&version_uuid, &EmptyKey).await
});
- let stream_block_0 = garage
- .block_manager
- .rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0)))
+ let stream_block_0 = encryption
+ .get_block(&garage, &first_block_hash, Some(order_stream.order(0)))
.await?;
+
tx.send(stream_block_0)
.await
.ok_or_message("channel closed")?;
let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?;
for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) {
- let stream_block_i = garage
- .block_manager
- .rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64)))
+ let stream_block_i = encryption
+ .get_block(&garage, &vb.hash, Some(order_stream.order(i as u64)))
.await?;
tx.send(stream_block_i)
.await
@@ -354,8 +462,7 @@ async fn handle_get_full(
}
});
- let body = response_body_from_block_stream(rx);
- Ok(resp_builder.body(body)?)
+ Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx).flatten())
}
}
}
@@ -365,13 +472,16 @@ async fn handle_get_range(
version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
+ encryption: EncryptionParams,
+ meta_inner: &ObjectVersionMetaInner,
begin: u64,
end: u64,
+ checksum_mode: ChecksumMode,
) -> Result<Response<ResBody>, Error> {
// Here we do not use getobject_override_headers because we don't
// want to add any overridden headers (those should not be added
// when returning PARTIAL_CONTENT)
- let resp_builder = object_headers(version, version_meta)
+ let resp_builder = object_headers(version, version_meta, meta_inner, encryption, checksum_mode)
.header(CONTENT_LENGTH, format!("{}", end - begin))
.header(
CONTENT_RANGE,
@@ -382,6 +492,7 @@ async fn handle_get_range(
match &version_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, bytes) => {
+ let bytes = encryption.decrypt_blob(&bytes)?;
if end as usize <= bytes.len() {
let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into());
Ok(resp_builder.body(body)?)
@@ -398,7 +509,8 @@ async fn handle_get_range(
.await?
.ok_or(Error::NoSuchKey)?;
- let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
+ let body =
+ body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end);
Ok(resp_builder.body(body)?)
}
}
@@ -409,17 +521,28 @@ async fn handle_get_part(
object_version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
+ encryption: EncryptionParams,
+ meta_inner: &ObjectVersionMetaInner,
part_number: u64,
+ checksum_mode: ChecksumMode,
) -> Result<Response<ResBody>, Error> {
// Same as for get_range, no getobject_override_headers
- let resp_builder =
- object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT);
+ let resp_builder = object_headers(
+ object_version,
+ version_meta,
+ meta_inner,
+ encryption,
+ checksum_mode,
+ )
+ .status(StatusCode::PARTIAL_CONTENT);
match version_data {
ObjectVersionData::Inline(_, bytes) => {
if part_number != 1 {
return Err(Error::InvalidPart);
}
+ let bytes = encryption.decrypt_blob(&bytes)?;
+ assert_eq!(bytes.len() as u64, version_meta.size);
Ok(resp_builder
.header(CONTENT_LENGTH, format!("{}", bytes.len()))
.header(
@@ -427,7 +550,7 @@ async fn handle_get_part(
format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()),
)
.header(X_AMZ_MP_PARTS_COUNT, "1")
- .body(bytes_body(bytes.to_vec().into()))?)
+ .body(bytes_body(bytes.into_owned().into()))?)
}
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
@@ -439,7 +562,8 @@ async fn handle_get_part(
let (begin, end) =
calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?;
- let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
+ let body =
+ body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end);
Ok(resp_builder
.header(CONTENT_LENGTH, format!("{}", end - begin))
@@ -492,8 +616,23 @@ fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> {
None
}
+struct ChecksumMode {
+ enabled: bool,
+}
+
+fn checksum_mode(req: &Request<impl Body>) -> ChecksumMode {
+ ChecksumMode {
+ enabled: req
+ .headers()
+ .get(X_AMZ_CHECKSUM_MODE)
+ .map(|x| x == "ENABLED")
+ .unwrap_or(false),
+ }
+}
+
fn body_from_blocks_range(
garage: Arc<Garage>,
+ encryption: EncryptionParams,
all_blocks: &[(VersionBlockKey, VersionBlock)],
begin: u64,
end: u64,
@@ -523,12 +662,11 @@ fn body_from_blocks_range(
tokio::spawn(async move {
match async {
- let garage = garage.clone();
for (i, (block, block_offset)) in blocks.iter().enumerate() {
- let block_stream = garage
- .block_manager
- .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
- .await?
+ let block_stream = encryption
+ .get_block(&garage, &block.hash, Some(order_stream.order(i as u64)))
+ .await?;
+ let block_stream = block_stream
.scan(*block_offset, move |chunk_offset, chunk| {
let r = match chunk {
Ok(chunk_bytes) => {
@@ -588,19 +726,30 @@ fn body_from_blocks_range(
}
fn response_body_from_block_stream(rx: mpsc::Receiver<ByteStream>) -> ResBody {
- let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
- .flatten()
- .map(|x| {
- x.map(hyper::body::Frame::data)
- .map_err(|e| Error::from(garage_util::error::Error::from(e)))
- });
+ let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten();
+ response_body_from_stream(body_stream)
+}
+
+fn response_body_from_stream<S>(stream: S) -> ResBody
+where
+ S: Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static,
+{
+ let body_stream = stream.map(|x| {
+ x.map(hyper::body::Frame::data)
+ .map_err(|e| Error::from(garage_util::error::Error::from(e)))
+ });
ResBody::new(http_body_util::StreamBody::new(body_stream))
}
fn error_stream_item<E: std::fmt::Display>(e: E) -> ByteStream {
- let err = std::io::Error::new(
+ Box::pin(stream::once(future::ready(Err(std_error_from_read_error(
+ e,
+ )))))
+}
+
+fn std_error_from_read_error<E: std::fmt::Display>(e: E) -> std::io::Error {
+ std::io::Error::new(
std::io::ErrorKind::Other,
- format!("Error while getting object data: {}", e),
- );
- Box::pin(stream::once(future::ready(Err(err))))
+ format!("Error while reading object data: {}", e),
+ )
}