diff options
author | Alex Auvolat <alex@adnab.me> | 2024-02-05 19:27:12 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-02-05 19:27:12 +0100 |
commit | a22bd319202f05bce4ad13072238c7ba81d518fb (patch) | |
tree | 817d90efaeaa51ea8aebaf5b25ad56c8b968dc0c | |
parent | 0bb5b77530ad432e4c77f13b395fe74613812337 (diff) | |
download | garage-a22bd319202f05bce4ad13072238c7ba81d518fb.tar.gz garage-a22bd319202f05bce4ad13072238c7ba81d518fb.zip |
[dep-upgrade-202402] migration to http/hyper 1.0 for k2v api
-rw-r--r-- | src/api/admin/bucket.rs | 6 | ||||
-rw-r--r-- | src/api/admin/cluster.rs | 8 | ||||
-rw-r--r-- | src/api/admin/error.rs | 12 | ||||
-rw-r--r-- | src/api/admin/key.rs | 6 | ||||
-rw-r--r-- | src/api/common_error.rs | 23 | ||||
-rw-r--r-- | src/api/helpers.rs | 24 | ||||
-rw-r--r-- | src/api/k2v/api_server.rs | 16 | ||||
-rw-r--r-- | src/api/k2v/batch.rs | 31 | ||||
-rw-r--r-- | src/api/k2v/error.rs | 32 | ||||
-rw-r--r-- | src/api/k2v/index.rs | 7 | ||||
-rw-r--r-- | src/api/k2v/item.rs | 46 | ||||
-rw-r--r-- | src/api/s3/api_server.rs | 3 | ||||
-rw-r--r-- | src/api/s3/cors.rs | 19 | ||||
-rw-r--r-- | src/api/s3/error.rs | 28 | ||||
-rw-r--r-- | src/api/signature/error.rs | 4 | ||||
-rw-r--r-- | src/web/web_server.rs | 4 |
16 files changed, 131 insertions, 138 deletions
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index 0bfb87c5..1b22dd03 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -271,7 +271,7 @@ pub async fn handle_create_bucket( garage: &Arc<Garage>, req: Request<IncomingBody>, ) -> Result<Response<ResBody>, Error> { - let req = parse_json_body::<CreateBucketRequest>(req).await?; + let req = parse_json_body::<CreateBucketRequest, _, Error>(req).await?; if let Some(ga) = &req.global_alias { if !is_valid_bucket_name(ga) { @@ -412,7 +412,7 @@ pub async fn handle_update_bucket( id: String, req: Request<IncomingBody>, ) -> Result<Response<ResBody>, Error> { - let req = parse_json_body::<UpdateBucketRequest>(req).await?; + let req = parse_json_body::<UpdateBucketRequest, _, Error>(req).await?; let bucket_id = parse_bucket_id(&id)?; let mut bucket = garage @@ -474,7 +474,7 @@ pub async fn handle_bucket_change_key_perm( req: Request<IncomingBody>, new_perm_flag: bool, ) -> Result<Response<ResBody>, Error> { - let req = parse_json_body::<BucketKeyPermChangeRequest>(req).await?; + let req = parse_json_body::<BucketKeyPermChangeRequest, _, Error>(req).await?; let bucket_id = parse_bucket_id(&req.bucket_id)?; diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 1ec8d6de..3876c608 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -64,7 +64,7 @@ pub async fn handle_connect_cluster_nodes( garage: &Arc<Garage>, req: Request<IncomingBody>, ) -> Result<Response<ResBody>, Error> { - let req = parse_json_body::<Vec<String>>(req).await?; + let req = parse_json_body::<Vec<String>, _, Error>(req).await?; let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node))) .await @@ -206,7 +206,7 @@ pub async fn handle_update_cluster_layout( garage: &Arc<Garage>, req: Request<IncomingBody>, ) -> Result<Response<ResBody>, Error> { - let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?; + let updates = parse_json_body::<UpdateClusterLayoutRequest, _, Error>(req).await?; let mut layout = garage.system.get_cluster_layout(); @@ -246,7 +246,7 @@ pub async fn handle_apply_cluster_layout( garage: &Arc<Garage>, req: Request<IncomingBody>, ) -> Result<Response<ResBody>, Error> { - let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?; + let param = parse_json_body::<ApplyRevertLayoutRequest, _, Error>(req).await?; let layout = garage.system.get_cluster_layout(); let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; @@ -264,7 +264,7 @@ pub async fn handle_revert_cluster_layout( garage: &Arc<Garage>, req: Request<IncomingBody>, ) -> Result<Response<ResBody>, Error> { - let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?; + let param = parse_json_body::<ApplyRevertLayoutRequest, _, Error>(req).await?; let layout = garage.system.get_cluster_layout(); let layout = layout.revert_staged_changes(Some(param.version))?; diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs index 98cc7a9e..011c903f 100644 --- a/src/api/admin/error.rs +++ b/src/api/admin/error.rs @@ -40,18 +40,6 @@ where impl CommonErrorDerivative for Error {} -impl From<HelperError> for Error { - fn from(err: HelperError) -> Self { - match err { - HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)), - HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)), - HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)), - HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)), - HelperError::NoSuchAccessKey(n) => Self::NoSuchAccessKey(n), - } - } -} - impl Error { fn code(&self) -> &'static str { match self { diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs index 3e5d2cab..1efaca16 100644 --- a/src/api/admin/key.rs +++ b/src/api/admin/key.rs @@ -65,7 +65,7 @@ pub async fn handle_create_key( garage: &Arc<Garage>, req: Request<IncomingBody>, ) -> Result<Response<ResBody>, Error> { - let req = parse_json_body::<CreateKeyRequest>(req).await?; + let req = parse_json_body::<CreateKeyRequest, _, Error>(req).await?; let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key")); garage.key_table.insert(&key).await?; @@ -83,7 +83,7 @@ pub async fn handle_import_key( garage: &Arc<Garage>, req: Request<IncomingBody>, ) -> Result<Response<ResBody>, Error> { - let req = parse_json_body::<ImportKeyRequest>(req).await?; + let req = parse_json_body::<ImportKeyRequest, _, Error>(req).await?; let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?; if prev_key.is_some() { @@ -114,7 +114,7 @@ pub async fn handle_update_key( id: String, req: Request<IncomingBody>, ) -> Result<Response<ResBody>, Error> { - let req = parse_json_body::<UpdateKeyRequest>(req).await?; + let req = parse_json_body::<UpdateKeyRequest, _, Error>(req).await?; let mut key = garage.key_helper().get_existing_key(&id).await?; diff --git a/src/api/common_error.rs b/src/api/common_error.rs index 20f9f266..4381f227 100644 --- a/src/api/common_error.rs +++ b/src/api/common_error.rs @@ -3,6 +3,8 @@ use hyper::StatusCode; use garage_util::error::Error as GarageError; +use garage_model::helper::error::Error as HelperError; + /// Errors of this crate #[derive(Debug, Error)] pub enum CommonError { @@ -28,6 +30,10 @@ pub enum CommonError { #[error(display = "Bad request: {}", _0)] BadRequest(String), + /// The client sent a header with invalid value + #[error(display = "Invalid header value: {}", _0)] + InvalidHeader(#[error(source)] hyper::header::ToStrError), + // ---- SPECIFIC ERROR CONDITIONS ---- // These have to be error codes referenced in the S3 spec here: // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList @@ -64,7 +70,9 @@ impl CommonError { CommonError::Forbidden(_) => StatusCode::FORBIDDEN, CommonError::NoSuchBucket(_) => StatusCode::NOT_FOUND, CommonError::BucketNotEmpty | CommonError::BucketAlreadyExists => StatusCode::CONFLICT, - CommonError::InvalidBucketName(_) => StatusCode::BAD_REQUEST, + CommonError::InvalidBucketName(_) | CommonError::InvalidHeader(_) => { + StatusCode::BAD_REQUEST + } } } @@ -84,6 +92,7 @@ impl CommonError { CommonError::BucketAlreadyExists => "BucketAlreadyExists", CommonError::BucketNotEmpty => "BucketNotEmpty", CommonError::InvalidBucketName(_) => "InvalidBucketName", + CommonError::InvalidHeader(_) => "InvalidHeaderValue", } } @@ -92,6 +101,18 @@ impl CommonError { } } +impl From<HelperError> for CommonError { + fn from(err: HelperError) -> Self { + match err { + HelperError::Internal(i) => Self::InternalError(i), + HelperError::BadRequest(b) => Self::BadRequest(b), + HelperError::InvalidBucketName(n) => Self::InvalidBucketName(n), + HelperError::NoSuchBucket(n) => Self::NoSuchBucket(n), + e => Self::bad_request(format!("{}", e)), + } + } +} + pub trait CommonErrorDerivative: From<CommonError> { fn internal_error<M: ToString>(msg: M) -> Self { Self::from(CommonError::InternalError(GarageError::Message( diff --git a/src/api/helpers.rs b/src/api/helpers.rs index 541b2def..57aa1ea1 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,8 +1,10 @@ use http_body_util::{BodyExt, Full as FullBody}; -use hyper::{body::Incoming as IncomingBody, Request, Response}; +use hyper::{body::Body, Request, Response}; use idna::domain_to_unicode; use serde::{Deserialize, Serialize}; +use garage_util::error::Error as GarageError; + use crate::common_error::{CommonError as Error, *}; /// What kind of authorization is required to perform a given action @@ -141,6 +143,7 @@ pub fn key_after_prefix(pfx: &str) -> Option<String> { // =============== body helpers ================= +pub type EmptyBody = http_body_util::Empty<bytes::Bytes>; pub type BytesBody = FullBody<bytes::Bytes>; pub type BoxBody<E> = http_body_util::combinators::BoxBody<bytes::Bytes, E>; @@ -153,22 +156,33 @@ pub fn bytes_body<E>(b: bytes::Bytes) -> BoxBody<E> { pub fn empty_body<E>() -> BoxBody<E> { BoxBody::new(http_body_util::Empty::new().map_err(|_| unreachable!())) } +pub fn string_bytes_body(s: String) -> BytesBody { + BytesBody::from(bytes::Bytes::from(s.into_bytes())) +} -pub async fn parse_json_body<T>(req: Request<IncomingBody>) -> Result<T, Error> +pub async fn parse_json_body<T, B, E>(req: Request<B>) -> Result<T, E> where T: for<'de> Deserialize<'de>, + B: Body, + E: From<<B as Body>::Error> + From<Error>, { let body = req.into_body().collect().await?.to_bytes(); let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; Ok(resp) } -pub fn json_ok_response<E, T: Serialize>(res: &T) -> Result<Response<BoxBody<E>>, Error> { - let resp_json = serde_json::to_string_pretty(res).map_err(garage_util::error::Error::from)?; +pub fn json_ok_response<E, T: Serialize>(res: &T) -> Result<Response<BoxBody<E>>, E> +where + E: From<Error>, +{ + let resp_json = serde_json::to_string_pretty(res) + .map_err(GarageError::from) + .map_err(Error::from)?; Ok(Response::builder() .status(hyper::StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/json") - .body(string_body(resp_json))?) + .body(string_body(resp_json)) + .unwrap()) } pub fn is_default<T: Default + PartialEq>(v: &T) -> bool { diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 3a032aba..128742c4 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use async_trait::async_trait; use futures::future::Future; -use hyper::{Body, Method, Request, Response}; +use hyper::{body::Incoming as IncomingBody, Method, Request, Response}; use opentelemetry::{trace::SpanRef, KeyValue}; @@ -25,6 +25,9 @@ use crate::k2v::item::*; use crate::k2v::router::Endpoint; use crate::s3::cors::*; +pub use crate::signature::streaming::ReqBody; +pub type ResBody = BoxBody<Error>; + pub struct K2VApiServer { garage: Arc<Garage>, } @@ -55,7 +58,7 @@ impl ApiHandler for K2VApiServer { type Endpoint = K2VApiEndpoint; type Error = Error; - fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> { + fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<K2VApiEndpoint, Error> { let (endpoint, bucket_name) = Endpoint::from_request(req)?; Ok(K2VApiEndpoint { @@ -66,9 +69,9 @@ impl ApiHandler for K2VApiServer { async fn handle( &self, - req: Request<Body>, + req: Request<IncomingBody>, endpoint: K2VApiEndpoint, - ) -> Result<Response<Body>, Error> { + ) -> Result<Response<ResBody>, Error> { let K2VApiEndpoint { bucket_name, endpoint, @@ -77,9 +80,10 @@ impl ApiHandler for K2VApiServer { // The OPTIONS method is procesed early, before we even check for an API key if let Endpoint::Options = endpoint { - return Ok(handle_options_s3api(garage, &req, Some(bucket_name)) + let options_res = handle_options_api(garage, &req, Some(bucket_name)) .await - .ok_or_bad_request("Error handling OPTIONS")?); + .ok_or_bad_request("Error handling OPTIONS")?; + return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?; diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 294380ea..ae2778b1 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use base64::prelude::*; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_util::data::*; @@ -13,15 +13,16 @@ use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; use crate::helpers::*; +use crate::k2v::api_server::{ReqBody, ResBody}; use crate::k2v::error::*; use crate::k2v::range::read_range; pub async fn handle_insert_batch( garage: Arc<Garage>, bucket_id: Uuid, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?; + req: Request<ReqBody>, +) -> Result<Response<ResBody>, Error> { + let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?; let mut items2 = vec![]; for it in items { @@ -41,15 +42,15 @@ pub async fn handle_insert_batch( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_read_batch( garage: Arc<Garage>, bucket_id: Uuid, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?; + req: Request<ReqBody>, +) -> Result<Response<ResBody>, Error> { + let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?; let resp_results = futures::future::join_all( queries @@ -139,9 +140,9 @@ async fn handle_read_batch_query( pub async fn handle_delete_batch( garage: Arc<Garage>, bucket_id: Uuid, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?; + req: Request<ReqBody>, +) -> Result<Response<ResBody>, Error> { + let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?; let resp_results = futures::future::join_all( queries @@ -253,11 +254,11 @@ pub(crate) async fn handle_poll_range( garage: Arc<Garage>, bucket_id: Uuid, partition_key: &str, - req: Request<Body>, -) -> Result<Response<Body>, Error> { + req: Request<ReqBody>, +) -> Result<Response<ResBody>, Error> { use garage_model::k2v::sub::PollRange; - let query = parse_json_body::<PollRangeQuery>(req).await?; + let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?; let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000; @@ -292,7 +293,7 @@ pub(crate) async fn handle_poll_range( } else { Ok(Response::builder() .status(StatusCode::NOT_MODIFIED) - .body(Body::empty())?) + .body(empty_body())?) } } diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs index 4eb017ab..72e712bf 100644 --- a/src/api/k2v/error.rs +++ b/src/api/k2v/error.rs @@ -1,13 +1,11 @@ use err_derive::Error; use hyper::header::HeaderValue; -use hyper::{Body, HeaderMap, StatusCode}; - -use garage_model::helper::error::Error as HelperError; +use hyper::{HeaderMap, StatusCode}; use crate::common_error::CommonError; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; use crate::generic_server::ApiError; -use crate::helpers::CustomApiErrorBody; +use crate::helpers::*; use crate::signature::error::Error as SignatureError; /// Errors of this crate @@ -30,10 +28,6 @@ pub enum Error { #[error(display = "Invalid base64: {}", _0)] InvalidBase64(#[error(source)] base64::DecodeError), - /// The client sent a header with invalid value - #[error(display = "Invalid header value: {}", _0)] - InvalidHeader(#[error(source)] hyper::header::ToStrError), - /// The client asked for an invalid return format (invalid Accept header) #[error(display = "Not acceptable: {}", _0)] NotAcceptable(String), @@ -54,18 +48,6 @@ where impl CommonErrorDerivative for Error {} -impl From<HelperError> for Error { - fn from(err: HelperError) -> Self { - match err { - HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)), - HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)), - HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)), - HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)), - e => Self::Common(CommonError::BadRequest(format!("{}", e))), - } - } -} - impl From<SignatureError> for Error { fn from(err: SignatureError) -> Self { match err { @@ -74,7 +56,6 @@ impl From<SignatureError> for Error { Self::AuthorizationHeaderMalformed(c) } SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), - SignatureError::InvalidHeader(h) => Self::InvalidHeader(h), } } } @@ -90,7 +71,6 @@ impl Error { Error::NotAcceptable(_) => "NotAcceptable", Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", Error::InvalidBase64(_) => "InvalidBase64", - Error::InvalidHeader(_) => "InvalidHeaderValue", Error::InvalidUtf8Str(_) => "InvalidUtf8String", } } @@ -105,7 +85,6 @@ impl ApiError for Error { Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE, Error::AuthorizationHeaderMalformed(_) | Error::InvalidBase64(_) - | Error::InvalidHeader(_) | Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST, } } @@ -115,14 +94,14 @@ impl ApiError for Error { header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap()); } - fn http_body(&self, garage_region: &str, path: &str) -> Body { + fn http_body(&self, garage_region: &str, path: &str) -> BytesBody { let error = CustomApiErrorBody { code: self.code().to_string(), message: format!("{}", self), path: path.to_string(), region: garage_region.to_string(), }; - Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| { + let error_str = serde_json::to_string_pretty(&error).unwrap_or_else(|_| { r#" { "code": "InternalError", @@ -130,6 +109,7 @@ impl ApiError for Error { } "# .into() - })) + }); + string_bytes_body(error_str) } } diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index 6c1d4a91..1baec1db 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use hyper::{Body, Response}; +use hyper::Response; use serde::Serialize; use garage_util::data::*; @@ -12,6 +12,7 @@ use garage_model::garage::Garage; use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; use crate::helpers::*; +use crate::k2v::api_server::ResBody; use crate::k2v::error::*; use crate::k2v::range::read_range; @@ -23,7 +24,7 @@ pub async fn handle_read_index( end: Option<String>, limit: Option<u64>, reverse: Option<bool>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let reverse = reverse.unwrap_or(false); let ring: Arc<Ring> = garage.system.ring.borrow().clone(); @@ -68,7 +69,7 @@ pub async fn handle_read_index( next_start, }; - Ok(json_ok_response(&resp)?) + json_ok_response::<Error, _>(&resp) } #[derive(Serialize)] diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index 33f4da53..0c5931a1 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use base64::prelude::*; use http::header; -use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; +use hyper::{Request, Response, StatusCode}; use garage_util::data::*; @@ -11,6 +11,8 @@ use garage_model::garage::Garage; use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; +use crate::helpers::*; +use crate::k2v::api_server::{ReqBody, ResBody}; use crate::k2v::error::*; pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; @@ -22,7 +24,7 @@ pub enum ReturnFormat { } impl ReturnFormat { - pub fn from(req: &Request<Body>) -> Result<Self, Error> { + pub fn from(req: &Request<ReqBody>) -> Result<Self, Error> { let accept = match req.headers().get(header::ACCEPT) { Some(a) => a.to_str()?, None => return Ok(Self::Json), @@ -40,7 +42,7 @@ impl ReturnFormat { } } - pub fn make_response(&self, item: &K2VItem) -> Result<Response<Body>, Error> { + pub fn make_response(&self, item: &K2VItem) -> Result<Response<ResBody>, Error> { let vals = item.values(); if vals.is_empty() { @@ -52,7 +54,7 @@ impl ReturnFormat { Self::Binary if vals.len() > 1 => Ok(Response::builder() .header(X_GARAGE_CAUSALITY_TOKEN, ct) .status(StatusCode::CONFLICT) - .body(Body::empty())?), + .body(empty_body())?), Self::Binary => { assert!(vals.len() == 1); Self::make_binary_response(ct, vals[0]) @@ -62,22 +64,22 @@ impl ReturnFormat { } } - fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<Body>, Error> { + fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<ResBody>, Error> { match v { DvvsValue::Deleted => Ok(Response::builder() .header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(header::CONTENT_TYPE, "application/octet-stream") .status(StatusCode::NO_CONTENT) - .body(Body::empty())?), + .body(empty_body())?), DvvsValue::Value(v) => Ok(Response::builder() .header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(header::CONTENT_TYPE, "application/octet-stream") .status(StatusCode::OK) - .body(Body::from(v.to_vec()))?), + .body(bytes_body(v.to_vec().into()))?), } } - fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<Body>, Error> { + fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<ResBody>, Error> { let items = v .iter() .map(|v| match v { @@ -91,7 +93,7 @@ impl ReturnFormat { .header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(header::CONTENT_TYPE, "application/json") .status(StatusCode::OK) - .body(Body::from(json_body))?) + .body(string_body(json_body))?) } } @@ -99,11 +101,11 @@ impl ReturnFormat { #[allow(clippy::ptr_arg)] pub async fn handle_read_item( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<ReqBody>, bucket_id: Uuid, partition_key: &str, sort_key: &String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let format = ReturnFormat::from(req)?; let item = garage @@ -124,11 +126,11 @@ pub async fn handle_read_item( pub async fn handle_insert_item( garage: Arc<Garage>, - req: Request<Body>, + req: Request<ReqBody>, bucket_id: Uuid, partition_key: &str, sort_key: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let causal_context = req .headers() .get(X_GARAGE_CAUSALITY_TOKEN) @@ -137,7 +139,9 @@ pub async fn handle_insert_item( .map(CausalContext::parse_helper) .transpose()?; - let body = req.into_body().collect().await?.to_bytes(); + let body = http_body_util::BodyExt::collect(req.into_body()) + .await? + .to_bytes(); let value = DvvsValue::Value(body.to_vec()); @@ -155,16 +159,16 @@ pub async fn handle_insert_item( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_delete_item( garage: Arc<Garage>, - req: Request<Body>, + req: Request<ReqBody>, bucket_id: Uuid, partition_key: &str, sort_key: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let causal_context = req .headers() .get(X_GARAGE_CAUSALITY_TOKEN) @@ -189,20 +193,20 @@ pub async fn handle_delete_item( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } /// Handle ReadItem request #[allow(clippy::ptr_arg)] pub async fn handle_poll_item( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<ReqBody>, bucket_id: Uuid, partition_key: String, sort_key: String, causality_token: String, timeout_secs: Option<u64>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let format = ReturnFormat::from(req)?; let causal_context = @@ -227,6 +231,6 @@ pub async fn handle_poll_item( } else { Ok(Response::builder() .status(StatusCode::NOT_MODIFIED) - .body(Body::empty())?) + .body(empty_body())?) } } diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 7717fd49..495c5832 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -121,7 +121,8 @@ impl ApiHandler for S3ApiServer { return handle_post_object(garage, req, bucket_name.unwrap()).await; } if let Endpoint::Options = endpoint { - return handle_options_s3api(garage, &req, bucket_name).await; + let options_res = handle_options_api(garage, &req, bucket_name).await?; + return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?; diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index 4b8754a9..e069cae4 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -14,6 +14,7 @@ use http_body_util::BodyExt; use serde::{Deserialize, Serialize}; +use crate::common_error::CommonError; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; @@ -94,11 +95,11 @@ pub async fn handle_put_cors( .body(empty_body())?) } -pub async fn handle_options_s3api( +pub async fn handle_options_api( garage: Arc<Garage>, req: &Request<IncomingBody>, bucket_name: Option<String>, -) -> Result<Response<ResBody>, Error> { +) -> Result<Response<EmptyBody>, CommonError> { // FIXME: CORS rules of buckets with local aliases are // not taken into account. @@ -128,7 +129,7 @@ pub async fn handle_options_s3api( .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") .header(ACCESS_CONTROL_ALLOW_METHODS, "*") .status(StatusCode::OK) - .body(empty_body())?) + .body(EmptyBody::new())?) } } else { // If there is no bucket name in the request, @@ -138,14 +139,14 @@ pub async fn handle_options_s3api( .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") .header(ACCESS_CONTROL_ALLOW_METHODS, "GET") .status(StatusCode::OK) - .body(empty_body())?) + .body(EmptyBody::new())?) } } pub fn handle_options_for_bucket( req: &Request<IncomingBody>, bucket: &Bucket, -) -> Result<Response<ResBody>, Error> { +) -> Result<Response<EmptyBody>, CommonError> { let origin = req .headers() .get("Origin") @@ -168,13 +169,15 @@ pub fn handle_options_for_bucket( if let Some(rule) = matching_rule { let mut resp = Response::builder() .status(StatusCode::OK) - .body(empty_body())?; + .body(EmptyBody::new())?; add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?; return Ok(resp); } } - Err(Error::forbidden("This CORS request is not allowed.")) + Err(CommonError::Forbidden( + "This CORS request is not allowed.".into(), + )) } pub fn find_matching_cors_rule<'a>( @@ -216,7 +219,7 @@ where } pub fn add_cors_headers( - resp: &mut Response<ResBody>, + resp: &mut Response<impl Body>, rule: &GarageCorsRule, ) -> Result<(), http::header::InvalidHeaderValue> { let h = resp.headers_mut(); diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index 8afe4954..a4d222de 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -4,8 +4,6 @@ use err_derive::Error; use hyper::header::HeaderValue; use hyper::{HeaderMap, StatusCode}; -use garage_model::helper::error::Error as HelperError; - use crate::common_error::CommonError; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; use crate::generic_server::ApiError; @@ -63,10 +61,6 @@ pub enum Error { #[error(display = "Invalid XML: {}", _0)] InvalidXml(String), - /// The client sent a header with invalid value - #[error(display = "Invalid header value: {}", _0)] - InvalidHeader(#[error(source)] hyper::header::ToStrError), - /// The client sent a range header with invalid value #[error(display = "Invalid HTTP range: {:?}", _0)] InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)), @@ -87,18 +81,6 @@ where impl CommonErrorDerivative for Error {} -impl From<HelperError> for Error { - fn from(err: HelperError) -> Self { - match err { - HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)), - HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)), - HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)), - HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)), - e => Self::bad_request(format!("{}", e)), - } - } -} - impl From<roxmltree::Error> for Error { fn from(err: roxmltree::Error) -> Self { Self::InvalidXml(format!("{}", err)) @@ -119,7 +101,6 @@ impl From<SignatureError> for Error { Self::AuthorizationHeaderMalformed(c) } SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), - SignatureError::InvalidHeader(h) => Self::InvalidHeader(h), } } } @@ -144,9 +125,7 @@ impl Error { Error::NotImplemented(_) => "NotImplemented", Error::InvalidXml(_) => "MalformedXML", Error::InvalidRange(_) => "InvalidRange", - Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) | Error::InvalidHeader(_) => { - "InvalidRequest" - } + Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest", } } } @@ -166,8 +145,7 @@ impl ApiError for Error { | Error::EntityTooSmall | Error::InvalidXml(_) | Error::InvalidUtf8Str(_) - | Error::InvalidUtf8String(_) - | Error::InvalidHeader(_) => StatusCode::BAD_REQUEST, + | Error::InvalidUtf8String(_) => StatusCode::BAD_REQUEST, } } @@ -207,6 +185,6 @@ impl ApiError for Error { "# .into() }); - BytesBody::from(bytes::Bytes::from(error_str.into_bytes())) + string_bytes_body(error_str) } } diff --git a/src/api/signature/error.rs b/src/api/signature/error.rs index f0d7c816..2d92a072 100644 --- a/src/api/signature/error.rs +++ b/src/api/signature/error.rs @@ -18,10 +18,6 @@ pub enum Error { /// The request contained an invalid UTF-8 sequence in its path or in other parameters #[error(display = "Invalid UTF-8: {}", _0)] InvalidUtf8Str(#[error(source)] std::str::Utf8Error), - - /// The client sent a header with invalid value - #[error(display = "Invalid header value: {}", _0)] - InvalidHeader(#[error(source)] hyper::header::ToStrError), } impl<T> From<T> for Error diff --git a/src/web/web_server.rs b/src/web/web_server.rs index ce438469..668a897a 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -280,7 +280,9 @@ impl WebServer { ); let ret_doc = match *req.method() { - Method::OPTIONS => handle_options_for_bucket(req, &bucket), + Method::OPTIONS => handle_options_for_bucket(req, &bucket) + .map_err(ApiError::from) + .map(|res| res.map(|_empty_body: EmptyBody| empty_body())), Method::HEAD => handle_head(self.garage.clone(), &req, bucket_id, &key, None).await, Method::GET => handle_get(self.garage.clone(), &req, bucket_id, &key, None).await, _ => Err(ApiError::bad_request("HTTP method not supported")), |