diff options
author | Alex Auvolat <alex@adnab.me> | 2024-02-13 11:24:56 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-02-13 11:36:28 +0100 |
commit | cf2af186fcc0c8f581a966454b6cd4720d3821f0 (patch) | |
tree | 37a978ba9ffb780fc828cff7b8ec93662d50884f /src/api/s3 | |
parent | db48dd3d6c1f9e86a62e9b8edfce2c1620bcd5f3 (diff) | |
parent | 823078b4cdaf93e09de0847c5eaa75beb7b26b7f (diff) | |
download | garage-cf2af186fcc0c8f581a966454b6cd4720d3821f0.tar.gz garage-cf2af186fcc0c8f581a966454b6cd4720d3821f0.zip |
Merge branch 'main' into next-0.10
Diffstat (limited to 'src/api/s3')
-rw-r--r-- | src/api/s3/api_server.rs | 55 | ||||
-rw-r--r-- | src/api/s3/bucket.rs | 32 | ||||
-rw-r--r-- | src/api/s3/copy.rs | 21 | ||||
-rw-r--r-- | src/api/s3/cors.rs | 52 | ||||
-rw-r--r-- | src/api/s3/delete.rs | 17 | ||||
-rw-r--r-- | src/api/s3/error.rs | 42 | ||||
-rw-r--r-- | src/api/s3/get.rs | 168 | ||||
-rw-r--r-- | src/api/s3/lifecycle.rs | 23 | ||||
-rw-r--r-- | src/api/s3/list.rs | 17 | ||||
-rw-r--r-- | src/api/s3/multipart.rs | 33 | ||||
-rw-r--r-- | src/api/s3/post_object.rs | 36 | ||||
-rw-r--r-- | src/api/s3/put.rs | 22 | ||||
-rw-r--r-- | src/api/s3/router.rs | 29 | ||||
-rw-r--r-- | src/api/s3/website.rs | 23 |
14 files changed, 347 insertions, 223 deletions
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 887839dd..7fac6261 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -2,9 +2,9 @@ use std::sync::Arc; use async_trait::async_trait; -use futures::future::Future; use hyper::header; -use hyper::{Body, Request, Response}; +use hyper::{body::Incoming as IncomingBody, Request, Response}; +use tokio::sync::watch; use opentelemetry::{trace::SpanRef, KeyValue}; @@ -34,6 +34,9 @@ use crate::s3::put::*; use crate::s3::router::Endpoint; use crate::s3::website::*; +pub use crate::signature::streaming::ReqBody; +pub type ResBody = BoxBody<Error>; + pub struct S3ApiServer { garage: Arc<Garage>, } @@ -48,19 +51,19 @@ impl S3ApiServer { garage: Arc<Garage>, addr: UnixOrTCPSocketAddress, s3_region: String, - shutdown_signal: impl Future<Output = ()>, + must_exit: watch::Receiver<bool>, ) -> Result<(), GarageError> { ApiServer::new(s3_region, S3ApiServer { garage }) - .run_server(addr, None, shutdown_signal) + .run_server(addr, None, must_exit) .await } async fn handle_request_without_bucket( &self, - _req: Request<Body>, + _req: Request<ReqBody>, api_key: Key, endpoint: Endpoint, - ) -> Result<Response<Body>, Error> { + ) -> Result<Response<ResBody>, Error> { match endpoint { Endpoint::ListBuckets => handle_list_buckets(&self.garage, &api_key).await, endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), @@ -76,7 +79,7 @@ impl ApiHandler for S3ApiServer { type Endpoint = S3ApiEndpoint; type Error = Error; - fn parse_endpoint(&self, req: &Request<Body>) -> Result<S3ApiEndpoint, Error> { + fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<S3ApiEndpoint, Error> { let authority = req .headers() .get(header::HOST) @@ -104,9 +107,9 @@ impl ApiHandler for S3ApiServer { async fn handle( &self, - req: Request<Body>, + req: Request<IncomingBody>, endpoint: S3ApiEndpoint, - ) -> Result<Response<Body>, Error> { + ) -> Result<Response<ResBody>, Error> { let S3ApiEndpoint { bucket_name, endpoint, @@ -118,7 +121,8 @@ impl ApiHandler for S3ApiServer { return handle_post_object(garage, req, bucket_name.unwrap()).await; } if let Endpoint::Options = endpoint { - return handle_options_s3api(garage, &req, bucket_name).await; + let options_res = handle_options_api(garage, &req, bucket_name).await?; + return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?; @@ -174,8 +178,26 @@ impl ApiHandler for S3ApiServer { key, part_number, .. } => handle_head(garage, &req, bucket_id, &key, part_number).await, Endpoint::GetObject { - key, part_number, .. - } => handle_get(garage, &req, bucket_id, &key, part_number).await, + key, + part_number, + response_cache_control, + response_content_disposition, + response_content_encoding, + response_content_language, + response_content_type, + response_expires, + .. + } => { + let overrides = GetObjectOverrides { + response_cache_control, + response_content_disposition, + response_content_encoding, + response_content_language, + response_content_type, + response_expires, + }; + handle_get(garage, &req, bucket_id, &key, part_number, overrides).await + } Endpoint::UploadPart { key, part_number, @@ -235,8 +257,7 @@ impl ApiHandler for S3ApiServer { } Endpoint::CreateBucket {} => unreachable!(), Endpoint::HeadBucket {} => { - let empty_body: Body = Body::from(vec![]); - let response = Response::builder().body(empty_body).unwrap(); + let response = Response::builder().body(empty_body()).unwrap(); Ok(response) } Endpoint::DeleteBucket {} => { @@ -257,7 +278,7 @@ impl ApiHandler for S3ApiServer { common: ListQueryCommon { bucket_name, bucket_id, - delimiter: delimiter.map(|d| d.to_string()), + delimiter, page_size: max_keys.unwrap_or(1000).clamp(1, 1000), prefix: prefix.unwrap_or_default(), urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), @@ -287,7 +308,7 @@ impl ApiHandler for S3ApiServer { common: ListQueryCommon { bucket_name, bucket_id, - delimiter: delimiter.map(|d| d.to_string()), + delimiter, page_size: max_keys.unwrap_or(1000).clamp(1, 1000), urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), prefix: prefix.unwrap_or_default(), @@ -320,7 +341,7 @@ impl ApiHandler for S3ApiServer { common: ListQueryCommon { bucket_name, bucket_id, - delimiter: delimiter.map(|d| d.to_string()), + delimiter, page_size: max_uploads.unwrap_or(1000).clamp(1, 1000), prefix: prefix.unwrap_or_default(), urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 733981e1..fa2f1b6d 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use garage_model::bucket_alias_table::*; use garage_model::bucket_table::Bucket; @@ -14,11 +15,13 @@ use garage_util::data::*; use garage_util::time::*; use crate::common_error::CommonError; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; -pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Error> { +pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<ResBody>, Error> { let loc = s3_xml::LocationConstraint { xmlns: (), region: garage.config.s3_api.s3_region.to_string(), @@ -27,10 +30,10 @@ pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } -pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> { +pub fn handle_get_bucket_versioning() -> Result<Response<ResBody>, Error> { let versioning = s3_xml::VersioningConfiguration { xmlns: (), status: None, @@ -40,10 +43,13 @@ pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> { Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } -pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Response<Body>, Error> { +pub async fn handle_list_buckets( + garage: &Garage, + api_key: &Key, +) -> Result<Response<ResBody>, Error> { let key_p = api_key.params().ok_or_internal_error( "Key should not be in deleted state at this point (in handle_list_buckets)", )?; @@ -109,17 +115,17 @@ pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Respo Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } pub async fn handle_create_bucket( garage: &Garage, - req: Request<Body>, + req: Request<ReqBody>, content_sha256: Option<Hash>, api_key: Key, bucket_name: String, -) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; +) -> Result<Response<ResBody>, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -194,7 +200,7 @@ pub async fn handle_create_bucket( Ok(Response::builder() .header("Location", format!("/{}", bucket_name)) - .body(Body::empty()) + .body(empty_body()) .unwrap()) } @@ -203,7 +209,7 @@ pub async fn handle_delete_bucket( bucket_id: Uuid, bucket_name: String, api_key: Key, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let key_params = api_key .params() .ok_or_internal_error("Key should not be deleted at this point")?; @@ -282,7 +288,7 @@ pub async fn handle_delete_bucket( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> { diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 68b4f0c9..ba9bfc88 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -6,7 +6,7 @@ use futures::{stream, stream::Stream, StreamExt}; use md5::{Digest as Md5Digest, Md5}; use bytes::Bytes; -use hyper::{Body, Request, Response}; +use hyper::{Request, Response}; use serde::Serialize; use garage_rpc::netapp::bytes_buf::BytesBuf; @@ -22,7 +22,8 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::parse_bucket_key; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::multipart; use crate::s3::put::get_headers; @@ -31,10 +32,10 @@ use crate::s3::xml::{self as s3_xml, xmlns_tag}; pub async fn handle_copy( garage: Arc<Garage>, api_key: &Key, - req: &Request<Body>, + req: &Request<ReqBody>, dest_bucket_id: Uuid, dest_key: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; let source_object = get_copy_source(&garage, api_key, req).await?; @@ -176,18 +177,18 @@ pub async fn handle_copy( "x-amz-copy-source-version-id", hex::encode(source_version.uuid), ) - .body(Body::from(xml))?) + .body(string_body(xml))?) } pub async fn handle_upload_part_copy( garage: Arc<Garage>, api_key: &Key, - req: &Request<Body>, + req: &Request<ReqBody>, dest_bucket_id: Uuid, dest_key: &str, part_number: u64, upload_id: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; let dest_upload_id = multipart::decode_upload_id(upload_id)?; @@ -432,13 +433,13 @@ pub async fn handle_upload_part_copy( "x-amz-copy-source-version-id", hex::encode(source_object_version.uuid), ) - .body(Body::from(resp_xml))?) + .body(string_body(resp_xml))?) } async fn get_copy_source( garage: &Garage, api_key: &Key, - req: &Request<Body>, + req: &Request<ReqBody>, ) -> Result<Object, Error> { let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?; let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?; @@ -501,7 +502,7 @@ struct CopyPreconditionHeaders { } impl CopyPreconditionHeaders { - fn parse(req: &Request<Body>) -> Result<Self, Error> { + fn parse(req: &Request<ReqBody>) -> Result<Self, Error> { Ok(Self { copy_source_if_match: req .headers() diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index 49097ad1..e069cae4 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -5,10 +5,18 @@ use http::header::{ ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, }; -use hyper::{header::HeaderName, Body, Method, Request, Response, StatusCode}; +use hyper::{ + body::Body, body::Incoming as IncomingBody, header::HeaderName, Method, Request, Response, + StatusCode, +}; + +use http_body_util::BodyExt; use serde::{Deserialize, Serialize}; +use crate::common_error::CommonError; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; @@ -17,7 +25,7 @@ use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule}; use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> { +pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<ResBody>, Error> { let param = bucket .params() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -34,18 +42,18 @@ pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> { Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } else { Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } } pub async fn handle_delete_cors( garage: Arc<Garage>, mut bucket: Bucket, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let param = bucket .params_mut() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -55,16 +63,16 @@ pub async fn handle_delete_cors( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_put_cors( garage: Arc<Garage>, mut bucket: Bucket, - req: Request<Body>, + req: Request<ReqBody>, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; +) -> Result<Response<ResBody>, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -84,14 +92,14 @@ pub async fn handle_put_cors( Ok(Response::builder() .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } -pub async fn handle_options_s3api( +pub async fn handle_options_api( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<IncomingBody>, bucket_name: Option<String>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<EmptyBody>, CommonError> { // FIXME: CORS rules of buckets with local aliases are // not taken into account. @@ -121,7 +129,7 @@ pub async fn handle_options_s3api( .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") .header(ACCESS_CONTROL_ALLOW_METHODS, "*") .status(StatusCode::OK) - .body(Body::empty())?) + .body(EmptyBody::new())?) } } else { // If there is no bucket name in the request, @@ -131,14 +139,14 @@ pub async fn handle_options_s3api( .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") .header(ACCESS_CONTROL_ALLOW_METHODS, "GET") .status(StatusCode::OK) - .body(Body::empty())?) + .body(EmptyBody::new())?) } } pub fn handle_options_for_bucket( - req: &Request<Body>, + req: &Request<IncomingBody>, bucket: &Bucket, -) -> Result<Response<Body>, Error> { +) -> Result<Response<EmptyBody>, CommonError> { let origin = req .headers() .get("Origin") @@ -161,18 +169,20 @@ pub fn handle_options_for_bucket( if let Some(rule) = matching_rule { let mut resp = Response::builder() .status(StatusCode::OK) - .body(Body::empty())?; + .body(EmptyBody::new())?; add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?; return Ok(resp); } } - Err(Error::forbidden("This CORS request is not allowed.")) + Err(CommonError::Forbidden( + "This CORS request is not allowed.".into(), + )) } pub fn find_matching_cors_rule<'a>( bucket: &'a Bucket, - req: &Request<Body>, + req: &Request<impl Body>, ) -> Result<Option<&'a GarageCorsRule>, Error> { if let Some(cors_config) = bucket.params().unwrap().cors_config.get() { if let Some(origin) = req.headers().get("Origin") { @@ -209,7 +219,7 @@ where } pub fn add_cors_headers( - resp: &mut Response<Body>, + resp: &mut Response<impl Body>, rule: &GarageCorsRule, ) -> Result<(), http::header::InvalidHeaderValue> { let h = resp.headers_mut(); diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index 1c491eac..3fb39147 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -1,12 +1,15 @@ use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use garage_util::data::*; use garage_model::garage::Garage; use garage_model::s3::object_table::*; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::put::next_timestamp; use crate::s3::xml as s3_xml; @@ -59,11 +62,11 @@ pub async fn handle_delete( garage: Arc<Garage>, bucket_id: Uuid, key: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { match handle_delete_internal(&garage, bucket_id, key).await { Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::from(vec![])) + .body(empty_body()) .unwrap()), Err(e) => Err(e), } @@ -72,10 +75,10 @@ pub async fn handle_delete( pub async fn handle_delete_objects( garage: Arc<Garage>, bucket_id: Uuid, - req: Request<Body>, + req: Request<ReqBody>, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; +) -> Result<Response<ResBody>, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -118,7 +121,7 @@ pub async fn handle_delete_objects( Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } struct DeleteRequest { diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index c50cff9f..f86c19a6 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -2,13 +2,12 @@ use std::convert::TryInto; use err_derive::Error; use hyper::header::HeaderValue; -use hyper::{Body, HeaderMap, StatusCode}; - -use garage_model::helper::error::Error as HelperError; +use hyper::{HeaderMap, StatusCode}; use crate::common_error::CommonError; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; use crate::generic_server::ApiError; +use crate::helpers::*; use crate::s3::xml as s3_xml; use crate::signature::error::Error as SignatureError; @@ -62,10 +61,6 @@ pub enum Error { #[error(display = "Invalid XML: {}", _0)] InvalidXml(String), - /// The client sent a header with invalid value - #[error(display = "Invalid header value: {}", _0)] - InvalidHeader(#[error(source)] hyper::header::ToStrError), - /// The client sent a range header with invalid value #[error(display = "Invalid HTTP range: {:?}", _0)] InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)), @@ -86,18 +81,6 @@ where impl CommonErrorDerivative for Error {} -impl From<HelperError> for Error { - fn from(err: HelperError) -> Self { - match err { - HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)), - HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)), - HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)), - HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)), - e => Self::bad_request(format!("{}", e)), - } - } -} - impl From<roxmltree::Error> for Error { fn from(err: roxmltree::Error) -> Self { Self::InvalidXml(format!("{}", err)) @@ -118,7 +101,6 @@ impl From<SignatureError> for Error { Self::AuthorizationHeaderMalformed(c) } SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), - SignatureError::InvalidHeader(h) => Self::InvalidHeader(h), } } } @@ -143,9 +125,7 @@ impl Error { Error::NotImplemented(_) => "NotImplemented", Error::InvalidXml(_) => "MalformedXML", Error::InvalidRange(_) => "InvalidRange", - Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) | Error::InvalidHeader(_) => { - "InvalidRequest" - } + Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest", } } } @@ -165,8 +145,7 @@ impl ApiError for Error { | Error::EntityTooSmall | Error::InvalidXml(_) | Error::InvalidUtf8Str(_) - | Error::InvalidUtf8String(_) - | Error::InvalidHeader(_) => StatusCode::BAD_REQUEST, + | Error::InvalidUtf8String(_) => StatusCode::BAD_REQUEST, } } @@ -189,22 +168,23 @@ impl ApiError for Error { } } - fn http_body(&self, garage_region: &str, path: &str) -> Body { + fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody { let error = s3_xml::Error { code: s3_xml::Value(self.aws_code().to_string()), message: s3_xml::Value(format!("{}", self)), resource: Some(s3_xml::Value(path.to_string())), region: Some(s3_xml::Value(garage_region.to_string())), }; - Body::from(s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| { + let error_str = s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| { r#" <?xml version="1.0" encoding="UTF-8"?> <Error> - <Code>InternalError</Code> - <Message>XML encoding of error failed</Message> + <Code>InternalError</Code> + <Message>XML encoding of error failed</Message> </Error> - "# + "# .into() - })) + }); + error_body(error_str) } } diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 5e682726..53f0a345 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -1,17 +1,20 @@ //! Function related to GET and HEAD requests +use std::convert::TryInto; use std::sync::Arc; use std::time::{Duration, UNIX_EPOCH}; use futures::future; use futures::stream::{self, StreamExt}; use http::header::{ - ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, - IF_NONE_MATCH, LAST_MODIFIED, RANGE, + ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, + CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH, + LAST_MODIFIED, RANGE, }; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Body, Request, Response, StatusCode}; use tokio::sync::mpsc; -use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag}; +use garage_block::manager::BlockStream; +use garage_rpc::rpc_helper::OrderTag; use garage_table::EmptyKey; use garage_util::data::*; use garage_util::error::OkOrMessage; @@ -20,10 +23,22 @@ use garage_model::garage::Garage; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; +use crate::helpers::*; +use crate::s3::api_server::ResBody; use crate::s3::error::*; const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; +#[derive(Default)] +pub struct GetObjectOverrides { + pub(crate) response_cache_control: Option<String>, + pub(crate) response_content_disposition: Option<String>, + pub(crate) response_content_encoding: Option<String>, + pub(crate) response_content_language: Option<String>, + pub(crate) response_content_type: Option<String>, + pub(crate) response_expires: Option<String>, +} + fn object_headers( version: &ObjectVersion, version_meta: &ObjectVersionMeta, @@ -49,11 +64,37 @@ fn object_headers( resp } +/// Override headers according to specific query parameters, see +/// section "Overriding response header values through the request" in +/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html +fn getobject_override_headers( + overrides: GetObjectOverrides, + resp: &mut http::response::Builder, +) -> Result<(), Error> { + // TODO: this only applies for signed requests, so when we support + // anonymous access in the future we will have to do a permission check here + let overrides = [ + (CACHE_CONTROL, overrides.response_cache_control), + (CONTENT_DISPOSITION, overrides.response_content_disposition), + (CONTENT_ENCODING, overrides.response_content_encoding), + (CONTENT_LANGUAGE, overrides.response_content_language), + (CONTENT_TYPE, overrides.response_content_type), + (EXPIRES, overrides.response_expires), + ]; + for (hdr, val_opt) in overrides { + if let Some(val) = val_opt { + let val = val.try_into().ok_or_bad_request("invalid header value")?; + resp.headers_mut().unwrap().insert(hdr, val); + } + } + Ok(()) +} + fn try_answer_cached( version: &ObjectVersion, version_meta: &ObjectVersionMeta, - req: &Request<Body>, -) -> Option<Response<Body>> { + req: &Request<impl Body>, +) -> Option<Response<ResBody>> { // <trinity> It is possible, and is even usually the case, [that both If-None-Match and // If-Modified-Since] are present in a request. In this situation If-None-Match takes // precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The rational @@ -80,7 +121,7 @@ fn try_answer_cached( Some( Response::builder() .status(StatusCode::NOT_MODIFIED) - .body(Body::empty()) + .body(empty_body()) .unwrap(), ) } else { @@ -91,11 +132,11 @@ fn try_answer_cached( /// Handle HEAD request pub async fn handle_head( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<impl Body>, bucket_id: Uuid, key: &str, part_number: Option<u64>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let object = garage .object_table .get(&bucket_id, &key.to_string()) @@ -138,7 +179,7 @@ pub async fn handle_head( ) .header(X_AMZ_MP_PARTS_COUNT, "1") .status(StatusCode::PARTIAL_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } ObjectVersionData::FirstBlock(_, _) => { let version = garage @@ -163,7 +204,7 @@ pub async fn handle_head( ) .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?)) .status(StatusCode::PARTIAL_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } _ => unreachable!(), } @@ -171,18 +212,19 @@ pub async fn handle_head( Ok(object_headers(object_version, version_meta) .header(CONTENT_LENGTH, format!("{}", version_meta.size)) .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } } /// Handle GET request pub async fn handle_get( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<impl Body>, bucket_id: Uuid, key: &str, part_number: Option<u64>, -) -> Result<Response<Body>, Error> { + overrides: GetObjectOverrides, +) -> Result<Response<ResBody>, Error> { let object = garage .object_table .get(&bucket_id, &key.to_string()) @@ -233,18 +275,18 @@ pub async fn handle_get( (None, None) => (), } - let resp_builder = object_headers(last_v, last_v_meta) + let mut resp_builder = object_headers(last_v, last_v_meta) .header(CONTENT_LENGTH, format!("{}", last_v_meta.size)) .status(StatusCode::OK); + getobject_override_headers(overrides, &mut resp_builder)?; match &last_v_data { ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_, bytes) => { - let body: Body = Body::from(bytes.to_vec()); - Ok(resp_builder.body(body)?) + Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { - let (tx, rx) = mpsc::channel(2); + let (tx, rx) = mpsc::channel::<BlockStream>(2); let order_stream = OrderTag::stream(); let first_block_hash = *first_block_hash; @@ -282,20 +324,12 @@ pub async fn handle_get( { Ok(()) => (), Err(e) => { - let err = std::io::Error::new( - std::io::ErrorKind::Other, - format!("Error while getting object data: {}", e), - ); - let _ = tx - .send(Box::pin(stream::once(future::ready(Err(err))))) - .await; + let _ = tx.send(error_stream_item(e)).await; } } }); - let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten(); - - let body = hyper::body::Body::wrap_stream(body_stream); + let body = response_body_from_block_stream(rx); Ok(resp_builder.body(body)?) } } @@ -308,7 +342,10 @@ async fn handle_get_range( version_meta: &ObjectVersionMeta, begin: u64, end: u64, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { + // Here we do not use getobject_override_headers because we don't + // want to add any overridden headers (those should not be added + // when returning PARTIAL_CONTENT) let resp_builder = object_headers(version, version_meta) .header(CONTENT_LENGTH, format!("{}", end - begin)) .header( @@ -321,7 +358,7 @@ async fn handle_get_range( ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_meta, bytes) => { if end as usize <= bytes.len() { - let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec()); + let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into()); Ok(resp_builder.body(body)?) } else { Err(Error::internal_error( @@ -348,7 +385,8 @@ async fn handle_get_part( version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, part_number: u64, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { + // Same as for get_range, no getobject_override_headers let resp_builder = object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT); @@ -364,7 +402,7 @@ async fn handle_get_part( format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()), ) .header(X_AMZ_MP_PARTS_COUNT, "1") - .body(Body::from(bytes.to_vec()))?) + .body(bytes_body(bytes.to_vec().into()))?) } ObjectVersionData::FirstBlock(_, _) => { let version = garage @@ -392,7 +430,7 @@ async fn handle_get_part( } fn parse_range_header( - req: &Request<Body>, + req: &Request<impl Body>, total_size: u64, ) -> Result<Option<http_range::HttpRange>, Error> { let range = match req.headers().get(RANGE) { @@ -434,7 +472,7 @@ fn body_from_blocks_range( all_blocks: &[(VersionBlockKey, VersionBlock)], begin: u64, end: u64, -) -> Body { +) -> ResBody { // We will store here the list of blocks that have an intersection with the requested // range, as well as their "true offset", which is their actual offset in the complete // file (whereas block.offset designates the offset of the block WITHIN THE PART @@ -456,17 +494,17 @@ fn body_from_blocks_range( } let order_stream = OrderTag::stream(); - let body_stream = futures::stream::iter(blocks) - .enumerate() - .map(move |(i, (block, block_offset))| { + let (tx, rx) = mpsc::channel::<BlockStream>(2); + + tokio::spawn(async move { + match async { let garage = garage.clone(); - async move { - garage + for (i, (block, block_offset)) in blocks.iter().enumerate() { + let block_stream = garage .block_manager .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) - .await - .unwrap_or_else(|e| error_stream(i, e)) - .scan(block_offset, move |chunk_offset, chunk| { + .await? + .scan(*block_offset, move |chunk_offset, chunk| { let r = match chunk { Ok(chunk_bytes) => { let chunk_len = chunk_bytes.len() as u64; @@ -502,20 +540,42 @@ fn body_from_blocks_range( }; futures::future::ready(r) }) - .filter_map(futures::future::ready) + .filter_map(futures::future::ready); + + let block_stream: BlockStream = Box::pin(block_stream); + tx.send(Box::pin(block_stream)) + .await + .ok_or_message("channel closed")?; } - }) - .buffered(2) - .flatten(); - hyper::body::Body::wrap_stream(body_stream) + Ok::<(), Error>(()) + } + .await + { + Ok(()) => (), + Err(e) => { + let _ = tx.send(error_stream_item(e)).await; + } + } + }); + + response_body_from_block_stream(rx) +} + +fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody { + let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) + .flatten() + .map(|x| { + x.map(hyper::body::Frame::data) + .map_err(|e| Error::from(garage_util::error::Error::from(e))) + }); + ResBody::new(http_body_util::StreamBody::new(body_stream)) } -fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream { - Box::pin(futures::stream::once(async move { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("Could not get block {}: {}", i, e), - )) - })) +fn error_stream_item<E: std::fmt::Display>(e: E) -> BlockStream { + let err = std::io::Error::new( + std::io::ErrorKind::Other, + format!("Error while getting object data: {}", e), + ); + Box::pin(stream::once(future::ready(Err(err)))) } diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index 1e7d6755..35757e8c 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -1,10 +1,13 @@ use quick_xml::de::from_reader; use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; @@ -16,7 +19,7 @@ use garage_model::bucket_table::{ use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<Body>, Error> { +pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<ResBody>, Error> { let param = bucket .params() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -27,18 +30,18 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<Body>, Err Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } else { Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } } pub async fn handle_delete_lifecycle( garage: Arc<Garage>, mut bucket: Bucket, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let param = bucket .params_mut() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -48,16 +51,16 @@ pub async fn handle_delete_lifecycle( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_put_lifecycle( garage: Arc<Garage>, mut bucket: Bucket, - req: Request<Body>, + req: Request<ReqBody>, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; +) -> Result<Response<ResBody>, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -77,7 +80,7 @@ pub async fn handle_put_lifecycle( Ok(Response::builder() .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } // ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ---- diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 1b9e8cd5..b832a4f4 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -3,7 +3,7 @@ use std::iter::{Iterator, Peekable}; use std::sync::Arc; use base64::prelude::*; -use hyper::{Body, Response}; +use hyper::Response; use garage_util::data::*; use garage_util::error::Error as GarageError; @@ -16,7 +16,8 @@ use garage_model::s3::object_table::*; use garage_table::EnumerationOrder; use crate::encoding::*; -use crate::helpers::key_after_prefix; +use crate::helpers::*; +use crate::s3::api_server::ResBody; use crate::s3::error::*; use crate::s3::multipart as s3_multipart; use crate::s3::xml as s3_xml; @@ -63,7 +64,7 @@ pub struct ListPartsQuery { pub async fn handle_list( garage: Arc<Garage>, query: &ListObjectsQuery, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let io = |bucket, key, count| { let t = &garage.object_table; async move { @@ -162,13 +163,13 @@ pub async fn handle_list( let xml = s3_xml::to_xml_with_header(&result)?; Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } pub async fn handle_list_multipart_upload( garage: Arc<Garage>, query: &ListMultipartUploadsQuery, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let io = |bucket, key, count| { let t = &garage.object_table; async move { @@ -264,13 +265,13 @@ pub async fn handle_list_multipart_upload( Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } pub async fn handle_list_parts( garage: Arc<Garage>, query: &ListPartsQuery, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { debug!("ListParts {:?}", query); let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?; @@ -319,7 +320,7 @@ pub async fn handle_list_parts( Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } /* diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 6b786318..b9d15b21 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::sync::Arc; use futures::prelude::*; -use hyper::body::Body; use hyper::{Request, Response}; use md5::{Digest as Md5Digest, Md5}; @@ -17,6 +16,8 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::put::*; use crate::s3::xml as s3_xml; @@ -26,11 +27,11 @@ use crate::signature::verify_signed_content; pub async fn handle_create_multipart_upload( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<ReqBody>, bucket_name: &str, bucket_id: Uuid, key: &String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let existing_object = garage.object_table.get(&bucket_id, &key).await?; let upload_id = gen_uuid(); @@ -65,18 +66,18 @@ pub async fn handle_create_multipart_upload( }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(Body::from(xml.into_bytes()))) + Ok(Response::new(string_body(xml))) } pub async fn handle_put_part( garage: Arc<Garage>, - req: Request<Body>, + req: Request<ReqBody>, bucket_id: Uuid, key: &str, part_number: u64, upload_id: &str, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let upload_id = decode_upload_id(upload_id)?; let content_md5 = match req.headers().get("content-md5") { @@ -87,8 +88,8 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let body = req.into_body().map_err(Error::from); - let mut chunker = StreamChunker::new(body, garage.config.block_size); + let stream = body_stream(req.into_body()); + let mut chunker = StreamChunker::new(stream, garage.config.block_size); let ((_, _, mut mpu), first_block) = futures::try_join!( get_upload(&garage, &bucket_id, &key, &upload_id), @@ -172,7 +173,7 @@ pub async fn handle_put_part( let response = Response::builder() .header("ETag", format!("\"{}\"", data_md5sum_hex)) - .body(Body::empty()) + .body(empty_body()) .unwrap(); Ok(response) } @@ -210,14 +211,16 @@ impl Drop for InterruptedCleanup { pub async fn handle_complete_multipart_upload( garage: Arc<Garage>, - req: Request<Body>, + req: Request<ReqBody>, bucket_name: &str, bucket: &Bucket, key: &str, upload_id: &str, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; +) -> Result<Response<ResBody>, Error> { + let body = http_body_util::BodyExt::collect(req.into_body()) + .await? + .to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -365,7 +368,7 @@ pub async fn handle_complete_multipart_upload( }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(Body::from(xml.into_bytes()))) + Ok(Response::new(string_body(xml))) } pub async fn handle_abort_multipart_upload( @@ -373,7 +376,7 @@ pub async fn handle_abort_multipart_upload( bucket_id: Uuid, key: &str, upload_id: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let upload_id = decode_upload_id(upload_id)?; let (_, mut object_version, _) = @@ -383,7 +386,7 @@ pub async fn handle_abort_multipart_upload( let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]); garage.object_table.insert(&final_object).await?; - Ok(Response::new(Body::from(vec![]))) + Ok(Response::new(empty_body())) } // ======== helpers ============ diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index 542b7a81..bca8d6c6 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::convert::TryInto; +use std::convert::{Infallible, TryInto}; use std::ops::RangeInclusive; use std::sync::Arc; use std::task::{Context, Poll}; @@ -9,12 +9,15 @@ use bytes::Bytes; use chrono::{DateTime, Duration, Utc}; use futures::{Stream, StreamExt}; use hyper::header::{self, HeaderMap, HeaderName, HeaderValue}; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use multer::{Constraints, Multipart, SizeLimit}; use serde::Deserialize; use garage_model::garage::Garage; +use crate::helpers::*; +use crate::s3::api_server::ResBody; +use crate::s3::cors::*; use crate::s3::error::*; use crate::s3::put::{get_headers, save_stream}; use crate::s3::xml as s3_xml; @@ -22,9 +25,9 @@ use crate::signature::payload::{parse_date, verify_v4}; pub async fn handle_post_object( garage: Arc<Garage>, - req: Request<Body>, + req: Request<IncomingBody>, bucket_name: String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let boundary = req .headers() .get(header::CONTENT_TYPE) @@ -41,7 +44,8 @@ pub async fn handle_post_object( ); let (head, body) = req.into_parts(); - let mut multipart = Multipart::with_constraints(body, boundary, constraints); + let stream = body_stream::<_, Error>(body); + let mut multipart = Multipart::with_constraints(stream, boundary, constraints); let mut params = HeaderMap::new(); let field = loop { @@ -242,7 +246,7 @@ pub async fn handle_post_object( let etag = format!("\"{}\"", md5); - let resp = if let Some(mut target) = params + let mut resp = if let Some(mut target) = params .get("success_action_redirect") .and_then(|h| h.to_str().ok()) .and_then(|u| url::Url::parse(u).ok()) @@ -258,12 +262,11 @@ pub async fn handle_post_object( .status(StatusCode::SEE_OTHER) .header(header::LOCATION, target.clone()) .header(header::ETAG, etag) - .body(target.into())? + .body(string_body(target))? } else { let path = head .uri - .into_parts() - .path_and_query + .path_and_query() .map(|paq| paq.path().to_string()) .unwrap_or_else(|| "/".to_string()); let authority = head @@ -290,7 +293,7 @@ pub async fn handle_post_object( .header(header::LOCATION, location.clone()) .header(header::ETAG, etag.clone()); match action { - "200" => builder.status(StatusCode::OK).body(Body::empty())?, + "200" => builder.status(StatusCode::OK).body(empty_body())?, "201" => { let xml = s3_xml::PostObject { xmlns: (), @@ -302,12 +305,21 @@ pub async fn handle_post_object( let body = s3_xml::to_xml_with_header(&xml)?; builder .status(StatusCode::CREATED) - .body(Body::from(body.into_bytes()))? + .body(string_body(body))? } - _ => builder.status(StatusCode::NO_CONTENT).body(Body::empty())?, + _ => builder.status(StatusCode::NO_CONTENT).body(empty_body())?, } }; + let matching_cors_rule = find_matching_cors_rule( + &bucket, + &Request::from_parts(head, empty_body::<Infallible>()), + )?; + if let Some(rule) = matching_cors_rule { + add_cors_headers(&mut resp, rule) + .ok_or_internal_error("Invalid bucket CORS configuration")?; + } + Ok(resp) } diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index d1c88a76..8902b14c 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -4,12 +4,13 @@ use std::sync::Arc; use base64::prelude::*; use futures::prelude::*; use futures::try_join; -use hyper::body::{Body, Bytes}; -use hyper::header::{HeaderMap, HeaderValue}; -use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; +use hyper::body::Bytes; +use hyper::header::{HeaderMap, HeaderValue}; +use hyper::{Request, Response}; + use opentelemetry::{ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, Context, @@ -30,15 +31,17 @@ use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; pub async fn handle_put( garage: Arc<Garage>, - req: Request<Body>, + req: Request<ReqBody>, bucket: &Bucket, key: &String, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { // Retrieve interesting headers from request let headers = get_headers(req.headers())?; debug!("Object headers: {:?}", headers); @@ -48,13 +51,12 @@ pub async fn handle_put( None => None, }; - let (_head, body) = req.into_parts(); - let body = body.map_err(Error::from); + let stream = body_stream(req.into_body()); save_stream( garage, headers, - body, + stream, bucket, key, content_md5, @@ -434,11 +436,11 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { } } -pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> { +pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<ResBody> { Response::builder() .header("x-amz-version-id", hex::encode(version_uuid)) .header("ETag", format!("\"{}\"", md5sum_hex)) - .body(Body::from(vec![])) + .body(empty_body()) .unwrap() } diff --git a/src/api/s3/router.rs b/src/api/s3/router.rs index 821b0e07..e7ac1d77 100644 --- a/src/api/s3/router.rs +++ b/src/api/s3/router.rs @@ -125,6 +125,12 @@ pub enum Endpoint { key: String, part_number: Option<u64>, version_id: Option<String>, + response_cache_control: Option<String>, + response_content_disposition: Option<String>, + response_content_encoding: Option<String>, + response_content_language: Option<String>, + response_content_type: Option<String>, + response_expires: Option<String>, }, GetObjectAcl { key: String, @@ -170,7 +176,7 @@ pub enum Endpoint { }, ListBuckets, ListMultipartUploads { - delimiter: Option<char>, + delimiter: Option<String>, encoding_type: Option<String>, key_marker: Option<String>, max_uploads: Option<usize>, @@ -178,7 +184,7 @@ pub enum Endpoint { upload_id_marker: Option<String>, }, ListObjects { - delimiter: Option<char>, + delimiter: Option<String>, encoding_type: Option<String>, marker: Option<String>, max_keys: Option<usize>, @@ -188,7 +194,7 @@ pub enum Endpoint { // This value should always be 2. It is not checked when constructing the struct list_type: String, continuation_token: Option<String>, - delimiter: Option<char>, + delimiter: Option<String>, encoding_type: Option<String>, fetch_owner: Option<bool>, max_keys: Option<usize>, @@ -196,7 +202,7 @@ pub enum Endpoint { start_after: Option<String>, }, ListObjectVersions { - delimiter: Option<char>, + delimiter: Option<String>, encoding_type: Option<String>, key_marker: Option<String>, max_keys: Option<u64>, @@ -358,7 +364,14 @@ impl Endpoint { (query.keyword.take().unwrap_or_default(), key, query, None), key: [ EMPTY if upload_id => ListParts (query::upload_id, opt_parse::max_parts, opt_parse::part_number_marker), - EMPTY => GetObject (query_opt::version_id, opt_parse::part_number), + EMPTY => GetObject (query_opt::version_id, + opt_parse::part_number, + query_opt::response_cache_control, + query_opt::response_content_disposition, + query_opt::response_content_encoding, + query_opt::response_content_language, + query_opt::response_content_type, + query_opt::response_expires), ACL => GetObjectAcl (query_opt::version_id), LEGAL_HOLD => GetObjectLegalHold (query_opt::version_id), RETENTION => GetObjectRetention (query_opt::version_id), @@ -671,6 +684,12 @@ generateQueryParameters! { "partNumber" => part_number, "part-number-marker" => part_number_marker, "prefix" => prefix, + "response-cache-control" => response_cache_control, + "response-content-disposition" => response_content_disposition, + "response-content-encoding" => response_content_encoding, + "response-content-language" => response_content_language, + "response-content-type" => response_content_type, + "response-expires" => response_expires, "select-type" => select_type, "start-after" => start_after, "uploadId" => upload_id, diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index 7f2ab925..1c1dbf20 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -1,9 +1,12 @@ use quick_xml::de::from_reader; use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; @@ -12,7 +15,7 @@ use garage_model::bucket_table::*; use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error> { +pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Error> { let param = bucket .params() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -33,18 +36,18 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } else { Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } } pub async fn handle_delete_website( garage: Arc<Garage>, mut bucket: Bucket, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let param = bucket .params_mut() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -54,16 +57,16 @@ pub async fn handle_delete_website( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_put_website( garage: Arc<Garage>, mut bucket: Bucket, - req: Request<Body>, + req: Request<ReqBody>, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; +) -> Result<Response<ResBody>, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -83,7 +86,7 @@ pub async fn handle_put_website( Ok(Response::builder() .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] |