diff options
author | Alex Auvolat <alex@adnab.me> | 2023-01-10 15:22:25 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-01-10 15:22:25 +0100 |
commit | b83517d521b1bea7585ce45a803fad373f28225c (patch) | |
tree | 5562397a39647be8d61a9c870103d15c604e09ad /src/api/k2v/batch.rs | |
parent | 57eabe787948ad6c23eae7761d02545c675bf7ff (diff) | |
download | garage-b83517d521b1bea7585ce45a803fad373f28225c.tar.gz garage-b83517d521b1bea7585ce45a803fad373f28225c.zip |
Implement PollRange API endpoint
Diffstat (limited to 'src/api/k2v/batch.rs')
-rw-r--r-- | src/api/k2v/batch.rs | 79 |
1 files changed, 70 insertions, 9 deletions
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 78035362..be3fba07 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -4,7 +4,6 @@ use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_util::data::*; -use garage_util::error::Error as GarageError; use garage_table::{EnumerationOrder, TableSchema}; @@ -65,10 +64,7 @@ pub async fn handle_read_batch( resps.push(resp?); } - let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(resp_json))?) + Ok(json_ok_response(&resps)?) } async fn handle_read_batch_query( @@ -160,10 +156,7 @@ pub async fn handle_delete_batch( resps.push(resp?); } - let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(resp_json))?) + Ok(json_ok_response(&resps)?) } async fn handle_delete_batch_query( @@ -257,6 +250,53 @@ async fn handle_delete_batch_query( }) } +pub(crate) async fn handle_poll_range( + garage: Arc<Garage>, + bucket_id: Uuid, + partition_key: &str, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + use garage_model::k2v::sub::PollRange; + + let query = parse_json_body::<PollRangeQuery>(req).await?; + + let timeout_msec = query.timeout.unwrap_or(300).clamp(10, 600) * 1000; + + let resp = garage + .k2v + .rpc + .poll_range( + PollRange { + partition: K2VItemPartition { + bucket_id, + partition_key: partition_key.to_string(), + }, + start: query.start, + end: query.end, + prefix: query.prefix, + }, + query.seen_marker, + timeout_msec, + ) + .await?; + + if let Some((items, seen_marker)) = resp { + let resp = PollRangeResponse { + items: items + .into_iter() + .map(|(_k, i)| ReadBatchResponseItem::from(i)) + .collect::<Vec<_>>(), + seen_marker, + }; + + Ok(json_ok_response(&resp)?) + } else { + Ok(Response::builder() + .status(StatusCode::NOT_MODIFIED) + .body(Body::empty())?) + } +} + #[derive(Deserialize)] struct InsertBatchItem { pk: String, @@ -361,3 +401,24 @@ struct DeleteBatchResponse { #[serde(rename = "deletedItems")] deleted_items: usize, } + +#[derive(Deserialize)] +struct PollRangeQuery { + #[serde(default)] + prefix: Option<String>, + #[serde(default)] + start: Option<String>, + #[serde(default)] + end: Option<String>, + #[serde(default)] + timeout: Option<u64>, + #[serde(default, rename = "seenMarker")] + seen_marker: Option<String>, +} + +#[derive(Serialize)] +struct PollRangeResponse { + items: Vec<ReadBatchResponseItem>, + #[serde(rename = "seenMarker")] + seen_marker: String, +} |