diff options
author | Alex Auvolat <alex@adnab.me> | 2023-04-25 12:34:26 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-04-25 12:34:26 +0200 |
commit | fa78d806e3ae40031e80eebb86e4eb1756d7baea (patch) | |
tree | 144662fb430c484093f6f9a585a2441c2ff26494 /src/k2v-client/lib.rs | |
parent | 654999e254e6c1f46bb5d668bc1230f226575716 (diff) | |
parent | a16eb7e4b8344d2f58c09a249b7b1bd17d339a35 (diff) | |
download | garage-fa78d806e3ae40031e80eebb86e4eb1756d7baea.tar.gz garage-fa78d806e3ae40031e80eebb86e4eb1756d7baea.zip |
Merge branch 'main' into next
Diffstat (limited to 'src/k2v-client/lib.rs')
-rw-r--r-- | src/k2v-client/lib.rs | 89 |
1 files changed, 88 insertions, 1 deletions
diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs index c2606af4..ca52d0cf 100644 --- a/src/k2v-client/lib.rs +++ b/src/k2v-client/lib.rs @@ -40,7 +40,13 @@ impl K2vClient { creds: AwsCredentials, user_agent: Option<String>, ) -> Result<Self, Error> { - let mut client = HttpClient::new()?; + let connector = hyper_rustls::HttpsConnectorBuilder::new() + .with_native_roots() + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + let mut client = HttpClient::from_connector(connector); if let Some(ua) = user_agent { client.local_agent_prepend(ua); } else { @@ -153,6 +159,58 @@ impl K2vClient { } } + /// Perform a PollRange request, waiting for any change in a given range of keys + /// to occur + pub async fn poll_range( + &self, + partition_key: &str, + filter: Option<PollRangeFilter<'_>>, + seen_marker: Option<&str>, + timeout: Option<Duration>, + ) -> Result<Option<(BTreeMap<String, CausalValue>, String)>, Error> { + let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT); + + let request = PollRangeRequest { + filter: filter.unwrap_or_default(), + seen_marker, + timeout: timeout.as_secs(), + }; + + let mut req = SignedRequest::new( + "POST", + SERVICE, + &self.region, + &format!("/{}/{}", self.bucket, partition_key), + ); + req.add_param("poll_range", ""); + + let payload = serde_json::to_vec(&request)?; + req.set_payload(Some(payload)); + let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; + + if res.status == StatusCode::NOT_MODIFIED { + return Ok(None); + } + + let resp: PollRangeResponse = serde_json::from_slice(&res.body)?; + + let items = resp + .items + .into_iter() + .map(|BatchReadItem { sk, ct, v }| { + ( + sk, + CausalValue { + causality: ct, + value: v, + }, + ) + }) + .collect::<BTreeMap<_, _>>(); + + Ok(Some((items, resp.seen_marker))) + } + /// Perform an InsertItem request, inserting a value for a single pk+sk. pub async fn insert_item( &self, @@ -389,6 +447,12 @@ impl From<CausalityToken> for String { } } +impl AsRef<str> for CausalityToken { + fn as_ref(&self) -> &str { + &self.0 + } +} + /// A value in K2V. can be either a binary value, or a tombstone. #[derive(Debug, Clone, PartialEq, Eq)] pub enum K2vValue { @@ -466,6 +530,29 @@ pub struct Filter<'a> { pub reverse: bool, } +#[derive(Debug, Default, Clone, Serialize)] +pub struct PollRangeFilter<'a> { + pub start: Option<&'a str>, + pub end: Option<&'a str>, + pub prefix: Option<&'a str>, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct PollRangeRequest<'a> { + #[serde(flatten)] + filter: PollRangeFilter<'a>, + seen_marker: Option<&'a str>, + timeout: u64, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct PollRangeResponse { + items: Vec<BatchReadItem>, + seen_marker: String, +} + impl<'a> Filter<'a> { fn insert_params(&self, req: &mut SignedRequest) { if let Some(start) = &self.start { |