diff options
Diffstat (limited to 'src/api/k2v/batch.rs')
-rw-r--r-- | src/api/k2v/batch.rs | 51 |
1 files changed, 27 insertions, 24 deletions
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index ae2778b1..02b7ae8b 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -1,14 +1,9 @@ -use std::sync::Arc; - use base64::prelude::*; use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use garage_util::data::*; - use garage_table::{EnumerationOrder, TableSchema}; -use garage_model::garage::Garage; use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; @@ -18,10 +13,12 @@ use crate::k2v::error::*; use crate::k2v::range::read_range; pub async fn handle_insert_batch( - garage: Arc<Garage>, - bucket_id: Uuid, + ctx: ReqCtx, req: Request<ReqBody>, ) -> Result<Response<ResBody>, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?; let mut items2 = vec![]; @@ -38,7 +35,7 @@ pub async fn handle_insert_batch( items2.push((it.pk, it.sk, ct, v)); } - garage.k2v.rpc.insert_batch(bucket_id, items2).await?; + garage.k2v.rpc.insert_batch(*bucket_id, items2).await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) @@ -46,8 +43,7 @@ pub async fn handle_insert_batch( } pub async fn handle_read_batch( - garage: Arc<Garage>, - bucket_id: Uuid, + ctx: ReqCtx, req: Request<ReqBody>, ) -> Result<Response<ResBody>, Error> { let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?; @@ -55,7 +51,7 @@ pub async fn handle_read_batch( let resp_results = futures::future::join_all( queries .into_iter() - .map(|q| handle_read_batch_query(&garage, bucket_id, q)), + .map(|q| handle_read_batch_query(&ctx, q)), ) .await; @@ -68,12 +64,15 @@ pub async fn handle_read_batch( } async fn handle_read_batch_query( - garage: &Arc<Garage>, - bucket_id: Uuid, + ctx: &ReqCtx, query: ReadBatchQuery, ) -> Result<ReadBatchResponse, Error> { + let ReqCtx { + garage, bucket_id, .. + } = ctx; + let partition = K2VItemPartition { - bucket_id, + bucket_id: *bucket_id, partition_key: query.partition_key.clone(), }; @@ -138,8 +137,7 @@ async fn handle_read_batch_query( } pub async fn handle_delete_batch( - garage: Arc<Garage>, - bucket_id: Uuid, + ctx: ReqCtx, req: Request<ReqBody>, ) -> Result<Response<ResBody>, Error> { let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?; @@ -147,7 +145,7 @@ pub async fn handle_delete_batch( let resp_results = futures::future::join_all( queries .into_iter() - .map(|q| handle_delete_batch_query(&garage, bucket_id, q)), + .map(|q| handle_delete_batch_query(&ctx, q)), ) .await; @@ -160,12 +158,15 @@ pub async fn handle_delete_batch( } async fn handle_delete_batch_query( - garage: &Arc<Garage>, - bucket_id: Uuid, + ctx: &ReqCtx, query: DeleteBatchQuery, ) -> Result<DeleteBatchResponse, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; + let partition = K2VItemPartition { - bucket_id, + bucket_id: *bucket_id, partition_key: query.partition_key.clone(), }; @@ -195,7 +196,7 @@ async fn handle_delete_batch_query( .k2v .rpc .insert( - bucket_id, + *bucket_id, i.partition.partition_key, i.sort_key, Some(cc), @@ -235,7 +236,7 @@ async fn handle_delete_batch_query( .collect::<Vec<_>>(); let n = items.len(); - garage.k2v.rpc.insert_batch(bucket_id, items).await?; + garage.k2v.rpc.insert_batch(*bucket_id, items).await?; n }; @@ -251,11 +252,13 @@ async fn handle_delete_batch_query( } pub(crate) async fn handle_poll_range( - garage: Arc<Garage>, - bucket_id: Uuid, + ctx: ReqCtx, partition_key: &str, req: Request<ReqBody>, ) -> Result<Response<ResBody>, Error> { + let ReqCtx { + garage, bucket_id, .. + } = ctx; use garage_model::k2v::sub::PollRange; let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?; |