diff options
Diffstat (limited to 'src/k2v-client/lib.rs')
-rw-r--r-- | src/k2v-client/lib.rs | 60 |
1 files changed, 45 insertions, 15 deletions
diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs index 4aa7a20a..852274a7 100644 --- a/src/k2v-client/lib.rs +++ b/src/k2v-client/lib.rs @@ -9,11 +9,15 @@ use percent_encoding::{utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC}; use http::header::{ACCEPT, CONTENT_TYPE}; use http::status::StatusCode; use http::{HeaderName, HeaderValue, Request}; -use hyper::{body::Bytes, Body}; -use hyper::{client::connect::HttpConnector, Client as HttpClient}; +use http_body_util::{BodyExt, Full as FullBody}; +use hyper::body::Bytes; use hyper_rustls::HttpsConnector; +use hyper_util::client::legacy::{connect::HttpConnector, Client as HttpClient}; +use hyper_util::rt::TokioExecutor; -use aws_sigv4::http_request::{sign, SignableRequest, SigningParams, SigningSettings}; +use aws_sdk_config::config::Credentials; +use aws_sigv4::http_request::{sign, SignableBody, SignableRequest, SigningSettings}; +use aws_sigv4::sign::v4::SigningParams; use serde::de::Error as DeError; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -22,6 +26,8 @@ mod error; pub use error::Error; +pub type Body = FullBody<Bytes>; + const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300); const SERVICE: &str = "k2v"; @@ -53,19 +59,19 @@ pub struct K2vClientConfig { pub struct K2vClient { config: K2vClientConfig, user_agent: HeaderValue, - client: HttpClient<HttpsConnector<HttpConnector>>, + client: HttpClient<HttpsConnector<HttpConnector>, Body>, } impl K2vClient { /// Create a new K2V client. pub fn new(config: K2vClientConfig) -> Result<Self, Error> { let connector = hyper_rustls::HttpsConnectorBuilder::new() - .with_native_roots() + .with_native_roots()? .https_or_http() .enable_http1() .enable_http2() .build(); - let client = HttpClient::builder().build(connector); + let client = HttpClient::builder(TokioExecutor::new()).build(connector); let user_agent: std::borrow::Cow<str> = match &config.user_agent { Some(ua) => ua.into(), None => format!("k2v/{}", env!("CARGO_PKG_VERSION")).into(), @@ -363,21 +369,37 @@ impl K2vClient { // Sign request let signing_settings = SigningSettings::default(); + let identity = Credentials::new( + &self.config.aws_access_key_id, + &self.config.aws_secret_access_key, + None, + None, + "k2v-client", + ) + .into(); let signing_params = SigningParams::builder() - .access_key(&self.config.aws_access_key_id) - .secret_key(&self.config.aws_secret_access_key) + .identity(&identity) .region(&self.config.region) - .service_name(SERVICE) + .name(SERVICE) .time(SystemTime::now()) .settings(signing_settings) - .build()?; + .build()? + .into(); // Convert the HTTP request into a signable request - let signable_request = SignableRequest::from(&req); + let signable_request = SignableRequest::new( + req.method().as_str(), + req.uri().to_string(), + // TODO: get rid of Unwrap + req.headers() + .iter() + .map(|(x, y)| (x.as_str(), y.to_str().unwrap())), + SignableBody::Bytes(req.body().as_ref()), + )?; // Sign and then apply the signature to the request let (signing_instructions, _signature) = sign(signable_request, &signing_params)?.into_parts(); - signing_instructions.apply_to_request(&mut req); + signing_instructions.apply_to_request_http1x(&mut req); // Send and wait for timeout let res = tokio::select! { @@ -398,12 +420,16 @@ impl K2vClient { }; let body = match res.status { - StatusCode::OK => hyper::body::to_bytes(body).await?, + StatusCode::OK => BodyExt::collect(body).await?.to_bytes(), StatusCode::NO_CONTENT => Bytes::new(), StatusCode::NOT_FOUND => return Err(Error::NotFound), StatusCode::NOT_MODIFIED => Bytes::new(), s => { - let err_body = hyper::body::to_bytes(body).await.unwrap_or_default(); + let err_body = body + .collect() + .await + .map(|x| x.to_bytes()) + .unwrap_or_default(); let err_body_str = std::str::from_utf8(&err_body) .map(String::from) .unwrap_or_else(|_| BASE64_STANDARD.encode(&err_body)); @@ -451,7 +477,11 @@ impl K2vClient { } fn build_url<V: AsRef<str>>(&self, partition_key: Option<&str>, query: &[(&str, V)]) -> String { - let mut url = format!("{}/{}", self.config.endpoint, self.config.bucket); + let mut url = format!( + "{}/{}", + self.config.endpoint.trim_end_matches('/'), + self.config.bucket + ); if let Some(pk) = partition_key { url.push('/'); url.extend(utf8_percent_encode(pk, &PATH_ENCODE_SET)); |