From b2a2d3859fefd53dab0b87274d5aed1f6bb608a3 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 24 May 2022 12:48:05 +0200 Subject: K2V client improvements (#307) - [x] Better distinguish error types - [x] Parse error messages received from server - [x] Remove `src/` folder layer, we don't have that for other crates Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/307 Co-authored-by: Alex Co-committed-by: Alex --- Cargo.lock | 1 + Cargo.nix | 5 +- src/k2v-client/Cargo.toml | 5 + src/k2v-client/bin/k2v-cli.rs | 466 +++++++++++++++++++++++++++++ src/k2v-client/error.rs | 29 ++ src/k2v-client/lib.rs | 611 ++++++++++++++++++++++++++++++++++++++ src/k2v-client/src/bin/k2v-cli.rs | 466 ----------------------------- src/k2v-client/src/error.rs | 22 -- src/k2v-client/src/lib.rs | 566 ----------------------------------- 9 files changed, 1115 insertions(+), 1056 deletions(-) create mode 100644 src/k2v-client/bin/k2v-cli.rs create mode 100644 src/k2v-client/error.rs create mode 100644 src/k2v-client/lib.rs delete mode 100644 src/k2v-client/src/bin/k2v-cli.rs delete mode 100644 src/k2v-client/src/error.rs delete mode 100644 src/k2v-client/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index fcf3030a..630642ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1584,6 +1584,7 @@ dependencies = [ "clap 3.1.18", "garage_util 0.7.0", "http", + "log", "rusoto_core", "rusoto_credential", "rusoto_signature", diff --git a/Cargo.nix b/Cargo.nix index 371ce8d3..d100e7bb 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -688,7 +688,7 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"; }; dependencies = { - ${ if hostPlatform.config == "aarch64-linux-android" || hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-apple-darwin" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.config == "aarch64-linux-android" || hostPlatform.config == "aarch64-apple-darwin" || hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; }; }); @@ -2117,6 +2117,7 @@ in clap = rustPackages."registry+https://github.com/rust-lang/crates.io-index".clap."3.1.18" { inherit profileName; }; garage_util = rustPackages."unknown".garage_util."0.7.0" { inherit profileName; }; http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; }; + log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }; rusoto_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusoto_core."0.48.0" { inherit profileName; }; rusoto_credential = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusoto_credential."0.48.0" { inherit profileName; }; rusoto_signature = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusoto_signature."0.48.0" { inherit profileName; }; @@ -5029,7 +5030,7 @@ in [ "default" ] ]; dependencies = { - ${ if hostPlatform.config == "aarch64-uwp-windows-msvc" || hostPlatform.config == "aarch64-pc-windows-msvc" then "windows_aarch64_msvc" else null } = 
rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "aarch64-pc-windows-msvc" || hostPlatform.config == "aarch64-uwp-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "i686-pc-windows-gnu" || hostPlatform.config == "i686-uwp-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "i686-uwp-windows-msvc" || hostPlatform.config == "i686-pc-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "x86_64-pc-windows-gnu" || hostPlatform.config == "x86_64-uwp-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; }; diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml index 84c6b8b2..224414ab 100644 --- a/src/k2v-client/Cargo.toml +++ b/src/k2v-client/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dependencies] base64 = "0.13.0" http = "0.2.6" +log = "0.4" rusoto_core = "0.48.0" rusoto_credential = "0.48.0" rusoto_signature = "0.48.0" @@ -22,6 +23,10 @@ garage_util = { path = "../util", optional = true } [features] cli = ["clap", "tokio/fs", "tokio/io-std", "garage_util"] +[lib] +path = "lib.rs" + [[bin]] name = "k2v-cli" +path = "bin/k2v-cli.rs" required-features = ["cli"] diff --git a/src/k2v-client/bin/k2v-cli.rs b/src/k2v-client/bin/k2v-cli.rs new file mode 100644 index 00000000..38c39361 --- /dev/null +++ b/src/k2v-client/bin/k2v-cli.rs @@ -0,0 +1,466 @@ +use k2v_client::*; + +use garage_util::formater::format_table; + +use rusoto_core::credential::AwsCredentials; +use rusoto_core::Region; + +use clap::{Parser, Subcommand}; + +/// K2V command line interface +#[derive(Parser, Debug)] +#[clap(author, version, about, long_about = None)] +struct Args { + /// Name of the region to use + #[clap(short, long, env = "AWS_REGION", default_value = "garage")] + region: String, + /// Url of the endpoint to connect to + #[clap(short, long, env = "K2V_ENDPOINT")] + endpoint: String, + /// Access key ID + #[clap(short, long, env = "AWS_ACCESS_KEY_ID")] + key_id: String, + /// Access key ID + #[clap(short, long, env = "AWS_SECRET_ACCESS_KEY")] + secret: String, + /// Bucket name + #[clap(short, long, env = "K2V_BUCKET")] + bucket: String, + #[clap(subcommand)] + command: Command, +} + +#[derive(Subcommand, Debug)] +enum Command { + /// Insert a single value + Insert { + /// Partition key to insert to + partition_key: String, + /// Sort key to insert to + sort_key: String, + /// Causality of the insertion + #[clap(short, long)] + causality: Option, + /// Value to insert + #[clap(flatten)] + value: Value, + }, + /// Read a single value + Read { + /// Partition key to read from + partition_key: String, + /// Sort key to read from + sort_key: String, + /// Output formating + #[clap(flatten)] + output_kind: ReadOutputKind, + }, + /// Delete a single value + Delete { + /// Partition key to delete from + partition_key: String, + /// Sort key to delete from + sort_key: String, + /// Causality information + #[clap(short, long)] + causality: 
String, + }, + /// List partition keys + ReadIndex { + /// Output formating + #[clap(flatten)] + output_kind: BatchOutputKind, + /// Output only partition keys matching this filter + #[clap(flatten)] + filter: Filter, + }, + /// Read a range of sort keys + ReadRange { + /// Partition key to read from + partition_key: String, + /// Output formating + #[clap(flatten)] + output_kind: BatchOutputKind, + /// Output only sort keys matching this filter + #[clap(flatten)] + filter: Filter, + }, + /// Delete a range of sort keys + DeleteRange { + /// Partition key to delete from + partition_key: String, + /// Output formating + #[clap(flatten)] + output_kind: BatchOutputKind, + /// Delete only sort keys matching this filter + #[clap(flatten)] + filter: Filter, + }, +} + +/// Where to read a value from +#[derive(Parser, Debug)] +#[clap(group = clap::ArgGroup::new("value").multiple(false).required(true))] +struct Value { + /// Read value from a file. use - to read from stdin + #[clap(short, long, group = "value")] + file: Option, + /// Read a base64 value from commandline + #[clap(short, long, group = "value")] + b64: Option, + /// Read a raw (UTF-8) value from the commandline + #[clap(short, long, group = "value")] + text: Option, +} + +impl Value { + async fn to_data(&self) -> Result, Error> { + if let Some(ref text) = self.text { + Ok(text.as_bytes().to_vec()) + } else if let Some(ref b64) = self.b64 { + base64::decode(b64).map_err(|_| Error::Message("invalid base64 input".into())) + } else if let Some(ref path) = self.file { + use tokio::io::AsyncReadExt; + if path == "-" { + let mut file = tokio::io::stdin(); + let mut vec = Vec::new(); + file.read_to_end(&mut vec).await?; + Ok(vec) + } else { + let mut file = tokio::fs::File::open(path).await?; + let mut vec = Vec::new(); + file.read_to_end(&mut vec).await?; + Ok(vec) + } + } else { + unreachable!("Value must have one option set") + } + } +} + +#[derive(Parser, Debug)] +#[clap(group = clap::ArgGroup::new("output-kind").multiple(false).required(false))] +struct ReadOutputKind { + /// Base64 output. Conflicts are line separated, first line is causality token + #[clap(short, long, group = "output-kind")] + b64: bool, + /// Raw output. Conflicts generate error, causality token is not returned + #[clap(short, long, group = "output-kind")] + raw: bool, + /// Human formated output + #[clap(short = 'H', long, group = "output-kind")] + human: bool, + /// JSON formated output + #[clap(short, long, group = "output-kind")] + json: bool, +} + +impl ReadOutputKind { + fn display_output(&self, val: CausalValue) -> ! 
{ + use std::io::Write; + use std::process::exit; + + if self.json { + let stdout = std::io::stdout(); + serde_json::to_writer_pretty(stdout, &val).unwrap(); + exit(0); + } + + if self.raw { + let mut val = val.value; + if val.len() != 1 { + eprintln!( + "Raw mode can only read non-concurent values, found {} values, expected 1", + val.len() + ); + exit(1); + } + let val = val.pop().unwrap(); + match val { + K2vValue::Value(v) => { + std::io::stdout().write_all(&v).unwrap(); + exit(0); + } + K2vValue::Tombstone => { + eprintln!("Expected value, found tombstone"); + exit(2); + } + } + } + + let causality: String = val.causality.into(); + if self.b64 { + println!("{}", causality); + for val in val.value { + match val { + K2vValue::Value(v) => { + println!("{}", base64::encode(&v)) + } + K2vValue::Tombstone => { + println!(); + } + } + } + exit(0); + } + + // human + println!("causality: {}", causality); + println!("values:"); + for val in val.value { + match val { + K2vValue::Value(v) => { + if let Ok(string) = std::str::from_utf8(&v) { + println!(" utf-8: {}", string); + } else { + println!(" base64: {}", base64::encode(&v)); + } + } + K2vValue::Tombstone => { + println!(" tombstone"); + } + } + } + exit(0); + } +} + +#[derive(Parser, Debug)] +#[clap(group = clap::ArgGroup::new("output-kind").multiple(false).required(false))] +struct BatchOutputKind { + /// Human formated output + #[clap(short = 'H', long, group = "output-kind")] + human: bool, + /// JSON formated output + #[clap(short, long, group = "output-kind")] + json: bool, +} + +/// Filter for batch operations +#[derive(Parser, Debug)] +#[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))] +struct Filter { + /// Match only keys starting with this prefix + #[clap(short, long, group = "filter")] + prefix: Option, + /// Match only keys lexicographically after this key (including this key itself) + #[clap(short, long, group = "filter")] + start: Option, + /// Match only keys lexicographically before this key (excluding this key) + #[clap(short, long, group = "filter")] + end: Option, + /// Only match the first X keys + #[clap(short, long)] + limit: Option, + /// Return keys in reverse order + #[clap(short, long)] + reverse: bool, + /// Return only keys where conflict happened + #[clap(short, long)] + conflicts_only: bool, + /// Also include keys storing only tombstones + #[clap(short, long)] + tombstones: bool, + /// Return any key + #[clap(short, long, group = "filter")] + all: bool, +} + +impl Filter { + fn k2v_filter(&self) -> k2v_client::Filter<'_> { + k2v_client::Filter { + start: self.start.as_deref(), + end: self.end.as_deref(), + prefix: self.prefix.as_deref(), + limit: self.limit, + reverse: self.reverse, + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + let args = Args::parse(); + + let region = Region::Custom { + name: args.region, + endpoint: args.endpoint, + }; + + let creds = AwsCredentials::new(args.key_id, args.secret, None, None); + + let client = K2vClient::new(region, args.bucket, creds, None)?; + + match args.command { + Command::Insert { + partition_key, + sort_key, + causality, + value, + } => { + client + .insert_item( + &partition_key, + &sort_key, + value.to_data().await?, + causality.map(Into::into), + ) + .await?; + } + Command::Delete { + partition_key, + sort_key, + causality, + } => { + client + .delete_item(&partition_key, &sort_key, causality.into()) + .await?; + } + Command::Read { + partition_key, + sort_key, + output_kind, + } => { + let res = 
client.read_item(&partition_key, &sort_key).await?; + output_kind.display_output(res); + } + Command::ReadIndex { + output_kind, + filter, + } => { + if filter.conflicts_only || filter.tombstones { + return Err(Error::Message( + "conlicts-only and tombstones are invalid for read-index".into(), + )); + } + let res = client.read_index(filter.k2v_filter()).await?; + if output_kind.json { + let values = res + .items + .into_iter() + .map(|(k, v)| { + let mut value = serde_json::to_value(v).unwrap(); + value + .as_object_mut() + .unwrap() + .insert("sort_key".to_owned(), k.into()); + value + }) + .collect::>(); + let json = serde_json::json!({ + "next_key": res.next_start, + "values": values, + }); + + let stdout = std::io::stdout(); + serde_json::to_writer_pretty(stdout, &json).unwrap(); + } else { + if let Some(next) = res.next_start { + println!("next key: {}", next); + } + + let mut to_print = Vec::new(); + to_print.push(format!("key:\tentries\tconflicts\tvalues\tbytes")); + for (k, v) in res.items { + to_print.push(format!( + "{}\t{}\t{}\t{}\t{}", + k, v.entries, v.conflicts, v.values, v.bytes + )); + } + format_table(to_print); + } + } + Command::ReadRange { + partition_key, + output_kind, + filter, + } => { + let op = BatchReadOp { + partition_key: &partition_key, + filter: filter.k2v_filter(), + conflicts_only: filter.conflicts_only, + tombstones: filter.tombstones, + single_item: false, + }; + let mut res = client.read_batch(&[op]).await?; + let res = res.pop().unwrap(); + if output_kind.json { + let values = res + .items + .into_iter() + .map(|(k, v)| { + let mut value = serde_json::to_value(v).unwrap(); + value + .as_object_mut() + .unwrap() + .insert("sort_key".to_owned(), k.into()); + value + }) + .collect::>(); + let json = serde_json::json!({ + "next_key": res.next_start, + "values": values, + }); + + let stdout = std::io::stdout(); + serde_json::to_writer_pretty(stdout, &json).unwrap(); + } else { + if let Some(next) = res.next_start { + println!("next key: {}", next); + } + for (key, values) in res.items { + println!("key: {}", key); + let causality: String = values.causality.into(); + println!("causality: {}", causality); + for value in values.value { + match value { + K2vValue::Value(v) => { + if let Ok(string) = std::str::from_utf8(&v) { + println!(" value(utf-8): {}", string); + } else { + println!(" value(base64): {}", base64::encode(&v)); + } + } + K2vValue::Tombstone => { + println!(" tombstone"); + } + } + } + } + } + } + Command::DeleteRange { + partition_key, + output_kind, + filter, + } => { + let op = BatchDeleteOp { + partition_key: &partition_key, + prefix: filter.prefix.as_deref(), + start: filter.start.as_deref(), + end: filter.end.as_deref(), + single_item: false, + }; + if filter.reverse + || filter.conflicts_only + || filter.tombstones + || filter.limit.is_some() + { + return Err(Error::Message( + "limit, conlicts-only, reverse and tombstones are invalid for delete-range" + .into(), + )); + } + + let res = client.delete_batch(&[op]).await?; + + if output_kind.json { + println!("{}", res[0]); + } else { + println!("deleted {} keys", res[0]); + } + } + } + + Ok(()) +} diff --git a/src/k2v-client/error.rs b/src/k2v-client/error.rs new file mode 100644 index 00000000..37c221f2 --- /dev/null +++ b/src/k2v-client/error.rs @@ -0,0 +1,29 @@ +use std::borrow::Cow; + +use thiserror::Error; + +/// Errors returned by this crate +#[derive(Error, Debug)] +pub enum Error { + #[error("{0}, {1}: {2} (path = {3})")] + Remote( + http::StatusCode, + Cow<'static, str>, + 
Cow<'static, str>, + Cow<'static, str>, + ), + #[error("received invalid response: {0}")] + InvalidResponse(Cow<'static, str>), + #[error("not found")] + NotFound, + #[error("io error: {0}")] + IoError(#[from] std::io::Error), + #[error("rusoto tls error: {0}")] + RusotoTls(#[from] rusoto_core::request::TlsError), + #[error("rusoto http error: {0}")] + RusotoHttp(#[from] rusoto_core::HttpDispatchError), + #[error("deserialization error: {0}")] + Deserialization(#[from] serde_json::Error), + #[error("{0}")] + Message(Cow<'static, str>), +} diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs new file mode 100644 index 00000000..95974d7a --- /dev/null +++ b/src/k2v-client/lib.rs @@ -0,0 +1,611 @@ +use std::collections::BTreeMap; +use std::time::Duration; + +use http::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE}; +use http::status::StatusCode; +use http::HeaderMap; +use log::{debug, error}; + +use rusoto_core::{ByteStream, DispatchSignedRequest, HttpClient}; +use rusoto_credential::AwsCredentials; +use rusoto_signature::region::Region; +use rusoto_signature::signature::SignedRequest; +use serde::de::Error as DeError; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +use tokio::io::AsyncReadExt; + +mod error; + +pub use error::Error; + +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); +const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300); +const SERVICE: &str = "k2v"; +const GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; + +/// Client used to query a K2V server. +pub struct K2vClient { + region: Region, + bucket: String, + creds: AwsCredentials, + client: HttpClient, +} + +impl K2vClient { + /// Create a new K2V client. + pub fn new( + region: Region, + bucket: String, + creds: AwsCredentials, + user_agent: Option, + ) -> Result { + let mut client = HttpClient::new()?; + if let Some(ua) = user_agent { + client.local_agent_prepend(ua); + } else { + client.local_agent_prepend(format!("k2v/{}", env!("CARGO_PKG_VERSION"))); + } + Ok(K2vClient { + region, + bucket, + creds, + client, + }) + } + + /// Perform a ReadItem request, reading the value(s) stored for a single pk+sk. + pub async fn read_item( + &self, + partition_key: &str, + sort_key: &str, + ) -> Result { + let mut req = SignedRequest::new( + "GET", + SERVICE, + &self.region, + &format!("/{}/{}", self.bucket, partition_key), + ); + req.add_param("sort_key", sort_key); + req.add_header(ACCEPT, "application/octet-stream, application/json"); + + let res = self.dispatch(req, None).await?; + + let causality = res + .causality_token + .ok_or_else(|| Error::InvalidResponse("missing causality token".into()))?; + + if res.status == StatusCode::NO_CONTENT { + return Ok(CausalValue { + causality, + value: vec![K2vValue::Tombstone], + }); + } + + match res.content_type.as_deref() { + Some("application/octet-stream") => Ok(CausalValue { + causality, + value: vec![K2vValue::Value(res.body)], + }), + Some("application/json") => { + let value = serde_json::from_slice(&res.body)?; + Ok(CausalValue { causality, value }) + } + Some(ct) => Err(Error::InvalidResponse( + format!("invalid content type: {}", ct).into(), + )), + None => Err(Error::InvalidResponse("missing content type".into())), + } + } + + /// Perform a PollItem request, waiting for the value(s) stored for a single pk+sk to be + /// updated. 
+ pub async fn poll_item( + &self, + partition_key: &str, + sort_key: &str, + causality: CausalityToken, + timeout: Option, + ) -> Result, Error> { + let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT); + + let mut req = SignedRequest::new( + "GET", + SERVICE, + &self.region, + &format!("/{}/{}", self.bucket, partition_key), + ); + req.add_param("sort_key", sort_key); + req.add_param("causality_token", &causality.0); + req.add_param("timeout", &timeout.as_secs().to_string()); + req.add_header(ACCEPT, "application/octet-stream, application/json"); + + let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; + + let causality = res + .causality_token + .ok_or_else(|| Error::InvalidResponse("missing causality token".into()))?; + + if res.status == StatusCode::NOT_MODIFIED { + return Ok(None); + } + + if res.status == StatusCode::NO_CONTENT { + return Ok(Some(CausalValue { + causality, + value: vec![K2vValue::Tombstone], + })); + } + + match res.content_type.as_deref() { + Some("application/octet-stream") => Ok(Some(CausalValue { + causality, + value: vec![K2vValue::Value(res.body)], + })), + Some("application/json") => { + let value = serde_json::from_slice(&res.body)?; + Ok(Some(CausalValue { causality, value })) + } + Some(ct) => Err(Error::InvalidResponse( + format!("invalid content type: {}", ct).into(), + )), + None => Err(Error::InvalidResponse("missing content type".into())), + } + } + + /// Perform an InsertItem request, inserting a value for a single pk+sk. + pub async fn insert_item( + &self, + partition_key: &str, + sort_key: &str, + value: Vec, + causality: Option, + ) -> Result<(), Error> { + let mut req = SignedRequest::new( + "PUT", + SERVICE, + &self.region, + &format!("/{}/{}", self.bucket, partition_key), + ); + req.add_param("sort_key", sort_key); + req.set_payload(Some(value)); + + if let Some(causality) = causality { + req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0); + } + + self.dispatch(req, None).await?; + Ok(()) + } + + /// Perform a DeleteItem request, deleting the value(s) stored for a single pk+sk. + pub async fn delete_item( + &self, + partition_key: &str, + sort_key: &str, + causality: CausalityToken, + ) -> Result<(), Error> { + let mut req = SignedRequest::new( + "DELETE", + SERVICE, + &self.region, + &format!("/{}/{}", self.bucket, partition_key), + ); + req.add_param("sort_key", sort_key); + req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0); + + self.dispatch(req, None).await?; + Ok(()) + } + + /// Perform a ReadIndex request, listing partition key which have at least one associated + /// sort key, and which matches the filter. + pub async fn read_index( + &self, + filter: Filter<'_>, + ) -> Result, Error> { + let mut req = + SignedRequest::new("GET", SERVICE, &self.region, &format!("/{}", self.bucket)); + filter.insert_params(&mut req); + + let res = self.dispatch(req, None).await?; + + let resp: ReadIndexResponse = serde_json::from_slice(&res.body)?; + + let items = resp + .partition_keys + .into_iter() + .map(|ReadIndexItem { pk, info }| (pk, info)) + .collect(); + + Ok(PaginatedRange { + items, + next_start: resp.next_start, + }) + } + + /// Perform an InsertBatch request, inserting multiple values at once. Note: this operation is + /// *not* atomic: it is possible for some sub-operations to fails and others to success. In + /// that case, failure is reported. 
+ pub async fn insert_batch(&self, operations: &[BatchInsertOp<'_>]) -> Result<(), Error> { + let mut req = + SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); + + let payload = serde_json::to_vec(operations)?; + req.set_payload(Some(payload)); + self.dispatch(req, None).await?; + Ok(()) + } + + /// Perform a ReadBatch request, reading multiple values or range of values at once. + pub async fn read_batch( + &self, + operations: &[BatchReadOp<'_>], + ) -> Result>, Error> { + let mut req = + SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); + req.add_param("search", ""); + + let payload = serde_json::to_vec(operations)?; + req.set_payload(Some(payload)); + let res = self.dispatch(req, None).await?; + + let resp: Vec = serde_json::from_slice(&res.body)?; + + Ok(resp + .into_iter() + .map(|e| PaginatedRange { + items: e + .items + .into_iter() + .map(|BatchReadItem { sk, ct, v }| { + ( + sk, + CausalValue { + causality: ct, + value: v, + }, + ) + }) + .collect(), + next_start: e.next_start, + }) + .collect()) + } + + /// Perform a DeleteBatch request, deleting mutiple values or range of values at once, without + /// providing causality information. + pub async fn delete_batch(&self, operations: &[BatchDeleteOp<'_>]) -> Result, Error> { + let mut req = + SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); + req.add_param("delete", ""); + + let payload = serde_json::to_vec(operations)?; + req.set_payload(Some(payload)); + let res = self.dispatch(req, None).await?; + + let resp: Vec = serde_json::from_slice(&res.body)?; + + Ok(resp.into_iter().map(|r| r.deleted_items).collect()) + } + + async fn dispatch( + &self, + mut req: SignedRequest, + timeout: Option, + ) -> Result { + req.sign(&self.creds); + let mut res = self + .client + .dispatch(req, Some(timeout.unwrap_or(DEFAULT_TIMEOUT))) + .await?; + + let causality_token = res + .headers + .remove(GARAGE_CAUSALITY_TOKEN) + .map(CausalityToken); + let content_type = res.headers.remove(CONTENT_TYPE); + + let body = match res.status { + StatusCode::OK => read_body(&mut res.headers, res.body).await?, + StatusCode::NO_CONTENT => Vec::new(), + StatusCode::NOT_FOUND => return Err(Error::NotFound), + StatusCode::NOT_MODIFIED => Vec::new(), + s => { + let err_body = read_body(&mut res.headers, res.body) + .await + .unwrap_or_default(); + let err_body_str = std::str::from_utf8(&err_body) + .map(String::from) + .unwrap_or_else(|_| base64::encode(&err_body)); + + if s.is_client_error() || s.is_server_error() { + error!("Error response {}: {}", res.status, err_body_str); + let err = match serde_json::from_slice::(&err_body) { + Ok(err) => Error::Remote( + res.status, + err.code.into(), + err.message.into(), + err.path.into(), + ), + Err(_) => Error::Remote( + res.status, + "unknown".into(), + err_body_str.into(), + "?".into(), + ), + }; + return Err(err); + } else { + let msg = format!( + "Unexpected response code {}. 
Response body: {}", + res.status, err_body_str + ); + error!("{}", msg); + return Err(Error::InvalidResponse(msg.into())); + } + } + }; + debug!( + "Response body: {}", + std::str::from_utf8(&body) + .map(String::from) + .unwrap_or_else(|_| base64::encode(&body)) + ); + + Ok(Response { + body, + status: res.status, + causality_token, + content_type, + }) + } +} + +async fn read_body(headers: &mut HeaderMap, body: ByteStream) -> Result, Error> { + let body_len = headers + .get(CONTENT_LENGTH) + .and_then(|h| h.parse().ok()) + .unwrap_or(0); + let mut res = Vec::with_capacity(body_len); + body.into_async_read().read_to_end(&mut res).await?; + Ok(res) +} + +/// An opaque token used to convey causality between operations. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +#[serde(transparent)] +pub struct CausalityToken(String); + +impl From for CausalityToken { + fn from(v: String) -> Self { + CausalityToken(v) + } +} + +impl From for String { + fn from(v: CausalityToken) -> Self { + v.0 + } +} + +/// A value in K2V. can be either a binary value, or a tombstone. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum K2vValue { + Tombstone, + Value(Vec), +} + +impl From> for K2vValue { + fn from(v: Vec) -> Self { + K2vValue::Value(v) + } +} + +impl From>> for K2vValue { + fn from(v: Option>) -> Self { + match v { + Some(v) => K2vValue::Value(v), + None => K2vValue::Tombstone, + } + } +} + +impl<'de> Deserialize<'de> for K2vValue { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let val: Option<&str> = Option::deserialize(d)?; + Ok(match val { + Some(s) => { + K2vValue::Value(base64::decode(s).map_err(|_| DeError::custom("invalid base64"))?) + } + None => K2vValue::Tombstone, + }) + } +} + +impl Serialize for K2vValue { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + K2vValue::Tombstone => serializer.serialize_none(), + K2vValue::Value(v) => { + let b64 = base64::encode(v); + serializer.serialize_str(&b64) + } + } + } +} + +/// A set of K2vValue and associated causality information. +#[derive(Debug, Clone, Serialize)] +pub struct CausalValue { + pub causality: CausalityToken, + pub value: Vec, +} + +/// Result of paginated requests. +#[derive(Debug, Clone)] +pub struct PaginatedRange { + pub items: BTreeMap, + pub next_start: Option, +} + +/// Filter for batch operations. +#[derive(Debug, Default, Clone, Deserialize, Serialize)] +pub struct Filter<'a> { + pub start: Option<&'a str>, + pub end: Option<&'a str>, + pub prefix: Option<&'a str>, + pub limit: Option, + #[serde(default)] + pub reverse: bool, +} + +impl<'a> Filter<'a> { + fn insert_params(&self, req: &mut SignedRequest) { + if let Some(start) = &self.start { + req.add_param("start", start); + } + if let Some(end) = &self.end { + req.add_param("end", end); + } + if let Some(prefix) = &self.prefix { + req.add_param("prefix", prefix); + } + if let Some(limit) = &self.limit { + req.add_param("limit", &limit.to_string()); + } + if self.reverse { + req.add_param("reverse", "true"); + } + } +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ReadIndexResponse<'a> { + #[serde(flatten, borrow)] + #[allow(dead_code)] + filter: Filter<'a>, + partition_keys: Vec, + #[allow(dead_code)] + more: bool, + next_start: Option, +} + +#[derive(Debug, Clone, Deserialize)] +struct ReadIndexItem { + pk: String, + #[serde(flatten)] + info: PartitionInfo, +} + +/// Information about data stored with a given partition key. 
+#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct PartitionInfo { + pub entries: u64, + pub conflicts: u64, + pub values: u64, + pub bytes: u64, +} + +/// Single sub-operation of an InsertBatch. +#[derive(Debug, Clone, Serialize)] +pub struct BatchInsertOp<'a> { + #[serde(rename = "pk")] + pub partition_key: &'a str, + #[serde(rename = "sk")] + pub sort_key: &'a str, + #[serde(rename = "ct")] + pub causality: Option, + #[serde(rename = "v")] + pub value: K2vValue, +} + +/// Single sub-operation of a ReadBatch. +#[derive(Debug, Default, Clone, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BatchReadOp<'a> { + pub partition_key: &'a str, + #[serde(flatten, borrow)] + pub filter: Filter<'a>, + #[serde(default)] + pub single_item: bool, + #[serde(default)] + pub conflicts_only: bool, + #[serde(default)] + pub tombstones: bool, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct BatchReadResponse<'a> { + #[serde(flatten, borrow)] + #[allow(dead_code)] + op: BatchReadOp<'a>, + items: Vec, + #[allow(dead_code)] + more: bool, + next_start: Option, +} + +#[derive(Debug, Clone, Deserialize)] +struct BatchReadItem { + sk: String, + ct: CausalityToken, + v: Vec, +} + +/// Single sub-operation of a DeleteBatch +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BatchDeleteOp<'a> { + pub partition_key: &'a str, + pub prefix: Option<&'a str>, + pub start: Option<&'a str>, + pub end: Option<&'a str>, + #[serde(default)] + pub single_item: bool, +} + +impl<'a> BatchDeleteOp<'a> { + pub fn new(partition_key: &'a str) -> Self { + BatchDeleteOp { + partition_key, + prefix: None, + start: None, + end: None, + single_item: false, + } + } +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct BatchDeleteResponse<'a> { + #[serde(flatten, borrow)] + #[allow(dead_code)] + filter: BatchDeleteOp<'a>, + deleted_items: u64, +} + +#[derive(Deserialize)] +struct ErrorResponse { + code: String, + message: String, + #[allow(dead_code)] + region: String, + path: String, +} + +struct Response { + body: Vec, + status: StatusCode, + causality_token: Option, + content_type: Option, +} diff --git a/src/k2v-client/src/bin/k2v-cli.rs b/src/k2v-client/src/bin/k2v-cli.rs deleted file mode 100644 index 38c39361..00000000 --- a/src/k2v-client/src/bin/k2v-cli.rs +++ /dev/null @@ -1,466 +0,0 @@ -use k2v_client::*; - -use garage_util::formater::format_table; - -use rusoto_core::credential::AwsCredentials; -use rusoto_core::Region; - -use clap::{Parser, Subcommand}; - -/// K2V command line interface -#[derive(Parser, Debug)] -#[clap(author, version, about, long_about = None)] -struct Args { - /// Name of the region to use - #[clap(short, long, env = "AWS_REGION", default_value = "garage")] - region: String, - /// Url of the endpoint to connect to - #[clap(short, long, env = "K2V_ENDPOINT")] - endpoint: String, - /// Access key ID - #[clap(short, long, env = "AWS_ACCESS_KEY_ID")] - key_id: String, - /// Access key ID - #[clap(short, long, env = "AWS_SECRET_ACCESS_KEY")] - secret: String, - /// Bucket name - #[clap(short, long, env = "K2V_BUCKET")] - bucket: String, - #[clap(subcommand)] - command: Command, -} - -#[derive(Subcommand, Debug)] -enum Command { - /// Insert a single value - Insert { - /// Partition key to insert to - partition_key: String, - /// Sort key to insert to - sort_key: String, - /// Causality of the insertion - #[clap(short, long)] - causality: Option, - /// Value to 
insert - #[clap(flatten)] - value: Value, - }, - /// Read a single value - Read { - /// Partition key to read from - partition_key: String, - /// Sort key to read from - sort_key: String, - /// Output formating - #[clap(flatten)] - output_kind: ReadOutputKind, - }, - /// Delete a single value - Delete { - /// Partition key to delete from - partition_key: String, - /// Sort key to delete from - sort_key: String, - /// Causality information - #[clap(short, long)] - causality: String, - }, - /// List partition keys - ReadIndex { - /// Output formating - #[clap(flatten)] - output_kind: BatchOutputKind, - /// Output only partition keys matching this filter - #[clap(flatten)] - filter: Filter, - }, - /// Read a range of sort keys - ReadRange { - /// Partition key to read from - partition_key: String, - /// Output formating - #[clap(flatten)] - output_kind: BatchOutputKind, - /// Output only sort keys matching this filter - #[clap(flatten)] - filter: Filter, - }, - /// Delete a range of sort keys - DeleteRange { - /// Partition key to delete from - partition_key: String, - /// Output formating - #[clap(flatten)] - output_kind: BatchOutputKind, - /// Delete only sort keys matching this filter - #[clap(flatten)] - filter: Filter, - }, -} - -/// Where to read a value from -#[derive(Parser, Debug)] -#[clap(group = clap::ArgGroup::new("value").multiple(false).required(true))] -struct Value { - /// Read value from a file. use - to read from stdin - #[clap(short, long, group = "value")] - file: Option, - /// Read a base64 value from commandline - #[clap(short, long, group = "value")] - b64: Option, - /// Read a raw (UTF-8) value from the commandline - #[clap(short, long, group = "value")] - text: Option, -} - -impl Value { - async fn to_data(&self) -> Result, Error> { - if let Some(ref text) = self.text { - Ok(text.as_bytes().to_vec()) - } else if let Some(ref b64) = self.b64 { - base64::decode(b64).map_err(|_| Error::Message("invalid base64 input".into())) - } else if let Some(ref path) = self.file { - use tokio::io::AsyncReadExt; - if path == "-" { - let mut file = tokio::io::stdin(); - let mut vec = Vec::new(); - file.read_to_end(&mut vec).await?; - Ok(vec) - } else { - let mut file = tokio::fs::File::open(path).await?; - let mut vec = Vec::new(); - file.read_to_end(&mut vec).await?; - Ok(vec) - } - } else { - unreachable!("Value must have one option set") - } - } -} - -#[derive(Parser, Debug)] -#[clap(group = clap::ArgGroup::new("output-kind").multiple(false).required(false))] -struct ReadOutputKind { - /// Base64 output. Conflicts are line separated, first line is causality token - #[clap(short, long, group = "output-kind")] - b64: bool, - /// Raw output. Conflicts generate error, causality token is not returned - #[clap(short, long, group = "output-kind")] - raw: bool, - /// Human formated output - #[clap(short = 'H', long, group = "output-kind")] - human: bool, - /// JSON formated output - #[clap(short, long, group = "output-kind")] - json: bool, -} - -impl ReadOutputKind { - fn display_output(&self, val: CausalValue) -> ! 
{ - use std::io::Write; - use std::process::exit; - - if self.json { - let stdout = std::io::stdout(); - serde_json::to_writer_pretty(stdout, &val).unwrap(); - exit(0); - } - - if self.raw { - let mut val = val.value; - if val.len() != 1 { - eprintln!( - "Raw mode can only read non-concurent values, found {} values, expected 1", - val.len() - ); - exit(1); - } - let val = val.pop().unwrap(); - match val { - K2vValue::Value(v) => { - std::io::stdout().write_all(&v).unwrap(); - exit(0); - } - K2vValue::Tombstone => { - eprintln!("Expected value, found tombstone"); - exit(2); - } - } - } - - let causality: String = val.causality.into(); - if self.b64 { - println!("{}", causality); - for val in val.value { - match val { - K2vValue::Value(v) => { - println!("{}", base64::encode(&v)) - } - K2vValue::Tombstone => { - println!(); - } - } - } - exit(0); - } - - // human - println!("causality: {}", causality); - println!("values:"); - for val in val.value { - match val { - K2vValue::Value(v) => { - if let Ok(string) = std::str::from_utf8(&v) { - println!(" utf-8: {}", string); - } else { - println!(" base64: {}", base64::encode(&v)); - } - } - K2vValue::Tombstone => { - println!(" tombstone"); - } - } - } - exit(0); - } -} - -#[derive(Parser, Debug)] -#[clap(group = clap::ArgGroup::new("output-kind").multiple(false).required(false))] -struct BatchOutputKind { - /// Human formated output - #[clap(short = 'H', long, group = "output-kind")] - human: bool, - /// JSON formated output - #[clap(short, long, group = "output-kind")] - json: bool, -} - -/// Filter for batch operations -#[derive(Parser, Debug)] -#[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))] -struct Filter { - /// Match only keys starting with this prefix - #[clap(short, long, group = "filter")] - prefix: Option, - /// Match only keys lexicographically after this key (including this key itself) - #[clap(short, long, group = "filter")] - start: Option, - /// Match only keys lexicographically before this key (excluding this key) - #[clap(short, long, group = "filter")] - end: Option, - /// Only match the first X keys - #[clap(short, long)] - limit: Option, - /// Return keys in reverse order - #[clap(short, long)] - reverse: bool, - /// Return only keys where conflict happened - #[clap(short, long)] - conflicts_only: bool, - /// Also include keys storing only tombstones - #[clap(short, long)] - tombstones: bool, - /// Return any key - #[clap(short, long, group = "filter")] - all: bool, -} - -impl Filter { - fn k2v_filter(&self) -> k2v_client::Filter<'_> { - k2v_client::Filter { - start: self.start.as_deref(), - end: self.end.as_deref(), - prefix: self.prefix.as_deref(), - limit: self.limit, - reverse: self.reverse, - } - } -} - -#[tokio::main] -async fn main() -> Result<(), Error> { - let args = Args::parse(); - - let region = Region::Custom { - name: args.region, - endpoint: args.endpoint, - }; - - let creds = AwsCredentials::new(args.key_id, args.secret, None, None); - - let client = K2vClient::new(region, args.bucket, creds, None)?; - - match args.command { - Command::Insert { - partition_key, - sort_key, - causality, - value, - } => { - client - .insert_item( - &partition_key, - &sort_key, - value.to_data().await?, - causality.map(Into::into), - ) - .await?; - } - Command::Delete { - partition_key, - sort_key, - causality, - } => { - client - .delete_item(&partition_key, &sort_key, causality.into()) - .await?; - } - Command::Read { - partition_key, - sort_key, - output_kind, - } => { - let res = 
client.read_item(&partition_key, &sort_key).await?; - output_kind.display_output(res); - } - Command::ReadIndex { - output_kind, - filter, - } => { - if filter.conflicts_only || filter.tombstones { - return Err(Error::Message( - "conlicts-only and tombstones are invalid for read-index".into(), - )); - } - let res = client.read_index(filter.k2v_filter()).await?; - if output_kind.json { - let values = res - .items - .into_iter() - .map(|(k, v)| { - let mut value = serde_json::to_value(v).unwrap(); - value - .as_object_mut() - .unwrap() - .insert("sort_key".to_owned(), k.into()); - value - }) - .collect::>(); - let json = serde_json::json!({ - "next_key": res.next_start, - "values": values, - }); - - let stdout = std::io::stdout(); - serde_json::to_writer_pretty(stdout, &json).unwrap(); - } else { - if let Some(next) = res.next_start { - println!("next key: {}", next); - } - - let mut to_print = Vec::new(); - to_print.push(format!("key:\tentries\tconflicts\tvalues\tbytes")); - for (k, v) in res.items { - to_print.push(format!( - "{}\t{}\t{}\t{}\t{}", - k, v.entries, v.conflicts, v.values, v.bytes - )); - } - format_table(to_print); - } - } - Command::ReadRange { - partition_key, - output_kind, - filter, - } => { - let op = BatchReadOp { - partition_key: &partition_key, - filter: filter.k2v_filter(), - conflicts_only: filter.conflicts_only, - tombstones: filter.tombstones, - single_item: false, - }; - let mut res = client.read_batch(&[op]).await?; - let res = res.pop().unwrap(); - if output_kind.json { - let values = res - .items - .into_iter() - .map(|(k, v)| { - let mut value = serde_json::to_value(v).unwrap(); - value - .as_object_mut() - .unwrap() - .insert("sort_key".to_owned(), k.into()); - value - }) - .collect::>(); - let json = serde_json::json!({ - "next_key": res.next_start, - "values": values, - }); - - let stdout = std::io::stdout(); - serde_json::to_writer_pretty(stdout, &json).unwrap(); - } else { - if let Some(next) = res.next_start { - println!("next key: {}", next); - } - for (key, values) in res.items { - println!("key: {}", key); - let causality: String = values.causality.into(); - println!("causality: {}", causality); - for value in values.value { - match value { - K2vValue::Value(v) => { - if let Ok(string) = std::str::from_utf8(&v) { - println!(" value(utf-8): {}", string); - } else { - println!(" value(base64): {}", base64::encode(&v)); - } - } - K2vValue::Tombstone => { - println!(" tombstone"); - } - } - } - } - } - } - Command::DeleteRange { - partition_key, - output_kind, - filter, - } => { - let op = BatchDeleteOp { - partition_key: &partition_key, - prefix: filter.prefix.as_deref(), - start: filter.start.as_deref(), - end: filter.end.as_deref(), - single_item: false, - }; - if filter.reverse - || filter.conflicts_only - || filter.tombstones - || filter.limit.is_some() - { - return Err(Error::Message( - "limit, conlicts-only, reverse and tombstones are invalid for delete-range" - .into(), - )); - } - - let res = client.delete_batch(&[op]).await?; - - if output_kind.json { - println!("{}", res[0]); - } else { - println!("deleted {} keys", res[0]); - } - } - } - - Ok(()) -} diff --git a/src/k2v-client/src/error.rs b/src/k2v-client/src/error.rs deleted file mode 100644 index 62357934..00000000 --- a/src/k2v-client/src/error.rs +++ /dev/null @@ -1,22 +0,0 @@ -use std::borrow::Cow; - -use thiserror::Error; - -/// Errors returned by this crate -#[derive(Error, Debug)] -pub enum Error { - #[error("received invalid response: {0}")] - InvalidResponse(Cow<'static, str>), - 
#[error("not found")] - NotFound, - #[error("io error: {0}")] - IoError(#[from] std::io::Error), - #[error("rusoto tls error: {0}")] - RusotoTls(#[from] rusoto_core::request::TlsError), - #[error("rusoto http error: {0}")] - RusotoHttp(#[from] rusoto_core::HttpDispatchError), - #[error("deserialization error: {0}")] - Deserialization(#[from] serde_json::Error), - #[error("{0}")] - Message(Cow<'static, str>), -} diff --git a/src/k2v-client/src/lib.rs b/src/k2v-client/src/lib.rs deleted file mode 100644 index ba1cd6ea..00000000 --- a/src/k2v-client/src/lib.rs +++ /dev/null @@ -1,566 +0,0 @@ -use std::collections::BTreeMap; -use std::time::Duration; - -use http::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE}; -use http::status::StatusCode; -use http::HeaderMap; - -use rusoto_core::{ByteStream, DispatchSignedRequest, HttpClient}; -use rusoto_credential::AwsCredentials; -use rusoto_signature::region::Region; -use rusoto_signature::signature::SignedRequest; -use serde::de::Error as DeError; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; - -use tokio::io::AsyncReadExt; - -mod error; - -pub use error::Error; - -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); -const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300); -const SERVICE: &str = "k2v"; -const GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; - -/// Client used to query a K2V server. -pub struct K2vClient { - region: Region, - bucket: String, - creds: AwsCredentials, - client: HttpClient, -} - -impl K2vClient { - /// Create a new K2V client. - pub fn new( - region: Region, - bucket: String, - creds: AwsCredentials, - user_agent: Option, - ) -> Result { - let mut client = HttpClient::new()?; - if let Some(ua) = user_agent { - client.local_agent_prepend(ua); - } else { - client.local_agent_prepend(format!("k2v/{}", env!("CARGO_PKG_VERSION"))); - } - Ok(K2vClient { - region, - bucket, - creds, - client, - }) - } - - /// Perform a ReadItem request, reading the value(s) stored for a single pk+sk. - pub async fn read_item( - &self, - partition_key: &str, - sort_key: &str, - ) -> Result { - let mut req = SignedRequest::new( - "GET", - SERVICE, - &self.region, - &format!("/{}/{}", self.bucket, partition_key), - ); - req.add_param("sort_key", sort_key); - req.add_header(ACCEPT, "application/octet-stream, application/json"); - - let res = self.dispatch(req, None).await?; - - let causality = res - .causality_token - .ok_or_else(|| Error::InvalidResponse("missing causality token".into()))?; - - if res.status == StatusCode::NO_CONTENT { - return Ok(CausalValue { - causality, - value: vec![K2vValue::Tombstone], - }); - } - - match res.content_type.as_deref() { - Some("application/octet-stream") => Ok(CausalValue { - causality, - value: vec![K2vValue::Value(res.body)], - }), - Some("application/json") => { - let value = serde_json::from_slice(&res.body)?; - Ok(CausalValue { causality, value }) - } - Some(ct) => Err(Error::InvalidResponse( - format!("invalid content type: {}", ct).into(), - )), - None => Err(Error::InvalidResponse("missing content type".into())), - } - } - - /// Perform a PollItem request, waiting for the value(s) stored for a single pk+sk to be - /// updated. 
- pub async fn poll_item( - &self, - partition_key: &str, - sort_key: &str, - causality: CausalityToken, - timeout: Option, - ) -> Result, Error> { - let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT); - - let mut req = SignedRequest::new( - "GET", - SERVICE, - &self.region, - &format!("/{}/{}", self.bucket, partition_key), - ); - req.add_param("sort_key", sort_key); - req.add_param("causality_token", &causality.0); - req.add_param("timeout", &timeout.as_secs().to_string()); - req.add_header(ACCEPT, "application/octet-stream, application/json"); - - let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; - - let causality = res - .causality_token - .ok_or_else(|| Error::InvalidResponse("missing causality token".into()))?; - - if res.status == StatusCode::NOT_MODIFIED { - return Ok(None); - } - - if res.status == StatusCode::NO_CONTENT { - return Ok(Some(CausalValue { - causality, - value: vec![K2vValue::Tombstone], - })); - } - - match res.content_type.as_deref() { - Some("application/octet-stream") => Ok(Some(CausalValue { - causality, - value: vec![K2vValue::Value(res.body)], - })), - Some("application/json") => { - let value = serde_json::from_slice(&res.body)?; - Ok(Some(CausalValue { causality, value })) - } - Some(ct) => Err(Error::InvalidResponse( - format!("invalid content type: {}", ct).into(), - )), - None => Err(Error::InvalidResponse("missing content type".into())), - } - } - - /// Perform an InsertItem request, inserting a value for a single pk+sk. - pub async fn insert_item( - &self, - partition_key: &str, - sort_key: &str, - value: Vec, - causality: Option, - ) -> Result<(), Error> { - let mut req = SignedRequest::new( - "PUT", - SERVICE, - &self.region, - &format!("/{}/{}", self.bucket, partition_key), - ); - req.add_param("sort_key", sort_key); - req.set_payload(Some(value)); - - if let Some(causality) = causality { - req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0); - } - - self.dispatch(req, None).await?; - Ok(()) - } - - /// Perform a DeleteItem request, deleting the value(s) stored for a single pk+sk. - pub async fn delete_item( - &self, - partition_key: &str, - sort_key: &str, - causality: CausalityToken, - ) -> Result<(), Error> { - let mut req = SignedRequest::new( - "DELETE", - SERVICE, - &self.region, - &format!("/{}/{}", self.bucket, partition_key), - ); - req.add_param("sort_key", sort_key); - req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0); - - self.dispatch(req, None).await?; - Ok(()) - } - - /// Perform a ReadIndex request, listing partition key which have at least one associated - /// sort key, and which matches the filter. - pub async fn read_index( - &self, - filter: Filter<'_>, - ) -> Result, Error> { - let mut req = - SignedRequest::new("GET", SERVICE, &self.region, &format!("/{}", self.bucket)); - filter.insert_params(&mut req); - - let res = self.dispatch(req, None).await?; - - let resp: ReadIndexResponse = serde_json::from_slice(&res.body)?; - - let items = resp - .partition_keys - .into_iter() - .map(|ReadIndexItem { pk, info }| (pk, info)) - .collect(); - - Ok(PaginatedRange { - items, - next_start: resp.next_start, - }) - } - - /// Perform an InsertBatch request, inserting multiple values at once. Note: this operation is - /// *not* atomic: it is possible for some sub-operations to fails and others to success. In - /// that case, failure is reported. 
- pub async fn insert_batch(&self, operations: &[BatchInsertOp<'_>]) -> Result<(), Error> { - let mut req = - SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); - - let payload = serde_json::to_vec(operations)?; - req.set_payload(Some(payload)); - self.dispatch(req, None).await?; - Ok(()) - } - - /// Perform a ReadBatch request, reading multiple values or range of values at once. - pub async fn read_batch( - &self, - operations: &[BatchReadOp<'_>], - ) -> Result>, Error> { - let mut req = - SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); - req.add_param("search", ""); - - let payload = serde_json::to_vec(operations)?; - req.set_payload(Some(payload)); - let res = self.dispatch(req, None).await?; - - let resp: Vec = serde_json::from_slice(&res.body)?; - - Ok(resp - .into_iter() - .map(|e| PaginatedRange { - items: e - .items - .into_iter() - .map(|BatchReadItem { sk, ct, v }| { - ( - sk, - CausalValue { - causality: ct, - value: v, - }, - ) - }) - .collect(), - next_start: e.next_start, - }) - .collect()) - } - - /// Perform a DeleteBatch request, deleting mutiple values or range of values at once, without - /// providing causality information. - pub async fn delete_batch(&self, operations: &[BatchDeleteOp<'_>]) -> Result, Error> { - let mut req = - SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); - req.add_param("delete", ""); - - let payload = serde_json::to_vec(operations)?; - req.set_payload(Some(payload)); - let res = self.dispatch(req, None).await?; - - let resp: Vec = serde_json::from_slice(&res.body)?; - - Ok(resp.into_iter().map(|r| r.deleted_items).collect()) - } - - async fn dispatch( - &self, - mut req: SignedRequest, - timeout: Option, - ) -> Result { - req.sign(&self.creds); - let mut res = self - .client - .dispatch(req, Some(timeout.unwrap_or(DEFAULT_TIMEOUT))) - .await?; - - let causality_token = res - .headers - .remove(GARAGE_CAUSALITY_TOKEN) - .map(CausalityToken); - let content_type = res.headers.remove(CONTENT_TYPE); - - let body = match res.status { - StatusCode::OK => read_body(&mut res.headers, res.body).await?, - StatusCode::NO_CONTENT => Vec::new(), - StatusCode::NOT_FOUND => return Err(Error::NotFound), - StatusCode::NOT_MODIFIED => Vec::new(), - _ => { - return Err(Error::InvalidResponse( - format!("invalid error code: {}", res.status).into(), - )) - } - }; - - Ok(Response { - body, - status: res.status, - causality_token, - content_type, - }) - } -} - -async fn read_body(headers: &mut HeaderMap, body: ByteStream) -> Result, Error> { - let body_len = headers - .get(CONTENT_LENGTH) - .and_then(|h| h.parse().ok()) - .unwrap_or(0); - let mut res = Vec::with_capacity(body_len); - body.into_async_read().read_to_end(&mut res).await?; - Ok(res) -} - -/// An opaque token used to convey causality between operations. -#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] -#[serde(transparent)] -pub struct CausalityToken(String); - -impl From for CausalityToken { - fn from(v: String) -> Self { - CausalityToken(v) - } -} - -impl From for String { - fn from(v: CausalityToken) -> Self { - v.0 - } -} - -/// A value in K2V. can be either a binary value, or a tombstone. 
-#[derive(Debug, Clone, PartialEq, Eq)] -pub enum K2vValue { - Tombstone, - Value(Vec), -} - -impl From> for K2vValue { - fn from(v: Vec) -> Self { - K2vValue::Value(v) - } -} - -impl From>> for K2vValue { - fn from(v: Option>) -> Self { - match v { - Some(v) => K2vValue::Value(v), - None => K2vValue::Tombstone, - } - } -} - -impl<'de> Deserialize<'de> for K2vValue { - fn deserialize(d: D) -> Result - where - D: Deserializer<'de>, - { - let val: Option<&str> = Option::deserialize(d)?; - Ok(match val { - Some(s) => { - K2vValue::Value(base64::decode(s).map_err(|_| DeError::custom("invalid base64"))?) - } - None => K2vValue::Tombstone, - }) - } -} - -impl Serialize for K2vValue { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - match self { - K2vValue::Tombstone => serializer.serialize_none(), - K2vValue::Value(v) => { - let b64 = base64::encode(v); - serializer.serialize_str(&b64) - } - } - } -} - -/// A set of K2vValue and associated causality information. -#[derive(Debug, Clone, Serialize)] -pub struct CausalValue { - pub causality: CausalityToken, - pub value: Vec, -} - -/// Result of paginated requests. -#[derive(Debug, Clone)] -pub struct PaginatedRange { - pub items: BTreeMap, - pub next_start: Option, -} - -/// Filter for batch operations. -#[derive(Debug, Default, Clone, Deserialize, Serialize)] -pub struct Filter<'a> { - pub start: Option<&'a str>, - pub end: Option<&'a str>, - pub prefix: Option<&'a str>, - pub limit: Option, - #[serde(default)] - pub reverse: bool, -} - -impl<'a> Filter<'a> { - fn insert_params(&self, req: &mut SignedRequest) { - if let Some(start) = &self.start { - req.add_param("start", start); - } - if let Some(end) = &self.end { - req.add_param("end", end); - } - if let Some(prefix) = &self.prefix { - req.add_param("prefix", prefix); - } - if let Some(limit) = &self.limit { - req.add_param("limit", &limit.to_string()); - } - if self.reverse { - req.add_param("reverse", "true"); - } - } -} - -#[derive(Debug, Clone, Deserialize)] -#[serde(rename_all = "camelCase")] -struct ReadIndexResponse<'a> { - #[serde(flatten, borrow)] - #[allow(dead_code)] - filter: Filter<'a>, - partition_keys: Vec, - #[allow(dead_code)] - more: bool, - next_start: Option, -} - -#[derive(Debug, Clone, Deserialize)] -struct ReadIndexItem { - pk: String, - #[serde(flatten)] - info: PartitionInfo, -} - -/// Information about data stored with a given partition key. -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct PartitionInfo { - pub entries: u64, - pub conflicts: u64, - pub values: u64, - pub bytes: u64, -} - -/// Single sub-operation of an InsertBatch. -#[derive(Debug, Clone, Serialize)] -pub struct BatchInsertOp<'a> { - #[serde(rename = "pk")] - pub partition_key: &'a str, - #[serde(rename = "sk")] - pub sort_key: &'a str, - #[serde(rename = "ct")] - pub causality: Option, - #[serde(rename = "v")] - pub value: K2vValue, -} - -/// Single sub-operation of a ReadBatch. 
-#[derive(Debug, Default, Clone, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct BatchReadOp<'a> { - pub partition_key: &'a str, - #[serde(flatten, borrow)] - pub filter: Filter<'a>, - #[serde(default)] - pub single_item: bool, - #[serde(default)] - pub conflicts_only: bool, - #[serde(default)] - pub tombstones: bool, -} - -#[derive(Debug, Clone, Deserialize)] -#[serde(rename_all = "camelCase")] -struct BatchReadResponse<'a> { - #[serde(flatten, borrow)] - #[allow(dead_code)] - op: BatchReadOp<'a>, - items: Vec, - #[allow(dead_code)] - more: bool, - next_start: Option, -} - -#[derive(Debug, Clone, Deserialize)] -struct BatchReadItem { - sk: String, - ct: CausalityToken, - v: Vec, -} - -/// Single sub-operation of a DeleteBatch -#[derive(Debug, Clone, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct BatchDeleteOp<'a> { - pub partition_key: &'a str, - pub prefix: Option<&'a str>, - pub start: Option<&'a str>, - pub end: Option<&'a str>, - #[serde(default)] - pub single_item: bool, -} - -impl<'a> BatchDeleteOp<'a> { - pub fn new(partition_key: &'a str) -> Self { - BatchDeleteOp { - partition_key, - prefix: None, - start: None, - end: None, - single_item: false, - } - } -} - -#[derive(Debug, Clone, Deserialize)] -#[serde(rename_all = "camelCase")] -struct BatchDeleteResponse<'a> { - #[serde(flatten, borrow)] - #[allow(dead_code)] - filter: BatchDeleteOp<'a>, - deleted_items: u64, -} - -struct Response { - body: Vec, - status: StatusCode, - causality_token: Option, - content_type: Option, -} -- cgit v1.2.3
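Usage sketch (not part of the patch, placeholders assumed): with this change a missing key surfaces as the dedicated `Error::NotFound` variant, and `Error::Remote` carries the HTTP status plus the `code`, `message` and `path` fields parsed from the server's JSON error body, so callers can branch on them. The endpoint, bucket, credentials and key names below are illustrative only.

use k2v_client::{Error, K2vClient};

use rusoto_core::credential::AwsCredentials;
use rusoto_core::Region;

#[tokio::main]
async fn main() -> Result<(), Error> {
    // Placeholder region, endpoint and credentials; replace with real values.
    let region = Region::Custom {
        name: "garage".to_owned(),
        endpoint: "http://127.0.0.1:3904".to_owned(),
    };
    let creds = AwsCredentials::new("GK...", "...", None, None);
    let client = K2vClient::new(region, "my-bucket".to_owned(), creds, None)?;

    match client.read_item("mailbox", "msg-0001").await {
        Ok(v) => println!("read {} value(s)", v.value.len()),
        // A missing key is now reported as a dedicated variant.
        Err(Error::NotFound) => println!("key not found"),
        // Server-side errors carry the parsed code, message and path.
        Err(Error::Remote(status, code, message, path)) => {
            eprintln!("remote error {} {} at {}: {}", status, code, path, message)
        }
        Err(e) => return Err(e),
    }
    Ok(())
}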