aboutsummaryrefslogtreecommitdiff
path: root/src/api/k2v/batch.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-01-10 15:22:25 +0100
committerAlex Auvolat <alex@adnab.me>2023-01-10 15:22:25 +0100
commitb83517d521b1bea7585ce45a803fad373f28225c (patch)
tree5562397a39647be8d61a9c870103d15c604e09ad /src/api/k2v/batch.rs
parent57eabe787948ad6c23eae7761d02545c675bf7ff (diff)
downloadgarage-b83517d521b1bea7585ce45a803fad373f28225c.tar.gz
garage-b83517d521b1bea7585ce45a803fad373f28225c.zip
Implement PollRange API endpoint
Diffstat (limited to 'src/api/k2v/batch.rs')
-rw-r--r--src/api/k2v/batch.rs79
1 files changed, 70 insertions, 9 deletions
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index 78035362..be3fba07 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -4,7 +4,6 @@ use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_util::data::*;
-use garage_util::error::Error as GarageError;
use garage_table::{EnumerationOrder, TableSchema};
@@ -65,10 +64,7 @@ pub async fn handle_read_batch(
resps.push(resp?);
}
- let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(resp_json))?)
+ Ok(json_ok_response(&resps)?)
}
async fn handle_read_batch_query(
@@ -160,10 +156,7 @@ pub async fn handle_delete_batch(
resps.push(resp?);
}
- let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(resp_json))?)
+ Ok(json_ok_response(&resps)?)
}
async fn handle_delete_batch_query(
@@ -257,6 +250,53 @@ async fn handle_delete_batch_query(
})
}
+pub(crate) async fn handle_poll_range(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ use garage_model::k2v::sub::PollRange;
+
+ let query = parse_json_body::<PollRangeQuery>(req).await?;
+
+ let timeout_msec = query.timeout.unwrap_or(300).clamp(10, 600) * 1000;
+
+ let resp = garage
+ .k2v
+ .rpc
+ .poll_range(
+ PollRange {
+ partition: K2VItemPartition {
+ bucket_id,
+ partition_key: partition_key.to_string(),
+ },
+ start: query.start,
+ end: query.end,
+ prefix: query.prefix,
+ },
+ query.seen_marker,
+ timeout_msec,
+ )
+ .await?;
+
+ if let Some((items, seen_marker)) = resp {
+ let resp = PollRangeResponse {
+ items: items
+ .into_iter()
+ .map(|(_k, i)| ReadBatchResponseItem::from(i))
+ .collect::<Vec<_>>(),
+ seen_marker,
+ };
+
+ Ok(json_ok_response(&resp)?)
+ } else {
+ Ok(Response::builder()
+ .status(StatusCode::NOT_MODIFIED)
+ .body(Body::empty())?)
+ }
+}
+
#[derive(Deserialize)]
struct InsertBatchItem {
pk: String,
@@ -361,3 +401,24 @@ struct DeleteBatchResponse {
#[serde(rename = "deletedItems")]
deleted_items: usize,
}
+
+#[derive(Deserialize)]
+struct PollRangeQuery {
+ #[serde(default)]
+ prefix: Option<String>,
+ #[serde(default)]
+ start: Option<String>,
+ #[serde(default)]
+ end: Option<String>,
+ #[serde(default)]
+ timeout: Option<u64>,
+ #[serde(default, rename = "seenMarker")]
+ seen_marker: Option<String>,
+}
+
+#[derive(Serialize)]
+struct PollRangeResponse {
+ items: Vec<ReadBatchResponseItem>,
+ #[serde(rename = "seenMarker")]
+ seen_marker: String,
+}