aboutsummaryrefslogtreecommitdiff
path: root/src/api/k2v
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/k2v')
-rw-r--r--src/api/k2v/api_server.rs60
-rw-r--r--src/api/k2v/batch.rs51
-rw-r--r--src/api/k2v/index.rs12
-rw-r--r--src/api/k2v/item.rs38
4 files changed, 79 insertions, 82 deletions
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
index e97da2af..658cfcc8 100644
--- a/src/api/k2v/api_server.rs
+++ b/src/api/k2v/api_server.rs
@@ -15,8 +15,7 @@ use garage_model::garage::Garage;
use crate::generic_server::*;
use crate::k2v::error::*;
-use crate::signature::payload::check_payload_signature;
-use crate::signature::streaming::*;
+use crate::signature::verify_request;
use crate::helpers::*;
use crate::k2v::batch::*;
@@ -86,17 +85,7 @@ impl ApiHandler for K2VApiServer {
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
}
- let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?;
- let api_key = api_key
- .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
-
- let req = parse_streaming_body(
- &api_key,
- req,
- &mut content_sha256,
- &garage.config.s3_api.s3_region,
- "k2v",
- )?;
+ let (req, api_key, _content_sha256) = verify_request(&garage, req, "k2v").await?;
let bucket_id = garage
.bucket_helper()
@@ -106,6 +95,7 @@ impl ApiHandler for K2VApiServer {
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
+ let bucket_params = bucket.state.into_option().unwrap();
let allowed = match endpoint.authorization_type() {
Authorization::Read => api_key.allow_read(&bucket_id),
@@ -123,40 +113,42 @@ impl ApiHandler for K2VApiServer {
// are always preflighted, i.e. the browser should make
// an OPTIONS call before to check it is allowed
let matching_cors_rule = match *req.method() {
- Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)
- .ok_or_internal_error("Error looking up CORS rule")?,
+ Method::GET | Method::HEAD | Method::POST => {
+ find_matching_cors_rule(&bucket_params, &req)
+ .ok_or_internal_error("Error looking up CORS rule")?
+ .cloned()
+ }
_ => None,
};
+ let ctx = ReqCtx {
+ garage,
+ bucket_id,
+ bucket_name,
+ bucket_params,
+ api_key,
+ };
+
let resp = match endpoint {
Endpoint::DeleteItem {
partition_key,
sort_key,
- } => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await,
+ } => handle_delete_item(ctx, req, &partition_key, &sort_key).await,
Endpoint::InsertItem {
partition_key,
sort_key,
- } => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await,
+ } => handle_insert_item(ctx, req, &partition_key, &sort_key).await,
Endpoint::ReadItem {
partition_key,
sort_key,
- } => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await,
+ } => handle_read_item(ctx, &req, &partition_key, &sort_key).await,
Endpoint::PollItem {
partition_key,
sort_key,
causality_token,
timeout,
} => {
- handle_poll_item(
- garage,
- &req,
- bucket_id,
- partition_key,
- sort_key,
- causality_token,
- timeout,
- )
- .await
+ handle_poll_item(ctx, &req, partition_key, sort_key, causality_token, timeout).await
}
Endpoint::ReadIndex {
prefix,
@@ -164,12 +156,12 @@ impl ApiHandler for K2VApiServer {
end,
limit,
reverse,
- } => handle_read_index(garage, bucket_id, prefix, start, end, limit, reverse).await,
- Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
- Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
- Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
+ } => handle_read_index(ctx, prefix, start, end, limit, reverse).await,
+ Endpoint::InsertBatch {} => handle_insert_batch(ctx, req).await,
+ Endpoint::ReadBatch {} => handle_read_batch(ctx, req).await,
+ Endpoint::DeleteBatch {} => handle_delete_batch(ctx, req).await,
Endpoint::PollRange { partition_key } => {
- handle_poll_range(garage, bucket_id, &partition_key, req).await
+ handle_poll_range(ctx, &partition_key, req).await
}
Endpoint::Options => unreachable!(),
};
@@ -178,7 +170,7 @@ impl ApiHandler for K2VApiServer {
// add the corresponding CORS headers to the response
let mut resp_ok = resp?;
if let Some(rule) = matching_cors_rule {
- add_cors_headers(&mut resp_ok, rule)
+ add_cors_headers(&mut resp_ok, &rule)
.ok_or_internal_error("Invalid bucket CORS configuration")?;
}
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index ae2778b1..02b7ae8b 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -1,14 +1,9 @@
-use std::sync::Arc;
-
use base64::prelude::*;
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
-use garage_util::data::*;
-
use garage_table::{EnumerationOrder, TableSchema};
-use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
@@ -18,10 +13,12 @@ use crate::k2v::error::*;
use crate::k2v::range::read_range;
pub async fn handle_insert_batch(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
let mut items2 = vec![];
@@ -38,7 +35,7 @@ pub async fn handle_insert_batch(
items2.push((it.pk, it.sk, ct, v));
}
- garage.k2v.rpc.insert_batch(bucket_id, items2).await?;
+ garage.k2v.rpc.insert_batch(*bucket_id, items2).await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
@@ -46,8 +43,7 @@ pub async fn handle_insert_batch(
}
pub async fn handle_read_batch(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?;
@@ -55,7 +51,7 @@ pub async fn handle_read_batch(
let resp_results = futures::future::join_all(
queries
.into_iter()
- .map(|q| handle_read_batch_query(&garage, bucket_id, q)),
+ .map(|q| handle_read_batch_query(&ctx, q)),
)
.await;
@@ -68,12 +64,15 @@ pub async fn handle_read_batch(
}
async fn handle_read_batch_query(
- garage: &Arc<Garage>,
- bucket_id: Uuid,
+ ctx: &ReqCtx,
query: ReadBatchQuery,
) -> Result<ReadBatchResponse, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = ctx;
+
let partition = K2VItemPartition {
- bucket_id,
+ bucket_id: *bucket_id,
partition_key: query.partition_key.clone(),
};
@@ -138,8 +137,7 @@ async fn handle_read_batch_query(
}
pub async fn handle_delete_batch(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?;
@@ -147,7 +145,7 @@ pub async fn handle_delete_batch(
let resp_results = futures::future::join_all(
queries
.into_iter()
- .map(|q| handle_delete_batch_query(&garage, bucket_id, q)),
+ .map(|q| handle_delete_batch_query(&ctx, q)),
)
.await;
@@ -160,12 +158,15 @@ pub async fn handle_delete_batch(
}
async fn handle_delete_batch_query(
- garage: &Arc<Garage>,
- bucket_id: Uuid,
+ ctx: &ReqCtx,
query: DeleteBatchQuery,
) -> Result<DeleteBatchResponse, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
+
let partition = K2VItemPartition {
- bucket_id,
+ bucket_id: *bucket_id,
partition_key: query.partition_key.clone(),
};
@@ -195,7 +196,7 @@ async fn handle_delete_batch_query(
.k2v
.rpc
.insert(
- bucket_id,
+ *bucket_id,
i.partition.partition_key,
i.sort_key,
Some(cc),
@@ -235,7 +236,7 @@ async fn handle_delete_batch_query(
.collect::<Vec<_>>();
let n = items.len();
- garage.k2v.rpc.insert_batch(bucket_id, items).await?;
+ garage.k2v.rpc.insert_batch(*bucket_id, items).await?;
n
};
@@ -251,11 +252,13 @@ async fn handle_delete_batch_query(
}
pub(crate) async fn handle_poll_range(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
partition_key: &str,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = ctx;
use garage_model::k2v::sub::PollRange;
let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?;
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index 291464ab..e3397238 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -1,13 +1,8 @@
-use std::sync::Arc;
-
use hyper::Response;
use serde::Serialize;
-use garage_util::data::*;
-
use garage_table::util::*;
-use garage_model::garage::Garage;
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
use crate::helpers::*;
@@ -16,14 +11,17 @@ use crate::k2v::error::*;
use crate::k2v::range::read_range;
pub async fn handle_read_index(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
prefix: Option<String>,
start: Option<String>,
end: Option<String>,
limit: Option<u64>,
reverse: Option<bool>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
+
let reverse = reverse.unwrap_or(false);
let node_id_vec = garage
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index 0c5931a1..af3af4e4 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -1,13 +1,8 @@
-use std::sync::Arc;
-
use base64::prelude::*;
use http::header;
use hyper::{Request, Response, StatusCode};
-use garage_util::data::*;
-
-use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
@@ -100,12 +95,15 @@ impl ReturnFormat {
/// Handle ReadItem request
#[allow(clippy::ptr_arg)]
pub async fn handle_read_item(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: &Request<ReqBody>,
- bucket_id: Uuid,
partition_key: &str,
sort_key: &String,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
+
let format = ReturnFormat::from(req)?;
let item = garage
@@ -113,7 +111,7 @@ pub async fn handle_read_item(
.item_table
.get(
&K2VItemPartition {
- bucket_id,
+ bucket_id: *bucket_id,
partition_key: partition_key.to_string(),
},
sort_key,
@@ -125,12 +123,14 @@ pub async fn handle_read_item(
}
pub async fn handle_insert_item(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: Request<ReqBody>,
- bucket_id: Uuid,
partition_key: &str,
sort_key: &str,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
let causal_context = req
.headers()
.get(X_GARAGE_CAUSALITY_TOKEN)
@@ -149,7 +149,7 @@ pub async fn handle_insert_item(
.k2v
.rpc
.insert(
- bucket_id,
+ *bucket_id,
partition_key.to_string(),
sort_key.to_string(),
causal_context,
@@ -163,12 +163,14 @@ pub async fn handle_insert_item(
}
pub async fn handle_delete_item(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: Request<ReqBody>,
- bucket_id: Uuid,
partition_key: &str,
sort_key: &str,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
let causal_context = req
.headers()
.get(X_GARAGE_CAUSALITY_TOKEN)
@@ -183,7 +185,7 @@ pub async fn handle_delete_item(
.k2v
.rpc
.insert(
- bucket_id,
+ *bucket_id,
partition_key.to_string(),
sort_key.to_string(),
causal_context,
@@ -199,14 +201,16 @@ pub async fn handle_delete_item(
/// Handle ReadItem request
#[allow(clippy::ptr_arg)]
pub async fn handle_poll_item(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: &Request<ReqBody>,
- bucket_id: Uuid,
partition_key: String,
sort_key: String,
causality_token: String,
timeout_secs: Option<u64>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
let format = ReturnFormat::from(req)?;
let causal_context =
@@ -218,7 +222,7 @@ pub async fn handle_poll_item(
.k2v
.rpc
.poll_item(
- bucket_id,
+ *bucket_id,
partition_key,
sort_key,
causal_context,