diff options
Diffstat (limited to 'src/k2v-client')
-rw-r--r-- | src/k2v-client/Cargo.toml | 22 | ||||
-rw-r--r-- | src/k2v-client/bin/k2v-cli.rs | 62 | ||||
-rw-r--r-- | src/k2v-client/error.rs | 16 | ||||
-rw-r--r-- | src/k2v-client/lib.rs | 333 |
4 files changed, 254 insertions, 179 deletions
diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml index 52c16d89..2ccb9fe5 100644 --- a/src/k2v-client/Cargo.toml +++ b/src/k2v-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "k2v-client" -version = "0.1.1" +version = "0.0.4" authors = ["Trinity Pointard <trinity.pointard@gmail.com>", "Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -10,24 +10,28 @@ readme = "../../README.md" [dependencies] base64 = "0.21" +sha2 = "0.10" +hex = "0.4" http = "0.2" log = "0.4" -rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] } -rusoto_credential = "0.48.0" -rusoto_signature = "0.48.0" -hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12" ] } -serde = "1.0" +aws-sigv4 = "0.55" +percent-encoding = "2.2" +hyper = { version = "0.14", default-features = false, features = ["client", "http1", "http2"] } +hyper-rustls = { version = "0.24", features = ["http2"] } +serde = { version = "1.0", features = [ "derive" ] } serde_json = "1.0" thiserror = "1.0" -tokio = "1.24" +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } # cli deps clap = { version = "4.1", optional = true, features = ["derive", "env"] } -garage_util = { version = "0.8.2", path = "../util", optional = true } +format_table = { workspace = true, optional = true } +tracing = { version = "0.1", optional = true } +tracing-subscriber = { version = "0.3", optional = true, features = ["env-filter"] } [features] -cli = ["clap", "tokio/fs", "tokio/io-std", "garage_util"] +cli = ["clap", "tokio/fs", "tokio/io-std", "tracing", "tracing-subscriber", "format_table"] [lib] path = "lib.rs" diff --git a/src/k2v-client/bin/k2v-cli.rs b/src/k2v-client/bin/k2v-cli.rs index cdd63cce..b9461c89 100644 --- a/src/k2v-client/bin/k2v-cli.rs +++ b/src/k2v-client/bin/k2v-cli.rs @@ -2,12 +2,11 @@ use std::collections::BTreeMap; use std::process::exit; use std::time::Duration; -use k2v_client::*; +use base64::prelude::*; -use garage_util::formater::format_table; +use k2v_client::*; -use rusoto_core::credential::AwsCredentials; -use rusoto_core::Region; +use format_table::format_table; use clap::{Parser, Subcommand}; @@ -155,7 +154,9 @@ impl Value { if let Some(ref text) = self.text { Ok(text.as_bytes().to_vec()) } else if let Some(ref b64) = self.b64 { - base64::decode(b64).map_err(|_| Error::Message("invalid base64 input".into())) + BASE64_STANDARD + .decode(b64) + .map_err(|_| Error::Message("invalid base64 input".into())) } else if let Some(ref path) = self.file { use tokio::io::AsyncReadExt; if path == "-" { @@ -230,7 +231,7 @@ impl ReadOutputKind { for val in val.value { match val { K2vValue::Value(v) => { - println!("{}", base64::encode(&v)) + println!("{}", BASE64_STANDARD.encode(&v)) } K2vValue::Tombstone => { println!(); @@ -249,7 +250,7 @@ impl ReadOutputKind { if let Ok(string) = std::str::from_utf8(&v) { println!(" utf-8: {}", string); } else { - println!(" base64: {}", base64::encode(&v)); + println!(" base64: {}", BASE64_STANDARD.encode(&v)); } } K2vValue::Tombstone => { @@ -275,7 +276,7 @@ struct BatchOutputKind { impl BatchOutputKind { fn display_human_output(&self, values: BTreeMap<String, CausalValue>) -> ! { for (key, values) in values { - println!("key: {}", key); + println!("sort_key: {}", key); let causality: String = values.causality.into(); println!("causality: {}", causality); for value in values.value { @@ -284,7 +285,7 @@ impl BatchOutputKind { if let Ok(string) = std::str::from_utf8(&v) { println!(" value(utf-8): {}", string); } else { - println!(" value(base64): {}", base64::encode(&v)); + println!(" value(base64): {}", BASE64_STANDARD.encode(&v)); } } K2vValue::Tombstone => { @@ -310,23 +311,19 @@ impl BatchOutputKind { .collect::<Vec<_>>() } - fn display_poll_range_output( - &self, - seen_marker: String, - values: BTreeMap<String, CausalValue>, - ) -> ! { + fn display_poll_range_output(&self, poll_range: PollRangeResult) -> ! { if self.json { let json = serde_json::json!({ - "values": self.values_json(values), - "seen_marker": seen_marker, + "values": self.values_json(poll_range.items), + "seen_marker": poll_range.seen_marker, }); let stdout = std::io::stdout(); serde_json::to_writer_pretty(stdout, &json).unwrap(); exit(0) } else { - println!("seen marker: {}", seen_marker); - self.display_human_output(values) + println!("seen marker: {}", poll_range.seen_marker); + self.display_human_output(poll_range.items) } } @@ -393,16 +390,27 @@ impl Filter { #[tokio::main] async fn main() -> Result<(), Error> { + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "warn") + } + + tracing_subscriber::fmt() + .with_writer(std::io::stderr) + .with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env()) + .init(); + let args = Args::parse(); - let region = Region::Custom { - name: args.region, + let config = K2vClientConfig { endpoint: args.endpoint, + region: args.region, + aws_access_key_id: args.key_id, + aws_secret_access_key: args.secret, + bucket: args.bucket, + user_agent: None, }; - let creds = AwsCredentials::new(args.key_id, args.secret, None, None); - - let client = K2vClient::new(region, args.bucket, creds, None)?; + let client = K2vClient::new(config)?; match args.command { Command::Insert { @@ -489,8 +497,8 @@ async fn main() -> Result<(), Error> { ) .await?; match res { - Some((items, seen_marker)) => { - output_kind.display_poll_range_output(seen_marker, items); + Some(poll_range_output) => { + output_kind.display_poll_range_output(poll_range_output); } None => { if output_kind.json { @@ -520,7 +528,7 @@ async fn main() -> Result<(), Error> { value .as_object_mut() .unwrap() - .insert("sort_key".to_owned(), k.into()); + .insert("partition_key".to_owned(), k.into()); value }) .collect::<Vec<_>>(); @@ -537,7 +545,7 @@ async fn main() -> Result<(), Error> { } let mut to_print = Vec::new(); - to_print.push(format!("key:\tentries\tconflicts\tvalues\tbytes")); + to_print.push(format!("partition_key\tentries\tconflicts\tvalues\tbytes")); for (k, v) in res.items { to_print.push(format!( "{}\t{}\t{}\t{}\t{}", diff --git a/src/k2v-client/error.rs b/src/k2v-client/error.rs index 37c221f2..564ce497 100644 --- a/src/k2v-client/error.rs +++ b/src/k2v-client/error.rs @@ -18,12 +18,20 @@ pub enum Error { NotFound, #[error("io error: {0}")] IoError(#[from] std::io::Error), - #[error("rusoto tls error: {0}")] - RusotoTls(#[from] rusoto_core::request::TlsError), - #[error("rusoto http error: {0}")] - RusotoHttp(#[from] rusoto_core::HttpDispatchError), + #[error("http error: {0}")] + Http(#[from] http::Error), + #[error("hyper error: {0}")] + Hyper(#[from] hyper::Error), + #[error("invalid header: {0}")] + Header(#[from] hyper::header::ToStrError), #[error("deserialization error: {0}")] Deserialization(#[from] serde_json::Error), + #[error("invalid signature parameters: {0}")] + SignParameters(#[from] aws_sigv4::signing_params::BuildError), + #[error("could not sign request: {0}")] + SignRequest(#[from] aws_sigv4::http_request::SigningError), + #[error("request timed out")] + Timeout, #[error("{0}")] Message(Cow<'static, str>), } diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs index ca52d0cf..4aa7a20a 100644 --- a/src/k2v-client/lib.rs +++ b/src/k2v-client/lib.rs @@ -1,20 +1,23 @@ use std::collections::BTreeMap; -use std::time::Duration; +use std::convert::TryInto; +use std::time::{Duration, SystemTime}; -use http::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE}; -use http::status::StatusCode; -use http::HeaderMap; +use base64::prelude::*; use log::{debug, error}; +use percent_encoding::{utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC}; + +use http::header::{ACCEPT, CONTENT_TYPE}; +use http::status::StatusCode; +use http::{HeaderName, HeaderValue, Request}; +use hyper::{body::Bytes, Body}; +use hyper::{client::connect::HttpConnector, Client as HttpClient}; +use hyper_rustls::HttpsConnector; + +use aws_sigv4::http_request::{sign, SignableRequest, SigningParams, SigningSettings}; -use rusoto_core::{ByteStream, DispatchSignedRequest, HttpClient}; -use rusoto_credential::AwsCredentials; -use rusoto_signature::region::Region; -use rusoto_signature::signature::SignedRequest; use serde::de::Error as DeError; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use tokio::io::AsyncReadExt; - mod error; pub use error::Error; @@ -22,41 +25,57 @@ pub use error::Error; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300); const SERVICE: &str = "k2v"; -const GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; +const AMZ_CONTENT_SHA256: HeaderName = HeaderName::from_static("x-amz-content-sha256"); +const GARAGE_CAUSALITY_TOKEN: HeaderName = HeaderName::from_static("x-garage-causality-token"); + +const STRICT_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC + .remove(b'_') + .remove(b'-') + .remove(b'.') + .remove(b'~'); +const PATH_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC + .remove(b'/') + .remove(b'_') + .remove(b'-') + .remove(b'.') + .remove(b'~'); + +pub struct K2vClientConfig { + pub endpoint: String, + pub region: String, + pub aws_access_key_id: String, + pub aws_secret_access_key: String, + pub bucket: String, + pub user_agent: Option<String>, +} /// Client used to query a K2V server. pub struct K2vClient { - region: Region, - bucket: String, - creds: AwsCredentials, - client: HttpClient, + config: K2vClientConfig, + user_agent: HeaderValue, + client: HttpClient<HttpsConnector<HttpConnector>>, } impl K2vClient { /// Create a new K2V client. - pub fn new( - region: Region, - bucket: String, - creds: AwsCredentials, - user_agent: Option<String>, - ) -> Result<Self, Error> { + pub fn new(config: K2vClientConfig) -> Result<Self, Error> { let connector = hyper_rustls::HttpsConnectorBuilder::new() .with_native_roots() .https_or_http() .enable_http1() .enable_http2() .build(); - let mut client = HttpClient::from_connector(connector); - if let Some(ua) = user_agent { - client.local_agent_prepend(ua); - } else { - client.local_agent_prepend(format!("k2v/{}", env!("CARGO_PKG_VERSION"))); - } + let client = HttpClient::builder().build(connector); + let user_agent: std::borrow::Cow<str> = match &config.user_agent { + Some(ua) => ua.into(), + None => format!("k2v/{}", env!("CARGO_PKG_VERSION")).into(), + }; + let user_agent = HeaderValue::from_str(&user_agent) + .map_err(|_| Error::Message("invalid user agent".into()))?; Ok(K2vClient { - region, - bucket, - creds, + config, client, + user_agent, }) } @@ -66,15 +85,10 @@ impl K2vClient { partition_key: &str, sort_key: &str, ) -> Result<CausalValue, Error> { - let mut req = SignedRequest::new( - "GET", - SERVICE, - &self.region, - &format!("/{}/{}", self.bucket, partition_key), - ); - req.add_param("sort_key", sort_key); - req.add_header(ACCEPT, "application/octet-stream, application/json"); - + let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]); + let req = Request::get(url) + .header(ACCEPT, "application/octet-stream, application/json") + .body(Bytes::new())?; let res = self.dispatch(req, None).await?; let causality = res @@ -91,7 +105,7 @@ impl K2vClient { match res.content_type.as_deref() { Some("application/octet-stream") => Ok(CausalValue { causality, - value: vec![K2vValue::Value(res.body)], + value: vec![K2vValue::Value(res.body.to_vec())], }), Some("application/json") => { let value = serde_json::from_slice(&res.body)?; @@ -115,16 +129,17 @@ impl K2vClient { ) -> Result<Option<CausalValue>, Error> { let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT); - let mut req = SignedRequest::new( - "GET", - SERVICE, - &self.region, - &format!("/{}/{}", self.bucket, partition_key), + let url = self.build_url( + Some(partition_key), + &[ + ("sort_key", sort_key), + ("causality_token", &causality.0), + ("timeout", &timeout.as_secs().to_string()), + ], ); - req.add_param("sort_key", sort_key); - req.add_param("causality_token", &causality.0); - req.add_param("timeout", &timeout.as_secs().to_string()); - req.add_header(ACCEPT, "application/octet-stream, application/json"); + let req = Request::get(url) + .header(ACCEPT, "application/octet-stream, application/json") + .body(Bytes::new())?; let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; @@ -146,7 +161,7 @@ impl K2vClient { match res.content_type.as_deref() { Some("application/octet-stream") => Ok(Some(CausalValue { causality, - value: vec![K2vValue::Value(res.body)], + value: vec![K2vValue::Value(res.body.to_vec())], })), Some("application/json") => { let value = serde_json::from_slice(&res.body)?; @@ -167,7 +182,7 @@ impl K2vClient { filter: Option<PollRangeFilter<'_>>, seen_marker: Option<&str>, timeout: Option<Duration>, - ) -> Result<Option<(BTreeMap<String, CausalValue>, String)>, Error> { + ) -> Result<Option<PollRangeResult>, Error> { let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT); let request = PollRangeRequest { @@ -176,16 +191,10 @@ impl K2vClient { timeout: timeout.as_secs(), }; - let mut req = SignedRequest::new( - "POST", - SERVICE, - &self.region, - &format!("/{}/{}", self.bucket, partition_key), - ); - req.add_param("poll_range", ""); - + let url = self.build_url(Some(partition_key), &[("poll_range", "")]); let payload = serde_json::to_vec(&request)?; - req.set_payload(Some(payload)); + let req = Request::post(url).body(Bytes::from(payload))?; + let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; if res.status == StatusCode::NOT_MODIFIED { @@ -208,7 +217,10 @@ impl K2vClient { }) .collect::<BTreeMap<_, _>>(); - Ok(Some((items, resp.seen_marker))) + Ok(Some(PollRangeResult { + items, + seen_marker: resp.seen_marker, + })) } /// Perform an InsertItem request, inserting a value for a single pk+sk. @@ -219,18 +231,12 @@ impl K2vClient { value: Vec<u8>, causality: Option<CausalityToken>, ) -> Result<(), Error> { - let mut req = SignedRequest::new( - "PUT", - SERVICE, - &self.region, - &format!("/{}/{}", self.bucket, partition_key), - ); - req.add_param("sort_key", sort_key); - req.set_payload(Some(value)); - + let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]); + let mut req = Request::put(url); if let Some(causality) = causality { - req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0); + req = req.header(GARAGE_CAUSALITY_TOKEN, &causality.0); } + let req = req.body(Bytes::from(value))?; self.dispatch(req, None).await?; Ok(()) @@ -243,14 +249,10 @@ impl K2vClient { sort_key: &str, causality: CausalityToken, ) -> Result<(), Error> { - let mut req = SignedRequest::new( - "DELETE", - SERVICE, - &self.region, - &format!("/{}/{}", self.bucket, partition_key), - ); - req.add_param("sort_key", sort_key); - req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0); + let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]); + let req = Request::delete(url) + .header(GARAGE_CAUSALITY_TOKEN, &causality.0) + .body(Bytes::new())?; self.dispatch(req, None).await?; Ok(()) @@ -262,9 +264,9 @@ impl K2vClient { &self, filter: Filter<'_>, ) -> Result<PaginatedRange<PartitionInfo>, Error> { - let mut req = - SignedRequest::new("GET", SERVICE, &self.region, &format!("/{}", self.bucket)); - filter.insert_params(&mut req); + let params = filter.query_params(); + let url = self.build_url(None, ¶ms); + let req = Request::get(url).body(Bytes::new())?; let res = self.dispatch(req, None).await?; @@ -286,11 +288,10 @@ impl K2vClient { /// *not* atomic: it is possible for some sub-operations to fails and others to success. In /// that case, failure is reported. pub async fn insert_batch(&self, operations: &[BatchInsertOp<'_>]) -> Result<(), Error> { - let mut req = - SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); - + let url = self.build_url::<&str>(None, &[]); let payload = serde_json::to_vec(operations)?; - req.set_payload(Some(payload)); + let req = Request::post(url).body(payload.into())?; + self.dispatch(req, None).await?; Ok(()) } @@ -300,12 +301,10 @@ impl K2vClient { &self, operations: &[BatchReadOp<'_>], ) -> Result<Vec<PaginatedRange<CausalValue>>, Error> { - let mut req = - SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); - req.add_param("search", ""); - + let url = self.build_url(None, &[("search", "")]); let payload = serde_json::to_vec(operations)?; - req.set_payload(Some(payload)); + let req = Request::post(url).body(payload.into())?; + let res = self.dispatch(req, None).await?; let resp: Vec<BatchReadResponse> = serde_json::from_slice(&res.body)?; @@ -334,12 +333,10 @@ impl K2vClient { /// Perform a DeleteBatch request, deleting mutiple values or range of values at once, without /// providing causality information. pub async fn delete_batch(&self, operations: &[BatchDeleteOp<'_>]) -> Result<Vec<u64>, Error> { - let mut req = - SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); - req.add_param("delete", ""); - + let url = self.build_url(None, &[("delete", "")]); let payload = serde_json::to_vec(operations)?; - req.set_payload(Some(payload)); + let req = Request::post(url).body(payload.into())?; + let res = self.dispatch(req, None).await?; let resp: Vec<BatchDeleteResponse> = serde_json::from_slice(&res.body)?; @@ -349,33 +346,67 @@ impl K2vClient { async fn dispatch( &self, - mut req: SignedRequest, + mut req: Request<Bytes>, timeout: Option<Duration>, ) -> Result<Response, Error> { - req.sign(&self.creds); - let mut res = self - .client - .dispatch(req, Some(timeout.unwrap_or(DEFAULT_TIMEOUT))) - .await?; - - let causality_token = res - .headers - .remove(GARAGE_CAUSALITY_TOKEN) - .map(CausalityToken); - let content_type = res.headers.remove(CONTENT_TYPE); + req.headers_mut() + .insert(http::header::USER_AGENT, self.user_agent.clone()); + + use sha2::{Digest, Sha256}; + let mut hasher = Sha256::new(); + hasher.update(req.body()); + let hash = hex::encode(&hasher.finalize()); + req.headers_mut() + .insert(AMZ_CONTENT_SHA256, hash.try_into().unwrap()); + + debug!("request uri: {:?}", req.uri()); + + // Sign request + let signing_settings = SigningSettings::default(); + let signing_params = SigningParams::builder() + .access_key(&self.config.aws_access_key_id) + .secret_key(&self.config.aws_secret_access_key) + .region(&self.config.region) + .service_name(SERVICE) + .time(SystemTime::now()) + .settings(signing_settings) + .build()?; + // Convert the HTTP request into a signable request + let signable_request = SignableRequest::from(&req); + + // Sign and then apply the signature to the request + let (signing_instructions, _signature) = + sign(signable_request, &signing_params)?.into_parts(); + signing_instructions.apply_to_request(&mut req); + + // Send and wait for timeout + let res = tokio::select! { + res = self.client.request(req.map(Body::from)) => res?, + _ = tokio::time::sleep(timeout.unwrap_or(DEFAULT_TIMEOUT)) => { + return Err(Error::Timeout); + } + }; + + let (mut res, body) = res.into_parts(); + let causality_token = match res.headers.remove(GARAGE_CAUSALITY_TOKEN) { + Some(v) => Some(CausalityToken(v.to_str()?.to_string())), + None => None, + }; + let content_type = match res.headers.remove(CONTENT_TYPE) { + Some(v) => Some(v.to_str()?.to_string()), + None => None, + }; let body = match res.status { - StatusCode::OK => read_body(&mut res.headers, res.body).await?, - StatusCode::NO_CONTENT => Vec::new(), + StatusCode::OK => hyper::body::to_bytes(body).await?, + StatusCode::NO_CONTENT => Bytes::new(), StatusCode::NOT_FOUND => return Err(Error::NotFound), - StatusCode::NOT_MODIFIED => Vec::new(), + StatusCode::NOT_MODIFIED => Bytes::new(), s => { - let err_body = read_body(&mut res.headers, res.body) - .await - .unwrap_or_default(); + let err_body = hyper::body::to_bytes(body).await.unwrap_or_default(); let err_body_str = std::str::from_utf8(&err_body) .map(String::from) - .unwrap_or_else(|_| base64::encode(&err_body)); + .unwrap_or_else(|_| BASE64_STANDARD.encode(&err_body)); if s.is_client_error() || s.is_server_error() { error!("Error response {}: {}", res.status, err_body_str); @@ -408,7 +439,7 @@ impl K2vClient { "Response body: {}", std::str::from_utf8(&body) .map(String::from) - .unwrap_or_else(|_| base64::encode(&body)) + .unwrap_or_else(|_| BASE64_STANDARD.encode(&body)) ); Ok(Response { @@ -418,16 +449,26 @@ impl K2vClient { content_type, }) } -} -async fn read_body(headers: &mut HeaderMap<String>, body: ByteStream) -> Result<Vec<u8>, Error> { - let body_len = headers - .get(CONTENT_LENGTH) - .and_then(|h| h.parse().ok()) - .unwrap_or(0); - let mut res = Vec::with_capacity(body_len); - body.into_async_read().read_to_end(&mut res).await?; - Ok(res) + fn build_url<V: AsRef<str>>(&self, partition_key: Option<&str>, query: &[(&str, V)]) -> String { + let mut url = format!("{}/{}", self.config.endpoint, self.config.bucket); + if let Some(pk) = partition_key { + url.push('/'); + url.extend(utf8_percent_encode(pk, &PATH_ENCODE_SET)); + } + if !query.is_empty() { + url.push('?'); + for (i, (k, v)) in query.iter().enumerate() { + if i > 0 { + url.push('&'); + } + url.extend(utf8_percent_encode(k, &STRICT_ENCODE_SET)); + url.push('='); + url.extend(utf8_percent_encode(v.as_ref(), &STRICT_ENCODE_SET)); + } + } + url + } } /// An opaque token used to convey causality between operations. @@ -482,9 +523,11 @@ impl<'de> Deserialize<'de> for K2vValue { { let val: Option<&str> = Option::deserialize(d)?; Ok(match val { - Some(s) => { - K2vValue::Value(base64::decode(s).map_err(|_| DeError::custom("invalid base64"))?) - } + Some(s) => K2vValue::Value( + BASE64_STANDARD + .decode(s) + .map_err(|_| DeError::custom("invalid base64"))?, + ), None => K2vValue::Tombstone, }) } @@ -498,7 +541,7 @@ impl Serialize for K2vValue { match self { K2vValue::Tombstone => serializer.serialize_none(), K2vValue::Value(v) => { - let b64 = base64::encode(v); + let b64 = BASE64_STANDARD.encode(v); serializer.serialize_str(&b64) } } @@ -530,6 +573,7 @@ pub struct Filter<'a> { pub reverse: bool, } +/// Filter for a poll range operations. #[derive(Debug, Default, Clone, Serialize)] pub struct PollRangeFilter<'a> { pub start: Option<&'a str>, @@ -537,6 +581,15 @@ pub struct PollRangeFilter<'a> { pub prefix: Option<&'a str>, } +/// Response to a poll_range query +#[derive(Debug, Default, Clone, Serialize)] +pub struct PollRangeResult { + /// List of items that have changed since last PollRange call. + pub items: BTreeMap<String, CausalValue>, + /// opaque string representing items already seen for future PollRange calls. + pub seen_marker: String, +} + #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] struct PollRangeRequest<'a> { @@ -554,22 +607,24 @@ struct PollRangeResponse { } impl<'a> Filter<'a> { - fn insert_params(&self, req: &mut SignedRequest) { - if let Some(start) = &self.start { - req.add_param("start", start); + fn query_params(&self) -> Vec<(&'static str, std::borrow::Cow<str>)> { + let mut res = Vec::<(&'static str, std::borrow::Cow<str>)>::with_capacity(8); + if let Some(start) = self.start.as_deref() { + res.push(("start", start.into())); } - if let Some(end) = &self.end { - req.add_param("end", end); + if let Some(end) = self.end.as_deref() { + res.push(("end", end.into())); } - if let Some(prefix) = &self.prefix { - req.add_param("prefix", prefix); + if let Some(prefix) = self.prefix.as_deref() { + res.push(("prefix", prefix.into())); } if let Some(limit) = &self.limit { - req.add_param("limit", &limit.to_string()); + res.push(("limit", limit.to_string().into())); } if self.reverse { - req.add_param("reverse", "true"); + res.push(("reverse", "true".into())); } + res } } @@ -691,7 +746,7 @@ struct ErrorResponse { } struct Response { - body: Vec<u8>, + body: Bytes, status: StatusCode, causality_token: Option<CausalityToken>, content_type: Option<String>, |