diff options
author | trinity-1686a <trinity.pointard@gmail.com> | 2022-05-18 22:24:09 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-05-18 22:24:09 +0200 |
commit | 64c193e3dbb536d5d3c2881bc9aebbb3e4e6272e (patch) | |
tree | 40f2b00ecaa0f1b14a1e08fbc7883c5305fe9552 /src/k2v-client | |
parent | c692f55d5ce2c3ed08db7fbc4844debcc0aeb134 (diff) | |
download | garage-64c193e3dbb536d5d3c2881bc9aebbb3e4e6272e.tar.gz garage-64c193e3dbb536d5d3c2881bc9aebbb3e4e6272e.zip |
Add a K2V client library and CLI (#303)
lib.rs could use getting split in modules, but I'm not sure how exactly
Co-authored-by: trinity-1686a <trinity@deuxfleurs.fr>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/303
Co-authored-by: trinity-1686a <trinity.pointard@gmail.com>
Co-committed-by: trinity-1686a <trinity.pointard@gmail.com>
Diffstat (limited to 'src/k2v-client')
-rw-r--r-- | src/k2v-client/Cargo.toml | 27 | ||||
-rw-r--r-- | src/k2v-client/README.md | 25 | ||||
-rw-r--r-- | src/k2v-client/src/bin/k2v-cli.rs | 466 | ||||
-rw-r--r-- | src/k2v-client/src/error.rs | 22 | ||||
-rw-r--r-- | src/k2v-client/src/lib.rs | 566 |
5 files changed, 1106 insertions, 0 deletions
diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml new file mode 100644 index 00000000..84c6b8b2 --- /dev/null +++ b/src/k2v-client/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "k2v-client" +version = "0.1.0" +edition = "2018" + +[dependencies] +base64 = "0.13.0" +http = "0.2.6" +rusoto_core = "0.48.0" +rusoto_credential = "0.48.0" +rusoto_signature = "0.48.0" +serde = "1.0.137" +serde_json = "1.0.81" +thiserror = "1.0.31" +tokio = "1.17.0" + +# cli deps +clap = { version = "3.1.18", optional = true, features = ["derive", "env"] } +garage_util = { path = "../util", optional = true } + + +[features] +cli = ["clap", "tokio/fs", "tokio/io-std", "garage_util"] + +[[bin]] +name = "k2v-cli" +required-features = ["cli"] diff --git a/src/k2v-client/README.md b/src/k2v-client/README.md new file mode 100644 index 00000000..db454805 --- /dev/null +++ b/src/k2v-client/README.md @@ -0,0 +1,25 @@ +Example usage: +```sh +# all these values can be provided on the cli instead +export AWS_ACCESS_KEY_ID=GK123456 +export AWS_SECRET_ACCESS_KEY=0123..789 +export AWS_REGION=garage +export K2V_ENDPOINT=http://172.30.2.1:3903 +export K2V_BUCKET=my-bucket + +cargo run --features=cli -- read-range my-partition-key --all + +cargo run --features=cli -- insert my-partition-key my-sort-key --text "my string1" +cargo run --features=cli -- insert my-partition-key my-sort-key --text "my string2" +cargo run --features=cli -- insert my-partition-key my-sort-key2 --text "my string" + +cargo run --features=cli -- read-range my-partition-key --all + +causality=$(cargo run --features=cli -- read my-partition-key my-sort-key2 -b | head -n1) +cargo run --features=cli -- delete my-partition-key my-sort-key2 -c $causality + +causality=$(cargo run --features=cli -- read my-partition-key my-sort-key -b | head -n1) +cargo run --features=cli -- insert my-partition-key my-sort-key --text "my string3" -c $causality + +cargo run --features=cli -- read-range my-partition-key --all +``` diff --git a/src/k2v-client/src/bin/k2v-cli.rs b/src/k2v-client/src/bin/k2v-cli.rs new file mode 100644 index 00000000..38c39361 --- /dev/null +++ b/src/k2v-client/src/bin/k2v-cli.rs @@ -0,0 +1,466 @@ +use k2v_client::*; + +use garage_util::formater::format_table; + +use rusoto_core::credential::AwsCredentials; +use rusoto_core::Region; + +use clap::{Parser, Subcommand}; + +/// K2V command line interface +#[derive(Parser, Debug)] +#[clap(author, version, about, long_about = None)] +struct Args { + /// Name of the region to use + #[clap(short, long, env = "AWS_REGION", default_value = "garage")] + region: String, + /// Url of the endpoint to connect to + #[clap(short, long, env = "K2V_ENDPOINT")] + endpoint: String, + /// Access key ID + #[clap(short, long, env = "AWS_ACCESS_KEY_ID")] + key_id: String, + /// Access key ID + #[clap(short, long, env = "AWS_SECRET_ACCESS_KEY")] + secret: String, + /// Bucket name + #[clap(short, long, env = "K2V_BUCKET")] + bucket: String, + #[clap(subcommand)] + command: Command, +} + +#[derive(Subcommand, Debug)] +enum Command { + /// Insert a single value + Insert { + /// Partition key to insert to + partition_key: String, + /// Sort key to insert to + sort_key: String, + /// Causality of the insertion + #[clap(short, long)] + causality: Option<String>, + /// Value to insert + #[clap(flatten)] + value: Value, + }, + /// Read a single value + Read { + /// Partition key to read from + partition_key: String, + /// Sort key to read from + sort_key: String, + /// Output formating + #[clap(flatten)] + output_kind: ReadOutputKind, + }, + /// Delete a single value + Delete { + /// Partition key to delete from + partition_key: String, + /// Sort key to delete from + sort_key: String, + /// Causality information + #[clap(short, long)] + causality: String, + }, + /// List partition keys + ReadIndex { + /// Output formating + #[clap(flatten)] + output_kind: BatchOutputKind, + /// Output only partition keys matching this filter + #[clap(flatten)] + filter: Filter, + }, + /// Read a range of sort keys + ReadRange { + /// Partition key to read from + partition_key: String, + /// Output formating + #[clap(flatten)] + output_kind: BatchOutputKind, + /// Output only sort keys matching this filter + #[clap(flatten)] + filter: Filter, + }, + /// Delete a range of sort keys + DeleteRange { + /// Partition key to delete from + partition_key: String, + /// Output formating + #[clap(flatten)] + output_kind: BatchOutputKind, + /// Delete only sort keys matching this filter + #[clap(flatten)] + filter: Filter, + }, +} + +/// Where to read a value from +#[derive(Parser, Debug)] +#[clap(group = clap::ArgGroup::new("value").multiple(false).required(true))] +struct Value { + /// Read value from a file. use - to read from stdin + #[clap(short, long, group = "value")] + file: Option<String>, + /// Read a base64 value from commandline + #[clap(short, long, group = "value")] + b64: Option<String>, + /// Read a raw (UTF-8) value from the commandline + #[clap(short, long, group = "value")] + text: Option<String>, +} + +impl Value { + async fn to_data(&self) -> Result<Vec<u8>, Error> { + if let Some(ref text) = self.text { + Ok(text.as_bytes().to_vec()) + } else if let Some(ref b64) = self.b64 { + base64::decode(b64).map_err(|_| Error::Message("invalid base64 input".into())) + } else if let Some(ref path) = self.file { + use tokio::io::AsyncReadExt; + if path == "-" { + let mut file = tokio::io::stdin(); + let mut vec = Vec::new(); + file.read_to_end(&mut vec).await?; + Ok(vec) + } else { + let mut file = tokio::fs::File::open(path).await?; + let mut vec = Vec::new(); + file.read_to_end(&mut vec).await?; + Ok(vec) + } + } else { + unreachable!("Value must have one option set") + } + } +} + +#[derive(Parser, Debug)] +#[clap(group = clap::ArgGroup::new("output-kind").multiple(false).required(false))] +struct ReadOutputKind { + /// Base64 output. Conflicts are line separated, first line is causality token + #[clap(short, long, group = "output-kind")] + b64: bool, + /// Raw output. Conflicts generate error, causality token is not returned + #[clap(short, long, group = "output-kind")] + raw: bool, + /// Human formated output + #[clap(short = 'H', long, group = "output-kind")] + human: bool, + /// JSON formated output + #[clap(short, long, group = "output-kind")] + json: bool, +} + +impl ReadOutputKind { + fn display_output(&self, val: CausalValue) -> ! { + use std::io::Write; + use std::process::exit; + + if self.json { + let stdout = std::io::stdout(); + serde_json::to_writer_pretty(stdout, &val).unwrap(); + exit(0); + } + + if self.raw { + let mut val = val.value; + if val.len() != 1 { + eprintln!( + "Raw mode can only read non-concurent values, found {} values, expected 1", + val.len() + ); + exit(1); + } + let val = val.pop().unwrap(); + match val { + K2vValue::Value(v) => { + std::io::stdout().write_all(&v).unwrap(); + exit(0); + } + K2vValue::Tombstone => { + eprintln!("Expected value, found tombstone"); + exit(2); + } + } + } + + let causality: String = val.causality.into(); + if self.b64 { + println!("{}", causality); + for val in val.value { + match val { + K2vValue::Value(v) => { + println!("{}", base64::encode(&v)) + } + K2vValue::Tombstone => { + println!(); + } + } + } + exit(0); + } + + // human + println!("causality: {}", causality); + println!("values:"); + for val in val.value { + match val { + K2vValue::Value(v) => { + if let Ok(string) = std::str::from_utf8(&v) { + println!(" utf-8: {}", string); + } else { + println!(" base64: {}", base64::encode(&v)); + } + } + K2vValue::Tombstone => { + println!(" tombstone"); + } + } + } + exit(0); + } +} + +#[derive(Parser, Debug)] +#[clap(group = clap::ArgGroup::new("output-kind").multiple(false).required(false))] +struct BatchOutputKind { + /// Human formated output + #[clap(short = 'H', long, group = "output-kind")] + human: bool, + /// JSON formated output + #[clap(short, long, group = "output-kind")] + json: bool, +} + +/// Filter for batch operations +#[derive(Parser, Debug)] +#[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))] +struct Filter { + /// Match only keys starting with this prefix + #[clap(short, long, group = "filter")] + prefix: Option<String>, + /// Match only keys lexicographically after this key (including this key itself) + #[clap(short, long, group = "filter")] + start: Option<String>, + /// Match only keys lexicographically before this key (excluding this key) + #[clap(short, long, group = "filter")] + end: Option<String>, + /// Only match the first X keys + #[clap(short, long)] + limit: Option<u64>, + /// Return keys in reverse order + #[clap(short, long)] + reverse: bool, + /// Return only keys where conflict happened + #[clap(short, long)] + conflicts_only: bool, + /// Also include keys storing only tombstones + #[clap(short, long)] + tombstones: bool, + /// Return any key + #[clap(short, long, group = "filter")] + all: bool, +} + +impl Filter { + fn k2v_filter(&self) -> k2v_client::Filter<'_> { + k2v_client::Filter { + start: self.start.as_deref(), + end: self.end.as_deref(), + prefix: self.prefix.as_deref(), + limit: self.limit, + reverse: self.reverse, + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + let args = Args::parse(); + + let region = Region::Custom { + name: args.region, + endpoint: args.endpoint, + }; + + let creds = AwsCredentials::new(args.key_id, args.secret, None, None); + + let client = K2vClient::new(region, args.bucket, creds, None)?; + + match args.command { + Command::Insert { + partition_key, + sort_key, + causality, + value, + } => { + client + .insert_item( + &partition_key, + &sort_key, + value.to_data().await?, + causality.map(Into::into), + ) + .await?; + } + Command::Delete { + partition_key, + sort_key, + causality, + } => { + client + .delete_item(&partition_key, &sort_key, causality.into()) + .await?; + } + Command::Read { + partition_key, + sort_key, + output_kind, + } => { + let res = client.read_item(&partition_key, &sort_key).await?; + output_kind.display_output(res); + } + Command::ReadIndex { + output_kind, + filter, + } => { + if filter.conflicts_only || filter.tombstones { + return Err(Error::Message( + "conlicts-only and tombstones are invalid for read-index".into(), + )); + } + let res = client.read_index(filter.k2v_filter()).await?; + if output_kind.json { + let values = res + .items + .into_iter() + .map(|(k, v)| { + let mut value = serde_json::to_value(v).unwrap(); + value + .as_object_mut() + .unwrap() + .insert("sort_key".to_owned(), k.into()); + value + }) + .collect::<Vec<_>>(); + let json = serde_json::json!({ + "next_key": res.next_start, + "values": values, + }); + + let stdout = std::io::stdout(); + serde_json::to_writer_pretty(stdout, &json).unwrap(); + } else { + if let Some(next) = res.next_start { + println!("next key: {}", next); + } + + let mut to_print = Vec::new(); + to_print.push(format!("key:\tentries\tconflicts\tvalues\tbytes")); + for (k, v) in res.items { + to_print.push(format!( + "{}\t{}\t{}\t{}\t{}", + k, v.entries, v.conflicts, v.values, v.bytes + )); + } + format_table(to_print); + } + } + Command::ReadRange { + partition_key, + output_kind, + filter, + } => { + let op = BatchReadOp { + partition_key: &partition_key, + filter: filter.k2v_filter(), + conflicts_only: filter.conflicts_only, + tombstones: filter.tombstones, + single_item: false, + }; + let mut res = client.read_batch(&[op]).await?; + let res = res.pop().unwrap(); + if output_kind.json { + let values = res + .items + .into_iter() + .map(|(k, v)| { + let mut value = serde_json::to_value(v).unwrap(); + value + .as_object_mut() + .unwrap() + .insert("sort_key".to_owned(), k.into()); + value + }) + .collect::<Vec<_>>(); + let json = serde_json::json!({ + "next_key": res.next_start, + "values": values, + }); + + let stdout = std::io::stdout(); + serde_json::to_writer_pretty(stdout, &json).unwrap(); + } else { + if let Some(next) = res.next_start { + println!("next key: {}", next); + } + for (key, values) in res.items { + println!("key: {}", key); + let causality: String = values.causality.into(); + println!("causality: {}", causality); + for value in values.value { + match value { + K2vValue::Value(v) => { + if let Ok(string) = std::str::from_utf8(&v) { + println!(" value(utf-8): {}", string); + } else { + println!(" value(base64): {}", base64::encode(&v)); + } + } + K2vValue::Tombstone => { + println!(" tombstone"); + } + } + } + } + } + } + Command::DeleteRange { + partition_key, + output_kind, + filter, + } => { + let op = BatchDeleteOp { + partition_key: &partition_key, + prefix: filter.prefix.as_deref(), + start: filter.start.as_deref(), + end: filter.end.as_deref(), + single_item: false, + }; + if filter.reverse + || filter.conflicts_only + || filter.tombstones + || filter.limit.is_some() + { + return Err(Error::Message( + "limit, conlicts-only, reverse and tombstones are invalid for delete-range" + .into(), + )); + } + + let res = client.delete_batch(&[op]).await?; + + if output_kind.json { + println!("{}", res[0]); + } else { + println!("deleted {} keys", res[0]); + } + } + } + + Ok(()) +} diff --git a/src/k2v-client/src/error.rs b/src/k2v-client/src/error.rs new file mode 100644 index 00000000..62357934 --- /dev/null +++ b/src/k2v-client/src/error.rs @@ -0,0 +1,22 @@ +use std::borrow::Cow; + +use thiserror::Error; + +/// Errors returned by this crate +#[derive(Error, Debug)] +pub enum Error { + #[error("received invalid response: {0}")] + InvalidResponse(Cow<'static, str>), + #[error("not found")] + NotFound, + #[error("io error: {0}")] + IoError(#[from] std::io::Error), + #[error("rusoto tls error: {0}")] + RusotoTls(#[from] rusoto_core::request::TlsError), + #[error("rusoto http error: {0}")] + RusotoHttp(#[from] rusoto_core::HttpDispatchError), + #[error("deserialization error: {0}")] + Deserialization(#[from] serde_json::Error), + #[error("{0}")] + Message(Cow<'static, str>), +} diff --git a/src/k2v-client/src/lib.rs b/src/k2v-client/src/lib.rs new file mode 100644 index 00000000..ba1cd6ea --- /dev/null +++ b/src/k2v-client/src/lib.rs @@ -0,0 +1,566 @@ +use std::collections::BTreeMap; +use std::time::Duration; + +use http::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE}; +use http::status::StatusCode; +use http::HeaderMap; + +use rusoto_core::{ByteStream, DispatchSignedRequest, HttpClient}; +use rusoto_credential::AwsCredentials; +use rusoto_signature::region::Region; +use rusoto_signature::signature::SignedRequest; +use serde::de::Error as DeError; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +use tokio::io::AsyncReadExt; + +mod error; + +pub use error::Error; + +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); +const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300); +const SERVICE: &str = "k2v"; +const GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; + +/// Client used to query a K2V server. +pub struct K2vClient { + region: Region, + bucket: String, + creds: AwsCredentials, + client: HttpClient, +} + +impl K2vClient { + /// Create a new K2V client. + pub fn new( + region: Region, + bucket: String, + creds: AwsCredentials, + user_agent: Option<String>, + ) -> Result<Self, Error> { + let mut client = HttpClient::new()?; + if let Some(ua) = user_agent { + client.local_agent_prepend(ua); + } else { + client.local_agent_prepend(format!("k2v/{}", env!("CARGO_PKG_VERSION"))); + } + Ok(K2vClient { + region, + bucket, + creds, + client, + }) + } + + /// Perform a ReadItem request, reading the value(s) stored for a single pk+sk. + pub async fn read_item( + &self, + partition_key: &str, + sort_key: &str, + ) -> Result<CausalValue, Error> { + let mut req = SignedRequest::new( + "GET", + SERVICE, + &self.region, + &format!("/{}/{}", self.bucket, partition_key), + ); + req.add_param("sort_key", sort_key); + req.add_header(ACCEPT, "application/octet-stream, application/json"); + + let res = self.dispatch(req, None).await?; + + let causality = res + .causality_token + .ok_or_else(|| Error::InvalidResponse("missing causality token".into()))?; + + if res.status == StatusCode::NO_CONTENT { + return Ok(CausalValue { + causality, + value: vec![K2vValue::Tombstone], + }); + } + + match res.content_type.as_deref() { + Some("application/octet-stream") => Ok(CausalValue { + causality, + value: vec![K2vValue::Value(res.body)], + }), + Some("application/json") => { + let value = serde_json::from_slice(&res.body)?; + Ok(CausalValue { causality, value }) + } + Some(ct) => Err(Error::InvalidResponse( + format!("invalid content type: {}", ct).into(), + )), + None => Err(Error::InvalidResponse("missing content type".into())), + } + } + + /// Perform a PollItem request, waiting for the value(s) stored for a single pk+sk to be + /// updated. + pub async fn poll_item( + &self, + partition_key: &str, + sort_key: &str, + causality: CausalityToken, + timeout: Option<Duration>, + ) -> Result<Option<CausalValue>, Error> { + let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT); + + let mut req = SignedRequest::new( + "GET", + SERVICE, + &self.region, + &format!("/{}/{}", self.bucket, partition_key), + ); + req.add_param("sort_key", sort_key); + req.add_param("causality_token", &causality.0); + req.add_param("timeout", &timeout.as_secs().to_string()); + req.add_header(ACCEPT, "application/octet-stream, application/json"); + + let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; + + let causality = res + .causality_token + .ok_or_else(|| Error::InvalidResponse("missing causality token".into()))?; + + if res.status == StatusCode::NOT_MODIFIED { + return Ok(None); + } + + if res.status == StatusCode::NO_CONTENT { + return Ok(Some(CausalValue { + causality, + value: vec![K2vValue::Tombstone], + })); + } + + match res.content_type.as_deref() { + Some("application/octet-stream") => Ok(Some(CausalValue { + causality, + value: vec![K2vValue::Value(res.body)], + })), + Some("application/json") => { + let value = serde_json::from_slice(&res.body)?; + Ok(Some(CausalValue { causality, value })) + } + Some(ct) => Err(Error::InvalidResponse( + format!("invalid content type: {}", ct).into(), + )), + None => Err(Error::InvalidResponse("missing content type".into())), + } + } + + /// Perform an InsertItem request, inserting a value for a single pk+sk. + pub async fn insert_item( + &self, + partition_key: &str, + sort_key: &str, + value: Vec<u8>, + causality: Option<CausalityToken>, + ) -> Result<(), Error> { + let mut req = SignedRequest::new( + "PUT", + SERVICE, + &self.region, + &format!("/{}/{}", self.bucket, partition_key), + ); + req.add_param("sort_key", sort_key); + req.set_payload(Some(value)); + + if let Some(causality) = causality { + req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0); + } + + self.dispatch(req, None).await?; + Ok(()) + } + + /// Perform a DeleteItem request, deleting the value(s) stored for a single pk+sk. + pub async fn delete_item( + &self, + partition_key: &str, + sort_key: &str, + causality: CausalityToken, + ) -> Result<(), Error> { + let mut req = SignedRequest::new( + "DELETE", + SERVICE, + &self.region, + &format!("/{}/{}", self.bucket, partition_key), + ); + req.add_param("sort_key", sort_key); + req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0); + + self.dispatch(req, None).await?; + Ok(()) + } + + /// Perform a ReadIndex request, listing partition key which have at least one associated + /// sort key, and which matches the filter. + pub async fn read_index( + &self, + filter: Filter<'_>, + ) -> Result<PaginatedRange<PartitionInfo>, Error> { + let mut req = + SignedRequest::new("GET", SERVICE, &self.region, &format!("/{}", self.bucket)); + filter.insert_params(&mut req); + + let res = self.dispatch(req, None).await?; + + let resp: ReadIndexResponse = serde_json::from_slice(&res.body)?; + + let items = resp + .partition_keys + .into_iter() + .map(|ReadIndexItem { pk, info }| (pk, info)) + .collect(); + + Ok(PaginatedRange { + items, + next_start: resp.next_start, + }) + } + + /// Perform an InsertBatch request, inserting multiple values at once. Note: this operation is + /// *not* atomic: it is possible for some sub-operations to fails and others to success. In + /// that case, failure is reported. + pub async fn insert_batch(&self, operations: &[BatchInsertOp<'_>]) -> Result<(), Error> { + let mut req = + SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); + + let payload = serde_json::to_vec(operations)?; + req.set_payload(Some(payload)); + self.dispatch(req, None).await?; + Ok(()) + } + + /// Perform a ReadBatch request, reading multiple values or range of values at once. + pub async fn read_batch( + &self, + operations: &[BatchReadOp<'_>], + ) -> Result<Vec<PaginatedRange<CausalValue>>, Error> { + let mut req = + SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); + req.add_param("search", ""); + + let payload = serde_json::to_vec(operations)?; + req.set_payload(Some(payload)); + let res = self.dispatch(req, None).await?; + + let resp: Vec<BatchReadResponse> = serde_json::from_slice(&res.body)?; + + Ok(resp + .into_iter() + .map(|e| PaginatedRange { + items: e + .items + .into_iter() + .map(|BatchReadItem { sk, ct, v }| { + ( + sk, + CausalValue { + causality: ct, + value: v, + }, + ) + }) + .collect(), + next_start: e.next_start, + }) + .collect()) + } + + /// Perform a DeleteBatch request, deleting mutiple values or range of values at once, without + /// providing causality information. + pub async fn delete_batch(&self, operations: &[BatchDeleteOp<'_>]) -> Result<Vec<u64>, Error> { + let mut req = + SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); + req.add_param("delete", ""); + + let payload = serde_json::to_vec(operations)?; + req.set_payload(Some(payload)); + let res = self.dispatch(req, None).await?; + + let resp: Vec<BatchDeleteResponse> = serde_json::from_slice(&res.body)?; + + Ok(resp.into_iter().map(|r| r.deleted_items).collect()) + } + + async fn dispatch( + &self, + mut req: SignedRequest, + timeout: Option<Duration>, + ) -> Result<Response, Error> { + req.sign(&self.creds); + let mut res = self + .client + .dispatch(req, Some(timeout.unwrap_or(DEFAULT_TIMEOUT))) + .await?; + + let causality_token = res + .headers + .remove(GARAGE_CAUSALITY_TOKEN) + .map(CausalityToken); + let content_type = res.headers.remove(CONTENT_TYPE); + + let body = match res.status { + StatusCode::OK => read_body(&mut res.headers, res.body).await?, + StatusCode::NO_CONTENT => Vec::new(), + StatusCode::NOT_FOUND => return Err(Error::NotFound), + StatusCode::NOT_MODIFIED => Vec::new(), + _ => { + return Err(Error::InvalidResponse( + format!("invalid error code: {}", res.status).into(), + )) + } + }; + + Ok(Response { + body, + status: res.status, + causality_token, + content_type, + }) + } +} + +async fn read_body(headers: &mut HeaderMap<String>, body: ByteStream) -> Result<Vec<u8>, Error> { + let body_len = headers + .get(CONTENT_LENGTH) + .and_then(|h| h.parse().ok()) + .unwrap_or(0); + let mut res = Vec::with_capacity(body_len); + body.into_async_read().read_to_end(&mut res).await?; + Ok(res) +} + +/// An opaque token used to convey causality between operations. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +#[serde(transparent)] +pub struct CausalityToken(String); + +impl From<String> for CausalityToken { + fn from(v: String) -> Self { + CausalityToken(v) + } +} + +impl From<CausalityToken> for String { + fn from(v: CausalityToken) -> Self { + v.0 + } +} + +/// A value in K2V. can be either a binary value, or a tombstone. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum K2vValue { + Tombstone, + Value(Vec<u8>), +} + +impl From<Vec<u8>> for K2vValue { + fn from(v: Vec<u8>) -> Self { + K2vValue::Value(v) + } +} + +impl From<Option<Vec<u8>>> for K2vValue { + fn from(v: Option<Vec<u8>>) -> Self { + match v { + Some(v) => K2vValue::Value(v), + None => K2vValue::Tombstone, + } + } +} + +impl<'de> Deserialize<'de> for K2vValue { + fn deserialize<D>(d: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + let val: Option<&str> = Option::deserialize(d)?; + Ok(match val { + Some(s) => { + K2vValue::Value(base64::decode(s).map_err(|_| DeError::custom("invalid base64"))?) + } + None => K2vValue::Tombstone, + }) + } +} + +impl Serialize for K2vValue { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + match self { + K2vValue::Tombstone => serializer.serialize_none(), + K2vValue::Value(v) => { + let b64 = base64::encode(v); + serializer.serialize_str(&b64) + } + } + } +} + +/// A set of K2vValue and associated causality information. +#[derive(Debug, Clone, Serialize)] +pub struct CausalValue { + pub causality: CausalityToken, + pub value: Vec<K2vValue>, +} + +/// Result of paginated requests. +#[derive(Debug, Clone)] +pub struct PaginatedRange<V> { + pub items: BTreeMap<String, V>, + pub next_start: Option<String>, +} + +/// Filter for batch operations. +#[derive(Debug, Default, Clone, Deserialize, Serialize)] +pub struct Filter<'a> { + pub start: Option<&'a str>, + pub end: Option<&'a str>, + pub prefix: Option<&'a str>, + pub limit: Option<u64>, + #[serde(default)] + pub reverse: bool, +} + +impl<'a> Filter<'a> { + fn insert_params(&self, req: &mut SignedRequest) { + if let Some(start) = &self.start { + req.add_param("start", start); + } + if let Some(end) = &self.end { + req.add_param("end", end); + } + if let Some(prefix) = &self.prefix { + req.add_param("prefix", prefix); + } + if let Some(limit) = &self.limit { + req.add_param("limit", &limit.to_string()); + } + if self.reverse { + req.add_param("reverse", "true"); + } + } +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ReadIndexResponse<'a> { + #[serde(flatten, borrow)] + #[allow(dead_code)] + filter: Filter<'a>, + partition_keys: Vec<ReadIndexItem>, + #[allow(dead_code)] + more: bool, + next_start: Option<String>, +} + +#[derive(Debug, Clone, Deserialize)] +struct ReadIndexItem { + pk: String, + #[serde(flatten)] + info: PartitionInfo, +} + +/// Information about data stored with a given partition key. +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct PartitionInfo { + pub entries: u64, + pub conflicts: u64, + pub values: u64, + pub bytes: u64, +} + +/// Single sub-operation of an InsertBatch. +#[derive(Debug, Clone, Serialize)] +pub struct BatchInsertOp<'a> { + #[serde(rename = "pk")] + pub partition_key: &'a str, + #[serde(rename = "sk")] + pub sort_key: &'a str, + #[serde(rename = "ct")] + pub causality: Option<CausalityToken>, + #[serde(rename = "v")] + pub value: K2vValue, +} + +/// Single sub-operation of a ReadBatch. +#[derive(Debug, Default, Clone, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BatchReadOp<'a> { + pub partition_key: &'a str, + #[serde(flatten, borrow)] + pub filter: Filter<'a>, + #[serde(default)] + pub single_item: bool, + #[serde(default)] + pub conflicts_only: bool, + #[serde(default)] + pub tombstones: bool, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct BatchReadResponse<'a> { + #[serde(flatten, borrow)] + #[allow(dead_code)] + op: BatchReadOp<'a>, + items: Vec<BatchReadItem>, + #[allow(dead_code)] + more: bool, + next_start: Option<String>, +} + +#[derive(Debug, Clone, Deserialize)] +struct BatchReadItem { + sk: String, + ct: CausalityToken, + v: Vec<K2vValue>, +} + +/// Single sub-operation of a DeleteBatch +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BatchDeleteOp<'a> { + pub partition_key: &'a str, + pub prefix: Option<&'a str>, + pub start: Option<&'a str>, + pub end: Option<&'a str>, + #[serde(default)] + pub single_item: bool, +} + +impl<'a> BatchDeleteOp<'a> { + pub fn new(partition_key: &'a str) -> Self { + BatchDeleteOp { + partition_key, + prefix: None, + start: None, + end: None, + single_item: false, + } + } +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct BatchDeleteResponse<'a> { + #[serde(flatten, borrow)] + #[allow(dead_code)] + filter: BatchDeleteOp<'a>, + deleted_items: u64, +} + +struct Response { + body: Vec<u8>, + status: StatusCode, + causality_token: Option<CausalityToken>, + content_type: Option<String>, +} |