diff options
Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/Cargo.toml | 1 | ||||
-rw-r--r-- | src/garage/tests/common/custom_requester.rs | 111 | ||||
-rw-r--r-- | src/garage/tests/common/garage.rs | 5 | ||||
-rw-r--r-- | src/garage/tests/s3/streaming_signature.rs | 150 |
4 files changed, 248 insertions, 19 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index c036f000..5860cf97 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -75,6 +75,7 @@ static_init.workspace = true assert-json-diff.workspace = true serde_json.workspace = true base64.workspace = true +crc32fast.workspace = true k2v-client.workspace = true diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs index 99fd4385..6a8eed38 100644 --- a/src/garage/tests/common/custom_requester.rs +++ b/src/garage/tests/common/custom_requester.rs @@ -195,10 +195,10 @@ impl<'a> RequestBuilder<'a> { all_headers.insert(signature::X_AMZ_DATE, HeaderValue::from_str(&date).unwrap()); all_headers.insert(HOST, HeaderValue::from_str(&host).unwrap()); - let body_sha = match self.body_signature { + let body_sha = match &self.body_signature { BodySignature::Unsigned => "UNSIGNED-PAYLOAD".to_owned(), BodySignature::Classic => hex::encode(garage_util::data::sha256sum(&self.body)), - BodySignature::Streaming(size) => { + BodySignature::Streaming { chunk_size } => { all_headers.insert( CONTENT_ENCODING, HeaderValue::from_str("aws-chunked").unwrap(), @@ -213,15 +213,56 @@ impl<'a> RequestBuilder<'a> { // code. all_headers.insert( CONTENT_LENGTH, - to_streaming_body(&self.body, size, String::new(), signer.clone(), now, "") - .len() - .to_string() - .try_into() - .unwrap(), + to_streaming_body( + &self.body, + *chunk_size, + String::new(), + signer.clone(), + now, + "", + ) + .len() + .to_string() + .try_into() + .unwrap(), ); "STREAMING-AWS4-HMAC-SHA256-PAYLOAD".to_owned() } + BodySignature::StreamingUnsignedTrailer { + chunk_size, + trailer_algorithm, + trailer_value, + } => { + all_headers.insert( + CONTENT_ENCODING, + HeaderValue::from_str("aws-chunked").unwrap(), + ); + all_headers.insert( + HeaderName::from_static("x-amz-decoded-content-length"), + HeaderValue::from_str(&self.body.len().to_string()).unwrap(), + ); + all_headers.insert( + HeaderName::from_static("x-amz-trailer"), + HeaderValue::from_str(&trailer_algorithm).unwrap(), + ); + + all_headers.insert( + CONTENT_LENGTH, + to_streaming_unsigned_trailer_body( + &self.body, + *chunk_size, + &trailer_algorithm, + &trailer_value, + ) + .len() + .to_string() + .try_into() + .unwrap(), + ); + + "STREAMING-UNSIGNED-PAYLOAD-TRAILER".to_owned() + } }; all_headers.insert( signature::X_AMZ_CONTENT_SHA256, @@ -273,10 +314,26 @@ impl<'a> RequestBuilder<'a> { let mut request = Request::builder(); *request.headers_mut().unwrap() = all_headers; - let body = if let BodySignature::Streaming(size) = self.body_signature { - to_streaming_body(&self.body, size, signature, streaming_signer, now, &scope) - } else { - self.body.clone() + let body = match &self.body_signature { + BodySignature::Streaming { chunk_size } => to_streaming_body( + &self.body, + *chunk_size, + signature, + streaming_signer, + now, + &scope, + ), + BodySignature::StreamingUnsignedTrailer { + chunk_size, + trailer_algorithm, + trailer_value, + } => to_streaming_unsigned_trailer_body( + &self.body, + *chunk_size, + &trailer_algorithm, + &trailer_value, + ), + _ => self.body.clone(), }; let request = request .uri(uri) @@ -305,7 +362,14 @@ impl<'a> RequestBuilder<'a> { pub enum BodySignature { Unsigned, Classic, - Streaming(usize), + Streaming { + chunk_size: usize, + }, + StreamingUnsignedTrailer { + chunk_size: usize, + trailer_algorithm: String, + trailer_value: String, + }, } fn query_param_to_string(params: &HashMap<String, Option<String>>) -> String { @@ -360,3 +424,26 @@ fn to_streaming_body( res } + +fn to_streaming_unsigned_trailer_body( + body: &[u8], + chunk_size: usize, + trailer_algorithm: &str, + trailer_value: &str, +) -> Vec<u8> { + let mut res = Vec::with_capacity(body.len()); + for chunk in body.chunks(chunk_size) { + let header = format!("{:x}\r\n", chunk.len()); + res.extend_from_slice(header.as_bytes()); + res.extend_from_slice(chunk); + res.extend_from_slice(b"\r\n"); + } + + res.extend_from_slice(b"0\r\n"); + res.extend_from_slice(trailer_algorithm.as_bytes()); + res.extend_from_slice(b":"); + res.extend_from_slice(trailer_value.as_bytes()); + res.extend_from_slice(b"\n\r\n\r\n"); + + res +} diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs index da6c624b..8d71504f 100644 --- a/src/garage/tests/common/garage.rs +++ b/src/garage/tests/common/garage.rs @@ -99,7 +99,10 @@ api_bind_addr = "127.0.0.1:{admin_port}" .arg("server") .stdout(stdout) .stderr(stderr) - .env("RUST_LOG", "garage=debug,garage_api=trace") + .env( + "RUST_LOG", + "garage=debug,garage_api_common=trace,garage_api_s3=trace", + ) .spawn() .expect("Could not start garage"); diff --git a/src/garage/tests/s3/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs index 351aa422..e83d2355 100644 --- a/src/garage/tests/s3/streaming_signature.rs +++ b/src/garage/tests/s3/streaming_signature.rs @@ -1,5 +1,8 @@ use std::collections::HashMap; +use base64::prelude::*; +use crc32fast::Hasher as Crc32; + use crate::common; use crate::common::ext::CommandExt; use common::custom_requester::BodySignature; @@ -21,7 +24,7 @@ async fn test_putobject_streaming() { let content_type = "text/csv"; let mut headers = HashMap::new(); headers.insert("content-type".to_owned(), content_type.to_owned()); - let _ = ctx + let res = ctx .custom_request .builder(bucket.clone()) .method(Method::PUT) @@ -29,10 +32,11 @@ async fn test_putobject_streaming() { .signed_headers(headers) .vhost_style(true) .body(vec![]) - .body_signature(BodySignature::Streaming(10)) + .body_signature(BodySignature::Streaming { chunk_size: 10 }) .send() .await .unwrap(); + assert!(res.status().is_success(), "got response: {:?}", res); // assert_eq!(r.e_tag.unwrap().as_str(), etag); // We return a version ID here @@ -65,7 +69,136 @@ async fn test_putobject_streaming() { { let etag = "\"46cf18a9b447991b450cad3facf5937e\""; - let _ = ctx + let res = ctx + .custom_request + .builder(bucket.clone()) + .method(Method::PUT) + //.path(CTRL_KEY.to_owned()) at the moment custom_request does not encode url so this + //fail + .path("abc".to_owned()) + .vhost_style(true) + .body(BODY.to_vec()) + .body_signature(BodySignature::Streaming { chunk_size: 16 }) + .send() + .await + .unwrap(); + assert!(res.status().is_success(), "got response: {:?}", res); + + // assert_eq!(r.e_tag.unwrap().as_str(), etag); + // assert!(r.version_id.is_some()); + + let o = ctx + .client + .get_object() + .bucket(&bucket) + //.key(CTRL_KEY) + .key("abc") + .send() + .await + .unwrap(); + + assert_bytes_eq!(o.body, BODY); + assert_eq!(o.e_tag.unwrap(), etag); + assert!(o.last_modified.is_some()); + assert_eq!(o.content_length.unwrap(), 62); + assert_eq!(o.parts_count, None); + assert_eq!(o.tag_count, None); + } +} + +#[tokio::test] +async fn test_putobject_streaming_unsigned_trailer() { + let ctx = common::context(); + let bucket = ctx.create_bucket("putobject-streaming-unsigned-trailer"); + + { + // Send an empty object (can serve as a directory marker) + // with a content type + let etag = "\"d41d8cd98f00b204e9800998ecf8427e\""; + let content_type = "text/csv"; + let mut headers = HashMap::new(); + headers.insert("content-type".to_owned(), content_type.to_owned()); + + let empty_crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(Crc32::new().finalize())[..]); + + let res = ctx + .custom_request + .builder(bucket.clone()) + .method(Method::PUT) + .path(STD_KEY.to_owned()) + .signed_headers(headers) + .vhost_style(true) + .body(vec![]) + .body_signature(BodySignature::StreamingUnsignedTrailer { + chunk_size: 10, + trailer_algorithm: "x-amz-checksum-crc32".into(), + trailer_value: empty_crc32, + }) + .send() + .await + .unwrap(); + assert!(res.status().is_success(), "got response: {:?}", res); + + // assert_eq!(r.e_tag.unwrap().as_str(), etag); + // We return a version ID here + // We should check if Amazon is returning one when versioning is not enabled + // assert!(r.version_id.is_some()); + + //let _version = r.version_id.unwrap(); + + let o = ctx + .client + .get_object() + .bucket(&bucket) + .key(STD_KEY) + .send() + .await + .unwrap(); + + assert_bytes_eq!(o.body, b""); + assert_eq!(o.e_tag.unwrap(), etag); + // We do not return version ID + // We should check if Amazon is returning one when versioning is not enabled + // assert_eq!(o.version_id.unwrap(), _version); + assert_eq!(o.content_type.unwrap(), content_type); + assert!(o.last_modified.is_some()); + assert_eq!(o.content_length.unwrap(), 0); + assert_eq!(o.parts_count, None); + assert_eq!(o.tag_count, None); + } + + { + let etag = "\"46cf18a9b447991b450cad3facf5937e\""; + + let mut crc32 = Crc32::new(); + crc32.update(&BODY[..]); + let crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(crc32.finalize())[..]); + + // try sending with wrong crc32, check that it fails + let err_res = ctx + .custom_request + .builder(bucket.clone()) + .method(Method::PUT) + //.path(CTRL_KEY.to_owned()) at the moment custom_request does not encode url so this + //fail + .path("abc".to_owned()) + .vhost_style(true) + .body(BODY.to_vec()) + .body_signature(BodySignature::StreamingUnsignedTrailer { + chunk_size: 16, + trailer_algorithm: "x-amz-checksum-crc32".into(), + trailer_value: "2Yp9Yw==".into(), + }) + .send() + .await + .unwrap(); + assert!( + err_res.status().is_client_error(), + "got response: {:?}", + err_res + ); + + let res = ctx .custom_request .builder(bucket.clone()) .method(Method::PUT) @@ -74,10 +207,15 @@ async fn test_putobject_streaming() { .path("abc".to_owned()) .vhost_style(true) .body(BODY.to_vec()) - .body_signature(BodySignature::Streaming(16)) + .body_signature(BodySignature::StreamingUnsignedTrailer { + chunk_size: 16, + trailer_algorithm: "x-amz-checksum-crc32".into(), + trailer_value: crc32, + }) .send() .await .unwrap(); + assert!(res.status().is_success(), "got response: {:?}", res); // assert_eq!(r.e_tag.unwrap().as_str(), etag); // assert!(r.version_id.is_some()); @@ -119,7 +257,7 @@ async fn test_create_bucket_streaming() { .custom_request .builder(bucket.to_owned()) .method(Method::PUT) - .body_signature(BodySignature::Streaming(10)) + .body_signature(BodySignature::Streaming { chunk_size: 10 }) .send() .await .unwrap(); @@ -174,7 +312,7 @@ async fn test_put_website_streaming() { .method(Method::PUT) .query_params(query) .body(website_config.as_bytes().to_vec()) - .body_signature(BodySignature::Streaming(10)) + .body_signature(BodySignature::Streaming { chunk_size: 10 }) .send() .await .unwrap(); |