aboutsummaryrefslogtreecommitdiff
path: root/src/k2v-client/lib.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-04-25 12:34:26 +0200
committerAlex Auvolat <alex@adnab.me>2023-04-25 12:34:26 +0200
commitfa78d806e3ae40031e80eebb86e4eb1756d7baea (patch)
tree144662fb430c484093f6f9a585a2441c2ff26494 /src/k2v-client/lib.rs
parent654999e254e6c1f46bb5d668bc1230f226575716 (diff)
parenta16eb7e4b8344d2f58c09a249b7b1bd17d339a35 (diff)
downloadgarage-fa78d806e3ae40031e80eebb86e4eb1756d7baea.tar.gz
garage-fa78d806e3ae40031e80eebb86e4eb1756d7baea.zip
Merge branch 'main' into next
Diffstat (limited to 'src/k2v-client/lib.rs')
-rw-r--r--src/k2v-client/lib.rs89
1 files changed, 88 insertions, 1 deletions
diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs
index c2606af4..ca52d0cf 100644
--- a/src/k2v-client/lib.rs
+++ b/src/k2v-client/lib.rs
@@ -40,7 +40,13 @@ impl K2vClient {
creds: AwsCredentials,
user_agent: Option<String>,
) -> Result<Self, Error> {
- let mut client = HttpClient::new()?;
+ let connector = hyper_rustls::HttpsConnectorBuilder::new()
+ .with_native_roots()
+ .https_or_http()
+ .enable_http1()
+ .enable_http2()
+ .build();
+ let mut client = HttpClient::from_connector(connector);
if let Some(ua) = user_agent {
client.local_agent_prepend(ua);
} else {
@@ -153,6 +159,58 @@ impl K2vClient {
}
}
+ /// Perform a PollRange request, waiting for any change in a given range of keys
+ /// to occur
+ pub async fn poll_range(
+ &self,
+ partition_key: &str,
+ filter: Option<PollRangeFilter<'_>>,
+ seen_marker: Option<&str>,
+ timeout: Option<Duration>,
+ ) -> Result<Option<(BTreeMap<String, CausalValue>, String)>, Error> {
+ let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT);
+
+ let request = PollRangeRequest {
+ filter: filter.unwrap_or_default(),
+ seen_marker,
+ timeout: timeout.as_secs(),
+ };
+
+ let mut req = SignedRequest::new(
+ "POST",
+ SERVICE,
+ &self.region,
+ &format!("/{}/{}", self.bucket, partition_key),
+ );
+ req.add_param("poll_range", "");
+
+ let payload = serde_json::to_vec(&request)?;
+ req.set_payload(Some(payload));
+ let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
+
+ if res.status == StatusCode::NOT_MODIFIED {
+ return Ok(None);
+ }
+
+ let resp: PollRangeResponse = serde_json::from_slice(&res.body)?;
+
+ let items = resp
+ .items
+ .into_iter()
+ .map(|BatchReadItem { sk, ct, v }| {
+ (
+ sk,
+ CausalValue {
+ causality: ct,
+ value: v,
+ },
+ )
+ })
+ .collect::<BTreeMap<_, _>>();
+
+ Ok(Some((items, resp.seen_marker)))
+ }
+
/// Perform an InsertItem request, inserting a value for a single pk+sk.
pub async fn insert_item(
&self,
@@ -389,6 +447,12 @@ impl From<CausalityToken> for String {
}
}
+impl AsRef<str> for CausalityToken {
+ fn as_ref(&self) -> &str {
+ &self.0
+ }
+}
+
/// A value in K2V. can be either a binary value, or a tombstone.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum K2vValue {
@@ -466,6 +530,29 @@ pub struct Filter<'a> {
pub reverse: bool,
}
+#[derive(Debug, Default, Clone, Serialize)]
+pub struct PollRangeFilter<'a> {
+ pub start: Option<&'a str>,
+ pub end: Option<&'a str>,
+ pub prefix: Option<&'a str>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct PollRangeRequest<'a> {
+ #[serde(flatten)]
+ filter: PollRangeFilter<'a>,
+ seen_marker: Option<&'a str>,
+ timeout: u64,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct PollRangeResponse {
+ items: Vec<BatchReadItem>,
+ seen_marker: String,
+}
+
impl<'a> Filter<'a> {
fn insert_params(&self, req: &mut SignedRequest) {
if let Some(start) = &self.start {