aboutsummaryrefslogtreecommitdiff
path: root/src/k2v-client
diff options
context:
space:
mode:
authortrinity-1686a <trinity.pointard@gmail.com>2022-05-18 22:24:09 +0200
committerAlex <alex@adnab.me>2022-05-18 22:24:09 +0200
commit64c193e3dbb536d5d3c2881bc9aebbb3e4e6272e (patch)
tree40f2b00ecaa0f1b14a1e08fbc7883c5305fe9552 /src/k2v-client
parentc692f55d5ce2c3ed08db7fbc4844debcc0aeb134 (diff)
downloadgarage-64c193e3dbb536d5d3c2881bc9aebbb3e4e6272e.tar.gz
garage-64c193e3dbb536d5d3c2881bc9aebbb3e4e6272e.zip
Add a K2V client library and CLI (#303)
lib.rs could use getting split in modules, but I'm not sure how exactly Co-authored-by: trinity-1686a <trinity@deuxfleurs.fr> Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/303 Co-authored-by: trinity-1686a <trinity.pointard@gmail.com> Co-committed-by: trinity-1686a <trinity.pointard@gmail.com>
Diffstat (limited to 'src/k2v-client')
-rw-r--r--src/k2v-client/Cargo.toml27
-rw-r--r--src/k2v-client/README.md25
-rw-r--r--src/k2v-client/src/bin/k2v-cli.rs466
-rw-r--r--src/k2v-client/src/error.rs22
-rw-r--r--src/k2v-client/src/lib.rs566
5 files changed, 1106 insertions, 0 deletions
diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml
new file mode 100644
index 00000000..84c6b8b2
--- /dev/null
+++ b/src/k2v-client/Cargo.toml
@@ -0,0 +1,27 @@
+[package]
+name = "k2v-client"
+version = "0.1.0"
+edition = "2018"
+
+[dependencies]
+base64 = "0.13.0"
+http = "0.2.6"
+rusoto_core = "0.48.0"
+rusoto_credential = "0.48.0"
+rusoto_signature = "0.48.0"
+serde = "1.0.137"
+serde_json = "1.0.81"
+thiserror = "1.0.31"
+tokio = "1.17.0"
+
+# cli deps
+clap = { version = "3.1.18", optional = true, features = ["derive", "env"] }
+garage_util = { path = "../util", optional = true }
+
+
+[features]
+cli = ["clap", "tokio/fs", "tokio/io-std", "garage_util"]
+
+[[bin]]
+name = "k2v-cli"
+required-features = ["cli"]
diff --git a/src/k2v-client/README.md b/src/k2v-client/README.md
new file mode 100644
index 00000000..db454805
--- /dev/null
+++ b/src/k2v-client/README.md
@@ -0,0 +1,25 @@
+Example usage:
+```sh
+# all these values can be provided on the cli instead
+export AWS_ACCESS_KEY_ID=GK123456
+export AWS_SECRET_ACCESS_KEY=0123..789
+export AWS_REGION=garage
+export K2V_ENDPOINT=http://172.30.2.1:3903
+export K2V_BUCKET=my-bucket
+
+cargo run --features=cli -- read-range my-partition-key --all
+
+cargo run --features=cli -- insert my-partition-key my-sort-key --text "my string1"
+cargo run --features=cli -- insert my-partition-key my-sort-key --text "my string2"
+cargo run --features=cli -- insert my-partition-key my-sort-key2 --text "my string"
+
+cargo run --features=cli -- read-range my-partition-key --all
+
+causality=$(cargo run --features=cli -- read my-partition-key my-sort-key2 -b | head -n1)
+cargo run --features=cli -- delete my-partition-key my-sort-key2 -c $causality
+
+causality=$(cargo run --features=cli -- read my-partition-key my-sort-key -b | head -n1)
+cargo run --features=cli -- insert my-partition-key my-sort-key --text "my string3" -c $causality
+
+cargo run --features=cli -- read-range my-partition-key --all
+```
diff --git a/src/k2v-client/src/bin/k2v-cli.rs b/src/k2v-client/src/bin/k2v-cli.rs
new file mode 100644
index 00000000..38c39361
--- /dev/null
+++ b/src/k2v-client/src/bin/k2v-cli.rs
@@ -0,0 +1,466 @@
+use k2v_client::*;
+
+use garage_util::formater::format_table;
+
+use rusoto_core::credential::AwsCredentials;
+use rusoto_core::Region;
+
+use clap::{Parser, Subcommand};
+
+/// K2V command line interface
+#[derive(Parser, Debug)]
+#[clap(author, version, about, long_about = None)]
+struct Args {
+ /// Name of the region to use
+ #[clap(short, long, env = "AWS_REGION", default_value = "garage")]
+ region: String,
+ /// Url of the endpoint to connect to
+ #[clap(short, long, env = "K2V_ENDPOINT")]
+ endpoint: String,
+ /// Access key ID
+ #[clap(short, long, env = "AWS_ACCESS_KEY_ID")]
+ key_id: String,
+ /// Access key ID
+ #[clap(short, long, env = "AWS_SECRET_ACCESS_KEY")]
+ secret: String,
+ /// Bucket name
+ #[clap(short, long, env = "K2V_BUCKET")]
+ bucket: String,
+ #[clap(subcommand)]
+ command: Command,
+}
+
+#[derive(Subcommand, Debug)]
+enum Command {
+ /// Insert a single value
+ Insert {
+ /// Partition key to insert to
+ partition_key: String,
+ /// Sort key to insert to
+ sort_key: String,
+ /// Causality of the insertion
+ #[clap(short, long)]
+ causality: Option<String>,
+ /// Value to insert
+ #[clap(flatten)]
+ value: Value,
+ },
+ /// Read a single value
+ Read {
+ /// Partition key to read from
+ partition_key: String,
+ /// Sort key to read from
+ sort_key: String,
+ /// Output formating
+ #[clap(flatten)]
+ output_kind: ReadOutputKind,
+ },
+ /// Delete a single value
+ Delete {
+ /// Partition key to delete from
+ partition_key: String,
+ /// Sort key to delete from
+ sort_key: String,
+ /// Causality information
+ #[clap(short, long)]
+ causality: String,
+ },
+ /// List partition keys
+ ReadIndex {
+ /// Output formating
+ #[clap(flatten)]
+ output_kind: BatchOutputKind,
+ /// Output only partition keys matching this filter
+ #[clap(flatten)]
+ filter: Filter,
+ },
+ /// Read a range of sort keys
+ ReadRange {
+ /// Partition key to read from
+ partition_key: String,
+ /// Output formating
+ #[clap(flatten)]
+ output_kind: BatchOutputKind,
+ /// Output only sort keys matching this filter
+ #[clap(flatten)]
+ filter: Filter,
+ },
+ /// Delete a range of sort keys
+ DeleteRange {
+ /// Partition key to delete from
+ partition_key: String,
+ /// Output formating
+ #[clap(flatten)]
+ output_kind: BatchOutputKind,
+ /// Delete only sort keys matching this filter
+ #[clap(flatten)]
+ filter: Filter,
+ },
+}
+
+/// Where to read a value from
+#[derive(Parser, Debug)]
+#[clap(group = clap::ArgGroup::new("value").multiple(false).required(true))]
+struct Value {
+ /// Read value from a file. use - to read from stdin
+ #[clap(short, long, group = "value")]
+ file: Option<String>,
+ /// Read a base64 value from commandline
+ #[clap(short, long, group = "value")]
+ b64: Option<String>,
+ /// Read a raw (UTF-8) value from the commandline
+ #[clap(short, long, group = "value")]
+ text: Option<String>,
+}
+
+impl Value {
+ async fn to_data(&self) -> Result<Vec<u8>, Error> {
+ if let Some(ref text) = self.text {
+ Ok(text.as_bytes().to_vec())
+ } else if let Some(ref b64) = self.b64 {
+ base64::decode(b64).map_err(|_| Error::Message("invalid base64 input".into()))
+ } else if let Some(ref path) = self.file {
+ use tokio::io::AsyncReadExt;
+ if path == "-" {
+ let mut file = tokio::io::stdin();
+ let mut vec = Vec::new();
+ file.read_to_end(&mut vec).await?;
+ Ok(vec)
+ } else {
+ let mut file = tokio::fs::File::open(path).await?;
+ let mut vec = Vec::new();
+ file.read_to_end(&mut vec).await?;
+ Ok(vec)
+ }
+ } else {
+ unreachable!("Value must have one option set")
+ }
+ }
+}
+
+#[derive(Parser, Debug)]
+#[clap(group = clap::ArgGroup::new("output-kind").multiple(false).required(false))]
+struct ReadOutputKind {
+ /// Base64 output. Conflicts are line separated, first line is causality token
+ #[clap(short, long, group = "output-kind")]
+ b64: bool,
+ /// Raw output. Conflicts generate error, causality token is not returned
+ #[clap(short, long, group = "output-kind")]
+ raw: bool,
+ /// Human formated output
+ #[clap(short = 'H', long, group = "output-kind")]
+ human: bool,
+ /// JSON formated output
+ #[clap(short, long, group = "output-kind")]
+ json: bool,
+}
+
+impl ReadOutputKind {
+ fn display_output(&self, val: CausalValue) -> ! {
+ use std::io::Write;
+ use std::process::exit;
+
+ if self.json {
+ let stdout = std::io::stdout();
+ serde_json::to_writer_pretty(stdout, &val).unwrap();
+ exit(0);
+ }
+
+ if self.raw {
+ let mut val = val.value;
+ if val.len() != 1 {
+ eprintln!(
+ "Raw mode can only read non-concurent values, found {} values, expected 1",
+ val.len()
+ );
+ exit(1);
+ }
+ let val = val.pop().unwrap();
+ match val {
+ K2vValue::Value(v) => {
+ std::io::stdout().write_all(&v).unwrap();
+ exit(0);
+ }
+ K2vValue::Tombstone => {
+ eprintln!("Expected value, found tombstone");
+ exit(2);
+ }
+ }
+ }
+
+ let causality: String = val.causality.into();
+ if self.b64 {
+ println!("{}", causality);
+ for val in val.value {
+ match val {
+ K2vValue::Value(v) => {
+ println!("{}", base64::encode(&v))
+ }
+ K2vValue::Tombstone => {
+ println!();
+ }
+ }
+ }
+ exit(0);
+ }
+
+ // human
+ println!("causality: {}", causality);
+ println!("values:");
+ for val in val.value {
+ match val {
+ K2vValue::Value(v) => {
+ if let Ok(string) = std::str::from_utf8(&v) {
+ println!(" utf-8: {}", string);
+ } else {
+ println!(" base64: {}", base64::encode(&v));
+ }
+ }
+ K2vValue::Tombstone => {
+ println!(" tombstone");
+ }
+ }
+ }
+ exit(0);
+ }
+}
+
+#[derive(Parser, Debug)]
+#[clap(group = clap::ArgGroup::new("output-kind").multiple(false).required(false))]
+struct BatchOutputKind {
+ /// Human formated output
+ #[clap(short = 'H', long, group = "output-kind")]
+ human: bool,
+ /// JSON formated output
+ #[clap(short, long, group = "output-kind")]
+ json: bool,
+}
+
+/// Filter for batch operations
+#[derive(Parser, Debug)]
+#[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))]
+struct Filter {
+ /// Match only keys starting with this prefix
+ #[clap(short, long, group = "filter")]
+ prefix: Option<String>,
+ /// Match only keys lexicographically after this key (including this key itself)
+ #[clap(short, long, group = "filter")]
+ start: Option<String>,
+ /// Match only keys lexicographically before this key (excluding this key)
+ #[clap(short, long, group = "filter")]
+ end: Option<String>,
+ /// Only match the first X keys
+ #[clap(short, long)]
+ limit: Option<u64>,
+ /// Return keys in reverse order
+ #[clap(short, long)]
+ reverse: bool,
+ /// Return only keys where conflict happened
+ #[clap(short, long)]
+ conflicts_only: bool,
+ /// Also include keys storing only tombstones
+ #[clap(short, long)]
+ tombstones: bool,
+ /// Return any key
+ #[clap(short, long, group = "filter")]
+ all: bool,
+}
+
+impl Filter {
+ fn k2v_filter(&self) -> k2v_client::Filter<'_> {
+ k2v_client::Filter {
+ start: self.start.as_deref(),
+ end: self.end.as_deref(),
+ prefix: self.prefix.as_deref(),
+ limit: self.limit,
+ reverse: self.reverse,
+ }
+ }
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Error> {
+ let args = Args::parse();
+
+ let region = Region::Custom {
+ name: args.region,
+ endpoint: args.endpoint,
+ };
+
+ let creds = AwsCredentials::new(args.key_id, args.secret, None, None);
+
+ let client = K2vClient::new(region, args.bucket, creds, None)?;
+
+ match args.command {
+ Command::Insert {
+ partition_key,
+ sort_key,
+ causality,
+ value,
+ } => {
+ client
+ .insert_item(
+ &partition_key,
+ &sort_key,
+ value.to_data().await?,
+ causality.map(Into::into),
+ )
+ .await?;
+ }
+ Command::Delete {
+ partition_key,
+ sort_key,
+ causality,
+ } => {
+ client
+ .delete_item(&partition_key, &sort_key, causality.into())
+ .await?;
+ }
+ Command::Read {
+ partition_key,
+ sort_key,
+ output_kind,
+ } => {
+ let res = client.read_item(&partition_key, &sort_key).await?;
+ output_kind.display_output(res);
+ }
+ Command::ReadIndex {
+ output_kind,
+ filter,
+ } => {
+ if filter.conflicts_only || filter.tombstones {
+ return Err(Error::Message(
+ "conlicts-only and tombstones are invalid for read-index".into(),
+ ));
+ }
+ let res = client.read_index(filter.k2v_filter()).await?;
+ if output_kind.json {
+ let values = res
+ .items
+ .into_iter()
+ .map(|(k, v)| {
+ let mut value = serde_json::to_value(v).unwrap();
+ value
+ .as_object_mut()
+ .unwrap()
+ .insert("sort_key".to_owned(), k.into());
+ value
+ })
+ .collect::<Vec<_>>();
+ let json = serde_json::json!({
+ "next_key": res.next_start,
+ "values": values,
+ });
+
+ let stdout = std::io::stdout();
+ serde_json::to_writer_pretty(stdout, &json).unwrap();
+ } else {
+ if let Some(next) = res.next_start {
+ println!("next key: {}", next);
+ }
+
+ let mut to_print = Vec::new();
+ to_print.push(format!("key:\tentries\tconflicts\tvalues\tbytes"));
+ for (k, v) in res.items {
+ to_print.push(format!(
+ "{}\t{}\t{}\t{}\t{}",
+ k, v.entries, v.conflicts, v.values, v.bytes
+ ));
+ }
+ format_table(to_print);
+ }
+ }
+ Command::ReadRange {
+ partition_key,
+ output_kind,
+ filter,
+ } => {
+ let op = BatchReadOp {
+ partition_key: &partition_key,
+ filter: filter.k2v_filter(),
+ conflicts_only: filter.conflicts_only,
+ tombstones: filter.tombstones,
+ single_item: false,
+ };
+ let mut res = client.read_batch(&[op]).await?;
+ let res = res.pop().unwrap();
+ if output_kind.json {
+ let values = res
+ .items
+ .into_iter()
+ .map(|(k, v)| {
+ let mut value = serde_json::to_value(v).unwrap();
+ value
+ .as_object_mut()
+ .unwrap()
+ .insert("sort_key".to_owned(), k.into());
+ value
+ })
+ .collect::<Vec<_>>();
+ let json = serde_json::json!({
+ "next_key": res.next_start,
+ "values": values,
+ });
+
+ let stdout = std::io::stdout();
+ serde_json::to_writer_pretty(stdout, &json).unwrap();
+ } else {
+ if let Some(next) = res.next_start {
+ println!("next key: {}", next);
+ }
+ for (key, values) in res.items {
+ println!("key: {}", key);
+ let causality: String = values.causality.into();
+ println!("causality: {}", causality);
+ for value in values.value {
+ match value {
+ K2vValue::Value(v) => {
+ if let Ok(string) = std::str::from_utf8(&v) {
+ println!(" value(utf-8): {}", string);
+ } else {
+ println!(" value(base64): {}", base64::encode(&v));
+ }
+ }
+ K2vValue::Tombstone => {
+ println!(" tombstone");
+ }
+ }
+ }
+ }
+ }
+ }
+ Command::DeleteRange {
+ partition_key,
+ output_kind,
+ filter,
+ } => {
+ let op = BatchDeleteOp {
+ partition_key: &partition_key,
+ prefix: filter.prefix.as_deref(),
+ start: filter.start.as_deref(),
+ end: filter.end.as_deref(),
+ single_item: false,
+ };
+ if filter.reverse
+ || filter.conflicts_only
+ || filter.tombstones
+ || filter.limit.is_some()
+ {
+ return Err(Error::Message(
+ "limit, conlicts-only, reverse and tombstones are invalid for delete-range"
+ .into(),
+ ));
+ }
+
+ let res = client.delete_batch(&[op]).await?;
+
+ if output_kind.json {
+ println!("{}", res[0]);
+ } else {
+ println!("deleted {} keys", res[0]);
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/src/k2v-client/src/error.rs b/src/k2v-client/src/error.rs
new file mode 100644
index 00000000..62357934
--- /dev/null
+++ b/src/k2v-client/src/error.rs
@@ -0,0 +1,22 @@
+use std::borrow::Cow;
+
+use thiserror::Error;
+
+/// Errors returned by this crate
+#[derive(Error, Debug)]
+pub enum Error {
+ #[error("received invalid response: {0}")]
+ InvalidResponse(Cow<'static, str>),
+ #[error("not found")]
+ NotFound,
+ #[error("io error: {0}")]
+ IoError(#[from] std::io::Error),
+ #[error("rusoto tls error: {0}")]
+ RusotoTls(#[from] rusoto_core::request::TlsError),
+ #[error("rusoto http error: {0}")]
+ RusotoHttp(#[from] rusoto_core::HttpDispatchError),
+ #[error("deserialization error: {0}")]
+ Deserialization(#[from] serde_json::Error),
+ #[error("{0}")]
+ Message(Cow<'static, str>),
+}
diff --git a/src/k2v-client/src/lib.rs b/src/k2v-client/src/lib.rs
new file mode 100644
index 00000000..ba1cd6ea
--- /dev/null
+++ b/src/k2v-client/src/lib.rs
@@ -0,0 +1,566 @@
+use std::collections::BTreeMap;
+use std::time::Duration;
+
+use http::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE};
+use http::status::StatusCode;
+use http::HeaderMap;
+
+use rusoto_core::{ByteStream, DispatchSignedRequest, HttpClient};
+use rusoto_credential::AwsCredentials;
+use rusoto_signature::region::Region;
+use rusoto_signature::signature::SignedRequest;
+use serde::de::Error as DeError;
+use serde::{Deserialize, Deserializer, Serialize, Serializer};
+
+use tokio::io::AsyncReadExt;
+
+mod error;
+
+pub use error::Error;
+
+const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
+const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300);
+const SERVICE: &str = "k2v";
+const GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
+
+/// Client used to query a K2V server.
+pub struct K2vClient {
+ region: Region,
+ bucket: String,
+ creds: AwsCredentials,
+ client: HttpClient,
+}
+
+impl K2vClient {
+ /// Create a new K2V client.
+ pub fn new(
+ region: Region,
+ bucket: String,
+ creds: AwsCredentials,
+ user_agent: Option<String>,
+ ) -> Result<Self, Error> {
+ let mut client = HttpClient::new()?;
+ if let Some(ua) = user_agent {
+ client.local_agent_prepend(ua);
+ } else {
+ client.local_agent_prepend(format!("k2v/{}", env!("CARGO_PKG_VERSION")));
+ }
+ Ok(K2vClient {
+ region,
+ bucket,
+ creds,
+ client,
+ })
+ }
+
+ /// Perform a ReadItem request, reading the value(s) stored for a single pk+sk.
+ pub async fn read_item(
+ &self,
+ partition_key: &str,
+ sort_key: &str,
+ ) -> Result<CausalValue, Error> {
+ let mut req = SignedRequest::new(
+ "GET",
+ SERVICE,
+ &self.region,
+ &format!("/{}/{}", self.bucket, partition_key),
+ );
+ req.add_param("sort_key", sort_key);
+ req.add_header(ACCEPT, "application/octet-stream, application/json");
+
+ let res = self.dispatch(req, None).await?;
+
+ let causality = res
+ .causality_token
+ .ok_or_else(|| Error::InvalidResponse("missing causality token".into()))?;
+
+ if res.status == StatusCode::NO_CONTENT {
+ return Ok(CausalValue {
+ causality,
+ value: vec![K2vValue::Tombstone],
+ });
+ }
+
+ match res.content_type.as_deref() {
+ Some("application/octet-stream") => Ok(CausalValue {
+ causality,
+ value: vec![K2vValue::Value(res.body)],
+ }),
+ Some("application/json") => {
+ let value = serde_json::from_slice(&res.body)?;
+ Ok(CausalValue { causality, value })
+ }
+ Some(ct) => Err(Error::InvalidResponse(
+ format!("invalid content type: {}", ct).into(),
+ )),
+ None => Err(Error::InvalidResponse("missing content type".into())),
+ }
+ }
+
+ /// Perform a PollItem request, waiting for the value(s) stored for a single pk+sk to be
+ /// updated.
+ pub async fn poll_item(
+ &self,
+ partition_key: &str,
+ sort_key: &str,
+ causality: CausalityToken,
+ timeout: Option<Duration>,
+ ) -> Result<Option<CausalValue>, Error> {
+ let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT);
+
+ let mut req = SignedRequest::new(
+ "GET",
+ SERVICE,
+ &self.region,
+ &format!("/{}/{}", self.bucket, partition_key),
+ );
+ req.add_param("sort_key", sort_key);
+ req.add_param("causality_token", &causality.0);
+ req.add_param("timeout", &timeout.as_secs().to_string());
+ req.add_header(ACCEPT, "application/octet-stream, application/json");
+
+ let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
+
+ let causality = res
+ .causality_token
+ .ok_or_else(|| Error::InvalidResponse("missing causality token".into()))?;
+
+ if res.status == StatusCode::NOT_MODIFIED {
+ return Ok(None);
+ }
+
+ if res.status == StatusCode::NO_CONTENT {
+ return Ok(Some(CausalValue {
+ causality,
+ value: vec![K2vValue::Tombstone],
+ }));
+ }
+
+ match res.content_type.as_deref() {
+ Some("application/octet-stream") => Ok(Some(CausalValue {
+ causality,
+ value: vec![K2vValue::Value(res.body)],
+ })),
+ Some("application/json") => {
+ let value = serde_json::from_slice(&res.body)?;
+ Ok(Some(CausalValue { causality, value }))
+ }
+ Some(ct) => Err(Error::InvalidResponse(
+ format!("invalid content type: {}", ct).into(),
+ )),
+ None => Err(Error::InvalidResponse("missing content type".into())),
+ }
+ }
+
+ /// Perform an InsertItem request, inserting a value for a single pk+sk.
+ pub async fn insert_item(
+ &self,
+ partition_key: &str,
+ sort_key: &str,
+ value: Vec<u8>,
+ causality: Option<CausalityToken>,
+ ) -> Result<(), Error> {
+ let mut req = SignedRequest::new(
+ "PUT",
+ SERVICE,
+ &self.region,
+ &format!("/{}/{}", self.bucket, partition_key),
+ );
+ req.add_param("sort_key", sort_key);
+ req.set_payload(Some(value));
+
+ if let Some(causality) = causality {
+ req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0);
+ }
+
+ self.dispatch(req, None).await?;
+ Ok(())
+ }
+
+ /// Perform a DeleteItem request, deleting the value(s) stored for a single pk+sk.
+ pub async fn delete_item(
+ &self,
+ partition_key: &str,
+ sort_key: &str,
+ causality: CausalityToken,
+ ) -> Result<(), Error> {
+ let mut req = SignedRequest::new(
+ "DELETE",
+ SERVICE,
+ &self.region,
+ &format!("/{}/{}", self.bucket, partition_key),
+ );
+ req.add_param("sort_key", sort_key);
+ req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0);
+
+ self.dispatch(req, None).await?;
+ Ok(())
+ }
+
+ /// Perform a ReadIndex request, listing partition key which have at least one associated
+ /// sort key, and which matches the filter.
+ pub async fn read_index(
+ &self,
+ filter: Filter<'_>,
+ ) -> Result<PaginatedRange<PartitionInfo>, Error> {
+ let mut req =
+ SignedRequest::new("GET", SERVICE, &self.region, &format!("/{}", self.bucket));
+ filter.insert_params(&mut req);
+
+ let res = self.dispatch(req, None).await?;
+
+ let resp: ReadIndexResponse = serde_json::from_slice(&res.body)?;
+
+ let items = resp
+ .partition_keys
+ .into_iter()
+ .map(|ReadIndexItem { pk, info }| (pk, info))
+ .collect();
+
+ Ok(PaginatedRange {
+ items,
+ next_start: resp.next_start,
+ })
+ }
+
+ /// Perform an InsertBatch request, inserting multiple values at once. Note: this operation is
+ /// *not* atomic: it is possible for some sub-operations to fails and others to success. In
+ /// that case, failure is reported.
+ pub async fn insert_batch(&self, operations: &[BatchInsertOp<'_>]) -> Result<(), Error> {
+ let mut req =
+ SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
+
+ let payload = serde_json::to_vec(operations)?;
+ req.set_payload(Some(payload));
+ self.dispatch(req, None).await?;
+ Ok(())
+ }
+
+ /// Perform a ReadBatch request, reading multiple values or range of values at once.
+ pub async fn read_batch(
+ &self,
+ operations: &[BatchReadOp<'_>],
+ ) -> Result<Vec<PaginatedRange<CausalValue>>, Error> {
+ let mut req =
+ SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
+ req.add_param("search", "");
+
+ let payload = serde_json::to_vec(operations)?;
+ req.set_payload(Some(payload));
+ let res = self.dispatch(req, None).await?;
+
+ let resp: Vec<BatchReadResponse> = serde_json::from_slice(&res.body)?;
+
+ Ok(resp
+ .into_iter()
+ .map(|e| PaginatedRange {
+ items: e
+ .items
+ .into_iter()
+ .map(|BatchReadItem { sk, ct, v }| {
+ (
+ sk,
+ CausalValue {
+ causality: ct,
+ value: v,
+ },
+ )
+ })
+ .collect(),
+ next_start: e.next_start,
+ })
+ .collect())
+ }
+
+ /// Perform a DeleteBatch request, deleting mutiple values or range of values at once, without
+ /// providing causality information.
+ pub async fn delete_batch(&self, operations: &[BatchDeleteOp<'_>]) -> Result<Vec<u64>, Error> {
+ let mut req =
+ SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
+ req.add_param("delete", "");
+
+ let payload = serde_json::to_vec(operations)?;
+ req.set_payload(Some(payload));
+ let res = self.dispatch(req, None).await?;
+
+ let resp: Vec<BatchDeleteResponse> = serde_json::from_slice(&res.body)?;
+
+ Ok(resp.into_iter().map(|r| r.deleted_items).collect())
+ }
+
+ async fn dispatch(
+ &self,
+ mut req: SignedRequest,
+ timeout: Option<Duration>,
+ ) -> Result<Response, Error> {
+ req.sign(&self.creds);
+ let mut res = self
+ .client
+ .dispatch(req, Some(timeout.unwrap_or(DEFAULT_TIMEOUT)))
+ .await?;
+
+ let causality_token = res
+ .headers
+ .remove(GARAGE_CAUSALITY_TOKEN)
+ .map(CausalityToken);
+ let content_type = res.headers.remove(CONTENT_TYPE);
+
+ let body = match res.status {
+ StatusCode::OK => read_body(&mut res.headers, res.body).await?,
+ StatusCode::NO_CONTENT => Vec::new(),
+ StatusCode::NOT_FOUND => return Err(Error::NotFound),
+ StatusCode::NOT_MODIFIED => Vec::new(),
+ _ => {
+ return Err(Error::InvalidResponse(
+ format!("invalid error code: {}", res.status).into(),
+ ))
+ }
+ };
+
+ Ok(Response {
+ body,
+ status: res.status,
+ causality_token,
+ content_type,
+ })
+ }
+}
+
+async fn read_body(headers: &mut HeaderMap<String>, body: ByteStream) -> Result<Vec<u8>, Error> {
+ let body_len = headers
+ .get(CONTENT_LENGTH)
+ .and_then(|h| h.parse().ok())
+ .unwrap_or(0);
+ let mut res = Vec::with_capacity(body_len);
+ body.into_async_read().read_to_end(&mut res).await?;
+ Ok(res)
+}
+
+/// An opaque token used to convey causality between operations.
+#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
+#[serde(transparent)]
+pub struct CausalityToken(String);
+
+impl From<String> for CausalityToken {
+ fn from(v: String) -> Self {
+ CausalityToken(v)
+ }
+}
+
+impl From<CausalityToken> for String {
+ fn from(v: CausalityToken) -> Self {
+ v.0
+ }
+}
+
+/// A value in K2V. can be either a binary value, or a tombstone.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum K2vValue {
+ Tombstone,
+ Value(Vec<u8>),
+}
+
+impl From<Vec<u8>> for K2vValue {
+ fn from(v: Vec<u8>) -> Self {
+ K2vValue::Value(v)
+ }
+}
+
+impl From<Option<Vec<u8>>> for K2vValue {
+ fn from(v: Option<Vec<u8>>) -> Self {
+ match v {
+ Some(v) => K2vValue::Value(v),
+ None => K2vValue::Tombstone,
+ }
+ }
+}
+
+impl<'de> Deserialize<'de> for K2vValue {
+ fn deserialize<D>(d: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ let val: Option<&str> = Option::deserialize(d)?;
+ Ok(match val {
+ Some(s) => {
+ K2vValue::Value(base64::decode(s).map_err(|_| DeError::custom("invalid base64"))?)
+ }
+ None => K2vValue::Tombstone,
+ })
+ }
+}
+
+impl Serialize for K2vValue {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ match self {
+ K2vValue::Tombstone => serializer.serialize_none(),
+ K2vValue::Value(v) => {
+ let b64 = base64::encode(v);
+ serializer.serialize_str(&b64)
+ }
+ }
+ }
+}
+
+/// A set of K2vValue and associated causality information.
+#[derive(Debug, Clone, Serialize)]
+pub struct CausalValue {
+ pub causality: CausalityToken,
+ pub value: Vec<K2vValue>,
+}
+
+/// Result of paginated requests.
+#[derive(Debug, Clone)]
+pub struct PaginatedRange<V> {
+ pub items: BTreeMap<String, V>,
+ pub next_start: Option<String>,
+}
+
+/// Filter for batch operations.
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Filter<'a> {
+ pub start: Option<&'a str>,
+ pub end: Option<&'a str>,
+ pub prefix: Option<&'a str>,
+ pub limit: Option<u64>,
+ #[serde(default)]
+ pub reverse: bool,
+}
+
+impl<'a> Filter<'a> {
+ fn insert_params(&self, req: &mut SignedRequest) {
+ if let Some(start) = &self.start {
+ req.add_param("start", start);
+ }
+ if let Some(end) = &self.end {
+ req.add_param("end", end);
+ }
+ if let Some(prefix) = &self.prefix {
+ req.add_param("prefix", prefix);
+ }
+ if let Some(limit) = &self.limit {
+ req.add_param("limit", &limit.to_string());
+ }
+ if self.reverse {
+ req.add_param("reverse", "true");
+ }
+ }
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ReadIndexResponse<'a> {
+ #[serde(flatten, borrow)]
+ #[allow(dead_code)]
+ filter: Filter<'a>,
+ partition_keys: Vec<ReadIndexItem>,
+ #[allow(dead_code)]
+ more: bool,
+ next_start: Option<String>,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+struct ReadIndexItem {
+ pk: String,
+ #[serde(flatten)]
+ info: PartitionInfo,
+}
+
+/// Information about data stored with a given partition key.
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct PartitionInfo {
+ pub entries: u64,
+ pub conflicts: u64,
+ pub values: u64,
+ pub bytes: u64,
+}
+
+/// Single sub-operation of an InsertBatch.
+#[derive(Debug, Clone, Serialize)]
+pub struct BatchInsertOp<'a> {
+ #[serde(rename = "pk")]
+ pub partition_key: &'a str,
+ #[serde(rename = "sk")]
+ pub sort_key: &'a str,
+ #[serde(rename = "ct")]
+ pub causality: Option<CausalityToken>,
+ #[serde(rename = "v")]
+ pub value: K2vValue,
+}
+
+/// Single sub-operation of a ReadBatch.
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct BatchReadOp<'a> {
+ pub partition_key: &'a str,
+ #[serde(flatten, borrow)]
+ pub filter: Filter<'a>,
+ #[serde(default)]
+ pub single_item: bool,
+ #[serde(default)]
+ pub conflicts_only: bool,
+ #[serde(default)]
+ pub tombstones: bool,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct BatchReadResponse<'a> {
+ #[serde(flatten, borrow)]
+ #[allow(dead_code)]
+ op: BatchReadOp<'a>,
+ items: Vec<BatchReadItem>,
+ #[allow(dead_code)]
+ more: bool,
+ next_start: Option<String>,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+struct BatchReadItem {
+ sk: String,
+ ct: CausalityToken,
+ v: Vec<K2vValue>,
+}
+
+/// Single sub-operation of a DeleteBatch
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct BatchDeleteOp<'a> {
+ pub partition_key: &'a str,
+ pub prefix: Option<&'a str>,
+ pub start: Option<&'a str>,
+ pub end: Option<&'a str>,
+ #[serde(default)]
+ pub single_item: bool,
+}
+
+impl<'a> BatchDeleteOp<'a> {
+ pub fn new(partition_key: &'a str) -> Self {
+ BatchDeleteOp {
+ partition_key,
+ prefix: None,
+ start: None,
+ end: None,
+ single_item: false,
+ }
+ }
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct BatchDeleteResponse<'a> {
+ #[serde(flatten, borrow)]
+ #[allow(dead_code)]
+ filter: BatchDeleteOp<'a>,
+ deleted_items: u64,
+}
+
+struct Response {
+ body: Vec<u8>,
+ status: StatusCode,
+ causality_token: Option<CausalityToken>,
+ content_type: Option<String>,
+}