aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2023-05-22 09:03:08 +0000
committerAlex <alex@adnab.me>2023-05-22 09:03:08 +0000
commit9d833bb7efc2c166036db38da89b0b5ac8f466fe (patch)
tree827c751387fcd4424c4e9af467b23cf5514619e7 /src
parent03efc191c1697140d24c431e88bd4964c77823e5 (diff)
parentc3d3b837ebeb98e8639659183ae801b3a280de99 (diff)
downloadgarage-9d833bb7efc2c166036db38da89b0b5ac8f466fe.tar.gz
garage-9d833bb7efc2c166036db38da89b0b5ac8f466fe.zip
Merge pull request 'K2V-client improvements' (#577) from k2v-client-aws-sigv4 into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/577
Diffstat (limited to 'src')
-rw-r--r--src/garage/Cargo.toml2
-rw-r--r--src/garage/tests/common/mod.rs14
-rw-r--r--src/garage/tests/k2v_client/mod.rs1
-rw-r--r--src/garage/tests/k2v_client/simple.rs60
-rw-r--r--src/garage/tests/lib.rs2
-rw-r--r--src/k2v-client/Cargo.toml14
-rw-r--r--src/k2v-client/bin/k2v-cli.rs15
-rw-r--r--src/k2v-client/error.rs16
-rw-r--r--src/k2v-client/lib.rs301
9 files changed, 276 insertions, 149 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index e188cd2a..52d0ea79 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -73,6 +73,8 @@ assert-json-diff = "2.0"
serde_json = "1.0"
base64 = "0.21"
+k2v-client.workspace = true
+
[features]
default = [ "bundled-libs", "metrics", "sled", "k2v" ]
diff --git a/src/garage/tests/common/mod.rs b/src/garage/tests/common/mod.rs
index eca3e42b..0b8c6755 100644
--- a/src/garage/tests/common/mod.rs
+++ b/src/garage/tests/common/mod.rs
@@ -1,5 +1,6 @@
use aws_sdk_s3::{Client, Region};
use ext::*;
+use k2v_client::K2vClient;
#[macro_use]
pub mod macros;
@@ -68,6 +69,19 @@ impl Context {
bucket_name
}
+
+ /// Build a K2vClient for a given bucket
+ pub fn k2v_client(&self, bucket: &str) -> K2vClient {
+ let config = k2v_client::K2vClientConfig {
+ region: REGION.to_string(),
+ endpoint: self.garage.k2v_uri().to_string(),
+ aws_access_key_id: self.key.id.clone(),
+ aws_secret_access_key: self.key.secret.clone(),
+ bucket: bucket.to_string(),
+ user_agent: None,
+ };
+ K2vClient::new(config).expect("Could not create K2V client")
+ }
}
pub fn context() -> Context {
diff --git a/src/garage/tests/k2v_client/mod.rs b/src/garage/tests/k2v_client/mod.rs
new file mode 100644
index 00000000..b252f36b
--- /dev/null
+++ b/src/garage/tests/k2v_client/mod.rs
@@ -0,0 +1 @@
+pub mod simple;
diff --git a/src/garage/tests/k2v_client/simple.rs b/src/garage/tests/k2v_client/simple.rs
new file mode 100644
index 00000000..1a3118ef
--- /dev/null
+++ b/src/garage/tests/k2v_client/simple.rs
@@ -0,0 +1,60 @@
+use std::time::Duration;
+
+use k2v_client::*;
+
+use crate::common;
+
+#[tokio::test]
+async fn test_simple() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-client-simple");
+ let k2v_client = ctx.k2v_client(&bucket);
+
+ k2v_client
+ .insert_item("root", "test1", b"Hello, world!".to_vec(), None)
+ .await
+ .unwrap();
+
+ let res = k2v_client.read_item("root", "test1").await.unwrap();
+
+ assert_eq!(res.value.len(), 1);
+ assert_eq!(res.value[0], K2vValue::Value(b"Hello, world!".to_vec()));
+}
+
+#[tokio::test]
+async fn test_special_chars() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-client-simple-special-chars");
+ let k2v_client = ctx.k2v_client(&bucket);
+
+ let (pk, sk) = ("root@plépp", "≤≤««");
+ k2v_client
+ .insert_item(pk, sk, b"Hello, world!".to_vec(), None)
+ .await
+ .unwrap();
+
+ let res = k2v_client.read_item(pk, sk).await.unwrap();
+ assert_eq!(res.value.len(), 1);
+ assert_eq!(res.value[0], K2vValue::Value(b"Hello, world!".to_vec()));
+
+ // sleep a bit before read_index
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ let res = k2v_client.read_index(Default::default()).await.unwrap();
+ assert_eq!(res.items.len(), 1);
+ assert_eq!(res.items.keys().next().unwrap(), pk);
+
+ let res = k2v_client
+ .read_batch(&[BatchReadOp {
+ partition_key: pk,
+ filter: Default::default(),
+ single_item: false,
+ conflicts_only: false,
+ tombstones: false,
+ }])
+ .await
+ .unwrap();
+ assert_eq!(res.len(), 1);
+ let res = &res[0];
+ assert_eq!(res.items.len(), 1);
+ assert_eq!(res.items.keys().next().unwrap(), sk);
+}
diff --git a/src/garage/tests/lib.rs b/src/garage/tests/lib.rs
index 87be1327..e450baac 100644
--- a/src/garage/tests/lib.rs
+++ b/src/garage/tests/lib.rs
@@ -8,3 +8,5 @@ mod s3;
#[cfg(feature = "k2v")]
mod k2v;
+#[cfg(feature = "k2v")]
+mod k2v_client;
diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml
index 79af5242..2ccb9fe5 100644
--- a/src/k2v-client/Cargo.toml
+++ b/src/k2v-client/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "k2v-client"
-version = "0.0.3"
+version = "0.0.4"
authors = ["Trinity Pointard <trinity.pointard@gmail.com>", "Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -10,13 +10,15 @@ readme = "../../README.md"
[dependencies]
base64 = "0.21"
+sha2 = "0.10"
+hex = "0.4"
http = "0.2"
log = "0.4"
-rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] }
-rusoto_credential = "0.48.0"
-rusoto_signature = "0.48.0"
-hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12" ] }
-serde = "1.0"
+aws-sigv4 = "0.55"
+percent-encoding = "2.2"
+hyper = { version = "0.14", default-features = false, features = ["client", "http1", "http2"] }
+hyper-rustls = { version = "0.24", features = ["http2"] }
+serde = { version = "1.0", features = [ "derive" ] }
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
diff --git a/src/k2v-client/bin/k2v-cli.rs b/src/k2v-client/bin/k2v-cli.rs
index 984b4192..5a2422ab 100644
--- a/src/k2v-client/bin/k2v-cli.rs
+++ b/src/k2v-client/bin/k2v-cli.rs
@@ -8,9 +8,6 @@ use k2v_client::*;
use format_table::format_table;
-use rusoto_core::credential::AwsCredentials;
-use rusoto_core::Region;
-
use clap::{Parser, Subcommand};
/// K2V command line interface
@@ -408,14 +405,16 @@ async fn main() -> Result<(), Error> {
let args = Args::parse();
- let region = Region::Custom {
- name: args.region,
+ let config = K2vClientConfig {
endpoint: args.endpoint,
+ region: args.region,
+ aws_access_key_id: args.key_id,
+ aws_secret_access_key: args.secret,
+ bucket: args.bucket,
+ user_agent: None,
};
- let creds = AwsCredentials::new(args.key_id, args.secret, None, None);
-
- let client = K2vClient::new(region, args.bucket, creds, None)?;
+ let client = K2vClient::new(config)?;
match args.command {
Command::Insert {
diff --git a/src/k2v-client/error.rs b/src/k2v-client/error.rs
index 37c221f2..564ce497 100644
--- a/src/k2v-client/error.rs
+++ b/src/k2v-client/error.rs
@@ -18,12 +18,20 @@ pub enum Error {
NotFound,
#[error("io error: {0}")]
IoError(#[from] std::io::Error),
- #[error("rusoto tls error: {0}")]
- RusotoTls(#[from] rusoto_core::request::TlsError),
- #[error("rusoto http error: {0}")]
- RusotoHttp(#[from] rusoto_core::HttpDispatchError),
+ #[error("http error: {0}")]
+ Http(#[from] http::Error),
+ #[error("hyper error: {0}")]
+ Hyper(#[from] hyper::Error),
+ #[error("invalid header: {0}")]
+ Header(#[from] hyper::header::ToStrError),
#[error("deserialization error: {0}")]
Deserialization(#[from] serde_json::Error),
+ #[error("invalid signature parameters: {0}")]
+ SignParameters(#[from] aws_sigv4::signing_params::BuildError),
+ #[error("could not sign request: {0}")]
+ SignRequest(#[from] aws_sigv4::http_request::SigningError),
+ #[error("request timed out")]
+ Timeout,
#[error("{0}")]
Message(Cow<'static, str>),
}
diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs
index 3d1b5461..425c351f 100644
--- a/src/k2v-client/lib.rs
+++ b/src/k2v-client/lib.rs
@@ -1,21 +1,23 @@
use std::collections::BTreeMap;
-use std::time::Duration;
+use std::convert::TryInto;
+use std::time::{Duration, SystemTime};
use base64::prelude::*;
-use http::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE};
-use http::status::StatusCode;
-use http::HeaderMap;
use log::{debug, error};
+use percent_encoding::{utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC};
+
+use http::header::{ACCEPT, CONTENT_TYPE};
+use http::status::StatusCode;
+use http::{HeaderName, HeaderValue, Request};
+use hyper::{body::Bytes, Body};
+use hyper::{client::connect::HttpConnector, Client as HttpClient};
+use hyper_rustls::HttpsConnector;
+
+use aws_sigv4::http_request::{sign, SignableRequest, SigningParams, SigningSettings};
-use rusoto_core::{ByteStream, DispatchSignedRequest, HttpClient};
-use rusoto_credential::AwsCredentials;
-use rusoto_signature::region::Region;
-use rusoto_signature::signature::SignedRequest;
use serde::de::Error as DeError;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
-use tokio::io::AsyncReadExt;
-
mod error;
pub use error::Error;
@@ -23,41 +25,57 @@ pub use error::Error;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300);
const SERVICE: &str = "k2v";
-const GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
+const AMZ_CONTENT_SHA256: HeaderName = HeaderName::from_static("x-amz-content-sha256");
+const GARAGE_CAUSALITY_TOKEN: HeaderName = HeaderName::from_static("x-garage-causality-token");
+
+const STRICT_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC
+ .remove(b'_')
+ .remove(b'-')
+ .remove(b'.')
+ .remove(b'~');
+const PATH_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC
+ .remove(b'/')
+ .remove(b'_')
+ .remove(b'-')
+ .remove(b'.')
+ .remove(b'~');
+
+pub struct K2vClientConfig {
+ pub endpoint: String,
+ pub region: String,
+ pub aws_access_key_id: String,
+ pub aws_secret_access_key: String,
+ pub bucket: String,
+ pub user_agent: Option<String>,
+}
/// Client used to query a K2V server.
pub struct K2vClient {
- region: Region,
- bucket: String,
- creds: AwsCredentials,
- client: HttpClient,
+ config: K2vClientConfig,
+ user_agent: HeaderValue,
+ client: HttpClient<HttpsConnector<HttpConnector>>,
}
impl K2vClient {
/// Create a new K2V client.
- pub fn new(
- region: Region,
- bucket: String,
- creds: AwsCredentials,
- user_agent: Option<String>,
- ) -> Result<Self, Error> {
+ pub fn new(config: K2vClientConfig) -> Result<Self, Error> {
let connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_http1()
.enable_http2()
.build();
- let mut client = HttpClient::from_connector(connector);
- if let Some(ua) = user_agent {
- client.local_agent_prepend(ua);
- } else {
- client.local_agent_prepend(format!("k2v/{}", env!("CARGO_PKG_VERSION")));
- }
+ let client = HttpClient::builder().build(connector);
+ let user_agent: std::borrow::Cow<str> = match &config.user_agent {
+ Some(ua) => ua.into(),
+ None => format!("k2v/{}", env!("CARGO_PKG_VERSION")).into(),
+ };
+ let user_agent = HeaderValue::from_str(&user_agent)
+ .map_err(|_| Error::Message("invalid user agent".into()))?;
Ok(K2vClient {
- region,
- bucket,
- creds,
+ config,
client,
+ user_agent,
})
}
@@ -67,15 +85,10 @@ impl K2vClient {
partition_key: &str,
sort_key: &str,
) -> Result<CausalValue, Error> {
- let mut req = SignedRequest::new(
- "GET",
- SERVICE,
- &self.region,
- &format!("/{}/{}", self.bucket, partition_key),
- );
- req.add_param("sort_key", sort_key);
- req.add_header(ACCEPT, "application/octet-stream, application/json");
-
+ let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]);
+ let req = Request::get(url)
+ .header(ACCEPT, "application/octet-stream, application/json")
+ .body(Bytes::new())?;
let res = self.dispatch(req, None).await?;
let causality = res
@@ -92,7 +105,7 @@ impl K2vClient {
match res.content_type.as_deref() {
Some("application/octet-stream") => Ok(CausalValue {
causality,
- value: vec![K2vValue::Value(res.body)],
+ value: vec![K2vValue::Value(res.body.to_vec())],
}),
Some("application/json") => {
let value = serde_json::from_slice(&res.body)?;
@@ -116,16 +129,17 @@ impl K2vClient {
) -> Result<Option<CausalValue>, Error> {
let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT);
- let mut req = SignedRequest::new(
- "GET",
- SERVICE,
- &self.region,
- &format!("/{}/{}", self.bucket, partition_key),
+ let url = self.build_url(
+ Some(partition_key),
+ &[
+ ("sort_key", sort_key),
+ ("causality_token", &causality.0),
+ ("timeout", &timeout.as_secs().to_string()),
+ ],
);
- req.add_param("sort_key", sort_key);
- req.add_param("causality_token", &causality.0);
- req.add_param("timeout", &timeout.as_secs().to_string());
- req.add_header(ACCEPT, "application/octet-stream, application/json");
+ let req = Request::get(url)
+ .header(ACCEPT, "application/octet-stream, application/json")
+ .body(Bytes::new())?;
let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
@@ -147,7 +161,7 @@ impl K2vClient {
match res.content_type.as_deref() {
Some("application/octet-stream") => Ok(Some(CausalValue {
causality,
- value: vec![K2vValue::Value(res.body)],
+ value: vec![K2vValue::Value(res.body.to_vec())],
})),
Some("application/json") => {
let value = serde_json::from_slice(&res.body)?;
@@ -177,16 +191,10 @@ impl K2vClient {
timeout: timeout.as_secs(),
};
- let mut req = SignedRequest::new(
- "POST",
- SERVICE,
- &self.region,
- &format!("/{}/{}", self.bucket, partition_key),
- );
- req.add_param("poll_range", "");
-
+ let url = self.build_url(Some(partition_key), &[("poll_range", "")]);
let payload = serde_json::to_vec(&request)?;
- req.set_payload(Some(payload));
+ let req = Request::post(url).body(Bytes::from(payload))?;
+
let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
if res.status == StatusCode::NOT_MODIFIED {
@@ -220,18 +228,12 @@ impl K2vClient {
value: Vec<u8>,
causality: Option<CausalityToken>,
) -> Result<(), Error> {
- let mut req = SignedRequest::new(
- "PUT",
- SERVICE,
- &self.region,
- &format!("/{}/{}", self.bucket, partition_key),
- );
- req.add_param("sort_key", sort_key);
- req.set_payload(Some(value));
-
+ let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]);
+ let mut req = Request::put(url);
if let Some(causality) = causality {
- req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0);
+ req = req.header(GARAGE_CAUSALITY_TOKEN, &causality.0);
}
+ let req = req.body(Bytes::from(value))?;
self.dispatch(req, None).await?;
Ok(())
@@ -244,14 +246,10 @@ impl K2vClient {
sort_key: &str,
causality: CausalityToken,
) -> Result<(), Error> {
- let mut req = SignedRequest::new(
- "DELETE",
- SERVICE,
- &self.region,
- &format!("/{}/{}", self.bucket, partition_key),
- );
- req.add_param("sort_key", sort_key);
- req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0);
+ let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]);
+ let req = Request::delete(url)
+ .header(GARAGE_CAUSALITY_TOKEN, &causality.0)
+ .body(Bytes::new())?;
self.dispatch(req, None).await?;
Ok(())
@@ -263,9 +261,9 @@ impl K2vClient {
&self,
filter: Filter<'_>,
) -> Result<PaginatedRange<PartitionInfo>, Error> {
- let mut req =
- SignedRequest::new("GET", SERVICE, &self.region, &format!("/{}", self.bucket));
- filter.insert_params(&mut req);
+ let params = filter.query_params();
+ let url = self.build_url(None, &params);
+ let req = Request::get(url).body(Bytes::new())?;
let res = self.dispatch(req, None).await?;
@@ -287,11 +285,10 @@ impl K2vClient {
/// *not* atomic: it is possible for some sub-operations to fails and others to success. In
/// that case, failure is reported.
pub async fn insert_batch(&self, operations: &[BatchInsertOp<'_>]) -> Result<(), Error> {
- let mut req =
- SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
-
+ let url = self.build_url::<&str>(None, &[]);
let payload = serde_json::to_vec(operations)?;
- req.set_payload(Some(payload));
+ let req = Request::post(url).body(payload.into())?;
+
self.dispatch(req, None).await?;
Ok(())
}
@@ -301,12 +298,10 @@ impl K2vClient {
&self,
operations: &[BatchReadOp<'_>],
) -> Result<Vec<PaginatedRange<CausalValue>>, Error> {
- let mut req =
- SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
- req.add_param("search", "");
-
+ let url = self.build_url(None, &[("search", "")]);
let payload = serde_json::to_vec(operations)?;
- req.set_payload(Some(payload));
+ let req = Request::post(url).body(payload.into())?;
+
let res = self.dispatch(req, None).await?;
let resp: Vec<BatchReadResponse> = serde_json::from_slice(&res.body)?;
@@ -335,12 +330,10 @@ impl K2vClient {
/// Perform a DeleteBatch request, deleting mutiple values or range of values at once, without
/// providing causality information.
pub async fn delete_batch(&self, operations: &[BatchDeleteOp<'_>]) -> Result<Vec<u64>, Error> {
- let mut req =
- SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
- req.add_param("delete", "");
-
+ let url = self.build_url(None, &[("delete", "")]);
let payload = serde_json::to_vec(operations)?;
- req.set_payload(Some(payload));
+ let req = Request::post(url).body(payload.into())?;
+
let res = self.dispatch(req, None).await?;
let resp: Vec<BatchDeleteResponse> = serde_json::from_slice(&res.body)?;
@@ -350,30 +343,64 @@ impl K2vClient {
async fn dispatch(
&self,
- mut req: SignedRequest,
+ mut req: Request<Bytes>,
timeout: Option<Duration>,
) -> Result<Response, Error> {
- req.sign(&self.creds);
- let mut res = self
- .client
- .dispatch(req, Some(timeout.unwrap_or(DEFAULT_TIMEOUT)))
- .await?;
-
- let causality_token = res
- .headers
- .remove(GARAGE_CAUSALITY_TOKEN)
- .map(CausalityToken);
- let content_type = res.headers.remove(CONTENT_TYPE);
+ req.headers_mut()
+ .insert(http::header::USER_AGENT, self.user_agent.clone());
+
+ use sha2::{Digest, Sha256};
+ let mut hasher = Sha256::new();
+ hasher.update(req.body());
+ let hash = hex::encode(&hasher.finalize());
+ req.headers_mut()
+ .insert(AMZ_CONTENT_SHA256, hash.try_into().unwrap());
+
+ debug!("request uri: {:?}", req.uri());
+
+ // Sign request
+ let signing_settings = SigningSettings::default();
+ let signing_params = SigningParams::builder()
+ .access_key(&self.config.aws_access_key_id)
+ .secret_key(&self.config.aws_secret_access_key)
+ .region(&self.config.region)
+ .service_name(SERVICE)
+ .time(SystemTime::now())
+ .settings(signing_settings)
+ .build()?;
+ // Convert the HTTP request into a signable request
+ let signable_request = SignableRequest::from(&req);
+
+ // Sign and then apply the signature to the request
+ let (signing_instructions, _signature) =
+ sign(signable_request, &signing_params)?.into_parts();
+ signing_instructions.apply_to_request(&mut req);
+
+ // Send and wait for timeout
+ let res = tokio::select! {
+ res = self.client.request(req.map(Body::from)) => res?,
+ _ = tokio::time::sleep(timeout.unwrap_or(DEFAULT_TIMEOUT)) => {
+ return Err(Error::Timeout);
+ }
+ };
+
+ let (mut res, body) = res.into_parts();
+ let causality_token = match res.headers.remove(GARAGE_CAUSALITY_TOKEN) {
+ Some(v) => Some(CausalityToken(v.to_str()?.to_string())),
+ None => None,
+ };
+ let content_type = match res.headers.remove(CONTENT_TYPE) {
+ Some(v) => Some(v.to_str()?.to_string()),
+ None => None,
+ };
let body = match res.status {
- StatusCode::OK => read_body(&mut res.headers, res.body).await?,
- StatusCode::NO_CONTENT => Vec::new(),
+ StatusCode::OK => hyper::body::to_bytes(body).await?,
+ StatusCode::NO_CONTENT => Bytes::new(),
StatusCode::NOT_FOUND => return Err(Error::NotFound),
- StatusCode::NOT_MODIFIED => Vec::new(),
+ StatusCode::NOT_MODIFIED => Bytes::new(),
s => {
- let err_body = read_body(&mut res.headers, res.body)
- .await
- .unwrap_or_default();
+ let err_body = hyper::body::to_bytes(body).await.unwrap_or_default();
let err_body_str = std::str::from_utf8(&err_body)
.map(String::from)
.unwrap_or_else(|_| BASE64_STANDARD.encode(&err_body));
@@ -419,16 +446,26 @@ impl K2vClient {
content_type,
})
}
-}
-async fn read_body(headers: &mut HeaderMap<String>, body: ByteStream) -> Result<Vec<u8>, Error> {
- let body_len = headers
- .get(CONTENT_LENGTH)
- .and_then(|h| h.parse().ok())
- .unwrap_or(0);
- let mut res = Vec::with_capacity(body_len);
- body.into_async_read().read_to_end(&mut res).await?;
- Ok(res)
+ fn build_url<V: AsRef<str>>(&self, partition_key: Option<&str>, query: &[(&str, V)]) -> String {
+ let mut url = format!("{}/{}", self.config.endpoint, self.config.bucket);
+ if let Some(pk) = partition_key {
+ url.push('/');
+ url.extend(utf8_percent_encode(pk, &PATH_ENCODE_SET));
+ }
+ if !query.is_empty() {
+ url.push('?');
+ for (i, (k, v)) in query.iter().enumerate() {
+ if i > 0 {
+ url.push('&');
+ }
+ url.extend(utf8_percent_encode(k, &STRICT_ENCODE_SET));
+ url.push('=');
+ url.extend(utf8_percent_encode(v.as_ref(), &STRICT_ENCODE_SET));
+ }
+ }
+ url
+ }
}
/// An opaque token used to convey causality between operations.
@@ -557,22 +594,24 @@ struct PollRangeResponse {
}
impl<'a> Filter<'a> {
- fn insert_params(&self, req: &mut SignedRequest) {
- if let Some(start) = &self.start {
- req.add_param("start", start);
+ fn query_params(&self) -> Vec<(&'static str, std::borrow::Cow<str>)> {
+ let mut res = Vec::<(&'static str, std::borrow::Cow<str>)>::with_capacity(8);
+ if let Some(start) = self.start.as_deref() {
+ res.push(("start", start.into()));
}
- if let Some(end) = &self.end {
- req.add_param("end", end);
+ if let Some(end) = self.end.as_deref() {
+ res.push(("end", end.into()));
}
- if let Some(prefix) = &self.prefix {
- req.add_param("prefix", prefix);
+ if let Some(prefix) = self.prefix.as_deref() {
+ res.push(("prefix", prefix.into()));
}
if let Some(limit) = &self.limit {
- req.add_param("limit", &limit.to_string());
+ res.push(("limit", limit.to_string().into()));
}
if self.reverse {
- req.add_param("reverse", "true");
+ res.push(("reverse", "true".into()));
}
+ res
}
}
@@ -694,7 +733,7 @@ struct ErrorResponse {
}
struct Response {
- body: Vec<u8>,
+ body: Bytes,
status: StatusCode,
causality_token: Option<CausalityToken>,
content_type: Option<String>,