aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/Cargo.toml1
-rw-r--r--src/api/admin/api_server.rs58
-rw-r--r--src/api/helpers.rs14
-rw-r--r--src/api/k2v/api_server.rs60
-rw-r--r--src/api/k2v/batch.rs51
-rw-r--r--src/api/k2v/index.rs12
-rw-r--r--src/api/k2v/item.rs38
-rw-r--r--src/api/s3/api_server.rs227
-rw-r--r--src/api/s3/bucket.rs48
-rw-r--r--src/api/s3/copy.rs35
-rw-r--r--src/api/s3/cors.rs65
-rw-r--r--src/api/s3/delete.rs29
-rw-r--r--src/api/s3/get.rs21
-rw-r--r--src/api/s3/lifecycle.rs53
-rw-r--r--src/api/s3/list.rs14
-rw-r--r--src/api/s3/multipart.rs68
-rw-r--r--src/api/s3/post_object.rs54
-rw-r--r--src/api/s3/put.rs68
-rw-r--r--src/api/s3/website.rs54
-rw-r--r--src/api/signature/mod.rs29
-rw-r--r--src/api/signature/payload.rs615
-rw-r--r--src/api/signature/streaming.rs13
-rw-r--r--src/garage/tests/common/custom_requester.rs87
-rw-r--r--src/garage/tests/s3/mod.rs1
-rw-r--r--src/garage/tests/s3/presigned.rs72
-rw-r--r--src/garage/tests/s3/streaming_signature.rs2
-rw-r--r--src/model/bucket_table.rs7
-rw-r--r--src/net/Cargo.toml2
-rw-r--r--src/web/web_server.rs29
29 files changed, 1090 insertions, 737 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 3b555b8b..9b215333 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -21,6 +21,7 @@ garage_net.workspace = true
garage_util.workspace = true
garage_rpc.workspace = true
+argon2.workspace = true
async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 2b9be24e..0e4565bb 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;
+use argon2::password_hash::PasswordHash;
use async_trait::async_trait;
use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
@@ -45,14 +46,8 @@ impl AdminApiServer {
#[cfg(feature = "metrics")] exporter: PrometheusExporter,
) -> Self {
let cfg = &garage.config.admin;
- let metrics_token = cfg
- .metrics_token
- .as_ref()
- .map(|tok| format!("Bearer {}", tok));
- let admin_token = cfg
- .admin_token
- .as_ref()
- .map(|tok| format!("Bearer {}", tok));
+ let metrics_token = cfg.metrics_token.as_deref().map(hash_bearer_token);
+ let admin_token = cfg.admin_token.as_deref().map(hash_bearer_token);
Self {
garage,
#[cfg(feature = "metrics")]
@@ -248,11 +243,11 @@ impl ApiHandler for AdminApiServer {
req: Request<IncomingBody>,
endpoint: Endpoint,
) -> Result<Response<ResBody>, Error> {
- let expected_auth_header =
+ let required_auth_hash =
match endpoint.authorization_type() {
Authorization::None => None,
- Authorization::MetricsToken => self.metrics_token.as_ref(),
- Authorization::AdminToken => match &self.admin_token {
+ Authorization::MetricsToken => self.metrics_token.as_deref(),
+ Authorization::AdminToken => match self.admin_token.as_deref() {
None => return Err(Error::forbidden(
"Admin token isn't configured, admin API access is disabled for security.",
)),
@@ -260,14 +255,11 @@ impl ApiHandler for AdminApiServer {
},
};
- if let Some(h) = expected_auth_header {
+ if let Some(password_hash) = required_auth_hash {
match req.headers().get("Authorization") {
None => return Err(Error::forbidden("Authorization token must be provided")),
- Some(v) => {
- let authorized = v.to_str().map(|hv| hv.trim() == h).unwrap_or(false);
- if !authorized {
- return Err(Error::forbidden("Invalid authorization token provided"));
- }
+ Some(authorization) => {
+ verify_bearer_token(&authorization, password_hash)?;
}
}
}
@@ -342,3 +334,35 @@ impl ApiEndpoint for Endpoint {
fn add_span_attributes(&self, _span: SpanRef<'_>) {}
}
+
+fn hash_bearer_token(token: &str) -> String {
+ use argon2::{
+ password_hash::{rand_core::OsRng, PasswordHasher, SaltString},
+ Argon2,
+ };
+
+ let salt = SaltString::generate(&mut OsRng);
+ let argon2 = Argon2::default();
+ argon2
+ .hash_password(token.trim().as_bytes(), &salt)
+ .expect("could not hash API token")
+ .to_string()
+}
+
+fn verify_bearer_token(token: &hyper::http::HeaderValue, password_hash: &str) -> Result<(), Error> {
+ use argon2::{password_hash::PasswordVerifier, Argon2};
+
+ let parsed_hash = PasswordHash::new(&password_hash).unwrap();
+
+ token
+ .to_str()?
+ .strip_prefix("Bearer ")
+ .and_then(|token| {
+ Argon2::default()
+ .verify_password(token.trim().as_bytes(), &parsed_hash)
+ .ok()
+ })
+ .ok_or_else(|| Error::forbidden("Invalid authorization token"))?;
+
+ Ok(())
+}
diff --git a/src/api/helpers.rs b/src/api/helpers.rs
index 5f488912..cf60005d 100644
--- a/src/api/helpers.rs
+++ b/src/api/helpers.rs
@@ -1,4 +1,5 @@
use std::convert::Infallible;
+use std::sync::Arc;
use futures::{Stream, StreamExt, TryStreamExt};
@@ -10,6 +11,10 @@ use hyper::{
use idna::domain_to_unicode;
use serde::{Deserialize, Serialize};
+use garage_model::bucket_table::BucketParams;
+use garage_model::garage::Garage;
+use garage_model::key_table::Key;
+use garage_util::data::Uuid;
use garage_util::error::Error as GarageError;
use crate::common_error::{CommonError as Error, *};
@@ -27,6 +32,15 @@ pub enum Authorization {
Owner,
}
+/// The values which are known for each request related to a bucket
+pub struct ReqCtx {
+ pub garage: Arc<Garage>,
+ pub bucket_id: Uuid,
+ pub bucket_name: String,
+ pub bucket_params: BucketParams,
+ pub api_key: Key,
+}
+
/// Host to bucket
///
/// Convert a host, like "bucket.garage-site.tld" to the corresponding bucket "bucket",
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
index e97da2af..658cfcc8 100644
--- a/src/api/k2v/api_server.rs
+++ b/src/api/k2v/api_server.rs
@@ -15,8 +15,7 @@ use garage_model::garage::Garage;
use crate::generic_server::*;
use crate::k2v::error::*;
-use crate::signature::payload::check_payload_signature;
-use crate::signature::streaming::*;
+use crate::signature::verify_request;
use crate::helpers::*;
use crate::k2v::batch::*;
@@ -86,17 +85,7 @@ impl ApiHandler for K2VApiServer {
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
}
- let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?;
- let api_key = api_key
- .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
-
- let req = parse_streaming_body(
- &api_key,
- req,
- &mut content_sha256,
- &garage.config.s3_api.s3_region,
- "k2v",
- )?;
+ let (req, api_key, _content_sha256) = verify_request(&garage, req, "k2v").await?;
let bucket_id = garage
.bucket_helper()
@@ -106,6 +95,7 @@ impl ApiHandler for K2VApiServer {
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
+ let bucket_params = bucket.state.into_option().unwrap();
let allowed = match endpoint.authorization_type() {
Authorization::Read => api_key.allow_read(&bucket_id),
@@ -123,40 +113,42 @@ impl ApiHandler for K2VApiServer {
// are always preflighted, i.e. the browser should make
// an OPTIONS call before to check it is allowed
let matching_cors_rule = match *req.method() {
- Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)
- .ok_or_internal_error("Error looking up CORS rule")?,
+ Method::GET | Method::HEAD | Method::POST => {
+ find_matching_cors_rule(&bucket_params, &req)
+ .ok_or_internal_error("Error looking up CORS rule")?
+ .cloned()
+ }
_ => None,
};
+ let ctx = ReqCtx {
+ garage,
+ bucket_id,
+ bucket_name,
+ bucket_params,
+ api_key,
+ };
+
let resp = match endpoint {
Endpoint::DeleteItem {
partition_key,
sort_key,
- } => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await,
+ } => handle_delete_item(ctx, req, &partition_key, &sort_key).await,
Endpoint::InsertItem {
partition_key,
sort_key,
- } => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await,
+ } => handle_insert_item(ctx, req, &partition_key, &sort_key).await,
Endpoint::ReadItem {
partition_key,
sort_key,
- } => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await,
+ } => handle_read_item(ctx, &req, &partition_key, &sort_key).await,
Endpoint::PollItem {
partition_key,
sort_key,
causality_token,
timeout,
} => {
- handle_poll_item(
- garage,
- &req,
- bucket_id,
- partition_key,
- sort_key,
- causality_token,
- timeout,
- )
- .await
+ handle_poll_item(ctx, &req, partition_key, sort_key, causality_token, timeout).await
}
Endpoint::ReadIndex {
prefix,
@@ -164,12 +156,12 @@ impl ApiHandler for K2VApiServer {
end,
limit,
reverse,
- } => handle_read_index(garage, bucket_id, prefix, start, end, limit, reverse).await,
- Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
- Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
- Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
+ } => handle_read_index(ctx, prefix, start, end, limit, reverse).await,
+ Endpoint::InsertBatch {} => handle_insert_batch(ctx, req).await,
+ Endpoint::ReadBatch {} => handle_read_batch(ctx, req).await,
+ Endpoint::DeleteBatch {} => handle_delete_batch(ctx, req).await,
Endpoint::PollRange { partition_key } => {
- handle_poll_range(garage, bucket_id, &partition_key, req).await
+ handle_poll_range(ctx, &partition_key, req).await
}
Endpoint::Options => unreachable!(),
};
@@ -178,7 +170,7 @@ impl ApiHandler for K2VApiServer {
// add the corresponding CORS headers to the response
let mut resp_ok = resp?;
if let Some(rule) = matching_cors_rule {
- add_cors_headers(&mut resp_ok, rule)
+ add_cors_headers(&mut resp_ok, &rule)
.ok_or_internal_error("Invalid bucket CORS configuration")?;
}
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index ae2778b1..02b7ae8b 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -1,14 +1,9 @@
-use std::sync::Arc;
-
use base64::prelude::*;
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
-use garage_util::data::*;
-
use garage_table::{EnumerationOrder, TableSchema};
-use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
@@ -18,10 +13,12 @@ use crate::k2v::error::*;
use crate::k2v::range::read_range;
pub async fn handle_insert_batch(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
let mut items2 = vec![];
@@ -38,7 +35,7 @@ pub async fn handle_insert_batch(
items2.push((it.pk, it.sk, ct, v));
}
- garage.k2v.rpc.insert_batch(bucket_id, items2).await?;
+ garage.k2v.rpc.insert_batch(*bucket_id, items2).await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
@@ -46,8 +43,7 @@ pub async fn handle_insert_batch(
}
pub async fn handle_read_batch(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?;
@@ -55,7 +51,7 @@ pub async fn handle_read_batch(
let resp_results = futures::future::join_all(
queries
.into_iter()
- .map(|q| handle_read_batch_query(&garage, bucket_id, q)),
+ .map(|q| handle_read_batch_query(&ctx, q)),
)
.await;
@@ -68,12 +64,15 @@ pub async fn handle_read_batch(
}
async fn handle_read_batch_query(
- garage: &Arc<Garage>,
- bucket_id: Uuid,
+ ctx: &ReqCtx,
query: ReadBatchQuery,
) -> Result<ReadBatchResponse, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = ctx;
+
let partition = K2VItemPartition {
- bucket_id,
+ bucket_id: *bucket_id,
partition_key: query.partition_key.clone(),
};
@@ -138,8 +137,7 @@ async fn handle_read_batch_query(
}
pub async fn handle_delete_batch(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?;
@@ -147,7 +145,7 @@ pub async fn handle_delete_batch(
let resp_results = futures::future::join_all(
queries
.into_iter()
- .map(|q| handle_delete_batch_query(&garage, bucket_id, q)),
+ .map(|q| handle_delete_batch_query(&ctx, q)),
)
.await;
@@ -160,12 +158,15 @@ pub async fn handle_delete_batch(
}
async fn handle_delete_batch_query(
- garage: &Arc<Garage>,
- bucket_id: Uuid,
+ ctx: &ReqCtx,
query: DeleteBatchQuery,
) -> Result<DeleteBatchResponse, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
+
let partition = K2VItemPartition {
- bucket_id,
+ bucket_id: *bucket_id,
partition_key: query.partition_key.clone(),
};
@@ -195,7 +196,7 @@ async fn handle_delete_batch_query(
.k2v
.rpc
.insert(
- bucket_id,
+ *bucket_id,
i.partition.partition_key,
i.sort_key,
Some(cc),
@@ -235,7 +236,7 @@ async fn handle_delete_batch_query(
.collect::<Vec<_>>();
let n = items.len();
- garage.k2v.rpc.insert_batch(bucket_id, items).await?;
+ garage.k2v.rpc.insert_batch(*bucket_id, items).await?;
n
};
@@ -251,11 +252,13 @@ async fn handle_delete_batch_query(
}
pub(crate) async fn handle_poll_range(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
partition_key: &str,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = ctx;
use garage_model::k2v::sub::PollRange;
let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?;
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index 291464ab..e3397238 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -1,13 +1,8 @@
-use std::sync::Arc;
-
use hyper::Response;
use serde::Serialize;
-use garage_util::data::*;
-
use garage_table::util::*;
-use garage_model::garage::Garage;
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
use crate::helpers::*;
@@ -16,14 +11,17 @@ use crate::k2v::error::*;
use crate::k2v::range::read_range;
pub async fn handle_read_index(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
prefix: Option<String>,
start: Option<String>,
end: Option<String>,
limit: Option<u64>,
reverse: Option<bool>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
+
let reverse = reverse.unwrap_or(false);
let node_id_vec = garage
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index 0c5931a1..af3af4e4 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -1,13 +1,8 @@
-use std::sync::Arc;
-
use base64::prelude::*;
use http::header;
use hyper::{Request, Response, StatusCode};
-use garage_util::data::*;
-
-use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
@@ -100,12 +95,15 @@ impl ReturnFormat {
/// Handle ReadItem request
#[allow(clippy::ptr_arg)]
pub async fn handle_read_item(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: &Request<ReqBody>,
- bucket_id: Uuid,
partition_key: &str,
sort_key: &String,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
+
let format = ReturnFormat::from(req)?;
let item = garage
@@ -113,7 +111,7 @@ pub async fn handle_read_item(
.item_table
.get(
&K2VItemPartition {
- bucket_id,
+ bucket_id: *bucket_id,
partition_key: partition_key.to_string(),
},
sort_key,
@@ -125,12 +123,14 @@ pub async fn handle_read_item(
}
pub async fn handle_insert_item(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: Request<ReqBody>,
- bucket_id: Uuid,
partition_key: &str,
sort_key: &str,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
let causal_context = req
.headers()
.get(X_GARAGE_CAUSALITY_TOKEN)
@@ -149,7 +149,7 @@ pub async fn handle_insert_item(
.k2v
.rpc
.insert(
- bucket_id,
+ *bucket_id,
partition_key.to_string(),
sort_key.to_string(),
causal_context,
@@ -163,12 +163,14 @@ pub async fn handle_insert_item(
}
pub async fn handle_delete_item(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: Request<ReqBody>,
- bucket_id: Uuid,
partition_key: &str,
sort_key: &str,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
let causal_context = req
.headers()
.get(X_GARAGE_CAUSALITY_TOKEN)
@@ -183,7 +185,7 @@ pub async fn handle_delete_item(
.k2v
.rpc
.insert(
- bucket_id,
+ *bucket_id,
partition_key.to_string(),
sort_key.to_string(),
causal_context,
@@ -199,14 +201,16 @@ pub async fn handle_delete_item(
/// Handle ReadItem request
#[allow(clippy::ptr_arg)]
pub async fn handle_poll_item(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: &Request<ReqBody>,
- bucket_id: Uuid,
partition_key: String,
sort_key: String,
causality_token: String,
timeout_secs: Option<u64>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
let format = ReturnFormat::from(req)?;
let causal_context =
@@ -218,7 +222,7 @@ pub async fn handle_poll_item(
.k2v
.rpc
.poll_item(
- bucket_id,
+ *bucket_id,
partition_key,
sort_key,
causal_context,
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index 08405923..1ed30996 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -17,8 +17,7 @@ use garage_model::key_table::Key;
use crate::generic_server::*;
use crate::s3::error::*;
-use crate::signature::payload::check_payload_signature;
-use crate::signature::streaming::*;
+use crate::signature::verify_request;
use crate::helpers::*;
use crate::s3::bucket::*;
@@ -125,17 +124,7 @@ impl ApiHandler for S3ApiServer {
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
}
- let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?;
- let api_key = api_key
- .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
-
- let req = parse_streaming_body(
- &api_key,
- req,
- &mut content_sha256,
- &garage.config.s3_api.s3_region,
- "s3",
- )?;
+ let (req, api_key, content_sha256) = verify_request(&garage, req, "s3").await?;
let bucket_name = match bucket_name {
None => {
@@ -166,6 +155,7 @@ impl ApiHandler for S3ApiServer {
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
+ let bucket_params = bucket.state.into_option().unwrap();
let allowed = match endpoint.authorization_type() {
Authorization::Read => api_key.allow_read(&bucket_id),
@@ -178,12 +168,20 @@ impl ApiHandler for S3ApiServer {
return Err(Error::forbidden("Operation is not allowed for this key."));
}
- let matching_cors_rule = find_matching_cors_rule(&bucket, &req)?;
+ let matching_cors_rule = find_matching_cors_rule(&bucket_params, &req)?.cloned();
+
+ let ctx = ReqCtx {
+ garage,
+ bucket_id,
+ bucket_name,
+ bucket_params,
+ api_key,
+ };
let resp = match endpoint {
Endpoint::HeadObject {
key, part_number, ..
- } => handle_head(garage, &req, bucket_id, &key, part_number).await,
+ } => handle_head(ctx, &req, &key, part_number).await,
Endpoint::GetObject {
key,
part_number,
@@ -203,74 +201,37 @@ impl ApiHandler for S3ApiServer {
response_content_type,
response_expires,
};
- handle_get(garage, &req, bucket_id, &key, part_number, overrides).await
+ handle_get(ctx, &req, &key, part_number, overrides).await
}
Endpoint::UploadPart {
key,
part_number,
upload_id,
- } => {
- handle_put_part(
- garage,
- req,
- bucket_id,
- &key,
- part_number,
- &upload_id,
- content_sha256,
- )
- .await
- }
- Endpoint::CopyObject { key } => {
- handle_copy(garage, &api_key, &req, bucket_id, &key).await
- }
+ } => handle_put_part(ctx, req, &key, part_number, &upload_id, content_sha256).await,
+ Endpoint::CopyObject { key } => handle_copy(ctx, &req, &key).await,
Endpoint::UploadPartCopy {
key,
part_number,
upload_id,
- } => {
- handle_upload_part_copy(
- garage,
- &api_key,
- &req,
- bucket_id,
- &key,
- part_number,
- &upload_id,
- )
- .await
- }
- Endpoint::PutObject { key } => {
- handle_put(garage, req, &bucket, &key, content_sha256).await
- }
+ } => handle_upload_part_copy(ctx, &req, &key, part_number, &upload_id).await,
+ Endpoint::PutObject { key } => handle_put(ctx, req, &key, content_sha256).await,
Endpoint::AbortMultipartUpload { key, upload_id } => {
- handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
+ handle_abort_multipart_upload(ctx, &key, &upload_id).await
}
- Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await,
+ Endpoint::DeleteObject { key, .. } => handle_delete(ctx, &key).await,
Endpoint::CreateMultipartUpload { key } => {
- handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await
+ handle_create_multipart_upload(ctx, &req, &key).await
}
Endpoint::CompleteMultipartUpload { key, upload_id } => {
- handle_complete_multipart_upload(
- garage,
- req,
- &bucket_name,
- &bucket,
- &key,
- &upload_id,
- content_sha256,
- )
- .await
+ handle_complete_multipart_upload(ctx, req, &key, &upload_id, content_sha256).await
}
Endpoint::CreateBucket {} => unreachable!(),
Endpoint::HeadBucket {} => {
let response = Response::builder().body(empty_body()).unwrap();
Ok(response)
}
- Endpoint::DeleteBucket {} => {
- handle_delete_bucket(&garage, bucket_id, bucket_name, &api_key.key_id).await
- }
- Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage),
+ Endpoint::DeleteBucket {} => handle_delete_bucket(ctx).await,
+ Endpoint::GetBucketLocation {} => handle_get_bucket_location(ctx),
Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(),
Endpoint::ListObjects {
delimiter,
@@ -279,24 +240,21 @@ impl ApiHandler for S3ApiServer {
max_keys,
prefix,
} => {
- handle_list(
- garage,
- &ListObjectsQuery {
- common: ListQueryCommon {
- bucket_name,
- bucket_id,
- delimiter,
- page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
- prefix: prefix.unwrap_or_default(),
- urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
- },
- is_v2: false,
- marker,
- continuation_token: None,
- start_after: None,
+ let query = ListObjectsQuery {
+ common: ListQueryCommon {
+ bucket_name: ctx.bucket_name.clone(),
+ bucket_id,
+ delimiter,
+ page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
+ prefix: prefix.unwrap_or_default(),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
},
- )
- .await
+ is_v2: false,
+ marker,
+ continuation_token: None,
+ start_after: None,
+ };
+ handle_list(ctx, &query).await
}
Endpoint::ListObjectsV2 {
delimiter,
@@ -309,24 +267,21 @@ impl ApiHandler for S3ApiServer {
..
} => {
if list_type == "2" {
- handle_list(
- garage,
- &ListObjectsQuery {
- common: ListQueryCommon {
- bucket_name,
- bucket_id,
- delimiter,
- page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
- urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
- prefix: prefix.unwrap_or_default(),
- },
- is_v2: true,
- marker: None,
- continuation_token,
- start_after,
+ let query = ListObjectsQuery {
+ common: ListQueryCommon {
+ bucket_name: ctx.bucket_name.clone(),
+ bucket_id,
+ delimiter,
+ page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ prefix: prefix.unwrap_or_default(),
},
- )
- .await
+ is_v2: true,
+ marker: None,
+ continuation_token,
+ start_after,
+ };
+ handle_list(ctx, &query).await
} else {
Err(Error::bad_request(format!(
"Invalid endpoint: list-type={}",
@@ -342,22 +297,19 @@ impl ApiHandler for S3ApiServer {
prefix,
upload_id_marker,
} => {
- handle_list_multipart_upload(
- garage,
- &ListMultipartUploadsQuery {
- common: ListQueryCommon {
- bucket_name,
- bucket_id,
- delimiter,
- page_size: max_uploads.unwrap_or(1000).clamp(1, 1000),
- prefix: prefix.unwrap_or_default(),
- urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
- },
- key_marker,
- upload_id_marker,
+ let query = ListMultipartUploadsQuery {
+ common: ListQueryCommon {
+ bucket_name: ctx.bucket_name.clone(),
+ bucket_id,
+ delimiter,
+ page_size: max_uploads.unwrap_or(1000).clamp(1, 1000),
+ prefix: prefix.unwrap_or_default(),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
},
- )
- .await
+ key_marker,
+ upload_id_marker,
+ };
+ handle_list_multipart_upload(ctx, &query).await
}
Endpoint::ListParts {
key,
@@ -365,39 +317,28 @@ impl ApiHandler for S3ApiServer {
part_number_marker,
upload_id,
} => {
- handle_list_parts(
- garage,
- &ListPartsQuery {
- bucket_name,
- bucket_id,
- key,
- upload_id,
- part_number_marker: part_number_marker.map(|p| p.min(10000)),
- max_parts: max_parts.unwrap_or(1000).clamp(1, 1000),
- },
- )
- .await
- }
- Endpoint::DeleteObjects {} => {
- handle_delete_objects(garage, bucket_id, req, content_sha256).await
- }
- Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await,
- Endpoint::PutBucketWebsite {} => {
- handle_put_website(garage, bucket.clone(), req, content_sha256).await
- }
- Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket.clone()).await,
- Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await,
- Endpoint::PutBucketCors {} => {
- handle_put_cors(garage, bucket.clone(), req, content_sha256).await
+ let query = ListPartsQuery {
+ bucket_name: ctx.bucket_name.clone(),
+ bucket_id,
+ key,
+ upload_id,
+ part_number_marker: part_number_marker.map(|p| p.min(10000)),
+ max_parts: max_parts.unwrap_or(1000).clamp(1, 1000),
+ };
+ handle_list_parts(ctx, &query).await
}
- Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket.clone()).await,
- Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(&bucket).await,
+ Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await,
+ Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await,
+ Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req, content_sha256).await,
+ Endpoint::DeleteBucketWebsite {} => handle_delete_website(ctx).await,
+ Endpoint::GetBucketCors {} => handle_get_cors(ctx).await,
+ Endpoint::PutBucketCors {} => handle_put_cors(ctx, req, content_sha256).await,
+ Endpoint::DeleteBucketCors {} => handle_delete_cors(ctx).await,
+ Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(ctx).await,
Endpoint::PutBucketLifecycleConfiguration {} => {
- handle_put_lifecycle(garage, bucket.clone(), req, content_sha256).await
- }
- Endpoint::DeleteBucketLifecycle {} => {
- handle_delete_lifecycle(garage, bucket.clone()).await
+ handle_put_lifecycle(ctx, req, content_sha256).await
}
+ Endpoint::DeleteBucketLifecycle {} => handle_delete_lifecycle(ctx).await,
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
};
@@ -405,7 +346,7 @@ impl ApiHandler for S3ApiServer {
// add the corresponding CORS headers to the response
let mut resp_ok = resp?;
if let Some(rule) = matching_cors_rule {
- add_cors_headers(&mut resp_ok, rule)
+ add_cors_headers(&mut resp_ok, &rule)
.ok_or_internal_error("Invalid bucket CORS configuration")?;
}
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
index fa337566..6a12aa9c 100644
--- a/src/api/s3/bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -1,5 +1,4 @@
use std::collections::HashMap;
-use std::sync::Arc;
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
@@ -21,7 +20,8 @@ use crate::s3::error::*;
use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
-pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<ResBody>, Error> {
+pub fn handle_get_bucket_location(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+ let ReqCtx { garage, .. } = ctx;
let loc = s3_xml::LocationConstraint {
xmlns: (),
region: garage.config.s3_api.s3_region.to_string(),
@@ -204,21 +204,20 @@ pub async fn handle_create_bucket(
.unwrap())
}
-pub async fn handle_delete_bucket(
- garage: &Garage,
- bucket_id: Uuid,
- bucket_name: String,
- api_key_id: &String,
-) -> Result<Response<ResBody>, Error> {
+pub async fn handle_delete_bucket(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage,
+ bucket_id,
+ bucket_name,
+ bucket_params: bucket_state,
+ api_key,
+ ..
+ } = &ctx;
let helper = garage.locked_helper().await;
- let api_key = helper.key().get_existing_key(api_key_id).await?;
let key_params = api_key.params().unwrap();
- let is_local_alias = matches!(key_params.local_aliases.get(&bucket_name), Some(Some(_)));
-
- let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?;
- let bucket_state = bucket.state.as_option().unwrap();
+ let is_local_alias = matches!(key_params.local_aliases.get(bucket_name), Some(Some(_)));
// If the bucket has no other aliases, this is a true deletion.
// Otherwise, it is just an alias removal.
@@ -228,20 +227,20 @@ pub async fn handle_delete_bucket(
.items()
.iter()
.filter(|(_, _, active)| *active)
- .any(|(n, _, _)| is_local_alias || (*n != bucket_name));
+ .any(|(n, _, _)| is_local_alias || (*n != *bucket_name));
let has_other_local_aliases = bucket_state
.local_aliases
.items()
.iter()
.filter(|(_, _, active)| *active)
- .any(|((k, n), _, _)| !is_local_alias || *n != bucket_name || *k != api_key.key_id);
+ .any(|((k, n), _, _)| !is_local_alias || *n != *bucket_name || *k != api_key.key_id);
if !has_other_global_aliases && !has_other_local_aliases {
// Delete bucket
// Check bucket is empty
- if !helper.bucket().is_bucket_empty(bucket_id).await? {
+ if !helper.bucket().is_bucket_empty(*bucket_id).await? {
return Err(CommonError::BucketNotEmpty.into());
}
@@ -249,33 +248,36 @@ pub async fn handle_delete_bucket(
// 1. delete bucket alias
if is_local_alias {
helper
- .unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
+ .unset_local_bucket_alias(*bucket_id, &api_key.key_id, bucket_name)
.await?;
} else {
helper
- .unset_global_bucket_alias(bucket_id, &bucket_name)
+ .unset_global_bucket_alias(*bucket_id, bucket_name)
.await?;
}
// 2. delete authorization from keys that had access
- for (key_id, _) in bucket.authorized_keys() {
+ for (key_id, _) in bucket_state.authorized_keys.items() {
helper
- .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
+ .set_bucket_key_permissions(*bucket_id, key_id, BucketKeyPerm::NO_PERMISSIONS)
.await?;
}
+ let bucket = Bucket {
+ id: *bucket_id,
+ state: Deletable::delete(),
+ };
// 3. delete bucket
- bucket.state = Deletable::delete();
garage.bucket_table.insert(&bucket).await?;
} else if is_local_alias {
// Just unalias
helper
- .unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
+ .unset_local_bucket_alias(*bucket_id, &api_key.key_id, bucket_name)
.await?;
} else {
// Just unalias (but from global namespace)
helper
- .unset_global_bucket_alias(bucket_id, &bucket_name)
+ .unset_global_bucket_alias(*bucket_id, bucket_name)
.await?;
}
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 880ce5f4..3c2bd483 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -1,5 +1,4 @@
use std::pin::Pin;
-use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::{stream, stream::Stream, StreamExt};
@@ -15,8 +14,6 @@ use garage_table::*;
use garage_util::data::*;
use garage_util::time::*;
-use garage_model::garage::Garage;
-use garage_model::key_table::Key;
use garage_model::s3::block_ref_table::*;
use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
@@ -30,15 +27,19 @@ use crate::s3::put::get_headers;
use crate::s3::xml::{self as s3_xml, xmlns_tag};
pub async fn handle_copy(
- garage: Arc<Garage>,
- api_key: &Key,
+ ctx: ReqCtx,
req: &Request<ReqBody>,
- dest_bucket_id: Uuid,
dest_key: &str,
) -> Result<Response<ResBody>, Error> {
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
- let source_object = get_copy_source(&garage, api_key, req).await?;
+ let source_object = get_copy_source(&ctx, req).await?;
+
+ let ReqCtx {
+ garage,
+ bucket_id: dest_bucket_id,
+ ..
+ } = ctx;
let (source_version, source_version_data, source_version_meta) =
extract_source_info(&source_object)?;
@@ -181,10 +182,8 @@ pub async fn handle_copy(
}
pub async fn handle_upload_part_copy(
- garage: Arc<Garage>,
- api_key: &Key,
+ ctx: ReqCtx,
req: &Request<ReqBody>,
- dest_bucket_id: Uuid,
dest_key: &str,
part_number: u64,
upload_id: &str,
@@ -195,10 +194,12 @@ pub async fn handle_upload_part_copy(
let dest_key = dest_key.to_string();
let (source_object, (_, _, mut dest_mpu)) = futures::try_join!(
- get_copy_source(&garage, api_key, req),
- multipart::get_upload(&garage, &dest_bucket_id, &dest_key, &dest_upload_id)
+ get_copy_source(&ctx, req),
+ multipart::get_upload(&ctx, &dest_key, &dest_upload_id)
)?;
+ let ReqCtx { garage, .. } = ctx;
+
let (source_object_version, source_version_data, source_version_meta) =
extract_source_info(&source_object)?;
@@ -439,11 +440,11 @@ pub async fn handle_upload_part_copy(
.body(string_body(resp_xml))?)
}
-async fn get_copy_source(
- garage: &Garage,
- api_key: &Key,
- req: &Request<ReqBody>,
-) -> Result<Object, Error> {
+async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, Error> {
+ let ReqCtx {
+ garage, api_key, ..
+ } = ctx;
+
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs
index e069cae4..173b7ffe 100644
--- a/src/api/s3/cors.rs
+++ b/src/api/s3/cors.rs
@@ -21,16 +21,13 @@ use crate::s3::error::*;
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
-use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
+use garage_model::bucket_table::{Bucket, BucketParams, CorsRule as GarageCorsRule};
use garage_model::garage::Garage;
use garage_util::data::*;
-pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
- let param = bucket
- .params()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
-
- if let Some(cors) = param.cors_config.get() {
+pub async fn handle_get_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+ let ReqCtx { bucket_params, .. } = ctx;
+ if let Some(cors) = bucket_params.cors_config.get() {
let wc = CorsConfiguration {
xmlns: (),
cors_rules: cors
@@ -50,16 +47,18 @@ pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<ResBody>, Error
}
}
-pub async fn handle_delete_cors(
- garage: Arc<Garage>,
- mut bucket: Bucket,
-) -> Result<Response<ResBody>, Error> {
- let param = bucket
- .params_mut()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
-
- param.cors_config.update(None);
- garage.bucket_table.insert(&bucket).await?;
+pub async fn handle_delete_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage,
+ bucket_id,
+ mut bucket_params,
+ ..
+ } = ctx;
+ bucket_params.cors_config.update(None);
+ garage
+ .bucket_table
+ .insert(&Bucket::present(bucket_id, bucket_params))
+ .await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
@@ -67,28 +66,33 @@ pub async fn handle_delete_cors(
}
pub async fn handle_put_cors(
- garage: Arc<Garage>,
- mut bucket: Bucket,
+ ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage,
+ bucket_id,
+ mut bucket_params,
+ ..
+ } = ctx;
+
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
- let param = bucket
- .params_mut()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
-
let conf: CorsConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;
- param
+ bucket_params
.cors_config
.update(Some(conf.into_garage_cors_config()?));
- garage.bucket_table.insert(&bucket).await?;
+ garage
+ .bucket_table
+ .insert(&Bucket::present(bucket_id, bucket_params))
+ .await?;
Ok(Response::builder()
.status(StatusCode::OK)
@@ -115,7 +119,8 @@ pub async fn handle_options_api(
let bucket_id = helper.resolve_global_bucket_name(&bn).await?;
if let Some(id) = bucket_id {
let bucket = garage.bucket_helper().get_existing_bucket(id).await?;
- handle_options_for_bucket(req, &bucket)
+ let bucket_params = bucket.state.into_option().unwrap();
+ handle_options_for_bucket(req, &bucket_params)
} else {
// If there is a bucket name in the request, but that name
// does not correspond to a global alias for a bucket,
@@ -145,7 +150,7 @@ pub async fn handle_options_api(
pub fn handle_options_for_bucket(
req: &Request<IncomingBody>,
- bucket: &Bucket,
+ bucket_params: &BucketParams,
) -> Result<Response<EmptyBody>, CommonError> {
let origin = req
.headers()
@@ -162,7 +167,7 @@ pub fn handle_options_for_bucket(
None => vec![],
};
- if let Some(cors_config) = bucket.params().unwrap().cors_config.get() {
+ if let Some(cors_config) = bucket_params.cors_config.get() {
let matching_rule = cors_config
.iter()
.find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter()));
@@ -181,10 +186,10 @@ pub fn handle_options_for_bucket(
}
pub fn find_matching_cors_rule<'a>(
- bucket: &'a Bucket,
+ bucket_params: &'a BucketParams,
req: &Request<impl Body>,
) -> Result<Option<&'a GarageCorsRule>, Error> {
- if let Some(cors_config) = bucket.params().unwrap().cors_config.get() {
+ if let Some(cors_config) = bucket_params.cors_config.get() {
if let Some(origin) = req.headers().get("Origin") {
let origin = origin.to_str()?;
let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {
diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs
index 3fb39147..57f6f948 100644
--- a/src/api/s3/delete.rs
+++ b/src/api/s3/delete.rs
@@ -1,11 +1,8 @@
-use std::sync::Arc;
-
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use garage_util::data::*;
-use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
use crate::helpers::*;
@@ -15,14 +12,13 @@ use crate::s3::put::next_timestamp;
use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
-async fn handle_delete_internal(
- garage: &Garage,
- bucket_id: Uuid,
- key: &str,
-) -> Result<(Uuid, Uuid), Error> {
+async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = ctx;
let object = garage
.object_table
- .get(&bucket_id, &key.to_string())
+ .get(bucket_id, &key.to_string())
.await?
.ok_or(Error::NoSuchKey)?; // No need to delete
@@ -44,7 +40,7 @@ async fn handle_delete_internal(
};
let object = Object::new(
- bucket_id,
+ *bucket_id,
key.into(),
vec![ObjectVersion {
uuid: del_uuid,
@@ -58,12 +54,8 @@ async fn handle_delete_internal(
Ok((deleted_version, del_uuid))
}
-pub async fn handle_delete(
- garage: Arc<Garage>,
- bucket_id: Uuid,
- key: &str,
-) -> Result<Response<ResBody>, Error> {
- match handle_delete_internal(&garage, bucket_id, key).await {
+pub async fn handle_delete(ctx: ReqCtx, key: &str) -> Result<Response<ResBody>, Error> {
+ match handle_delete_internal(&ctx, key).await {
Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
.body(empty_body())
@@ -73,8 +65,7 @@ pub async fn handle_delete(
}
pub async fn handle_delete_objects(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
@@ -91,7 +82,7 @@ pub async fn handle_delete_objects(
let mut ret_errors = Vec::new();
for obj in cmd.objects.iter() {
- match handle_delete_internal(&garage, bucket_id, &obj.key).await {
+ match handle_delete_internal(&ctx, &obj.key).await {
Ok((deleted_version, delete_marker_version)) => {
if cmd.quiet {
continue;
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 0d18e775..ed996fb1 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -131,6 +131,16 @@ fn try_answer_cached(
/// Handle HEAD request
pub async fn handle_head(
+ ctx: ReqCtx,
+ req: &Request<impl Body>,
+ key: &str,
+ part_number: Option<u64>,
+) -> Result<Response<ResBody>, Error> {
+ handle_head_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number).await
+}
+
+/// Handle HEAD request for website
+pub async fn handle_head_without_ctx(
garage: Arc<Garage>,
req: &Request<impl Body>,
bucket_id: Uuid,
@@ -218,6 +228,17 @@ pub async fn handle_head(
/// Handle GET request
pub async fn handle_get(
+ ctx: ReqCtx,
+ req: &Request<impl Body>,
+ key: &str,
+ part_number: Option<u64>,
+ overrides: GetObjectOverrides,
+) -> Result<Response<ResBody>, Error> {
+ handle_get_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number, overrides).await
+}
+
+/// Handle GET request
+pub async fn handle_get_without_ctx(
garage: Arc<Garage>,
req: &Request<impl Body>,
bucket_id: Uuid,
diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs
index 35757e8c..7eb1c2cb 100644
--- a/src/api/s3/lifecycle.rs
+++ b/src/api/s3/lifecycle.rs
@@ -1,5 +1,4 @@
use quick_xml::de::from_reader;
-use std::sync::Arc;
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
@@ -16,15 +15,12 @@ use garage_model::bucket_table::{
parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration,
LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule,
};
-use garage_model::garage::Garage;
use garage_util::data::*;
-pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
- let param = bucket
- .params()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
+pub async fn handle_get_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+ let ReqCtx { bucket_params, .. } = ctx;
- if let Some(lifecycle) = param.lifecycle_config.get() {
+ if let Some(lifecycle) = bucket_params.lifecycle_config.get() {
let wc = LifecycleConfiguration::from_garage_lifecycle_config(lifecycle);
let xml = to_xml_with_header(&wc)?;
Ok(Response::builder()
@@ -38,16 +34,18 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<ResBody>,
}
}
-pub async fn handle_delete_lifecycle(
- garage: Arc<Garage>,
- mut bucket: Bucket,
-) -> Result<Response<ResBody>, Error> {
- let param = bucket
- .params_mut()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
-
- param.lifecycle_config.update(None);
- garage.bucket_table.insert(&bucket).await?;
+pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage,
+ bucket_id,
+ mut bucket_params,
+ ..
+ } = ctx;
+ bucket_params.lifecycle_config.update(None);
+ garage
+ .bucket_table
+ .insert(&Bucket::present(bucket_id, bucket_params))
+ .await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
@@ -55,28 +53,33 @@ pub async fn handle_delete_lifecycle(
}
pub async fn handle_put_lifecycle(
- garage: Arc<Garage>,
- mut bucket: Bucket,
+ ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage,
+ bucket_id,
+ mut bucket_params,
+ ..
+ } = ctx;
+
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
- let param = bucket
- .params_mut()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
-
let conf: LifecycleConfiguration = from_reader(&body as &[u8])?;
let config = conf
.validate_into_garage_lifecycle_config()
.ok_or_bad_request("Invalid lifecycle configuration")?;
- param.lifecycle_config.update(Some(config));
- garage.bucket_table.insert(&bucket).await?;
+ bucket_params.lifecycle_config.update(Some(config));
+ garage
+ .bucket_table
+ .insert(&Bucket::present(bucket_id, bucket_params))
+ .await?;
Ok(Response::builder()
.status(StatusCode::OK)
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index b832a4f4..302c03f4 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -1,6 +1,5 @@
use std::collections::{BTreeMap, BTreeSet};
use std::iter::{Iterator, Peekable};
-use std::sync::Arc;
use base64::prelude::*;
use hyper::Response;
@@ -9,7 +8,6 @@ use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
-use garage_model::garage::Garage;
use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
@@ -62,9 +60,10 @@ pub struct ListPartsQuery {
}
pub async fn handle_list(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
query: &ListObjectsQuery,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx { garage, .. } = &ctx;
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
@@ -167,9 +166,11 @@ pub async fn handle_list(
}
pub async fn handle_list_multipart_upload(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
query: &ListMultipartUploadsQuery,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx { garage, .. } = &ctx;
+
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
@@ -269,15 +270,14 @@ pub async fn handle_list_multipart_upload(
}
pub async fn handle_list_parts(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
query: &ListPartsQuery,
) -> Result<Response<ResBody>, Error> {
debug!("ListParts {:?}", query);
let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?;
- let (_, _, mpu) =
- s3_multipart::get_upload(&garage, &query.bucket_id, &query.key, &upload_id).await?;
+ let (_, _, mpu) = s3_multipart::get_upload(&ctx, &query.key, &upload_id).await?;
let (info, next) = fetch_part_info(query, &mpu)?;
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 5959bcd6..1d5aeb26 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -8,7 +8,6 @@ use md5::{Digest as Md5Digest, Md5};
use garage_table::*;
use garage_util::data::*;
-use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
use garage_model::s3::block_ref_table::*;
use garage_model::s3::mpu_table::*;
@@ -25,12 +24,16 @@ use crate::signature::verify_signed_content;
// ----
pub async fn handle_create_multipart_upload(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: &Request<ReqBody>,
- bucket_name: &str,
- bucket_id: Uuid,
key: &String,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage,
+ bucket_id,
+ bucket_name,
+ ..
+ } = &ctx;
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
let upload_id = gen_uuid();
@@ -47,13 +50,13 @@ pub async fn handle_create_multipart_upload(
headers,
},
};
- let object = Object::new(bucket_id, key.to_string(), vec![object_version]);
+ let object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&object).await?;
// Create multipart upload in mpu table
// This multipart upload will hold references to uploaded parts
// (which are entries in the Version table)
- let mpu = MultipartUpload::new(upload_id, timestamp, bucket_id, key.into(), false);
+ let mpu = MultipartUpload::new(upload_id, timestamp, *bucket_id, key.into(), false);
garage.mpu_table.insert(&mpu).await?;
// Send success response
@@ -69,14 +72,15 @@ pub async fn handle_create_multipart_upload(
}
pub async fn handle_put_part(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: Request<ReqBody>,
- bucket_id: Uuid,
key: &str,
part_number: u64,
upload_id: &str,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx { garage, .. } = &ctx;
+
let upload_id = decode_upload_id(upload_id)?;
let content_md5 = match req.headers().get("content-md5") {
@@ -90,10 +94,8 @@ pub async fn handle_put_part(
let stream = body_stream(req.into_body());
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
- let ((_, _, mut mpu), first_block) = futures::try_join!(
- get_upload(&garage, &bucket_id, &key, &upload_id),
- chunker.next(),
- )?;
+ let ((_, _, mut mpu), first_block) =
+ futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;
// Check object is valid and part can be accepted
let first_block = first_block.ok_or_bad_request("Empty body")?;
@@ -135,7 +137,7 @@ pub async fn handle_put_part(
// Copy data to version
let (total_size, data_md5sum, data_sha256sum, _) =
- read_and_put_blocks(&garage, &version, part_number, first_block, &mut chunker).await?;
+ read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?;
// Verify that checksums map
ensure_checksum_matches(
@@ -200,14 +202,19 @@ impl Drop for InterruptedCleanup {
}
pub async fn handle_complete_multipart_upload(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: Request<ReqBody>,
- bucket_name: &str,
- bucket: &Bucket,
key: &str,
upload_id: &str,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage,
+ bucket_id,
+ bucket_name,
+ ..
+ } = &ctx;
+
let body = http_body_util::BodyExt::collect(req.into_body())
.await?
.to_bytes();
@@ -228,8 +235,7 @@ pub async fn handle_complete_multipart_upload(
// Get object and multipart upload
let key = key.to_string();
- let (object, mut object_version, mpu) =
- get_upload(&garage, &bucket.id, &key, &upload_id).await?;
+ let (object, mut object_version, mpu) = get_upload(&ctx, &key, &upload_id).await?;
if mpu.parts.is_empty() {
return Err(Error::bad_request("No data was uploaded"));
@@ -283,7 +289,7 @@ pub async fn handle_complete_multipart_upload(
let mut final_version = Version::new(
upload_id,
VersionBacklink::Object {
- bucket_id: bucket.id,
+ bucket_id: *bucket_id,
key: key.to_string(),
},
false,
@@ -327,9 +333,9 @@ pub async fn handle_complete_multipart_upload(
// Calculate total size of final object
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
- if let Err(e) = check_quotas(&garage, bucket, total_size, Some(&object)).await {
+ if let Err(e) = check_quotas(&ctx, total_size, Some(&object)).await {
object_version.state = ObjectVersionState::Aborted;
- let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
+ let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
return Err(e);
@@ -345,7 +351,7 @@ pub async fn handle_complete_multipart_upload(
final_version.blocks.items()[0].1.hash,
));
- let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
+ let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
// Send response saying ok we're done
@@ -362,18 +368,20 @@ pub async fn handle_complete_multipart_upload(
}
pub async fn handle_abort_multipart_upload(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
key: &str,
upload_id: &str,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
+
let upload_id = decode_upload_id(upload_id)?;
- let (_, mut object_version, _) =
- get_upload(&garage, &bucket_id, &key.to_string(), &upload_id).await?;
+ let (_, mut object_version, _) = get_upload(&ctx, &key.to_string(), &upload_id).await?;
object_version.state = ObjectVersionState::Aborted;
- let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]);
+ let final_object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
Ok(Response::new(empty_body()))
@@ -383,11 +391,13 @@ pub async fn handle_abort_multipart_upload(
#[allow(clippy::ptr_arg)]
pub(crate) async fn get_upload(
- garage: &Garage,
- bucket_id: &Uuid,
+ ctx: &ReqCtx,
key: &String,
upload_id: &Uuid,
) -> Result<(Object, ObjectVersion, MultipartUpload), Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = ctx;
let (object, mpu) = futures::try_join!(
garage.object_table.get(bucket_id, key).map_err(Error::from),
garage
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index bca8d6c6..66f8174c 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -21,7 +21,7 @@ use crate::s3::cors::*;
use crate::s3::error::*;
use crate::s3::put::{get_headers, save_stream};
use crate::s3::xml as s3_xml;
-use crate::signature::payload::{parse_date, verify_v4};
+use crate::signature::payload::{verify_v4, Authorization};
pub async fn handle_post_object(
garage: Arc<Garage>,
@@ -88,22 +88,11 @@ pub async fn handle_post_object(
.get("key")
.ok_or_bad_request("No key was provided")?
.to_str()?;
- let credential = params
- .get("x-amz-credential")
- .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?
- .to_str()?;
let policy = params
.get("policy")
.ok_or_bad_request("No policy was provided")?
.to_str()?;
- let signature = params
- .get("x-amz-signature")
- .ok_or_bad_request("No signature was provided")?
- .to_str()?;
- let date = params
- .get("x-amz-date")
- .ok_or_bad_request("No date was provided")?
- .to_str()?;
+ let authorization = Authorization::parse_form(&params)?;
let key = if key.contains("${filename}") {
// if no filename is provided, don't replace. This matches the behavior of AWS.
@@ -116,16 +105,7 @@ pub async fn handle_post_object(
key.to_owned()
};
- let date = parse_date(date)?;
- let api_key = verify_v4(
- &garage,
- "s3",
- credential,
- &date,
- signature,
- policy.as_bytes(),
- )
- .await?;
+ let api_key = verify_v4(&garage, "s3", &authorization, policy.as_bytes()).await?;
let bucket_id = garage
.bucket_helper()
@@ -140,6 +120,12 @@ pub async fn handle_post_object(
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
+ let bucket_params = bucket.state.into_option().unwrap();
+ let matching_cors_rule = find_matching_cors_rule(
+ &bucket_params,
+ &Request::from_parts(head.clone(), empty_body::<Infallible>()),
+ )?
+ .cloned();
let decoded_policy = BASE64_STANDARD
.decode(policy)
@@ -233,11 +219,19 @@ pub async fn handle_post_object(
let headers = get_headers(&params)?;
let stream = field.map(|r| r.map_err(Into::into));
- let (_, md5) = save_stream(
+
+ let ctx = ReqCtx {
garage,
+ bucket_id,
+ bucket_name,
+ bucket_params,
+ api_key,
+ };
+
+ let (_, md5) = save_stream(
+ &ctx,
headers,
StreamLimiter::new(stream, conditions.content_length),
- &bucket,
&key,
None,
None,
@@ -254,7 +248,7 @@ pub async fn handle_post_object(
{
target
.query_pairs_mut()
- .append_pair("bucket", &bucket_name)
+ .append_pair("bucket", &ctx.bucket_name)
.append_pair("key", &key)
.append_pair("etag", &etag);
let target = target.to_string();
@@ -298,7 +292,7 @@ pub async fn handle_post_object(
let xml = s3_xml::PostObject {
xmlns: (),
location: s3_xml::Value(location),
- bucket: s3_xml::Value(bucket_name),
+ bucket: s3_xml::Value(ctx.bucket_name),
key: s3_xml::Value(key),
etag: s3_xml::Value(etag),
};
@@ -311,12 +305,8 @@ pub async fn handle_post_object(
}
};
- let matching_cors_rule = find_matching_cors_rule(
- &bucket,
- &Request::from_parts(head, empty_body::<Infallible>()),
- )?;
if let Some(rule) = matching_cors_rule {
- add_cors_headers(&mut resp, rule)
+ add_cors_headers(&mut resp, &rule)
.ok_or_internal_error("Invalid bucket CORS configuration")?;
}
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index c1af513c..f06aa7a2 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -28,7 +28,6 @@ use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_block::manager::INLINE_THRESHOLD;
-use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
use garage_model::index_counter::CountedItem;
use garage_model::s3::block_ref_table::*;
@@ -42,9 +41,8 @@ use crate::s3::error::*;
const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
pub async fn handle_put(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: Request<ReqBody>,
- bucket: &Bucket,
key: &String,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
@@ -59,35 +57,27 @@ pub async fn handle_put(
let stream = body_stream(req.into_body());
- save_stream(
- garage,
- headers,
- stream,
- bucket,
- key,
- content_md5,
- content_sha256,
- )
- .await
- .map(|(uuid, md5)| put_response(uuid, md5))
+ save_stream(&ctx, headers, stream, key, content_md5, content_sha256)
+ .await
+ .map(|(uuid, md5)| put_response(uuid, md5))
}
pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
- garage: Arc<Garage>,
+ ctx: &ReqCtx,
headers: ObjectVersionHeaders,
body: S,
- bucket: &Bucket,
key: &String,
content_md5: Option<String>,
content_sha256: Option<FixedBytes32>,
) -> Result<(Uuid, String), Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = ctx;
+
let mut chunker = StreamChunker::new(body, garage.config.block_size);
let (first_block_opt, existing_object) = try_join!(
chunker.next(),
- garage
- .object_table
- .get(&bucket.id, key)
- .map_err(Error::from),
+ garage.object_table.get(bucket_id, key).map_err(Error::from),
)?;
let first_block = first_block_opt.unwrap_or_default();
@@ -114,7 +104,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256,
)?;
- check_quotas(&garage, bucket, size, existing_object.as_ref()).await?;
+ check_quotas(ctx, size, existing_object.as_ref()).await?;
let object_version = ObjectVersion {
uuid: version_uuid,
@@ -129,7 +119,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
)),
};
- let object = Object::new(bucket.id, key.into(), vec![object_version]);
+ let object = Object::new(*bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
return Ok((version_uuid, data_md5sum_hex));
@@ -140,7 +130,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// before everything is finished (cleanup is done using the Drop trait).
let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
garage: garage.clone(),
- bucket_id: bucket.id,
+ bucket_id: *bucket_id,
key: key.into(),
version_uuid,
version_timestamp,
@@ -156,7 +146,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
multipart: false,
},
};
- let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
+ let object = Object::new(*bucket_id, key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
// Initialize corresponding entry in version table
@@ -166,7 +156,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let version = Version::new(
version_uuid,
VersionBacklink::Object {
- bucket_id: bucket.id,
+ bucket_id: *bucket_id,
key: key.into(),
},
false,
@@ -175,7 +165,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// Transfer data and verify checksum
let (total_size, data_md5sum, data_sha256sum, first_block_hash) =
- read_and_put_blocks(&garage, &version, 1, first_block, &mut chunker).await?;
+ read_and_put_blocks(ctx, &version, 1, first_block, &mut chunker).await?;
ensure_checksum_matches(
data_md5sum.as_slice(),
@@ -184,7 +174,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256,
)?;
- check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?;
+ check_quotas(ctx, total_size, existing_object.as_ref()).await?;
// Save final object state, marked as Complete
let md5sum_hex = hex::encode(data_md5sum);
@@ -196,7 +186,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
},
first_block_hash,
));
- let object = Object::new(bucket.id, key.into(), vec![object_version]);
+ let object = Object::new(*bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
// We were not interrupted, everything went fine.
@@ -235,12 +225,18 @@ pub(crate) fn ensure_checksum_matches(
/// Check that inserting this object with this size doesn't exceed bucket quotas
pub(crate) async fn check_quotas(
- garage: &Arc<Garage>,
- bucket: &Bucket,
+ ctx: &ReqCtx,
size: u64,
prev_object: Option<&Object>,
) -> Result<(), Error> {
- let quotas = bucket.state.as_option().unwrap().quotas.get();
+ let ReqCtx {
+ garage,
+ bucket_id,
+ bucket_params,
+ ..
+ } = ctx;
+
+ let quotas = bucket_params.quotas.get();
if quotas.max_objects.is_none() && quotas.max_size.is_none() {
return Ok(());
};
@@ -248,7 +244,7 @@ pub(crate) async fn check_quotas(
let counters = garage
.object_counter_table
.table
- .get(&bucket.id, &EmptyKey)
+ .get(bucket_id, &EmptyKey)
.await?;
let counters = counters
@@ -292,7 +288,7 @@ pub(crate) async fn check_quotas(
}
pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
- garage: &Garage,
+ ctx: &ReqCtx,
version: &Version,
part_number: u64,
first_block: Bytes,
@@ -417,7 +413,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
let offset = written_bytes;
written_bytes += block.len() as u64;
write_futs.push_back(put_block_and_meta(
- garage,
+ ctx,
version,
part_number,
offset,
@@ -447,7 +443,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
}
async fn put_block_and_meta(
- garage: &Garage,
+ ctx: &ReqCtx,
version: &Version,
part_number: u64,
offset: u64,
@@ -455,6 +451,8 @@ async fn put_block_and_meta(
block: Bytes,
order_tag: OrderTag,
) -> Result<(), GarageError> {
+ let ReqCtx { garage, .. } = ctx;
+
let mut version = version.clone();
version.blocks.put(
VersionBlockKey {
diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs
index 1c1dbf20..6af55677 100644
--- a/src/api/s3/website.rs
+++ b/src/api/s3/website.rs
@@ -1,5 +1,4 @@
use quick_xml::de::from_reader;
-use std::sync::Arc;
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
@@ -12,15 +11,11 @@ use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
use garage_model::bucket_table::*;
-use garage_model::garage::Garage;
use garage_util::data::*;
-pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
- let param = bucket
- .params()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
-
- if let Some(website) = param.website_config.get() {
+pub async fn handle_get_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+ let ReqCtx { bucket_params, .. } = ctx;
+ if let Some(website) = bucket_params.website_config.get() {
let wc = WebsiteConfiguration {
xmlns: (),
error_document: website.error_document.as_ref().map(|v| Key {
@@ -44,16 +39,18 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Er
}
}
-pub async fn handle_delete_website(
- garage: Arc<Garage>,
- mut bucket: Bucket,
-) -> Result<Response<ResBody>, Error> {
- let param = bucket
- .params_mut()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
-
- param.website_config.update(None);
- garage.bucket_table.insert(&bucket).await?;
+pub async fn handle_delete_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage,
+ bucket_id,
+ mut bucket_params,
+ ..
+ } = ctx;
+ bucket_params.website_config.update(None);
+ garage
+ .bucket_table
+ .insert(&Bucket::present(bucket_id, bucket_params))
+ .await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
@@ -61,28 +58,33 @@ pub async fn handle_delete_website(
}
pub async fn handle_put_website(
- garage: Arc<Garage>,
- mut bucket: Bucket,
+ ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage,
+ bucket_id,
+ mut bucket_params,
+ ..
+ } = ctx;
+
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
- let param = bucket
- .params_mut()
- .ok_or_internal_error("Bucket should not be deleted at this point")?;
-
let conf: WebsiteConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;
- param
+ bucket_params
.website_config
.update(Some(conf.into_garage_website_config()?));
- garage.bucket_table.insert(&bucket).await?;
+ garage
+ .bucket_table
+ .insert(&Bucket::present(bucket_id, bucket_params))
+ .await?;
Ok(Response::builder()
.status(StatusCode::OK)
diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs
index 4b8b990f..6514da43 100644
--- a/src/api/signature/mod.rs
+++ b/src/api/signature/mod.rs
@@ -2,19 +2,44 @@ use chrono::{DateTime, Utc};
use hmac::{Hmac, Mac};
use sha2::Sha256;
+use hyper::{body::Incoming as IncomingBody, Request};
+
+use garage_model::garage::Garage;
+use garage_model::key_table::Key;
use garage_util::data::{sha256sum, Hash};
+use error::*;
+
pub mod error;
pub mod payload;
pub mod streaming;
-use error::*;
-
pub const SHORT_DATE: &str = "%Y%m%d";
pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ";
type HmacSha256 = Hmac<Sha256>;
+pub async fn verify_request(
+ garage: &Garage,
+ mut req: Request<IncomingBody>,
+ service: &'static str,
+) -> Result<(Request<streaming::ReqBody>, Key, Option<Hash>), Error> {
+ let (api_key, mut content_sha256) =
+ payload::check_payload_signature(&garage, &mut req, service).await?;
+ let api_key =
+ api_key.ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
+
+ let req = streaming::parse_streaming_body(
+ &api_key,
+ req,
+ &mut content_sha256,
+ &garage.config.s3_api.s3_region,
+ service,
+ )?;
+
+ Ok((req, api_key, content_sha256))
+}
+
pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> {
if expected_sha256 != sha256sum(body) {
return Err(Error::bad_request(
diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs
index 423aad93..d72736bb 100644
--- a/src/api/signature/payload.rs
+++ b/src/api/signature/payload.rs
@@ -1,7 +1,9 @@
use std::collections::HashMap;
+use std::convert::TryFrom;
use chrono::{DateTime, Duration, NaiveDateTime, TimeZone, Utc};
use hmac::Mac;
+use hyper::header::{HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, HOST};
use hyper::{body::Incoming as IncomingBody, Method, Request};
use sha2::{Digest, Sha256};
@@ -17,66 +19,98 @@ use super::{compute_scope, signing_hmac};
use crate::encoding::uri_encode;
use crate::signature::error::*;
+pub const X_AMZ_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-algorithm");
+pub const X_AMZ_CREDENTIAL: HeaderName = HeaderName::from_static("x-amz-credential");
+pub const X_AMZ_DATE: HeaderName = HeaderName::from_static("x-amz-date");
+pub const X_AMZ_EXPIRES: HeaderName = HeaderName::from_static("x-amz-expires");
+pub const X_AMZ_SIGNEDHEADERS: HeaderName = HeaderName::from_static("x-amz-signedheaders");
+pub const X_AMZ_SIGNATURE: HeaderName = HeaderName::from_static("x-amz-signature");
+pub const X_AMZ_CONTENT_SH256: HeaderName = HeaderName::from_static("x-amz-content-sha256");
+
+pub const AWS4_HMAC_SHA256: &str = "AWS4-HMAC-SHA256";
+pub const UNSIGNED_PAYLOAD: &str = "UNSIGNED-PAYLOAD";
+pub const STREAMING_AWS4_HMAC_SHA256_PAYLOAD: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD";
+
+pub type QueryMap = HeaderMap<QueryValue>;
+pub struct QueryValue {
+ /// Original key with potential uppercase characters,
+ /// for use in signature calculation
+ key: String,
+ value: String,
+}
+
pub async fn check_payload_signature(
garage: &Garage,
+ request: &mut Request<IncomingBody>,
service: &'static str,
- request: &Request<IncomingBody>,
) -> Result<(Option<Key>, Option<Hash>), Error> {
- let mut headers = HashMap::new();
- for (key, val) in request.headers() {
- headers.insert(key.to_string(), val.to_str()?.to_string());
- }
- if let Some(query) = request.uri().query() {
- let query_pairs = url::form_urlencoded::parse(query.as_bytes());
- for (key, val) in query_pairs {
- headers.insert(key.to_lowercase(), val.to_string());
- }
- }
-
- let authorization = if let Some(authorization) = headers.get("authorization") {
- parse_authorization(authorization, &headers)?
- } else if let Some(algorithm) = headers.get("x-amz-algorithm") {
- parse_query_authorization(algorithm, &headers)?
+ let query = parse_query_map(request.uri())?;
+
+ if query.contains_key(&X_AMZ_ALGORITHM) {
+ // We check for presigned-URL-style authentification first, because
+ // the browser or someting else could inject an Authorization header
+ // that is totally unrelated to AWS signatures.
+ check_presigned_signature(garage, service, request, query).await
+ } else if request.headers().contains_key(AUTHORIZATION) {
+ check_standard_signature(garage, service, request, query).await
} else {
- let content_sha256 = headers.get("x-amz-content-sha256");
- if let Some(content_sha256) = content_sha256.filter(|c| "UNSIGNED-PAYLOAD" != c.as_str()) {
+ // Unsigned (anonymous) request
+ let content_sha256 = request
+ .headers()
+ .get("x-amz-content-sha256")
+ .filter(|c| c.as_bytes() != UNSIGNED_PAYLOAD.as_bytes());
+ if let Some(content_sha256) = content_sha256 {
let sha256 = hex::decode(content_sha256)
.ok()
.and_then(|bytes| Hash::try_from(&bytes))
.ok_or_bad_request("Invalid content sha256 hash")?;
- return Ok((None, Some(sha256)));
+ Ok((None, Some(sha256)))
} else {
- return Ok((None, None));
+ Ok((None, None))
}
- };
+ }
+}
+
+async fn check_standard_signature(
+ garage: &Garage,
+ service: &'static str,
+ request: &Request<IncomingBody>,
+ query: QueryMap,
+) -> Result<(Option<Key>, Option<Hash>), Error> {
+ let authorization = Authorization::parse_header(request.headers())?;
+
+ // Verify that all necessary request headers are included in signed_headers
+ // The following must be included for all signatures:
+ // - the Host header (mandatory)
+ // - all x-amz-* headers used in the request
+ // AWS also indicates that the Content-Type header should be signed if
+ // it is used, but Minio client doesn't sign it so we don't check it for compatibility.
+ let signed_headers = split_signed_headers(&authorization)?;
+ verify_signed_headers(request.headers(), &signed_headers)?;
let canonical_request = canonical_request(
service,
request.method(),
- request.uri(),
- &headers,
- &authorization.signed_headers,
+ request.uri().path(),
+ &query,
+ request.headers(),
+ &signed_headers,
&authorization.content_sha256,
+ )?;
+ let string_to_sign = string_to_sign(
+ &authorization.date,
+ &authorization.scope,
+ &canonical_request,
);
- let (_, scope) = parse_credential(&authorization.credential)?;
- let string_to_sign = string_to_sign(&authorization.date, &scope, &canonical_request);
trace!("canonical request:\n{}", canonical_request);
trace!("string to sign:\n{}", string_to_sign);
- let key = verify_v4(
- garage,
- service,
- &authorization.credential,
- &authorization.date,
- &authorization.signature,
- string_to_sign.as_bytes(),
- )
- .await?;
+ let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?;
- let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" {
+ let content_sha256 = if authorization.content_sha256 == UNSIGNED_PAYLOAD {
None
- } else if authorization.content_sha256 == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" {
+ } else if authorization.content_sha256 == STREAMING_AWS4_HMAC_SHA256_PAYLOAD {
let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?;
Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?)
} else {
@@ -88,124 +122,102 @@ pub async fn check_payload_signature(
Ok((Some(key), content_sha256))
}
-struct Authorization {
- credential: String,
- signed_headers: String,
- signature: String,
- content_sha256: String,
- date: DateTime<Utc>,
-}
-
-fn parse_authorization(
- authorization: &str,
- headers: &HashMap<String, String>,
-) -> Result<Authorization, Error> {
- let first_space = authorization
- .find(' ')
- .ok_or_bad_request("Authorization field to short")?;
- let (auth_kind, rest) = authorization.split_at(first_space);
-
- if auth_kind != "AWS4-HMAC-SHA256" {
- return Err(Error::bad_request("Unsupported authorization method"));
- }
-
- let mut auth_params = HashMap::new();
- for auth_part in rest.split(',') {
- let auth_part = auth_part.trim();
- let eq = auth_part
- .find('=')
- .ok_or_bad_request("Field without value in authorization header")?;
- let (key, value) = auth_part.split_at(eq);
- auth_params.insert(key.to_string(), value.trim_start_matches('=').to_string());
- }
-
- let cred = auth_params
- .get("Credential")
- .ok_or_bad_request("Could not find Credential in Authorization field")?;
-
- let content_sha256 = headers
- .get("x-amz-content-sha256")
- .ok_or_bad_request("Missing X-Amz-Content-Sha256 field")?;
-
- let date = headers
- .get("x-amz-date")
- .ok_or_bad_request("Missing X-Amz-Date field")
- .map_err(Error::from)
- .and_then(|d| parse_date(d))?;
+async fn check_presigned_signature(
+ garage: &Garage,
+ service: &'static str,
+ request: &mut Request<IncomingBody>,
+ mut query: QueryMap,
+) -> Result<(Option<Key>, Option<Hash>), Error> {
+ let algorithm = query.get(&X_AMZ_ALGORITHM).unwrap();
+ let authorization = Authorization::parse_presigned(&algorithm.value, &query)?;
+
+ // Verify that all necessary request headers are included in signed_headers
+ // For AWSv4 pre-signed URLs, the following must be incldued:
+ // - the Host header (mandatory)
+ // - all x-amz-* headers used in the request
+ let signed_headers = split_signed_headers(&authorization)?;
+ verify_signed_headers(request.headers(), &signed_headers)?;
+
+ // The X-Amz-Signature value is passed as a query parameter,
+ // but the signature cannot be computed from a string that contains itself.
+ // AWS specifies that all query params except X-Amz-Signature are included
+ // in the canonical request.
+ query.remove(&X_AMZ_SIGNATURE);
+ let canonical_request = canonical_request(
+ service,
+ request.method(),
+ request.uri().path(),
+ &query,
+ request.headers(),
+ &signed_headers,
+ &authorization.content_sha256,
+ )?;
+ let string_to_sign = string_to_sign(
+ &authorization.date,
+ &authorization.scope,
+ &canonical_request,
+ );
- if Utc::now() - date > Duration::hours(24) {
- return Err(Error::bad_request("Date is too old".to_string()));
+ trace!("canonical request (presigned url):\n{}", canonical_request);
+ trace!("string to sign (presigned url):\n{}", string_to_sign);
+
+ let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?;
+
+ // In the page on presigned URLs, AWS specifies that if a signed query
+ // parameter and a signed header of the same name have different values,
+ // then an InvalidRequest error is raised.
+ let headers_mut = request.headers_mut();
+ for (name, value) in query.iter() {
+ if let Some(existing) = headers_mut.get(name) {
+ if signed_headers.contains(&name) && existing.as_bytes() != value.value.as_bytes() {
+ return Err(Error::bad_request(format!(
+ "Conflicting values for `{}` in query parameters and request headers",
+ name
+ )));
+ }
+ }
+ if name.as_str().starts_with("x-amz-") {
+ // Query parameters that start by x-amz- are actually intended to stand in for
+ // headers that can't be added at the time the request is made.
+ // What we do is just add them to the Request object as regular headers,
+ // that will be handled downstream as if they were included like in a normal request.
+ // (Here we allow such query parameters to override headers with the same name
+ // that are not signed, however there is not much reason that this would happen)
+ headers_mut.insert(
+ name,
+ HeaderValue::from_bytes(value.value.as_bytes())
+ .ok_or_bad_request("invalid query parameter value")?,
+ );
+ }
}
- let auth = Authorization {
- credential: cred.to_string(),
- signed_headers: auth_params
- .get("SignedHeaders")
- .ok_or_bad_request("Could not find SignedHeaders in Authorization field")?
- .to_string(),
- signature: auth_params
- .get("Signature")
- .ok_or_bad_request("Could not find Signature in Authorization field")?
- .to_string(),
- content_sha256: content_sha256.to_string(),
- date,
- };
- Ok(auth)
+ // Presigned URLs always use UNSIGNED-PAYLOAD,
+ // so there is no sha256 hash to return.
+ Ok((Some(key), None))
}
-fn parse_query_authorization(
- algorithm: &str,
- headers: &HashMap<String, String>,
-) -> Result<Authorization, Error> {
- if algorithm != "AWS4-HMAC-SHA256" {
- return Err(Error::bad_request(
- "Unsupported authorization method".to_string(),
- ));
- }
-
- let cred = headers
- .get("x-amz-credential")
- .ok_or_bad_request("X-Amz-Credential not found in query parameters")?;
- let signed_headers = headers
- .get("x-amz-signedheaders")
- .ok_or_bad_request("X-Amz-SignedHeaders not found in query parameters")?;
- let signature = headers
- .get("x-amz-signature")
- .ok_or_bad_request("X-Amz-Signature not found in query parameters")?;
- let content_sha256 = headers
- .get("x-amz-content-sha256")
- .map(|x| x.as_str())
- .unwrap_or("UNSIGNED-PAYLOAD");
-
- let duration = headers
- .get("x-amz-expires")
- .ok_or_bad_request("X-Amz-Expires not found in query parameters")?
- .parse()
- .map_err(|_| Error::bad_request("X-Amz-Expires is not a number".to_string()))?;
-
- if duration > 7 * 24 * 3600 {
- return Err(Error::bad_request(
- "X-Amz-Expires may not exceed a week".to_string(),
- ));
- }
-
- let date = headers
- .get("x-amz-date")
- .ok_or_bad_request("Missing X-Amz-Date field")
- .map_err(Error::from)
- .and_then(|d| parse_date(d))?;
-
- if Utc::now() - date > Duration::seconds(duration) {
- return Err(Error::bad_request("Date is too old".to_string()));
+pub fn parse_query_map(uri: &http::uri::Uri) -> Result<QueryMap, Error> {
+ let mut query = QueryMap::with_capacity(0);
+ if let Some(query_str) = uri.query() {
+ let query_pairs = url::form_urlencoded::parse(query_str.as_bytes());
+ for (key, val) in query_pairs {
+ let name =
+ HeaderName::from_bytes(key.as_bytes()).ok_or_bad_request("Invalid header name")?;
+
+ let value = QueryValue {
+ key: key.to_string(),
+ value: val.into_owned(),
+ };
+
+ if query.insert(name, value).is_some() {
+ return Err(Error::bad_request(format!(
+ "duplicate query parameter: `{}`",
+ key
+ )));
+ }
+ }
}
-
- Ok(Authorization {
- credential: cred.to_string(),
- signed_headers: signed_headers.to_string(),
- signature: signature.to_string(),
- content_sha256: content_sha256.to_string(),
- date,
- })
+ Ok(query)
}
fn parse_credential(cred: &str) -> Result<(String, String), Error> {
@@ -219,11 +231,39 @@ fn parse_credential(cred: &str) -> Result<(String, String), Error> {
))
}
+fn split_signed_headers(authorization: &Authorization) -> Result<Vec<HeaderName>, Error> {
+ let mut signed_headers = authorization
+ .signed_headers
+ .split(';')
+ .map(HeaderName::try_from)
+ .collect::<Result<Vec<HeaderName>, _>>()
+ .ok_or_bad_request("invalid header name")?;
+ signed_headers.sort_by(|h1, h2| h1.as_str().cmp(h2.as_str()));
+ Ok(signed_headers)
+}
+
+fn verify_signed_headers(headers: &HeaderMap, signed_headers: &[HeaderName]) -> Result<(), Error> {
+ if !signed_headers.contains(&HOST) {
+ return Err(Error::bad_request("Header `Host` should be signed"));
+ }
+ for (name, _) in headers.iter() {
+ if name.as_str().starts_with("x-amz-") {
+ if !signed_headers.contains(name) {
+ return Err(Error::bad_request(format!(
+ "Header `{}` should be signed",
+ name
+ )));
+ }
+ }
+ }
+ Ok(())
+}
+
pub fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_req: &str) -> String {
let mut hasher = Sha256::default();
hasher.update(canonical_req.as_bytes());
[
- "AWS4-HMAC-SHA256",
+ AWS4_HMAC_SHA256,
&datetime.format(LONG_DATETIME).to_string(),
scope_string,
&hex::encode(hasher.finalize().as_slice()),
@@ -234,11 +274,12 @@ pub fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_re
pub fn canonical_request(
service: &'static str,
method: &Method,
- uri: &hyper::Uri,
- headers: &HashMap<String, String>,
- signed_headers: &str,
+ canonical_uri: &str,
+ query: &QueryMap,
+ headers: &HeaderMap,
+ signed_headers: &[HeaderName],
content_sha256: &str,
-) -> String {
+) -> Result<String, Error> {
// There seems to be evidence that in AWSv4 signatures, the path component is url-encoded
// a second time when building the canonical request, as specified in this documentation page:
// -> https://docs.aws.amazon.com/rolesanywhere/latest/userguide/authentication-sign-process.html
@@ -268,49 +309,46 @@ pub fn canonical_request(
// it mentions it in the comments (same link to the souce code as above).
// We make the explicit choice of NOT normalizing paths in the K2V API because doing so
// would make non-normalized paths invalid K2V partition keys, and we don't want that.
- let path: std::borrow::Cow<str> = if service != "s3" {
- uri_encode(uri.path(), false).into()
+ let canonical_uri: std::borrow::Cow<str> = if service != "s3" {
+ uri_encode(canonical_uri, false).into()
} else {
- uri.path().into()
+ canonical_uri.into()
};
- [
- method.as_str(),
- &path,
- &canonical_query_string(uri),
- &canonical_header_string(headers, signed_headers),
- "",
- signed_headers,
- content_sha256,
- ]
- .join("\n")
-}
-fn canonical_header_string(headers: &HashMap<String, String>, signed_headers: &str) -> String {
- let signed_headers_vec = signed_headers.split(';').collect::<Vec<_>>();
- let mut items = headers
- .iter()
- .filter(|(key, _)| signed_headers_vec.contains(&key.as_str()))
- .collect::<Vec<_>>();
- items.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
- items
- .iter()
- .map(|(key, value)| key.to_lowercase() + ":" + value.trim())
- .collect::<Vec<_>>()
- .join("\n")
-}
-
-fn canonical_query_string(uri: &hyper::Uri) -> String {
- if let Some(query) = uri.query() {
- let query_pairs = url::form_urlencoded::parse(query.as_bytes());
- let mut items = query_pairs
- .filter(|(key, _)| key != "X-Amz-Signature")
- .map(|(key, value)| uri_encode(&key, true) + "=" + &uri_encode(&value, true))
- .collect::<Vec<_>>();
+ // Canonical query string from passed HeaderMap
+ let canonical_query_string = {
+ let mut items = Vec::with_capacity(query.len());
+ for (_, QueryValue { key, value }) in query.iter() {
+ items.push(uri_encode(&key, true) + "=" + &uri_encode(&value, true));
+ }
items.sort();
items.join("&")
- } else {
- "".to_string()
- }
+ };
+
+ // Canonical header string calculated from signed headers
+ let canonical_header_string = signed_headers
+ .iter()
+ .map(|name| {
+ let value = headers
+ .get(name)
+ .ok_or_bad_request(format!("signed header `{}` is not present", name))?
+ .to_str()?;
+ Ok(format!("{}:{}", name.as_str(), value.trim()))
+ })
+ .collect::<Result<Vec<String>, Error>>()?
+ .join("\n");
+ let signed_headers = signed_headers.join(";");
+
+ let list = [
+ method.as_str(),
+ &canonical_uri,
+ &canonical_query_string,
+ &canonical_header_string,
+ "",
+ &signed_headers,
+ content_sha256,
+ ];
+ Ok(list.join("\n"))
}
pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> {
@@ -322,38 +360,203 @@ pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> {
pub async fn verify_v4(
garage: &Garage,
service: &str,
- credential: &str,
- date: &DateTime<Utc>,
- signature: &str,
+ auth: &Authorization,
payload: &[u8],
) -> Result<Key, Error> {
- let (key_id, scope) = parse_credential(credential)?;
-
- let scope_expected = compute_scope(date, &garage.config.s3_api.s3_region, service);
- if scope != scope_expected {
- return Err(Error::AuthorizationHeaderMalformed(scope.to_string()));
+ let scope_expected = compute_scope(&auth.date, &garage.config.s3_api.s3_region, service);
+ if auth.scope != scope_expected {
+ return Err(Error::AuthorizationHeaderMalformed(auth.scope.to_string()));
}
let key = garage
.key_table
- .get(&EmptyKey, &key_id)
+ .get(&EmptyKey, &auth.key_id)
.await?
.filter(|k| !k.state.is_deleted())
- .ok_or_else(|| Error::forbidden(format!("No such key: {}", &key_id)))?;
+ .ok_or_else(|| Error::forbidden(format!("No such key: {}", &auth.key_id)))?;
let key_p = key.params().unwrap();
let mut hmac = signing_hmac(
- date,
+ &auth.date,
&key_p.secret_key,
&garage.config.s3_api.s3_region,
service,
)
.ok_or_internal_error("Unable to build signing HMAC")?;
hmac.update(payload);
- let our_signature = hex::encode(hmac.finalize().into_bytes());
- if signature != our_signature {
- return Err(Error::forbidden("Invalid signature".to_string()));
+ let signature =
+ hex::decode(&auth.signature).map_err(|_| Error::forbidden("Invalid signature"))?;
+ if hmac.verify_slice(&signature).is_err() {
+ return Err(Error::forbidden("Invalid signature"));
}
Ok(key)
}
+
+// ============ Authorization header, or X-Amz-* query params =========
+
+pub struct Authorization {
+ key_id: String,
+ scope: String,
+ signed_headers: String,
+ signature: String,
+ content_sha256: String,
+ date: DateTime<Utc>,
+}
+
+impl Authorization {
+ fn parse_header(headers: &HeaderMap) -> Result<Self, Error> {
+ let authorization = headers
+ .get(AUTHORIZATION)
+ .ok_or_bad_request("Missing authorization header")?
+ .to_str()?;
+
+ let (auth_kind, rest) = authorization
+ .split_once(' ')
+ .ok_or_bad_request("Authorization field to short")?;
+
+ if auth_kind != AWS4_HMAC_SHA256 {
+ return Err(Error::bad_request("Unsupported authorization method"));
+ }
+
+ let mut auth_params = HashMap::new();
+ for auth_part in rest.split(',') {
+ let auth_part = auth_part.trim();
+ let eq = auth_part
+ .find('=')
+ .ok_or_bad_request("Field without value in authorization header")?;
+ let (key, value) = auth_part.split_at(eq);
+ auth_params.insert(key.to_string(), value.trim_start_matches('=').to_string());
+ }
+
+ let cred = auth_params
+ .get("Credential")
+ .ok_or_bad_request("Could not find Credential in Authorization field")?;
+ let signed_headers = auth_params
+ .get("SignedHeaders")
+ .ok_or_bad_request("Could not find SignedHeaders in Authorization field")?
+ .to_string();
+ let signature = auth_params
+ .get("Signature")
+ .ok_or_bad_request("Could not find Signature in Authorization field")?
+ .to_string();
+
+ let content_sha256 = headers
+ .get(X_AMZ_CONTENT_SH256)
+ .ok_or_bad_request("Missing X-Amz-Content-Sha256 field")?;
+
+ let date = headers
+ .get(X_AMZ_DATE)
+ .ok_or_bad_request("Missing X-Amz-Date field")
+ .map_err(Error::from)?
+ .to_str()?;
+ let date = parse_date(date)?;
+
+ if Utc::now() - date > Duration::hours(24) {
+ return Err(Error::bad_request("Date is too old".to_string()));
+ }
+
+ let (key_id, scope) = parse_credential(cred)?;
+ let auth = Authorization {
+ key_id,
+ scope,
+ signed_headers,
+ signature,
+ content_sha256: content_sha256.to_str()?.to_string(),
+ date,
+ };
+ Ok(auth)
+ }
+
+ fn parse_presigned(algorithm: &str, query: &QueryMap) -> Result<Self, Error> {
+ if algorithm != AWS4_HMAC_SHA256 {
+ return Err(Error::bad_request(
+ "Unsupported authorization method".to_string(),
+ ));
+ }
+
+ let cred = query
+ .get(&X_AMZ_CREDENTIAL)
+ .ok_or_bad_request("X-Amz-Credential not found in query parameters")?;
+ let signed_headers = query
+ .get(&X_AMZ_SIGNEDHEADERS)
+ .ok_or_bad_request("X-Amz-SignedHeaders not found in query parameters")?;
+ let signature = query
+ .get(&X_AMZ_SIGNATURE)
+ .ok_or_bad_request("X-Amz-Signature not found in query parameters")?;
+
+ let duration = query
+ .get(&X_AMZ_EXPIRES)
+ .ok_or_bad_request("X-Amz-Expires not found in query parameters")?
+ .value
+ .parse()
+ .map_err(|_| Error::bad_request("X-Amz-Expires is not a number".to_string()))?;
+
+ if duration > 7 * 24 * 3600 {
+ return Err(Error::bad_request(
+ "X-Amz-Expires may not exceed a week".to_string(),
+ ));
+ }
+
+ let date = query
+ .get(&X_AMZ_DATE)
+ .ok_or_bad_request("Missing X-Amz-Date field")?;
+ let date = parse_date(&date.value)?;
+
+ if Utc::now() - date > Duration::seconds(duration) {
+ return Err(Error::bad_request("Date is too old".to_string()));
+ }
+
+ let (key_id, scope) = parse_credential(&cred.value)?;
+ Ok(Authorization {
+ key_id,
+ scope,
+ signed_headers: signed_headers.value.clone(),
+ signature: signature.value.clone(),
+ content_sha256: UNSIGNED_PAYLOAD.to_string(),
+ date,
+ })
+ }
+
+ pub(crate) fn parse_form(params: &HeaderMap) -> Result<Self, Error> {
+ let algorithm = params
+ .get(X_AMZ_ALGORITHM)
+ .ok_or_bad_request("Missing X-Amz-Algorithm header")?
+ .to_str()?;
+ if algorithm != AWS4_HMAC_SHA256 {
+ return Err(Error::bad_request(
+ "Unsupported authorization method".to_string(),
+ ));
+ }
+
+ let credential = params
+ .get(X_AMZ_CREDENTIAL)
+ .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?
+ .to_str()?;
+ let signature = params
+ .get(X_AMZ_SIGNATURE)
+ .ok_or_bad_request("No signature was provided")?
+ .to_str()?
+ .to_string();
+ let date = params
+ .get(X_AMZ_DATE)
+ .ok_or_bad_request("No date was provided")?
+ .to_str()?;
+ let date = parse_date(date)?;
+
+ if Utc::now() - date > Duration::hours(24) {
+ return Err(Error::bad_request("Date is too old".to_string()));
+ }
+
+ let (key_id, scope) = parse_credential(credential)?;
+ let auth = Authorization {
+ key_id,
+ scope,
+ signed_headers: "".to_string(),
+ signature,
+ content_sha256: UNSIGNED_PAYLOAD.to_string(),
+ date,
+ };
+ Ok(auth)
+ }
+}
diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs
index a2a71f6b..e223d1b1 100644
--- a/src/api/signature/streaming.rs
+++ b/src/api/signature/streaming.rs
@@ -15,6 +15,11 @@ use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME};
use crate::helpers::*;
use crate::signature::error::*;
+use crate::signature::payload::{
+ STREAMING_AWS4_HMAC_SHA256_PAYLOAD, X_AMZ_CONTENT_SH256, X_AMZ_DATE,
+};
+
+pub const AWS4_HMAC_SHA256_PAYLOAD: &str = "AWS4-HMAC-SHA256-PAYLOAD";
pub type ReqBody = BoxBody<Error>;
@@ -25,8 +30,8 @@ pub fn parse_streaming_body(
region: &str,
service: &str,
) -> Result<Request<ReqBody>, Error> {
- match req.headers().get("x-amz-content-sha256") {
- Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
+ match req.headers().get(X_AMZ_CONTENT_SH256) {
+ Some(header) if header == STREAMING_AWS4_HMAC_SHA256_PAYLOAD => {
let signature = content_sha256
.take()
.ok_or_bad_request("No signature provided")?;
@@ -39,7 +44,7 @@ pub fn parse_streaming_body(
let date = req
.headers()
- .get("x-amz-date")
+ .get(X_AMZ_DATE)
.ok_or_bad_request("Missing X-Amz-Date field")?
.to_str()?;
let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
@@ -75,7 +80,7 @@ fn compute_streaming_payload_signature(
content_sha256: Hash,
) -> Result<Hash, Error> {
let string_to_sign = [
- "AWS4-HMAC-SHA256-PAYLOAD",
+ AWS4_HMAC_SHA256_PAYLOAD,
&date.format(LONG_DATETIME).to_string(),
scope,
&hex::encode(previous_signature),
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index e5f4cca1..8e1eaa56 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -1,12 +1,15 @@
#![allow(dead_code)]
use std::collections::HashMap;
-use std::convert::TryFrom;
+use std::convert::{TryFrom, TryInto};
use chrono::{offset::Utc, DateTime};
use hmac::{Hmac, Mac};
use http_body_util::BodyExt;
use http_body_util::Full as FullBody;
+use hyper::header::{
+ HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, CONTENT_ENCODING, CONTENT_LENGTH, HOST,
+};
use hyper::{Method, Request, Response, Uri};
use hyper_util::client::legacy::{connect::HttpConnector, Client};
use hyper_util::rt::TokioExecutor;
@@ -61,6 +64,10 @@ impl CustomRequester {
vhost_style: false,
}
}
+
+ pub fn client(&self) -> &Client<HttpConnector, Body> {
+ &self.client
+ }
}
pub struct RequestBuilder<'a> {
@@ -173,54 +180,85 @@ impl<'a> RequestBuilder<'a> {
.unwrap();
let streaming_signer = signer.clone();
- let mut all_headers = self.signed_headers.clone();
+ let mut all_headers = self
+ .signed_headers
+ .iter()
+ .map(|(k, v)| {
+ (
+ HeaderName::try_from(k).expect("invalid header name"),
+ HeaderValue::try_from(v).expect("invalid header value"),
+ )
+ })
+ .collect::<HeaderMap>();
let date = now.format(signature::LONG_DATETIME).to_string();
- all_headers.insert("x-amz-date".to_owned(), date);
- all_headers.insert("host".to_owned(), host);
+ all_headers.insert(
+ signature::payload::X_AMZ_DATE,
+ HeaderValue::from_str(&date).unwrap(),
+ );
+ all_headers.insert(HOST, HeaderValue::from_str(&host).unwrap());
let body_sha = match self.body_signature {
BodySignature::Unsigned => "UNSIGNED-PAYLOAD".to_owned(),
BodySignature::Classic => hex::encode(garage_util::data::sha256sum(&self.body)),
BodySignature::Streaming(size) => {
- all_headers.insert("content-encoding".to_owned(), "aws-chunked".to_owned());
all_headers.insert(
- "x-amz-decoded-content-length".to_owned(),
- self.body.len().to_string(),
+ CONTENT_ENCODING,
+ HeaderValue::from_str("aws-chunked").unwrap(),
+ );
+ all_headers.insert(
+ HeaderName::from_static("x-amz-decoded-content-length"),
+ HeaderValue::from_str(&self.body.len().to_string()).unwrap(),
);
// Get lenght of body by doing the conversion to a streaming body with an
// invalid signature (we don't know the seed) just to get its length. This
// is a pretty lazy and inefficient way to do it, but it's enought for test
// code.
all_headers.insert(
- "content-length".to_owned(),
+ CONTENT_LENGTH,
to_streaming_body(&self.body, size, String::new(), signer.clone(), now, "")
.len()
- .to_string(),
+ .to_string()
+ .try_into()
+ .unwrap(),
);
"STREAMING-AWS4-HMAC-SHA256-PAYLOAD".to_owned()
}
};
- all_headers.insert("x-amz-content-sha256".to_owned(), body_sha.clone());
+ all_headers.insert(
+ signature::payload::X_AMZ_CONTENT_SH256,
+ HeaderValue::from_str(&body_sha).unwrap(),
+ );
- let mut signed_headers = all_headers
- .keys()
- .map(|k| k.as_ref())
- .collect::<Vec<&str>>();
- signed_headers.sort();
- let signed_headers = signed_headers.join(";");
+ let mut signed_headers = all_headers.keys().cloned().collect::<Vec<_>>();
+ signed_headers.sort_by(|h1, h2| h1.as_str().cmp(h2.as_str()));
+ let signed_headers_str = signed_headers
+ .iter()
+ .map(ToString::to_string)
+ .collect::<Vec<_>>()
+ .join(";");
- all_headers.extend(self.unsigned_headers.clone());
+ all_headers.extend(self.unsigned_headers.iter().map(|(k, v)| {
+ (
+ HeaderName::try_from(k).expect("invalid header name"),
+ HeaderValue::try_from(v).expect("invalid header value"),
+ )
+ }));
+
+ let uri = Uri::try_from(&uri).unwrap();
+ let query = signature::payload::parse_query_map(&uri).unwrap();
let canonical_request = signature::payload::canonical_request(
self.service,
&self.method,
- &Uri::try_from(&uri).unwrap(),
+ uri.path(),
+ &query,
&all_headers,
&signed_headers,
&body_sha,
- );
+ )
+ .unwrap();
let string_to_sign = signature::payload::string_to_sign(&now, &scope, &canonical_request);
@@ -228,14 +266,15 @@ impl<'a> RequestBuilder<'a> {
let signature = hex::encode(signer.finalize().into_bytes());
let authorization = format!(
"AWS4-HMAC-SHA256 Credential={}/{},SignedHeaders={},Signature={}",
- self.requester.key.id, scope, signed_headers, signature
+ self.requester.key.id, scope, signed_headers_str, signature
+ );
+ all_headers.insert(
+ AUTHORIZATION,
+ HeaderValue::from_str(&authorization).unwrap(),
);
- all_headers.insert("authorization".to_owned(), authorization);
let mut request = Request::builder();
- for (k, v) in all_headers {
- request = request.header(k, v);
- }
+ *request.headers_mut().unwrap() = all_headers;
let body = if let BodySignature::Streaming(size) = self.body_signature {
to_streaming_body(&self.body, size, signature, streaming_signer, now, &scope)
diff --git a/src/garage/tests/s3/mod.rs b/src/garage/tests/s3/mod.rs
index 623eb665..4ebc4914 100644
--- a/src/garage/tests/s3/mod.rs
+++ b/src/garage/tests/s3/mod.rs
@@ -1,6 +1,7 @@
mod list;
mod multipart;
mod objects;
+mod presigned;
mod simple;
mod streaming_signature;
mod website;
diff --git a/src/garage/tests/s3/presigned.rs b/src/garage/tests/s3/presigned.rs
new file mode 100644
index 00000000..15270361
--- /dev/null
+++ b/src/garage/tests/s3/presigned.rs
@@ -0,0 +1,72 @@
+use std::time::{Duration, SystemTime};
+
+use crate::common;
+use aws_sdk_s3::presigning::PresigningConfig;
+use bytes::Bytes;
+use http_body_util::{BodyExt, Full};
+use hyper::Request;
+
+const STD_KEY: &str = "hello world";
+const BODY: &[u8; 62] = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+
+#[tokio::test]
+async fn test_presigned_url() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("presigned");
+
+ let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
+ let body = Bytes::from(BODY.to_vec());
+
+ let psc = PresigningConfig::builder()
+ .start_time(SystemTime::now() - Duration::from_secs(60))
+ .expires_in(Duration::from_secs(3600))
+ .build()
+ .unwrap();
+
+ {
+ // PutObject
+ let req = ctx
+ .client
+ .put_object()
+ .bucket(&bucket)
+ .key(STD_KEY)
+ .presigned(psc.clone())
+ .await
+ .unwrap();
+
+ let client = ctx.custom_request.client();
+ let req = Request::builder()
+ .method("PUT")
+ .uri(req.uri())
+ .body(Full::new(body.clone()))
+ .unwrap();
+ let res = client.request(req).await.unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(res.headers().get("etag").unwrap(), etag);
+ }
+
+ {
+ // GetObject
+ let req = ctx
+ .client
+ .get_object()
+ .bucket(&bucket)
+ .key(STD_KEY)
+ .presigned(psc)
+ .await
+ .unwrap();
+
+ let client = ctx.custom_request.client();
+ let req = Request::builder()
+ .method("GET")
+ .uri(req.uri())
+ .body(Full::new(Bytes::new()))
+ .unwrap();
+ let res = client.request(req).await.unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(res.headers().get("etag").unwrap(), etag);
+
+ let body2 = BodyExt::collect(res.into_body()).await.unwrap().to_bytes();
+ assert_eq!(body, body2);
+ }
+}
diff --git a/src/garage/tests/s3/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs
index 224b9ed5..351aa422 100644
--- a/src/garage/tests/s3/streaming_signature.rs
+++ b/src/garage/tests/s3/streaming_signature.rs
@@ -26,7 +26,7 @@ async fn test_putobject_streaming() {
.builder(bucket.clone())
.method(Method::PUT)
.path(STD_KEY.to_owned())
- .unsigned_headers(headers)
+ .signed_headers(headers)
.vhost_style(true)
.body(vec![])
.body_signature(BodySignature::Streaming(10))
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 4c48a76f..1dbdfac2 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -191,6 +191,13 @@ impl Bucket {
}
}
+ pub fn present(id: Uuid, params: BucketParams) -> Self {
+ Bucket {
+ id,
+ state: crdt::Deletable::present(params),
+ }
+ }
+
/// Returns true if this represents a deleted bucket
pub fn is_deleted(&self) -> bool {
self.state.is_deleted()
diff --git a/src/net/Cargo.toml b/src/net/Cargo.toml
index df81c437..4bd0d2e5 100644
--- a/src/net/Cargo.toml
+++ b/src/net/Cargo.toml
@@ -3,7 +3,7 @@ name = "garage_net"
version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
-license-file = "AGPL-3.0"
+license = "AGPL-3.0"
description = "Networking library for Garage RPC communication, forked from Netapp"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
readme = "../../README.md"
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index 0f9b5dc8..69939f65 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -26,7 +26,7 @@ use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_opt
use garage_api::s3::error::{
CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError,
};
-use garage_api::s3::get::{handle_get, handle_head};
+use garage_api::s3::get::{handle_get_without_ctx, handle_head_without_ctx};
use garage_model::garage::Garage;
@@ -219,14 +219,13 @@ impl WebServer {
// Check bucket isn't deleted and has website access enabled
let bucket = self
.garage
- .bucket_table
- .get(&EmptyKey, &bucket_id)
- .await?
- .ok_or(Error::NotFound)?;
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await
+ .map_err(|_| Error::NotFound)?;
+ let bucket_params = bucket.state.into_option().unwrap();
- let website_config = bucket
- .params()
- .ok_or(Error::NotFound)?
+ let website_config = bucket_params
.website_config
.get()
.as_ref()
@@ -243,14 +242,16 @@ impl WebServer {
);
let ret_doc = match *req.method() {
- Method::OPTIONS => handle_options_for_bucket(req, &bucket)
+ Method::OPTIONS => handle_options_for_bucket(req, &bucket_params)
.map_err(ApiError::from)
.map(|res| res.map(|_empty_body: EmptyBody| empty_body())),
- Method::HEAD => handle_head(self.garage.clone(), &req, bucket_id, &key, None).await,
+ Method::HEAD => {
+ handle_head_without_ctx(self.garage.clone(), req, bucket_id, &key, None).await
+ }
Method::GET => {
- handle_get(
+ handle_get_without_ctx(
self.garage.clone(),
- &req,
+ req,
bucket_id,
&key,
None,
@@ -301,7 +302,7 @@ impl WebServer {
.body(empty_body::<Infallible>())
.unwrap();
- match handle_get(
+ match handle_get_without_ctx(
self.garage.clone(),
&req2,
bucket_id,
@@ -344,7 +345,7 @@ impl WebServer {
}
Ok(mut resp) => {
// Maybe add CORS headers
- if let Some(rule) = find_matching_cors_rule(&bucket, req)? {
+ if let Some(rule) = find_matching_cors_rule(&bucket_params, req)? {
add_cors_headers(&mut resp, rule)
.ok_or_internal_error("Invalid bucket CORS configuration")?;
}