aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/Cargo.toml18
-rw-r--r--src/garage/admin/mod.rs46
-rw-r--r--src/garage/repair/online.rs9
-rw-r--r--src/garage/server.rs8
-rw-r--r--src/garage/tests/common/client.rs2
-rw-r--r--src/garage/tests/common/custom_requester.rs120
-rw-r--r--src/garage/tests/common/garage.rs11
-rw-r--r--src/garage/tests/s3/objects.rs23
-rw-r--r--src/garage/tests/s3/streaming_signature.rs162
-rw-r--r--src/garage/tests/s3/website.rs28
10 files changed, 355 insertions, 72 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 483e33c0..966c8ac5 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -23,7 +23,9 @@ path = "tests/lib.rs"
[dependencies]
format_table.workspace = true
garage_db.workspace = true
-garage_api.workspace = true
+garage_api_admin.workspace = true
+garage_api_s3.workspace = true
+garage_api_k2v = { workspace = true, optional = true }
garage_block.workspace = true
garage_model.workspace = true
garage_net.workspace = true
@@ -40,7 +42,6 @@ parse_duration.workspace = true
hex.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
-rand.workspace = true
async-trait.workspace = true
sha1.workspace = true
sodiumoxide.workspace = true
@@ -48,21 +49,18 @@ structopt.workspace = true
git-version.workspace = true
serde.workspace = true
-serde_bytes.workspace = true
-toml.workspace = true
futures.workspace = true
-futures-util.workspace = true
tokio.workspace = true
opentelemetry.workspace = true
opentelemetry-prometheus = { workspace = true, optional = true }
opentelemetry-otlp = { workspace = true, optional = true }
-prometheus = { workspace = true, optional = true }
syslog-tracing = { workspace = true, optional = true }
[dev-dependencies]
-aws-config.workspace = true
+garage_api_common.workspace = true
+
aws-sdk-s3.workspace = true
chrono.workspace = true
http.workspace = true
@@ -73,10 +71,12 @@ hyper-util.workspace = true
mktemp.workspace = true
sha2.workspace = true
+
static_init.workspace = true
assert-json-diff.workspace = true
serde_json.workspace = true
base64.workspace = true
+crc32fast.workspace = true
k2v-client.workspace = true
@@ -84,7 +84,7 @@ k2v-client.workspace = true
[features]
default = [ "bundled-libs", "metrics", "lmdb", "sqlite", "k2v" ]
-k2v = [ "garage_util/k2v", "garage_api/k2v" ]
+k2v = [ "garage_util/k2v", "garage_api_k2v" ]
# Database engines
lmdb = [ "garage_model/lmdb" ]
@@ -95,7 +95,7 @@ consul-discovery = [ "garage_rpc/consul-discovery" ]
# Automatic registration and discovery via Kubernetes API
kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ]
# Prometheus exporter (/metrics endpoint).
-metrics = [ "garage_api/metrics", "opentelemetry-prometheus", "prometheus" ]
+metrics = [ "garage_api_admin/metrics", "opentelemetry-prometheus" ]
# Exporter for the OpenTelemetry Collector.
telemetry-otlp = [ "opentelemetry-otlp" ]
# Logging to syslog
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index e2468143..3bbc2b86 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -4,9 +4,11 @@ mod key;
use std::collections::HashMap;
use std::fmt::Write;
+use std::future::Future;
use std::sync::Arc;
-use async_trait::async_trait;
+use futures::future::FutureExt;
+
use serde::{Deserialize, Serialize};
use format_table::format_table_to_string;
@@ -144,7 +146,12 @@ impl AdminRpcHandler {
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
if opt.all_nodes {
let mut ret = String::new();
- let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ let mut all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ for node in self.garage.system.get_known_nodes().iter() {
+ if node.is_up && !all_nodes.contains(&node.id) {
+ all_nodes.push(node.id);
+ }
+ }
for node in all_nodes.iter() {
let mut opt = opt.clone();
@@ -482,7 +489,7 @@ impl AdminRpcHandler {
AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }),
PRIO_NORMAL,
)
- .await
+ .await?
}))
.await;
@@ -495,7 +502,11 @@ impl AdminRpcHandler {
ret.push(format!("{:?}\t{}", to, res_str));
}
- Ok(AdminRpc::Ok(format_table_to_string(ret)))
+ if resps.iter().any(Result::is_err) {
+ Err(GarageError::Message(format_table_to_string(ret)).into())
+ } else {
+ Ok(AdminRpc::Ok(format_table_to_string(ret)))
+ }
}
MetaOperation::Snapshot { all: false } => {
garage_model::snapshot::async_snapshot_metadata(&self.garage).await?;
@@ -505,22 +516,25 @@ impl AdminRpcHandler {
}
}
-#[async_trait]
impl EndpointHandler<AdminRpc> for AdminRpcHandler {
- async fn handle(
+ fn handle(
self: &Arc<Self>,
message: &AdminRpc,
_from: NodeID,
- ) -> Result<AdminRpc, Error> {
- match message {
- AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
- AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
- AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
- AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
- AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
- AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
- AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
- m => Err(GarageError::unexpected_rpc_message(m).into()),
+ ) -> impl Future<Output = Result<AdminRpc, Error>> + Send {
+ let self2 = self.clone();
+ async move {
+ match message {
+ AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
+ AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await,
+ AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt.clone()).await,
+ AdminRpc::Stats(opt) => self2.handle_stats(opt.clone()).await,
+ AdminRpc::Worker(wo) => self2.handle_worker_cmd(wo).await,
+ AdminRpc::BlockOperation(bo) => self2.handle_block_cmd(bo).await,
+ AdminRpc::MetaOperation(mo) => self2.handle_meta_cmd(mo).await,
+ m => Err(GarageError::unexpected_rpc_message(m).into()),
+ }
}
+ .boxed()
}
}
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index 2c5227d2..47883f97 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -1,3 +1,4 @@
+use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
@@ -93,17 +94,16 @@ pub async fn launch_online_repair(
// ----
-#[async_trait]
trait TableRepair: Send + Sync + 'static {
type T: TableSchema;
fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>;
- async fn process(
+ fn process(
&mut self,
garage: &Garage,
entry: <<Self as TableRepair>::T as TableSchema>::E,
- ) -> Result<bool, Error>;
+ ) -> impl Future<Output = Result<bool, Error>> + Send;
}
struct TableRepairWorker<T: TableRepair> {
@@ -174,7 +174,6 @@ impl<R: TableRepair> Worker for TableRepairWorker<R> {
struct RepairVersions;
-#[async_trait]
impl TableRepair for RepairVersions {
type T = VersionTable;
@@ -221,7 +220,6 @@ impl TableRepair for RepairVersions {
struct RepairBlockRefs;
-#[async_trait]
impl TableRepair for RepairBlockRefs {
type T = BlockRefTable;
@@ -257,7 +255,6 @@ impl TableRepair for RepairBlockRefs {
struct RepairMpu;
-#[async_trait]
impl TableRepair for RepairMpu {
type T = MultipartUploadTable;
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 65bf34db..1dc86fd3 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -6,13 +6,13 @@ use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
-use garage_api::admin::api_server::AdminApiServer;
-use garage_api::s3::api_server::S3ApiServer;
+use garage_api_admin::api_server::AdminApiServer;
+use garage_api_s3::api_server::S3ApiServer;
use garage_model::garage::Garage;
use garage_web::WebServer;
#[cfg(feature = "k2v")]
-use garage_api::k2v::api_server::K2VApiServer;
+use garage_api_k2v::api_server::K2VApiServer;
use crate::admin::*;
use crate::secrets::{fill_secrets, Secrets};
@@ -113,7 +113,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
if let Some(web_config) = &config.s3_web {
info!("Initializing web server...");
- let web_server = WebServer::new(garage.clone(), web_config.root_domain.clone());
+ let web_server = WebServer::new(garage.clone(), &web_config);
servers.push((
"Web",
tokio::spawn(web_server.run(web_config.bind_addr.clone(), watch_cancel.clone())),
diff --git a/src/garage/tests/common/client.rs b/src/garage/tests/common/client.rs
index ffa4cae8..7a6612cb 100644
--- a/src/garage/tests/common/client.rs
+++ b/src/garage/tests/common/client.rs
@@ -12,7 +12,7 @@ pub fn build_client(key: &Key) -> Client {
.endpoint_url(format!("http://127.0.0.1:{}", DEFAULT_PORT))
.region(super::REGION)
.credentials_provider(credentials)
- .behavior_version(BehaviorVersion::v2023_11_09())
+ .behavior_version(BehaviorVersion::v2024_03_28())
.build();
Client::from_conf(config)
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index 42368976..6a8eed38 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -15,7 +15,7 @@ use hyper_util::client::legacy::{connect::HttpConnector, Client};
use hyper_util::rt::TokioExecutor;
use super::garage::{Instance, Key};
-use garage_api::signature;
+use garage_api_common::signature;
pub type Body = FullBody<hyper::body::Bytes>;
@@ -192,16 +192,13 @@ impl<'a> RequestBuilder<'a> {
.collect::<HeaderMap>();
let date = now.format(signature::LONG_DATETIME).to_string();
- all_headers.insert(
- signature::payload::X_AMZ_DATE,
- HeaderValue::from_str(&date).unwrap(),
- );
+ all_headers.insert(signature::X_AMZ_DATE, HeaderValue::from_str(&date).unwrap());
all_headers.insert(HOST, HeaderValue::from_str(&host).unwrap());
- let body_sha = match self.body_signature {
+ let body_sha = match &self.body_signature {
BodySignature::Unsigned => "UNSIGNED-PAYLOAD".to_owned(),
BodySignature::Classic => hex::encode(garage_util::data::sha256sum(&self.body)),
- BodySignature::Streaming(size) => {
+ BodySignature::Streaming { chunk_size } => {
all_headers.insert(
CONTENT_ENCODING,
HeaderValue::from_str("aws-chunked").unwrap(),
@@ -216,18 +213,59 @@ impl<'a> RequestBuilder<'a> {
// code.
all_headers.insert(
CONTENT_LENGTH,
- to_streaming_body(&self.body, size, String::new(), signer.clone(), now, "")
- .len()
- .to_string()
- .try_into()
- .unwrap(),
+ to_streaming_body(
+ &self.body,
+ *chunk_size,
+ String::new(),
+ signer.clone(),
+ now,
+ "",
+ )
+ .len()
+ .to_string()
+ .try_into()
+ .unwrap(),
);
"STREAMING-AWS4-HMAC-SHA256-PAYLOAD".to_owned()
}
+ BodySignature::StreamingUnsignedTrailer {
+ chunk_size,
+ trailer_algorithm,
+ trailer_value,
+ } => {
+ all_headers.insert(
+ CONTENT_ENCODING,
+ HeaderValue::from_str("aws-chunked").unwrap(),
+ );
+ all_headers.insert(
+ HeaderName::from_static("x-amz-decoded-content-length"),
+ HeaderValue::from_str(&self.body.len().to_string()).unwrap(),
+ );
+ all_headers.insert(
+ HeaderName::from_static("x-amz-trailer"),
+ HeaderValue::from_str(&trailer_algorithm).unwrap(),
+ );
+
+ all_headers.insert(
+ CONTENT_LENGTH,
+ to_streaming_unsigned_trailer_body(
+ &self.body,
+ *chunk_size,
+ &trailer_algorithm,
+ &trailer_value,
+ )
+ .len()
+ .to_string()
+ .try_into()
+ .unwrap(),
+ );
+
+ "STREAMING-UNSIGNED-PAYLOAD-TRAILER".to_owned()
+ }
};
all_headers.insert(
- signature::payload::X_AMZ_CONTENT_SH256,
+ signature::X_AMZ_CONTENT_SHA256,
HeaderValue::from_str(&body_sha).unwrap(),
);
@@ -276,10 +314,26 @@ impl<'a> RequestBuilder<'a> {
let mut request = Request::builder();
*request.headers_mut().unwrap() = all_headers;
- let body = if let BodySignature::Streaming(size) = self.body_signature {
- to_streaming_body(&self.body, size, signature, streaming_signer, now, &scope)
- } else {
- self.body.clone()
+ let body = match &self.body_signature {
+ BodySignature::Streaming { chunk_size } => to_streaming_body(
+ &self.body,
+ *chunk_size,
+ signature,
+ streaming_signer,
+ now,
+ &scope,
+ ),
+ BodySignature::StreamingUnsignedTrailer {
+ chunk_size,
+ trailer_algorithm,
+ trailer_value,
+ } => to_streaming_unsigned_trailer_body(
+ &self.body,
+ *chunk_size,
+ &trailer_algorithm,
+ &trailer_value,
+ ),
+ _ => self.body.clone(),
};
let request = request
.uri(uri)
@@ -308,7 +362,14 @@ impl<'a> RequestBuilder<'a> {
pub enum BodySignature {
Unsigned,
Classic,
- Streaming(usize),
+ Streaming {
+ chunk_size: usize,
+ },
+ StreamingUnsignedTrailer {
+ chunk_size: usize,
+ trailer_algorithm: String,
+ trailer_value: String,
+ },
}
fn query_param_to_string(params: &HashMap<String, Option<String>>) -> String {
@@ -363,3 +424,26 @@ fn to_streaming_body(
res
}
+
+fn to_streaming_unsigned_trailer_body(
+ body: &[u8],
+ chunk_size: usize,
+ trailer_algorithm: &str,
+ trailer_value: &str,
+) -> Vec<u8> {
+ let mut res = Vec::with_capacity(body.len());
+ for chunk in body.chunks(chunk_size) {
+ let header = format!("{:x}\r\n", chunk.len());
+ res.extend_from_slice(header.as_bytes());
+ res.extend_from_slice(chunk);
+ res.extend_from_slice(b"\r\n");
+ }
+
+ res.extend_from_slice(b"0\r\n");
+ res.extend_from_slice(trailer_algorithm.as_bytes());
+ res.extend_from_slice(b":");
+ res.extend_from_slice(trailer_value.as_bytes());
+ res.extend_from_slice(b"\n\r\n\r\n");
+
+ res
+}
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index db23d316..8d71504f 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -13,7 +13,6 @@ static GARAGE_TEST_SECRET: &str =
#[derive(Debug, Default, Clone)]
pub struct Key {
- pub name: Option<String>,
pub id: String,
pub secret: String,
}
@@ -100,7 +99,10 @@ api_bind_addr = "127.0.0.1:{admin_port}"
.arg("server")
.stdout(stdout)
.stderr(stderr)
- .env("RUST_LOG", "garage=debug,garage_api=trace")
+ .env(
+ "RUST_LOG",
+ "garage=debug,garage_api_common=trace,garage_api_s3=trace",
+ )
.spawn()
.expect("Could not start garage");
@@ -213,10 +215,7 @@ api_bind_addr = "127.0.0.1:{admin_port}"
assert!(!key.id.is_empty(), "Invalid key: Key ID is empty");
assert!(!key.secret.is_empty(), "Invalid key: Key secret is empty");
- Key {
- name: maybe_name.map(String::from),
- ..key
- }
+ key
}
}
diff --git a/src/garage/tests/s3/objects.rs b/src/garage/tests/s3/objects.rs
index 77eca2b1..dfc5253d 100644
--- a/src/garage/tests/s3/objects.rs
+++ b/src/garage/tests/s3/objects.rs
@@ -189,12 +189,14 @@ async fn test_getobject() {
#[tokio::test]
async fn test_metadata() {
+ use aws_sdk_s3::primitives::{DateTime, DateTimeFormat};
+
let ctx = common::context();
let bucket = ctx.create_bucket("testmetadata");
let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
- let exp = aws_sdk_s3::primitives::DateTime::from_secs(10000000000);
- let exp2 = aws_sdk_s3::primitives::DateTime::from_secs(10000500000);
+ let exp = DateTime::from_secs(10000000000);
+ let exp2 = DateTime::from_secs(10000500000);
{
// Note. The AWS client SDK adds a Content-Type header
@@ -227,7 +229,7 @@ async fn test_metadata() {
assert_eq!(o.content_disposition, None);
assert_eq!(o.content_encoding, None);
assert_eq!(o.content_language, None);
- assert_eq!(o.expires, None);
+ assert_eq!(o.expires_string, None);
assert_eq!(o.metadata.unwrap_or_default().len(), 0);
let o = ctx
@@ -250,7 +252,10 @@ async fn test_metadata() {
assert_eq!(o.content_disposition.unwrap().as_str(), "cddummy");
assert_eq!(o.content_encoding.unwrap().as_str(), "cedummy");
assert_eq!(o.content_language.unwrap().as_str(), "cldummy");
- assert_eq!(o.expires.unwrap(), exp);
+ assert_eq!(
+ o.expires_string.unwrap(),
+ exp.fmt(DateTimeFormat::HttpDate).unwrap()
+ );
}
{
@@ -288,7 +293,10 @@ async fn test_metadata() {
assert_eq!(o.content_disposition.unwrap().as_str(), "cdtest");
assert_eq!(o.content_encoding.unwrap().as_str(), "cetest");
assert_eq!(o.content_language.unwrap().as_str(), "cltest");
- assert_eq!(o.expires.unwrap(), exp2);
+ assert_eq!(
+ o.expires_string.unwrap(),
+ exp2.fmt(DateTimeFormat::HttpDate).unwrap()
+ );
let mut meta = o.metadata.unwrap();
assert_eq!(meta.remove("testmeta").unwrap(), "hello people");
assert_eq!(meta.remove("nice-unicode-meta").unwrap(), "宅配便");
@@ -314,7 +322,10 @@ async fn test_metadata() {
assert_eq!(o.content_disposition.unwrap().as_str(), "cddummy");
assert_eq!(o.content_encoding.unwrap().as_str(), "cedummy");
assert_eq!(o.content_language.unwrap().as_str(), "cldummy");
- assert_eq!(o.expires.unwrap(), exp);
+ assert_eq!(
+ o.expires_string.unwrap(),
+ exp.fmt(DateTimeFormat::HttpDate).unwrap()
+ );
}
}
diff --git a/src/garage/tests/s3/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs
index 351aa422..a86feefc 100644
--- a/src/garage/tests/s3/streaming_signature.rs
+++ b/src/garage/tests/s3/streaming_signature.rs
@@ -1,5 +1,8 @@
use std::collections::HashMap;
+use base64::prelude::*;
+use crc32fast::Hasher as Crc32;
+
use crate::common;
use crate::common::ext::CommandExt;
use common::custom_requester::BodySignature;
@@ -21,7 +24,7 @@ async fn test_putobject_streaming() {
let content_type = "text/csv";
let mut headers = HashMap::new();
headers.insert("content-type".to_owned(), content_type.to_owned());
- let _ = ctx
+ let res = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
@@ -29,10 +32,11 @@ async fn test_putobject_streaming() {
.signed_headers(headers)
.vhost_style(true)
.body(vec![])
- .body_signature(BodySignature::Streaming(10))
+ .body_signature(BodySignature::Streaming { chunk_size: 10 })
.send()
.await
.unwrap();
+ assert!(res.status().is_success(), "got response: {:?}", res);
// assert_eq!(r.e_tag.unwrap().as_str(), etag);
// We return a version ID here
@@ -65,7 +69,146 @@ async fn test_putobject_streaming() {
{
let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
- let _ = ctx
+ let mut crc32 = Crc32::new();
+ crc32.update(&BODY[..]);
+ let crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(crc32.finalize())[..]);
+
+ let mut headers = HashMap::new();
+ headers.insert("x-amz-checksum-crc32".to_owned(), crc32.clone());
+
+ let res = ctx
+ .custom_request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ //.path(CTRL_KEY.to_owned()) at the moment custom_request does not encode url so this
+ //fail
+ .path("abc".to_owned())
+ .vhost_style(true)
+ .signed_headers(headers)
+ .body(BODY.to_vec())
+ .body_signature(BodySignature::Streaming { chunk_size: 16 })
+ .send()
+ .await
+ .unwrap();
+ assert!(res.status().is_success(), "got response: {:?}", res);
+
+ // assert_eq!(r.e_tag.unwrap().as_str(), etag);
+ // assert!(r.version_id.is_some());
+
+ let o = ctx
+ .client
+ .get_object()
+ .bucket(&bucket)
+ //.key(CTRL_KEY)
+ .key("abc")
+ .checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled)
+ .send()
+ .await
+ .unwrap();
+
+ assert_bytes_eq!(o.body, BODY);
+ assert_eq!(o.e_tag.unwrap(), etag);
+ assert!(o.last_modified.is_some());
+ assert_eq!(o.content_length.unwrap(), 62);
+ assert_eq!(o.parts_count, None);
+ assert_eq!(o.tag_count, None);
+ assert_eq!(o.checksum_crc32.unwrap(), crc32);
+ }
+}
+
+#[tokio::test]
+async fn test_putobject_streaming_unsigned_trailer() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("putobject-streaming-unsigned-trailer");
+
+ {
+ // Send an empty object (can serve as a directory marker)
+ // with a content type
+ let etag = "\"d41d8cd98f00b204e9800998ecf8427e\"";
+ let content_type = "text/csv";
+ let mut headers = HashMap::new();
+ headers.insert("content-type".to_owned(), content_type.to_owned());
+
+ let empty_crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(Crc32::new().finalize())[..]);
+
+ let res = ctx
+ .custom_request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path(STD_KEY.to_owned())
+ .signed_headers(headers)
+ .vhost_style(true)
+ .body(vec![])
+ .body_signature(BodySignature::StreamingUnsignedTrailer {
+ chunk_size: 10,
+ trailer_algorithm: "x-amz-checksum-crc32".into(),
+ trailer_value: empty_crc32,
+ })
+ .send()
+ .await
+ .unwrap();
+ assert!(res.status().is_success(), "got response: {:?}", res);
+
+ // assert_eq!(r.e_tag.unwrap().as_str(), etag);
+ // We return a version ID here
+ // We should check if Amazon is returning one when versioning is not enabled
+ // assert!(r.version_id.is_some());
+
+ //let _version = r.version_id.unwrap();
+
+ let o = ctx
+ .client
+ .get_object()
+ .bucket(&bucket)
+ .key(STD_KEY)
+ .send()
+ .await
+ .unwrap();
+
+ assert_bytes_eq!(o.body, b"");
+ assert_eq!(o.e_tag.unwrap(), etag);
+ // We do not return version ID
+ // We should check if Amazon is returning one when versioning is not enabled
+ // assert_eq!(o.version_id.unwrap(), _version);
+ assert_eq!(o.content_type.unwrap(), content_type);
+ assert!(o.last_modified.is_some());
+ assert_eq!(o.content_length.unwrap(), 0);
+ assert_eq!(o.parts_count, None);
+ assert_eq!(o.tag_count, None);
+ }
+
+ {
+ let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
+
+ let mut crc32 = Crc32::new();
+ crc32.update(&BODY[..]);
+ let crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(crc32.finalize())[..]);
+
+ // try sending with wrong crc32, check that it fails
+ let err_res = ctx
+ .custom_request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ //.path(CTRL_KEY.to_owned()) at the moment custom_request does not encode url so this
+ //fail
+ .path("abc".to_owned())
+ .vhost_style(true)
+ .body(BODY.to_vec())
+ .body_signature(BodySignature::StreamingUnsignedTrailer {
+ chunk_size: 16,
+ trailer_algorithm: "x-amz-checksum-crc32".into(),
+ trailer_value: "2Yp9Yw==".into(),
+ })
+ .send()
+ .await
+ .unwrap();
+ assert!(
+ err_res.status().is_client_error(),
+ "got response: {:?}",
+ err_res
+ );
+
+ let res = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
@@ -74,10 +217,15 @@ async fn test_putobject_streaming() {
.path("abc".to_owned())
.vhost_style(true)
.body(BODY.to_vec())
- .body_signature(BodySignature::Streaming(16))
+ .body_signature(BodySignature::StreamingUnsignedTrailer {
+ chunk_size: 16,
+ trailer_algorithm: "x-amz-checksum-crc32".into(),
+ trailer_value: crc32.clone(),
+ })
.send()
.await
.unwrap();
+ assert!(res.status().is_success(), "got response: {:?}", res);
// assert_eq!(r.e_tag.unwrap().as_str(), etag);
// assert!(r.version_id.is_some());
@@ -88,6 +236,7 @@ async fn test_putobject_streaming() {
.bucket(&bucket)
//.key(CTRL_KEY)
.key("abc")
+ .checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled)
.send()
.await
.unwrap();
@@ -98,6 +247,7 @@ async fn test_putobject_streaming() {
assert_eq!(o.content_length.unwrap(), 62);
assert_eq!(o.parts_count, None);
assert_eq!(o.tag_count, None);
+ assert_eq!(o.checksum_crc32.unwrap(), crc32);
}
}
@@ -119,7 +269,7 @@ async fn test_create_bucket_streaming() {
.custom_request
.builder(bucket.to_owned())
.method(Method::PUT)
- .body_signature(BodySignature::Streaming(10))
+ .body_signature(BodySignature::Streaming { chunk_size: 10 })
.send()
.await
.unwrap();
@@ -174,7 +324,7 @@ async fn test_put_website_streaming() {
.method(Method::PUT)
.query_params(query)
.body(website_config.as_bytes().to_vec())
- .body_signature(BodySignature::Streaming(10))
+ .body_signature(BodySignature::Streaming { chunk_size: 10 })
.send()
.await
.unwrap();
diff --git a/src/garage/tests/s3/website.rs b/src/garage/tests/s3/website.rs
index 0cadc388..9a9e29f2 100644
--- a/src/garage/tests/s3/website.rs
+++ b/src/garage/tests/s3/website.rs
@@ -11,6 +11,7 @@ use http::{Request, StatusCode};
use http_body_util::BodyExt;
use http_body_util::Full as FullBody;
use hyper::body::Bytes;
+use hyper::header::LOCATION;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use serde_json::json;
@@ -295,6 +296,33 @@ async fn test_website_s3_api() {
);
}
+ // Test x-amz-website-redirect-location
+ {
+ ctx.client
+ .put_object()
+ .bucket(&bucket)
+ .key("test-redirect.html")
+ .website_redirect_location("https://perdu.com")
+ .send()
+ .await
+ .unwrap();
+
+ let req = Request::builder()
+ .method("GET")
+ .uri(format!(
+ "http://127.0.0.1:{}/test-redirect.html",
+ ctx.garage.web_port
+ ))
+ .header("Host", format!("{}.web.garage", BCKT_NAME))
+ .body(Body::new(Bytes::new()))
+ .unwrap();
+
+ let resp = client.request(req).await.unwrap();
+
+ assert_eq!(resp.status(), StatusCode::MOVED_PERMANENTLY);
+ assert_eq!(resp.headers().get(LOCATION).unwrap(), "https://perdu.com");
+ }
+
// Test CORS with an allowed preflight request
{
let req = Request::builder()