From e9f759d4cb9be28584ab511a0a2dc78b579475c8 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 28 Feb 2024 00:27:54 +0100 Subject: [fix-presigned] presigned requests: allow x-amz-* query parameters to stand in for equivalent headers --- src/api/k2v/api_server.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src/api/k2v') diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index e97da2af..5ed7e286 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -69,7 +69,7 @@ impl ApiHandler for K2VApiServer { async fn handle( &self, - req: Request, + mut req: Request, endpoint: K2VApiEndpoint, ) -> Result, Error> { let K2VApiEndpoint { @@ -86,7 +86,8 @@ impl ApiHandler for K2VApiServer { return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } - let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?; + let (api_key, mut content_sha256) = + check_payload_signature(&garage, "k2v", &mut req).await?; let api_key = api_key .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?; -- cgit v1.2.3 From 90cab5b8f2b5212668975bf445a1e86f638fe1c7 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 28 Feb 2024 10:51:08 +0100 Subject: [fix-presigned] add comments and reorganize --- src/api/k2v/api_server.rs | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) (limited to 'src/api/k2v') diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 5ed7e286..fdb5db4c 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -15,8 +15,7 @@ use garage_model::garage::Garage; use crate::generic_server::*; use crate::k2v::error::*; -use crate::signature::payload::check_payload_signature; -use crate::signature::streaming::*; +use crate::signature::verify_request; use crate::helpers::*; use crate::k2v::batch::*; @@ -69,7 +68,7 @@ impl ApiHandler for K2VApiServer { async fn handle( &self, - mut req: Request, + req: Request, endpoint: K2VApiEndpoint, ) -> Result, Error> { let K2VApiEndpoint { @@ -86,18 +85,7 @@ impl ApiHandler for K2VApiServer { return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } - let (api_key, mut content_sha256) = - check_payload_signature(&garage, "k2v", &mut req).await?; - let api_key = api_key - .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?; - - let req = parse_streaming_body( - &api_key, - req, - &mut content_sha256, - &garage.config.s3_api.s3_region, - "k2v", - )?; + let (req, api_key, _content_sha256) = verify_request(&garage, req, "k2v").await?; let bucket_id = garage .bucket_helper() -- cgit v1.2.3 From fb55682c66092921f766f82c16eb9e046f1bbb41 Mon Sep 17 00:00:00 2001 From: Yureka Date: Sun, 3 Mar 2024 14:56:52 +0100 Subject: add request context helper --- src/api/k2v/api_server.rs | 45 ++++++++++++++++++++++------------------- src/api/k2v/batch.rs | 51 +++++++++++++++++++++++++---------------------- src/api/k2v/index.rs | 10 +++++----- src/api/k2v/item.rs | 38 +++++++++++++++++++---------------- 4 files changed, 77 insertions(+), 67 deletions(-) (limited to 'src/api/k2v') diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index fdb5db4c..658cfcc8 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -95,6 +95,7 @@ impl ApiHandler for K2VApiServer { .bucket_helper() .get_existing_bucket(bucket_id) .await?; + let bucket_params = bucket.state.into_option().unwrap(); let allowed = match endpoint.authorization_type() { Authorization::Read => api_key.allow_read(&bucket_id), @@ -112,40 +113,42 @@ impl ApiHandler for K2VApiServer { // are always preflighted, i.e. the browser should make // an OPTIONS call before to check it is allowed let matching_cors_rule = match *req.method() { - Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req) - .ok_or_internal_error("Error looking up CORS rule")?, + Method::GET | Method::HEAD | Method::POST => { + find_matching_cors_rule(&bucket_params, &req) + .ok_or_internal_error("Error looking up CORS rule")? + .cloned() + } _ => None, }; + let ctx = ReqCtx { + garage, + bucket_id, + bucket_name, + bucket_params, + api_key, + }; + let resp = match endpoint { Endpoint::DeleteItem { partition_key, sort_key, - } => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await, + } => handle_delete_item(ctx, req, &partition_key, &sort_key).await, Endpoint::InsertItem { partition_key, sort_key, - } => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await, + } => handle_insert_item(ctx, req, &partition_key, &sort_key).await, Endpoint::ReadItem { partition_key, sort_key, - } => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await, + } => handle_read_item(ctx, &req, &partition_key, &sort_key).await, Endpoint::PollItem { partition_key, sort_key, causality_token, timeout, } => { - handle_poll_item( - garage, - &req, - bucket_id, - partition_key, - sort_key, - causality_token, - timeout, - ) - .await + handle_poll_item(ctx, &req, partition_key, sort_key, causality_token, timeout).await } Endpoint::ReadIndex { prefix, @@ -153,12 +156,12 @@ impl ApiHandler for K2VApiServer { end, limit, reverse, - } => handle_read_index(garage, bucket_id, prefix, start, end, limit, reverse).await, - Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await, - Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await, - Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await, + } => handle_read_index(ctx, prefix, start, end, limit, reverse).await, + Endpoint::InsertBatch {} => handle_insert_batch(ctx, req).await, + Endpoint::ReadBatch {} => handle_read_batch(ctx, req).await, + Endpoint::DeleteBatch {} => handle_delete_batch(ctx, req).await, Endpoint::PollRange { partition_key } => { - handle_poll_range(garage, bucket_id, &partition_key, req).await + handle_poll_range(ctx, &partition_key, req).await } Endpoint::Options => unreachable!(), }; @@ -167,7 +170,7 @@ impl ApiHandler for K2VApiServer { // add the corresponding CORS headers to the response let mut resp_ok = resp?; if let Some(rule) = matching_cors_rule { - add_cors_headers(&mut resp_ok, rule) + add_cors_headers(&mut resp_ok, &rule) .ok_or_internal_error("Invalid bucket CORS configuration")?; } diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index ae2778b1..02b7ae8b 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -1,14 +1,9 @@ -use std::sync::Arc; - use base64::prelude::*; use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use garage_util::data::*; - use garage_table::{EnumerationOrder, TableSchema}; -use garage_model::garage::Garage; use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; @@ -18,10 +13,12 @@ use crate::k2v::error::*; use crate::k2v::range::read_range; pub async fn handle_insert_batch( - garage: Arc, - bucket_id: Uuid, + ctx: ReqCtx, req: Request, ) -> Result, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; let items = parse_json_body::, _, Error>(req).await?; let mut items2 = vec![]; @@ -38,7 +35,7 @@ pub async fn handle_insert_batch( items2.push((it.pk, it.sk, ct, v)); } - garage.k2v.rpc.insert_batch(bucket_id, items2).await?; + garage.k2v.rpc.insert_batch(*bucket_id, items2).await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) @@ -46,8 +43,7 @@ pub async fn handle_insert_batch( } pub async fn handle_read_batch( - garage: Arc, - bucket_id: Uuid, + ctx: ReqCtx, req: Request, ) -> Result, Error> { let queries = parse_json_body::, _, Error>(req).await?; @@ -55,7 +51,7 @@ pub async fn handle_read_batch( let resp_results = futures::future::join_all( queries .into_iter() - .map(|q| handle_read_batch_query(&garage, bucket_id, q)), + .map(|q| handle_read_batch_query(&ctx, q)), ) .await; @@ -68,12 +64,15 @@ pub async fn handle_read_batch( } async fn handle_read_batch_query( - garage: &Arc, - bucket_id: Uuid, + ctx: &ReqCtx, query: ReadBatchQuery, ) -> Result { + let ReqCtx { + garage, bucket_id, .. + } = ctx; + let partition = K2VItemPartition { - bucket_id, + bucket_id: *bucket_id, partition_key: query.partition_key.clone(), }; @@ -138,8 +137,7 @@ async fn handle_read_batch_query( } pub async fn handle_delete_batch( - garage: Arc, - bucket_id: Uuid, + ctx: ReqCtx, req: Request, ) -> Result, Error> { let queries = parse_json_body::, _, Error>(req).await?; @@ -147,7 +145,7 @@ pub async fn handle_delete_batch( let resp_results = futures::future::join_all( queries .into_iter() - .map(|q| handle_delete_batch_query(&garage, bucket_id, q)), + .map(|q| handle_delete_batch_query(&ctx, q)), ) .await; @@ -160,12 +158,15 @@ pub async fn handle_delete_batch( } async fn handle_delete_batch_query( - garage: &Arc, - bucket_id: Uuid, + ctx: &ReqCtx, query: DeleteBatchQuery, ) -> Result { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; + let partition = K2VItemPartition { - bucket_id, + bucket_id: *bucket_id, partition_key: query.partition_key.clone(), }; @@ -195,7 +196,7 @@ async fn handle_delete_batch_query( .k2v .rpc .insert( - bucket_id, + *bucket_id, i.partition.partition_key, i.sort_key, Some(cc), @@ -235,7 +236,7 @@ async fn handle_delete_batch_query( .collect::>(); let n = items.len(); - garage.k2v.rpc.insert_batch(bucket_id, items).await?; + garage.k2v.rpc.insert_batch(*bucket_id, items).await?; n }; @@ -251,11 +252,13 @@ async fn handle_delete_batch_query( } pub(crate) async fn handle_poll_range( - garage: Arc, - bucket_id: Uuid, + ctx: ReqCtx, partition_key: &str, req: Request, ) -> Result, Error> { + let ReqCtx { + garage, bucket_id, .. + } = ctx; use garage_model::k2v::sub::PollRange; let query = parse_json_body::(req).await?; diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index 1baec1db..822bec44 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -3,12 +3,9 @@ use std::sync::Arc; use hyper::Response; use serde::Serialize; -use garage_util::data::*; - use garage_rpc::ring::Ring; use garage_table::util::*; -use garage_model::garage::Garage; use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; use crate::helpers::*; @@ -17,14 +14,17 @@ use crate::k2v::error::*; use crate::k2v::range::read_range; pub async fn handle_read_index( - garage: Arc, - bucket_id: Uuid, + ctx: ReqCtx, prefix: Option, start: Option, end: Option, limit: Option, reverse: Option, ) -> Result, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; + let reverse = reverse.unwrap_or(false); let ring: Arc = garage.system.ring.borrow().clone(); diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index 0c5931a1..af3af4e4 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -1,13 +1,8 @@ -use std::sync::Arc; - use base64::prelude::*; use http::header; use hyper::{Request, Response, StatusCode}; -use garage_util::data::*; - -use garage_model::garage::Garage; use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; @@ -100,12 +95,15 @@ impl ReturnFormat { /// Handle ReadItem request #[allow(clippy::ptr_arg)] pub async fn handle_read_item( - garage: Arc, + ctx: ReqCtx, req: &Request, - bucket_id: Uuid, partition_key: &str, sort_key: &String, ) -> Result, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; + let format = ReturnFormat::from(req)?; let item = garage @@ -113,7 +111,7 @@ pub async fn handle_read_item( .item_table .get( &K2VItemPartition { - bucket_id, + bucket_id: *bucket_id, partition_key: partition_key.to_string(), }, sort_key, @@ -125,12 +123,14 @@ pub async fn handle_read_item( } pub async fn handle_insert_item( - garage: Arc, + ctx: ReqCtx, req: Request, - bucket_id: Uuid, partition_key: &str, sort_key: &str, ) -> Result, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; let causal_context = req .headers() .get(X_GARAGE_CAUSALITY_TOKEN) @@ -149,7 +149,7 @@ pub async fn handle_insert_item( .k2v .rpc .insert( - bucket_id, + *bucket_id, partition_key.to_string(), sort_key.to_string(), causal_context, @@ -163,12 +163,14 @@ pub async fn handle_insert_item( } pub async fn handle_delete_item( - garage: Arc, + ctx: ReqCtx, req: Request, - bucket_id: Uuid, partition_key: &str, sort_key: &str, ) -> Result, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; let causal_context = req .headers() .get(X_GARAGE_CAUSALITY_TOKEN) @@ -183,7 +185,7 @@ pub async fn handle_delete_item( .k2v .rpc .insert( - bucket_id, + *bucket_id, partition_key.to_string(), sort_key.to_string(), causal_context, @@ -199,14 +201,16 @@ pub async fn handle_delete_item( /// Handle ReadItem request #[allow(clippy::ptr_arg)] pub async fn handle_poll_item( - garage: Arc, + ctx: ReqCtx, req: &Request, - bucket_id: Uuid, partition_key: String, sort_key: String, causality_token: String, timeout_secs: Option, ) -> Result, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; let format = ReturnFormat::from(req)?; let causal_context = @@ -218,7 +222,7 @@ pub async fn handle_poll_item( .k2v .rpc .poll_item( - bucket_id, + *bucket_id, partition_key, sort_key, causal_context, -- cgit v1.2.3