Diffstat (limited to 'src/api/s3/put.rs')
 src/api/s3/put.rs | 100
 1 file changed, 82 insertions(+), 18 deletions(-)
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 1e3b1b44..830a7998 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -30,11 +30,14 @@ use garage_model::s3::block_ref_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::checksum::*;
-use crate::s3::encryption::EncryptionParams;
-use crate::s3::error::*;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::body::StreamingChecksumReceiver;
+use garage_api_common::signature::checksum::*;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::encryption::EncryptionParams;
+use crate::error::*;
+use crate::website::X_AMZ_WEBSITE_REDIRECT_LOCATION;
const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
@@ -47,6 +50,10 @@ pub(crate) struct SaveStreamResult {
pub(crate) enum ChecksumMode<'a> {
Verify(&'a ExpectedChecksums),
+ VerifyFrom {
+ checksummer: StreamingChecksumReceiver,
+ trailer_algo: Option<ChecksumAlgorithm>,
+ },
Calculate(Option<ChecksumAlgorithm>),
}
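
Note on the variant added above: Verify checks the body against checksums supplied up front, Calculate computes one locally, and the new VerifyFrom defers to a digest computed elsewhere while the body streams past. Below is a minimal, self-contained sketch of that three-way split, using a toy enum and a std channel in place of Garage's StreamingChecksumReceiver (ToyChecksumMode and finish are illustrative names, not part of the patch):

    use std::sync::mpsc;

    /// Toy mirror of the three checksum strategies: verify against values the
    /// caller supplied, calculate locally, or accept a digest computed by
    /// whatever component fed the stream.
    enum ToyChecksumMode {
        Verify { expected: u64 },
        Calculate,
        VerifyFrom { receiver: mpsc::Receiver<u64> },
    }

    fn finish(mode: ToyChecksumMode, locally_computed: u64) -> Result<u64, String> {
        match mode {
            ToyChecksumMode::Verify { expected } if expected == locally_computed => Ok(expected),
            ToyChecksumMode::Verify { .. } => Err("checksum mismatch".to_string()),
            ToyChecksumMode::Calculate => Ok(locally_computed),
            // VerifyFrom: the digest arrives from outside; nothing is recomputed here.
            ToyChecksumMode::VerifyFrom { receiver } => receiver
                .recv()
                .map_err(|_| "checksum sender was dropped".to_string()),
        }
    }

    fn main() {
        let (tx, rx) = mpsc::channel();
        tx.send(0xdead_beef_u64).unwrap();
        assert_eq!(finish(ToyChecksumMode::VerifyFrom { receiver: rx }, 0), Ok(0xdead_beef));
        assert_eq!(finish(ToyChecksumMode::Calculate, 42), Ok(42));
    }
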
@@ -54,10 +61,9 @@ pub async fn handle_put(
ctx: ReqCtx,
req: Request<ReqBody>,
key: &String,
- content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
// Retrieve interesting headers from request
- let headers = get_headers(req.headers())?;
+ let headers = extract_metadata_headers(req.headers())?;
debug!("Object headers: {:?}", headers);
let expected_checksums = ExpectedChecksums {
@@ -65,9 +71,10 @@ pub async fn handle_put(
Some(x) => Some(x.to_str()?.to_string()),
None => None,
},
- sha256: content_sha256,
+ sha256: None,
extra: request_checksum_value(req.headers())?,
};
+ let trailer_checksum_algorithm = request_trailer_checksum_algorithm(req.headers())?;
let meta = ObjectVersionMetaInner {
headers,
@@ -77,7 +84,19 @@ pub async fn handle_put(
// Determine whether object should be encrypted, and if so the key
let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
- let stream = body_stream(req.into_body());
+ // The request body is a special ReqBody object (see garage_api_common::signature::body)
+ // which supports calculating checksums while streaming the data.
+ // Before we start streaming, we configure it to calculate all the checksums we need.
+ let mut req_body = req.into_body();
+ req_body.add_expected_checksums(expected_checksums.clone());
+ if !encryption.is_encrypted() {
+ // For non-encrypted objects, we need to compute the md5sum in all cases
+ // (even if content-md5 is not set), because it is used as the object etag
+ req_body.add_md5();
+ }
+
+ let (stream, checksummer) = req_body.streaming_with_checksums();
+ let stream = stream.map_err(Error::from);
let res = save_stream(
&ctx,
@@ -85,7 +104,10 @@ pub async fn handle_put(
encryption,
stream,
key,
- ChecksumMode::Verify(&expected_checksums),
+ ChecksumMode::VerifyFrom {
+ checksummer,
+ trailer_algo: trailer_checksum_algorithm,
+ },
)
.await?;
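
The hunks above carry the central change in handle_put: the pre-computed content_sha256 argument is gone, the ReqBody is configured to accumulate the required checksums while it streams, and the result is collected afterwards through the StreamingChecksumReceiver handed to ChecksumMode::VerifyFrom. Below is a minimal, self-contained sketch of that split between "stream the data" and "pick up the digest later", using only the standard library and a toy hash; ChecksumTap is an illustrative name, not Garage's API:

    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher;
    use std::sync::mpsc;

    /// Toy stand-in for "checksum while streaming": wraps an iterator of body
    /// chunks, hashes each chunk as it passes through, and delivers the final
    /// digest on a channel once the stream is exhausted.
    struct ChecksumTap<I> {
        inner: I,
        hasher: DefaultHasher,
        done: Option<mpsc::Sender<u64>>,
    }

    impl<I: Iterator<Item = Vec<u8>>> Iterator for ChecksumTap<I> {
        type Item = Vec<u8>;
        fn next(&mut self) -> Option<Vec<u8>> {
            match self.inner.next() {
                Some(chunk) => {
                    self.hasher.write(&chunk); // update the digest in passing
                    Some(chunk)
                }
                None => {
                    // Body fully consumed: publish the digest exactly once.
                    if let Some(tx) = self.done.take() {
                        let _ = tx.send(self.hasher.finish());
                    }
                    None
                }
            }
        }
    }

    fn main() {
        let (tx, rx) = mpsc::channel();
        let body = ChecksumTap {
            inner: vec![b"hello ".to_vec(), b"world".to_vec()].into_iter(),
            hasher: DefaultHasher::new(),
            done: Some(tx),
        };

        // The "upload" path only ever sees the chunks...
        let total: usize = body.map(|chunk| chunk.len()).sum();

        // ...and the digest is picked up separately once streaming is done,
        // much like awaiting the receiver returned by streaming_with_checksums().
        let digest = rx.recv().expect("stream was fully consumed");
        println!("streamed {total} bytes, digest {digest:016x}");
    }
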
@@ -121,10 +143,15 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let version_uuid = gen_uuid();
let version_timestamp = next_timestamp(existing_object.as_ref());
- let mut checksummer = match checksum_mode {
+ let mut checksummer = match &checksum_mode {
ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()),
ChecksumMode::Calculate(algo) => {
- Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(algo)
+ Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(*algo)
+ }
+ ChecksumMode::VerifyFrom { .. } => {
+ // Checksums are calculated by the garage_api_common::signature module
+ // so here we can just have an empty checksummer that does nothing
+ Checksummer::new()
}
};
@@ -132,7 +159,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// as "inline data". We can then return immediately.
if first_block.len() < INLINE_THRESHOLD {
checksummer.update(&first_block);
- let checksums = checksummer.finalize();
+ let mut checksums = checksummer.finalize();
match checksum_mode {
ChecksumMode::Verify(expected) => {
@@ -141,6 +168,18 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
ChecksumMode::Calculate(algo) => {
meta.checksum = checksums.extract(algo);
}
+ ChecksumMode::VerifyFrom {
+ checksummer,
+ trailer_algo,
+ } => {
+ drop(chunker);
+ checksums = checksummer
+ .await
+ .ok_or_internal_error("checksum calculation")??;
+ if let Some(algo) = trailer_algo {
+ meta.checksum = checksums.extract(Some(algo));
+ }
+ }
};
let size = first_block.len() as u64;
@@ -212,13 +251,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage.version_table.insert(&version).await?;
// Transfer data
- let (total_size, checksums, first_block_hash) = read_and_put_blocks(
+ let (total_size, mut checksums, first_block_hash) = read_and_put_blocks(
ctx,
&version,
encryption,
1,
first_block,
- &mut chunker,
+ chunker,
checksummer,
)
.await?;
@@ -231,6 +270,17 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
ChecksumMode::Calculate(algo) => {
meta.checksum = checksums.extract(algo);
}
+ ChecksumMode::VerifyFrom {
+ checksummer,
+ trailer_algo,
+ } => {
+ checksums = checksummer
+ .await
+ .ok_or_internal_error("checksum calculation")??;
+ if let Some(algo) = trailer_algo {
+ meta.checksum = checksums.extract(Some(algo));
+ }
+ }
};
// Verify quotas are respected
@@ -331,7 +381,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
encryption: EncryptionParams,
part_number: u64,
first_block: Bytes,
- chunker: &mut StreamChunker<S>,
+ mut chunker: StreamChunker<S>,
checksummer: Checksummer,
) -> Result<(u64, Checksums, Hash), Error> {
let tracer = opentelemetry::global::tracer("garage");
@@ -600,7 +650,9 @@ impl Drop for InterruptedCleanup {
// ============ helpers ============
-pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<HeaderList, Error> {
+pub(crate) fn extract_metadata_headers(
+ headers: &HeaderMap<HeaderValue>,
+) -> Result<HeaderList, Error> {
let mut ret = Vec::new();
// Preserve standard headers
@@ -622,10 +674,22 @@ pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<HeaderList
for (name, value) in headers.iter() {
if name.as_str().starts_with("x-amz-meta-") {
ret.push((
- name.to_string(),
+ name.as_str().to_ascii_lowercase(),
std::str::from_utf8(value.as_bytes())?.to_string(),
));
}
+ if name == X_AMZ_WEBSITE_REDIRECT_LOCATION {
+ let value = std::str::from_utf8(value.as_bytes())?.to_string();
+ if !(value.starts_with("/")
+ || value.starts_with("http://")
+ || value.starts_with("https://"))
+ {
+ return Err(Error::bad_request(format!(
+ "Invalid {X_AMZ_WEBSITE_REDIRECT_LOCATION} header",
+ )));
+ }
+ ret.push((X_AMZ_WEBSITE_REDIRECT_LOCATION.to_string(), value));
+ }
}
Ok(ret)
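
For reference, the validation added for the x-amz-website-redirect-location header accepts only site-relative targets or absolute http(s) URLs. A standalone sketch of that predicate (is_valid_redirect_target is an illustrative name, not a helper from the patch):

    /// Mirrors the check in extract_metadata_headers: a redirect target must be
    /// site-relative or an absolute http(s) URL; anything else is rejected.
    fn is_valid_redirect_target(value: &str) -> bool {
        value.starts_with('/') || value.starts_with("http://") || value.starts_with("https://")
    }

    fn main() {
        assert!(is_valid_redirect_target("/other-page.html"));
        assert!(is_valid_redirect_target("https://example.com/landing"));
        assert!(!is_valid_redirect_target("ftp://example.com/"));
        assert!(!is_valid_redirect_target("relative/path"));
    }
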