From a48e2e0cb2bdc75e14dfde199dbca0a779b1316b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Jan 2023 10:30:59 +0100 Subject: K2V: Subscription to ranges of items --- src/api/k2v/item.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/api/k2v') diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index f85138c7..9b78bc07 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -211,7 +211,7 @@ pub async fn handle_poll_item( let item = garage .k2v .rpc - .poll( + .poll_item( bucket_id, partition_key, sort_key, -- cgit v1.2.3 From b83517d521b1bea7585ce45a803fad373f28225c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Jan 2023 15:22:25 +0100 Subject: Implement PollRange API endpoint --- src/api/k2v/api_server.rs | 3 ++ src/api/k2v/batch.rs | 79 +++++++++++++++++++++++++++++++++++++++++------ src/api/k2v/index.rs | 9 ++---- src/api/k2v/item.rs | 4 ++- src/api/k2v/router.rs | 8 ++++- 5 files changed, 86 insertions(+), 17 deletions(-) (limited to 'src/api/k2v') diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 084867b5..bb85b2e7 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -164,6 +164,9 @@ impl ApiHandler for K2VApiServer { Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await, Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await, Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await, + Endpoint::PollRange { partition_key } => { + handle_poll_range(garage, bucket_id, &partition_key, req).await + } Endpoint::Options => unreachable!(), }; diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 78035362..be3fba07 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -4,7 +4,6 @@ use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_util::data::*; -use garage_util::error::Error as GarageError; use garage_table::{EnumerationOrder, TableSchema}; @@ -65,10 +64,7 @@ pub async fn handle_read_batch( resps.push(resp?); } - let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(resp_json))?) + Ok(json_ok_response(&resps)?) } async fn handle_read_batch_query( @@ -160,10 +156,7 @@ pub async fn handle_delete_batch( resps.push(resp?); } - let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(resp_json))?) + Ok(json_ok_response(&resps)?) } async fn handle_delete_batch_query( @@ -257,6 +250,53 @@ async fn handle_delete_batch_query( }) } +pub(crate) async fn handle_poll_range( + garage: Arc, + bucket_id: Uuid, + partition_key: &str, + req: Request, +) -> Result, Error> { + use garage_model::k2v::sub::PollRange; + + let query = parse_json_body::(req).await?; + + let timeout_msec = query.timeout.unwrap_or(300).clamp(10, 600) * 1000; + + let resp = garage + .k2v + .rpc + .poll_range( + PollRange { + partition: K2VItemPartition { + bucket_id, + partition_key: partition_key.to_string(), + }, + start: query.start, + end: query.end, + prefix: query.prefix, + }, + query.seen_marker, + timeout_msec, + ) + .await?; + + if let Some((items, seen_marker)) = resp { + let resp = PollRangeResponse { + items: items + .into_iter() + .map(|(_k, i)| ReadBatchResponseItem::from(i)) + .collect::>(), + seen_marker, + }; + + Ok(json_ok_response(&resp)?) + } else { + Ok(Response::builder() + .status(StatusCode::NOT_MODIFIED) + .body(Body::empty())?) + } +} + #[derive(Deserialize)] struct InsertBatchItem { pk: String, @@ -361,3 +401,24 @@ struct DeleteBatchResponse { #[serde(rename = "deletedItems")] deleted_items: usize, } + +#[derive(Deserialize)] +struct PollRangeQuery { + #[serde(default)] + prefix: Option, + #[serde(default)] + start: Option, + #[serde(default)] + end: Option, + #[serde(default)] + timeout: Option, + #[serde(default, rename = "seenMarker")] + seen_marker: Option, +} + +#[derive(Serialize)] +struct PollRangeResponse { + items: Vec, + #[serde(rename = "seenMarker")] + seen_marker: String, +} diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index 210950bf..6c1d4a91 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -1,10 +1,9 @@ use std::sync::Arc; -use hyper::{Body, Response, StatusCode}; +use hyper::{Body, Response}; use serde::Serialize; use garage_util::data::*; -use garage_util::error::Error as GarageError; use garage_rpc::ring::Ring; use garage_table::util::*; @@ -12,6 +11,7 @@ use garage_table::util::*; use garage_model::garage::Garage; use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; +use crate::helpers::*; use crate::k2v::error::*; use crate::k2v::range::read_range; @@ -68,10 +68,7 @@ pub async fn handle_read_index( next_start, }; - let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?; - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(resp_json))?) + Ok(json_ok_response(&resp)?) } #[derive(Serialize)] diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index 9b78bc07..ebf34723 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -208,6 +208,8 @@ pub async fn handle_poll_item( let causal_context = CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?; + let timeout_msec = timeout_secs.unwrap_or(300).clamp(10, 600) * 1000; + let item = garage .k2v .rpc @@ -216,7 +218,7 @@ pub async fn handle_poll_item( partition_key, sort_key, causal_context, - timeout_secs.unwrap_or(300) * 1000, + timeout_msec, ) .await?; diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs index e7a3dd69..1cc58be5 100644 --- a/src/api/k2v/router.rs +++ b/src/api/k2v/router.rs @@ -32,6 +32,9 @@ pub enum Endpoint { causality_token: String, timeout: Option, }, + PollRange { + partition_key: String, + }, ReadBatch { }, ReadIndex { @@ -113,6 +116,7 @@ impl Endpoint { @gen_parser (query.keyword.take().unwrap_or_default(), partition_key, query, None), key: [ + POLL_RANGE => PollRange, ], no_key: [ EMPTY => ReadBatch, @@ -142,6 +146,7 @@ impl Endpoint { @gen_parser (query.keyword.take().unwrap_or_default(), partition_key, query, None), key: [ + POLL_RANGE => PollRange, ], no_key: [ EMPTY => InsertBatch, @@ -234,7 +239,8 @@ impl Endpoint { generateQueryParameters! { keywords: [ "delete" => DELETE, - "search" => SEARCH + "search" => SEARCH, + "poll_range" => POLL_RANGE ], fields: [ "prefix" => prefix, -- cgit v1.2.3 From bba13f40fc2e411347ea83960935b39cedb0a7c4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 11 Jan 2023 12:27:19 +0100 Subject: Correctly return bad requests when seeh marker is invalid --- src/api/k2v/batch.rs | 6 +----- src/api/k2v/item.rs | 10 ++++------ 2 files changed, 5 insertions(+), 11 deletions(-) (limited to 'src/api/k2v') diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index be3fba07..844faf89 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -24,11 +24,7 @@ pub async fn handle_insert_batch( let mut items2 = vec![]; for it in items { - let ct = it - .ct - .map(|s| CausalContext::parse(&s)) - .transpose() - .ok_or_bad_request("Invalid causality token")?; + let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?; let v = match it.v { Some(vs) => { DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?) diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index ebf34723..e7385bcc 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -133,9 +133,8 @@ pub async fn handle_insert_item( .get(X_GARAGE_CAUSALITY_TOKEN) .map(|s| s.to_str()) .transpose()? - .map(CausalContext::parse) - .transpose() - .ok_or_bad_request("Invalid causality token")?; + .map(CausalContext::parse_helper) + .transpose()?; let body = hyper::body::to_bytes(req.into_body()).await?; let value = DvvsValue::Value(body.to_vec()); @@ -169,9 +168,8 @@ pub async fn handle_delete_item( .get(X_GARAGE_CAUSALITY_TOKEN) .map(|s| s.to_str()) .transpose()? - .map(CausalContext::parse) - .transpose() - .ok_or_bad_request("Invalid causality token")?; + .map(CausalContext::parse_helper) + .transpose()?; let value = DvvsValue::Deleted; -- cgit v1.2.3 From cbfae673e83251e988d764d3a29f06f571ba8452 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 11 Jan 2023 15:03:08 +0100 Subject: PollRange & PollItem: min timeout = 1 sec --- src/api/k2v/batch.rs | 2 +- src/api/k2v/item.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'src/api/k2v') diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 844faf89..abc9403c 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -256,7 +256,7 @@ pub(crate) async fn handle_poll_range( let query = parse_json_body::(req).await?; - let timeout_msec = query.timeout.unwrap_or(300).clamp(10, 600) * 1000; + let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000; let resp = garage .k2v diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index e7385bcc..787a7df3 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -206,7 +206,7 @@ pub async fn handle_poll_item( let causal_context = CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?; - let timeout_msec = timeout_secs.unwrap_or(300).clamp(10, 600) * 1000; + let timeout_msec = timeout_secs.unwrap_or(300).clamp(1, 600) * 1000; let item = garage .k2v -- cgit v1.2.3