aboutsummaryrefslogtreecommitdiff
path: root/src/api/k2v/batch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/k2v/batch.rs')
-rw-r--r--src/api/k2v/batch.rs24
1 files changed, 13 insertions, 11 deletions
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index 02b7ae8b..7a03d836 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -4,13 +4,14 @@ use serde::{Deserialize, Serialize};
use garage_table::{EnumerationOrder, TableSchema};
-use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
-use crate::helpers::*;
-use crate::k2v::api_server::{ReqBody, ResBody};
-use crate::k2v::error::*;
-use crate::k2v::range::read_range;
+use garage_api_common::helpers::*;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::error::*;
+use crate::item::parse_causality_token;
+use crate::range::read_range;
pub async fn handle_insert_batch(
ctx: ReqCtx,
@@ -19,11 +20,11 @@ pub async fn handle_insert_batch(
let ReqCtx {
garage, bucket_id, ..
} = &ctx;
- let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
+ let items = req.into_body().json::<Vec<InsertBatchItem>>().await?;
let mut items2 = vec![];
for it in items {
- let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?;
+ let ct = it.ct.map(|s| parse_causality_token(&s)).transpose()?;
let v = match it.v {
Some(vs) => DvvsValue::Value(
BASE64_STANDARD
@@ -46,7 +47,7 @@ pub async fn handle_read_batch(
ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
- let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?;
+ let queries = req.into_body().json::<Vec<ReadBatchQuery>>().await?;
let resp_results = futures::future::join_all(
queries
@@ -140,7 +141,7 @@ pub async fn handle_delete_batch(
ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
- let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?;
+ let queries = req.into_body().json::<Vec<DeleteBatchQuery>>().await?;
let resp_results = futures::future::join_all(
queries
@@ -261,7 +262,7 @@ pub(crate) async fn handle_poll_range(
} = ctx;
use garage_model::k2v::sub::PollRange;
- let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?;
+ let query = req.into_body().json::<PollRangeQuery>().await?;
let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000;
@@ -281,7 +282,8 @@ pub(crate) async fn handle_poll_range(
query.seen_marker,
timeout_msec,
)
- .await?;
+ .await
+ .map_err(pass_helper_error)?;
if let Some((items, seen_marker)) = resp {
let resp = PollRangeResponse {