aboutsummaryrefslogtreecommitdiff
path: root/src/api
diff options
context:
space:
mode:
Diffstat (limited to 'src/api')
-rw-r--r--src/api/Cargo.toml72
-rw-r--r--src/api/admin/api_server.rs37
-rw-r--r--src/api/admin/bucket.rs41
-rw-r--r--src/api/admin/cluster.rs31
-rw-r--r--src/api/admin/error.rs23
-rw-r--r--src/api/admin/key.rs36
-rw-r--r--src/api/common_error.rs23
-rw-r--r--src/api/generic_server.rs216
-rw-r--r--src/api/helpers.rs68
-rw-r--r--src/api/k2v/api_server.rs22
-rw-r--r--src/api/k2v/batch.rs31
-rw-r--r--src/api/k2v/error.rs32
-rw-r--r--src/api/k2v/index.rs7
-rw-r--r--src/api/k2v/item.rs47
-rw-r--r--src/api/s3/api_server.rs55
-rw-r--r--src/api/s3/bucket.rs32
-rw-r--r--src/api/s3/copy.rs21
-rw-r--r--src/api/s3/cors.rs52
-rw-r--r--src/api/s3/delete.rs17
-rw-r--r--src/api/s3/error.rs42
-rw-r--r--src/api/s3/get.rs168
-rw-r--r--src/api/s3/lifecycle.rs23
-rw-r--r--src/api/s3/list.rs17
-rw-r--r--src/api/s3/multipart.rs33
-rw-r--r--src/api/s3/post_object.rs36
-rw-r--r--src/api/s3/put.rs22
-rw-r--r--src/api/s3/router.rs29
-rw-r--r--src/api/s3/website.rs23
-rw-r--r--src/api/signature/error.rs4
-rw-r--r--src/api/signature/payload.rs8
-rw-r--r--src/api/signature/streaming.rs34
31 files changed, 796 insertions, 506 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 15bf757e..9fb562a3 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -20,44 +20,46 @@ garage_block.workspace = true
garage_util.workspace = true
garage_rpc.workspace = true
-async-trait = "0.1.7"
-base64 = "0.21"
-bytes = "1.0"
-chrono = "0.4"
-crypto-common = "0.1"
-err-derive = "0.3"
-hex = "0.4"
-hmac = "0.12"
-idna = "0.4"
-tracing = "0.1"
-md-5 = "0.10"
-nom = "7.1"
-sha2 = "0.10"
+async-trait.workspace = true
+base64.workspace = true
+bytes.workspace = true
+chrono.workspace = true
+crypto-common.workspace = true
+err-derive.workspace = true
+hex.workspace = true
+hmac.workspace = true
+idna.workspace = true
+tracing.workspace = true
+md-5.workspace = true
+nom.workspace = true
+pin-project.workspace = true
+sha2.workspace = true
-futures = "0.3"
-futures-util = "0.3"
-pin-project = "1.0.12"
-tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-tokio-stream = "0.1"
+futures.workspace = true
+futures-util.workspace = true
+tokio.workspace = true
+tokio-stream.workspace = true
-form_urlencoded = "1.0.0"
-http = "0.2"
-httpdate = "1.0"
-http-range = "0.1"
-hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
-hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] }
-multer = "2.0"
-percent-encoding = "2.1.0"
-roxmltree = "0.18"
-serde = { version = "1.0", features = ["derive"] }
-serde_bytes = "0.11"
-serde_json = "1.0"
-quick-xml = { version = "0.26", features = [ "serialize" ] }
-url = "2.3"
+form_urlencoded.workspace = true
+http.workspace = true
+httpdate.workspace = true
+http-range.workspace = true
+http-body-util.workspace = true
+hyper.workspace = true
+hyper-util.workspace = true
+multer.workspace = true
+percent-encoding.workspace = true
+roxmltree.workspace = true
+url.workspace = true
-opentelemetry = "0.17"
-opentelemetry-prometheus = { version = "0.10", optional = true }
-prometheus = { version = "0.13", optional = true }
+serde.workspace = true
+serde_bytes.workspace = true
+serde_json.workspace = true
+quick-xml.workspace = true
+
+opentelemetry.workspace = true
+opentelemetry-prometheus = { workspace = true, optional = true }
+prometheus = { workspace = true, optional = true }
[features]
k2v = [ "garage_util/k2v", "garage_model/k2v" ]
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 41a5e68c..2b9be24e 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -3,9 +3,9 @@ use std::sync::Arc;
use async_trait::async_trait;
-use futures::future::Future;
use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
+use tokio::sync::watch;
use opentelemetry::trace::SpanRef;
@@ -27,7 +27,9 @@ use crate::admin::error::*;
use crate::admin::key::*;
use crate::admin::router_v0;
use crate::admin::router_v1::{Authorization, Endpoint};
-use crate::helpers::host_to_bucket;
+use crate::helpers::*;
+
+pub type ResBody = BoxBody<Error>;
pub struct AdminApiServer {
garage: Arc<Garage>,
@@ -63,24 +65,27 @@ impl AdminApiServer {
pub async fn run(
self,
bind_addr: UnixOrTCPSocketAddress,
- shutdown_signal: impl Future<Output = ()>,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
let region = self.garage.config.s3_api.s3_region.clone();
ApiServer::new(region, self)
- .run_server(bind_addr, Some(0o220), shutdown_signal)
+ .run_server(bind_addr, Some(0o220), must_exit)
.await
}
- fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> {
+ fn handle_options(&self, _req: &Request<IncomingBody>) -> Result<Response<ResBody>, Error> {
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
.header(ALLOW, "OPTIONS, GET, POST")
.header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST")
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
- .body(Body::empty())?)
+ .body(empty_body())?)
}
- async fn handle_check_domain(&self, req: Request<Body>) -> Result<Response<Body>, Error> {
+ async fn handle_check_domain(
+ &self,
+ req: Request<IncomingBody>,
+ ) -> Result<Response<ResBody>, Error> {
let query_params: HashMap<String, String> = req
.uri()
.query()
@@ -104,7 +109,7 @@ impl AdminApiServer {
if self.check_domain(domain).await? {
Ok(Response::builder()
.status(StatusCode::OK)
- .body(Body::from(format!(
+ .body(string_body(format!(
"Domain '{domain}' is managed by Garage"
)))?)
} else {
@@ -167,7 +172,7 @@ impl AdminApiServer {
}
}
- fn handle_health(&self) -> Result<Response<Body>, Error> {
+ fn handle_health(&self) -> Result<Response<ResBody>, Error> {
let health = self.garage.system.health();
let (status, status_str) = match health.status {
@@ -189,10 +194,10 @@ impl AdminApiServer {
Ok(Response::builder()
.status(status)
.header(http::header::CONTENT_TYPE, "text/plain")
- .body(Body::from(status_str))?)
+ .body(string_body(status_str))?)
}
- fn handle_metrics(&self) -> Result<Response<Body>, Error> {
+ fn handle_metrics(&self) -> Result<Response<ResBody>, Error> {
#[cfg(feature = "metrics")]
{
use opentelemetry::trace::Tracer;
@@ -212,7 +217,7 @@ impl AdminApiServer {
Ok(Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, encoder.format_type())
- .body(Body::from(buffer))?)
+ .body(bytes_body(buffer.into()))?)
}
#[cfg(not(feature = "metrics"))]
Err(Error::bad_request(
@@ -229,7 +234,7 @@ impl ApiHandler for AdminApiServer {
type Endpoint = Endpoint;
type Error = Error;
- fn parse_endpoint(&self, req: &Request<Body>) -> Result<Endpoint, Error> {
+ fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<Endpoint, Error> {
if req.uri().path().starts_with("/v0/") {
let endpoint_v0 = router_v0::Endpoint::from_request(req)?;
Endpoint::from_v0(endpoint_v0)
@@ -240,9 +245,9 @@ impl ApiHandler for AdminApiServer {
async fn handle(
&self,
- req: Request<Body>,
+ req: Request<IncomingBody>,
endpoint: Endpoint,
- ) -> Result<Response<Body>, Error> {
+ ) -> Result<Response<ResBody>, Error> {
let expected_auth_header =
match endpoint.authorization_type() {
Authorization::None => None,
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
index 65929d61..a8718a9f 100644
--- a/src/api/admin/bucket.rs
+++ b/src/api/admin/bucket.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_util::crdt::*;
@@ -17,12 +17,13 @@ use garage_model::permission::*;
use garage_model::s3::mpu_table;
use garage_model::s3::object_table::*;
+use crate::admin::api_server::ResBody;
use crate::admin::error::*;
use crate::admin::key::ApiBucketKeyPerm;
use crate::common_error::CommonError;
-use crate::helpers::{json_ok_response, parse_json_body};
+use crate::helpers::*;
-pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
let buckets = garage
.bucket_table
.get_range(
@@ -90,7 +91,7 @@ pub async fn handle_get_bucket_info(
garage: &Arc<Garage>,
id: Option<String>,
global_alias: Option<String>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let bucket_id = match (id, global_alias) {
(Some(id), None) => parse_bucket_id(&id)?,
(None, Some(ga)) => garage
@@ -111,7 +112,7 @@ pub async fn handle_get_bucket_info(
async fn bucket_info_results(
garage: &Arc<Garage>,
bucket_id: Uuid,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let bucket = garage
.bucket_helper()
.get_existing_bucket(bucket_id)
@@ -268,9 +269,9 @@ struct GetBucketInfoKey {
pub async fn handle_create_bucket(
garage: &Arc<Garage>,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let req = parse_json_body::<CreateBucketRequest>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let req = parse_json_body::<CreateBucketRequest, _, Error>(req).await?;
if let Some(ga) = &req.global_alias {
if !is_valid_bucket_name(ga) {
@@ -360,7 +361,7 @@ struct CreateBucketLocalAlias {
pub async fn handle_delete_bucket(
garage: &Arc<Garage>,
id: String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let helper = garage.bucket_helper();
let bucket_id = parse_bucket_id(&id)?;
@@ -403,15 +404,15 @@ pub async fn handle_delete_bucket(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_update_bucket(
garage: &Arc<Garage>,
id: String,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let req = parse_json_body::<UpdateBucketRequest>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let req = parse_json_body::<UpdateBucketRequest, _, Error>(req).await?;
let bucket_id = parse_bucket_id(&id)?;
let mut bucket = garage
@@ -470,10 +471,10 @@ struct UpdateBucketWebsiteAccess {
pub async fn handle_bucket_change_key_perm(
garage: &Arc<Garage>,
- req: Request<Body>,
+ req: Request<IncomingBody>,
new_perm_flag: bool,
-) -> Result<Response<Body>, Error> {
- let req = parse_json_body::<BucketKeyPermChangeRequest>(req).await?;
+) -> Result<Response<ResBody>, Error> {
+ let req = parse_json_body::<BucketKeyPermChangeRequest, _, Error>(req).await?;
let bucket_id = parse_bucket_id(&req.bucket_id)?;
@@ -526,7 +527,7 @@ pub async fn handle_global_alias_bucket(
garage: &Arc<Garage>,
bucket_id: String,
alias: String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let bucket_id = parse_bucket_id(&bucket_id)?;
garage
@@ -541,7 +542,7 @@ pub async fn handle_global_unalias_bucket(
garage: &Arc<Garage>,
bucket_id: String,
alias: String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let bucket_id = parse_bucket_id(&bucket_id)?;
garage
@@ -557,7 +558,7 @@ pub async fn handle_local_alias_bucket(
bucket_id: String,
access_key_id: String,
alias: String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let bucket_id = parse_bucket_id(&bucket_id)?;
garage
@@ -573,7 +574,7 @@ pub async fn handle_local_unalias_bucket(
bucket_id: String,
access_key_id: String,
alias: String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let bucket_id = parse_bucket_id(&bucket_id)?;
garage
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index 8677257d..8ce6c5ed 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
-use hyper::{Body, Request, Response};
+use hyper::{body::Incoming as IncomingBody, Request, Response};
use serde::{Deserialize, Serialize};
use garage_util::crdt::*;
@@ -12,10 +12,11 @@ use garage_rpc::layout;
use garage_model::garage::Garage;
+use crate::admin::api_server::ResBody;
use crate::admin::error::*;
use crate::helpers::{json_ok_response, parse_json_body};
-pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
let layout = garage.system.cluster_layout();
let mut nodes = garage
.system
@@ -110,7 +111,7 @@ pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<
Ok(json_ok_response(&res)?)
}
-pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
use garage_rpc::system::ClusterHealthStatus;
let health = garage.system.health();
let health = ClusterHealth {
@@ -132,9 +133,9 @@ pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response<
pub async fn handle_connect_cluster_nodes(
garage: &Arc<Garage>,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let req = parse_json_body::<Vec<String>>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let req = parse_json_body::<Vec<String>, _, Error>(req).await?;
let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node)))
.await
@@ -154,7 +155,7 @@ pub async fn handle_connect_cluster_nodes(
Ok(json_ok_response(&res)?)
}
-pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
let res = format_cluster_layout(&garage.system.cluster_layout());
Ok(json_ok_response(&res)?)
@@ -290,9 +291,9 @@ struct NodeResp {
pub async fn handle_update_cluster_layout(
garage: &Arc<Garage>,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let updates = parse_json_body::<UpdateClusterLayoutRequest, _, Error>(req).await?;
let mut layout = garage.system.cluster_layout().clone();
@@ -336,9 +337,9 @@ pub async fn handle_update_cluster_layout(
pub async fn handle_apply_cluster_layout(
garage: &Arc<Garage>,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let param = parse_json_body::<ApplyLayoutRequest>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let param = parse_json_body::<ApplyLayoutRequest, _, Error>(req).await?;
let layout = garage.system.cluster_layout().clone();
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
@@ -356,7 +357,9 @@ pub async fn handle_apply_cluster_layout(
Ok(json_ok_response(&res)?)
}
-pub async fn handle_revert_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+pub async fn handle_revert_cluster_layout(
+ garage: &Arc<Garage>,
+) -> Result<Response<ResBody>, Error> {
let layout = garage.system.cluster_layout().clone();
let layout = layout.revert_staged_changes()?;
garage
diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs
index ed1a07bd..2668b42d 100644
--- a/src/api/admin/error.rs
+++ b/src/api/admin/error.rs
@@ -1,13 +1,13 @@
use err_derive::Error;
use hyper::header::HeaderValue;
-use hyper::{Body, HeaderMap, StatusCode};
+use hyper::{HeaderMap, StatusCode};
pub use garage_model::helper::error::Error as HelperError;
use crate::common_error::CommonError;
pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
use crate::generic_server::ApiError;
-use crate::helpers::CustomApiErrorBody;
+use crate::helpers::*;
/// Errors of this crate
#[derive(Debug, Error)]
@@ -40,18 +40,6 @@ where
impl CommonErrorDerivative for Error {}
-impl From<HelperError> for Error {
- fn from(err: HelperError) -> Self {
- match err {
- HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
- HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
- HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
- HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
- HelperError::NoSuchAccessKey(n) => Self::NoSuchAccessKey(n),
- }
- }
-}
-
impl Error {
fn code(&self) -> &'static str {
match self {
@@ -77,14 +65,14 @@ impl ApiError for Error {
header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap());
}
- fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody {
let error = CustomApiErrorBody {
code: self.code().to_string(),
message: format!("{}", self),
path: path.to_string(),
region: garage_region.to_string(),
};
- Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
+ let error_str = serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
r#"
{
"code": "InternalError",
@@ -92,6 +80,7 @@ impl ApiError for Error {
}
"#
.into()
- }))
+ });
+ error_body(error_str)
}
}
diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs
index 8d1c6890..1efaca16 100644
--- a/src/api/admin/key.rs
+++ b/src/api/admin/key.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_table::*;
@@ -9,10 +9,11 @@ use garage_table::*;
use garage_model::garage::Garage;
use garage_model::key_table::*;
+use crate::admin::api_server::ResBody;
use crate::admin::error::*;
-use crate::helpers::{is_default, json_ok_response, parse_json_body};
+use crate::helpers::*;
-pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
let res = garage
.key_table
.get_range(
@@ -45,7 +46,7 @@ pub async fn handle_get_key_info(
id: Option<String>,
search: Option<String>,
show_secret_key: bool,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let key = if let Some(id) = id {
garage.key_helper().get_existing_key(&id).await?
} else if let Some(search) = search {
@@ -62,9 +63,9 @@ pub async fn handle_get_key_info(
pub async fn handle_create_key(
garage: &Arc<Garage>,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let req = parse_json_body::<CreateKeyRequest>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let req = parse_json_body::<CreateKeyRequest, _, Error>(req).await?;
let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key"));
garage.key_table.insert(&key).await?;
@@ -80,9 +81,9 @@ struct CreateKeyRequest {
pub async fn handle_import_key(
garage: &Arc<Garage>,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let req = parse_json_body::<ImportKeyRequest>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let req = parse_json_body::<ImportKeyRequest, _, Error>(req).await?;
let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?;
if prev_key.is_some() {
@@ -111,9 +112,9 @@ struct ImportKeyRequest {
pub async fn handle_update_key(
garage: &Arc<Garage>,
id: String,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let req = parse_json_body::<UpdateKeyRequest>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let req = parse_json_body::<UpdateKeyRequest, _, Error>(req).await?;
let mut key = garage.key_helper().get_existing_key(&id).await?;
@@ -146,7 +147,10 @@ struct UpdateKeyRequest {
deny: Option<KeyPerm>,
}
-pub async fn handle_delete_key(garage: &Arc<Garage>, id: String) -> Result<Response<Body>, Error> {
+pub async fn handle_delete_key(
+ garage: &Arc<Garage>,
+ id: String,
+) -> Result<Response<ResBody>, Error> {
let mut key = garage.key_helper().get_existing_key(&id).await?;
key.state.as_option().unwrap();
@@ -155,14 +159,14 @@ pub async fn handle_delete_key(garage: &Arc<Garage>, id: String) -> Result<Respo
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
async fn key_info_results(
garage: &Arc<Garage>,
key: Key,
show_secret: bool,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let mut relevant_buckets = HashMap::new();
let key_state = key.state.as_option().unwrap();
diff --git a/src/api/common_error.rs b/src/api/common_error.rs
index ecb22fd8..c47555d4 100644
--- a/src/api/common_error.rs
+++ b/src/api/common_error.rs
@@ -3,6 +3,8 @@ use hyper::StatusCode;
use garage_util::error::Error as GarageError;
+use garage_model::helper::error::Error as HelperError;
+
/// Errors of this crate
#[derive(Debug, Error)]
pub enum CommonError {
@@ -28,6 +30,10 @@ pub enum CommonError {
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
+ /// The client sent a header with invalid value
+ #[error(display = "Invalid header value: {}", _0)]
+ InvalidHeader(#[error(source)] hyper::header::ToStrError),
+
// ---- SPECIFIC ERROR CONDITIONS ----
// These have to be error codes referenced in the S3 spec here:
// https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
@@ -62,7 +68,9 @@ impl CommonError {
CommonError::Forbidden(_) => StatusCode::FORBIDDEN,
CommonError::NoSuchBucket(_) => StatusCode::NOT_FOUND,
CommonError::BucketNotEmpty | CommonError::BucketAlreadyExists => StatusCode::CONFLICT,
- CommonError::InvalidBucketName(_) => StatusCode::BAD_REQUEST,
+ CommonError::InvalidBucketName(_) | CommonError::InvalidHeader(_) => {
+ StatusCode::BAD_REQUEST
+ }
}
}
@@ -80,6 +88,7 @@ impl CommonError {
CommonError::BucketAlreadyExists => "BucketAlreadyExists",
CommonError::BucketNotEmpty => "BucketNotEmpty",
CommonError::InvalidBucketName(_) => "InvalidBucketName",
+ CommonError::InvalidHeader(_) => "InvalidHeaderValue",
}
}
@@ -88,6 +97,18 @@ impl CommonError {
}
}
+impl From<HelperError> for CommonError {
+ fn from(err: HelperError) -> Self {
+ match err {
+ HelperError::Internal(i) => Self::InternalError(i),
+ HelperError::BadRequest(b) => Self::BadRequest(b),
+ HelperError::InvalidBucketName(n) => Self::InvalidBucketName(n),
+ HelperError::NoSuchBucket(n) => Self::NoSuchBucket(n),
+ e => Self::bad_request(format!("{}", e)),
+ }
+ }
+}
+
pub trait CommonErrorDerivative: From<CommonError> {
fn internal_error<M: ToString>(msg: M) -> Self {
Self::from(CommonError::InternalError(GarageError::Message(
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs
index fa346f48..9c49fdf3 100644
--- a/src/api/generic_server.rs
+++ b/src/api/generic_server.rs
@@ -1,3 +1,4 @@
+use std::convert::Infallible;
use std::fs::{self, Permissions};
use std::os::unix::fs::PermissionsExt;
use std::sync::Arc;
@@ -5,16 +6,19 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures::future::Future;
+use futures::stream::{futures_unordered::FuturesUnordered, StreamExt};
+use http_body_util::BodyExt;
use hyper::header::HeaderValue;
-use hyper::server::conn::AddrStream;
-use hyper::service::{make_service_fn, service_fn};
-use hyper::{Body, Request, Response, Server};
+use hyper::server::conn::http1;
+use hyper::service::service_fn;
+use hyper::{body::Incoming as IncomingBody, Request, Response};
use hyper::{HeaderMap, StatusCode};
+use hyper_util::rt::TokioIo;
-use hyperlocal::UnixServerExt;
-
-use tokio::net::UnixStream;
+use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
+use tokio::sync::watch;
use opentelemetry::{
global,
@@ -28,6 +32,8 @@ use garage_util::forwarded_headers;
use garage_util::metrics::{gen_trace_id, RecordDuration};
use garage_util::socket_address::UnixOrTCPSocketAddress;
+use crate::helpers::{BoxBody, ErrorBody};
+
pub(crate) trait ApiEndpoint: Send + Sync + 'static {
fn name(&self) -> &'static str;
fn add_span_attributes(&self, span: SpanRef<'_>);
@@ -36,7 +42,7 @@ pub(crate) trait ApiEndpoint: Send + Sync + 'static {
pub trait ApiError: std::error::Error + Send + Sync + 'static {
fn http_status_code(&self) -> StatusCode;
fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>);
- fn http_body(&self, garage_region: &str, path: &str) -> Body;
+ fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody;
}
#[async_trait]
@@ -47,12 +53,12 @@ pub(crate) trait ApiHandler: Send + Sync + 'static {
type Endpoint: ApiEndpoint;
type Error: ApiError;
- fn parse_endpoint(&self, r: &Request<Body>) -> Result<Self::Endpoint, Self::Error>;
+ fn parse_endpoint(&self, r: &Request<IncomingBody>) -> Result<Self::Endpoint, Self::Error>;
async fn handle(
&self,
- req: Request<Body>,
+ req: Request<IncomingBody>,
endpoint: Self::Endpoint,
- ) -> Result<Response<Body>, Self::Error>;
+ ) -> Result<Response<BoxBody<Self::Error>>, Self::Error>;
}
pub(crate) struct ApiServer<A: ApiHandler> {
@@ -99,74 +105,42 @@ impl<A: ApiHandler> ApiServer<A> {
self: Arc<Self>,
bind_addr: UnixOrTCPSocketAddress,
unix_bind_addr_mode: Option<u32>,
- shutdown_signal: impl Future<Output = ()>,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
- let tcp_service = make_service_fn(|conn: &AddrStream| {
- let this = self.clone();
-
- let client_addr = conn.remote_addr();
- async move {
- Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
- let this = this.clone();
-
- this.handler(req, client_addr.to_string())
- }))
- }
- });
-
- let unix_service = make_service_fn(|_: &UnixStream| {
- let this = self.clone();
-
- let path = bind_addr.to_string();
- async move {
- Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
- let this = this.clone();
-
- this.handler(req, path.clone())
- }))
- }
- });
-
- info!(
- "{} API server listening on {}",
- A::API_NAME_DISPLAY,
- bind_addr
- );
+ let server_name = format!("{} API", A::API_NAME_DISPLAY);
+ info!("{} server listening on {}", server_name, bind_addr);
match bind_addr {
UnixOrTCPSocketAddress::TCPSocket(addr) => {
- Server::bind(&addr)
- .serve(tcp_service)
- .with_graceful_shutdown(shutdown_signal)
- .await?
+ let listener = TcpListener::bind(addr).await?;
+
+ let handler = move |request, socketaddr| self.clone().handler(request, socketaddr);
+ server_loop(server_name, listener, handler, must_exit).await
}
UnixOrTCPSocketAddress::UnixSocket(ref path) => {
if path.exists() {
fs::remove_file(path)?
}
- let bound = Server::bind_unix(path)?;
+ let listener = UnixListener::bind(path)?;
+ let listener = UnixListenerOn(listener, path.display().to_string());
fs::set_permissions(
path,
Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)),
)?;
- bound
- .serve(unix_service)
- .with_graceful_shutdown(shutdown_signal)
- .await?;
+ let handler = move |request, socketaddr| self.clone().handler(request, socketaddr);
+ server_loop(server_name, listener, handler, must_exit).await
}
- };
-
- Ok(())
+ }
}
async fn handler(
self: Arc<Self>,
- req: Request<Body>,
+ req: Request<IncomingBody>,
addr: String,
- ) -> Result<Response<Body>, GarageError> {
+ ) -> Result<Response<BoxBody<A::Error>>, http::Error> {
let uri = req.uri().clone();
if let Ok(forwarded_for_ip_addr) =
@@ -205,7 +179,7 @@ impl<A: ApiHandler> ApiServer<A> {
Ok(x)
}
Err(e) => {
- let body: Body = e.http_body(&self.region, uri.path());
+ let body = e.http_body(&self.region, uri.path());
let mut http_error_builder = Response::builder().status(e.http_status_code());
if let Some(header_map) = http_error_builder.headers_mut() {
@@ -219,12 +193,16 @@ impl<A: ApiHandler> ApiServer<A> {
} else {
info!("Response: error {}, {}", e.http_status_code(), e);
}
- Ok(http_error)
+ Ok(http_error
+ .map(|body| BoxBody::new(body.map_err(|_: Infallible| unreachable!()))))
}
}
}
- async fn handler_stage2(&self, req: Request<Body>) -> Result<Response<Body>, A::Error> {
+ async fn handler_stage2(
+ &self,
+ req: Request<IncomingBody>,
+ ) -> Result<Response<BoxBody<A::Error>>, A::Error> {
let endpoint = self.api_handler.parse_endpoint(&req)?;
debug!("Endpoint: {}", endpoint.name());
@@ -265,3 +243,123 @@ impl<A: ApiHandler> ApiServer<A> {
res
}
}
+
+// ==== helper functions ====
+
+#[async_trait]
+pub trait Accept: Send + Sync + 'static {
+ type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static;
+ async fn accept(&self) -> std::io::Result<(Self::Stream, String)>;
+}
+
+#[async_trait]
+impl Accept for TcpListener {
+ type Stream = TcpStream;
+ async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
+ self.accept()
+ .await
+ .map(|(stream, addr)| (stream, addr.to_string()))
+ }
+}
+
+pub struct UnixListenerOn(pub UnixListener, pub String);
+
+#[async_trait]
+impl Accept for UnixListenerOn {
+ type Stream = UnixStream;
+ async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
+ self.0
+ .accept()
+ .await
+ .map(|(stream, _addr)| (stream, self.1.clone()))
+ }
+}
+
+pub async fn server_loop<A, H, F, E>(
+ server_name: String,
+ listener: A,
+ handler: H,
+ mut must_exit: watch::Receiver<bool>,
+) -> Result<(), GarageError>
+where
+ A: Accept,
+ H: Fn(Request<IncomingBody>, String) -> F + Send + Sync + Clone + 'static,
+ F: Future<Output = Result<Response<BoxBody<E>>, http::Error>> + Send + 'static,
+ E: Send + Sync + std::error::Error + 'static,
+{
+ let (conn_in, mut conn_out) = tokio::sync::mpsc::unbounded_channel();
+ let connection_collector = tokio::spawn({
+ let server_name = server_name.clone();
+ async move {
+ let mut connections = FuturesUnordered::new();
+ loop {
+ let collect_next = async {
+ if connections.is_empty() {
+ futures::future::pending().await
+ } else {
+ connections.next().await
+ }
+ };
+ tokio::select! {
+ result = collect_next => {
+ trace!("{} server: HTTP connection finished: {:?}", server_name, result);
+ }
+ new_fut = conn_out.recv() => {
+ match new_fut {
+ Some(f) => connections.push(f),
+ None => break,
+ }
+ }
+ }
+ }
+ if !connections.is_empty() {
+ info!(
+ "{} server: {} connections still open",
+ server_name,
+ connections.len()
+ );
+ while let Some(conn_res) = connections.next().await {
+ trace!(
+ "{} server: HTTP connection finished: {:?}",
+ server_name,
+ conn_res
+ );
+ info!(
+ "{} server: {} connections still open",
+ server_name,
+ connections.len()
+ );
+ }
+ }
+ }
+ });
+
+ while !*must_exit.borrow() {
+ let (stream, client_addr) = tokio::select! {
+ acc = listener.accept() => acc?,
+ _ = must_exit.changed() => continue,
+ };
+
+ let io = TokioIo::new(stream);
+
+ let handler = handler.clone();
+ let serve = move |req: Request<IncomingBody>| handler(req, client_addr.clone());
+
+ let fut = tokio::task::spawn(async move {
+ let io = Box::pin(io);
+ if let Err(e) = http1::Builder::new()
+ .serve_connection(io, service_fn(serve))
+ .await
+ {
+ debug!("Error handling HTTP connection: {}", e);
+ }
+ });
+ conn_in.send(fut)?;
+ }
+
+ info!("{} server exiting", server_name);
+ drop(conn_in);
+ connection_collector.await?;
+
+ Ok(())
+}
diff --git a/src/api/helpers.rs b/src/api/helpers.rs
index 1d55ebd5..5f488912 100644
--- a/src/api/helpers.rs
+++ b/src/api/helpers.rs
@@ -1,7 +1,17 @@
-use hyper::{Body, Request, Response};
+use std::convert::Infallible;
+
+use futures::{Stream, StreamExt, TryStreamExt};
+
+use http_body_util::{BodyExt, Full as FullBody};
+use hyper::{
+ body::{Body, Bytes},
+ Request, Response,
+};
use idna::domain_to_unicode;
use serde::{Deserialize, Serialize};
+use garage_util::error::Error as GarageError;
+
use crate::common_error::{CommonError as Error, *};
/// What kind of authorization is required to perform a given action
@@ -138,18 +148,64 @@ pub fn key_after_prefix(pfx: &str) -> Option<String> {
None
}
-pub async fn parse_json_body<T: for<'de> Deserialize<'de>>(req: Request<Body>) -> Result<T, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+// =============== body helpers =================
+
+pub type EmptyBody = http_body_util::Empty<bytes::Bytes>;
+pub type ErrorBody = FullBody<bytes::Bytes>;
+pub type BoxBody<E> = http_body_util::combinators::BoxBody<bytes::Bytes, E>;
+
+pub fn string_body<E>(s: String) -> BoxBody<E> {
+ bytes_body(bytes::Bytes::from(s.into_bytes()))
+}
+pub fn bytes_body<E>(b: bytes::Bytes) -> BoxBody<E> {
+ BoxBody::new(FullBody::new(b).map_err(|_: Infallible| unreachable!()))
+}
+pub fn empty_body<E>() -> BoxBody<E> {
+ BoxBody::new(http_body_util::Empty::new().map_err(|_: Infallible| unreachable!()))
+}
+pub fn error_body(s: String) -> ErrorBody {
+ ErrorBody::from(bytes::Bytes::from(s.into_bytes()))
+}
+
+pub async fn parse_json_body<T, B, E>(req: Request<B>) -> Result<T, E>
+where
+ T: for<'de> Deserialize<'de>,
+ B: Body,
+ E: From<<B as Body>::Error> + From<Error>,
+{
+ let body = req.into_body().collect().await?.to_bytes();
let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
Ok(resp)
}
-pub fn json_ok_response<T: Serialize>(res: &T) -> Result<Response<Body>, Error> {
- let resp_json = serde_json::to_string_pretty(res).map_err(garage_util::error::Error::from)?;
+pub fn json_ok_response<E, T: Serialize>(res: &T) -> Result<Response<BoxBody<E>>, E>
+where
+ E: From<Error>,
+{
+ let resp_json = serde_json::to_string_pretty(res)
+ .map_err(GarageError::from)
+ .map_err(Error::from)?;
Ok(Response::builder()
.status(hyper::StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json")
- .body(Body::from(resp_json))?)
+ .body(string_body(resp_json))
+ .unwrap())
+}
+
+pub fn body_stream<B, E>(body: B) -> impl Stream<Item = Result<Bytes, E>>
+where
+ B: Body<Data = Bytes>,
+ <B as Body>::Error: Into<E>,
+ E: From<Error>,
+{
+ let stream = http_body_util::BodyStream::new(body);
+ let stream = TryStreamExt::map_err(stream, Into::into);
+ stream.map(|x| {
+ x.and_then(|f| {
+ f.into_data()
+ .map_err(|_| E::from(Error::bad_request("non-data frame")))
+ })
+ })
}
pub fn is_default<T: Default + PartialEq>(v: &T) -> bool {
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
index 3a032aba..e97da2af 100644
--- a/src/api/k2v/api_server.rs
+++ b/src/api/k2v/api_server.rs
@@ -2,8 +2,8 @@ use std::sync::Arc;
use async_trait::async_trait;
-use futures::future::Future;
-use hyper::{Body, Method, Request, Response};
+use hyper::{body::Incoming as IncomingBody, Method, Request, Response};
+use tokio::sync::watch;
use opentelemetry::{trace::SpanRef, KeyValue};
@@ -25,6 +25,9 @@ use crate::k2v::item::*;
use crate::k2v::router::Endpoint;
use crate::s3::cors::*;
+pub use crate::signature::streaming::ReqBody;
+pub type ResBody = BoxBody<Error>;
+
pub struct K2VApiServer {
garage: Arc<Garage>,
}
@@ -39,10 +42,10 @@ impl K2VApiServer {
garage: Arc<Garage>,
bind_addr: UnixOrTCPSocketAddress,
s3_region: String,
- shutdown_signal: impl Future<Output = ()>,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
ApiServer::new(s3_region, K2VApiServer { garage })
- .run_server(bind_addr, None, shutdown_signal)
+ .run_server(bind_addr, None, must_exit)
.await
}
}
@@ -55,7 +58,7 @@ impl ApiHandler for K2VApiServer {
type Endpoint = K2VApiEndpoint;
type Error = Error;
- fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> {
+ fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<K2VApiEndpoint, Error> {
let (endpoint, bucket_name) = Endpoint::from_request(req)?;
Ok(K2VApiEndpoint {
@@ -66,9 +69,9 @@ impl ApiHandler for K2VApiServer {
async fn handle(
&self,
- req: Request<Body>,
+ req: Request<IncomingBody>,
endpoint: K2VApiEndpoint,
- ) -> Result<Response<Body>, Error> {
+ ) -> Result<Response<ResBody>, Error> {
let K2VApiEndpoint {
bucket_name,
endpoint,
@@ -77,9 +80,10 @@ impl ApiHandler for K2VApiServer {
// The OPTIONS method is procesed early, before we even check for an API key
if let Endpoint::Options = endpoint {
- return Ok(handle_options_s3api(garage, &req, Some(bucket_name))
+ let options_res = handle_options_api(garage, &req, Some(bucket_name))
.await
- .ok_or_bad_request("Error handling OPTIONS")?);
+ .ok_or_bad_request("Error handling OPTIONS")?;
+ return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
}
let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?;
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index 294380ea..ae2778b1 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -1,7 +1,7 @@
use std::sync::Arc;
use base64::prelude::*;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_util::data::*;
@@ -13,15 +13,16 @@ use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
use crate::helpers::*;
+use crate::k2v::api_server::{ReqBody, ResBody};
use crate::k2v::error::*;
use crate::k2v::range::read_range;
pub async fn handle_insert_batch(
garage: Arc<Garage>,
bucket_id: Uuid,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?;
+ req: Request<ReqBody>,
+) -> Result<Response<ResBody>, Error> {
+ let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
let mut items2 = vec![];
for it in items {
@@ -41,15 +42,15 @@ pub async fn handle_insert_batch(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_read_batch(
garage: Arc<Garage>,
bucket_id: Uuid,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?;
+ req: Request<ReqBody>,
+) -> Result<Response<ResBody>, Error> {
+ let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?;
let resp_results = futures::future::join_all(
queries
@@ -139,9 +140,9 @@ async fn handle_read_batch_query(
pub async fn handle_delete_batch(
garage: Arc<Garage>,
bucket_id: Uuid,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?;
+ req: Request<ReqBody>,
+) -> Result<Response<ResBody>, Error> {
+ let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?;
let resp_results = futures::future::join_all(
queries
@@ -253,11 +254,11 @@ pub(crate) async fn handle_poll_range(
garage: Arc<Garage>,
bucket_id: Uuid,
partition_key: &str,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
+ req: Request<ReqBody>,
+) -> Result<Response<ResBody>, Error> {
use garage_model::k2v::sub::PollRange;
- let query = parse_json_body::<PollRangeQuery>(req).await?;
+ let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?;
let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000;
@@ -292,7 +293,7 @@ pub(crate) async fn handle_poll_range(
} else {
Ok(Response::builder()
.status(StatusCode::NOT_MODIFIED)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs
index 4eb017ab..16479227 100644
--- a/src/api/k2v/error.rs
+++ b/src/api/k2v/error.rs
@@ -1,13 +1,11 @@
use err_derive::Error;
use hyper::header::HeaderValue;
-use hyper::{Body, HeaderMap, StatusCode};
-
-use garage_model::helper::error::Error as HelperError;
+use hyper::{HeaderMap, StatusCode};
use crate::common_error::CommonError;
pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
use crate::generic_server::ApiError;
-use crate::helpers::CustomApiErrorBody;
+use crate::helpers::*;
use crate::signature::error::Error as SignatureError;
/// Errors of this crate
@@ -30,10 +28,6 @@ pub enum Error {
#[error(display = "Invalid base64: {}", _0)]
InvalidBase64(#[error(source)] base64::DecodeError),
- /// The client sent a header with invalid value
- #[error(display = "Invalid header value: {}", _0)]
- InvalidHeader(#[error(source)] hyper::header::ToStrError),
-
/// The client asked for an invalid return format (invalid Accept header)
#[error(display = "Not acceptable: {}", _0)]
NotAcceptable(String),
@@ -54,18 +48,6 @@ where
impl CommonErrorDerivative for Error {}
-impl From<HelperError> for Error {
- fn from(err: HelperError) -> Self {
- match err {
- HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
- HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
- HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
- HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
- e => Self::Common(CommonError::BadRequest(format!("{}", e))),
- }
- }
-}
-
impl From<SignatureError> for Error {
fn from(err: SignatureError) -> Self {
match err {
@@ -74,7 +56,6 @@ impl From<SignatureError> for Error {
Self::AuthorizationHeaderMalformed(c)
}
SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
- SignatureError::InvalidHeader(h) => Self::InvalidHeader(h),
}
}
}
@@ -90,7 +71,6 @@ impl Error {
Error::NotAcceptable(_) => "NotAcceptable",
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
Error::InvalidBase64(_) => "InvalidBase64",
- Error::InvalidHeader(_) => "InvalidHeaderValue",
Error::InvalidUtf8Str(_) => "InvalidUtf8String",
}
}
@@ -105,7 +85,6 @@ impl ApiError for Error {
Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE,
Error::AuthorizationHeaderMalformed(_)
| Error::InvalidBase64(_)
- | Error::InvalidHeader(_)
| Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST,
}
}
@@ -115,14 +94,14 @@ impl ApiError for Error {
header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap());
}
- fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody {
let error = CustomApiErrorBody {
code: self.code().to_string(),
message: format!("{}", self),
path: path.to_string(),
region: garage_region.to_string(),
};
- Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
+ let error_str = serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
r#"
{
"code": "InternalError",
@@ -130,6 +109,7 @@ impl ApiError for Error {
}
"#
.into()
- }))
+ });
+ error_body(error_str)
}
}
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index e8cd1fba..291464ab 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;
-use hyper::{Body, Response};
+use hyper::Response;
use serde::Serialize;
use garage_util::data::*;
@@ -11,6 +11,7 @@ use garage_model::garage::Garage;
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
use crate::helpers::*;
+use crate::k2v::api_server::ResBody;
use crate::k2v::error::*;
use crate::k2v::range::read_range;
@@ -22,7 +23,7 @@ pub async fn handle_read_index(
end: Option<String>,
limit: Option<u64>,
reverse: Option<bool>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let reverse = reverse.unwrap_or(false);
let node_id_vec = garage
@@ -71,7 +72,7 @@ pub async fn handle_read_index(
next_start,
};
- Ok(json_ok_response(&resp)?)
+ json_ok_response::<Error, _>(&resp)
}
#[derive(Serialize)]
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index e13a0f30..0c5931a1 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use base64::prelude::*;
use http::header;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{Request, Response, StatusCode};
use garage_util::data::*;
@@ -11,6 +11,8 @@ use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
+use crate::helpers::*;
+use crate::k2v::api_server::{ReqBody, ResBody};
use crate::k2v::error::*;
pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
@@ -22,7 +24,7 @@ pub enum ReturnFormat {
}
impl ReturnFormat {
- pub fn from(req: &Request<Body>) -> Result<Self, Error> {
+ pub fn from(req: &Request<ReqBody>) -> Result<Self, Error> {
let accept = match req.headers().get(header::ACCEPT) {
Some(a) => a.to_str()?,
None => return Ok(Self::Json),
@@ -40,7 +42,7 @@ impl ReturnFormat {
}
}
- pub fn make_response(&self, item: &K2VItem) -> Result<Response<Body>, Error> {
+ pub fn make_response(&self, item: &K2VItem) -> Result<Response<ResBody>, Error> {
let vals = item.values();
if vals.is_empty() {
@@ -52,7 +54,7 @@ impl ReturnFormat {
Self::Binary if vals.len() > 1 => Ok(Response::builder()
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
.status(StatusCode::CONFLICT)
- .body(Body::empty())?),
+ .body(empty_body())?),
Self::Binary => {
assert!(vals.len() == 1);
Self::make_binary_response(ct, vals[0])
@@ -62,22 +64,22 @@ impl ReturnFormat {
}
}
- fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<Body>, Error> {
+ fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<ResBody>, Error> {
match v {
DvvsValue::Deleted => Ok(Response::builder()
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
.header(header::CONTENT_TYPE, "application/octet-stream")
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?),
+ .body(empty_body())?),
DvvsValue::Value(v) => Ok(Response::builder()
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
.header(header::CONTENT_TYPE, "application/octet-stream")
.status(StatusCode::OK)
- .body(Body::from(v.to_vec()))?),
+ .body(bytes_body(v.to_vec().into()))?),
}
}
- fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<Body>, Error> {
+ fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<ResBody>, Error> {
let items = v
.iter()
.map(|v| match v {
@@ -91,7 +93,7 @@ impl ReturnFormat {
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
.header(header::CONTENT_TYPE, "application/json")
.status(StatusCode::OK)
- .body(Body::from(json_body))?)
+ .body(string_body(json_body))?)
}
}
@@ -99,11 +101,11 @@ impl ReturnFormat {
#[allow(clippy::ptr_arg)]
pub async fn handle_read_item(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
bucket_id: Uuid,
partition_key: &str,
sort_key: &String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let format = ReturnFormat::from(req)?;
let item = garage
@@ -124,11 +126,11 @@ pub async fn handle_read_item(
pub async fn handle_insert_item(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_id: Uuid,
partition_key: &str,
sort_key: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let causal_context = req
.headers()
.get(X_GARAGE_CAUSALITY_TOKEN)
@@ -137,7 +139,10 @@ pub async fn handle_insert_item(
.map(CausalContext::parse_helper)
.transpose()?;
- let body = hyper::body::to_bytes(req.into_body()).await?;
+ let body = http_body_util::BodyExt::collect(req.into_body())
+ .await?
+ .to_bytes();
+
let value = DvvsValue::Value(body.to_vec());
garage
@@ -154,16 +159,16 @@ pub async fn handle_insert_item(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_delete_item(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_id: Uuid,
partition_key: &str,
sort_key: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let causal_context = req
.headers()
.get(X_GARAGE_CAUSALITY_TOKEN)
@@ -188,20 +193,20 @@ pub async fn handle_delete_item(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
/// Handle ReadItem request
#[allow(clippy::ptr_arg)]
pub async fn handle_poll_item(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
bucket_id: Uuid,
partition_key: String,
sort_key: String,
causality_token: String,
timeout_secs: Option<u64>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let format = ReturnFormat::from(req)?;
let causal_context =
@@ -226,6 +231,6 @@ pub async fn handle_poll_item(
} else {
Ok(Response::builder()
.status(StatusCode::NOT_MODIFIED)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index 887839dd..7fac6261 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -2,9 +2,9 @@ use std::sync::Arc;
use async_trait::async_trait;
-use futures::future::Future;
use hyper::header;
-use hyper::{Body, Request, Response};
+use hyper::{body::Incoming as IncomingBody, Request, Response};
+use tokio::sync::watch;
use opentelemetry::{trace::SpanRef, KeyValue};
@@ -34,6 +34,9 @@ use crate::s3::put::*;
use crate::s3::router::Endpoint;
use crate::s3::website::*;
+pub use crate::signature::streaming::ReqBody;
+pub type ResBody = BoxBody<Error>;
+
pub struct S3ApiServer {
garage: Arc<Garage>,
}
@@ -48,19 +51,19 @@ impl S3ApiServer {
garage: Arc<Garage>,
addr: UnixOrTCPSocketAddress,
s3_region: String,
- shutdown_signal: impl Future<Output = ()>,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
ApiServer::new(s3_region, S3ApiServer { garage })
- .run_server(addr, None, shutdown_signal)
+ .run_server(addr, None, must_exit)
.await
}
async fn handle_request_without_bucket(
&self,
- _req: Request<Body>,
+ _req: Request<ReqBody>,
api_key: Key,
endpoint: Endpoint,
- ) -> Result<Response<Body>, Error> {
+ ) -> Result<Response<ResBody>, Error> {
match endpoint {
Endpoint::ListBuckets => handle_list_buckets(&self.garage, &api_key).await,
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
@@ -76,7 +79,7 @@ impl ApiHandler for S3ApiServer {
type Endpoint = S3ApiEndpoint;
type Error = Error;
- fn parse_endpoint(&self, req: &Request<Body>) -> Result<S3ApiEndpoint, Error> {
+ fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<S3ApiEndpoint, Error> {
let authority = req
.headers()
.get(header::HOST)
@@ -104,9 +107,9 @@ impl ApiHandler for S3ApiServer {
async fn handle(
&self,
- req: Request<Body>,
+ req: Request<IncomingBody>,
endpoint: S3ApiEndpoint,
- ) -> Result<Response<Body>, Error> {
+ ) -> Result<Response<ResBody>, Error> {
let S3ApiEndpoint {
bucket_name,
endpoint,
@@ -118,7 +121,8 @@ impl ApiHandler for S3ApiServer {
return handle_post_object(garage, req, bucket_name.unwrap()).await;
}
if let Endpoint::Options = endpoint {
- return handle_options_s3api(garage, &req, bucket_name).await;
+ let options_res = handle_options_api(garage, &req, bucket_name).await?;
+ return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
}
let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?;
@@ -174,8 +178,26 @@ impl ApiHandler for S3ApiServer {
key, part_number, ..
} => handle_head(garage, &req, bucket_id, &key, part_number).await,
Endpoint::GetObject {
- key, part_number, ..
- } => handle_get(garage, &req, bucket_id, &key, part_number).await,
+ key,
+ part_number,
+ response_cache_control,
+ response_content_disposition,
+ response_content_encoding,
+ response_content_language,
+ response_content_type,
+ response_expires,
+ ..
+ } => {
+ let overrides = GetObjectOverrides {
+ response_cache_control,
+ response_content_disposition,
+ response_content_encoding,
+ response_content_language,
+ response_content_type,
+ response_expires,
+ };
+ handle_get(garage, &req, bucket_id, &key, part_number, overrides).await
+ }
Endpoint::UploadPart {
key,
part_number,
@@ -235,8 +257,7 @@ impl ApiHandler for S3ApiServer {
}
Endpoint::CreateBucket {} => unreachable!(),
Endpoint::HeadBucket {} => {
- let empty_body: Body = Body::from(vec![]);
- let response = Response::builder().body(empty_body).unwrap();
+ let response = Response::builder().body(empty_body()).unwrap();
Ok(response)
}
Endpoint::DeleteBucket {} => {
@@ -257,7 +278,7 @@ impl ApiHandler for S3ApiServer {
common: ListQueryCommon {
bucket_name,
bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
+ delimiter,
page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
prefix: prefix.unwrap_or_default(),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
@@ -287,7 +308,7 @@ impl ApiHandler for S3ApiServer {
common: ListQueryCommon {
bucket_name,
bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
+ delimiter,
page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
prefix: prefix.unwrap_or_default(),
@@ -320,7 +341,7 @@ impl ApiHandler for S3ApiServer {
common: ListQueryCommon {
bucket_name,
bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
+ delimiter,
page_size: max_uploads.unwrap_or(1000).clamp(1, 1000),
prefix: prefix.unwrap_or_default(),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
index 733981e1..fa2f1b6d 100644
--- a/src/api/s3/bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -1,7 +1,8 @@
use std::collections::HashMap;
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use http_body_util::BodyExt;
+use hyper::{Request, Response, StatusCode};
use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::Bucket;
@@ -14,11 +15,13 @@ use garage_util::data::*;
use garage_util::time::*;
use crate::common_error::CommonError;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
-pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Error> {
+pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<ResBody>, Error> {
let loc = s3_xml::LocationConstraint {
xmlns: (),
region: garage.config.s3_api.s3_region.to_string(),
@@ -27,10 +30,10 @@ pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>,
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
+ .body(string_body(xml))?)
}
-pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> {
+pub fn handle_get_bucket_versioning() -> Result<Response<ResBody>, Error> {
let versioning = s3_xml::VersioningConfiguration {
xmlns: (),
status: None,
@@ -40,10 +43,13 @@ pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> {
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
+ .body(string_body(xml))?)
}
-pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Response<Body>, Error> {
+pub async fn handle_list_buckets(
+ garage: &Garage,
+ api_key: &Key,
+) -> Result<Response<ResBody>, Error> {
let key_p = api_key.params().ok_or_internal_error(
"Key should not be in deleted state at this point (in handle_list_buckets)",
)?;
@@ -109,17 +115,17 @@ pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Respo
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
}
pub async fn handle_create_bucket(
garage: &Garage,
- req: Request<Body>,
+ req: Request<ReqBody>,
content_sha256: Option<Hash>,
api_key: Key,
bucket_name: String,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -194,7 +200,7 @@ pub async fn handle_create_bucket(
Ok(Response::builder()
.header("Location", format!("/{}", bucket_name))
- .body(Body::empty())
+ .body(empty_body())
.unwrap())
}
@@ -203,7 +209,7 @@ pub async fn handle_delete_bucket(
bucket_id: Uuid,
bucket_name: String,
api_key: Key,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let key_params = api_key
.params()
.ok_or_internal_error("Key should not be deleted at this point")?;
@@ -282,7 +288,7 @@ pub async fn handle_delete_bucket(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> {
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 68b4f0c9..ba9bfc88 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -6,7 +6,7 @@ use futures::{stream, stream::Stream, StreamExt};
use md5::{Digest as Md5Digest, Md5};
use bytes::Bytes;
-use hyper::{Body, Request, Response};
+use hyper::{Request, Response};
use serde::Serialize;
use garage_rpc::netapp::bytes_buf::BytesBuf;
@@ -22,7 +22,8 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use crate::helpers::parse_bucket_key;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::multipart;
use crate::s3::put::get_headers;
@@ -31,10 +32,10 @@ use crate::s3::xml::{self as s3_xml, xmlns_tag};
pub async fn handle_copy(
garage: Arc<Garage>,
api_key: &Key,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
dest_bucket_id: Uuid,
dest_key: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
let source_object = get_copy_source(&garage, api_key, req).await?;
@@ -176,18 +177,18 @@ pub async fn handle_copy(
"x-amz-copy-source-version-id",
hex::encode(source_version.uuid),
)
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
}
pub async fn handle_upload_part_copy(
garage: Arc<Garage>,
api_key: &Key,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
dest_bucket_id: Uuid,
dest_key: &str,
part_number: u64,
upload_id: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
let dest_upload_id = multipart::decode_upload_id(upload_id)?;
@@ -432,13 +433,13 @@ pub async fn handle_upload_part_copy(
"x-amz-copy-source-version-id",
hex::encode(source_object_version.uuid),
)
- .body(Body::from(resp_xml))?)
+ .body(string_body(resp_xml))?)
}
async fn get_copy_source(
garage: &Garage,
api_key: &Key,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
) -> Result<Object, Error> {
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
@@ -501,7 +502,7 @@ struct CopyPreconditionHeaders {
}
impl CopyPreconditionHeaders {
- fn parse(req: &Request<Body>) -> Result<Self, Error> {
+ fn parse(req: &Request<ReqBody>) -> Result<Self, Error> {
Ok(Self {
copy_source_if_match: req
.headers()
diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs
index 49097ad1..e069cae4 100644
--- a/src/api/s3/cors.rs
+++ b/src/api/s3/cors.rs
@@ -5,10 +5,18 @@ use http::header::{
ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN,
ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD,
};
-use hyper::{header::HeaderName, Body, Method, Request, Response, StatusCode};
+use hyper::{
+ body::Body, body::Incoming as IncomingBody, header::HeaderName, Method, Request, Response,
+ StatusCode,
+};
+
+use http_body_util::BodyExt;
use serde::{Deserialize, Serialize};
+use crate::common_error::CommonError;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
@@ -17,7 +25,7 @@ use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
use garage_model::garage::Garage;
use garage_util::data::*;
-pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> {
+pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
let param = bucket
.params()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -34,18 +42,18 @@ pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> {
Ok(Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/xml")
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
} else {
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
pub async fn handle_delete_cors(
garage: Arc<Garage>,
mut bucket: Bucket,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let param = bucket
.params_mut()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -55,16 +63,16 @@ pub async fn handle_delete_cors(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_put_cors(
garage: Arc<Garage>,
mut bucket: Bucket,
- req: Request<Body>,
+ req: Request<ReqBody>,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -84,14 +92,14 @@ pub async fn handle_put_cors(
Ok(Response::builder()
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
-pub async fn handle_options_s3api(
+pub async fn handle_options_api(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<IncomingBody>,
bucket_name: Option<String>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<EmptyBody>, CommonError> {
// FIXME: CORS rules of buckets with local aliases are
// not taken into account.
@@ -121,7 +129,7 @@ pub async fn handle_options_s3api(
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.header(ACCESS_CONTROL_ALLOW_METHODS, "*")
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(EmptyBody::new())?)
}
} else {
// If there is no bucket name in the request,
@@ -131,14 +139,14 @@ pub async fn handle_options_s3api(
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.header(ACCESS_CONTROL_ALLOW_METHODS, "GET")
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(EmptyBody::new())?)
}
}
pub fn handle_options_for_bucket(
- req: &Request<Body>,
+ req: &Request<IncomingBody>,
bucket: &Bucket,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<EmptyBody>, CommonError> {
let origin = req
.headers()
.get("Origin")
@@ -161,18 +169,20 @@ pub fn handle_options_for_bucket(
if let Some(rule) = matching_rule {
let mut resp = Response::builder()
.status(StatusCode::OK)
- .body(Body::empty())?;
+ .body(EmptyBody::new())?;
add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?;
return Ok(resp);
}
}
- Err(Error::forbidden("This CORS request is not allowed."))
+ Err(CommonError::Forbidden(
+ "This CORS request is not allowed.".into(),
+ ))
}
pub fn find_matching_cors_rule<'a>(
bucket: &'a Bucket,
- req: &Request<Body>,
+ req: &Request<impl Body>,
) -> Result<Option<&'a GarageCorsRule>, Error> {
if let Some(cors_config) = bucket.params().unwrap().cors_config.get() {
if let Some(origin) = req.headers().get("Origin") {
@@ -209,7 +219,7 @@ where
}
pub fn add_cors_headers(
- resp: &mut Response<Body>,
+ resp: &mut Response<impl Body>,
rule: &GarageCorsRule,
) -> Result<(), http::header::InvalidHeaderValue> {
let h = resp.headers_mut();
diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs
index 1c491eac..3fb39147 100644
--- a/src/api/s3/delete.rs
+++ b/src/api/s3/delete.rs
@@ -1,12 +1,15 @@
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use http_body_util::BodyExt;
+use hyper::{Request, Response, StatusCode};
use garage_util::data::*;
use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::put::next_timestamp;
use crate::s3::xml as s3_xml;
@@ -59,11 +62,11 @@ pub async fn handle_delete(
garage: Arc<Garage>,
bucket_id: Uuid,
key: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
match handle_delete_internal(&garage, bucket_id, key).await {
Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::from(vec![]))
+ .body(empty_body())
.unwrap()),
Err(e) => Err(e),
}
@@ -72,10 +75,10 @@ pub async fn handle_delete(
pub async fn handle_delete_objects(
garage: Arc<Garage>,
bucket_id: Uuid,
- req: Request<Body>,
+ req: Request<ReqBody>,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -118,7 +121,7 @@ pub async fn handle_delete_objects(
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
}
struct DeleteRequest {
diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs
index c50cff9f..f86c19a6 100644
--- a/src/api/s3/error.rs
+++ b/src/api/s3/error.rs
@@ -2,13 +2,12 @@ use std::convert::TryInto;
use err_derive::Error;
use hyper::header::HeaderValue;
-use hyper::{Body, HeaderMap, StatusCode};
-
-use garage_model::helper::error::Error as HelperError;
+use hyper::{HeaderMap, StatusCode};
use crate::common_error::CommonError;
pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
use crate::generic_server::ApiError;
+use crate::helpers::*;
use crate::s3::xml as s3_xml;
use crate::signature::error::Error as SignatureError;
@@ -62,10 +61,6 @@ pub enum Error {
#[error(display = "Invalid XML: {}", _0)]
InvalidXml(String),
- /// The client sent a header with invalid value
- #[error(display = "Invalid header value: {}", _0)]
- InvalidHeader(#[error(source)] hyper::header::ToStrError),
-
/// The client sent a range header with invalid value
#[error(display = "Invalid HTTP range: {:?}", _0)]
InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)),
@@ -86,18 +81,6 @@ where
impl CommonErrorDerivative for Error {}
-impl From<HelperError> for Error {
- fn from(err: HelperError) -> Self {
- match err {
- HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
- HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
- HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
- HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
- e => Self::bad_request(format!("{}", e)),
- }
- }
-}
-
impl From<roxmltree::Error> for Error {
fn from(err: roxmltree::Error) -> Self {
Self::InvalidXml(format!("{}", err))
@@ -118,7 +101,6 @@ impl From<SignatureError> for Error {
Self::AuthorizationHeaderMalformed(c)
}
SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
- SignatureError::InvalidHeader(h) => Self::InvalidHeader(h),
}
}
}
@@ -143,9 +125,7 @@ impl Error {
Error::NotImplemented(_) => "NotImplemented",
Error::InvalidXml(_) => "MalformedXML",
Error::InvalidRange(_) => "InvalidRange",
- Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) | Error::InvalidHeader(_) => {
- "InvalidRequest"
- }
+ Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest",
}
}
}
@@ -165,8 +145,7 @@ impl ApiError for Error {
| Error::EntityTooSmall
| Error::InvalidXml(_)
| Error::InvalidUtf8Str(_)
- | Error::InvalidUtf8String(_)
- | Error::InvalidHeader(_) => StatusCode::BAD_REQUEST,
+ | Error::InvalidUtf8String(_) => StatusCode::BAD_REQUEST,
}
}
@@ -189,22 +168,23 @@ impl ApiError for Error {
}
}
- fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody {
let error = s3_xml::Error {
code: s3_xml::Value(self.aws_code().to_string()),
message: s3_xml::Value(format!("{}", self)),
resource: Some(s3_xml::Value(path.to_string())),
region: Some(s3_xml::Value(garage_region.to_string())),
};
- Body::from(s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| {
+ let error_str = s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| {
r#"
<?xml version="1.0" encoding="UTF-8"?>
<Error>
- <Code>InternalError</Code>
- <Message>XML encoding of error failed</Message>
+ <Code>InternalError</Code>
+ <Message>XML encoding of error failed</Message>
</Error>
- "#
+ "#
.into()
- }))
+ });
+ error_body(error_str)
}
}
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 5e682726..53f0a345 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -1,17 +1,20 @@
//! Function related to GET and HEAD requests
+use std::convert::TryInto;
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
use futures::future;
use futures::stream::{self, StreamExt};
use http::header::{
- ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
- IF_NONE_MATCH, LAST_MODIFIED, RANGE,
+ ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE,
+ CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH,
+ LAST_MODIFIED, RANGE,
};
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{body::Body, Request, Response, StatusCode};
use tokio::sync::mpsc;
-use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag};
+use garage_block::manager::BlockStream;
+use garage_rpc::rpc_helper::OrderTag;
use garage_table::EmptyKey;
use garage_util::data::*;
use garage_util::error::OkOrMessage;
@@ -20,10 +23,22 @@ use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
+use crate::helpers::*;
+use crate::s3::api_server::ResBody;
use crate::s3::error::*;
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
+#[derive(Default)]
+pub struct GetObjectOverrides {
+ pub(crate) response_cache_control: Option<String>,
+ pub(crate) response_content_disposition: Option<String>,
+ pub(crate) response_content_encoding: Option<String>,
+ pub(crate) response_content_language: Option<String>,
+ pub(crate) response_content_type: Option<String>,
+ pub(crate) response_expires: Option<String>,
+}
+
fn object_headers(
version: &ObjectVersion,
version_meta: &ObjectVersionMeta,
@@ -49,11 +64,37 @@ fn object_headers(
resp
}
+/// Override headers according to specific query parameters, see
+/// section "Overriding response header values through the request" in
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
+fn getobject_override_headers(
+ overrides: GetObjectOverrides,
+ resp: &mut http::response::Builder,
+) -> Result<(), Error> {
+ // TODO: this only applies for signed requests, so when we support
+ // anonymous access in the future we will have to do a permission check here
+ let overrides = [
+ (CACHE_CONTROL, overrides.response_cache_control),
+ (CONTENT_DISPOSITION, overrides.response_content_disposition),
+ (CONTENT_ENCODING, overrides.response_content_encoding),
+ (CONTENT_LANGUAGE, overrides.response_content_language),
+ (CONTENT_TYPE, overrides.response_content_type),
+ (EXPIRES, overrides.response_expires),
+ ];
+ for (hdr, val_opt) in overrides {
+ if let Some(val) = val_opt {
+ let val = val.try_into().ok_or_bad_request("invalid header value")?;
+ resp.headers_mut().unwrap().insert(hdr, val);
+ }
+ }
+ Ok(())
+}
+
fn try_answer_cached(
version: &ObjectVersion,
version_meta: &ObjectVersionMeta,
- req: &Request<Body>,
-) -> Option<Response<Body>> {
+ req: &Request<impl Body>,
+) -> Option<Response<ResBody>> {
// <trinity> It is possible, and is even usually the case, [that both If-None-Match and
// If-Modified-Since] are present in a request. In this situation If-None-Match takes
// precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The rational
@@ -80,7 +121,7 @@ fn try_answer_cached(
Some(
Response::builder()
.status(StatusCode::NOT_MODIFIED)
- .body(Body::empty())
+ .body(empty_body())
.unwrap(),
)
} else {
@@ -91,11 +132,11 @@ fn try_answer_cached(
/// Handle HEAD request
pub async fn handle_head(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<impl Body>,
bucket_id: Uuid,
key: &str,
part_number: Option<u64>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let object = garage
.object_table
.get(&bucket_id, &key.to_string())
@@ -138,7 +179,7 @@ pub async fn handle_head(
)
.header(X_AMZ_MP_PARTS_COUNT, "1")
.status(StatusCode::PARTIAL_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
@@ -163,7 +204,7 @@ pub async fn handle_head(
)
.header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?))
.status(StatusCode::PARTIAL_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
_ => unreachable!(),
}
@@ -171,18 +212,19 @@ pub async fn handle_head(
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
/// Handle GET request
pub async fn handle_get(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<impl Body>,
bucket_id: Uuid,
key: &str,
part_number: Option<u64>,
-) -> Result<Response<Body>, Error> {
+ overrides: GetObjectOverrides,
+) -> Result<Response<ResBody>, Error> {
let object = garage
.object_table
.get(&bucket_id, &key.to_string())
@@ -233,18 +275,18 @@ pub async fn handle_get(
(None, None) => (),
}
- let resp_builder = object_headers(last_v, last_v_meta)
+ let mut resp_builder = object_headers(last_v, last_v_meta)
.header(CONTENT_LENGTH, format!("{}", last_v_meta.size))
.status(StatusCode::OK);
+ getobject_override_headers(overrides, &mut resp_builder)?;
match &last_v_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_, bytes) => {
- let body: Body = Body::from(bytes.to_vec());
- Ok(resp_builder.body(body)?)
+ Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
- let (tx, rx) = mpsc::channel(2);
+ let (tx, rx) = mpsc::channel::<BlockStream>(2);
let order_stream = OrderTag::stream();
let first_block_hash = *first_block_hash;
@@ -282,20 +324,12 @@ pub async fn handle_get(
{
Ok(()) => (),
Err(e) => {
- let err = std::io::Error::new(
- std::io::ErrorKind::Other,
- format!("Error while getting object data: {}", e),
- );
- let _ = tx
- .send(Box::pin(stream::once(future::ready(Err(err)))))
- .await;
+ let _ = tx.send(error_stream_item(e)).await;
}
}
});
- let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten();
-
- let body = hyper::body::Body::wrap_stream(body_stream);
+ let body = response_body_from_block_stream(rx);
Ok(resp_builder.body(body)?)
}
}
@@ -308,7 +342,10 @@ async fn handle_get_range(
version_meta: &ObjectVersionMeta,
begin: u64,
end: u64,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
+ // Here we do not use getobject_override_headers because we don't
+ // want to add any overridden headers (those should not be added
+ // when returning PARTIAL_CONTENT)
let resp_builder = object_headers(version, version_meta)
.header(CONTENT_LENGTH, format!("{}", end - begin))
.header(
@@ -321,7 +358,7 @@ async fn handle_get_range(
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, bytes) => {
if end as usize <= bytes.len() {
- let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec());
+ let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into());
Ok(resp_builder.body(body)?)
} else {
Err(Error::internal_error(
@@ -348,7 +385,8 @@ async fn handle_get_part(
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
part_number: u64,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
+ // Same as for get_range, no getobject_override_headers
let resp_builder =
object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT);
@@ -364,7 +402,7 @@ async fn handle_get_part(
format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()),
)
.header(X_AMZ_MP_PARTS_COUNT, "1")
- .body(Body::from(bytes.to_vec()))?)
+ .body(bytes_body(bytes.to_vec().into()))?)
}
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
@@ -392,7 +430,7 @@ async fn handle_get_part(
}
fn parse_range_header(
- req: &Request<Body>,
+ req: &Request<impl Body>,
total_size: u64,
) -> Result<Option<http_range::HttpRange>, Error> {
let range = match req.headers().get(RANGE) {
@@ -434,7 +472,7 @@ fn body_from_blocks_range(
all_blocks: &[(VersionBlockKey, VersionBlock)],
begin: u64,
end: u64,
-) -> Body {
+) -> ResBody {
// We will store here the list of blocks that have an intersection with the requested
// range, as well as their "true offset", which is their actual offset in the complete
// file (whereas block.offset designates the offset of the block WITHIN THE PART
@@ -456,17 +494,17 @@ fn body_from_blocks_range(
}
let order_stream = OrderTag::stream();
- let body_stream = futures::stream::iter(blocks)
- .enumerate()
- .map(move |(i, (block, block_offset))| {
+ let (tx, rx) = mpsc::channel::<BlockStream>(2);
+
+ tokio::spawn(async move {
+ match async {
let garage = garage.clone();
- async move {
- garage
+ for (i, (block, block_offset)) in blocks.iter().enumerate() {
+ let block_stream = garage
.block_manager
.rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
- .await
- .unwrap_or_else(|e| error_stream(i, e))
- .scan(block_offset, move |chunk_offset, chunk| {
+ .await?
+ .scan(*block_offset, move |chunk_offset, chunk| {
let r = match chunk {
Ok(chunk_bytes) => {
let chunk_len = chunk_bytes.len() as u64;
@@ -502,20 +540,42 @@ fn body_from_blocks_range(
};
futures::future::ready(r)
})
- .filter_map(futures::future::ready)
+ .filter_map(futures::future::ready);
+
+ let block_stream: BlockStream = Box::pin(block_stream);
+ tx.send(Box::pin(block_stream))
+ .await
+ .ok_or_message("channel closed")?;
}
- })
- .buffered(2)
- .flatten();
- hyper::body::Body::wrap_stream(body_stream)
+ Ok::<(), Error>(())
+ }
+ .await
+ {
+ Ok(()) => (),
+ Err(e) => {
+ let _ = tx.send(error_stream_item(e)).await;
+ }
+ }
+ });
+
+ response_body_from_block_stream(rx)
+}
+
+fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody {
+ let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
+ .flatten()
+ .map(|x| {
+ x.map(hyper::body::Frame::data)
+ .map_err(|e| Error::from(garage_util::error::Error::from(e)))
+ });
+ ResBody::new(http_body_util::StreamBody::new(body_stream))
}
-fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream {
- Box::pin(futures::stream::once(async move {
- Err(std::io::Error::new(
- std::io::ErrorKind::Other,
- format!("Could not get block {}: {}", i, e),
- ))
- }))
+fn error_stream_item<E: std::fmt::Display>(e: E) -> BlockStream {
+ let err = std::io::Error::new(
+ std::io::ErrorKind::Other,
+ format!("Error while getting object data: {}", e),
+ );
+ Box::pin(stream::once(future::ready(Err(err))))
}
diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs
index 1e7d6755..35757e8c 100644
--- a/src/api/s3/lifecycle.rs
+++ b/src/api/s3/lifecycle.rs
@@ -1,10 +1,13 @@
use quick_xml::de::from_reader;
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use http_body_util::BodyExt;
+use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
@@ -16,7 +19,7 @@ use garage_model::bucket_table::{
use garage_model::garage::Garage;
use garage_util::data::*;
-pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<Body>, Error> {
+pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
let param = bucket
.params()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -27,18 +30,18 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<Body>, Err
Ok(Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/xml")
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
} else {
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
pub async fn handle_delete_lifecycle(
garage: Arc<Garage>,
mut bucket: Bucket,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let param = bucket
.params_mut()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -48,16 +51,16 @@ pub async fn handle_delete_lifecycle(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_put_lifecycle(
garage: Arc<Garage>,
mut bucket: Bucket,
- req: Request<Body>,
+ req: Request<ReqBody>,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -77,7 +80,7 @@ pub async fn handle_put_lifecycle(
Ok(Response::builder()
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
// ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ----
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index 1b9e8cd5..b832a4f4 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -3,7 +3,7 @@ use std::iter::{Iterator, Peekable};
use std::sync::Arc;
use base64::prelude::*;
-use hyper::{Body, Response};
+use hyper::Response;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
@@ -16,7 +16,8 @@ use garage_model::s3::object_table::*;
use garage_table::EnumerationOrder;
use crate::encoding::*;
-use crate::helpers::key_after_prefix;
+use crate::helpers::*;
+use crate::s3::api_server::ResBody;
use crate::s3::error::*;
use crate::s3::multipart as s3_multipart;
use crate::s3::xml as s3_xml;
@@ -63,7 +64,7 @@ pub struct ListPartsQuery {
pub async fn handle_list(
garage: Arc<Garage>,
query: &ListObjectsQuery,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
@@ -162,13 +163,13 @@ pub async fn handle_list(
let xml = s3_xml::to_xml_with_header(&result)?;
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
+ .body(string_body(xml))?)
}
pub async fn handle_list_multipart_upload(
garage: Arc<Garage>,
query: &ListMultipartUploadsQuery,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
@@ -264,13 +265,13 @@ pub async fn handle_list_multipart_upload(
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
+ .body(string_body(xml))?)
}
pub async fn handle_list_parts(
garage: Arc<Garage>,
query: &ListPartsQuery,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
debug!("ListParts {:?}", query);
let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?;
@@ -319,7 +320,7 @@ pub async fn handle_list_parts(
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
+ .body(string_body(xml))?)
}
/*
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 6b786318..b9d15b21 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use futures::prelude::*;
-use hyper::body::Body;
use hyper::{Request, Response};
use md5::{Digest as Md5Digest, Md5};
@@ -17,6 +16,8 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::put::*;
use crate::s3::xml as s3_xml;
@@ -26,11 +27,11 @@ use crate::signature::verify_signed_content;
pub async fn handle_create_multipart_upload(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
bucket_name: &str,
bucket_id: Uuid,
key: &String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
let upload_id = gen_uuid();
@@ -65,18 +66,18 @@ pub async fn handle_create_multipart_upload(
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(Body::from(xml.into_bytes())))
+ Ok(Response::new(string_body(xml)))
}
pub async fn handle_put_part(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_id: Uuid,
key: &str,
part_number: u64,
upload_id: &str,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let upload_id = decode_upload_id(upload_id)?;
let content_md5 = match req.headers().get("content-md5") {
@@ -87,8 +88,8 @@ pub async fn handle_put_part(
// Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string();
- let body = req.into_body().map_err(Error::from);
- let mut chunker = StreamChunker::new(body, garage.config.block_size);
+ let stream = body_stream(req.into_body());
+ let mut chunker = StreamChunker::new(stream, garage.config.block_size);
let ((_, _, mut mpu), first_block) = futures::try_join!(
get_upload(&garage, &bucket_id, &key, &upload_id),
@@ -172,7 +173,7 @@ pub async fn handle_put_part(
let response = Response::builder()
.header("ETag", format!("\"{}\"", data_md5sum_hex))
- .body(Body::empty())
+ .body(empty_body())
.unwrap();
Ok(response)
}
@@ -210,14 +211,16 @@ impl Drop for InterruptedCleanup {
pub async fn handle_complete_multipart_upload(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_name: &str,
bucket: &Bucket,
key: &str,
upload_id: &str,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = http_body_util::BodyExt::collect(req.into_body())
+ .await?
+ .to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -365,7 +368,7 @@ pub async fn handle_complete_multipart_upload(
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(Body::from(xml.into_bytes())))
+ Ok(Response::new(string_body(xml)))
}
pub async fn handle_abort_multipart_upload(
@@ -373,7 +376,7 @@ pub async fn handle_abort_multipart_upload(
bucket_id: Uuid,
key: &str,
upload_id: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let upload_id = decode_upload_id(upload_id)?;
let (_, mut object_version, _) =
@@ -383,7 +386,7 @@ pub async fn handle_abort_multipart_upload(
let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
- Ok(Response::new(Body::from(vec![])))
+ Ok(Response::new(empty_body()))
}
// ======== helpers ============
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index 542b7a81..bca8d6c6 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -1,5 +1,5 @@
use std::collections::HashMap;
-use std::convert::TryInto;
+use std::convert::{Infallible, TryInto};
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -9,12 +9,15 @@ use bytes::Bytes;
use chrono::{DateTime, Duration, Utc};
use futures::{Stream, StreamExt};
use hyper::header::{self, HeaderMap, HeaderName, HeaderValue};
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
use multer::{Constraints, Multipart, SizeLimit};
use serde::Deserialize;
use garage_model::garage::Garage;
+use crate::helpers::*;
+use crate::s3::api_server::ResBody;
+use crate::s3::cors::*;
use crate::s3::error::*;
use crate::s3::put::{get_headers, save_stream};
use crate::s3::xml as s3_xml;
@@ -22,9 +25,9 @@ use crate::signature::payload::{parse_date, verify_v4};
pub async fn handle_post_object(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<IncomingBody>,
bucket_name: String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let boundary = req
.headers()
.get(header::CONTENT_TYPE)
@@ -41,7 +44,8 @@ pub async fn handle_post_object(
);
let (head, body) = req.into_parts();
- let mut multipart = Multipart::with_constraints(body, boundary, constraints);
+ let stream = body_stream::<_, Error>(body);
+ let mut multipart = Multipart::with_constraints(stream, boundary, constraints);
let mut params = HeaderMap::new();
let field = loop {
@@ -242,7 +246,7 @@ pub async fn handle_post_object(
let etag = format!("\"{}\"", md5);
- let resp = if let Some(mut target) = params
+ let mut resp = if let Some(mut target) = params
.get("success_action_redirect")
.and_then(|h| h.to_str().ok())
.and_then(|u| url::Url::parse(u).ok())
@@ -258,12 +262,11 @@ pub async fn handle_post_object(
.status(StatusCode::SEE_OTHER)
.header(header::LOCATION, target.clone())
.header(header::ETAG, etag)
- .body(target.into())?
+ .body(string_body(target))?
} else {
let path = head
.uri
- .into_parts()
- .path_and_query
+ .path_and_query()
.map(|paq| paq.path().to_string())
.unwrap_or_else(|| "/".to_string());
let authority = head
@@ -290,7 +293,7 @@ pub async fn handle_post_object(
.header(header::LOCATION, location.clone())
.header(header::ETAG, etag.clone());
match action {
- "200" => builder.status(StatusCode::OK).body(Body::empty())?,
+ "200" => builder.status(StatusCode::OK).body(empty_body())?,
"201" => {
let xml = s3_xml::PostObject {
xmlns: (),
@@ -302,12 +305,21 @@ pub async fn handle_post_object(
let body = s3_xml::to_xml_with_header(&xml)?;
builder
.status(StatusCode::CREATED)
- .body(Body::from(body.into_bytes()))?
+ .body(string_body(body))?
}
- _ => builder.status(StatusCode::NO_CONTENT).body(Body::empty())?,
+ _ => builder.status(StatusCode::NO_CONTENT).body(empty_body())?,
}
};
+ let matching_cors_rule = find_matching_cors_rule(
+ &bucket,
+ &Request::from_parts(head, empty_body::<Infallible>()),
+ )?;
+ if let Some(rule) = matching_cors_rule {
+ add_cors_headers(&mut resp, rule)
+ .ok_or_internal_error("Invalid bucket CORS configuration")?;
+ }
+
Ok(resp)
}
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index d1c88a76..8902b14c 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -4,12 +4,13 @@ use std::sync::Arc;
use base64::prelude::*;
use futures::prelude::*;
use futures::try_join;
-use hyper::body::{Body, Bytes};
-use hyper::header::{HeaderMap, HeaderValue};
-use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
+use hyper::body::Bytes;
+use hyper::header::{HeaderMap, HeaderValue};
+use hyper::{Request, Response};
+
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
Context,
@@ -30,15 +31,17 @@ use garage_model::s3::block_ref_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
pub async fn handle_put(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket: &Bucket,
key: &String,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
// Retrieve interesting headers from request
let headers = get_headers(req.headers())?;
debug!("Object headers: {:?}", headers);
@@ -48,13 +51,12 @@ pub async fn handle_put(
None => None,
};
- let (_head, body) = req.into_parts();
- let body = body.map_err(Error::from);
+ let stream = body_stream(req.into_body());
save_stream(
garage,
headers,
- body,
+ stream,
bucket,
key,
content_md5,
@@ -434,11 +436,11 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
}
}
-pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> {
+pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<ResBody> {
Response::builder()
.header("x-amz-version-id", hex::encode(version_uuid))
.header("ETag", format!("\"{}\"", md5sum_hex))
- .body(Body::from(vec![]))
+ .body(empty_body())
.unwrap()
}
diff --git a/src/api/s3/router.rs b/src/api/s3/router.rs
index 821b0e07..e7ac1d77 100644
--- a/src/api/s3/router.rs
+++ b/src/api/s3/router.rs
@@ -125,6 +125,12 @@ pub enum Endpoint {
key: String,
part_number: Option<u64>,
version_id: Option<String>,
+ response_cache_control: Option<String>,
+ response_content_disposition: Option<String>,
+ response_content_encoding: Option<String>,
+ response_content_language: Option<String>,
+ response_content_type: Option<String>,
+ response_expires: Option<String>,
},
GetObjectAcl {
key: String,
@@ -170,7 +176,7 @@ pub enum Endpoint {
},
ListBuckets,
ListMultipartUploads {
- delimiter: Option<char>,
+ delimiter: Option<String>,
encoding_type: Option<String>,
key_marker: Option<String>,
max_uploads: Option<usize>,
@@ -178,7 +184,7 @@ pub enum Endpoint {
upload_id_marker: Option<String>,
},
ListObjects {
- delimiter: Option<char>,
+ delimiter: Option<String>,
encoding_type: Option<String>,
marker: Option<String>,
max_keys: Option<usize>,
@@ -188,7 +194,7 @@ pub enum Endpoint {
// This value should always be 2. It is not checked when constructing the struct
list_type: String,
continuation_token: Option<String>,
- delimiter: Option<char>,
+ delimiter: Option<String>,
encoding_type: Option<String>,
fetch_owner: Option<bool>,
max_keys: Option<usize>,
@@ -196,7 +202,7 @@ pub enum Endpoint {
start_after: Option<String>,
},
ListObjectVersions {
- delimiter: Option<char>,
+ delimiter: Option<String>,
encoding_type: Option<String>,
key_marker: Option<String>,
max_keys: Option<u64>,
@@ -358,7 +364,14 @@ impl Endpoint {
(query.keyword.take().unwrap_or_default(), key, query, None),
key: [
EMPTY if upload_id => ListParts (query::upload_id, opt_parse::max_parts, opt_parse::part_number_marker),
- EMPTY => GetObject (query_opt::version_id, opt_parse::part_number),
+ EMPTY => GetObject (query_opt::version_id,
+ opt_parse::part_number,
+ query_opt::response_cache_control,
+ query_opt::response_content_disposition,
+ query_opt::response_content_encoding,
+ query_opt::response_content_language,
+ query_opt::response_content_type,
+ query_opt::response_expires),
ACL => GetObjectAcl (query_opt::version_id),
LEGAL_HOLD => GetObjectLegalHold (query_opt::version_id),
RETENTION => GetObjectRetention (query_opt::version_id),
@@ -671,6 +684,12 @@ generateQueryParameters! {
"partNumber" => part_number,
"part-number-marker" => part_number_marker,
"prefix" => prefix,
+ "response-cache-control" => response_cache_control,
+ "response-content-disposition" => response_content_disposition,
+ "response-content-encoding" => response_content_encoding,
+ "response-content-language" => response_content_language,
+ "response-content-type" => response_content_type,
+ "response-expires" => response_expires,
"select-type" => select_type,
"start-after" => start_after,
"uploadId" => upload_id,
diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs
index 7f2ab925..1c1dbf20 100644
--- a/src/api/s3/website.rs
+++ b/src/api/s3/website.rs
@@ -1,9 +1,12 @@
use quick_xml::de::from_reader;
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use http_body_util::BodyExt;
+use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
@@ -12,7 +15,7 @@ use garage_model::bucket_table::*;
use garage_model::garage::Garage;
use garage_util::data::*;
-pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error> {
+pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
let param = bucket
.params()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -33,18 +36,18 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error
Ok(Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/xml")
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
} else {
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
pub async fn handle_delete_website(
garage: Arc<Garage>,
mut bucket: Bucket,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let param = bucket
.params_mut()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -54,16 +57,16 @@ pub async fn handle_delete_website(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_put_website(
garage: Arc<Garage>,
mut bucket: Bucket,
- req: Request<Body>,
+ req: Request<ReqBody>,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -83,7 +86,7 @@ pub async fn handle_put_website(
Ok(Response::builder()
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
diff --git a/src/api/signature/error.rs b/src/api/signature/error.rs
index f0d7c816..2d92a072 100644
--- a/src/api/signature/error.rs
+++ b/src/api/signature/error.rs
@@ -18,10 +18,6 @@ pub enum Error {
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)]
InvalidUtf8Str(#[error(source)] std::str::Utf8Error),
-
- /// The client sent a header with invalid value
- #[error(display = "Invalid header value: {}", _0)]
- InvalidHeader(#[error(source)] hyper::header::ToStrError),
}
impl<T> From<T> for Error
diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs
index b50fb3bb..423aad93 100644
--- a/src/api/signature/payload.rs
+++ b/src/api/signature/payload.rs
@@ -1,8 +1,8 @@
use std::collections::HashMap;
-use chrono::{DateTime, Duration, NaiveDateTime, Utc};
+use chrono::{DateTime, Duration, NaiveDateTime, TimeZone, Utc};
use hmac::Mac;
-use hyper::{Body, Method, Request};
+use hyper::{body::Incoming as IncomingBody, Method, Request};
use sha2::{Digest, Sha256};
use garage_table::*;
@@ -20,7 +20,7 @@ use crate::signature::error::*;
pub async fn check_payload_signature(
garage: &Garage,
service: &'static str,
- request: &Request<Body>,
+ request: &Request<IncomingBody>,
) -> Result<(Option<Key>, Option<Hash>), Error> {
let mut headers = HashMap::new();
for (key, val) in request.headers() {
@@ -316,7 +316,7 @@ fn canonical_query_string(uri: &hyper::Uri) -> String {
pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> {
let date: NaiveDateTime =
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
- Ok(DateTime::from_utc(date, Utc))
+ Ok(Utc.from_utc_datetime(&date))
}
pub async fn verify_v4(
diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs
index c8358c4f..39147ca0 100644
--- a/src/api/signature/streaming.rs
+++ b/src/api/signature/streaming.rs
@@ -1,26 +1,30 @@
use std::pin::Pin;
-use chrono::{DateTime, NaiveDateTime, Utc};
+use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use futures::prelude::*;
use futures::task;
use garage_model::key_table::Key;
use hmac::Mac;
-use hyper::body::Bytes;
-use hyper::{Body, Request};
+use http_body_util::StreamBody;
+use hyper::body::{Bytes, Incoming as IncomingBody};
+use hyper::Request;
use garage_util::data::Hash;
use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME};
+use crate::helpers::*;
use crate::signature::error::*;
+pub type ReqBody = BoxBody<Error>;
+
pub fn parse_streaming_body(
api_key: &Key,
- req: Request<Body>,
+ req: Request<IncomingBody>,
content_sha256: &mut Option<Hash>,
region: &str,
service: &str,
-) -> Result<Request<Body>, Error> {
+) -> Result<Request<ReqBody>, Error> {
match req.headers().get("x-amz-content-sha256") {
Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
let signature = content_sha256
@@ -40,26 +44,22 @@ pub fn parse_streaming_body(
.to_str()?;
let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
.ok_or_bad_request("Invalid date")?;
- let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
+ let date: DateTime<Utc> = Utc.from_utc_datetime(&date);
let scope = compute_scope(&date, region, service);
let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service)
.ok_or_internal_error("Unable to build signing HMAC")?;
Ok(req.map(move |body| {
- Body::wrap_stream(
- SignedPayloadStream::new(
- body.map_err(Error::from),
- signing_hmac,
- date,
- &scope,
- signature,
- )
- .map_err(Error::from),
- )
+ let stream = body_stream::<_, Error>(body);
+ let signed_payload_stream =
+ SignedPayloadStream::new(stream, signing_hmac, date, &scope, signature)
+ .map(|x| x.map(hyper::body::Frame::data))
+ .map_err(Error::from);
+ ReqBody::new(StreamBody::new(signed_payload_stream))
}))
}
- _ => Ok(req),
+ _ => Ok(req.map(|body| ReqBody::new(http_body_util::BodyExt::map_err(body, Error::from)))),
}
}