aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/Cargo.toml1
-rw-r--r--src/garage/tests/common/custom_requester.rs111
-rw-r--r--src/garage/tests/common/garage.rs5
-rw-r--r--src/garage/tests/s3/streaming_signature.rs150
4 files changed, 248 insertions, 19 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index c036f000..5860cf97 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -75,6 +75,7 @@ static_init.workspace = true
assert-json-diff.workspace = true
serde_json.workspace = true
base64.workspace = true
+crc32fast.workspace = true
k2v-client.workspace = true
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index 99fd4385..6a8eed38 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -195,10 +195,10 @@ impl<'a> RequestBuilder<'a> {
all_headers.insert(signature::X_AMZ_DATE, HeaderValue::from_str(&date).unwrap());
all_headers.insert(HOST, HeaderValue::from_str(&host).unwrap());
- let body_sha = match self.body_signature {
+ let body_sha = match &self.body_signature {
BodySignature::Unsigned => "UNSIGNED-PAYLOAD".to_owned(),
BodySignature::Classic => hex::encode(garage_util::data::sha256sum(&self.body)),
- BodySignature::Streaming(size) => {
+ BodySignature::Streaming { chunk_size } => {
all_headers.insert(
CONTENT_ENCODING,
HeaderValue::from_str("aws-chunked").unwrap(),
@@ -213,15 +213,56 @@ impl<'a> RequestBuilder<'a> {
// code.
all_headers.insert(
CONTENT_LENGTH,
- to_streaming_body(&self.body, size, String::new(), signer.clone(), now, "")
- .len()
- .to_string()
- .try_into()
- .unwrap(),
+ to_streaming_body(
+ &self.body,
+ *chunk_size,
+ String::new(),
+ signer.clone(),
+ now,
+ "",
+ )
+ .len()
+ .to_string()
+ .try_into()
+ .unwrap(),
);
"STREAMING-AWS4-HMAC-SHA256-PAYLOAD".to_owned()
}
+ BodySignature::StreamingUnsignedTrailer {
+ chunk_size,
+ trailer_algorithm,
+ trailer_value,
+ } => {
+ all_headers.insert(
+ CONTENT_ENCODING,
+ HeaderValue::from_str("aws-chunked").unwrap(),
+ );
+ all_headers.insert(
+ HeaderName::from_static("x-amz-decoded-content-length"),
+ HeaderValue::from_str(&self.body.len().to_string()).unwrap(),
+ );
+ all_headers.insert(
+ HeaderName::from_static("x-amz-trailer"),
+ HeaderValue::from_str(&trailer_algorithm).unwrap(),
+ );
+
+ all_headers.insert(
+ CONTENT_LENGTH,
+ to_streaming_unsigned_trailer_body(
+ &self.body,
+ *chunk_size,
+ &trailer_algorithm,
+ &trailer_value,
+ )
+ .len()
+ .to_string()
+ .try_into()
+ .unwrap(),
+ );
+
+ "STREAMING-UNSIGNED-PAYLOAD-TRAILER".to_owned()
+ }
};
all_headers.insert(
signature::X_AMZ_CONTENT_SHA256,
@@ -273,10 +314,26 @@ impl<'a> RequestBuilder<'a> {
let mut request = Request::builder();
*request.headers_mut().unwrap() = all_headers;
- let body = if let BodySignature::Streaming(size) = self.body_signature {
- to_streaming_body(&self.body, size, signature, streaming_signer, now, &scope)
- } else {
- self.body.clone()
+ let body = match &self.body_signature {
+ BodySignature::Streaming { chunk_size } => to_streaming_body(
+ &self.body,
+ *chunk_size,
+ signature,
+ streaming_signer,
+ now,
+ &scope,
+ ),
+ BodySignature::StreamingUnsignedTrailer {
+ chunk_size,
+ trailer_algorithm,
+ trailer_value,
+ } => to_streaming_unsigned_trailer_body(
+ &self.body,
+ *chunk_size,
+ &trailer_algorithm,
+ &trailer_value,
+ ),
+ _ => self.body.clone(),
};
let request = request
.uri(uri)
@@ -305,7 +362,14 @@ impl<'a> RequestBuilder<'a> {
pub enum BodySignature {
Unsigned,
Classic,
- Streaming(usize),
+ Streaming {
+ chunk_size: usize,
+ },
+ StreamingUnsignedTrailer {
+ chunk_size: usize,
+ trailer_algorithm: String,
+ trailer_value: String,
+ },
}
fn query_param_to_string(params: &HashMap<String, Option<String>>) -> String {
@@ -360,3 +424,26 @@ fn to_streaming_body(
res
}
+
+fn to_streaming_unsigned_trailer_body(
+ body: &[u8],
+ chunk_size: usize,
+ trailer_algorithm: &str,
+ trailer_value: &str,
+) -> Vec<u8> {
+ let mut res = Vec::with_capacity(body.len());
+ for chunk in body.chunks(chunk_size) {
+ let header = format!("{:x}\r\n", chunk.len());
+ res.extend_from_slice(header.as_bytes());
+ res.extend_from_slice(chunk);
+ res.extend_from_slice(b"\r\n");
+ }
+
+ res.extend_from_slice(b"0\r\n");
+ res.extend_from_slice(trailer_algorithm.as_bytes());
+ res.extend_from_slice(b":");
+ res.extend_from_slice(trailer_value.as_bytes());
+ res.extend_from_slice(b"\n\r\n\r\n");
+
+ res
+}
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index da6c624b..8d71504f 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -99,7 +99,10 @@ api_bind_addr = "127.0.0.1:{admin_port}"
.arg("server")
.stdout(stdout)
.stderr(stderr)
- .env("RUST_LOG", "garage=debug,garage_api=trace")
+ .env(
+ "RUST_LOG",
+ "garage=debug,garage_api_common=trace,garage_api_s3=trace",
+ )
.spawn()
.expect("Could not start garage");
diff --git a/src/garage/tests/s3/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs
index 351aa422..e83d2355 100644
--- a/src/garage/tests/s3/streaming_signature.rs
+++ b/src/garage/tests/s3/streaming_signature.rs
@@ -1,5 +1,8 @@
use std::collections::HashMap;
+use base64::prelude::*;
+use crc32fast::Hasher as Crc32;
+
use crate::common;
use crate::common::ext::CommandExt;
use common::custom_requester::BodySignature;
@@ -21,7 +24,7 @@ async fn test_putobject_streaming() {
let content_type = "text/csv";
let mut headers = HashMap::new();
headers.insert("content-type".to_owned(), content_type.to_owned());
- let _ = ctx
+ let res = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
@@ -29,10 +32,11 @@ async fn test_putobject_streaming() {
.signed_headers(headers)
.vhost_style(true)
.body(vec![])
- .body_signature(BodySignature::Streaming(10))
+ .body_signature(BodySignature::Streaming { chunk_size: 10 })
.send()
.await
.unwrap();
+ assert!(res.status().is_success(), "got response: {:?}", res);
// assert_eq!(r.e_tag.unwrap().as_str(), etag);
// We return a version ID here
@@ -65,7 +69,136 @@ async fn test_putobject_streaming() {
{
let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
- let _ = ctx
+ let res = ctx
+ .custom_request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ //.path(CTRL_KEY.to_owned()) at the moment custom_request does not encode url so this
+ //fail
+ .path("abc".to_owned())
+ .vhost_style(true)
+ .body(BODY.to_vec())
+ .body_signature(BodySignature::Streaming { chunk_size: 16 })
+ .send()
+ .await
+ .unwrap();
+ assert!(res.status().is_success(), "got response: {:?}", res);
+
+ // assert_eq!(r.e_tag.unwrap().as_str(), etag);
+ // assert!(r.version_id.is_some());
+
+ let o = ctx
+ .client
+ .get_object()
+ .bucket(&bucket)
+ //.key(CTRL_KEY)
+ .key("abc")
+ .send()
+ .await
+ .unwrap();
+
+ assert_bytes_eq!(o.body, BODY);
+ assert_eq!(o.e_tag.unwrap(), etag);
+ assert!(o.last_modified.is_some());
+ assert_eq!(o.content_length.unwrap(), 62);
+ assert_eq!(o.parts_count, None);
+ assert_eq!(o.tag_count, None);
+ }
+}
+
+#[tokio::test]
+async fn test_putobject_streaming_unsigned_trailer() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("putobject-streaming-unsigned-trailer");
+
+ {
+ // Send an empty object (can serve as a directory marker)
+ // with a content type
+ let etag = "\"d41d8cd98f00b204e9800998ecf8427e\"";
+ let content_type = "text/csv";
+ let mut headers = HashMap::new();
+ headers.insert("content-type".to_owned(), content_type.to_owned());
+
+ let empty_crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(Crc32::new().finalize())[..]);
+
+ let res = ctx
+ .custom_request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path(STD_KEY.to_owned())
+ .signed_headers(headers)
+ .vhost_style(true)
+ .body(vec![])
+ .body_signature(BodySignature::StreamingUnsignedTrailer {
+ chunk_size: 10,
+ trailer_algorithm: "x-amz-checksum-crc32".into(),
+ trailer_value: empty_crc32,
+ })
+ .send()
+ .await
+ .unwrap();
+ assert!(res.status().is_success(), "got response: {:?}", res);
+
+ // assert_eq!(r.e_tag.unwrap().as_str(), etag);
+ // We return a version ID here
+ // We should check if Amazon is returning one when versioning is not enabled
+ // assert!(r.version_id.is_some());
+
+ //let _version = r.version_id.unwrap();
+
+ let o = ctx
+ .client
+ .get_object()
+ .bucket(&bucket)
+ .key(STD_KEY)
+ .send()
+ .await
+ .unwrap();
+
+ assert_bytes_eq!(o.body, b"");
+ assert_eq!(o.e_tag.unwrap(), etag);
+ // We do not return version ID
+ // We should check if Amazon is returning one when versioning is not enabled
+ // assert_eq!(o.version_id.unwrap(), _version);
+ assert_eq!(o.content_type.unwrap(), content_type);
+ assert!(o.last_modified.is_some());
+ assert_eq!(o.content_length.unwrap(), 0);
+ assert_eq!(o.parts_count, None);
+ assert_eq!(o.tag_count, None);
+ }
+
+ {
+ let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
+
+ let mut crc32 = Crc32::new();
+ crc32.update(&BODY[..]);
+ let crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(crc32.finalize())[..]);
+
+ // try sending with wrong crc32, check that it fails
+ let err_res = ctx
+ .custom_request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ //.path(CTRL_KEY.to_owned()) at the moment custom_request does not encode url so this
+ //fail
+ .path("abc".to_owned())
+ .vhost_style(true)
+ .body(BODY.to_vec())
+ .body_signature(BodySignature::StreamingUnsignedTrailer {
+ chunk_size: 16,
+ trailer_algorithm: "x-amz-checksum-crc32".into(),
+ trailer_value: "2Yp9Yw==".into(),
+ })
+ .send()
+ .await
+ .unwrap();
+ assert!(
+ err_res.status().is_client_error(),
+ "got response: {:?}",
+ err_res
+ );
+
+ let res = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
@@ -74,10 +207,15 @@ async fn test_putobject_streaming() {
.path("abc".to_owned())
.vhost_style(true)
.body(BODY.to_vec())
- .body_signature(BodySignature::Streaming(16))
+ .body_signature(BodySignature::StreamingUnsignedTrailer {
+ chunk_size: 16,
+ trailer_algorithm: "x-amz-checksum-crc32".into(),
+ trailer_value: crc32,
+ })
.send()
.await
.unwrap();
+ assert!(res.status().is_success(), "got response: {:?}", res);
// assert_eq!(r.e_tag.unwrap().as_str(), etag);
// assert!(r.version_id.is_some());
@@ -119,7 +257,7 @@ async fn test_create_bucket_streaming() {
.custom_request
.builder(bucket.to_owned())
.method(Method::PUT)
- .body_signature(BodySignature::Streaming(10))
+ .body_signature(BodySignature::Streaming { chunk_size: 10 })
.send()
.await
.unwrap();
@@ -174,7 +312,7 @@ async fn test_put_website_streaming() {
.method(Method::PUT)
.query_params(query)
.body(website_config.as_bytes().to_vec())
- .body_signature(BodySignature::Streaming(10))
+ .body_signature(BodySignature::Streaming { chunk_size: 10 })
.send()
.await
.unwrap();