Diffstat (limited to 'src/api/s3')
-rw-r--r--  src/api/s3/api_server.rs    55
-rw-r--r--  src/api/s3/bucket.rs        32
-rw-r--r--  src/api/s3/copy.rs          21
-rw-r--r--  src/api/s3/cors.rs          52
-rw-r--r--  src/api/s3/delete.rs        17
-rw-r--r--  src/api/s3/error.rs         42
-rw-r--r--  src/api/s3/get.rs          168
-rw-r--r--  src/api/s3/lifecycle.rs     23
-rw-r--r--  src/api/s3/list.rs          17
-rw-r--r--  src/api/s3/multipart.rs     33
-rw-r--r--  src/api/s3/post_object.rs   36
-rw-r--r--  src/api/s3/put.rs           22
-rw-r--r--  src/api/s3/router.rs        29
-rw-r--r--  src/api/s3/website.rs       23
14 files changed, 347 insertions(+), 223 deletions(-)
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index 887839dd..7fac6261 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -2,9 +2,9 @@ use std::sync::Arc;
use async_trait::async_trait;
-use futures::future::Future;
use hyper::header;
-use hyper::{Body, Request, Response};
+use hyper::{body::Incoming as IncomingBody, Request, Response};
+use tokio::sync::watch;
use opentelemetry::{trace::SpanRef, KeyValue};
@@ -34,6 +34,9 @@ use crate::s3::put::*;
use crate::s3::router::Endpoint;
use crate::s3::website::*;
+pub use crate::signature::streaming::ReqBody;
+pub type ResBody = BoxBody<Error>;
+
pub struct S3ApiServer {
garage: Arc<Garage>,
}
@@ -48,19 +51,19 @@ impl S3ApiServer {
garage: Arc<Garage>,
addr: UnixOrTCPSocketAddress,
s3_region: String,
- shutdown_signal: impl Future<Output = ()>,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
ApiServer::new(s3_region, S3ApiServer { garage })
- .run_server(addr, None, shutdown_signal)
+ .run_server(addr, None, must_exit)
.await
}
async fn handle_request_without_bucket(
&self,
- _req: Request<Body>,
+ _req: Request<ReqBody>,
api_key: Key,
endpoint: Endpoint,
- ) -> Result<Response<Body>, Error> {
+ ) -> Result<Response<ResBody>, Error> {
match endpoint {
Endpoint::ListBuckets => handle_list_buckets(&self.garage, &api_key).await,
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
@@ -76,7 +79,7 @@ impl ApiHandler for S3ApiServer {
type Endpoint = S3ApiEndpoint;
type Error = Error;
- fn parse_endpoint(&self, req: &Request<Body>) -> Result<S3ApiEndpoint, Error> {
+ fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<S3ApiEndpoint, Error> {
let authority = req
.headers()
.get(header::HOST)
@@ -104,9 +107,9 @@ impl ApiHandler for S3ApiServer {
async fn handle(
&self,
- req: Request<Body>,
+ req: Request<IncomingBody>,
endpoint: S3ApiEndpoint,
- ) -> Result<Response<Body>, Error> {
+ ) -> Result<Response<ResBody>, Error> {
let S3ApiEndpoint {
bucket_name,
endpoint,
@@ -118,7 +121,8 @@ impl ApiHandler for S3ApiServer {
return handle_post_object(garage, req, bucket_name.unwrap()).await;
}
if let Endpoint::Options = endpoint {
- return handle_options_s3api(garage, &req, bucket_name).await;
+ let options_res = handle_options_api(garage, &req, bucket_name).await?;
+ return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
}
let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?;
@@ -174,8 +178,26 @@ impl ApiHandler for S3ApiServer {
key, part_number, ..
} => handle_head(garage, &req, bucket_id, &key, part_number).await,
Endpoint::GetObject {
- key, part_number, ..
- } => handle_get(garage, &req, bucket_id, &key, part_number).await,
+ key,
+ part_number,
+ response_cache_control,
+ response_content_disposition,
+ response_content_encoding,
+ response_content_language,
+ response_content_type,
+ response_expires,
+ ..
+ } => {
+ let overrides = GetObjectOverrides {
+ response_cache_control,
+ response_content_disposition,
+ response_content_encoding,
+ response_content_language,
+ response_content_type,
+ response_expires,
+ };
+ handle_get(garage, &req, bucket_id, &key, part_number, overrides).await
+ }
Endpoint::UploadPart {
key,
part_number,
@@ -235,8 +257,7 @@ impl ApiHandler for S3ApiServer {
}
Endpoint::CreateBucket {} => unreachable!(),
Endpoint::HeadBucket {} => {
- let empty_body: Body = Body::from(vec![]);
- let response = Response::builder().body(empty_body).unwrap();
+ let response = Response::builder().body(empty_body()).unwrap();
Ok(response)
}
Endpoint::DeleteBucket {} => {
@@ -257,7 +278,7 @@ impl ApiHandler for S3ApiServer {
common: ListQueryCommon {
bucket_name,
bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
+ delimiter,
page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
prefix: prefix.unwrap_or_default(),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
@@ -287,7 +308,7 @@ impl ApiHandler for S3ApiServer {
common: ListQueryCommon {
bucket_name,
bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
+ delimiter,
page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
prefix: prefix.unwrap_or_default(),
@@ -320,7 +341,7 @@ impl ApiHandler for S3ApiServer {
common: ListQueryCommon {
bucket_name,
bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
+ delimiter,
page_size: max_uploads.unwrap_or(1000).clamp(1, 1000),
prefix: prefix.unwrap_or_default(),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
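The recurring theme of this patch is the move away from hyper 0.14's single `Body` type: requests are read as `hyper::body::Incoming` (or the signed streaming `ReqBody`), and responses are built as `ResBody = BoxBody<Error>` through small helpers such as `empty_body()`, `string_body()` and `bytes_body()` from `crate::helpers`, which is not part of this diff. A minimal sketch of what such helpers can look like on hyper 1.0 / http-body-util — an illustrative assumption, not the actual Garage definitions:

use bytes::Bytes;
use http_body_util::{BodyExt, Empty, Full};

// Assumed alias (crate::helpers is outside this diff): a boxed body carrying
// Bytes frames with a caller-chosen error type.
pub type BoxBody<E> = http_body_util::combinators::BoxBody<Bytes, E>;

// Replaces hyper 0.14's Body::empty().
pub fn empty_body<E: Send + Sync + 'static>() -> BoxBody<E> {
    Empty::<Bytes>::new()
        .map_err(|never| match never {}) // Infallible -> E
        .boxed()
}

// Replaces Body::from(vec) for a single in-memory chunk.
pub fn bytes_body<E: Send + Sync + 'static>(b: Bytes) -> BoxBody<E> {
    Full::new(b).map_err(|never| match never {}).boxed()
}

// Replaces Body::from(xml.into_bytes()).
pub fn string_body<E: Send + Sync + 'static>(s: String) -> BoxBody<E> {
    bytes_body(Bytes::from(s.into_bytes()))
}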
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
index 733981e1..fa2f1b6d 100644
--- a/src/api/s3/bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -1,7 +1,8 @@
use std::collections::HashMap;
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use http_body_util::BodyExt;
+use hyper::{Request, Response, StatusCode};
use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::Bucket;
@@ -14,11 +15,13 @@ use garage_util::data::*;
use garage_util::time::*;
use crate::common_error::CommonError;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
-pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Error> {
+pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<ResBody>, Error> {
let loc = s3_xml::LocationConstraint {
xmlns: (),
region: garage.config.s3_api.s3_region.to_string(),
@@ -27,10 +30,10 @@ pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>,
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
+ .body(string_body(xml))?)
}
-pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> {
+pub fn handle_get_bucket_versioning() -> Result<Response<ResBody>, Error> {
let versioning = s3_xml::VersioningConfiguration {
xmlns: (),
status: None,
@@ -40,10 +43,13 @@ pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> {
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
+ .body(string_body(xml))?)
}
-pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Response<Body>, Error> {
+pub async fn handle_list_buckets(
+ garage: &Garage,
+ api_key: &Key,
+) -> Result<Response<ResBody>, Error> {
let key_p = api_key.params().ok_or_internal_error(
"Key should not be in deleted state at this point (in handle_list_buckets)",
)?;
@@ -109,17 +115,17 @@ pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Respo
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
}
pub async fn handle_create_bucket(
garage: &Garage,
- req: Request<Body>,
+ req: Request<ReqBody>,
content_sha256: Option<Hash>,
api_key: Key,
bucket_name: String,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -194,7 +200,7 @@ pub async fn handle_create_bucket(
Ok(Response::builder()
.header("Location", format!("/{}", bucket_name))
- .body(Body::empty())
+ .body(empty_body())
.unwrap())
}
@@ -203,7 +209,7 @@ pub async fn handle_delete_bucket(
bucket_id: Uuid,
bucket_name: String,
api_key: Key,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let key_params = api_key
.params()
.ok_or_internal_error("Key should not be deleted at this point")?;
@@ -282,7 +288,7 @@ pub async fn handle_delete_bucket(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> {
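`hyper::body::to_bytes` is gone in hyper 1.0; buffering a request body now goes through `http_body_util::BodyExt::collect`, which gathers every frame and exposes the result as contiguous `Bytes`. The generic form of the pattern used in handle_create_bucket above (and again in cors.rs, lifecycle.rs and website.rs below), as a standalone sketch:

use bytes::Bytes;
use http_body_util::BodyExt;
use hyper::body::Body;

// Buffer a whole request body in memory before parsing it or verifying its
// signature, since the x-amz-content-sha256 check needs the complete payload.
async fn collect_body<B: Body>(body: B) -> Result<Bytes, B::Error> {
    Ok(BodyExt::collect(body).await?.to_bytes())
}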
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 68b4f0c9..ba9bfc88 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -6,7 +6,7 @@ use futures::{stream, stream::Stream, StreamExt};
use md5::{Digest as Md5Digest, Md5};
use bytes::Bytes;
-use hyper::{Body, Request, Response};
+use hyper::{Request, Response};
use serde::Serialize;
use garage_rpc::netapp::bytes_buf::BytesBuf;
@@ -22,7 +22,8 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use crate::helpers::parse_bucket_key;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::multipart;
use crate::s3::put::get_headers;
@@ -31,10 +32,10 @@ use crate::s3::xml::{self as s3_xml, xmlns_tag};
pub async fn handle_copy(
garage: Arc<Garage>,
api_key: &Key,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
dest_bucket_id: Uuid,
dest_key: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
let source_object = get_copy_source(&garage, api_key, req).await?;
@@ -176,18 +177,18 @@ pub async fn handle_copy(
"x-amz-copy-source-version-id",
hex::encode(source_version.uuid),
)
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
}
pub async fn handle_upload_part_copy(
garage: Arc<Garage>,
api_key: &Key,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
dest_bucket_id: Uuid,
dest_key: &str,
part_number: u64,
upload_id: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
let dest_upload_id = multipart::decode_upload_id(upload_id)?;
@@ -432,13 +433,13 @@ pub async fn handle_upload_part_copy(
"x-amz-copy-source-version-id",
hex::encode(source_object_version.uuid),
)
- .body(Body::from(resp_xml))?)
+ .body(string_body(resp_xml))?)
}
async fn get_copy_source(
garage: &Garage,
api_key: &Key,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
) -> Result<Object, Error> {
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
@@ -501,7 +502,7 @@ struct CopyPreconditionHeaders {
}
impl CopyPreconditionHeaders {
- fn parse(req: &Request<Body>) -> Result<Self, Error> {
+ fn parse(req: &Request<ReqBody>) -> Result<Self, Error> {
Ok(Self {
copy_source_if_match: req
.headers()
diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs
index 49097ad1..e069cae4 100644
--- a/src/api/s3/cors.rs
+++ b/src/api/s3/cors.rs
@@ -5,10 +5,18 @@ use http::header::{
ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN,
ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD,
};
-use hyper::{header::HeaderName, Body, Method, Request, Response, StatusCode};
+use hyper::{
+ body::Body, body::Incoming as IncomingBody, header::HeaderName, Method, Request, Response,
+ StatusCode,
+};
+
+use http_body_util::BodyExt;
use serde::{Deserialize, Serialize};
+use crate::common_error::CommonError;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
@@ -17,7 +25,7 @@ use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
use garage_model::garage::Garage;
use garage_util::data::*;
-pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> {
+pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
let param = bucket
.params()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -34,18 +42,18 @@ pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> {
Ok(Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/xml")
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
} else {
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
pub async fn handle_delete_cors(
garage: Arc<Garage>,
mut bucket: Bucket,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let param = bucket
.params_mut()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -55,16 +63,16 @@ pub async fn handle_delete_cors(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_put_cors(
garage: Arc<Garage>,
mut bucket: Bucket,
- req: Request<Body>,
+ req: Request<ReqBody>,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -84,14 +92,14 @@ pub async fn handle_put_cors(
Ok(Response::builder()
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
-pub async fn handle_options_s3api(
+pub async fn handle_options_api(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<IncomingBody>,
bucket_name: Option<String>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<EmptyBody>, CommonError> {
// FIXME: CORS rules of buckets with local aliases are
// not taken into account.
@@ -121,7 +129,7 @@ pub async fn handle_options_s3api(
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.header(ACCESS_CONTROL_ALLOW_METHODS, "*")
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(EmptyBody::new())?)
}
} else {
// If there is no bucket name in the request,
@@ -131,14 +139,14 @@ pub async fn handle_options_s3api(
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.header(ACCESS_CONTROL_ALLOW_METHODS, "GET")
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(EmptyBody::new())?)
}
}
pub fn handle_options_for_bucket(
- req: &Request<Body>,
+ req: &Request<IncomingBody>,
bucket: &Bucket,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<EmptyBody>, CommonError> {
let origin = req
.headers()
.get("Origin")
@@ -161,18 +169,20 @@ pub fn handle_options_for_bucket(
if let Some(rule) = matching_rule {
let mut resp = Response::builder()
.status(StatusCode::OK)
- .body(Body::empty())?;
+ .body(EmptyBody::new())?;
add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?;
return Ok(resp);
}
}
- Err(Error::forbidden("This CORS request is not allowed."))
+ Err(CommonError::Forbidden(
+ "This CORS request is not allowed.".into(),
+ ))
}
pub fn find_matching_cors_rule<'a>(
bucket: &'a Bucket,
- req: &Request<Body>,
+ req: &Request<impl Body>,
) -> Result<Option<&'a GarageCorsRule>, Error> {
if let Some(cors_config) = bucket.params().unwrap().cors_config.get() {
if let Some(origin) = req.headers().get("Origin") {
@@ -209,7 +219,7 @@ where
}
pub fn add_cors_headers(
- resp: &mut Response<Body>,
+ resp: &mut Response<impl Body>,
rule: &GarageCorsRule,
) -> Result<(), http::header::InvalidHeaderValue> {
let h = resp.headers_mut();
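`handle_options_api` and `handle_options_for_bucket` now return `Response<EmptyBody>` together with a `CommonError`, so the same CORS preflight logic can be reused outside the S3 API without pulling in its error and body types. Each caller then adapts the result with `Response::map`, which replaces only the body while keeping status and headers, as api_server.rs does above. A standalone illustration of that adaptation:

use hyper::Response;

// Response::map swaps the body type without touching status or headers, so a
// body-less preflight response can be re-wrapped into whatever body type the
// calling server uses (ResBody in the S3 API server).
fn with_body<A, B>(resp: Response<A>, body: B) -> Response<B> {
    resp.map(|_old_body| body)
}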
diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs
index 1c491eac..3fb39147 100644
--- a/src/api/s3/delete.rs
+++ b/src/api/s3/delete.rs
@@ -1,12 +1,15 @@
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use http_body_util::BodyExt;
+use hyper::{Request, Response, StatusCode};
use garage_util::data::*;
use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::put::next_timestamp;
use crate::s3::xml as s3_xml;
@@ -59,11 +62,11 @@ pub async fn handle_delete(
garage: Arc<Garage>,
bucket_id: Uuid,
key: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
match handle_delete_internal(&garage, bucket_id, key).await {
Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::from(vec![]))
+ .body(empty_body())
.unwrap()),
Err(e) => Err(e),
}
@@ -72,10 +75,10 @@ pub async fn handle_delete(
pub async fn handle_delete_objects(
garage: Arc<Garage>,
bucket_id: Uuid,
- req: Request<Body>,
+ req: Request<ReqBody>,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -118,7 +121,7 @@ pub async fn handle_delete_objects(
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
}
struct DeleteRequest {
diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs
index c50cff9f..f86c19a6 100644
--- a/src/api/s3/error.rs
+++ b/src/api/s3/error.rs
@@ -2,13 +2,12 @@ use std::convert::TryInto;
use err_derive::Error;
use hyper::header::HeaderValue;
-use hyper::{Body, HeaderMap, StatusCode};
-
-use garage_model::helper::error::Error as HelperError;
+use hyper::{HeaderMap, StatusCode};
use crate::common_error::CommonError;
pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
use crate::generic_server::ApiError;
+use crate::helpers::*;
use crate::s3::xml as s3_xml;
use crate::signature::error::Error as SignatureError;
@@ -62,10 +61,6 @@ pub enum Error {
#[error(display = "Invalid XML: {}", _0)]
InvalidXml(String),
- /// The client sent a header with invalid value
- #[error(display = "Invalid header value: {}", _0)]
- InvalidHeader(#[error(source)] hyper::header::ToStrError),
-
/// The client sent a range header with invalid value
#[error(display = "Invalid HTTP range: {:?}", _0)]
InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)),
@@ -86,18 +81,6 @@ where
impl CommonErrorDerivative for Error {}
-impl From<HelperError> for Error {
- fn from(err: HelperError) -> Self {
- match err {
- HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
- HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
- HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
- HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
- e => Self::bad_request(format!("{}", e)),
- }
- }
-}
-
impl From<roxmltree::Error> for Error {
fn from(err: roxmltree::Error) -> Self {
Self::InvalidXml(format!("{}", err))
@@ -118,7 +101,6 @@ impl From<SignatureError> for Error {
Self::AuthorizationHeaderMalformed(c)
}
SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
- SignatureError::InvalidHeader(h) => Self::InvalidHeader(h),
}
}
}
@@ -143,9 +125,7 @@ impl Error {
Error::NotImplemented(_) => "NotImplemented",
Error::InvalidXml(_) => "MalformedXML",
Error::InvalidRange(_) => "InvalidRange",
- Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) | Error::InvalidHeader(_) => {
- "InvalidRequest"
- }
+ Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest",
}
}
}
@@ -165,8 +145,7 @@ impl ApiError for Error {
| Error::EntityTooSmall
| Error::InvalidXml(_)
| Error::InvalidUtf8Str(_)
- | Error::InvalidUtf8String(_)
- | Error::InvalidHeader(_) => StatusCode::BAD_REQUEST,
+ | Error::InvalidUtf8String(_) => StatusCode::BAD_REQUEST,
}
}
@@ -189,22 +168,23 @@ impl ApiError for Error {
}
}
- fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody {
let error = s3_xml::Error {
code: s3_xml::Value(self.aws_code().to_string()),
message: s3_xml::Value(format!("{}", self)),
resource: Some(s3_xml::Value(path.to_string())),
region: Some(s3_xml::Value(garage_region.to_string())),
};
- Body::from(s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| {
+ let error_str = s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| {
r#"
<?xml version="1.0" encoding="UTF-8"?>
<Error>
- <Code>InternalError</Code>
- <Message>XML encoding of error failed</Message>
+ <Code>InternalError</Code>
+ <Message>XML encoding of error failed</Message>
</Error>
- "#
+ "#
.into()
- }))
+ });
+ error_body(error_str)
}
}
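`ApiError::http_body` now returns an `ErrorBody` built by `error_body()`; both live in `crate::helpers` and are not shown in this diff. Error payloads are short XML documents, so a single-chunk body suffices — one plausible definition, given here only for illustration:

use bytes::Bytes;
use http_body_util::Full;

// Plausible shape of the helpers referenced above (not necessarily the exact
// Garage definitions): an error body is a single pre-rendered XML chunk.
pub type ErrorBody = Full<Bytes>;

pub fn error_body(s: String) -> ErrorBody {
    Full::new(Bytes::from(s.into_bytes()))
}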
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 5e682726..53f0a345 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -1,17 +1,20 @@
//! Function related to GET and HEAD requests
+use std::convert::TryInto;
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
use futures::future;
use futures::stream::{self, StreamExt};
use http::header::{
- ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
- IF_NONE_MATCH, LAST_MODIFIED, RANGE,
+ ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE,
+ CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH,
+ LAST_MODIFIED, RANGE,
};
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{body::Body, Request, Response, StatusCode};
use tokio::sync::mpsc;
-use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag};
+use garage_block::manager::BlockStream;
+use garage_rpc::rpc_helper::OrderTag;
use garage_table::EmptyKey;
use garage_util::data::*;
use garage_util::error::OkOrMessage;
@@ -20,10 +23,22 @@ use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
+use crate::helpers::*;
+use crate::s3::api_server::ResBody;
use crate::s3::error::*;
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
+#[derive(Default)]
+pub struct GetObjectOverrides {
+ pub(crate) response_cache_control: Option<String>,
+ pub(crate) response_content_disposition: Option<String>,
+ pub(crate) response_content_encoding: Option<String>,
+ pub(crate) response_content_language: Option<String>,
+ pub(crate) response_content_type: Option<String>,
+ pub(crate) response_expires: Option<String>,
+}
+
fn object_headers(
version: &ObjectVersion,
version_meta: &ObjectVersionMeta,
@@ -49,11 +64,37 @@ fn object_headers(
resp
}
+/// Override headers according to specific query parameters, see
+/// section "Overriding response header values through the request" in
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
+fn getobject_override_headers(
+ overrides: GetObjectOverrides,
+ resp: &mut http::response::Builder,
+) -> Result<(), Error> {
+ // TODO: this only applies for signed requests, so when we support
+ // anonymous access in the future we will have to do a permission check here
+ let overrides = [
+ (CACHE_CONTROL, overrides.response_cache_control),
+ (CONTENT_DISPOSITION, overrides.response_content_disposition),
+ (CONTENT_ENCODING, overrides.response_content_encoding),
+ (CONTENT_LANGUAGE, overrides.response_content_language),
+ (CONTENT_TYPE, overrides.response_content_type),
+ (EXPIRES, overrides.response_expires),
+ ];
+ for (hdr, val_opt) in overrides {
+ if let Some(val) = val_opt {
+ let val = val.try_into().ok_or_bad_request("invalid header value")?;
+ resp.headers_mut().unwrap().insert(hdr, val);
+ }
+ }
+ Ok(())
+}
+
fn try_answer_cached(
version: &ObjectVersion,
version_meta: &ObjectVersionMeta,
- req: &Request<Body>,
-) -> Option<Response<Body>> {
+ req: &Request<impl Body>,
+) -> Option<Response<ResBody>> {
// <trinity> It is possible, and is even usually the case, [that both If-None-Match and
// If-Modified-Since] are present in a request. In this situation If-None-Match takes
// precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The rational
@@ -80,7 +121,7 @@ fn try_answer_cached(
Some(
Response::builder()
.status(StatusCode::NOT_MODIFIED)
- .body(Body::empty())
+ .body(empty_body())
.unwrap(),
)
} else {
@@ -91,11 +132,11 @@ fn try_answer_cached(
/// Handle HEAD request
pub async fn handle_head(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<impl Body>,
bucket_id: Uuid,
key: &str,
part_number: Option<u64>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let object = garage
.object_table
.get(&bucket_id, &key.to_string())
@@ -138,7 +179,7 @@ pub async fn handle_head(
)
.header(X_AMZ_MP_PARTS_COUNT, "1")
.status(StatusCode::PARTIAL_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
@@ -163,7 +204,7 @@ pub async fn handle_head(
)
.header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?))
.status(StatusCode::PARTIAL_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
_ => unreachable!(),
}
@@ -171,18 +212,19 @@ pub async fn handle_head(
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
/// Handle GET request
pub async fn handle_get(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<impl Body>,
bucket_id: Uuid,
key: &str,
part_number: Option<u64>,
-) -> Result<Response<Body>, Error> {
+ overrides: GetObjectOverrides,
+) -> Result<Response<ResBody>, Error> {
let object = garage
.object_table
.get(&bucket_id, &key.to_string())
@@ -233,18 +275,18 @@ pub async fn handle_get(
(None, None) => (),
}
- let resp_builder = object_headers(last_v, last_v_meta)
+ let mut resp_builder = object_headers(last_v, last_v_meta)
.header(CONTENT_LENGTH, format!("{}", last_v_meta.size))
.status(StatusCode::OK);
+ getobject_override_headers(overrides, &mut resp_builder)?;
match &last_v_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_, bytes) => {
- let body: Body = Body::from(bytes.to_vec());
- Ok(resp_builder.body(body)?)
+ Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
- let (tx, rx) = mpsc::channel(2);
+ let (tx, rx) = mpsc::channel::<BlockStream>(2);
let order_stream = OrderTag::stream();
let first_block_hash = *first_block_hash;
@@ -282,20 +324,12 @@ pub async fn handle_get(
{
Ok(()) => (),
Err(e) => {
- let err = std::io::Error::new(
- std::io::ErrorKind::Other,
- format!("Error while getting object data: {}", e),
- );
- let _ = tx
- .send(Box::pin(stream::once(future::ready(Err(err)))))
- .await;
+ let _ = tx.send(error_stream_item(e)).await;
}
}
});
- let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten();
-
- let body = hyper::body::Body::wrap_stream(body_stream);
+ let body = response_body_from_block_stream(rx);
Ok(resp_builder.body(body)?)
}
}
@@ -308,7 +342,10 @@ async fn handle_get_range(
version_meta: &ObjectVersionMeta,
begin: u64,
end: u64,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
+ // Here we do not use getobject_override_headers because we don't
+ // want to add any overridden headers (those should not be added
+ // when returning PARTIAL_CONTENT)
let resp_builder = object_headers(version, version_meta)
.header(CONTENT_LENGTH, format!("{}", end - begin))
.header(
@@ -321,7 +358,7 @@ async fn handle_get_range(
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, bytes) => {
if end as usize <= bytes.len() {
- let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec());
+ let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into());
Ok(resp_builder.body(body)?)
} else {
Err(Error::internal_error(
@@ -348,7 +385,8 @@ async fn handle_get_part(
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
part_number: u64,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
+ // Same as for get_range, no getobject_override_headers
let resp_builder =
object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT);
@@ -364,7 +402,7 @@ async fn handle_get_part(
format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()),
)
.header(X_AMZ_MP_PARTS_COUNT, "1")
- .body(Body::from(bytes.to_vec()))?)
+ .body(bytes_body(bytes.to_vec().into()))?)
}
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
@@ -392,7 +430,7 @@ async fn handle_get_part(
}
fn parse_range_header(
- req: &Request<Body>,
+ req: &Request<impl Body>,
total_size: u64,
) -> Result<Option<http_range::HttpRange>, Error> {
let range = match req.headers().get(RANGE) {
@@ -434,7 +472,7 @@ fn body_from_blocks_range(
all_blocks: &[(VersionBlockKey, VersionBlock)],
begin: u64,
end: u64,
-) -> Body {
+) -> ResBody {
// We will store here the list of blocks that have an intersection with the requested
// range, as well as their "true offset", which is their actual offset in the complete
// file (whereas block.offset designates the offset of the block WITHIN THE PART
@@ -456,17 +494,17 @@ fn body_from_blocks_range(
}
let order_stream = OrderTag::stream();
- let body_stream = futures::stream::iter(blocks)
- .enumerate()
- .map(move |(i, (block, block_offset))| {
+ let (tx, rx) = mpsc::channel::<BlockStream>(2);
+
+ tokio::spawn(async move {
+ match async {
let garage = garage.clone();
- async move {
- garage
+ for (i, (block, block_offset)) in blocks.iter().enumerate() {
+ let block_stream = garage
.block_manager
.rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
- .await
- .unwrap_or_else(|e| error_stream(i, e))
- .scan(block_offset, move |chunk_offset, chunk| {
+ .await?
+ .scan(*block_offset, move |chunk_offset, chunk| {
let r = match chunk {
Ok(chunk_bytes) => {
let chunk_len = chunk_bytes.len() as u64;
@@ -502,20 +540,42 @@ fn body_from_blocks_range(
};
futures::future::ready(r)
})
- .filter_map(futures::future::ready)
+ .filter_map(futures::future::ready);
+
+ let block_stream: BlockStream = Box::pin(block_stream);
+ tx.send(Box::pin(block_stream))
+ .await
+ .ok_or_message("channel closed")?;
}
- })
- .buffered(2)
- .flatten();
- hyper::body::Body::wrap_stream(body_stream)
+ Ok::<(), Error>(())
+ }
+ .await
+ {
+ Ok(()) => (),
+ Err(e) => {
+ let _ = tx.send(error_stream_item(e)).await;
+ }
+ }
+ });
+
+ response_body_from_block_stream(rx)
+}
+
+fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody {
+ let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
+ .flatten()
+ .map(|x| {
+ x.map(hyper::body::Frame::data)
+ .map_err(|e| Error::from(garage_util::error::Error::from(e)))
+ });
+ ResBody::new(http_body_util::StreamBody::new(body_stream))
}
-fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream {
- Box::pin(futures::stream::once(async move {
- Err(std::io::Error::new(
- std::io::ErrorKind::Other,
- format!("Could not get block {}: {}", i, e),
- ))
- }))
+fn error_stream_item<E: std::fmt::Display>(e: E) -> BlockStream {
+ let err = std::io::Error::new(
+ std::io::ErrorKind::Other,
+ format!("Error while getting object data: {}", e),
+ );
+ Box::pin(stream::once(future::ready(Err(err))))
}
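hyper 1.0 also removes `Body::wrap_stream`; a streaming response is now built by wrapping a `Stream` of `Frame`s in `http_body_util::StreamBody`, which is what `response_body_from_block_stream` does above with the channel of block streams. A minimal standalone sketch of the same technique for a plain stream of byte chunks (illustrative names, not Garage code):

use bytes::Bytes;
use futures::stream::{Stream, StreamExt};
use http_body_util::{combinators::BoxBody, StreamBody};
use hyper::body::Frame;

// Turn a stream of Bytes chunks into a boxed hyper 1.0 body: each chunk
// becomes a DATA frame, and errors are forwarded to the client unchanged.
fn streaming_body<S, E>(chunks: S) -> BoxBody<Bytes, E>
where
    S: Stream<Item = Result<Bytes, E>> + Send + Sync + 'static,
    E: 'static,
{
    let frames = chunks.map(|chunk| chunk.map(Frame::data));
    BoxBody::new(StreamBody::new(frames))
}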
diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs
index 1e7d6755..35757e8c 100644
--- a/src/api/s3/lifecycle.rs
+++ b/src/api/s3/lifecycle.rs
@@ -1,10 +1,13 @@
use quick_xml::de::from_reader;
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use http_body_util::BodyExt;
+use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
@@ -16,7 +19,7 @@ use garage_model::bucket_table::{
use garage_model::garage::Garage;
use garage_util::data::*;
-pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<Body>, Error> {
+pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
let param = bucket
.params()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -27,18 +30,18 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<Body>, Err
Ok(Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/xml")
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
} else {
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
pub async fn handle_delete_lifecycle(
garage: Arc<Garage>,
mut bucket: Bucket,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let param = bucket
.params_mut()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -48,16 +51,16 @@ pub async fn handle_delete_lifecycle(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_put_lifecycle(
garage: Arc<Garage>,
mut bucket: Bucket,
- req: Request<Body>,
+ req: Request<ReqBody>,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -77,7 +80,7 @@ pub async fn handle_put_lifecycle(
Ok(Response::builder()
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
// ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ----
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index 1b9e8cd5..b832a4f4 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -3,7 +3,7 @@ use std::iter::{Iterator, Peekable};
use std::sync::Arc;
use base64::prelude::*;
-use hyper::{Body, Response};
+use hyper::Response;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
@@ -16,7 +16,8 @@ use garage_model::s3::object_table::*;
use garage_table::EnumerationOrder;
use crate::encoding::*;
-use crate::helpers::key_after_prefix;
+use crate::helpers::*;
+use crate::s3::api_server::ResBody;
use crate::s3::error::*;
use crate::s3::multipart as s3_multipart;
use crate::s3::xml as s3_xml;
@@ -63,7 +64,7 @@ pub struct ListPartsQuery {
pub async fn handle_list(
garage: Arc<Garage>,
query: &ListObjectsQuery,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
@@ -162,13 +163,13 @@ pub async fn handle_list(
let xml = s3_xml::to_xml_with_header(&result)?;
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
+ .body(string_body(xml))?)
}
pub async fn handle_list_multipart_upload(
garage: Arc<Garage>,
query: &ListMultipartUploadsQuery,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
@@ -264,13 +265,13 @@ pub async fn handle_list_multipart_upload(
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
+ .body(string_body(xml))?)
}
pub async fn handle_list_parts(
garage: Arc<Garage>,
query: &ListPartsQuery,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
debug!("ListParts {:?}", query);
let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?;
@@ -319,7 +320,7 @@ pub async fn handle_list_parts(
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
+ .body(string_body(xml))?)
}
/*
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 6b786318..b9d15b21 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use futures::prelude::*;
-use hyper::body::Body;
use hyper::{Request, Response};
use md5::{Digest as Md5Digest, Md5};
@@ -17,6 +16,8 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::put::*;
use crate::s3::xml as s3_xml;
@@ -26,11 +27,11 @@ use crate::signature::verify_signed_content;
pub async fn handle_create_multipart_upload(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
bucket_name: &str,
bucket_id: Uuid,
key: &String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
let upload_id = gen_uuid();
@@ -65,18 +66,18 @@ pub async fn handle_create_multipart_upload(
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(Body::from(xml.into_bytes())))
+ Ok(Response::new(string_body(xml)))
}
pub async fn handle_put_part(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_id: Uuid,
key: &str,
part_number: u64,
upload_id: &str,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let upload_id = decode_upload_id(upload_id)?;
let content_md5 = match req.headers().get("content-md5") {
@@ -87,8 +88,8 @@ pub async fn handle_put_part(
// Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string();
- let body = req.into_body().map_err(Error::from);
- let mut chunker = StreamChunker::new(body, garage.config.block_size);
+ let stream = body_stream(req.into_body());
+ let mut chunker = StreamChunker::new(stream, garage.config.block_size);
let ((_, _, mut mpu), first_block) = futures::try_join!(
get_upload(&garage, &bucket_id, &key, &upload_id),
@@ -172,7 +173,7 @@ pub async fn handle_put_part(
let response = Response::builder()
.header("ETag", format!("\"{}\"", data_md5sum_hex))
- .body(Body::empty())
+ .body(empty_body())
.unwrap();
Ok(response)
}
@@ -210,14 +211,16 @@ impl Drop for InterruptedCleanup {
pub async fn handle_complete_multipart_upload(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_name: &str,
bucket: &Bucket,
key: &str,
upload_id: &str,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = http_body_util::BodyExt::collect(req.into_body())
+ .await?
+ .to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -365,7 +368,7 @@ pub async fn handle_complete_multipart_upload(
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(Body::from(xml.into_bytes())))
+ Ok(Response::new(string_body(xml)))
}
pub async fn handle_abort_multipart_upload(
@@ -373,7 +376,7 @@ pub async fn handle_abort_multipart_upload(
bucket_id: Uuid,
key: &str,
upload_id: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let upload_id = decode_upload_id(upload_id)?;
let (_, mut object_version, _) =
@@ -383,7 +386,7 @@ pub async fn handle_abort_multipart_upload(
let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
- Ok(Response::new(Body::from(vec![])))
+ Ok(Response::new(empty_body()))
}
// ======== helpers ============
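The `body_stream` helper used here for `StreamChunker` (and in post_object.rs and put.rs below) comes from `crate::helpers` and is not part of this diff; it adapts an `http_body::Body` into the `Stream<Item = Result<Bytes, _>>` that the chunker and `multer::Multipart` expect. A plausible sketch, assuming data frames are kept and trailer frames discarded:

use bytes::Bytes;
use futures::stream::{Stream, TryStreamExt};
use http_body_util::BodyStream;
use hyper::body::Body;

// Plausible shape of crate::helpers::body_stream (not shown in this diff):
// expose a Body as a stream of Bytes chunks, dropping non-data frames such
// as trailers and converting the error type.
fn body_stream<B, E>(body: B) -> impl Stream<Item = Result<Bytes, E>>
where
    B: Body<Data = Bytes>,
    E: From<B::Error>,
{
    BodyStream::new(body)
        .map_err(E::from)
        .try_filter_map(|frame| async move { Ok::<_, E>(frame.into_data().ok()) })
}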
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index 542b7a81..bca8d6c6 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -1,5 +1,5 @@
use std::collections::HashMap;
-use std::convert::TryInto;
+use std::convert::{Infallible, TryInto};
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -9,12 +9,15 @@ use bytes::Bytes;
use chrono::{DateTime, Duration, Utc};
use futures::{Stream, StreamExt};
use hyper::header::{self, HeaderMap, HeaderName, HeaderValue};
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
use multer::{Constraints, Multipart, SizeLimit};
use serde::Deserialize;
use garage_model::garage::Garage;
+use crate::helpers::*;
+use crate::s3::api_server::ResBody;
+use crate::s3::cors::*;
use crate::s3::error::*;
use crate::s3::put::{get_headers, save_stream};
use crate::s3::xml as s3_xml;
@@ -22,9 +25,9 @@ use crate::signature::payload::{parse_date, verify_v4};
pub async fn handle_post_object(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<IncomingBody>,
bucket_name: String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let boundary = req
.headers()
.get(header::CONTENT_TYPE)
@@ -41,7 +44,8 @@ pub async fn handle_post_object(
);
let (head, body) = req.into_parts();
- let mut multipart = Multipart::with_constraints(body, boundary, constraints);
+ let stream = body_stream::<_, Error>(body);
+ let mut multipart = Multipart::with_constraints(stream, boundary, constraints);
let mut params = HeaderMap::new();
let field = loop {
@@ -242,7 +246,7 @@ pub async fn handle_post_object(
let etag = format!("\"{}\"", md5);
- let resp = if let Some(mut target) = params
+ let mut resp = if let Some(mut target) = params
.get("success_action_redirect")
.and_then(|h| h.to_str().ok())
.and_then(|u| url::Url::parse(u).ok())
@@ -258,12 +262,11 @@ pub async fn handle_post_object(
.status(StatusCode::SEE_OTHER)
.header(header::LOCATION, target.clone())
.header(header::ETAG, etag)
- .body(target.into())?
+ .body(string_body(target))?
} else {
let path = head
.uri
- .into_parts()
- .path_and_query
+ .path_and_query()
.map(|paq| paq.path().to_string())
.unwrap_or_else(|| "/".to_string());
let authority = head
@@ -290,7 +293,7 @@ pub async fn handle_post_object(
.header(header::LOCATION, location.clone())
.header(header::ETAG, etag.clone());
match action {
- "200" => builder.status(StatusCode::OK).body(Body::empty())?,
+ "200" => builder.status(StatusCode::OK).body(empty_body())?,
"201" => {
let xml = s3_xml::PostObject {
xmlns: (),
@@ -302,12 +305,21 @@ pub async fn handle_post_object(
let body = s3_xml::to_xml_with_header(&xml)?;
builder
.status(StatusCode::CREATED)
- .body(Body::from(body.into_bytes()))?
+ .body(string_body(body))?
}
- _ => builder.status(StatusCode::NO_CONTENT).body(Body::empty())?,
+ _ => builder.status(StatusCode::NO_CONTENT).body(empty_body())?,
}
};
+ let matching_cors_rule = find_matching_cors_rule(
+ &bucket,
+ &Request::from_parts(head, empty_body::<Infallible>()),
+ )?;
+ if let Some(rule) = matching_cors_rule {
+ add_cors_headers(&mut resp, rule)
+ .ok_or_internal_error("Invalid bucket CORS configuration")?;
+ }
+
Ok(resp)
}
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index d1c88a76..8902b14c 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -4,12 +4,13 @@ use std::sync::Arc;
use base64::prelude::*;
use futures::prelude::*;
use futures::try_join;
-use hyper::body::{Body, Bytes};
-use hyper::header::{HeaderMap, HeaderValue};
-use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
+use hyper::body::Bytes;
+use hyper::header::{HeaderMap, HeaderValue};
+use hyper::{Request, Response};
+
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
Context,
@@ -30,15 +31,17 @@ use garage_model::s3::block_ref_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
pub async fn handle_put(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket: &Bucket,
key: &String,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
// Retrieve interesting headers from request
let headers = get_headers(req.headers())?;
debug!("Object headers: {:?}", headers);
@@ -48,13 +51,12 @@ pub async fn handle_put(
None => None,
};
- let (_head, body) = req.into_parts();
- let body = body.map_err(Error::from);
+ let stream = body_stream(req.into_body());
save_stream(
garage,
headers,
- body,
+ stream,
bucket,
key,
content_md5,
@@ -434,11 +436,11 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
}
}
-pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> {
+pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<ResBody> {
Response::builder()
.header("x-amz-version-id", hex::encode(version_uuid))
.header("ETag", format!("\"{}\"", md5sum_hex))
- .body(Body::from(vec![]))
+ .body(empty_body())
.unwrap()
}
diff --git a/src/api/s3/router.rs b/src/api/s3/router.rs
index 821b0e07..e7ac1d77 100644
--- a/src/api/s3/router.rs
+++ b/src/api/s3/router.rs
@@ -125,6 +125,12 @@ pub enum Endpoint {
key: String,
part_number: Option<u64>,
version_id: Option<String>,
+ response_cache_control: Option<String>,
+ response_content_disposition: Option<String>,
+ response_content_encoding: Option<String>,
+ response_content_language: Option<String>,
+ response_content_type: Option<String>,
+ response_expires: Option<String>,
},
GetObjectAcl {
key: String,
@@ -170,7 +176,7 @@ pub enum Endpoint {
},
ListBuckets,
ListMultipartUploads {
- delimiter: Option<char>,
+ delimiter: Option<String>,
encoding_type: Option<String>,
key_marker: Option<String>,
max_uploads: Option<usize>,
@@ -178,7 +184,7 @@ pub enum Endpoint {
upload_id_marker: Option<String>,
},
ListObjects {
- delimiter: Option<char>,
+ delimiter: Option<String>,
encoding_type: Option<String>,
marker: Option<String>,
max_keys: Option<usize>,
@@ -188,7 +194,7 @@ pub enum Endpoint {
// This value should always be 2. It is not checked when constructing the struct
list_type: String,
continuation_token: Option<String>,
- delimiter: Option<char>,
+ delimiter: Option<String>,
encoding_type: Option<String>,
fetch_owner: Option<bool>,
max_keys: Option<usize>,
@@ -196,7 +202,7 @@ pub enum Endpoint {
start_after: Option<String>,
},
ListObjectVersions {
- delimiter: Option<char>,
+ delimiter: Option<String>,
encoding_type: Option<String>,
key_marker: Option<String>,
max_keys: Option<u64>,
@@ -358,7 +364,14 @@ impl Endpoint {
(query.keyword.take().unwrap_or_default(), key, query, None),
key: [
EMPTY if upload_id => ListParts (query::upload_id, opt_parse::max_parts, opt_parse::part_number_marker),
- EMPTY => GetObject (query_opt::version_id, opt_parse::part_number),
+ EMPTY => GetObject (query_opt::version_id,
+ opt_parse::part_number,
+ query_opt::response_cache_control,
+ query_opt::response_content_disposition,
+ query_opt::response_content_encoding,
+ query_opt::response_content_language,
+ query_opt::response_content_type,
+ query_opt::response_expires),
ACL => GetObjectAcl (query_opt::version_id),
LEGAL_HOLD => GetObjectLegalHold (query_opt::version_id),
RETENTION => GetObjectRetention (query_opt::version_id),
@@ -671,6 +684,12 @@ generateQueryParameters! {
"partNumber" => part_number,
"part-number-marker" => part_number_marker,
"prefix" => prefix,
+ "response-cache-control" => response_cache_control,
+ "response-content-disposition" => response_content_disposition,
+ "response-content-encoding" => response_content_encoding,
+ "response-content-language" => response_content_language,
+ "response-content-type" => response_content_type,
+ "response-expires" => response_expires,
"select-type" => select_type,
"start-after" => start_after,
"uploadId" => upload_id,
diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs
index 7f2ab925..1c1dbf20 100644
--- a/src/api/s3/website.rs
+++ b/src/api/s3/website.rs
@@ -1,9 +1,12 @@
use quick_xml::de::from_reader;
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use http_body_util::BodyExt;
+use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
@@ -12,7 +15,7 @@ use garage_model::bucket_table::*;
use garage_model::garage::Garage;
use garage_util::data::*;
-pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error> {
+pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
let param = bucket
.params()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -33,18 +36,18 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error
Ok(Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/xml")
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
} else {
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
pub async fn handle_delete_website(
garage: Arc<Garage>,
mut bucket: Bucket,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let param = bucket
.params_mut()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -54,16 +57,16 @@ pub async fn handle_delete_website(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_put_website(
garage: Arc<Garage>,
mut bucket: Bucket,
- req: Request<Body>,
+ req: Request<ReqBody>,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -83,7 +86,7 @@ pub async fn handle_put_website(
Ok(Response::builder()
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]