aboutsummaryrefslogtreecommitdiff
path: root/src/api/k2v/batch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/k2v/batch.rs')
-rw-r--r--src/api/k2v/batch.rs51
1 files changed, 27 insertions, 24 deletions
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index ae2778b1..02b7ae8b 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -1,14 +1,9 @@
-use std::sync::Arc;
-
use base64::prelude::*;
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
-use garage_util::data::*;
-
use garage_table::{EnumerationOrder, TableSchema};
-use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
@@ -18,10 +13,12 @@ use crate::k2v::error::*;
use crate::k2v::range::read_range;
pub async fn handle_insert_batch(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
let mut items2 = vec![];
@@ -38,7 +35,7 @@ pub async fn handle_insert_batch(
items2.push((it.pk, it.sk, ct, v));
}
- garage.k2v.rpc.insert_batch(bucket_id, items2).await?;
+ garage.k2v.rpc.insert_batch(*bucket_id, items2).await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
@@ -46,8 +43,7 @@ pub async fn handle_insert_batch(
}
pub async fn handle_read_batch(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?;
@@ -55,7 +51,7 @@ pub async fn handle_read_batch(
let resp_results = futures::future::join_all(
queries
.into_iter()
- .map(|q| handle_read_batch_query(&garage, bucket_id, q)),
+ .map(|q| handle_read_batch_query(&ctx, q)),
)
.await;
@@ -68,12 +64,15 @@ pub async fn handle_read_batch(
}
async fn handle_read_batch_query(
- garage: &Arc<Garage>,
- bucket_id: Uuid,
+ ctx: &ReqCtx,
query: ReadBatchQuery,
) -> Result<ReadBatchResponse, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = ctx;
+
let partition = K2VItemPartition {
- bucket_id,
+ bucket_id: *bucket_id,
partition_key: query.partition_key.clone(),
};
@@ -138,8 +137,7 @@ async fn handle_read_batch_query(
}
pub async fn handle_delete_batch(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?;
@@ -147,7 +145,7 @@ pub async fn handle_delete_batch(
let resp_results = futures::future::join_all(
queries
.into_iter()
- .map(|q| handle_delete_batch_query(&garage, bucket_id, q)),
+ .map(|q| handle_delete_batch_query(&ctx, q)),
)
.await;
@@ -160,12 +158,15 @@ pub async fn handle_delete_batch(
}
async fn handle_delete_batch_query(
- garage: &Arc<Garage>,
- bucket_id: Uuid,
+ ctx: &ReqCtx,
query: DeleteBatchQuery,
) -> Result<DeleteBatchResponse, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = &ctx;
+
let partition = K2VItemPartition {
- bucket_id,
+ bucket_id: *bucket_id,
partition_key: query.partition_key.clone(),
};
@@ -195,7 +196,7 @@ async fn handle_delete_batch_query(
.k2v
.rpc
.insert(
- bucket_id,
+ *bucket_id,
i.partition.partition_key,
i.sort_key,
Some(cc),
@@ -235,7 +236,7 @@ async fn handle_delete_batch_query(
.collect::<Vec<_>>();
let n = items.len();
- garage.k2v.rpc.insert_batch(bucket_id, items).await?;
+ garage.k2v.rpc.insert_batch(*bucket_id, items).await?;
n
};
@@ -251,11 +252,13 @@ async fn handle_delete_batch_query(
}
pub(crate) async fn handle_poll_range(
- garage: Arc<Garage>,
- bucket_id: Uuid,
+ ctx: ReqCtx,
partition_key: &str,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = ctx;
use garage_model::k2v::sub::PollRange;
let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?;