aboutsummaryrefslogtreecommitdiff
path: root/src/k2v-client/bin/k2v-cli.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2023-01-26 16:19:04 +0000
committerAlex <alex@adnab.me>2023-01-26 16:19:04 +0000
commit246f7468cd18c8ef4f3c0c4c209853cd2500cc76 (patch)
tree6e5f9ddd2159ba5396cc441a82b8240c4306323f /src/k2v-client/bin/k2v-cli.rs
parent611792ddcf86f0a728e22abaa6e172d3679d5ca6 (diff)
parent1dff62564fdda392a97986dca55232f30a1f4234 (diff)
downloadgarage-246f7468cd18c8ef4f3c0c4c209853cd2500cc76.tar.gz
garage-246f7468cd18c8ef4f3c0c4c209853cd2500cc76.zip
Merge pull request 'K2V PollRange, version 2' (#471) from k2v-watch-range-2 into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/471
Diffstat (limited to 'src/k2v-client/bin/k2v-cli.rs')
-rw-r--r--src/k2v-client/bin/k2v-cli.rs203
1 files changed, 151 insertions, 52 deletions
diff --git a/src/k2v-client/bin/k2v-cli.rs b/src/k2v-client/bin/k2v-cli.rs
index 925ebeb8..cdd63cce 100644
--- a/src/k2v-client/bin/k2v-cli.rs
+++ b/src/k2v-client/bin/k2v-cli.rs
@@ -1,3 +1,5 @@
+use std::collections::BTreeMap;
+use std::process::exit;
use std::time::Duration;
use k2v_client::*;
@@ -57,22 +59,39 @@ enum Command {
#[clap(flatten)]
output_kind: ReadOutputKind,
},
- /// Watch changes on a single value
- Poll {
- /// Partition key to delete from
+ /// Watch changes on a single value
+ PollItem {
+ /// Partition key of item to watch
partition_key: String,
- /// Sort key to delete from
+ /// Sort key of item to watch
sort_key: String,
/// Causality information
#[clap(short, long)]
causality: String,
/// Timeout, in seconds
- #[clap(short, long)]
+ #[clap(short = 'T', long)]
timeout: Option<u64>,
/// Output formating
#[clap(flatten)]
output_kind: ReadOutputKind,
},
+ /// Watch changes on a range of values
+ PollRange {
+ /// Partition key to poll from
+ partition_key: String,
+ /// Output only sort keys matching this filter
+ #[clap(flatten)]
+ filter: Filter,
+ /// Marker of data that had previously been seen by a PollRange
+ #[clap(short = 'S', long)]
+ seen_marker: Option<String>,
+ /// Timeout, in seconds
+ #[clap(short = 'T', long)]
+ timeout: Option<u64>,
+ /// Output formating
+ #[clap(flatten)]
+ output_kind: BatchOutputKind,
+ },
/// Delete a single value
Delete {
/// Partition key to delete from
@@ -176,7 +195,6 @@ struct ReadOutputKind {
impl ReadOutputKind {
fn display_output(&self, val: CausalValue) -> ! {
use std::io::Write;
- use std::process::exit;
if self.json {
let stdout = std::io::stdout();
@@ -254,6 +272,83 @@ struct BatchOutputKind {
json: bool,
}
+impl BatchOutputKind {
+ fn display_human_output(&self, values: BTreeMap<String, CausalValue>) -> ! {
+ for (key, values) in values {
+ println!("key: {}", key);
+ let causality: String = values.causality.into();
+ println!("causality: {}", causality);
+ for value in values.value {
+ match value {
+ K2vValue::Value(v) => {
+ if let Ok(string) = std::str::from_utf8(&v) {
+ println!(" value(utf-8): {}", string);
+ } else {
+ println!(" value(base64): {}", base64::encode(&v));
+ }
+ }
+ K2vValue::Tombstone => {
+ println!(" tombstone");
+ }
+ }
+ }
+ }
+ exit(0);
+ }
+
+ fn values_json(&self, values: BTreeMap<String, CausalValue>) -> Vec<serde_json::Value> {
+ values
+ .into_iter()
+ .map(|(k, v)| {
+ let mut value = serde_json::to_value(v).unwrap();
+ value
+ .as_object_mut()
+ .unwrap()
+ .insert("sort_key".to_owned(), k.into());
+ value
+ })
+ .collect::<Vec<_>>()
+ }
+
+ fn display_poll_range_output(
+ &self,
+ seen_marker: String,
+ values: BTreeMap<String, CausalValue>,
+ ) -> ! {
+ if self.json {
+ let json = serde_json::json!({
+ "values": self.values_json(values),
+ "seen_marker": seen_marker,
+ });
+
+ let stdout = std::io::stdout();
+ serde_json::to_writer_pretty(stdout, &json).unwrap();
+ exit(0)
+ } else {
+ println!("seen marker: {}", seen_marker);
+ self.display_human_output(values)
+ }
+ }
+
+ fn display_read_range_output(&self, res: PaginatedRange<CausalValue>) -> ! {
+ if self.json {
+ let json = serde_json::json!({
+ "next_key": res.next_start,
+ "values": self.values_json(res.items),
+ });
+
+ let stdout = std::io::stdout();
+ serde_json::to_writer_pretty(stdout, &json).unwrap();
+ exit(0)
+ } else {
+ if let Some(next) = res.next_start {
+ println!("next key: {}", next);
+ }
+ self.display_human_output(res.items)
+ }
+ }
+}
+
/// Filter for batch operations
#[derive(Parser, Debug)]
#[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))]
@@ -342,7 +437,7 @@ async fn main() -> Result<(), Error> {
let res = client.read_item(&partition_key, &sort_key).await?;
output_kind.display_output(res);
}
- Command::Poll {
+ Command::PollItem {
partition_key,
sort_key,
causality,
@@ -356,7 +451,54 @@ async fn main() -> Result<(), Error> {
if let Some(res) = res_opt {
output_kind.display_output(res);
} else {
- println!("Delay expired and value didn't change.");
+ if output_kind.json {
+ println!("null");
+ } else {
+ println!("Delay expired and value didn't change.");
+ }
+ }
+ }
+ Command::PollRange {
+ partition_key,
+ filter,
+ seen_marker,
+ timeout,
+ output_kind,
+ } => {
+ if filter.conflicts_only
+ || filter.tombstones
+ || filter.reverse
+ || filter.limit.is_some()
+ {
+ return Err(Error::Message(
+ "limit, reverse, conlicts-only, tombstones are invalid for poll-range".into(),
+ ));
+ }
+
+ let timeout = timeout.map(Duration::from_secs);
+ let res = client
+ .poll_range(
+ &partition_key,
+ Some(PollRangeFilter {
+ start: filter.start.as_deref(),
+ end: filter.end.as_deref(),
+ prefix: filter.prefix.as_deref(),
+ }),
+ seen_marker.as_deref(),
+ timeout,
+ )
+ .await?;
+ match res {
+ Some((items, seen_marker)) => {
+ output_kind.display_poll_range_output(seen_marker, items);
+ }
+ None => {
+ if output_kind.json {
+ println!("null");
+ } else {
+ println!("Delay expired and value didn't change.");
+ }
+ }
}
}
Command::ReadIndex {
@@ -419,50 +561,7 @@ async fn main() -> Result<(), Error> {
};
let mut res = client.read_batch(&[op]).await?;
let res = res.pop().unwrap();
- if output_kind.json {
- let values = res
- .items
- .into_iter()
- .map(|(k, v)| {
- let mut value = serde_json::to_value(v).unwrap();
- value
- .as_object_mut()
- .unwrap()
- .insert("sort_key".to_owned(), k.into());
- value
- })
- .collect::<Vec<_>>();
- let json = serde_json::json!({
- "next_key": res.next_start,
- "values": values,
- });
-
- let stdout = std::io::stdout();
- serde_json::to_writer_pretty(stdout, &json).unwrap();
- } else {
- if let Some(next) = res.next_start {
- println!("next key: {}", next);
- }
- for (key, values) in res.items {
- println!("key: {}", key);
- let causality: String = values.causality.into();
- println!("causality: {}", causality);
- for value in values.value {
- match value {
- K2vValue::Value(v) => {
- if let Ok(string) = std::str::from_utf8(&v) {
- println!(" value(utf-8): {}", string);
- } else {
- println!(" value(base64): {}", base64::encode(&v));
- }
- }
- K2vValue::Tombstone => {
- println!(" tombstone");
- }
- }
- }
- }
- }
+ output_kind.display_read_range_output(res);
}
Command::DeleteRange {
partition_key,