aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <lx@deuxfleurs.fr>2025-02-18 13:59:43 +0100
committerAlex Auvolat <lx@deuxfleurs.fr>2025-02-18 15:33:42 +0100
commit730bfee753c4f22cd0595d9195222de334ec36f9 (patch)
tree10cae28065967f82adcb145e027c15c38b9502be
parentccab0e4ae5c083d0a06ad2b3ef5fe62481da3c2c (diff)
downloadgarage-730bfee753c4f22cd0595d9195222de334ec36f9.tar.gz
garage-730bfee753c4f22cd0595d9195222de334ec36f9.zip
api: validate trailing checksum + add test for unsigned-paylad-trailer
-rw-r--r--Cargo.lock1
-rw-r--r--src/api/common/signature/body.rs13
-rw-r--r--src/api/common/signature/checksum.rs81
-rw-r--r--src/api/common/signature/streaming.rs58
-rw-r--r--src/api/s3/post_object.rs5
-rw-r--r--src/garage/Cargo.toml1
-rw-r--r--src/garage/tests/common/custom_requester.rs111
-rw-r--r--src/garage/tests/common/garage.rs5
-rw-r--r--src/garage/tests/s3/streaming_signature.rs150
9 files changed, 337 insertions, 88 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 26f6ea1d..477e4456 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1230,6 +1230,7 @@ dependencies = [
"bytes",
"bytesize",
"chrono",
+ "crc32fast",
"format_table",
"futures",
"garage_api_admin",
diff --git a/src/api/common/signature/body.rs b/src/api/common/signature/body.rs
index 512d02b3..4279d7b5 100644
--- a/src/api/common/signature/body.rs
+++ b/src/api/common/signature/body.rs
@@ -17,6 +17,7 @@ pub struct ReqBody {
pub(crate) stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>,
pub(crate) checksummer: Checksummer,
pub(crate) expected_checksums: ExpectedChecksums,
+ pub(crate) trailer_algorithm: Option<ChecksumAlgorithm>,
}
pub type StreamingChecksumReceiver = task::JoinHandle<Result<Checksums, Error>>;
@@ -74,6 +75,7 @@ impl ReqBody {
stream,
mut checksummer,
mut expected_checksums,
+ trailer_algorithm,
} = self;
let (frame_tx, mut frame_rx) = mpsc::channel::<Frame<Bytes>>(1);
@@ -91,18 +93,21 @@ impl ReqBody {
}
Err(frame) => {
let trailers = frame.into_trailers().unwrap();
- if let Some(cv) = request_checksum_value(&trailers)? {
- expected_checksums.extra = Some(cv);
- }
+ let algo = trailer_algorithm.unwrap();
+ expected_checksums.extra = Some(extract_checksum_value(&trailers, algo)?);
break;
}
}
}
+ if trailer_algorithm.is_some() && expected_checksums.extra.is_none() {
+ return Err(Error::bad_request("trailing checksum was not sent"));
+ }
+
let checksums = checksummer.finalize();
checksums.verify(&expected_checksums)?;
- return Ok(checksums);
+ Ok(checksums)
});
let stream: BoxStream<_> = stream.into_inner().unwrap();
diff --git a/src/api/common/signature/checksum.rs b/src/api/common/signature/checksum.rs
index 890c0452..3c5e7c53 100644
--- a/src/api/common/signature/checksum.rs
+++ b/src/api/common/signature/checksum.rs
@@ -12,10 +12,10 @@ use http::{HeaderMap, HeaderName, HeaderValue};
use garage_util::data::*;
-use garage_model::s3::object_table::{ChecksumAlgorithm, ChecksumValue};
-
use super::*;
+pub use garage_model::s3::object_table::{ChecksumAlgorithm, ChecksumValue};
+
pub const CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName =
@@ -198,17 +198,23 @@ impl Checksums {
// ----
+pub fn parse_checksum_algorithm(algo: &str) -> Result<ChecksumAlgorithm, Error> {
+ match algo {
+ "CRC32" => Ok(ChecksumAlgorithm::Crc32),
+ "CRC32C" => Ok(ChecksumAlgorithm::Crc32c),
+ "SHA1" => Ok(ChecksumAlgorithm::Sha1),
+ "SHA256" => Ok(ChecksumAlgorithm::Sha256),
+ _ => Err(Error::bad_request("invalid checksum algorithm")),
+ }
+}
+
/// Extract the value of the x-amz-checksum-algorithm header
pub fn request_checksum_algorithm(
headers: &HeaderMap<HeaderValue>,
) -> Result<Option<ChecksumAlgorithm>, Error> {
match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
None => Ok(None),
- Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)),
- Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)),
- Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)),
- Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)),
- _ => Err(Error::bad_request("invalid checksum algorithm")),
+ Some(x) => parse_checksum_algorithm(x.to_str()?).map(Some),
}
}
@@ -231,37 +237,17 @@ pub fn request_checksum_value(
) -> Result<Option<ChecksumValue>, Error> {
let mut ret = vec![];
- if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) {
- let crc32 = BASE64_STANDARD
- .decode(&crc32_str)
- .ok()
- .and_then(|x| x.try_into().ok())
- .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
- ret.push(ChecksumValue::Crc32(crc32))
+ if headers.contains_key(X_AMZ_CHECKSUM_CRC32) {
+ ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Crc32)?);
}
- if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) {
- let crc32c = BASE64_STANDARD
- .decode(&crc32c_str)
- .ok()
- .and_then(|x| x.try_into().ok())
- .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
- ret.push(ChecksumValue::Crc32c(crc32c))
+ if headers.contains_key(X_AMZ_CHECKSUM_CRC32C) {
+ ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Crc32c)?);
}
- if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) {
- let sha1 = BASE64_STANDARD
- .decode(&sha1_str)
- .ok()
- .and_then(|x| x.try_into().ok())
- .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
- ret.push(ChecksumValue::Sha1(sha1))
+ if headers.contains_key(X_AMZ_CHECKSUM_SHA1) {
+ ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Sha1)?);
}
- if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) {
- let sha256 = BASE64_STANDARD
- .decode(&sha256_str)
- .ok()
- .and_then(|x| x.try_into().ok())
- .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
- ret.push(ChecksumValue::Sha256(sha256))
+ if headers.contains_key(X_AMZ_CHECKSUM_SHA256) {
+ ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Sha256)?);
}
if ret.len() > 1 {
@@ -274,44 +260,43 @@ pub fn request_checksum_value(
/// Checks for the presence of x-amz-checksum-algorithm
/// if so extract the corresponding x-amz-checksum-* value
-pub fn request_checksum_algorithm_value(
+pub fn extract_checksum_value(
headers: &HeaderMap<HeaderValue>,
-) -> Result<Option<ChecksumValue>, Error> {
- match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
- Some(x) if x == "CRC32" => {
+ algo: ChecksumAlgorithm,
+) -> Result<ChecksumValue, Error> {
+ match algo {
+ ChecksumAlgorithm::Crc32 => {
let crc32 = headers
.get(X_AMZ_CHECKSUM_CRC32)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
- Ok(Some(ChecksumValue::Crc32(crc32)))
+ Ok(ChecksumValue::Crc32(crc32))
}
- Some(x) if x == "CRC32C" => {
+ ChecksumAlgorithm::Crc32c => {
let crc32c = headers
.get(X_AMZ_CHECKSUM_CRC32C)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
- Ok(Some(ChecksumValue::Crc32c(crc32c)))
+ Ok(ChecksumValue::Crc32c(crc32c))
}
- Some(x) if x == "SHA1" => {
+ ChecksumAlgorithm::Sha1 => {
let sha1 = headers
.get(X_AMZ_CHECKSUM_SHA1)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
- Ok(Some(ChecksumValue::Sha1(sha1)))
+ Ok(ChecksumValue::Sha1(sha1))
}
- Some(x) if x == "SHA256" => {
+ ChecksumAlgorithm::Sha256 => {
let sha256 = headers
.get(X_AMZ_CHECKSUM_SHA256)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
- Ok(Some(ChecksumValue::Sha256(sha256)))
+ Ok(ChecksumValue::Sha256(sha256))
}
- Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")),
- None => Ok(None),
}
}
diff --git a/src/api/common/signature/streaming.rs b/src/api/common/signature/streaming.rs
index 75f3bf80..70b6e004 100644
--- a/src/api/common/signature/streaming.rs
+++ b/src/api/common/signature/streaming.rs
@@ -5,7 +5,7 @@ use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use futures::prelude::*;
use futures::task;
use hmac::Mac;
-use http::header::{HeaderValue, CONTENT_ENCODING};
+use http::header::{HeaderMap, HeaderValue, CONTENT_ENCODING};
use hyper::body::{Bytes, Frame, Incoming as IncomingBody};
use hyper::Request;
@@ -64,10 +64,16 @@ pub fn parse_streaming_body(
}
// If trailer header is announced, add the calculation of the requested checksum
- if trailer {
- let algo = request_trailer_checksum_algorithm(req.headers())?;
+ let trailer_algorithm = if trailer {
+ let algo = Some(
+ request_trailer_checksum_algorithm(req.headers())?
+ .ok_or_bad_request("Missing x-amz-trailer header")?,
+ );
checksummer = checksummer.add(algo);
- }
+ algo
+ } else {
+ None
+ };
// For signed variants, determine signing parameters
let sign_params = if signed {
@@ -123,6 +129,7 @@ pub fn parse_streaming_body(
stream: Mutex::new(signed_payload_stream.boxed()),
checksummer,
expected_checksums,
+ trailer_algorithm,
}
}))
}
@@ -132,6 +139,7 @@ pub fn parse_streaming_body(
stream: Mutex::new(stream.boxed()),
checksummer,
expected_checksums,
+ trailer_algorithm: None,
}
})),
}
@@ -185,6 +193,8 @@ fn compute_streaming_trailer_signature(
}
mod payload {
+ use http::{HeaderName, HeaderValue};
+
use garage_util::data::Hash;
use nom::bytes::streaming::{tag, take_while};
@@ -252,19 +262,21 @@ mod payload {
#[derive(Debug, Clone)]
pub struct TrailerChunk {
- pub header_name: Vec<u8>,
- pub header_value: Vec<u8>,
+ pub header_name: HeaderName,
+ pub header_value: HeaderValue,
pub signature: Option<Hash>,
}
impl TrailerChunk {
fn parse_content(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
- let (input, header_name) = try_parse!(take_while(
- |c: u8| c.is_ascii_alphanumeric() || c == b'-'
+ let (input, header_name) = try_parse!(map_res(
+ take_while(|c: u8| c.is_ascii_alphanumeric() || c == b'-'),
+ HeaderName::from_bytes
)(input));
let (input, _) = try_parse!(tag(b":")(input));
- let (input, header_value) = try_parse!(take_while(
- |c: u8| c.is_ascii_alphanumeric() || b"+/=".contains(&c)
+ let (input, header_value) = try_parse!(map_res(
+ take_while(|c: u8| c.is_ascii_alphanumeric() || b"+/=".contains(&c)),
+ HeaderValue::from_bytes
)(input));
// Possible '\n' after the header value, depends on clients
@@ -276,8 +288,8 @@ mod payload {
Ok((
input,
TrailerChunk {
- header_name: header_name.to_vec(),
- header_value: header_value.to_vec(),
+ header_name,
+ header_value,
signature: None,
},
))
@@ -371,6 +383,7 @@ where
buf: bytes::BytesMut,
signing: Option<SignParams>,
has_trailer: bool,
+ done: bool,
}
impl<S> StreamingPayloadStream<S>
@@ -383,6 +396,7 @@ where
buf: bytes::BytesMut::new(),
signing,
has_trailer,
+ done: false,
}
}
@@ -448,6 +462,10 @@ where
let mut this = self.project();
+ if *this.done {
+ return Poll::Ready(None);
+ }
+
loop {
let (input, payload) =
match Self::parse_next(this.buf, this.signing.is_some(), *this.has_trailer) {
@@ -499,17 +517,23 @@ where
if data.is_empty() {
// if there was a trailer, it would have been returned by the parser
assert!(!*this.has_trailer);
+ *this.done = true;
return Poll::Ready(None);
}
return Poll::Ready(Some(Ok(Frame::data(data))));
}
StreamingPayloadChunk::Trailer(trailer) => {
+ trace!(
+ "In StreamingPayloadStream::poll_next: got trailer {:?}",
+ trailer
+ );
+
if let Some(signing) = this.signing.as_mut() {
let data = [
- &trailer.header_name[..],
+ trailer.header_name.as_ref(),
&b":"[..],
- &trailer.header_value[..],
+ trailer.header_value.as_ref(),
&b"\n"[..],
]
.concat();
@@ -529,10 +553,12 @@ where
}
*this.buf = input.into();
+ *this.done = true;
- // TODO: handle trailer
+ let mut trailers_map = HeaderMap::new();
+ trailers_map.insert(trailer.header_name, trailer.header_value);
- return Poll::Ready(None);
+ return Poll::Ready(Some(Ok(Frame::trailers(trailers_map))));
}
}
}
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index 6c1b7453..350684da 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -218,6 +218,7 @@ pub async fn handle_post_object(
// around here to make sure the rest of the machinery takes our acl into account.
let headers = get_headers(&params)?;
+ let checksum_algorithm = request_checksum_algorithm(&params)?;
let expected_checksums = ExpectedChecksums {
md5: params
.get("content-md5")
@@ -225,7 +226,9 @@ pub async fn handle_post_object(
.transpose()?
.map(str::to_string),
sha256: None,
- extra: request_checksum_algorithm_value(&params)?,
+ extra: checksum_algorithm
+ .map(|algo| extract_checksum_value(&params, algo))
+ .transpose()?,
};
let meta = ObjectVersionMetaInner {
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index c036f000..5860cf97 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -75,6 +75,7 @@ static_init.workspace = true
assert-json-diff.workspace = true
serde_json.workspace = true
base64.workspace = true
+crc32fast.workspace = true
k2v-client.workspace = true
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index 99fd4385..6a8eed38 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -195,10 +195,10 @@ impl<'a> RequestBuilder<'a> {
all_headers.insert(signature::X_AMZ_DATE, HeaderValue::from_str(&date).unwrap());
all_headers.insert(HOST, HeaderValue::from_str(&host).unwrap());
- let body_sha = match self.body_signature {
+ let body_sha = match &self.body_signature {
BodySignature::Unsigned => "UNSIGNED-PAYLOAD".to_owned(),
BodySignature::Classic => hex::encode(garage_util::data::sha256sum(&self.body)),
- BodySignature::Streaming(size) => {
+ BodySignature::Streaming { chunk_size } => {
all_headers.insert(
CONTENT_ENCODING,
HeaderValue::from_str("aws-chunked").unwrap(),
@@ -213,15 +213,56 @@ impl<'a> RequestBuilder<'a> {
// code.
all_headers.insert(
CONTENT_LENGTH,
- to_streaming_body(&self.body, size, String::new(), signer.clone(), now, "")
- .len()
- .to_string()
- .try_into()
- .unwrap(),
+ to_streaming_body(
+ &self.body,
+ *chunk_size,
+ String::new(),
+ signer.clone(),
+ now,
+ "",
+ )
+ .len()
+ .to_string()
+ .try_into()
+ .unwrap(),
);
"STREAMING-AWS4-HMAC-SHA256-PAYLOAD".to_owned()
}
+ BodySignature::StreamingUnsignedTrailer {
+ chunk_size,
+ trailer_algorithm,
+ trailer_value,
+ } => {
+ all_headers.insert(
+ CONTENT_ENCODING,
+ HeaderValue::from_str("aws-chunked").unwrap(),
+ );
+ all_headers.insert(
+ HeaderName::from_static("x-amz-decoded-content-length"),
+ HeaderValue::from_str(&self.body.len().to_string()).unwrap(),
+ );
+ all_headers.insert(
+ HeaderName::from_static("x-amz-trailer"),
+ HeaderValue::from_str(&trailer_algorithm).unwrap(),
+ );
+
+ all_headers.insert(
+ CONTENT_LENGTH,
+ to_streaming_unsigned_trailer_body(
+ &self.body,
+ *chunk_size,
+ &trailer_algorithm,
+ &trailer_value,
+ )
+ .len()
+ .to_string()
+ .try_into()
+ .unwrap(),
+ );
+
+ "STREAMING-UNSIGNED-PAYLOAD-TRAILER".to_owned()
+ }
};
all_headers.insert(
signature::X_AMZ_CONTENT_SHA256,
@@ -273,10 +314,26 @@ impl<'a> RequestBuilder<'a> {
let mut request = Request::builder();
*request.headers_mut().unwrap() = all_headers;
- let body = if let BodySignature::Streaming(size) = self.body_signature {
- to_streaming_body(&self.body, size, signature, streaming_signer, now, &scope)
- } else {
- self.body.clone()
+ let body = match &self.body_signature {
+ BodySignature::Streaming { chunk_size } => to_streaming_body(
+ &self.body,
+ *chunk_size,
+ signature,
+ streaming_signer,
+ now,
+ &scope,
+ ),
+ BodySignature::StreamingUnsignedTrailer {
+ chunk_size,
+ trailer_algorithm,
+ trailer_value,
+ } => to_streaming_unsigned_trailer_body(
+ &self.body,
+ *chunk_size,
+ &trailer_algorithm,
+ &trailer_value,
+ ),
+ _ => self.body.clone(),
};
let request = request
.uri(uri)
@@ -305,7 +362,14 @@ impl<'a> RequestBuilder<'a> {
pub enum BodySignature {
Unsigned,
Classic,
- Streaming(usize),
+ Streaming {
+ chunk_size: usize,
+ },
+ StreamingUnsignedTrailer {
+ chunk_size: usize,
+ trailer_algorithm: String,
+ trailer_value: String,
+ },
}
fn query_param_to_string(params: &HashMap<String, Option<String>>) -> String {
@@ -360,3 +424,26 @@ fn to_streaming_body(
res
}
+
+fn to_streaming_unsigned_trailer_body(
+ body: &[u8],
+ chunk_size: usize,
+ trailer_algorithm: &str,
+ trailer_value: &str,
+) -> Vec<u8> {
+ let mut res = Vec::with_capacity(body.len());
+ for chunk in body.chunks(chunk_size) {
+ let header = format!("{:x}\r\n", chunk.len());
+ res.extend_from_slice(header.as_bytes());
+ res.extend_from_slice(chunk);
+ res.extend_from_slice(b"\r\n");
+ }
+
+ res.extend_from_slice(b"0\r\n");
+ res.extend_from_slice(trailer_algorithm.as_bytes());
+ res.extend_from_slice(b":");
+ res.extend_from_slice(trailer_value.as_bytes());
+ res.extend_from_slice(b"\n\r\n\r\n");
+
+ res
+}
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index da6c624b..8d71504f 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -99,7 +99,10 @@ api_bind_addr = "127.0.0.1:{admin_port}"
.arg("server")
.stdout(stdout)
.stderr(stderr)
- .env("RUST_LOG", "garage=debug,garage_api=trace")
+ .env(
+ "RUST_LOG",
+ "garage=debug,garage_api_common=trace,garage_api_s3=trace",
+ )
.spawn()
.expect("Could not start garage");
diff --git a/src/garage/tests/s3/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs
index 351aa422..e83d2355 100644
--- a/src/garage/tests/s3/streaming_signature.rs
+++ b/src/garage/tests/s3/streaming_signature.rs
@@ -1,5 +1,8 @@
use std::collections::HashMap;
+use base64::prelude::*;
+use crc32fast::Hasher as Crc32;
+
use crate::common;
use crate::common::ext::CommandExt;
use common::custom_requester::BodySignature;
@@ -21,7 +24,7 @@ async fn test_putobject_streaming() {
let content_type = "text/csv";
let mut headers = HashMap::new();
headers.insert("content-type".to_owned(), content_type.to_owned());
- let _ = ctx
+ let res = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
@@ -29,10 +32,11 @@ async fn test_putobject_streaming() {
.signed_headers(headers)
.vhost_style(true)
.body(vec![])
- .body_signature(BodySignature::Streaming(10))
+ .body_signature(BodySignature::Streaming { chunk_size: 10 })
.send()
.await
.unwrap();
+ assert!(res.status().is_success(), "got response: {:?}", res);
// assert_eq!(r.e_tag.unwrap().as_str(), etag);
// We return a version ID here
@@ -65,7 +69,136 @@ async fn test_putobject_streaming() {
{
let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
- let _ = ctx
+ let res = ctx
+ .custom_request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ //.path(CTRL_KEY.to_owned()) at the moment custom_request does not encode url so this
+ //fail
+ .path("abc".to_owned())
+ .vhost_style(true)
+ .body(BODY.to_vec())
+ .body_signature(BodySignature::Streaming { chunk_size: 16 })
+ .send()
+ .await
+ .unwrap();
+ assert!(res.status().is_success(), "got response: {:?}", res);
+
+ // assert_eq!(r.e_tag.unwrap().as_str(), etag);
+ // assert!(r.version_id.is_some());
+
+ let o = ctx
+ .client
+ .get_object()
+ .bucket(&bucket)
+ //.key(CTRL_KEY)
+ .key("abc")
+ .send()
+ .await
+ .unwrap();
+
+ assert_bytes_eq!(o.body, BODY);
+ assert_eq!(o.e_tag.unwrap(), etag);
+ assert!(o.last_modified.is_some());
+ assert_eq!(o.content_length.unwrap(), 62);
+ assert_eq!(o.parts_count, None);
+ assert_eq!(o.tag_count, None);
+ }
+}
+
+#[tokio::test]
+async fn test_putobject_streaming_unsigned_trailer() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("putobject-streaming-unsigned-trailer");
+
+ {
+ // Send an empty object (can serve as a directory marker)
+ // with a content type
+ let etag = "\"d41d8cd98f00b204e9800998ecf8427e\"";
+ let content_type = "text/csv";
+ let mut headers = HashMap::new();
+ headers.insert("content-type".to_owned(), content_type.to_owned());
+
+ let empty_crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(Crc32::new().finalize())[..]);
+
+ let res = ctx
+ .custom_request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path(STD_KEY.to_owned())
+ .signed_headers(headers)
+ .vhost_style(true)
+ .body(vec![])
+ .body_signature(BodySignature::StreamingUnsignedTrailer {
+ chunk_size: 10,
+ trailer_algorithm: "x-amz-checksum-crc32".into(),
+ trailer_value: empty_crc32,
+ })
+ .send()
+ .await
+ .unwrap();
+ assert!(res.status().is_success(), "got response: {:?}", res);
+
+ // assert_eq!(r.e_tag.unwrap().as_str(), etag);
+ // We return a version ID here
+ // We should check if Amazon is returning one when versioning is not enabled
+ // assert!(r.version_id.is_some());
+
+ //let _version = r.version_id.unwrap();
+
+ let o = ctx
+ .client
+ .get_object()
+ .bucket(&bucket)
+ .key(STD_KEY)
+ .send()
+ .await
+ .unwrap();
+
+ assert_bytes_eq!(o.body, b"");
+ assert_eq!(o.e_tag.unwrap(), etag);
+ // We do not return version ID
+ // We should check if Amazon is returning one when versioning is not enabled
+ // assert_eq!(o.version_id.unwrap(), _version);
+ assert_eq!(o.content_type.unwrap(), content_type);
+ assert!(o.last_modified.is_some());
+ assert_eq!(o.content_length.unwrap(), 0);
+ assert_eq!(o.parts_count, None);
+ assert_eq!(o.tag_count, None);
+ }
+
+ {
+ let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
+
+ let mut crc32 = Crc32::new();
+ crc32.update(&BODY[..]);
+ let crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(crc32.finalize())[..]);
+
+ // try sending with wrong crc32, check that it fails
+ let err_res = ctx
+ .custom_request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ //.path(CTRL_KEY.to_owned()) at the moment custom_request does not encode url so this
+ //fail
+ .path("abc".to_owned())
+ .vhost_style(true)
+ .body(BODY.to_vec())
+ .body_signature(BodySignature::StreamingUnsignedTrailer {
+ chunk_size: 16,
+ trailer_algorithm: "x-amz-checksum-crc32".into(),
+ trailer_value: "2Yp9Yw==".into(),
+ })
+ .send()
+ .await
+ .unwrap();
+ assert!(
+ err_res.status().is_client_error(),
+ "got response: {:?}",
+ err_res
+ );
+
+ let res = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
@@ -74,10 +207,15 @@ async fn test_putobject_streaming() {
.path("abc".to_owned())
.vhost_style(true)
.body(BODY.to_vec())
- .body_signature(BodySignature::Streaming(16))
+ .body_signature(BodySignature::StreamingUnsignedTrailer {
+ chunk_size: 16,
+ trailer_algorithm: "x-amz-checksum-crc32".into(),
+ trailer_value: crc32,
+ })
.send()
.await
.unwrap();
+ assert!(res.status().is_success(), "got response: {:?}", res);
// assert_eq!(r.e_tag.unwrap().as_str(), etag);
// assert!(r.version_id.is_some());
@@ -119,7 +257,7 @@ async fn test_create_bucket_streaming() {
.custom_request
.builder(bucket.to_owned())
.method(Method::PUT)
- .body_signature(BodySignature::Streaming(10))
+ .body_signature(BodySignature::Streaming { chunk_size: 10 })
.send()
.await
.unwrap();
@@ -174,7 +312,7 @@ async fn test_put_website_streaming() {
.method(Method::PUT)
.query_params(query)
.body(website_config.as_bytes().to_vec())
- .body_signature(BodySignature::Streaming(10))
+ .body_signature(BodySignature::Streaming { chunk_size: 10 })
.send()
.await
.unwrap();