| author | Alex Auvolat <alex@adnab.me> | 2024-03-04 15:56:10 +0100 |
|---|---|---|
| committer | Alex Auvolat <alex@adnab.me> | 2024-03-04 15:56:10 +0100 |
| commit | bbde9bc91225ac41ea6e8def61c5b7044bb186a0 | |
| tree | 6e2bb951b1efb104c61d6e56aae84d7a6b036342 /src | |
| parent | d0d95fd53f3d4a6fd5adcfbb4cbb031826fd64a4 | |
| parent | 3168bb34a0082480660e945f7390a5ecab26c665 | |
Merge branch 'main' into next-0.10
Diffstat (limited to 'src')
29 files changed, 1090 insertions, 737 deletions
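Two changes merged from `main` are worth calling out before the diff itself. First, the admin API's bearer tokens are no longer compared as plain strings: the configured token is hashed with argon2 at server startup, and each incoming `Authorization: Bearer` header is verified against the stored hash. A minimal standalone sketch of that scheme follows (assuming the `argon2` crate with its default password-hash features enabled; the token value is made up for illustration):

```rust
use argon2::{
    password_hash::{rand_core::OsRng, PasswordHash, PasswordHasher, PasswordVerifier, SaltString},
    Argon2,
};

fn main() {
    // Hash the configured token once, at startup (mirrors hash_bearer_token below).
    let token = "example-admin-token"; // made-up value for illustration
    let salt = SaltString::generate(&mut OsRng);
    let phc_string = Argon2::default()
        .hash_password(token.trim().as_bytes(), &salt)
        .expect("could not hash API token")
        .to_string();

    // Verify an incoming bearer token against the stored PHC-format hash
    // (mirrors verify_bearer_token below).
    let parsed = PasswordHash::new(&phc_string).expect("invalid PHC string");
    assert!(Argon2::default()
        .verify_password(token.as_bytes(), &parsed)
        .is_ok());
}
```

A side effect of this design is that only the argon2 hash of each token is kept in memory after startup, so the plaintext token cannot be recovered from a running process.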
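Second, most S3 and K2V handlers now receive a single `ReqCtx` value instead of separate `garage` / `bucket_id` / `bucket_name` / `api_key` arguments threaded through every call. A compact sketch of the pattern, using stand-in types in place of the real `garage_model` ones:

```rust
use std::sync::Arc;

// Stand-ins for garage_model / garage_util types, for illustration only.
struct Garage;
struct BucketParams;
struct Key;
type Uuid = [u8; 32];

/// The values which are known for each request related to a bucket
/// (same field list as the ReqCtx introduced in src/api/helpers.rs).
struct ReqCtx {
    garage: Arc<Garage>,
    bucket_id: Uuid,
    bucket_name: String,
    bucket_params: BucketParams,
    api_key: Key,
}

// Handlers destructure only the fields they need, as the diff does:
fn handle_delete(ctx: &ReqCtx, key: &str) {
    let ReqCtx {
        garage, bucket_id, ..
    } = ctx;
    // garage: &Arc<Garage>, bucket_id: &Uuid -- hence the *bucket_id
    // dereferences sprinkled through the diff below.
    let _ = (garage, *bucket_id, key);
}

fn main() {
    // Construction happens once per request in the API servers.
    let ctx = ReqCtx {
        garage: Arc::new(Garage),
        bucket_id: [0u8; 32],
        bucket_name: "example-bucket".into(),
        bucket_params: BucketParams,
        api_key: Key,
    };
    handle_delete(&ctx, "some/object");
}
```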
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 3b555b8b..9b215333 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -21,6 +21,7 @@
 garage_net.workspace = true
 garage_util.workspace = true
 garage_rpc.workspace = true
+argon2.workspace = true
 async-trait.workspace = true
 base64.workspace = true
 bytes.workspace = true
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 2b9be24e..0e4565bb 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;

+use argon2::password_hash::PasswordHash;
 use async_trait::async_trait;

 use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
@@ -45,14 +46,8 @@ impl AdminApiServer {
         #[cfg(feature = "metrics")] exporter: PrometheusExporter,
     ) -> Self {
         let cfg = &garage.config.admin;
-        let metrics_token = cfg
-            .metrics_token
-            .as_ref()
-            .map(|tok| format!("Bearer {}", tok));
-        let admin_token = cfg
-            .admin_token
-            .as_ref()
-            .map(|tok| format!("Bearer {}", tok));
+        let metrics_token = cfg.metrics_token.as_deref().map(hash_bearer_token);
+        let admin_token = cfg.admin_token.as_deref().map(hash_bearer_token);
         Self {
             garage,
             #[cfg(feature = "metrics")]
@@ -248,11 +243,11 @@ impl ApiHandler for AdminApiServer {
         req: Request<IncomingBody>,
         endpoint: Endpoint,
     ) -> Result<Response<ResBody>, Error> {
-        let expected_auth_header =
+        let required_auth_hash =
             match endpoint.authorization_type() {
                 Authorization::None => None,
-                Authorization::MetricsToken => self.metrics_token.as_ref(),
-                Authorization::AdminToken => match &self.admin_token {
+                Authorization::MetricsToken => self.metrics_token.as_deref(),
+                Authorization::AdminToken => match self.admin_token.as_deref() {
                     None => return Err(Error::forbidden(
                         "Admin token isn't configured, admin API access is disabled for security.",
                     )),
@@ -260,14 +255,11 @@ impl ApiHandler for AdminApiServer {
             },
         };

-        if let Some(h) = expected_auth_header {
+        if let Some(password_hash) = required_auth_hash {
             match req.headers().get("Authorization") {
                 None => return Err(Error::forbidden("Authorization token must be provided")),
-                Some(v) => {
-                    let authorized = v.to_str().map(|hv| hv.trim() == h).unwrap_or(false);
-                    if !authorized {
-                        return Err(Error::forbidden("Invalid authorization token provided"));
-                    }
+                Some(authorization) => {
+                    verify_bearer_token(&authorization, password_hash)?;
                 }
             }
         }
@@ -342,3 +334,35 @@ impl ApiEndpoint for Endpoint {
     fn add_span_attributes(&self, _span: SpanRef<'_>) {}
 }
+
+fn hash_bearer_token(token: &str) -> String {
+    use argon2::{
+        password_hash::{rand_core::OsRng, PasswordHasher, SaltString},
+        Argon2,
+    };
+
+    let salt = SaltString::generate(&mut OsRng);
+    let argon2 = Argon2::default();
+    argon2
+        .hash_password(token.trim().as_bytes(), &salt)
+        .expect("could not hash API token")
+        .to_string()
+}
+
+fn verify_bearer_token(token: &hyper::http::HeaderValue, password_hash: &str) -> Result<(), Error> {
+    use argon2::{password_hash::PasswordVerifier, Argon2};
+
+    let parsed_hash = PasswordHash::new(&password_hash).unwrap();
+
+    token
+        .to_str()?
+ .strip_prefix("Bearer ") + .and_then(|token| { + Argon2::default() + .verify_password(token.trim().as_bytes(), &parsed_hash) + .ok() + }) + .ok_or_else(|| Error::forbidden("Invalid authorization token"))?; + + Ok(()) +} diff --git a/src/api/helpers.rs b/src/api/helpers.rs index 5f488912..cf60005d 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,4 +1,5 @@ use std::convert::Infallible; +use std::sync::Arc; use futures::{Stream, StreamExt, TryStreamExt}; @@ -10,6 +11,10 @@ use hyper::{ use idna::domain_to_unicode; use serde::{Deserialize, Serialize}; +use garage_model::bucket_table::BucketParams; +use garage_model::garage::Garage; +use garage_model::key_table::Key; +use garage_util::data::Uuid; use garage_util::error::Error as GarageError; use crate::common_error::{CommonError as Error, *}; @@ -27,6 +32,15 @@ pub enum Authorization { Owner, } +/// The values which are known for each request related to a bucket +pub struct ReqCtx { + pub garage: Arc<Garage>, + pub bucket_id: Uuid, + pub bucket_name: String, + pub bucket_params: BucketParams, + pub api_key: Key, +} + /// Host to bucket /// /// Convert a host, like "bucket.garage-site.tld" to the corresponding bucket "bucket", diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index e97da2af..658cfcc8 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -15,8 +15,7 @@ use garage_model::garage::Garage; use crate::generic_server::*; use crate::k2v::error::*; -use crate::signature::payload::check_payload_signature; -use crate::signature::streaming::*; +use crate::signature::verify_request; use crate::helpers::*; use crate::k2v::batch::*; @@ -86,17 +85,7 @@ impl ApiHandler for K2VApiServer { return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } - let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?; - let api_key = api_key - .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?; - - let req = parse_streaming_body( - &api_key, - req, - &mut content_sha256, - &garage.config.s3_api.s3_region, - "k2v", - )?; + let (req, api_key, _content_sha256) = verify_request(&garage, req, "k2v").await?; let bucket_id = garage .bucket_helper() @@ -106,6 +95,7 @@ impl ApiHandler for K2VApiServer { .bucket_helper() .get_existing_bucket(bucket_id) .await?; + let bucket_params = bucket.state.into_option().unwrap(); let allowed = match endpoint.authorization_type() { Authorization::Read => api_key.allow_read(&bucket_id), @@ -123,40 +113,42 @@ impl ApiHandler for K2VApiServer { // are always preflighted, i.e. the browser should make // an OPTIONS call before to check it is allowed let matching_cors_rule = match *req.method() { - Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req) - .ok_or_internal_error("Error looking up CORS rule")?, + Method::GET | Method::HEAD | Method::POST => { + find_matching_cors_rule(&bucket_params, &req) + .ok_or_internal_error("Error looking up CORS rule")? 
+                    .cloned()
+            }
             _ => None,
         };

+        let ctx = ReqCtx {
+            garage,
+            bucket_id,
+            bucket_name,
+            bucket_params,
+            api_key,
+        };
+
         let resp = match endpoint {
             Endpoint::DeleteItem {
                 partition_key,
                 sort_key,
-            } => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await,
+            } => handle_delete_item(ctx, req, &partition_key, &sort_key).await,
             Endpoint::InsertItem {
                 partition_key,
                 sort_key,
-            } => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await,
+            } => handle_insert_item(ctx, req, &partition_key, &sort_key).await,
             Endpoint::ReadItem {
                 partition_key,
                 sort_key,
-            } => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await,
+            } => handle_read_item(ctx, &req, &partition_key, &sort_key).await,
             Endpoint::PollItem {
                 partition_key,
                 sort_key,
                 causality_token,
                 timeout,
             } => {
-                handle_poll_item(
-                    garage,
-                    &req,
-                    bucket_id,
-                    partition_key,
-                    sort_key,
-                    causality_token,
-                    timeout,
-                )
-                .await
+                handle_poll_item(ctx, &req, partition_key, sort_key, causality_token, timeout).await
             }
             Endpoint::ReadIndex {
                 prefix,
@@ -164,12 +156,12 @@ impl ApiHandler for K2VApiServer {
                 end,
                 limit,
                 reverse,
-            } => handle_read_index(garage, bucket_id, prefix, start, end, limit, reverse).await,
-            Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
-            Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
-            Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
+            } => handle_read_index(ctx, prefix, start, end, limit, reverse).await,
+            Endpoint::InsertBatch {} => handle_insert_batch(ctx, req).await,
+            Endpoint::ReadBatch {} => handle_read_batch(ctx, req).await,
+            Endpoint::DeleteBatch {} => handle_delete_batch(ctx, req).await,
             Endpoint::PollRange { partition_key } => {
-                handle_poll_range(garage, bucket_id, &partition_key, req).await
+                handle_poll_range(ctx, &partition_key, req).await
             }
             Endpoint::Options => unreachable!(),
         };
@@ -178,7 +170,7 @@ impl ApiHandler for K2VApiServer {
         // add the corresponding CORS headers to the response
         let mut resp_ok = resp?;
         if let Some(rule) = matching_cors_rule {
-            add_cors_headers(&mut resp_ok, rule)
+            add_cors_headers(&mut resp_ok, &rule)
                 .ok_or_internal_error("Invalid bucket CORS configuration")?;
         }
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index ae2778b1..02b7ae8b 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -1,14 +1,9 @@
-use std::sync::Arc;
-
 use base64::prelude::*;
 use hyper::{Request, Response, StatusCode};
 use serde::{Deserialize, Serialize};

-use garage_util::data::*;
-
 use garage_table::{EnumerationOrder, TableSchema};

-use garage_model::garage::Garage;
 use garage_model::k2v::causality::*;
 use garage_model::k2v::item_table::*;
@@ -18,10 +13,12 @@ use crate::k2v::error::*;
 use crate::k2v::range::read_range;

 pub async fn handle_insert_batch(
-    garage: Arc<Garage>,
-    bucket_id: Uuid,
+    ctx: ReqCtx,
     req: Request<ReqBody>,
 ) -> Result<Response<ResBody>, Error> {
+    let ReqCtx {
+        garage, bucket_id, ..
+    } = &ctx;
     let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;

     let mut items2 = vec![];
@@ -38,7 +35,7 @@ pub async fn handle_insert_batch(
         items2.push((it.pk, it.sk, ct, v));
     }

-    garage.k2v.rpc.insert_batch(bucket_id, items2).await?;
+    garage.k2v.rpc.insert_batch(*bucket_id, items2).await?;

     Ok(Response::builder()
         .status(StatusCode::NO_CONTENT)
@@ -46,8 +43,7 @@ pub async fn handle_insert_batch(
 }

 pub async fn handle_read_batch(
-    garage: Arc<Garage>,
-    bucket_id: Uuid,
+    ctx: ReqCtx,
     req: Request<ReqBody>,
 ) -> Result<Response<ResBody>, Error> {
     let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?;
@@ -55,7 +51,7 @@ pub async fn handle_read_batch(
     let resp_results = futures::future::join_all(
         queries
             .into_iter()
-            .map(|q| handle_read_batch_query(&garage, bucket_id, q)),
+            .map(|q| handle_read_batch_query(&ctx, q)),
     )
     .await;

@@ -68,12 +64,15 @@ pub async fn handle_read_batch(
 }

 async fn handle_read_batch_query(
-    garage: &Arc<Garage>,
-    bucket_id: Uuid,
+    ctx: &ReqCtx,
     query: ReadBatchQuery,
 ) -> Result<ReadBatchResponse, Error> {
+    let ReqCtx {
+        garage, bucket_id, ..
+    } = ctx;
+
     let partition = K2VItemPartition {
-        bucket_id,
+        bucket_id: *bucket_id,
         partition_key: query.partition_key.clone(),
     };

@@ -138,8 +137,7 @@ async fn handle_read_batch_query(
 }

 pub async fn handle_delete_batch(
-    garage: Arc<Garage>,
-    bucket_id: Uuid,
+    ctx: ReqCtx,
     req: Request<ReqBody>,
 ) -> Result<Response<ResBody>, Error> {
     let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?;
@@ -147,7 +145,7 @@ pub async fn handle_delete_batch(
     let resp_results = futures::future::join_all(
         queries
             .into_iter()
-            .map(|q| handle_delete_batch_query(&garage, bucket_id, q)),
+            .map(|q| handle_delete_batch_query(&ctx, q)),
     )
     .await;

@@ -160,12 +158,15 @@ pub async fn handle_delete_batch(
 }

 async fn handle_delete_batch_query(
-    garage: &Arc<Garage>,
-    bucket_id: Uuid,
+    ctx: &ReqCtx,
     query: DeleteBatchQuery,
 ) -> Result<DeleteBatchResponse, Error> {
+    let ReqCtx {
+        garage, bucket_id, ..
+    } = &ctx;
+
     let partition = K2VItemPartition {
-        bucket_id,
+        bucket_id: *bucket_id,
         partition_key: query.partition_key.clone(),
     };

@@ -195,7 +196,7 @@ async fn handle_delete_batch_query(
                 .k2v
                 .rpc
                 .insert(
-                    bucket_id,
+                    *bucket_id,
                     i.partition.partition_key,
                     i.sort_key,
                     Some(cc),
@@ -235,7 +236,7 @@ async fn handle_delete_batch_query(
             .collect::<Vec<_>>();
             let n = items.len();

-            garage.k2v.rpc.insert_batch(bucket_id, items).await?;
+            garage.k2v.rpc.insert_batch(*bucket_id, items).await?;

             n
         };
@@ -251,11 +252,13 @@ async fn handle_delete_batch_query(
 }

 pub(crate) async fn handle_poll_range(
-    garage: Arc<Garage>,
-    bucket_id: Uuid,
+    ctx: ReqCtx,
     partition_key: &str,
     req: Request<ReqBody>,
 ) -> Result<Response<ResBody>, Error> {
+    let ReqCtx {
+        garage, bucket_id, ..
+    } = ctx;
     use garage_model::k2v::sub::PollRange;

     let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?;
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index 291464ab..e3397238 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -1,13 +1,8 @@
-use std::sync::Arc;
-
 use hyper::Response;
 use serde::Serialize;

-use garage_util::data::*;
-
 use garage_table::util::*;

-use garage_model::garage::Garage;
 use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};

 use crate::helpers::*;
@@ -16,14 +11,17 @@ use crate::k2v::error::*;
 use crate::k2v::range::read_range;

 pub async fn handle_read_index(
-    garage: Arc<Garage>,
-    bucket_id: Uuid,
+    ctx: ReqCtx,
     prefix: Option<String>,
     start: Option<String>,
     end: Option<String>,
     limit: Option<u64>,
     reverse: Option<bool>,
 ) -> Result<Response<ResBody>, Error> {
+    let ReqCtx {
+        garage, bucket_id, ..
+    } = &ctx;
+
     let reverse = reverse.unwrap_or(false);

     let node_id_vec = garage
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index 0c5931a1..af3af4e4 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -1,13 +1,8 @@
-use std::sync::Arc;
-
 use base64::prelude::*;
 use http::header;

 use hyper::{Request, Response, StatusCode};

-use garage_util::data::*;
-
-use garage_model::garage::Garage;
 use garage_model::k2v::causality::*;
 use garage_model::k2v::item_table::*;

@@ -100,12 +95,15 @@ impl ReturnFormat {
 /// Handle ReadItem request
 #[allow(clippy::ptr_arg)]
 pub async fn handle_read_item(
-    garage: Arc<Garage>,
+    ctx: ReqCtx,
     req: &Request<ReqBody>,
-    bucket_id: Uuid,
     partition_key: &str,
     sort_key: &String,
 ) -> Result<Response<ResBody>, Error> {
+    let ReqCtx {
+        garage, bucket_id, ..
+    } = &ctx;
+
     let format = ReturnFormat::from(req)?;

     let item = garage
@@ -113,7 +111,7 @@ pub async fn handle_read_item(
         .item_table
         .get(
             &K2VItemPartition {
-                bucket_id,
+                bucket_id: *bucket_id,
                 partition_key: partition_key.to_string(),
             },
             sort_key,
@@ -125,12 +123,14 @@ pub async fn handle_read_item(
 }

 pub async fn handle_insert_item(
-    garage: Arc<Garage>,
+    ctx: ReqCtx,
     req: Request<ReqBody>,
-    bucket_id: Uuid,
     partition_key: &str,
     sort_key: &str,
 ) -> Result<Response<ResBody>, Error> {
+    let ReqCtx {
+        garage, bucket_id, ..
+    } = &ctx;
     let causal_context = req
         .headers()
         .get(X_GARAGE_CAUSALITY_TOKEN)
@@ -149,7 +149,7 @@ pub async fn handle_insert_item(
         .k2v
         .rpc
         .insert(
-            bucket_id,
+            *bucket_id,
             partition_key.to_string(),
             sort_key.to_string(),
             causal_context,
@@ -163,12 +163,14 @@ pub async fn handle_insert_item(
 }

 pub async fn handle_delete_item(
-    garage: Arc<Garage>,
+    ctx: ReqCtx,
     req: Request<ReqBody>,
-    bucket_id: Uuid,
     partition_key: &str,
     sort_key: &str,
 ) -> Result<Response<ResBody>, Error> {
+    let ReqCtx {
+        garage, bucket_id, ..
+    } = &ctx;
     let causal_context = req
         .headers()
         .get(X_GARAGE_CAUSALITY_TOKEN)
@@ -183,7 +185,7 @@ pub async fn handle_delete_item(
         .k2v
         .rpc
         .insert(
-            bucket_id,
+            *bucket_id,
             partition_key.to_string(),
             sort_key.to_string(),
             causal_context,
@@ -199,14 +201,16 @@ pub async fn handle_delete_item(
 /// Handle ReadItem request
 #[allow(clippy::ptr_arg)]
 pub async fn handle_poll_item(
-    garage: Arc<Garage>,
+    ctx: ReqCtx,
     req: &Request<ReqBody>,
-    bucket_id: Uuid,
     partition_key: String,
     sort_key: String,
     causality_token: String,
     timeout_secs: Option<u64>,
 ) -> Result<Response<ResBody>, Error> {
+    let ReqCtx {
+        garage, bucket_id, ..
+    } = &ctx;
     let format = ReturnFormat::from(req)?;

     let causal_context =
@@ -218,7 +222,7 @@ pub async fn handle_poll_item(
         .k2v
         .rpc
         .poll_item(
-            bucket_id,
+            *bucket_id,
             partition_key,
             sort_key,
             causal_context,
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index 08405923..1ed30996 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -17,8 +17,7 @@ use garage_model::key_table::Key;

 use crate::generic_server::*;
 use crate::s3::error::*;
-use crate::signature::payload::check_payload_signature;
-use crate::signature::streaming::*;
+use crate::signature::verify_request;

 use crate::helpers::*;
 use crate::s3::bucket::*;
@@ -125,17 +124,7 @@ impl ApiHandler for S3ApiServer {
             return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
         }

-        let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?;
-        let api_key = api_key
-            .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
-
-        let req = parse_streaming_body(
-            &api_key,
-            req,
-            &mut content_sha256,
-            &garage.config.s3_api.s3_region,
-            "s3",
-        )?;
+        let (req, api_key, content_sha256) = verify_request(&garage, req, "s3").await?;

         let bucket_name = match bucket_name {
             None => {
@@ -166,6 +155,7 @@ impl ApiHandler for S3ApiServer {
         let bucket = garage
             .bucket_helper()
             .get_existing_bucket(bucket_id)
             .await?;
+        let bucket_params = bucket.state.into_option().unwrap();

         let allowed = match endpoint.authorization_type() {
             Authorization::Read => api_key.allow_read(&bucket_id),
@@ -178,12 +168,20 @@ impl ApiHandler for S3ApiServer {
             return Err(Error::forbidden("Operation is not allowed for this key."));
         }

-        let matching_cors_rule = find_matching_cors_rule(&bucket, &req)?;
+        let matching_cors_rule = find_matching_cors_rule(&bucket_params, &req)?.cloned();
+
+        let ctx = ReqCtx {
+            garage,
+            bucket_id,
+            bucket_name,
+            bucket_params,
+            api_key,
+        };

         let resp = match endpoint {
             Endpoint::HeadObject {
                 key, part_number, ..
-            } => handle_head(garage, &req, bucket_id, &key, part_number).await,
+            } => handle_head(ctx, &req, &key, part_number).await,
             Endpoint::GetObject {
                 key,
                 part_number,
@@ -203,74 +201,37 @@ impl ApiHandler for S3ApiServer {
                     response_content_type,
                     response_expires,
                 };
-                handle_get(garage, &req, bucket_id, &key, part_number, overrides).await
+                handle_get(ctx, &req, &key, part_number, overrides).await
             }
             Endpoint::UploadPart {
                 key,
                 part_number,
                 upload_id,
-            } => {
-                handle_put_part(
-                    garage,
-                    req,
-                    bucket_id,
-                    &key,
-                    part_number,
-                    &upload_id,
-                    content_sha256,
-                )
-                .await
-            }
-            Endpoint::CopyObject { key } => {
-                handle_copy(garage, &api_key, &req, bucket_id, &key).await
-            }
+            } => handle_put_part(ctx, req, &key, part_number, &upload_id, content_sha256).await,
+            Endpoint::CopyObject { key } => handle_copy(ctx, &req, &key).await,
             Endpoint::UploadPartCopy {
                 key,
                 part_number,
                 upload_id,
-            } => {
-                handle_upload_part_copy(
-                    garage,
-                    &api_key,
-                    &req,
-                    bucket_id,
-                    &key,
-                    part_number,
-                    &upload_id,
-                )
-                .await
-            }
-            Endpoint::PutObject { key } => {
-                handle_put(garage, req, &bucket, &key, content_sha256).await
-            }
+            } => handle_upload_part_copy(ctx, &req, &key, part_number, &upload_id).await,
+            Endpoint::PutObject { key } => handle_put(ctx, req, &key, content_sha256).await,
             Endpoint::AbortMultipartUpload { key, upload_id } => {
-                handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
+                handle_abort_multipart_upload(ctx, &key, &upload_id).await
             }
-            Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await,
+            Endpoint::DeleteObject { key, .. } => handle_delete(ctx, &key).await,
             Endpoint::CreateMultipartUpload { key } => {
-                handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await
+                handle_create_multipart_upload(ctx, &req, &key).await
             }
             Endpoint::CompleteMultipartUpload { key, upload_id } => {
-                handle_complete_multipart_upload(
-                    garage,
-                    req,
-                    &bucket_name,
-                    &bucket,
-                    &key,
-                    &upload_id,
-                    content_sha256,
-                )
-                .await
+                handle_complete_multipart_upload(ctx, req, &key, &upload_id, content_sha256).await
             }
             Endpoint::CreateBucket {} => unreachable!(),
             Endpoint::HeadBucket {} => {
                 let response = Response::builder().body(empty_body()).unwrap();
                 Ok(response)
             }
-            Endpoint::DeleteBucket {} => {
-                handle_delete_bucket(&garage, bucket_id, bucket_name, &api_key.key_id).await
-            }
-            Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage),
+            Endpoint::DeleteBucket {} => handle_delete_bucket(ctx).await,
+            Endpoint::GetBucketLocation {} => handle_get_bucket_location(ctx),
             Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(),
             Endpoint::ListObjects {
                 delimiter,
@@ -279,24 +240,21 @@ impl ApiHandler for S3ApiServer {
                 max_keys,
                 prefix,
             } => {
-                handle_list(
-                    garage,
-                    &ListObjectsQuery {
-                        common: ListQueryCommon {
-                            bucket_name,
-                            bucket_id,
-                            delimiter,
-                            page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
-                            prefix: prefix.unwrap_or_default(),
-                            urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
-                        },
-                        is_v2: false,
-                        marker,
-                        continuation_token: None,
-                        start_after: None,
+                let query = ListObjectsQuery {
+                    common: ListQueryCommon {
+                        bucket_name: ctx.bucket_name.clone(),
+                        bucket_id,
+                        delimiter,
+                        page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
+                        prefix: prefix.unwrap_or_default(),
+                        urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
                     },
-                )
-                .await
+                    is_v2: false,
+                    marker,
+                    continuation_token: None,
+                    start_after: None,
+                };
+                handle_list(ctx, &query).await
             }
             Endpoint::ListObjectsV2 {
                 delimiter,
@@ -309,24 +267,21 @@ impl ApiHandler for S3ApiServer {
                 ..
             } => {
                 if list_type == "2" {
-                    handle_list(
-                        garage,
-                        &ListObjectsQuery {
-                            common: ListQueryCommon {
-                                bucket_name,
-                                bucket_id,
-                                delimiter,
-                                page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
-                                urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
-                                prefix: prefix.unwrap_or_default(),
+                    let query = ListObjectsQuery {
+                        common: ListQueryCommon {
+                            bucket_name: ctx.bucket_name.clone(),
+                            bucket_id,
+                            delimiter,
+                            page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
+                            urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+                            prefix: prefix.unwrap_or_default(),
                         },
-                            is_v2: true,
-                            marker: None,
-                            continuation_token,
-                            start_after,
-                        },
-                    )
-                    .await
+                        is_v2: true,
+                        marker: None,
+                        continuation_token,
+                        start_after,
+                    };
+                    handle_list(ctx, &query).await
                 } else {
                     Err(Error::bad_request(format!(
                         "Invalid endpoint: list-type={}",
@@ -342,22 +297,19 @@ impl ApiHandler for S3ApiServer {
                 prefix,
                 upload_id_marker,
             } => {
-                handle_list_multipart_upload(
-                    garage,
-                    &ListMultipartUploadsQuery {
-                        common: ListQueryCommon {
-                            bucket_name,
-                            bucket_id,
-                            delimiter,
-                            page_size: max_uploads.unwrap_or(1000).clamp(1, 1000),
-                            prefix: prefix.unwrap_or_default(),
-                            urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
-                        },
-                        key_marker,
-                        upload_id_marker,
+                let query = ListMultipartUploadsQuery {
+                    common: ListQueryCommon {
+                        bucket_name: ctx.bucket_name.clone(),
+                        bucket_id,
+                        delimiter,
+                        page_size: max_uploads.unwrap_or(1000).clamp(1, 1000),
+                        prefix: prefix.unwrap_or_default(),
+                        urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
                     },
-                )
-                .await
+                    key_marker,
+                    upload_id_marker,
+                };
+                handle_list_multipart_upload(ctx, &query).await
             }
             Endpoint::ListParts {
                 key,
@@ -365,39 +317,28 @@ impl ApiHandler for S3ApiServer {
                 part_number_marker,
                 upload_id,
             } => {
-                handle_list_parts(
-                    garage,
-                    &ListPartsQuery {
-                        bucket_name,
-                        bucket_id,
-                        key,
-                        upload_id,
-                        part_number_marker: part_number_marker.map(|p| p.min(10000)),
-                        max_parts: max_parts.unwrap_or(1000).clamp(1, 1000),
-                    },
-                )
-                .await
-            }
-            Endpoint::DeleteObjects {} => {
-                handle_delete_objects(garage, bucket_id, req, content_sha256).await
-            }
-            Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await,
-            Endpoint::PutBucketWebsite {} => {
-                handle_put_website(garage, bucket.clone(), req, content_sha256).await
-            }
-            Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket.clone()).await,
-            Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await,
-            Endpoint::PutBucketCors {} => {
-                handle_put_cors(garage, bucket.clone(), req, content_sha256).await
+                let query = ListPartsQuery {
+                    bucket_name: ctx.bucket_name.clone(),
+                    bucket_id,
+                    key,
+                    upload_id,
+                    part_number_marker: part_number_marker.map(|p| p.min(10000)),
+                    max_parts: max_parts.unwrap_or(1000).clamp(1, 1000),
+                };
+                handle_list_parts(ctx, &query).await
             }
-            Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket.clone()).await,
-            Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(&bucket).await,
+            Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await,
+            Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await,
+            Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req, content_sha256).await,
+            Endpoint::DeleteBucketWebsite {} => handle_delete_website(ctx).await,
+            Endpoint::GetBucketCors {} => handle_get_cors(ctx).await,
+            Endpoint::PutBucketCors {} => handle_put_cors(ctx, req, content_sha256).await,
+            Endpoint::DeleteBucketCors {} => handle_delete_cors(ctx).await,
+            Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(ctx).await,
             Endpoint::PutBucketLifecycleConfiguration {} => {
-                handle_put_lifecycle(garage, bucket.clone(), req, content_sha256).await
-            }
-            Endpoint::DeleteBucketLifecycle {} => {
-                handle_delete_lifecycle(garage, bucket.clone()).await
+                handle_put_lifecycle(ctx, req, content_sha256).await
             }
+            Endpoint::DeleteBucketLifecycle {} => handle_delete_lifecycle(ctx).await,
             endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
         };

@@ -405,7 +346,7 @@ impl ApiHandler for S3ApiServer {
         // add the corresponding CORS headers to the response
         let mut resp_ok = resp?;
         if let Some(rule) = matching_cors_rule {
-            add_cors_headers(&mut resp_ok, rule)
+            add_cors_headers(&mut resp_ok, &rule)
                 .ok_or_internal_error("Invalid bucket CORS configuration")?;
         }
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
index fa337566..6a12aa9c 100644
--- a/src/api/s3/bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -1,5 +1,4 @@
 use std::collections::HashMap;
-use std::sync::Arc;

 use http_body_util::BodyExt;
 use hyper::{Request, Response, StatusCode};
@@ -21,7 +20,8 @@ use crate::s3::error::*;
 use crate::s3::xml as s3_xml;
 use crate::signature::verify_signed_content;

-pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<ResBody>, Error> {
+pub fn handle_get_bucket_location(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+    let ReqCtx { garage, .. } = ctx;
     let loc = s3_xml::LocationConstraint {
         xmlns: (),
         region: garage.config.s3_api.s3_region.to_string(),
@@ -204,21 +204,20 @@
         .unwrap())
 }

-pub async fn handle_delete_bucket(
-    garage: &Garage,
-    bucket_id: Uuid,
-    bucket_name: String,
-    api_key_id: &String,
-) -> Result<Response<ResBody>, Error> {
+pub async fn handle_delete_bucket(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+    let ReqCtx {
+        garage,
+        bucket_id,
+        bucket_name,
+        bucket_params: bucket_state,
+        api_key,
+        ..
+    } = &ctx;
     let helper = garage.locked_helper().await;

-    let api_key = helper.key().get_existing_key(api_key_id).await?;
     let key_params = api_key.params().unwrap();

-    let is_local_alias = matches!(key_params.local_aliases.get(&bucket_name), Some(Some(_)));
-
-    let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?;
-    let bucket_state = bucket.state.as_option().unwrap();
+    let is_local_alias = matches!(key_params.local_aliases.get(bucket_name), Some(Some(_)));

     // If the bucket has no other aliases, this is a true deletion.
     // Otherwise, it is just an alias removal.
@@ -228,20 +227,20 @@ pub async fn handle_delete_bucket(
         .items()
         .iter()
         .filter(|(_, _, active)| *active)
-        .any(|(n, _, _)| is_local_alias || (*n != bucket_name));
+        .any(|(n, _, _)| is_local_alias || (*n != *bucket_name));

     let has_other_local_aliases = bucket_state
         .local_aliases
         .items()
         .iter()
         .filter(|(_, _, active)| *active)
-        .any(|((k, n), _, _)| !is_local_alias || *n != bucket_name || *k != api_key.key_id);
+        .any(|((k, n), _, _)| !is_local_alias || *n != *bucket_name || *k != api_key.key_id);

     if !has_other_global_aliases && !has_other_local_aliases {
         // Delete bucket

         // Check bucket is empty
-        if !helper.bucket().is_bucket_empty(bucket_id).await? {
+        if !helper.bucket().is_bucket_empty(*bucket_id).await? {
             return Err(CommonError::BucketNotEmpty.into());
         }

@@ -249,33 +248,36 @@ pub async fn handle_delete_bucket(
         // 1. delete bucket alias
         if is_local_alias {
             helper
-                .unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
+                .unset_local_bucket_alias(*bucket_id, &api_key.key_id, bucket_name)
                 .await?;
         } else {
             helper
-                .unset_global_bucket_alias(bucket_id, &bucket_name)
+                .unset_global_bucket_alias(*bucket_id, bucket_name)
                 .await?;
         }

         // 2. delete authorization from keys that had access
-        for (key_id, _) in bucket.authorized_keys() {
+        for (key_id, _) in bucket_state.authorized_keys.items() {
             helper
-                .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
+                .set_bucket_key_permissions(*bucket_id, key_id, BucketKeyPerm::NO_PERMISSIONS)
                 .await?;
         }

+        let bucket = Bucket {
+            id: *bucket_id,
+            state: Deletable::delete(),
+        };
         // 3. delete bucket
-        bucket.state = Deletable::delete();
         garage.bucket_table.insert(&bucket).await?;
     } else if is_local_alias {
         // Just unalias
         helper
-            .unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
+            .unset_local_bucket_alias(*bucket_id, &api_key.key_id, bucket_name)
             .await?;
     } else {
         // Just unalias (but from global namespace)
         helper
-            .unset_global_bucket_alias(bucket_id, &bucket_name)
+            .unset_global_bucket_alias(*bucket_id, bucket_name)
             .await?;
     }
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 880ce5f4..3c2bd483 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -1,5 +1,4 @@
 use std::pin::Pin;
-use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};

 use futures::{stream, stream::Stream, StreamExt};
@@ -15,8 +14,6 @@ use garage_table::*;
 use garage_util::data::*;
 use garage_util::time::*;

-use garage_model::garage::Garage;
-use garage_model::key_table::Key;
 use garage_model::s3::block_ref_table::*;
 use garage_model::s3::mpu_table::*;
 use garage_model::s3::object_table::*;
@@ -30,15 +27,19 @@ use crate::s3::put::get_headers;
 use crate::s3::xml::{self as s3_xml, xmlns_tag};

 pub async fn handle_copy(
-    garage: Arc<Garage>,
-    api_key: &Key,
+    ctx: ReqCtx,
     req: &Request<ReqBody>,
-    dest_bucket_id: Uuid,
     dest_key: &str,
 ) -> Result<Response<ResBody>, Error> {
     let copy_precondition = CopyPreconditionHeaders::parse(req)?;

-    let source_object = get_copy_source(&garage, api_key, req).await?;
+    let source_object = get_copy_source(&ctx, req).await?;
+
+    let ReqCtx {
+        garage,
+        bucket_id: dest_bucket_id,
+        ..
+    } = ctx;

     let (source_version, source_version_data, source_version_meta) =
         extract_source_info(&source_object)?;
@@ -181,10 +182,8 @@ pub async fn handle_copy(
 }

 pub async fn handle_upload_part_copy(
-    garage: Arc<Garage>,
-    api_key: &Key,
+    ctx: ReqCtx,
     req: &Request<ReqBody>,
-    dest_bucket_id: Uuid,
     dest_key: &str,
     part_number: u64,
     upload_id: &str,
@@ -195,10 +194,12 @@ pub async fn handle_upload_part_copy(
     let dest_key = dest_key.to_string();

     let (source_object, (_, _, mut dest_mpu)) = futures::try_join!(
-        get_copy_source(&garage, api_key, req),
-        multipart::get_upload(&garage, &dest_bucket_id, &dest_key, &dest_upload_id)
+        get_copy_source(&ctx, req),
+        multipart::get_upload(&ctx, &dest_key, &dest_upload_id)
     )?;

+    let ReqCtx { garage, .. } = ctx;
+
     let (source_object_version, source_version_data, source_version_meta) =
         extract_source_info(&source_object)?;
@@ -439,11 +440,11 @@ pub async fn handle_upload_part_copy(
         .body(string_body(resp_xml))?)
 }

-async fn get_copy_source(
-    garage: &Garage,
-    api_key: &Key,
-    req: &Request<ReqBody>,
-) -> Result<Object, Error> {
+async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, Error> {
+    let ReqCtx {
+        garage, api_key, ..
+    } = ctx;
+
     let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
     let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs
index e069cae4..173b7ffe 100644
--- a/src/api/s3/cors.rs
+++ b/src/api/s3/cors.rs
@@ -21,16 +21,13 @@
 use crate::s3::error::*;
 use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
 use crate::signature::verify_signed_content;

-use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
+use garage_model::bucket_table::{Bucket, BucketParams, CorsRule as GarageCorsRule};
 use garage_model::garage::Garage;
 use garage_util::data::*;

-pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
-    let param = bucket
-        .params()
-        .ok_or_internal_error("Bucket should not be deleted at this point")?;
-
-    if let Some(cors) = param.cors_config.get() {
+pub async fn handle_get_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+    let ReqCtx { bucket_params, .. } = ctx;
+    if let Some(cors) = bucket_params.cors_config.get() {
         let wc = CorsConfiguration {
             xmlns: (),
             cors_rules: cors
@@ -50,16 +47,18 @@
     }
 }

-pub async fn handle_delete_cors(
-    garage: Arc<Garage>,
-    mut bucket: Bucket,
-) -> Result<Response<ResBody>, Error> {
-    let param = bucket
-        .params_mut()
-        .ok_or_internal_error("Bucket should not be deleted at this point")?;
-
-    param.cors_config.update(None);
-    garage.bucket_table.insert(&bucket).await?;
+pub async fn handle_delete_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+    let ReqCtx {
+        garage,
+        bucket_id,
+        mut bucket_params,
+        ..
+    } = ctx;
+    bucket_params.cors_config.update(None);
+    garage
+        .bucket_table
+        .insert(&Bucket::present(bucket_id, bucket_params))
+        .await?;

     Ok(Response::builder()
         .status(StatusCode::NO_CONTENT)
@@ -67,28 +66,33 @@
 }

 pub async fn handle_put_cors(
-    garage: Arc<Garage>,
-    mut bucket: Bucket,
+    ctx: ReqCtx,
     req: Request<ReqBody>,
     content_sha256: Option<Hash>,
 ) -> Result<Response<ResBody>, Error> {
+    let ReqCtx {
+        garage,
+        bucket_id,
+        mut bucket_params,
+        ..
+    } = ctx;
+
     let body = BodyExt::collect(req.into_body()).await?.to_bytes();

     if let Some(content_sha256) = content_sha256 {
         verify_signed_content(content_sha256, &body[..])?;
     }

-    let param = bucket
-        .params_mut()
-        .ok_or_internal_error("Bucket should not be deleted at this point")?;
-
     let conf: CorsConfiguration = from_reader(&body as &[u8])?;
     conf.validate()?;

-    param
+    bucket_params
         .cors_config
         .update(Some(conf.into_garage_cors_config()?));
-    garage.bucket_table.insert(&bucket).await?;
+    garage
+        .bucket_table
+        .insert(&Bucket::present(bucket_id, bucket_params))
+        .await?;

     Ok(Response::builder()
         .status(StatusCode::OK)
@@ -115,7 +119,8 @@ pub async fn handle_options_api(
     let bucket_id = helper.resolve_global_bucket_name(&bn).await?;
     if let Some(id) = bucket_id {
         let bucket = garage.bucket_helper().get_existing_bucket(id).await?;
-        handle_options_for_bucket(req, &bucket)
+        let bucket_params = bucket.state.into_option().unwrap();
+        handle_options_for_bucket(req, &bucket_params)
     } else {
         // If there is a bucket name in the request, but that name
         // does not correspond to a global alias for a bucket,
@@ -145,7 +150,7 @@ pub async fn handle_options_api(

 pub fn handle_options_for_bucket(
     req: &Request<IncomingBody>,
-    bucket: &Bucket,
+    bucket_params: &BucketParams,
 ) -> Result<Response<EmptyBody>, CommonError> {
     let origin = req
         .headers()
@@ -162,7 +167,7 @@ pub fn handle_options_for_bucket(
         None => vec![],
     };

-    if let Some(cors_config) = bucket.params().unwrap().cors_config.get() {
+    if let Some(cors_config) = bucket_params.cors_config.get() {
         let matching_rule = cors_config
             .iter()
             .find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter()));
@@ -181,10 +186,10 @@ pub fn handle_options_for_bucket(
 }

 pub fn find_matching_cors_rule<'a>(
-    bucket: &'a Bucket,
+    bucket_params: &'a BucketParams,
     req: &Request<impl Body>,
 ) -> Result<Option<&'a GarageCorsRule>, Error> {
-    if let Some(cors_config) = bucket.params().unwrap().cors_config.get() {
+    if let Some(cors_config) = bucket_params.cors_config.get() {
         if let Some(origin) = req.headers().get("Origin") {
             let origin = origin.to_str()?;
             let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {
diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs
index 3fb39147..57f6f948 100644
--- a/src/api/s3/delete.rs
+++ b/src/api/s3/delete.rs
@@ -1,11 +1,8 @@
-use std::sync::Arc;
-
 use http_body_util::BodyExt;
 use hyper::{Request, Response, StatusCode};

 use garage_util::data::*;

-use garage_model::garage::Garage;
 use garage_model::s3::object_table::*;

 use crate::helpers::*;
@@ -15,14 +12,13 @@ use crate::s3::put::next_timestamp;
 use crate::s3::xml as s3_xml;
 use crate::signature::verify_signed_content;

-async fn handle_delete_internal(
-    garage: &Garage,
-    bucket_id: Uuid,
-    key: &str,
-) -> Result<(Uuid, Uuid), Error> {
+async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), Error> {
+    let ReqCtx {
+        garage, bucket_id, ..
+    } = ctx;
     let object = garage
         .object_table
-        .get(&bucket_id, &key.to_string())
+        .get(bucket_id, &key.to_string())
         .await?
         .ok_or(Error::NoSuchKey)?;

     // No need to delete
@@ -44,7 +40,7 @@ async fn handle_delete_internal(
     };

     let object = Object::new(
-        bucket_id,
+        *bucket_id,
         key.into(),
         vec![ObjectVersion {
             uuid: del_uuid,
@@ -58,12 +54,8 @@ async fn handle_delete_internal(
     Ok((deleted_version, del_uuid))
 }

-pub async fn handle_delete(
-    garage: Arc<Garage>,
-    bucket_id: Uuid,
-    key: &str,
-) -> Result<Response<ResBody>, Error> {
-    match handle_delete_internal(&garage, bucket_id, key).await {
+pub async fn handle_delete(ctx: ReqCtx, key: &str) -> Result<Response<ResBody>, Error> {
+    match handle_delete_internal(&ctx, key).await {
         Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder()
             .status(StatusCode::NO_CONTENT)
             .body(empty_body())
@@ -73,8 +65,7 @@ pub async fn handle_delete(
 }

 pub async fn handle_delete_objects(
-    garage: Arc<Garage>,
-    bucket_id: Uuid,
+    ctx: ReqCtx,
     req: Request<ReqBody>,
     content_sha256: Option<Hash>,
 ) -> Result<Response<ResBody>, Error> {
@@ -91,7 +82,7 @@ pub async fn handle_delete_objects(
     let mut ret_errors = Vec::new();

     for obj in cmd.objects.iter() {
-        match handle_delete_internal(&garage, bucket_id, &obj.key).await {
+        match handle_delete_internal(&ctx, &obj.key).await {
             Ok((deleted_version, delete_marker_version)) => {
                 if cmd.quiet {
                     continue;
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 0d18e775..ed996fb1 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -131,6 +131,16 @@ fn try_answer_cached(

 /// Handle HEAD request
 pub async fn handle_head(
+    ctx: ReqCtx,
+    req: &Request<impl Body>,
+    key: &str,
+    part_number: Option<u64>,
+) -> Result<Response<ResBody>, Error> {
+    handle_head_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number).await
+}
+
+/// Handle HEAD request for website
+pub async fn handle_head_without_ctx(
     garage: Arc<Garage>,
     req: &Request<impl Body>,
     bucket_id: Uuid,
@@ -218,6 +228,17 @@

 /// Handle GET request
 pub async fn handle_get(
+    ctx: ReqCtx,
+    req: &Request<impl Body>,
+    key: &str,
+    part_number: Option<u64>,
+    overrides: GetObjectOverrides,
+) -> Result<Response<ResBody>, Error> {
+    handle_get_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number, overrides).await
+}
+
+/// Handle GET request
+pub async fn handle_get_without_ctx(
     garage: Arc<Garage>,
     req: &Request<impl Body>,
     bucket_id: Uuid,
diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs
index 35757e8c..7eb1c2cb 100644
--- a/src/api/s3/lifecycle.rs
+++ b/src/api/s3/lifecycle.rs
@@ -1,5 +1,4 @@
 use quick_xml::de::from_reader;
-use std::sync::Arc;

 use http_body_util::BodyExt;
 use hyper::{Request, Response, StatusCode};
@@ -16,15 +15,12 @@
 use garage_model::bucket_table::{
     parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration,
     LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule,
 };
-use garage_model::garage::Garage;
 use garage_util::data::*;

-pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
-    let param = bucket
-        .params()
-        .ok_or_internal_error("Bucket should not be deleted at this point")?;
+pub async fn handle_get_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+    let ReqCtx { bucket_params, .. } = ctx;

-    if let Some(lifecycle) = param.lifecycle_config.get() {
+    if let Some(lifecycle) = bucket_params.lifecycle_config.get() {
         let wc = LifecycleConfiguration::from_garage_lifecycle_config(lifecycle);
         let xml = to_xml_with_header(&wc)?;
         Ok(Response::builder()
@@ -38,16 +34,18 @@
     }
 }

-pub async fn handle_delete_lifecycle(
-    garage: Arc<Garage>,
-    mut bucket: Bucket,
-) -> Result<Response<ResBody>, Error> {
-    let param = bucket
-        .params_mut()
-        .ok_or_internal_error("Bucket should not be deleted at this point")?;
-
-    param.lifecycle_config.update(None);
-    garage.bucket_table.insert(&bucket).await?;
+pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
+    let ReqCtx {
+        garage,
+        bucket_id,
+        mut bucket_params,
+        ..
+    } = ctx;
+    bucket_params.lifecycle_config.update(None);
+    garage
+        .bucket_table
+        .insert(&Bucket::present(bucket_id, bucket_params))
+        .await?;

     Ok(Response::builder()
         .status(StatusCode::NO_CONTENT)
@@ -55,28 +53,33 @@
 }

 pub async fn handle_put_lifecycle(
-    garage: Arc<Garage>,
-    mut bucket: Bucket,
+    ctx: ReqCtx,
     req: Request<ReqBody>,
     content_sha256: Option<Hash>,
 ) -> Result<Response<ResBody>, Error> {
+    let ReqCtx {
+        garage,
+        bucket_id,
+        mut bucket_params,
+        ..
+    } = ctx;
+
     let body = BodyExt::collect(req.into_body()).await?.to_bytes();

     if let Some(content_sha256) = content_sha256 {
         verify_signed_content(content_sha256, &body[..])?;
     }

-    let param = bucket
-        .params_mut()
-        .ok_or_internal_error("Bucket should not be deleted at this point")?;
-
     let conf: LifecycleConfiguration = from_reader(&body as &[u8])?;
     let config = conf
         .validate_into_garage_lifecycle_config()
         .ok_or_bad_request("Invalid lifecycle configuration")?;

-    param.lifecycle_config.update(Some(config));
-    garage.bucket_table.insert(&bucket).await?;
+    bucket_params.lifecycle_config.update(Some(config));
+    garage
+        .bucket_table
+        .insert(&Bucket::present(bucket_id, bucket_params))
+        .await?;

     Ok(Response::builder()
         .status(StatusCode::OK)
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index b832a4f4..302c03f4 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -1,6 +1,5 @@
 use std::collections::{BTreeMap, BTreeSet};
 use std::iter::{Iterator, Peekable};
-use std::sync::Arc;

 use base64::prelude::*;
 use hyper::Response;
@@ -9,7 +8,6 @@
 use garage_util::data::*;
 use garage_util::error::Error as GarageError;
 use garage_util::time::*;

-use garage_model::garage::Garage;
 use garage_model::s3::mpu_table::*;
 use garage_model::s3::object_table::*;

@@ -62,9 +60,10 @@ pub struct ListPartsQuery {
 }

 pub async fn handle_list(
-    garage: Arc<Garage>,
+    ctx: ReqCtx,
     query: &ListObjectsQuery,
 ) -> Result<Response<ResBody>, Error> {
+    let ReqCtx { garage, .. } = &ctx;
     let io = |bucket, key, count| {
         let t = &garage.object_table;
         async move {
@@ -167,9 +166,11 @@ pub async fn handle_list(
 }

 pub async fn handle_list_multipart_upload(
-    garage: Arc<Garage>,
+    ctx: ReqCtx,
     query: &ListMultipartUploadsQuery,
 ) -> Result<Response<ResBody>, Error> {
+    let ReqCtx { garage, .. } = &ctx;
+
     let io = |bucket, key, count| {
         let t = &garage.object_table;
         async move {
@@ -269,15 +270,14 @@
 }

 pub async fn handle_list_parts(
-    garage: Arc<Garage>,
+    ctx: ReqCtx,
     query: &ListPartsQuery,
 ) -> Result<Response<ResBody>, Error> {
     debug!("ListParts {:?}", query);

     let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?;

-    let (_, _, mpu) =
-        s3_multipart::get_upload(&garage, &query.bucket_id, &query.key, &upload_id).await?;
+    let (_, _, mpu) = s3_multipart::get_upload(&ctx, &query.key, &upload_id).await?;

     let (info, next) = fetch_part_info(query, &mpu)?;
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 5959bcd6..1d5aeb26 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -8,7 +8,6 @@ use md5::{Digest as Md5Digest, Md5};

 use garage_table::*;
 use garage_util::data::*;

-use garage_model::bucket_table::Bucket;
 use garage_model::garage::Garage;
 use garage_model::s3::block_ref_table::*;
 use garage_model::s3::mpu_table::*;
@@ -25,12 +24,16 @@
 // ----

 pub async fn handle_create_multipart_upload(
-    garage: Arc<Garage>,
+    ctx: ReqCtx,
     req: &Request<ReqBody>,
-    bucket_name: &str,
-    bucket_id: Uuid,
     key: &String,
 ) -> Result<Response<ResBody>, Error> {
+    let ReqCtx {
+        garage,
+        bucket_id,
+        bucket_name,
+        ..
+    } = &ctx;
     let existing_object = garage.object_table.get(&bucket_id, &key).await?;

     let upload_id = gen_uuid();
@@ -47,13 +50,13 @@ pub async fn handle_create_multipart_upload(
             headers,
         },
     };
-    let object = Object::new(bucket_id, key.to_string(), vec![object_version]);
+    let object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
     garage.object_table.insert(&object).await?;

     // Create multipart upload in mpu table
     // This multipart upload will hold references to uploaded parts
     // (which are entries in the Version table)
-    let mpu = MultipartUpload::new(upload_id, timestamp, bucket_id, key.into(), false);
+    let mpu = MultipartUpload::new(upload_id, timestamp, *bucket_id, key.into(), false);
     garage.mpu_table.insert(&mpu).await?;

     // Send success response
@@ -69,14 +72,15 @@ pub async fn handle_create_multipart_upload(
 }

 pub async fn handle_put_part(
-    garage: Arc<Garage>,
+    ctx: ReqCtx,
     req: Request<ReqBody>,
-    bucket_id: Uuid,
     key: &str,
     part_number: u64,
     upload_id: &str,
     content_sha256: Option<Hash>,
 ) -> Result<Response<ResBody>, Error> {
+    let ReqCtx { garage, .. } = &ctx;
+
     let upload_id = decode_upload_id(upload_id)?;

     let content_md5 = match req.headers().get("content-md5") {
@@ -90,10 +94,8 @@ pub async fn handle_put_part(
     let stream = body_stream(req.into_body());
     let mut chunker = StreamChunker::new(stream, garage.config.block_size);

-    let ((_, _, mut mpu), first_block) = futures::try_join!(
-        get_upload(&garage, &bucket_id, &key, &upload_id),
-        chunker.next(),
-    )?;
+    let ((_, _, mut mpu), first_block) =
+        futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;

     // Check object is valid and part can be accepted
     let first_block = first_block.ok_or_bad_request("Empty body")?;
@@ -135,7 +137,7 @@ pub async fn handle_put_part(

     // Copy data to version
     let (total_size, data_md5sum, data_sha256sum, _) =
-        read_and_put_blocks(&garage, &version, part_number, first_block, &mut chunker).await?;
+        read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?;

     // Verify that checksums map
     ensure_checksum_matches(
@@ -200,14 +202,19 @@ impl Drop for InterruptedCleanup {
 }

 pub async fn handle_complete_multipart_upload(
-    garage: Arc<Garage>,
+    ctx: ReqCtx,
     req: Request<ReqBody>,
-    bucket_name: &str,
-    bucket: &Bucket,
     key: &str,
     upload_id: &str,
     content_sha256: Option<Hash>,
 ) -> Result<Response<ResBody>, Error> {
+    let ReqCtx {
+        garage,
+        bucket_id,
+        bucket_name,
+        ..
+    } = &ctx;
+
     let body = http_body_util::BodyExt::collect(req.into_body())
         .await?
         .to_bytes();
@@ -228,8 +235,7 @@ pub async fn handle_complete_multipart_upload(

     // Get object and multipart upload
     let key = key.to_string();
-    let (object, mut object_version, mpu) =
-        get_upload(&garage, &bucket.id, &key, &upload_id).await?;
+    let (object, mut object_version, mpu) = get_upload(&ctx, &key, &upload_id).await?;

     if mpu.parts.is_empty() {
         return Err(Error::bad_request("No data was uploaded"));
@@ -283,7 +289,7 @@ pub async fn handle_complete_multipart_upload(
     let mut final_version = Version::new(
         upload_id,
         VersionBacklink::Object {
-            bucket_id: bucket.id,
+            bucket_id: *bucket_id,
             key: key.to_string(),
         },
         false,
@@ -327,9 +333,9 @@ pub async fn handle_complete_multipart_upload(
     // Calculate total size of final object
     let total_size = parts.iter().map(|x| x.size.unwrap()).sum();

-    if let Err(e) = check_quotas(&garage, bucket, total_size, Some(&object)).await {
+    if let Err(e) = check_quotas(&ctx, total_size, Some(&object)).await {
         object_version.state = ObjectVersionState::Aborted;
-        let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
+        let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
         garage.object_table.insert(&final_object).await?;

         return Err(e);
@@ -345,7 +351,7 @@ pub async fn handle_complete_multipart_upload(
         final_version.blocks.items()[0].1.hash,
     ));

-    let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
+    let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
     garage.object_table.insert(&final_object).await?;

     // Send response saying ok we're done
@@ -362,18 +368,20 @@ pub async fn handle_complete_multipart_upload(
 }

 pub async fn handle_abort_multipart_upload(
-    garage: Arc<Garage>,
-    bucket_id: Uuid,
+    ctx: ReqCtx,
     key: &str,
     upload_id: &str,
 ) -> Result<Response<ResBody>, Error> {
+    let ReqCtx {
+        garage, bucket_id, ..
+    } = &ctx;
+
     let upload_id = decode_upload_id(upload_id)?;

-    let (_, mut object_version, _) =
-        get_upload(&garage, &bucket_id, &key.to_string(), &upload_id).await?;
+    let (_, mut object_version, _) = get_upload(&ctx, &key.to_string(), &upload_id).await?;

     object_version.state = ObjectVersionState::Aborted;
-    let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]);
+    let final_object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
     garage.object_table.insert(&final_object).await?;

     Ok(Response::new(empty_body()))
@@ -383,11 +391,13 @@

 #[allow(clippy::ptr_arg)]
 pub(crate) async fn get_upload(
-    garage: &Garage,
-    bucket_id: &Uuid,
+    ctx: &ReqCtx,
     key: &String,
     upload_id: &Uuid,
 ) -> Result<(Object, ObjectVersion, MultipartUpload), Error> {
+    let ReqCtx {
+        garage, bucket_id, ..
+    } = ctx;
     let (object, mpu) = futures::try_join!(
         garage.object_table.get(bucket_id, key).map_err(Error::from),
         garage
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index bca8d6c6..66f8174c 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -21,7 +21,7 @@
 use crate::s3::cors::*;
 use crate::s3::error::*;
 use crate::s3::put::{get_headers, save_stream};
 use crate::s3::xml as s3_xml;
-use crate::signature::payload::{parse_date, verify_v4};
+use crate::signature::payload::{verify_v4, Authorization};

 pub async fn handle_post_object(
     garage: Arc<Garage>,
@@ -88,22 +88,11 @@ pub async fn handle_post_object(
         .get("key")
         .ok_or_bad_request("No key was provided")?
         .to_str()?;
-    let credential = params
-        .get("x-amz-credential")
-        .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?
-        .to_str()?;
     let policy = params
         .get("policy")
         .ok_or_bad_request("No policy was provided")?
         .to_str()?;
-    let signature = params
-        .get("x-amz-signature")
-        .ok_or_bad_request("No signature was provided")?
-        .to_str()?;
-    let date = params
-        .get("x-amz-date")
-        .ok_or_bad_request("No date was provided")?
-        .to_str()?;
+    let authorization = Authorization::parse_form(&params)?;

     let key = if key.contains("${filename}") {
         // if no filename is provided, don't replace. This matches the behavior of AWS.
@@ -116,16 +105,7 @@ pub async fn handle_post_object(
         key.to_owned()
     };

-    let date = parse_date(date)?;
-    let api_key = verify_v4(
-        &garage,
-        "s3",
-        credential,
-        &date,
-        signature,
-        policy.as_bytes(),
-    )
-    .await?;
+    let api_key = verify_v4(&garage, "s3", &authorization, policy.as_bytes()).await?;

     let bucket_id = garage
         .bucket_helper()
@@ -140,6 +120,12 @@ pub async fn handle_post_object(
         .bucket_helper()
         .get_existing_bucket(bucket_id)
         .await?;
+    let bucket_params = bucket.state.into_option().unwrap();
+    let matching_cors_rule = find_matching_cors_rule(
+        &bucket_params,
+        &Request::from_parts(head.clone(), empty_body::<Infallible>()),
+    )?
+    .cloned();

     let decoded_policy = BASE64_STANDARD
         .decode(policy)
@@ -233,11 +219,19 @@ pub async fn handle_post_object(
     let headers = get_headers(&params)?;

     let stream = field.map(|r| r.map_err(Into::into));
-    let (_, md5) = save_stream(
+
+    let ctx = ReqCtx {
         garage,
+        bucket_id,
+        bucket_name,
+        bucket_params,
+        api_key,
+    };
+
+    let (_, md5) = save_stream(
+        &ctx,
         headers,
         StreamLimiter::new(stream, conditions.content_length),
-        &bucket,
         &key,
         None,
         None,
@@ -254,7 +248,7 @@ pub async fn handle_post_object(
     {
         target
             .query_pairs_mut()
-            .append_pair("bucket", &bucket_name)
+            .append_pair("bucket", &ctx.bucket_name)
             .append_pair("key", &key)
             .append_pair("etag", &etag);
         let target = target.to_string();
@@ -298,7 +292,7 @@ pub async fn handle_post_object(
         let xml = s3_xml::PostObject {
             xmlns: (),
             location: s3_xml::Value(location),
-            bucket: s3_xml::Value(bucket_name),
+            bucket: s3_xml::Value(ctx.bucket_name),
             key: s3_xml::Value(key),
             etag: s3_xml::Value(etag),
         };
@@ -311,12 +305,8 @@ pub async fn handle_post_object(
         }
     };

-    let matching_cors_rule = find_matching_cors_rule(
-        &bucket,
-        &Request::from_parts(head, empty_body::<Infallible>()),
-    )?;
     if let Some(rule) = matching_cors_rule {
-        add_cors_headers(&mut resp, rule)
+        add_cors_headers(&mut resp, &rule)
             .ok_or_internal_error("Invalid bucket CORS configuration")?;
     }
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index c1af513c..f06aa7a2 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -28,7 +28,6 @@
 use garage_util::error::Error as GarageError;
 use garage_util::time::*;

 use garage_block::manager::INLINE_THRESHOLD;
-use garage_model::bucket_table::Bucket;
 use garage_model::garage::Garage;
 use garage_model::index_counter::CountedItem;
 use garage_model::s3::block_ref_table::*;
@@ -42,9 +41,8 @@
 use crate::s3::error::*;

 const PUT_BLOCKS_MAX_PARALLEL: usize = 3;

 pub async fn handle_put(
-    garage: Arc<Garage>,
+    ctx: ReqCtx,
     req: Request<ReqBody>,
-    bucket: &Bucket,
     key: &String,
     content_sha256: Option<Hash>,
 ) -> Result<Response<ResBody>, Error> {
@@ -59,35 +57,27 @@ pub async fn handle_put(

     let stream = body_stream(req.into_body());

-    save_stream(
-        garage,
-        headers,
-        stream,
-        bucket,
-        key,
-        content_md5,
-        content_sha256,
-    )
-    .await
-    .map(|(uuid, md5)| put_response(uuid, md5))
+    save_stream(&ctx, headers, stream, key, content_md5, content_sha256)
+        .await
+        .map(|(uuid, md5)| put_response(uuid, md5))
 }

 pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
-    garage: Arc<Garage>,
+    ctx: &ReqCtx,
     headers: ObjectVersionHeaders,
     body: S,
-    bucket: &Bucket,
     key: &String,
     content_md5: Option<String>,
     content_sha256: Option<FixedBytes32>,
 ) -> Result<(Uuid, String), Error> {
+    let ReqCtx {
+        garage, bucket_id, ..
+    } = ctx;
+
     let mut chunker = StreamChunker::new(body, garage.config.block_size);
     let (first_block_opt, existing_object) = try_join!(
         chunker.next(),
-        garage
-            .object_table
-            .get(&bucket.id, key)
-            .map_err(Error::from),
+        garage.object_table.get(bucket_id, key).map_err(Error::from),
     )?;

     let first_block = first_block_opt.unwrap_or_default();
@@ -114,7 +104,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
             content_sha256,
         )?;

-        check_quotas(&garage, bucket, size, existing_object.as_ref()).await?;
+        check_quotas(ctx, size, existing_object.as_ref()).await?;

         let object_version = ObjectVersion {
             uuid: version_uuid,
@@ -129,7 +119,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
             )),
         };

-        let object = Object::new(bucket.id, key.into(), vec![object_version]);
+        let object = Object::new(*bucket_id, key.into(), vec![object_version]);
         garage.object_table.insert(&object).await?;

         return Ok((version_uuid, data_md5sum_hex));
@@ -140,7 +130,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
     // before everything is finished (cleanup is done using the Drop trait).
     let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
         garage: garage.clone(),
-        bucket_id: bucket.id,
+        bucket_id: *bucket_id,
         key: key.into(),
         version_uuid,
         version_timestamp,
@@ -156,7 +146,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
             multipart: false,
         },
     };
-    let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
+    let object = Object::new(*bucket_id, key.into(), vec![object_version.clone()]);
     garage.object_table.insert(&object).await?;

     // Initialize corresponding entry in version table
@@ -166,7 +156,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
     let version = Version::new(
         version_uuid,
         VersionBacklink::Object {
-            bucket_id: bucket.id,
+            bucket_id: *bucket_id,
             key: key.into(),
         },
         false,
@@ -175,7 +165,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(

     // Transfer data and verify checksum
     let (total_size, data_md5sum, data_sha256sum, first_block_hash) =
-        read_and_put_blocks(&garage, &version, 1, first_block, &mut chunker).await?;
+        read_and_put_blocks(ctx, &version, 1, first_block, &mut chunker).await?;

     ensure_checksum_matches(
         data_md5sum.as_slice(),
@@ -184,7 +174,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
         content_sha256,
     )?;

-    check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?;
+    check_quotas(ctx, total_size, existing_object.as_ref()).await?;

     // Save final object state, marked as Complete
     let md5sum_hex = hex::encode(data_md5sum);
@@ -196,7 +186,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
         },
         first_block_hash,
     ));
-    let object = Object::new(bucket.id, key.into(), vec![object_version]);
+    let object = Object::new(*bucket_id, key.into(), vec![object_version]);
     garage.object_table.insert(&object).await?;

     // We were not interrupted, everything went fine.
@@ -235,12 +225,18 @@ pub(crate) fn ensure_checksum_matches( /// Check that inserting this object with this size doesn't exceed bucket quotas pub(crate) async fn check_quotas( - garage: &Arc<Garage>, - bucket: &Bucket, + ctx: &ReqCtx, size: u64, prev_object: Option<&Object>, ) -> Result<(), Error> { - let quotas = bucket.state.as_option().unwrap().quotas.get(); + let ReqCtx { + garage, + bucket_id, + bucket_params, + .. + } = ctx; + + let quotas = bucket_params.quotas.get(); if quotas.max_objects.is_none() && quotas.max_size.is_none() { return Ok(()); }; @@ -248,7 +244,7 @@ pub(crate) async fn check_quotas( let counters = garage .object_counter_table .table - .get(&bucket.id, &EmptyKey) + .get(bucket_id, &EmptyKey) .await?; let counters = counters @@ -292,7 +288,7 @@ pub(crate) async fn check_quotas( } pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( - garage: &Garage, + ctx: &ReqCtx, version: &Version, part_number: u64, first_block: Bytes, @@ -417,7 +413,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + let offset = written_bytes; written_bytes += block.len() as u64; write_futs.push_back(put_block_and_meta( - garage, + ctx, version, part_number, offset, @@ -447,7 +443,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + } async fn put_block_and_meta( - garage: &Garage, + ctx: &ReqCtx, version: &Version, part_number: u64, offset: u64, @@ -455,6 +451,8 @@ async fn put_block_and_meta( block: Bytes, order_tag: OrderTag, ) -> Result<(), GarageError> { + let ReqCtx { garage, .. } = ctx; + let mut version = version.clone(); version.blocks.put( VersionBlockKey { diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index 1c1dbf20..6af55677 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -1,5 +1,4 @@ use quick_xml::de::from_reader; -use std::sync::Arc; use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; @@ -12,15 +11,11 @@ use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; use garage_model::bucket_table::*; -use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Error> { - let param = bucket - .params() - .ok_or_internal_error("Bucket should not be deleted at this point")?; - - if let Some(website) = param.website_config.get() { +pub async fn handle_get_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> { + let ReqCtx { bucket_params, .. } = ctx; + if let Some(website) = bucket_params.website_config.get() { let wc = WebsiteConfiguration { xmlns: (), error_document: website.error_document.as_ref().map(|v| Key { @@ -44,16 +39,18 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Er } } -pub async fn handle_delete_website( - garage: Arc<Garage>, - mut bucket: Bucket, -) -> Result<Response<ResBody>, Error> { - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; - - param.website_config.update(None); - garage.bucket_table.insert(&bucket).await?; +pub async fn handle_delete_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> { + let ReqCtx { + garage, + bucket_id, + mut bucket_params, + .. 
+ } = ctx; + bucket_params.website_config.update(None); + garage + .bucket_table + .insert(&Bucket::present(bucket_id, bucket_params)) + .await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) @@ -61,28 +58,33 @@ pub async fn handle_delete_website( } pub async fn handle_put_website( - garage: Arc<Garage>, - mut bucket: Bucket, + ctx: ReqCtx, req: Request<ReqBody>, content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { + let ReqCtx { + garage, + bucket_id, + mut bucket_params, + .. + } = ctx; + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; } - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; - let conf: WebsiteConfiguration = from_reader(&body as &[u8])?; conf.validate()?; - param + bucket_params .website_config .update(Some(conf.into_garage_website_config()?)); - garage.bucket_table.insert(&bucket).await?; + garage + .bucket_table + .insert(&Bucket::present(bucket_id, bucket_params)) + .await?; Ok(Response::builder() .status(StatusCode::OK) diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs index 4b8b990f..6514da43 100644 --- a/src/api/signature/mod.rs +++ b/src/api/signature/mod.rs @@ -2,19 +2,44 @@ use chrono::{DateTime, Utc}; use hmac::{Hmac, Mac}; use sha2::Sha256; +use hyper::{body::Incoming as IncomingBody, Request}; + +use garage_model::garage::Garage; +use garage_model::key_table::Key; use garage_util::data::{sha256sum, Hash}; +use error::*; + pub mod error; pub mod payload; pub mod streaming; -use error::*; - pub const SHORT_DATE: &str = "%Y%m%d"; pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; type HmacSha256 = Hmac<Sha256>; +pub async fn verify_request( + garage: &Garage, + mut req: Request<IncomingBody>, + service: &'static str, +) -> Result<(Request<streaming::ReqBody>, Key, Option<Hash>), Error> { + let (api_key, mut content_sha256) = + payload::check_payload_signature(&garage, &mut req, service).await?; + let api_key = + api_key.ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?; + + let req = streaming::parse_streaming_body( + &api_key, + req, + &mut content_sha256, + &garage.config.s3_api.s3_region, + service, + )?; + + Ok((req, api_key, content_sha256)) +} + pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> { if expected_sha256 != sha256sum(body) { return Err(Error::bad_request( diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs index 423aad93..d72736bb 100644 --- a/src/api/signature/payload.rs +++ b/src/api/signature/payload.rs @@ -1,7 +1,9 @@ use std::collections::HashMap; +use std::convert::TryFrom; use chrono::{DateTime, Duration, NaiveDateTime, TimeZone, Utc}; use hmac::Mac; +use hyper::header::{HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, HOST}; use hyper::{body::Incoming as IncomingBody, Method, Request}; use sha2::{Digest, Sha256}; @@ -17,66 +19,98 @@ use super::{compute_scope, signing_hmac}; use crate::encoding::uri_encode; use crate::signature::error::*; +pub const X_AMZ_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-algorithm"); +pub const X_AMZ_CREDENTIAL: HeaderName = HeaderName::from_static("x-amz-credential"); +pub const X_AMZ_DATE: HeaderName = HeaderName::from_static("x-amz-date"); +pub const X_AMZ_EXPIRES: HeaderName = HeaderName::from_static("x-amz-expires"); +pub const X_AMZ_SIGNEDHEADERS: HeaderName = 
HeaderName::from_static("x-amz-signedheaders"); +pub const X_AMZ_SIGNATURE: HeaderName = HeaderName::from_static("x-amz-signature"); +pub const X_AMZ_CONTENT_SH256: HeaderName = HeaderName::from_static("x-amz-content-sha256"); + +pub const AWS4_HMAC_SHA256: &str = "AWS4-HMAC-SHA256"; +pub const UNSIGNED_PAYLOAD: &str = "UNSIGNED-PAYLOAD"; +pub const STREAMING_AWS4_HMAC_SHA256_PAYLOAD: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"; + +pub type QueryMap = HeaderMap<QueryValue>; +pub struct QueryValue { + /// Original key with potential uppercase characters, + /// for use in signature calculation + key: String, + value: String, +} + pub async fn check_payload_signature( garage: &Garage, + request: &mut Request<IncomingBody>, service: &'static str, - request: &Request<IncomingBody>, ) -> Result<(Option<Key>, Option<Hash>), Error> { - let mut headers = HashMap::new(); - for (key, val) in request.headers() { - headers.insert(key.to_string(), val.to_str()?.to_string()); - } - if let Some(query) = request.uri().query() { - let query_pairs = url::form_urlencoded::parse(query.as_bytes()); - for (key, val) in query_pairs { - headers.insert(key.to_lowercase(), val.to_string()); - } - } - - let authorization = if let Some(authorization) = headers.get("authorization") { - parse_authorization(authorization, &headers)? - } else if let Some(algorithm) = headers.get("x-amz-algorithm") { - parse_query_authorization(algorithm, &headers)? + let query = parse_query_map(request.uri())?; + + if query.contains_key(&X_AMZ_ALGORITHM) { + // We check for presigned-URL-style authentication first, because + // the browser or something else could inject an Authorization header + // that is totally unrelated to AWS signatures. + check_presigned_signature(garage, service, request, query).await + } else if request.headers().contains_key(AUTHORIZATION) { + check_standard_signature(garage, service, request, query).await } else { - let content_sha256 = headers.get("x-amz-content-sha256"); - if let Some(content_sha256) = content_sha256.filter(|c| "UNSIGNED-PAYLOAD" != c.as_str()) { + // Unsigned (anonymous) request + let content_sha256 = request + .headers() + .get("x-amz-content-sha256") + .filter(|c| c.as_bytes() != UNSIGNED_PAYLOAD.as_bytes()); + if let Some(content_sha256) = content_sha256 { let sha256 = hex::decode(content_sha256) .ok() .and_then(|bytes| Hash::try_from(&bytes)) .ok_or_bad_request("Invalid content sha256 hash")?; - return Ok((None, Some(sha256))); + Ok((None, Some(sha256))) } else { - return Ok((None, None)); + Ok((None, None)) } - }; + } +} + +async fn check_standard_signature( + garage: &Garage, + service: &'static str, + request: &Request<IncomingBody>, + query: QueryMap, +) -> Result<(Option<Key>, Option<Hash>), Error> { + let authorization = Authorization::parse_header(request.headers())?; + + // Verify that all necessary request headers are included in signed_headers + // The following must be included for all signatures: + // - the Host header (mandatory) + // - all x-amz-* headers used in the request + // AWS also indicates that the Content-Type header should be signed if + // it is used, but Minio client doesn't sign it so we don't check it for compatibility.
+ let signed_headers = split_signed_headers(&authorization)?; + verify_signed_headers(request.headers(), &signed_headers)?; let canonical_request = canonical_request( service, request.method(), - request.uri(), - &headers, - &authorization.signed_headers, + request.uri().path(), + &query, + request.headers(), + &signed_headers, &authorization.content_sha256, + )?; + let string_to_sign = string_to_sign( + &authorization.date, + &authorization.scope, + &canonical_request, ); - let (_, scope) = parse_credential(&authorization.credential)?; - let string_to_sign = string_to_sign(&authorization.date, &scope, &canonical_request); trace!("canonical request:\n{}", canonical_request); trace!("string to sign:\n{}", string_to_sign); - let key = verify_v4( - garage, - service, - &authorization.credential, - &authorization.date, - &authorization.signature, - string_to_sign.as_bytes(), - ) - .await?; + let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?; - let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" { + let content_sha256 = if authorization.content_sha256 == UNSIGNED_PAYLOAD { None - } else if authorization.content_sha256 == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" { + } else if authorization.content_sha256 == STREAMING_AWS4_HMAC_SHA256_PAYLOAD { let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?; Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?) } else { @@ -88,124 +122,102 @@ pub async fn check_payload_signature( Ok((Some(key), content_sha256)) } -struct Authorization { - credential: String, - signed_headers: String, - signature: String, - content_sha256: String, - date: DateTime<Utc>, -} - -fn parse_authorization( - authorization: &str, - headers: &HashMap<String, String>, -) -> Result<Authorization, Error> { - let first_space = authorization - .find(' ') - .ok_or_bad_request("Authorization field to short")?; - let (auth_kind, rest) = authorization.split_at(first_space); - - if auth_kind != "AWS4-HMAC-SHA256" { - return Err(Error::bad_request("Unsupported authorization method")); - } - - let mut auth_params = HashMap::new(); - for auth_part in rest.split(',') { - let auth_part = auth_part.trim(); - let eq = auth_part - .find('=') - .ok_or_bad_request("Field without value in authorization header")?; - let (key, value) = auth_part.split_at(eq); - auth_params.insert(key.to_string(), value.trim_start_matches('=').to_string()); - } - - let cred = auth_params - .get("Credential") - .ok_or_bad_request("Could not find Credential in Authorization field")?; - - let content_sha256 = headers - .get("x-amz-content-sha256") - .ok_or_bad_request("Missing X-Amz-Content-Sha256 field")?; - - let date = headers - .get("x-amz-date") - .ok_or_bad_request("Missing X-Amz-Date field") - .map_err(Error::from) - .and_then(|d| parse_date(d))?; +async fn check_presigned_signature( + garage: &Garage, + service: &'static str, + request: &mut Request<IncomingBody>, + mut query: QueryMap, +) -> Result<(Option<Key>, Option<Hash>), Error> { + let algorithm = query.get(&X_AMZ_ALGORITHM).unwrap(); + let authorization = Authorization::parse_presigned(&algorithm.value, &query)?; + + // Verify that all necessary request headers are included in signed_headers + // For AWSv4 pre-signed URLs, the following must be included: + // - the Host header (mandatory) + // - all x-amz-* headers used in the request + let signed_headers = split_signed_headers(&authorization)?; + verify_signed_headers(request.headers(), &signed_headers)?;
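Editor's note: check_standard_signature() above condenses the AWSv4 flow to three steps: build the canonical request, derive the string to sign, verify the HMAC. The string-to-sign construction can be reproduced standalone; this sketch assumes the sha2, hex and chrono crates and mirrors the shape of string_to_sign() in this file (the canonical request contents are a made-up example).

use chrono::{DateTime, Utc};
use sha2::{Digest, Sha256};

const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ";

// The canonical request is hashed, then joined with the algorithm name,
// the request timestamp and the credential scope, one field per line.
fn string_to_sign(datetime: &DateTime<Utc>, scope: &str, canonical_req: &str) -> String {
    let hashed_request = hex::encode(Sha256::digest(canonical_req.as_bytes()));
    [
        "AWS4-HMAC-SHA256",
        &datetime.format(LONG_DATETIME).to_string(),
        scope,
        &hashed_request,
    ]
    .join("\n")
}

fn main() {
    // A fabricated canonical request: method, path, query, headers,
    // signed header list and payload hash, separated by newlines.
    let canonical = "GET\n/\n\nhost:garage.example.com\n\nhost\nUNSIGNED-PAYLOAD";
    let scope = "20240304/garage/s3/aws4_request";
    println!("{}", string_to_sign(&Utc::now(), scope, canonical));
}

(check_presigned_signature() resumes below, removing X-Amz-Signature before building the same canonical request.)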
+ + // The X-Amz-Signature value is passed as a query parameter, + // but the signature cannot be computed from a string that contains itself. + // AWS specifies that all query params except X-Amz-Signature are included + // in the canonical request. + query.remove(&X_AMZ_SIGNATURE); + let canonical_request = canonical_request( + service, + request.method(), + request.uri().path(), + &query, + request.headers(), + &signed_headers, + &authorization.content_sha256, + )?; + let string_to_sign = string_to_sign( + &authorization.date, + &authorization.scope, + &canonical_request, + ); - if Utc::now() - date > Duration::hours(24) { - return Err(Error::bad_request("Date is too old".to_string())); + trace!("canonical request (presigned url):\n{}", canonical_request); + trace!("string to sign (presigned url):\n{}", string_to_sign); + + let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?; + + // In the page on presigned URLs, AWS specifies that if a signed query + // parameter and a signed header of the same name have different values, + // then an InvalidRequest error is raised. + let headers_mut = request.headers_mut(); + for (name, value) in query.iter() { + if let Some(existing) = headers_mut.get(name) { + if signed_headers.contains(&name) && existing.as_bytes() != value.value.as_bytes() { + return Err(Error::bad_request(format!( + "Conflicting values for `{}` in query parameters and request headers", + name + ))); + } + } + if name.as_str().starts_with("x-amz-") { + // Query parameters that start with x-amz- are actually intended to stand in for + // headers that can't be added at the time the request is made. + // What we do is just add them to the Request object as regular headers, + // that will be handled downstream as if they were included in a normal request. + // (Here we allow such query parameters to override headers with the same name + // that are not signed, though there is little reason for this to happen) + headers_mut.insert( + name, + HeaderValue::from_bytes(value.value.as_bytes()) + .ok_or_bad_request("invalid query parameter value")?, + ); + } } - let auth = Authorization { - credential: cred.to_string(), - signed_headers: auth_params - .get("SignedHeaders") - .ok_or_bad_request("Could not find SignedHeaders in Authorization field")? - .to_string(), - signature: auth_params - .get("Signature") - .ok_or_bad_request("Could not find Signature in Authorization field")? - .to_string(), - content_sha256: content_sha256.to_string(), - date, - }; - Ok(auth) + // Presigned URLs always use UNSIGNED-PAYLOAD, + // so there is no sha256 hash to return. + Ok((Some(key), None)) } -fn parse_query_authorization( - algorithm: &str, - headers: &HashMap<String, String>, -) -> Result<Authorization, Error> { - if algorithm != "AWS4-HMAC-SHA256" { - return Err(Error::bad_request( - "Unsupported authorization method".to_string(), - )); - } - - let cred = headers - .get("x-amz-credential") - .ok_or_bad_request("X-Amz-Credential not found in query parameters")?; - let signed_headers = headers - .get("x-amz-signedheaders") - .ok_or_bad_request("X-Amz-SignedHeaders not found in query parameters")?; - let signature = headers - .get("x-amz-signature") - .ok_or_bad_request("X-Amz-Signature not found in query parameters")?; - let content_sha256 = headers - .get("x-amz-content-sha256") - .map(|x| x.as_str()) - .unwrap_or("UNSIGNED-PAYLOAD"); - - let duration = headers - .get("x-amz-expires") - .ok_or_bad_request("X-Amz-Expires not found in query parameters")?
- .parse() - .map_err(|_| Error::bad_request("X-Amz-Expires is not a number".to_string()))?; - - if duration > 7 * 24 * 3600 { - return Err(Error::bad_request( - "X-Amz-Expires may not exceed a week".to_string(), - )); - } - - let date = headers - .get("x-amz-date") - .ok_or_bad_request("Missing X-Amz-Date field") - .map_err(Error::from) - .and_then(|d| parse_date(d))?; - - if Utc::now() - date > Duration::seconds(duration) { - return Err(Error::bad_request("Date is too old".to_string())); +pub fn parse_query_map(uri: &http::uri::Uri) -> Result<QueryMap, Error> { + let mut query = QueryMap::with_capacity(0); + if let Some(query_str) = uri.query() { + let query_pairs = url::form_urlencoded::parse(query_str.as_bytes()); + for (key, val) in query_pairs { + let name = + HeaderName::from_bytes(key.as_bytes()).ok_or_bad_request("Invalid header name")?; + + let value = QueryValue { + key: key.to_string(), + value: val.into_owned(), + }; + + if query.insert(name, value).is_some() { + return Err(Error::bad_request(format!( + "duplicate query parameter: `{}`", + key + ))); + } + } } - - Ok(Authorization { - credential: cred.to_string(), - signed_headers: signed_headers.to_string(), - signature: signature.to_string(), - content_sha256: content_sha256.to_string(), - date, - }) + Ok(query) } fn parse_credential(cred: &str) -> Result<(String, String), Error> { @@ -219,11 +231,39 @@ fn parse_credential(cred: &str) -> Result<(String, String), Error> { )) } +fn split_signed_headers(authorization: &Authorization) -> Result<Vec<HeaderName>, Error> { + let mut signed_headers = authorization + .signed_headers + .split(';') + .map(HeaderName::try_from) + .collect::<Result<Vec<HeaderName>, _>>() + .ok_or_bad_request("invalid header name")?; + signed_headers.sort_by(|h1, h2| h1.as_str().cmp(h2.as_str())); + Ok(signed_headers) +} + +fn verify_signed_headers(headers: &HeaderMap, signed_headers: &[HeaderName]) -> Result<(), Error> { + if !signed_headers.contains(&HOST) { + return Err(Error::bad_request("Header `Host` should be signed")); + } + for (name, _) in headers.iter() { + if name.as_str().starts_with("x-amz-") { + if !signed_headers.contains(name) { + return Err(Error::bad_request(format!( + "Header `{}` should be signed", + name + ))); + } + } + } + Ok(()) +} + pub fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_req: &str) -> String { let mut hasher = Sha256::default(); hasher.update(canonical_req.as_bytes()); [ - "AWS4-HMAC-SHA256", + AWS4_HMAC_SHA256, &datetime.format(LONG_DATETIME).to_string(), scope_string, &hex::encode(hasher.finalize().as_slice()), ] @@ -234,11 +274,12 @@ pub fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_re pub fn canonical_request( service: &'static str, method: &Method, - uri: &hyper::Uri, - headers: &HashMap<String, String>, - signed_headers: &str, + canonical_uri: &str, + query: &QueryMap, + headers: &HeaderMap, + signed_headers: &[HeaderName], content_sha256: &str, -) -> String { +) -> Result<String, Error> { // There seems to be evidence that in AWSv4 signatures, the path component is url-encoded // a second time when building the canonical request, as specified in this documentation page: // -> https://docs.aws.amazon.com/rolesanywhere/latest/userguide/authentication-sign-process.html @@ -268,49 +309,46 @@ // it mentions it in the comments (same link to the source code as above).
// We make the explicit choice of NOT normalizing paths in the K2V API because doing so // would make non-normalized paths invalid K2V partition keys, and we don't want that. - let path: std::borrow::Cow<str> = if service != "s3" { - uri_encode(uri.path(), false).into() + let canonical_uri: std::borrow::Cow<str> = if service != "s3" { + uri_encode(canonical_uri, false).into() } else { - uri.path().into() + canonical_uri.into() }; - [ - method.as_str(), - &path, - &canonical_query_string(uri), - &canonical_header_string(headers, signed_headers), - "", - signed_headers, - content_sha256, - ] - .join("\n") -} -fn canonical_header_string(headers: &HashMap<String, String>, signed_headers: &str) -> String { - let signed_headers_vec = signed_headers.split(';').collect::<Vec<_>>(); - let mut items = headers - .iter() - .filter(|(key, _)| signed_headers_vec.contains(&key.as_str())) - .collect::<Vec<_>>(); - items.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); - items - .iter() - .map(|(key, value)| key.to_lowercase() + ":" + value.trim()) - .collect::<Vec<_>>() - .join("\n") -} - -fn canonical_query_string(uri: &hyper::Uri) -> String { - if let Some(query) = uri.query() { - let query_pairs = url::form_urlencoded::parse(query.as_bytes()); - let mut items = query_pairs - .filter(|(key, _)| key != "X-Amz-Signature") - .map(|(key, value)| uri_encode(&key, true) + "=" + &uri_encode(&value, true)) - .collect::<Vec<_>>(); + // Canonical query string from passed HeaderMap + let canonical_query_string = { + let mut items = Vec::with_capacity(query.len()); + for (_, QueryValue { key, value }) in query.iter() { + items.push(uri_encode(&key, true) + "=" + &uri_encode(&value, true)); + } items.sort(); items.join("&") - } else { - "".to_string() - } + }; + + // Canonical header string calculated from signed headers + let canonical_header_string = signed_headers + .iter() + .map(|name| { + let value = headers + .get(name) + .ok_or_bad_request(format!("signed header `{}` is not present", name))? + .to_str()?; + Ok(format!("{}:{}", name.as_str(), value.trim())) + }) + .collect::<Result<Vec<String>, Error>>()? + .join("\n"); + let signed_headers = signed_headers.join(";"); + + let list = [ + method.as_str(), + &canonical_uri, + &canonical_query_string, + &canonical_header_string, + "", + &signed_headers, + content_sha256, + ]; + Ok(list.join("\n")) } pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> { @@ -322,38 +360,203 @@ pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> { pub async fn verify_v4( garage: &Garage, service: &str, - credential: &str, - date: &DateTime<Utc>, - signature: &str, + auth: &Authorization, payload: &[u8], ) -> Result<Key, Error> { - let (key_id, scope) = parse_credential(credential)?; - - let scope_expected = compute_scope(date, &garage.config.s3_api.s3_region, service); - if scope != scope_expected { - return Err(Error::AuthorizationHeaderMalformed(scope.to_string())); + let scope_expected = compute_scope(&auth.date, &garage.config.s3_api.s3_region, service); + if auth.scope != scope_expected { + return Err(Error::AuthorizationHeaderMalformed(auth.scope.to_string())); } let key = garage .key_table - .get(&EmptyKey, &key_id) + .get(&EmptyKey, &auth.key_id) .await? 
.filter(|k| !k.state.is_deleted()) - .ok_or_else(|| Error::forbidden(format!("No such key: {}", &key_id)))?; + .ok_or_else(|| Error::forbidden(format!("No such key: {}", &auth.key_id)))?; let key_p = key.params().unwrap(); let mut hmac = signing_hmac( - date, + &auth.date, &key_p.secret_key, &garage.config.s3_api.s3_region, service, ) .ok_or_internal_error("Unable to build signing HMAC")?; hmac.update(payload); - let our_signature = hex::encode(hmac.finalize().into_bytes()); - if signature != our_signature { - return Err(Error::forbidden("Invalid signature".to_string())); + let signature = + hex::decode(&auth.signature).map_err(|_| Error::forbidden("Invalid signature"))?; + if hmac.verify_slice(&signature).is_err() { + return Err(Error::forbidden("Invalid signature")); } Ok(key) } + +// ============ Authorization header, or X-Amz-* query params ========= + +pub struct Authorization { + key_id: String, + scope: String, + signed_headers: String, + signature: String, + content_sha256: String, + date: DateTime<Utc>, +} + +impl Authorization { + fn parse_header(headers: &HeaderMap) -> Result<Self, Error> { + let authorization = headers + .get(AUTHORIZATION) + .ok_or_bad_request("Missing authorization header")? + .to_str()?; + + let (auth_kind, rest) = authorization + .split_once(' ') + .ok_or_bad_request("Authorization field too short")?; + + if auth_kind != AWS4_HMAC_SHA256 { + return Err(Error::bad_request("Unsupported authorization method")); + } + + let mut auth_params = HashMap::new(); + for auth_part in rest.split(',') { + let auth_part = auth_part.trim(); + let eq = auth_part + .find('=') + .ok_or_bad_request("Field without value in authorization header")?; + let (key, value) = auth_part.split_at(eq); + auth_params.insert(key.to_string(), value.trim_start_matches('=').to_string()); + } + + let cred = auth_params + .get("Credential") + .ok_or_bad_request("Could not find Credential in Authorization field")?; + let signed_headers = auth_params + .get("SignedHeaders") + .ok_or_bad_request("Could not find SignedHeaders in Authorization field")? + .to_string(); + let signature = auth_params + .get("Signature") + .ok_or_bad_request("Could not find Signature in Authorization field")? + .to_string(); + + let content_sha256 = headers + .get(X_AMZ_CONTENT_SH256) + .ok_or_bad_request("Missing X-Amz-Content-Sha256 field")?; + + let date = headers + .get(X_AMZ_DATE) + .ok_or_bad_request("Missing X-Amz-Date field") + .map_err(Error::from)? + .to_str()?; + let date = parse_date(date)?; + + if Utc::now() - date > Duration::hours(24) { + return Err(Error::bad_request("Date is too old".to_string())); + } + + let (key_id, scope) = parse_credential(cred)?; + let auth = Authorization { + key_id, + scope, + signed_headers, + signature, + content_sha256: content_sha256.to_str()?.to_string(), + date, + }; + Ok(auth) + } + + fn parse_presigned(algorithm: &str, query: &QueryMap) -> Result<Self, Error> { + if algorithm != AWS4_HMAC_SHA256 { + return Err(Error::bad_request( + "Unsupported authorization method".to_string(), + )); + } + + let cred = query + .get(&X_AMZ_CREDENTIAL) + .ok_or_bad_request("X-Amz-Credential not found in query parameters")?; + let signed_headers = query + .get(&X_AMZ_SIGNEDHEADERS) + .ok_or_bad_request("X-Amz-SignedHeaders not found in query parameters")?; + let signature = query + .get(&X_AMZ_SIGNATURE) + .ok_or_bad_request("X-Amz-Signature not found in query parameters")?; + + let duration = query + .get(&X_AMZ_EXPIRES) + .ok_or_bad_request("X-Amz-Expires not found in query parameters")?
+ .value + .parse() + .map_err(|_| Error::bad_request("X-Amz-Expires is not a number".to_string()))?; + + if duration > 7 * 24 * 3600 { + return Err(Error::bad_request( + "X-Amz-Expires may not exceed a week".to_string(), + )); + } + + let date = query + .get(&X_AMZ_DATE) + .ok_or_bad_request("Missing X-Amz-Date field")?; + let date = parse_date(&date.value)?; + + if Utc::now() - date > Duration::seconds(duration) { + return Err(Error::bad_request("Date is too old".to_string())); + } + + let (key_id, scope) = parse_credential(&cred.value)?; + Ok(Authorization { + key_id, + scope, + signed_headers: signed_headers.value.clone(), + signature: signature.value.clone(), + content_sha256: UNSIGNED_PAYLOAD.to_string(), + date, + }) + } + + pub(crate) fn parse_form(params: &HeaderMap) -> Result<Self, Error> { + let algorithm = params + .get(X_AMZ_ALGORITHM) + .ok_or_bad_request("Missing X-Amz-Algorithm header")? + .to_str()?; + if algorithm != AWS4_HMAC_SHA256 { + return Err(Error::bad_request( + "Unsupported authorization method".to_string(), + )); + } + + let credential = params + .get(X_AMZ_CREDENTIAL) + .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))? + .to_str()?; + let signature = params + .get(X_AMZ_SIGNATURE) + .ok_or_bad_request("No signature was provided")? + .to_str()? + .to_string(); + let date = params + .get(X_AMZ_DATE) + .ok_or_bad_request("No date was provided")? + .to_str()?; + let date = parse_date(date)?; + + if Utc::now() - date > Duration::hours(24) { + return Err(Error::bad_request("Date is too old".to_string())); + } + + let (key_id, scope) = parse_credential(credential)?; + let auth = Authorization { + key_id, + scope, + signed_headers: "".to_string(), + signature, + content_sha256: UNSIGNED_PAYLOAD.to_string(), + date, + }; + Ok(auth) + } +} diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index a2a71f6b..e223d1b1 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -15,6 +15,11 @@ use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME}; use crate::helpers::*; use crate::signature::error::*; +use crate::signature::payload::{ + STREAMING_AWS4_HMAC_SHA256_PAYLOAD, X_AMZ_CONTENT_SH256, X_AMZ_DATE, +}; + +pub const AWS4_HMAC_SHA256_PAYLOAD: &str = "AWS4-HMAC-SHA256-PAYLOAD"; pub type ReqBody = BoxBody<Error>; @@ -25,8 +30,8 @@ pub fn parse_streaming_body( region: &str, service: &str, ) -> Result<Request<ReqBody>, Error> { - match req.headers().get("x-amz-content-sha256") { - Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { + match req.headers().get(X_AMZ_CONTENT_SH256) { + Some(header) if header == STREAMING_AWS4_HMAC_SHA256_PAYLOAD => { let signature = content_sha256 .take() .ok_or_bad_request("No signature provided")?; @@ -39,7 +44,7 @@ pub fn parse_streaming_body( let date = req .headers() - .get("x-amz-date") + .get(X_AMZ_DATE) .ok_or_bad_request("Missing X-Amz-Date field")? 
.to_str()?; let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) @@ -75,7 +80,7 @@ fn compute_streaming_payload_signature( content_sha256: Hash, ) -> Result<Hash, Error> { let string_to_sign = [ - "AWS4-HMAC-SHA256-PAYLOAD", + AWS4_HMAC_SHA256_PAYLOAD, &date.format(LONG_DATETIME).to_string(), scope, &hex::encode(previous_signature), diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs index e5f4cca1..8e1eaa56 100644 --- a/src/garage/tests/common/custom_requester.rs +++ b/src/garage/tests/common/custom_requester.rs @@ -1,12 +1,15 @@ #![allow(dead_code)] use std::collections::HashMap; -use std::convert::TryFrom; +use std::convert::{TryFrom, TryInto}; use chrono::{offset::Utc, DateTime}; use hmac::{Hmac, Mac}; use http_body_util::BodyExt; use http_body_util::Full as FullBody; +use hyper::header::{ + HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, CONTENT_ENCODING, CONTENT_LENGTH, HOST, +}; use hyper::{Method, Request, Response, Uri}; use hyper_util::client::legacy::{connect::HttpConnector, Client}; use hyper_util::rt::TokioExecutor; @@ -61,6 +64,10 @@ impl CustomRequester { vhost_style: false, } } + + pub fn client(&self) -> &Client<HttpConnector, Body> { + &self.client + } } pub struct RequestBuilder<'a> { @@ -173,54 +180,85 @@ impl<'a> RequestBuilder<'a> { .unwrap(); let streaming_signer = signer.clone(); - let mut all_headers = self.signed_headers.clone(); + let mut all_headers = self + .signed_headers + .iter() + .map(|(k, v)| { + ( + HeaderName::try_from(k).expect("invalid header name"), + HeaderValue::try_from(v).expect("invalid header value"), + ) + }) + .collect::<HeaderMap>(); let date = now.format(signature::LONG_DATETIME).to_string(); - all_headers.insert("x-amz-date".to_owned(), date); - all_headers.insert("host".to_owned(), host); + all_headers.insert( + signature::payload::X_AMZ_DATE, + HeaderValue::from_str(&date).unwrap(), + ); + all_headers.insert(HOST, HeaderValue::from_str(&host).unwrap()); let body_sha = match self.body_signature { BodySignature::Unsigned => "UNSIGNED-PAYLOAD".to_owned(), BodySignature::Classic => hex::encode(garage_util::data::sha256sum(&self.body)), BodySignature::Streaming(size) => { - all_headers.insert("content-encoding".to_owned(), "aws-chunked".to_owned()); all_headers.insert( - "x-amz-decoded-content-length".to_owned(), - self.body.len().to_string(), + CONTENT_ENCODING, + HeaderValue::from_str("aws-chunked").unwrap(), + ); + all_headers.insert( + HeaderName::from_static("x-amz-decoded-content-length"), + HeaderValue::from_str(&self.body.len().to_string()).unwrap(), ); // Get length of body by doing the conversion to a streaming body with an // invalid signature (we don't know the seed) just to get its length. This // is a pretty lazy and inefficient way to do it, but it's enough for test // code.
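Editor's note: the streaming.rs hunk above shows why this test helper needs a chained signer: with STREAMING-AWS4-HMAC-SHA256-PAYLOAD, every chunk is signed over the signature of the previous chunk, so chunks cannot be reordered or replayed. Below is a standalone sketch of that chaining, assuming the hmac, sha2 and hex crates; the signing-key derivation is omitted and the key and scope values are made up.

use hmac::{Hmac, Mac};
use sha2::{Digest, Sha256};

type HmacSha256 = Hmac<Sha256>;

// One link of the chunk-signature chain: the string to sign includes the
// previous signature, the hash of the empty string, and the chunk's hash.
fn chunk_signature(
    signing_key: &[u8],
    datetime: &str,
    scope: &str,
    previous_signature: &str,
    chunk: &[u8],
) -> String {
    let string_to_sign = [
        "AWS4-HMAC-SHA256-PAYLOAD",
        datetime,
        scope,
        previous_signature,
        &hex::encode(Sha256::digest(b"")), // hash of the empty string, per the spec
        &hex::encode(Sha256::digest(chunk)),
    ]
    .join("\n");
    let mut mac = HmacSha256::new_from_slice(signing_key).expect("HMAC takes any key length");
    mac.update(string_to_sign.as_bytes());
    hex::encode(mac.finalize().into_bytes())
}

fn main() {
    // The seed is normally the signature computed over the request headers.
    let seed = "0".repeat(64);
    let scope = "20240304/garage/s3/aws4_request";
    let sig1 = chunk_signature(b"demo-key", "20240304T000000Z", scope, &seed, b"hello");
    let sig2 = chunk_signature(b"demo-key", "20240304T000000Z", scope, &sig1, b"world");
    println!("{sig1}\n{sig2}");
}

(The test helper resumes here, computing the content-length of the converted streaming body.)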
all_headers.insert( - "content-length".to_owned(), + CONTENT_LENGTH, to_streaming_body(&self.body, size, String::new(), signer.clone(), now, "") .len() - .to_string(), + .to_string() + .try_into() + .unwrap(), ); "STREAMING-AWS4-HMAC-SHA256-PAYLOAD".to_owned() } }; - all_headers.insert("x-amz-content-sha256".to_owned(), body_sha.clone()); + all_headers.insert( + signature::payload::X_AMZ_CONTENT_SH256, + HeaderValue::from_str(&body_sha).unwrap(), + ); - let mut signed_headers = all_headers - .keys() - .map(|k| k.as_ref()) - .collect::<Vec<&str>>(); - signed_headers.sort(); - let signed_headers = signed_headers.join(";"); + let mut signed_headers = all_headers.keys().cloned().collect::<Vec<_>>(); + signed_headers.sort_by(|h1, h2| h1.as_str().cmp(h2.as_str())); + let signed_headers_str = signed_headers + .iter() + .map(ToString::to_string) + .collect::<Vec<_>>() + .join(";"); - all_headers.extend(self.unsigned_headers.clone()); + all_headers.extend(self.unsigned_headers.iter().map(|(k, v)| { + ( + HeaderName::try_from(k).expect("invalid header name"), + HeaderValue::try_from(v).expect("invalid header value"), + ) + })); + + let uri = Uri::try_from(&uri).unwrap(); + let query = signature::payload::parse_query_map(&uri).unwrap(); let canonical_request = signature::payload::canonical_request( self.service, &self.method, - &Uri::try_from(&uri).unwrap(), + uri.path(), + &query, &all_headers, &signed_headers, &body_sha, - ); + ) + .unwrap(); let string_to_sign = signature::payload::string_to_sign(&now, &scope, &canonical_request); @@ -228,14 +266,15 @@ impl<'a> RequestBuilder<'a> { let signature = hex::encode(signer.finalize().into_bytes()); let authorization = format!( "AWS4-HMAC-SHA256 Credential={}/{},SignedHeaders={},Signature={}", - self.requester.key.id, scope, signed_headers, signature + self.requester.key.id, scope, signed_headers_str, signature + ); + all_headers.insert( + AUTHORIZATION, + HeaderValue::from_str(&authorization).unwrap(), ); - all_headers.insert("authorization".to_owned(), authorization); let mut request = Request::builder(); - for (k, v) in all_headers { - request = request.header(k, v); - } + *request.headers_mut().unwrap() = all_headers; let body = if let BodySignature::Streaming(size) = self.body_signature { to_streaming_body(&self.body, size, signature, streaming_signer, now, &scope) diff --git a/src/garage/tests/s3/mod.rs b/src/garage/tests/s3/mod.rs index 623eb665..4ebc4914 100644 --- a/src/garage/tests/s3/mod.rs +++ b/src/garage/tests/s3/mod.rs @@ -1,6 +1,7 @@ mod list; mod multipart; mod objects; +mod presigned; mod simple; mod streaming_signature; mod website; diff --git a/src/garage/tests/s3/presigned.rs b/src/garage/tests/s3/presigned.rs new file mode 100644 index 00000000..15270361 --- /dev/null +++ b/src/garage/tests/s3/presigned.rs @@ -0,0 +1,72 @@ +use std::time::{Duration, SystemTime}; + +use crate::common; +use aws_sdk_s3::presigning::PresigningConfig; +use bytes::Bytes; +use http_body_util::{BodyExt, Full}; +use hyper::Request; + +const STD_KEY: &str = "hello world"; +const BODY: &[u8; 62] = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + +#[tokio::test] +async fn test_presigned_url() { + let ctx = common::context(); + let bucket = ctx.create_bucket("presigned"); + + let etag = "\"46cf18a9b447991b450cad3facf5937e\""; + let body = Bytes::from(BODY.to_vec()); + + let psc = PresigningConfig::builder() + .start_time(SystemTime::now() - Duration::from_secs(60)) + .expires_in(Duration::from_secs(3600)) + .build() + .unwrap(); + + { + // 
PutObject + let req = ctx + .client + .put_object() + .bucket(&bucket) + .key(STD_KEY) + .presigned(psc.clone()) + .await + .unwrap(); + + let client = ctx.custom_request.client(); + let req = Request::builder() + .method("PUT") + .uri(req.uri()) + .body(Full::new(body.clone())) + .unwrap(); + let res = client.request(req).await.unwrap(); + assert_eq!(res.status(), 200); + assert_eq!(res.headers().get("etag").unwrap(), etag); + } + + { + // GetObject + let req = ctx + .client + .get_object() + .bucket(&bucket) + .key(STD_KEY) + .presigned(psc) + .await + .unwrap(); + + let client = ctx.custom_request.client(); + let req = Request::builder() + .method("GET") + .uri(req.uri()) + .body(Full::new(Bytes::new())) + .unwrap(); + let res = client.request(req).await.unwrap(); + assert_eq!(res.status(), 200); + assert_eq!(res.headers().get("etag").unwrap(), etag); + + let body2 = BodyExt::collect(res.into_body()).await.unwrap().to_bytes(); + assert_eq!(body, body2); + } +} diff --git a/src/garage/tests/s3/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs index 224b9ed5..351aa422 100644 --- a/src/garage/tests/s3/streaming_signature.rs +++ b/src/garage/tests/s3/streaming_signature.rs @@ -26,7 +26,7 @@ async fn test_putobject_streaming() { .builder(bucket.clone()) .method(Method::PUT) .path(STD_KEY.to_owned()) - .unsigned_headers(headers) + .signed_headers(headers) .vhost_style(true) .body(vec![]) .body_signature(BodySignature::Streaming(10)) diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 4c48a76f..1dbdfac2 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -191,6 +191,13 @@ impl Bucket { } } + pub fn present(id: Uuid, params: BucketParams) -> Self { + Bucket { + id, + state: crdt::Deletable::present(params), + } + } + /// Returns true if this represents a deleted bucket pub fn is_deleted(&self) -> bool { self.state.is_deleted() diff --git a/src/net/Cargo.toml b/src/net/Cargo.toml index df81c437..4bd0d2e5 100644 --- a/src/net/Cargo.toml +++ b/src/net/Cargo.toml @@ -3,7 +3,7 @@ name = "garage_net" version = "0.10.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" -license-file = "AGPL-3.0" +license = "AGPL-3.0" description = "Networking library for Garage RPC communication, forked from Netapp" repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" readme = "../../README.md" diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 0f9b5dc8..69939f65 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -26,7 +26,7 @@ use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_opt use garage_api::s3::error::{ CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError, }; -use garage_api::s3::get::{handle_get, handle_head}; +use garage_api::s3::get::{handle_get_without_ctx, handle_head_without_ctx}; use garage_model::garage::Garage; @@ -219,14 +219,13 @@ impl WebServer { // Check bucket isn't deleted and has website access enabled let bucket = self .garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .ok_or(Error::NotFound)?; + .bucket_helper() + .get_existing_bucket(bucket_id) + .await + .map_err(|_| Error::NotFound)?; + let bucket_params = bucket.state.into_option().unwrap(); - let website_config = bucket - .params() - .ok_or(Error::NotFound)? 
+ let website_config = bucket_params .website_config .get() .as_ref() @@ -243,14 +242,16 @@ impl WebServer { ); let ret_doc = match *req.method() { - Method::OPTIONS => handle_options_for_bucket(req, &bucket) + Method::OPTIONS => handle_options_for_bucket(req, &bucket_params) .map_err(ApiError::from) .map(|res| res.map(|_empty_body: EmptyBody| empty_body())), - Method::HEAD => handle_head(self.garage.clone(), &req, bucket_id, &key, None).await, + Method::HEAD => { + handle_head_without_ctx(self.garage.clone(), req, bucket_id, &key, None).await + } Method::GET => { - handle_get( + handle_get_without_ctx( self.garage.clone(), - &req, + req, bucket_id, &key, None, @@ -301,7 +302,7 @@ impl WebServer { .body(empty_body::<Infallible>()) .unwrap(); - match handle_get( + match handle_get_without_ctx( self.garage.clone(), &req2, bucket_id, @@ -344,7 +345,7 @@ impl WebServer { } Ok(mut resp) => { // Maybe add CORS headers - if let Some(rule) = find_matching_cors_rule(&bucket, req)? { + if let Some(rule) = find_matching_cors_rule(&bucket_params, req)? { add_cors_headers(&mut resp, rule) .ok_or_internal_error("Invalid bucket CORS configuration")?; }
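Editor's note: a security detail from the verify_v4() hunk earlier in this patch is worth calling out. The old code hex-encoded the computed HMAC and compared strings with ==, while the new code decodes the client's signature and calls hmac.verify_slice(), which compares in constant time. A minimal sketch of the same idea, assuming the hmac, sha2 and hex crates:

use hmac::{Hmac, Mac};
use sha2::Sha256;

type HmacSha256 = Hmac<Sha256>;

fn verify(secret: &[u8], payload: &[u8], signature_hex: &str) -> bool {
    let Ok(signature) = hex::decode(signature_hex) else {
        return false; // not even valid hex, reject outright
    };
    let mut mac = HmacSha256::new_from_slice(secret).expect("HMAC takes any key length");
    mac.update(payload);
    // verify_slice() compares in constant time; `==` on hex strings can
    // leak how many leading characters matched through timing differences.
    mac.verify_slice(&signature).is_ok()
}

fn main() {
    assert!(!verify(b"secret", b"payload", "deadbeef"));
    println!("rejected as expected");
}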