aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/k2v-client/Cargo.toml3
-rw-r--r--src/k2v-client/bin/k2v-cli.rs203
-rw-r--r--src/k2v-client/lib.rs89
3 files changed, 241 insertions, 54 deletions
diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml
index f57ce849..d95a1b3f 100644
--- a/src/k2v-client/Cargo.toml
+++ b/src/k2v-client/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "k2v-client"
-version = "0.0.1"
+version = "0.1.1"
authors = ["Trinity Pointard <trinity.pointard@gmail.com>", "Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -15,6 +15,7 @@ log = "0.4"
rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] }
rusoto_credential = "0.48.0"
rusoto_signature = "0.48.0"
+hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12", "logging" ] }
serde = "1.0.137"
serde_json = "1.0.81"
thiserror = "1.0.31"
diff --git a/src/k2v-client/bin/k2v-cli.rs b/src/k2v-client/bin/k2v-cli.rs
index 925ebeb8..cdd63cce 100644
--- a/src/k2v-client/bin/k2v-cli.rs
+++ b/src/k2v-client/bin/k2v-cli.rs
@@ -1,3 +1,5 @@
+use std::collections::BTreeMap;
+use std::process::exit;
use std::time::Duration;
use k2v_client::*;
@@ -57,22 +59,39 @@ enum Command {
#[clap(flatten)]
output_kind: ReadOutputKind,
},
- /// Watch changes on a single value
- Poll {
- /// Partition key to delete from
+ /// Watch changes on a single value
+ PollItem {
+ /// Partition key of item to watch
partition_key: String,
- /// Sort key to delete from
+ /// Sort key of item to watch
sort_key: String,
/// Causality information
#[clap(short, long)]
causality: String,
/// Timeout, in seconds
- #[clap(short, long)]
+ #[clap(short = 'T', long)]
timeout: Option<u64>,
/// Output formating
#[clap(flatten)]
output_kind: ReadOutputKind,
},
+ /// Watch changes on a range of values
+ PollRange {
+ /// Partition key to poll from
+ partition_key: String,
+ /// Output only sort keys matching this filter
+ #[clap(flatten)]
+ filter: Filter,
+ /// Marker of data that had previously been seen by a PollRange
+ #[clap(short = 'S', long)]
+ seen_marker: Option<String>,
+ /// Timeout, in seconds
+ #[clap(short = 'T', long)]
+ timeout: Option<u64>,
+ /// Output formating
+ #[clap(flatten)]
+ output_kind: BatchOutputKind,
+ },
/// Delete a single value
Delete {
/// Partition key to delete from
@@ -176,7 +195,6 @@ struct ReadOutputKind {
impl ReadOutputKind {
fn display_output(&self, val: CausalValue) -> ! {
use std::io::Write;
- use std::process::exit;
if self.json {
let stdout = std::io::stdout();
@@ -254,6 +272,83 @@ struct BatchOutputKind {
json: bool,
}
+impl BatchOutputKind {
+ fn display_human_output(&self, values: BTreeMap<String, CausalValue>) -> ! {
+ for (key, values) in values {
+ println!("key: {}", key);
+ let causality: String = values.causality.into();
+ println!("causality: {}", causality);
+ for value in values.value {
+ match value {
+ K2vValue::Value(v) => {
+ if let Ok(string) = std::str::from_utf8(&v) {
+ println!(" value(utf-8): {}", string);
+ } else {
+ println!(" value(base64): {}", base64::encode(&v));
+ }
+ }
+ K2vValue::Tombstone => {
+ println!(" tombstone");
+ }
+ }
+ }
+ }
+ exit(0);
+ }
+
+ fn values_json(&self, values: BTreeMap<String, CausalValue>) -> Vec<serde_json::Value> {
+ values
+ .into_iter()
+ .map(|(k, v)| {
+ let mut value = serde_json::to_value(v).unwrap();
+ value
+ .as_object_mut()
+ .unwrap()
+ .insert("sort_key".to_owned(), k.into());
+ value
+ })
+ .collect::<Vec<_>>()
+ }
+
+ fn display_poll_range_output(
+ &self,
+ seen_marker: String,
+ values: BTreeMap<String, CausalValue>,
+ ) -> ! {
+ if self.json {
+ let json = serde_json::json!({
+ "values": self.values_json(values),
+ "seen_marker": seen_marker,
+ });
+
+ let stdout = std::io::stdout();
+ serde_json::to_writer_pretty(stdout, &json).unwrap();
+ exit(0)
+ } else {
+ println!("seen marker: {}", seen_marker);
+ self.display_human_output(values)
+ }
+ }
+
+ fn display_read_range_output(&self, res: PaginatedRange<CausalValue>) -> ! {
+ if self.json {
+ let json = serde_json::json!({
+ "next_key": res.next_start,
+ "values": self.values_json(res.items),
+ });
+
+ let stdout = std::io::stdout();
+ serde_json::to_writer_pretty(stdout, &json).unwrap();
+ exit(0)
+ } else {
+ if let Some(next) = res.next_start {
+ println!("next key: {}", next);
+ }
+ self.display_human_output(res.items)
+ }
+ }
+}
+
/// Filter for batch operations
#[derive(Parser, Debug)]
#[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))]
@@ -342,7 +437,7 @@ async fn main() -> Result<(), Error> {
let res = client.read_item(&partition_key, &sort_key).await?;
output_kind.display_output(res);
}
- Command::Poll {
+ Command::PollItem {
partition_key,
sort_key,
causality,
@@ -356,7 +451,54 @@ async fn main() -> Result<(), Error> {
if let Some(res) = res_opt {
output_kind.display_output(res);
} else {
- println!("Delay expired and value didn't change.");
+ if output_kind.json {
+ println!("null");
+ } else {
+ println!("Delay expired and value didn't change.");
+ }
+ }
+ }
+ Command::PollRange {
+ partition_key,
+ filter,
+ seen_marker,
+ timeout,
+ output_kind,
+ } => {
+ if filter.conflicts_only
+ || filter.tombstones
+ || filter.reverse
+ || filter.limit.is_some()
+ {
+ return Err(Error::Message(
+ "limit, reverse, conlicts-only, tombstones are invalid for poll-range".into(),
+ ));
+ }
+
+ let timeout = timeout.map(Duration::from_secs);
+ let res = client
+ .poll_range(
+ &partition_key,
+ Some(PollRangeFilter {
+ start: filter.start.as_deref(),
+ end: filter.end.as_deref(),
+ prefix: filter.prefix.as_deref(),
+ }),
+ seen_marker.as_deref(),
+ timeout,
+ )
+ .await?;
+ match res {
+ Some((items, seen_marker)) => {
+ output_kind.display_poll_range_output(seen_marker, items);
+ }
+ None => {
+ if output_kind.json {
+ println!("null");
+ } else {
+ println!("Delay expired and value didn't change.");
+ }
+ }
}
}
Command::ReadIndex {
@@ -419,50 +561,7 @@ async fn main() -> Result<(), Error> {
};
let mut res = client.read_batch(&[op]).await?;
let res = res.pop().unwrap();
- if output_kind.json {
- let values = res
- .items
- .into_iter()
- .map(|(k, v)| {
- let mut value = serde_json::to_value(v).unwrap();
- value
- .as_object_mut()
- .unwrap()
- .insert("sort_key".to_owned(), k.into());
- value
- })
- .collect::<Vec<_>>();
- let json = serde_json::json!({
- "next_key": res.next_start,
- "values": values,
- });
-
- let stdout = std::io::stdout();
- serde_json::to_writer_pretty(stdout, &json).unwrap();
- } else {
- if let Some(next) = res.next_start {
- println!("next key: {}", next);
- }
- for (key, values) in res.items {
- println!("key: {}", key);
- let causality: String = values.causality.into();
- println!("causality: {}", causality);
- for value in values.value {
- match value {
- K2vValue::Value(v) => {
- if let Ok(string) = std::str::from_utf8(&v) {
- println!(" value(utf-8): {}", string);
- } else {
- println!(" value(base64): {}", base64::encode(&v));
- }
- }
- K2vValue::Tombstone => {
- println!(" tombstone");
- }
- }
- }
- }
- }
+ output_kind.display_read_range_output(res);
}
Command::DeleteRange {
partition_key,
diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs
index c2606af4..ca52d0cf 100644
--- a/src/k2v-client/lib.rs
+++ b/src/k2v-client/lib.rs
@@ -40,7 +40,13 @@ impl K2vClient {
creds: AwsCredentials,
user_agent: Option<String>,
) -> Result<Self, Error> {
- let mut client = HttpClient::new()?;
+ let connector = hyper_rustls::HttpsConnectorBuilder::new()
+ .with_native_roots()
+ .https_or_http()
+ .enable_http1()
+ .enable_http2()
+ .build();
+ let mut client = HttpClient::from_connector(connector);
if let Some(ua) = user_agent {
client.local_agent_prepend(ua);
} else {
@@ -153,6 +159,58 @@ impl K2vClient {
}
}
+ /// Perform a PollRange request, waiting for any change in a given range of keys
+ /// to occur
+ pub async fn poll_range(
+ &self,
+ partition_key: &str,
+ filter: Option<PollRangeFilter<'_>>,
+ seen_marker: Option<&str>,
+ timeout: Option<Duration>,
+ ) -> Result<Option<(BTreeMap<String, CausalValue>, String)>, Error> {
+ let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT);
+
+ let request = PollRangeRequest {
+ filter: filter.unwrap_or_default(),
+ seen_marker,
+ timeout: timeout.as_secs(),
+ };
+
+ let mut req = SignedRequest::new(
+ "POST",
+ SERVICE,
+ &self.region,
+ &format!("/{}/{}", self.bucket, partition_key),
+ );
+ req.add_param("poll_range", "");
+
+ let payload = serde_json::to_vec(&request)?;
+ req.set_payload(Some(payload));
+ let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
+
+ if res.status == StatusCode::NOT_MODIFIED {
+ return Ok(None);
+ }
+
+ let resp: PollRangeResponse = serde_json::from_slice(&res.body)?;
+
+ let items = resp
+ .items
+ .into_iter()
+ .map(|BatchReadItem { sk, ct, v }| {
+ (
+ sk,
+ CausalValue {
+ causality: ct,
+ value: v,
+ },
+ )
+ })
+ .collect::<BTreeMap<_, _>>();
+
+ Ok(Some((items, resp.seen_marker)))
+ }
+
/// Perform an InsertItem request, inserting a value for a single pk+sk.
pub async fn insert_item(
&self,
@@ -389,6 +447,12 @@ impl From<CausalityToken> for String {
}
}
+impl AsRef<str> for CausalityToken {
+ fn as_ref(&self) -> &str {
+ &self.0
+ }
+}
+
/// A value in K2V. can be either a binary value, or a tombstone.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum K2vValue {
@@ -466,6 +530,29 @@ pub struct Filter<'a> {
pub reverse: bool,
}
+#[derive(Debug, Default, Clone, Serialize)]
+pub struct PollRangeFilter<'a> {
+ pub start: Option<&'a str>,
+ pub end: Option<&'a str>,
+ pub prefix: Option<&'a str>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct PollRangeRequest<'a> {
+ #[serde(flatten)]
+ filter: PollRangeFilter<'a>,
+ seen_marker: Option<&'a str>,
+ timeout: u64,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct PollRangeResponse {
+ items: Vec<BatchReadItem>,
+ seen_marker: String,
+}
+
impl<'a> Filter<'a> {
fn insert_params(&self, req: &mut SignedRequest) {
if let Some(start) = &self.start {