aboutsummaryrefslogtreecommitdiff
path: root/src/api/k2v/batch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/k2v/batch.rs')
-rw-r--r--src/api/k2v/batch.rs96
1 files changed, 78 insertions, 18 deletions
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index 78035362..26d678da 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -1,10 +1,10 @@
use std::sync::Arc;
+use base64::prelude::*;
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_util::data::*;
-use garage_util::error::Error as GarageError;
use garage_table::{EnumerationOrder, TableSchema};
@@ -25,15 +25,13 @@ pub async fn handle_insert_batch(
let mut items2 = vec![];
for it in items {
- let ct = it
- .ct
- .map(|s| CausalContext::parse(&s))
- .transpose()
- .ok_or_bad_request("Invalid causality token")?;
+ let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?;
let v = match it.v {
- Some(vs) => {
- DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?)
- }
+ Some(vs) => DvvsValue::Value(
+ BASE64_STANDARD
+ .decode(vs)
+ .ok_or_bad_request("Invalid base64 value")?,
+ ),
None => DvvsValue::Deleted,
};
items2.push((it.pk, it.sk, ct, v));
@@ -65,10 +63,7 @@ pub async fn handle_read_batch(
resps.push(resp?);
}
- let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(resp_json))?)
+ Ok(json_ok_response(&resps)?)
}
async fn handle_read_batch_query(
@@ -160,10 +155,7 @@ pub async fn handle_delete_batch(
resps.push(resp?);
}
- let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(resp_json))?)
+ Ok(json_ok_response(&resps)?)
}
async fn handle_delete_batch_query(
@@ -257,6 +249,53 @@ async fn handle_delete_batch_query(
})
}
+pub(crate) async fn handle_poll_range(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ use garage_model::k2v::sub::PollRange;
+
+ let query = parse_json_body::<PollRangeQuery>(req).await?;
+
+ let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000;
+
+ let resp = garage
+ .k2v
+ .rpc
+ .poll_range(
+ PollRange {
+ partition: K2VItemPartition {
+ bucket_id,
+ partition_key: partition_key.to_string(),
+ },
+ start: query.start,
+ end: query.end,
+ prefix: query.prefix,
+ },
+ query.seen_marker,
+ timeout_msec,
+ )
+ .await?;
+
+ if let Some((items, seen_marker)) = resp {
+ let resp = PollRangeResponse {
+ items: items
+ .into_iter()
+ .map(|(_k, i)| ReadBatchResponseItem::from(i))
+ .collect::<Vec<_>>(),
+ seen_marker,
+ };
+
+ Ok(json_ok_response(&resp)?)
+ } else {
+ Ok(Response::builder()
+ .status(StatusCode::NOT_MODIFIED)
+ .body(Body::empty())?)
+ }
+}
+
#[derive(Deserialize)]
struct InsertBatchItem {
pk: String,
@@ -322,7 +361,7 @@ impl ReadBatchResponseItem {
.values()
.iter()
.map(|v| match v {
- DvvsValue::Value(x) => Some(base64::encode(x)),
+ DvvsValue::Value(x) => Some(BASE64_STANDARD.encode(x)),
DvvsValue::Deleted => None,
})
.collect::<Vec<_>>();
@@ -361,3 +400,24 @@ struct DeleteBatchResponse {
#[serde(rename = "deletedItems")]
deleted_items: usize,
}
+
+#[derive(Deserialize)]
+struct PollRangeQuery {
+ #[serde(default)]
+ prefix: Option<String>,
+ #[serde(default)]
+ start: Option<String>,
+ #[serde(default)]
+ end: Option<String>,
+ #[serde(default)]
+ timeout: Option<u64>,
+ #[serde(default, rename = "seenMarker")]
+ seen_marker: Option<String>,
+}
+
+#[derive(Serialize)]
+struct PollRangeResponse {
+ items: Vec<ReadBatchResponseItem>,
+ #[serde(rename = "seenMarker")]
+ seen_marker: String,
+}