diff options
Diffstat (limited to 'src/api')
53 files changed, 1965 insertions, 1220 deletions
diff --git a/src/api/admin/Cargo.toml b/src/api/admin/Cargo.toml new file mode 100644 index 00000000..adddf306 --- /dev/null +++ b/src/api/admin/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "garage_api_admin" +version = "1.0.1" +authors = ["Alex Auvolat <alex@adnab.me>"] +edition = "2018" +license = "AGPL-3.0" +description = "Admin API server crate for the Garage object store" +repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_model.workspace = true +garage_table.workspace = true +garage_util.workspace = true +garage_rpc.workspace = true +garage_api_common.workspace = true + +argon2.workspace = true +async-trait.workspace = true +err-derive.workspace = true +hex.workspace = true +tracing.workspace = true + +futures.workspace = true +tokio.workspace = true +http.workspace = true +hyper = { workspace = true, default-features = false, features = ["server", "http1"] } +url.workspace = true + +serde.workspace = true +serde_json.workspace = true + +opentelemetry.workspace = true +opentelemetry-prometheus = { workspace = true, optional = true } +prometheus = { workspace = true, optional = true } + +[features] +metrics = [ "opentelemetry-prometheus", "prometheus" ] diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 0e4565bb..6f0c474f 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::sync::Arc; use argon2::password_hash::PasswordHash; -use async_trait::async_trait; use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW}; use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; @@ -20,15 +19,15 @@ use garage_rpc::system::ClusterHealthStatus; use garage_util::error::Error as GarageError; use 
garage_util::socket_address::UnixOrTCPSocketAddress; -use crate::generic_server::*; +use garage_api_common::generic_server::*; +use garage_api_common::helpers::*; -use crate::admin::bucket::*; -use crate::admin::cluster::*; -use crate::admin::error::*; -use crate::admin::key::*; -use crate::admin::router_v0; -use crate::admin::router_v1::{Authorization, Endpoint}; -use crate::helpers::*; +use crate::bucket::*; +use crate::cluster::*; +use crate::error::*; +use crate::key::*; +use crate::router_v0; +use crate::router_v1::{Authorization, Endpoint}; pub type ResBody = BoxBody<Error>; @@ -221,7 +220,6 @@ impl AdminApiServer { } } -#[async_trait] impl ApiHandler for AdminApiServer { const API_NAME: &'static str = "admin"; const API_NAME_DISPLAY: &'static str = "Admin"; diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index ac3cba00..2537bfc9 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -17,11 +17,12 @@ use garage_model::permission::*; use garage_model::s3::mpu_table; use garage_model::s3::object_table::*; -use crate::admin::api_server::ResBody; -use crate::admin::error::*; -use crate::admin::key::ApiBucketKeyPerm; -use crate::common_error::CommonError; -use crate::helpers::*; +use garage_api_common::common_error::CommonError; +use garage_api_common::helpers::*; + +use crate::api_server::ResBody; +use crate::error::*; +use crate::key::ApiBucketKeyPerm; pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let buckets = garage diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 357ac600..ffa0fa71 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -12,9 +12,10 @@ use garage_rpc::layout; use garage_model::garage::Garage; -use crate::admin::api_server::ResBody; -use crate::admin::error::*; -use crate::helpers::{json_ok_response, parse_json_body}; +use garage_api_common::helpers::{json_ok_response, parse_json_body}; + +use crate::api_server::ResBody; +use 
crate::error::*; pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let layout = garage.system.cluster_layout(); diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs index 2668b42d..201f9b40 100644 --- a/src/api/admin/error.rs +++ b/src/api/admin/error.rs @@ -1,20 +1,24 @@ +use std::convert::TryFrom; + use err_derive::Error; use hyper::header::HeaderValue; use hyper::{HeaderMap, StatusCode}; pub use garage_model::helper::error::Error as HelperError; -use crate::common_error::CommonError; -pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; -use crate::generic_server::ApiError; -use crate::helpers::*; +use garage_api_common::common_error::{commonErrorDerivative, CommonError}; +pub use garage_api_common::common_error::{ + CommonErrorDerivative, OkOrBadRequest, OkOrInternalError, +}; +use garage_api_common::generic_server::ApiError; +use garage_api_common::helpers::*; /// Errors of this crate #[derive(Debug, Error)] pub enum Error { #[error(display = "{}", _0)] /// Error from common error - Common(CommonError), + Common(#[error(source)] CommonError), // Category: cannot process /// The API access key does not exist @@ -29,17 +33,21 @@ pub enum Error { KeyAlreadyExists(String), } -impl<T> From<T> for Error -where - CommonError: From<T>, -{ - fn from(err: T) -> Self { - Error::Common(CommonError::from(err)) +commonErrorDerivative!(Error); + +/// FIXME: helper errors are transformed into their corresponding variants +/// in the Error struct, but in many case a helper error should be considered +/// an internal error. 
+impl From<HelperError> for Error { + fn from(err: HelperError) -> Error { + match CommonError::try_from(err) { + Ok(ce) => Self::Common(ce), + Err(HelperError::NoSuchAccessKey(k)) => Self::NoSuchAccessKey(k), + Err(_) => unreachable!(), + } } } -impl CommonErrorDerivative for Error {} - impl Error { fn code(&self) -> &'static str { match self { diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs index 291b6d54..bebf3063 100644 --- a/src/api/admin/key.rs +++ b/src/api/admin/key.rs @@ -9,9 +9,10 @@ use garage_table::*; use garage_model::garage::Garage; use garage_model::key_table::*; -use crate::admin::api_server::ResBody; -use crate::admin::error::*; -use crate::helpers::*; +use garage_api_common::helpers::*; + +use crate::api_server::ResBody; +use crate::error::*; pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let res = garage diff --git a/src/api/admin/mod.rs b/src/api/admin/lib.rs index 43a8c59c..599e9b44 100644 --- a/src/api/admin/mod.rs +++ b/src/api/admin/lib.rs @@ -1,3 +1,6 @@ +#[macro_use] +extern crate tracing; + pub mod api_server; mod error; mod router_v0; diff --git a/src/api/admin/router_v0.rs b/src/api/admin/router_v0.rs index 68676445..9dd742ba 100644 --- a/src/api/admin/router_v0.rs +++ b/src/api/admin/router_v0.rs @@ -2,8 +2,9 @@ use std::borrow::Cow; use hyper::{Method, Request}; -use crate::admin::error::*; -use crate::router_macros::*; +use garage_api_common::router_macros::*; + +use crate::error::*; router_match! 
{@func diff --git a/src/api/admin/router_v1.rs b/src/api/admin/router_v1.rs index cc5ff2ec..0b4901ea 100644 --- a/src/api/admin/router_v1.rs +++ b/src/api/admin/router_v1.rs @@ -2,9 +2,10 @@ use std::borrow::Cow; use hyper::{Method, Request}; -use crate::admin::error::*; -use crate::admin::router_v0; -use crate::router_macros::*; +use garage_api_common::router_macros::*; + +use crate::error::*; +use crate::router_v0; pub enum Authorization { None, diff --git a/src/api/common/Cargo.toml b/src/api/common/Cargo.toml new file mode 100644 index 00000000..c33d585d --- /dev/null +++ b/src/api/common/Cargo.toml @@ -0,0 +1,49 @@ +[package] +name = "garage_api_common" +version = "1.0.1" +authors = ["Alex Auvolat <alex@adnab.me>"] +edition = "2018" +license = "AGPL-3.0" +description = "Common functions for the API server crates for the Garage object store" +repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_model.workspace = true +garage_table.workspace = true +garage_util.workspace = true + +base64.workspace = true +bytes.workspace = true +chrono.workspace = true +crc32fast.workspace = true +crc32c.workspace = true +crypto-common.workspace = true +err-derive.workspace = true +hex.workspace = true +hmac.workspace = true +md-5.workspace = true +idna.workspace = true +tracing.workspace = true +nom.workspace = true +pin-project.workspace = true +sha1.workspace = true +sha2.workspace = true + +futures.workspace = true +tokio.workspace = true +http.workspace = true +http-body-util.workspace = true +hyper = { workspace = true, default-features = false, features = ["server", "http1"] } +hyper-util.workspace = true +url.workspace = true + +serde.workspace = true +serde_json.workspace = true + +opentelemetry.workspace = true diff --git a/src/api/common_error.rs b/src/api/common/common_error.rs index 
c47555d4..597a3511 100644 --- a/src/api/common_error.rs +++ b/src/api/common/common_error.rs @@ -1,3 +1,5 @@ +use std::convert::TryFrom; + use err_derive::Error; use hyper::StatusCode; @@ -55,6 +57,35 @@ pub enum CommonError { InvalidBucketName(String), } +#[macro_export] +macro_rules! commonErrorDerivative { + ( $error_struct: ident ) => { + impl From<garage_util::error::Error> for $error_struct { + fn from(err: garage_util::error::Error) -> Self { + Self::Common(CommonError::InternalError(err)) + } + } + impl From<http::Error> for $error_struct { + fn from(err: http::Error) -> Self { + Self::Common(CommonError::Http(err)) + } + } + impl From<hyper::Error> for $error_struct { + fn from(err: hyper::Error) -> Self { + Self::Common(CommonError::Hyper(err)) + } + } + impl From<hyper::header::ToStrError> for $error_struct { + fn from(err: hyper::header::ToStrError) -> Self { + Self::Common(CommonError::InvalidHeader(err)) + } + } + impl CommonErrorDerivative for $error_struct {} + }; +} + +pub use commonErrorDerivative; + impl CommonError { pub fn http_status_code(&self) -> StatusCode { match self { @@ -97,18 +128,39 @@ impl CommonError { } } -impl From<HelperError> for CommonError { - fn from(err: HelperError) -> Self { +impl TryFrom<HelperError> for CommonError { + type Error = HelperError; + + fn try_from(err: HelperError) -> Result<Self, HelperError> { match err { - HelperError::Internal(i) => Self::InternalError(i), - HelperError::BadRequest(b) => Self::BadRequest(b), - HelperError::InvalidBucketName(n) => Self::InvalidBucketName(n), - HelperError::NoSuchBucket(n) => Self::NoSuchBucket(n), - e => Self::bad_request(format!("{}", e)), + HelperError::Internal(i) => Ok(Self::InternalError(i)), + HelperError::BadRequest(b) => Ok(Self::BadRequest(b)), + HelperError::InvalidBucketName(n) => Ok(Self::InvalidBucketName(n)), + HelperError::NoSuchBucket(n) => Ok(Self::NoSuchBucket(n)), + e => Err(e), } } } +/// This function converts HelperErrors into CommonErrors, +/// for 
variants that exist in CommonError. +/// This is used for helper functions that might return InvalidBucketName +/// or NoSuchBucket for instance, and we want to pass that error +/// up to our caller. +pub fn pass_helper_error(err: HelperError) -> CommonError { + match CommonError::try_from(err) { + Ok(e) => e, + Err(e) => panic!("Helper error `{}` should hot have happenned here", e), + } +} + +pub fn helper_error_as_internal(err: HelperError) -> CommonError { + match err { + HelperError::Internal(e) => CommonError::InternalError(e), + e => CommonError::InternalError(GarageError::Message(e.to_string())), + } +} + pub trait CommonErrorDerivative: From<CommonError> { fn internal_error<M: ToString>(msg: M) -> Self { Self::from(CommonError::InternalError(GarageError::Message( diff --git a/src/api/common/cors.rs b/src/api/common/cors.rs new file mode 100644 index 00000000..09b55c13 --- /dev/null +++ b/src/api/common/cors.rs @@ -0,0 +1,170 @@ +use std::sync::Arc; + +use http::header::{ + ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, + ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, +}; +use hyper::{body::Body, body::Incoming as IncomingBody, Request, Response, StatusCode}; + +use garage_model::bucket_table::{BucketParams, CorsRule as GarageCorsRule}; +use garage_model::garage::Garage; + +use crate::common_error::{ + helper_error_as_internal, CommonError, OkOrBadRequest, OkOrInternalError, +}; +use crate::helpers::*; + +pub fn find_matching_cors_rule<'a, B>( + bucket_params: &'a BucketParams, + req: &Request<B>, +) -> Result<Option<&'a GarageCorsRule>, CommonError> { + if let Some(cors_config) = bucket_params.cors_config.get() { + if let Some(origin) = req.headers().get("Origin") { + let origin = origin.to_str()?; + let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) { + Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(), + None => vec![], 
+ }; + return Ok(cors_config.iter().find(|rule| { + cors_rule_matches(rule, origin, req.method().as_ref(), request_headers.iter()) + })); + } + } + Ok(None) +} + +pub fn cors_rule_matches<'a, HI, S>( + rule: &GarageCorsRule, + origin: &'a str, + method: &'a str, + mut request_headers: HI, +) -> bool +where + HI: Iterator<Item = S>, + S: AsRef<str>, +{ + rule.allow_origins.iter().any(|x| x == "*" || x == origin) + && rule.allow_methods.iter().any(|x| x == "*" || x == method) + && request_headers.all(|h| { + rule.allow_headers + .iter() + .any(|x| x == "*" || x == h.as_ref()) + }) +} + +pub fn add_cors_headers( + resp: &mut Response<impl Body>, + rule: &GarageCorsRule, +) -> Result<(), http::header::InvalidHeaderValue> { + let h = resp.headers_mut(); + h.insert( + ACCESS_CONTROL_ALLOW_ORIGIN, + rule.allow_origins.join(", ").parse()?, + ); + h.insert( + ACCESS_CONTROL_ALLOW_METHODS, + rule.allow_methods.join(", ").parse()?, + ); + h.insert( + ACCESS_CONTROL_ALLOW_HEADERS, + rule.allow_headers.join(", ").parse()?, + ); + h.insert( + ACCESS_CONTROL_EXPOSE_HEADERS, + rule.expose_headers.join(", ").parse()?, + ); + Ok(()) +} + +pub async fn handle_options_api( + garage: Arc<Garage>, + req: &Request<IncomingBody>, + bucket_name: Option<String>, +) -> Result<Response<EmptyBody>, CommonError> { + // FIXME: CORS rules of buckets with local aliases are + // not taken into account. + + // If the bucket name is a global bucket name, + // we try to apply the CORS rules of that bucket. + // If a user has a local bucket name that has + // the same name, its CORS rules won't be applied + // and will be shadowed by the rules of the globally + // existing bucket (but this is inevitable because + // OPTIONS calls are not auhtenticated). 
+ if let Some(bn) = bucket_name { + let helper = garage.bucket_helper(); + let bucket_id = helper + .resolve_global_bucket_name(&bn) + .await + .map_err(helper_error_as_internal)?; + if let Some(id) = bucket_id { + let bucket = garage + .bucket_helper() + .get_existing_bucket(id) + .await + .map_err(helper_error_as_internal)?; + let bucket_params = bucket.state.into_option().unwrap(); + handle_options_for_bucket(req, &bucket_params) + } else { + // If there is a bucket name in the request, but that name + // does not correspond to a global alias for a bucket, + // then it's either a non-existing bucket or a local bucket. + // We have no way of knowing, because the request is not + // authenticated and thus we can't resolve local aliases. + // We take the permissive approach of allowing everything, + // because we don't want to prevent web apps that use + // local bucket names from making API calls. + Ok(Response::builder() + .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") + .header(ACCESS_CONTROL_ALLOW_METHODS, "*") + .status(StatusCode::OK) + .body(EmptyBody::new())?) + } + } else { + // If there is no bucket name in the request, + // we are doing a ListBuckets call, which we want to allow + // for all origins. + Ok(Response::builder() + .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") + .header(ACCESS_CONTROL_ALLOW_METHODS, "GET") + .status(StatusCode::OK) + .body(EmptyBody::new())?) + } +} + +pub fn handle_options_for_bucket<B>( + req: &Request<B>, + bucket_params: &BucketParams, +) -> Result<Response<EmptyBody>, CommonError> { + let origin = req + .headers() + .get("Origin") + .ok_or_bad_request("Missing Origin header")? + .to_str()?; + let request_method = req + .headers() + .get(ACCESS_CONTROL_REQUEST_METHOD) + .ok_or_bad_request("Missing Access-Control-Request-Method header")? 
+ .to_str()?; + let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) { + Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(), + None => vec![], + }; + + if let Some(cors_config) = bucket_params.cors_config.get() { + let matching_rule = cors_config + .iter() + .find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter())); + if let Some(rule) = matching_rule { + let mut resp = Response::builder() + .status(StatusCode::OK) + .body(EmptyBody::new())?; + add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?; + return Ok(resp); + } + } + + Err(CommonError::Forbidden( + "This CORS request is not allowed.".into(), + )) +} diff --git a/src/api/encoding.rs b/src/api/common/encoding.rs index e286a784..e286a784 100644 --- a/src/api/encoding.rs +++ b/src/api/common/encoding.rs diff --git a/src/api/generic_server.rs b/src/api/common/generic_server.rs index 283abdd4..6ddc2ff2 100644 --- a/src/api/generic_server.rs +++ b/src/api/common/generic_server.rs @@ -4,8 +4,6 @@ use std::os::unix::fs::PermissionsExt; use std::sync::Arc; use std::time::Duration; -use async_trait::async_trait; - use futures::future::Future; use futures::stream::{futures_unordered::FuturesUnordered, StreamExt}; @@ -36,7 +34,7 @@ use garage_util::socket_address::UnixOrTCPSocketAddress; use crate::helpers::{BoxBody, ErrorBody}; -pub(crate) trait ApiEndpoint: Send + Sync + 'static { +pub trait ApiEndpoint: Send + Sync + 'static { fn name(&self) -> &'static str; fn add_span_attributes(&self, span: SpanRef<'_>); } @@ -47,8 +45,7 @@ pub trait ApiError: std::error::Error + Send + Sync + 'static { fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody; } -#[async_trait] -pub(crate) trait ApiHandler: Send + Sync + 'static { +pub trait ApiHandler: Send + Sync + 'static { const API_NAME: &'static str; const API_NAME_DISPLAY: &'static str; @@ -56,14 +53,14 @@ pub(crate) trait ApiHandler: Send + Sync + 'static { 
type Error: ApiError; fn parse_endpoint(&self, r: &Request<IncomingBody>) -> Result<Self::Endpoint, Self::Error>; - async fn handle( + fn handle( &self, req: Request<IncomingBody>, endpoint: Self::Endpoint, - ) -> Result<Response<BoxBody<Self::Error>>, Self::Error>; + ) -> impl Future<Output = Result<Response<BoxBody<Self::Error>>, Self::Error>> + Send; } -pub(crate) struct ApiServer<A: ApiHandler> { +pub struct ApiServer<A: ApiHandler> { region: String, api_handler: A, @@ -248,13 +245,11 @@ impl<A: ApiHandler> ApiServer<A> { // ==== helper functions ==== -#[async_trait] pub trait Accept: Send + Sync + 'static { type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static; - async fn accept(&self) -> std::io::Result<(Self::Stream, String)>; + fn accept(&self) -> impl Future<Output = std::io::Result<(Self::Stream, String)>> + Send; } -#[async_trait] impl Accept for TcpListener { type Stream = TcpStream; async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { @@ -266,7 +261,6 @@ impl Accept for TcpListener { pub struct UnixListenerOn(pub UnixListener, pub String); -#[async_trait] impl Accept for UnixListenerOn { type Stream = UnixStream; async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { diff --git a/src/api/helpers.rs b/src/api/common/helpers.rs index cf60005d..c8586de4 100644 --- a/src/api/helpers.rs +++ b/src/api/common/helpers.rs @@ -363,9 +363,9 @@ mod tests { } #[derive(Serialize)] -pub(crate) struct CustomApiErrorBody { - pub(crate) code: String, - pub(crate) message: String, - pub(crate) region: String, - pub(crate) path: String, +pub struct CustomApiErrorBody { + pub code: String, + pub message: String, + pub region: String, + pub path: String, } diff --git a/src/api/common/lib.rs b/src/api/common/lib.rs new file mode 100644 index 00000000..0e655a53 --- /dev/null +++ b/src/api/common/lib.rs @@ -0,0 +1,12 @@ +//! 
Crate for serving a S3 compatible API +#[macro_use] +extern crate tracing; + +pub mod common_error; + +pub mod cors; +pub mod encoding; +pub mod generic_server; +pub mod helpers; +pub mod router_macros; +pub mod signature; diff --git a/src/api/router_macros.rs b/src/api/common/router_macros.rs index 8f10a4f5..d9fe86db 100644 --- a/src/api/router_macros.rs +++ b/src/api/common/router_macros.rs @@ -1,5 +1,6 @@ /// This macro is used to generate very repetitive match {} blocks in this module /// It is _not_ made to be used anywhere else +#[macro_export] macro_rules! router_match { (@match $enum:expr , [ $($endpoint:ident,)* ]) => {{ // usage: router_match {@match my_enum, [ VariantWithField1, VariantWithField2 ..] } @@ -133,6 +134,7 @@ macro_rules! router_match { /// This macro is used to generate part of the code in this module. It must be called only one, and /// is useless outside of this module. +#[macro_export] macro_rules! generateQueryParameters { ( keywords: [ $($kw_param:expr => $kw_name: ident),* ], @@ -220,5 +222,5 @@ macro_rules! generateQueryParameters { } } -pub(crate) use generateQueryParameters; -pub(crate) use router_match; +pub use generateQueryParameters; +pub use router_match; diff --git a/src/api/common/signature/body.rs b/src/api/common/signature/body.rs new file mode 100644 index 00000000..96be0d5b --- /dev/null +++ b/src/api/common/signature/body.rs @@ -0,0 +1,135 @@ +use std::sync::Mutex; + +use futures::prelude::*; +use futures::stream::BoxStream; +use http_body_util::{BodyExt, StreamBody}; +use hyper::body::{Bytes, Frame}; +use serde::Deserialize; +use tokio::sync::mpsc; +use tokio::task; + +use super::*; + +use crate::signature::checksum::*; + +pub struct ReqBody { + // why need mutex to be sync?? 
+ pub(crate) stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>, + pub(crate) checksummer: Checksummer, + pub(crate) expected_checksums: ExpectedChecksums, + pub(crate) trailer_algorithm: Option<ChecksumAlgorithm>, +} + +pub type StreamingChecksumReceiver = task::JoinHandle<Result<Checksums, Error>>; + +impl ReqBody { + pub fn add_expected_checksums(&mut self, more: ExpectedChecksums) { + if more.md5.is_some() { + self.expected_checksums.md5 = more.md5; + } + if more.sha256.is_some() { + self.expected_checksums.sha256 = more.sha256; + } + if more.extra.is_some() { + self.expected_checksums.extra = more.extra; + } + self.checksummer.add_expected(&self.expected_checksums); + } + + pub fn add_md5(&mut self) { + self.checksummer.add_md5(); + } + + // ============ non-streaming ============= + + pub async fn json<T: for<'a> Deserialize<'a>>(self) -> Result<T, Error> { + let body = self.collect().await?; + let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + Ok(resp) + } + + pub async fn collect(self) -> Result<Bytes, Error> { + self.collect_with_checksums().await.map(|(b, _)| b) + } + + pub async fn collect_with_checksums(mut self) -> Result<(Bytes, Checksums), Error> { + let stream: BoxStream<_> = self.stream.into_inner().unwrap(); + let bytes = BodyExt::collect(StreamBody::new(stream)).await?.to_bytes(); + + self.checksummer.update(&bytes); + let checksums = self.checksummer.finalize(); + checksums.verify(&self.expected_checksums)?; + + Ok((bytes, checksums)) + } + + // ============ streaming ============= + + pub fn streaming_with_checksums( + self, + ) -> ( + BoxStream<'static, Result<Bytes, Error>>, + StreamingChecksumReceiver, + ) { + let Self { + stream, + mut checksummer, + mut expected_checksums, + trailer_algorithm, + } = self; + + let (frame_tx, mut frame_rx) = mpsc::channel::<Frame<Bytes>>(5); + + let join_checksums = tokio::spawn(async move { + while let Some(frame) = frame_rx.recv().await { + match frame.into_data() 
{ + Ok(data) => { + checksummer = tokio::task::spawn_blocking(move || { + checksummer.update(&data); + checksummer + }) + .await + .unwrap() + } + Err(frame) => { + let trailers = frame.into_trailers().unwrap(); + let algo = trailer_algorithm.unwrap(); + expected_checksums.extra = Some(extract_checksum_value(&trailers, algo)?); + break; + } + } + } + + if trailer_algorithm.is_some() && expected_checksums.extra.is_none() { + return Err(Error::bad_request("trailing checksum was not sent")); + } + + let checksums = checksummer.finalize(); + checksums.verify(&expected_checksums)?; + + Ok(checksums) + }); + + let stream: BoxStream<_> = stream.into_inner().unwrap(); + let stream = stream.filter_map(move |x| { + let frame_tx = frame_tx.clone(); + async move { + match x { + Err(e) => Some(Err(e)), + Ok(frame) => { + if frame.is_data() { + let data = frame.data_ref().unwrap().clone(); + let _ = frame_tx.send(frame).await; + Some(Ok(data)) + } else { + let _ = frame_tx.send(frame).await; + None + } + } + } + } + }); + + (stream.boxed(), join_checksums) + } +} diff --git a/src/api/s3/checksum.rs b/src/api/common/signature/checksum.rs index c7527163..3c5e7c53 100644 --- a/src/api/s3/checksum.rs +++ b/src/api/common/signature/checksum.rs @@ -11,11 +11,12 @@ use sha2::Sha256; use http::{HeaderMap, HeaderName, HeaderValue}; use garage_util::data::*; -use garage_util::error::OkOrMessage; -use garage_model::s3::object_table::*; +use super::*; -use crate::s3::error::*; +pub use garage_model::s3::object_table::{ChecksumAlgorithm, ChecksumValue}; + +pub const CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5"); pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-checksum-algorithm"); @@ -31,8 +32,8 @@ pub type Md5Checksum = [u8; 16]; pub type Sha1Checksum = [u8; 20]; pub type Sha256Checksum = [u8; 32]; -#[derive(Debug, Default)] -pub(crate) struct ExpectedChecksums { +#[derive(Debug, Default, Clone)] +pub struct ExpectedChecksums { // 
base64-encoded md5 (content-md5 header) pub md5: Option<String>, // content_sha256 (as a Hash / FixedBytes32) @@ -41,7 +42,7 @@ pub(crate) struct ExpectedChecksums { pub extra: Option<ChecksumValue>, } -pub(crate) struct Checksummer { +pub struct Checksummer { pub crc32: Option<Crc32>, pub crc32c: Option<Crc32c>, pub md5: Option<Md5>, @@ -50,7 +51,7 @@ pub(crate) struct Checksummer { } #[derive(Default)] -pub(crate) struct Checksums { +pub struct Checksums { pub crc32: Option<Crc32Checksum>, pub crc32c: Option<Crc32cChecksum>, pub md5: Option<Md5Checksum>, @@ -59,34 +60,48 @@ pub(crate) struct Checksums { } impl Checksummer { - pub(crate) fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self { - let mut ret = Self { + pub fn new() -> Self { + Self { crc32: None, crc32c: None, md5: None, sha1: None, sha256: None, - }; + } + } + + pub fn init(expected: &ExpectedChecksums, add_md5: bool) -> Self { + let mut ret = Self::new(); + ret.add_expected(expected); + if add_md5 { + ret.add_md5(); + } + ret + } + + pub fn add_md5(&mut self) { + self.md5 = Some(Md5::new()); + } - if expected.md5.is_some() || require_md5 { - ret.md5 = Some(Md5::new()); + pub fn add_expected(&mut self, expected: &ExpectedChecksums) { + if expected.md5.is_some() { + self.md5 = Some(Md5::new()); } if expected.sha256.is_some() || matches!(&expected.extra, Some(ChecksumValue::Sha256(_))) { - ret.sha256 = Some(Sha256::new()); + self.sha256 = Some(Sha256::new()); } if matches!(&expected.extra, Some(ChecksumValue::Crc32(_))) { - ret.crc32 = Some(Crc32::new()); + self.crc32 = Some(Crc32::new()); } if matches!(&expected.extra, Some(ChecksumValue::Crc32c(_))) { - ret.crc32c = Some(Crc32c::default()); + self.crc32c = Some(Crc32c::default()); } if matches!(&expected.extra, Some(ChecksumValue::Sha1(_))) { - ret.sha1 = Some(Sha1::new()); + self.sha1 = Some(Sha1::new()); } - ret } - pub(crate) fn add(mut self, algo: Option<ChecksumAlgorithm>) -> Self { + pub fn add(mut self, algo: 
Option<ChecksumAlgorithm>) -> Self { match algo { Some(ChecksumAlgorithm::Crc32) => { self.crc32 = Some(Crc32::new()); @@ -105,7 +120,7 @@ impl Checksummer { self } - pub(crate) fn update(&mut self, bytes: &[u8]) { + pub fn update(&mut self, bytes: &[u8]) { if let Some(crc32) = &mut self.crc32 { crc32.update(bytes); } @@ -123,7 +138,7 @@ impl Checksummer { } } - pub(crate) fn finalize(self) -> Checksums { + pub fn finalize(self) -> Checksums { Checksums { crc32: self.crc32.map(|x| u32::to_be_bytes(x.finalize())), crc32c: self @@ -183,153 +198,56 @@ impl Checksums { // ---- -#[derive(Default)] -pub(crate) struct MultipartChecksummer { - pub md5: Md5, - pub extra: Option<MultipartExtraChecksummer>, -} - -pub(crate) enum MultipartExtraChecksummer { - Crc32(Crc32), - Crc32c(Crc32c), - Sha1(Sha1), - Sha256(Sha256), -} - -impl MultipartChecksummer { - pub(crate) fn init(algo: Option<ChecksumAlgorithm>) -> Self { - Self { - md5: Md5::new(), - extra: match algo { - None => None, - Some(ChecksumAlgorithm::Crc32) => { - Some(MultipartExtraChecksummer::Crc32(Crc32::new())) - } - Some(ChecksumAlgorithm::Crc32c) => { - Some(MultipartExtraChecksummer::Crc32c(Crc32c::default())) - } - Some(ChecksumAlgorithm::Sha1) => Some(MultipartExtraChecksummer::Sha1(Sha1::new())), - Some(ChecksumAlgorithm::Sha256) => { - Some(MultipartExtraChecksummer::Sha256(Sha256::new())) - } - }, - } - } - - pub(crate) fn update( - &mut self, - etag: &str, - checksum: Option<ChecksumValue>, - ) -> Result<(), Error> { - self.md5 - .update(&hex::decode(&etag).ok_or_message("invalid etag hex")?); - match (&mut self.extra, checksum) { - (None, _) => (), - ( - Some(MultipartExtraChecksummer::Crc32(ref mut crc32)), - Some(ChecksumValue::Crc32(x)), - ) => { - crc32.update(&x); - } - ( - Some(MultipartExtraChecksummer::Crc32c(ref mut crc32c)), - Some(ChecksumValue::Crc32c(x)), - ) => { - crc32c.write(&x); - } - (Some(MultipartExtraChecksummer::Sha1(ref mut sha1)), Some(ChecksumValue::Sha1(x))) => { - 
sha1.update(&x); - } - ( - Some(MultipartExtraChecksummer::Sha256(ref mut sha256)), - Some(ChecksumValue::Sha256(x)), - ) => { - sha256.update(&x); - } - (Some(_), b) => { - return Err(Error::internal_error(format!( - "part checksum was not computed correctly, got: {:?}", - b - ))) - } - } - Ok(()) - } - - pub(crate) fn finalize(self) -> (Md5Checksum, Option<ChecksumValue>) { - let md5 = self.md5.finalize()[..].try_into().unwrap(); - let extra = match self.extra { - None => None, - Some(MultipartExtraChecksummer::Crc32(crc32)) => { - Some(ChecksumValue::Crc32(u32::to_be_bytes(crc32.finalize()))) - } - Some(MultipartExtraChecksummer::Crc32c(crc32c)) => Some(ChecksumValue::Crc32c( - u32::to_be_bytes(u32::try_from(crc32c.finish()).unwrap()), - )), - Some(MultipartExtraChecksummer::Sha1(sha1)) => { - Some(ChecksumValue::Sha1(sha1.finalize()[..].try_into().unwrap())) - } - Some(MultipartExtraChecksummer::Sha256(sha256)) => Some(ChecksumValue::Sha256( - sha256.finalize()[..].try_into().unwrap(), - )), - }; - (md5, extra) +pub fn parse_checksum_algorithm(algo: &str) -> Result<ChecksumAlgorithm, Error> { + match algo { + "CRC32" => Ok(ChecksumAlgorithm::Crc32), + "CRC32C" => Ok(ChecksumAlgorithm::Crc32c), + "SHA1" => Ok(ChecksumAlgorithm::Sha1), + "SHA256" => Ok(ChecksumAlgorithm::Sha256), + _ => Err(Error::bad_request("invalid checksum algorithm")), } } -// ---- - /// Extract the value of the x-amz-checksum-algorithm header -pub(crate) fn request_checksum_algorithm( +pub fn request_checksum_algorithm( headers: &HeaderMap<HeaderValue>, ) -> Result<Option<ChecksumAlgorithm>, Error> { match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { None => Ok(None), - Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)), - Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)), - Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)), - Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)), + Some(x) => parse_checksum_algorithm(x.to_str()?).map(Some), + } +} + 
+pub fn request_trailer_checksum_algorithm( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumAlgorithm>, Error> { + match headers.get(X_AMZ_TRAILER).map(|x| x.to_str()).transpose()? { + None => Ok(None), + Some(x) if x == X_AMZ_CHECKSUM_CRC32 => Ok(Some(ChecksumAlgorithm::Crc32)), + Some(x) if x == X_AMZ_CHECKSUM_CRC32C => Ok(Some(ChecksumAlgorithm::Crc32c)), + Some(x) if x == X_AMZ_CHECKSUM_SHA1 => Ok(Some(ChecksumAlgorithm::Sha1)), + Some(x) if x == X_AMZ_CHECKSUM_SHA256 => Ok(Some(ChecksumAlgorithm::Sha256)), _ => Err(Error::bad_request("invalid checksum algorithm")), } } /// Extract the value of any of the x-amz-checksum-* headers -pub(crate) fn request_checksum_value( +pub fn request_checksum_value( headers: &HeaderMap<HeaderValue>, ) -> Result<Option<ChecksumValue>, Error> { let mut ret = vec![]; - if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) { - let crc32 = BASE64_STANDARD - .decode(&crc32_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; - ret.push(ChecksumValue::Crc32(crc32)) + if headers.contains_key(X_AMZ_CHECKSUM_CRC32) { + ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Crc32)?); } - if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) { - let crc32c = BASE64_STANDARD - .decode(&crc32c_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; - ret.push(ChecksumValue::Crc32c(crc32c)) + if headers.contains_key(X_AMZ_CHECKSUM_CRC32C) { + ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Crc32c)?); } - if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) { - let sha1 = BASE64_STANDARD - .decode(&sha1_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; - ret.push(ChecksumValue::Sha1(sha1)) + if headers.contains_key(X_AMZ_CHECKSUM_SHA1) { + ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Sha1)?); } - if let 
Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) { - let sha256 = BASE64_STANDARD - .decode(&sha256_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; - ret.push(ChecksumValue::Sha256(sha256)) + if headers.contains_key(X_AMZ_CHECKSUM_SHA256) { + ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Sha256)?); } if ret.len() > 1 { @@ -342,48 +260,47 @@ pub(crate) fn request_checksum_value( /// Checks for the presence of x-amz-checksum-algorithm /// if so extract the corresponding x-amz-checksum-* value -pub(crate) fn request_checksum_algorithm_value( +pub fn extract_checksum_value( headers: &HeaderMap<HeaderValue>, -) -> Result<Option<ChecksumValue>, Error> { - match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { - Some(x) if x == "CRC32" => { + algo: ChecksumAlgorithm, +) -> Result<ChecksumValue, Error> { + match algo { + ChecksumAlgorithm::Crc32 => { let crc32 = headers .get(X_AMZ_CHECKSUM_CRC32) .and_then(|x| BASE64_STANDARD.decode(&x).ok()) .and_then(|x| x.try_into().ok()) .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; - Ok(Some(ChecksumValue::Crc32(crc32))) + Ok(ChecksumValue::Crc32(crc32)) } - Some(x) if x == "CRC32C" => { + ChecksumAlgorithm::Crc32c => { let crc32c = headers .get(X_AMZ_CHECKSUM_CRC32C) .and_then(|x| BASE64_STANDARD.decode(&x).ok()) .and_then(|x| x.try_into().ok()) .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; - Ok(Some(ChecksumValue::Crc32c(crc32c))) + Ok(ChecksumValue::Crc32c(crc32c)) } - Some(x) if x == "SHA1" => { + ChecksumAlgorithm::Sha1 => { let sha1 = headers .get(X_AMZ_CHECKSUM_SHA1) .and_then(|x| BASE64_STANDARD.decode(&x).ok()) .and_then(|x| x.try_into().ok()) .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; - Ok(Some(ChecksumValue::Sha1(sha1))) + Ok(ChecksumValue::Sha1(sha1)) } - Some(x) if x == "SHA256" => { + ChecksumAlgorithm::Sha256 => { let sha256 = headers .get(X_AMZ_CHECKSUM_SHA256) .and_then(|x| 
BASE64_STANDARD.decode(&x).ok()) .and_then(|x| x.try_into().ok()) .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; - Ok(Some(ChecksumValue::Sha256(sha256))) + Ok(ChecksumValue::Sha256(sha256)) } - Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")), - None => Ok(None), } } -pub(crate) fn add_checksum_response_headers( +pub fn add_checksum_response_headers( checksum: &Option<ChecksumValue>, mut resp: http::response::Builder, ) -> http::response::Builder { diff --git a/src/api/signature/error.rs b/src/api/common/signature/error.rs index 2d92a072..b2f396b5 100644 --- a/src/api/signature/error.rs +++ b/src/api/common/signature/error.rs @@ -18,6 +18,10 @@ pub enum Error { /// The request contained an invalid UTF-8 sequence in its path or in other parameters #[error(display = "Invalid UTF-8: {}", _0)] InvalidUtf8Str(#[error(source)] std::str::Utf8Error), + + /// The provided digest (checksum) value was invalid + #[error(display = "Invalid digest: {}", _0)] + InvalidDigest(String), } impl<T> From<T> for Error diff --git a/src/api/common/signature/mod.rs b/src/api/common/signature/mod.rs new file mode 100644 index 00000000..50fbd304 --- /dev/null +++ b/src/api/common/signature/mod.rs @@ -0,0 +1,118 @@ +use chrono::{DateTime, Utc}; +use hmac::{Hmac, Mac}; +use sha2::Sha256; + +use hyper::header::HeaderName; +use hyper::{body::Incoming as IncomingBody, Request}; + +use garage_model::garage::Garage; +use garage_model::key_table::Key; +use garage_util::data::{sha256sum, Hash}; + +use error::*; + +pub mod body; +pub mod checksum; +pub mod error; +pub mod payload; +pub mod streaming; + +pub const SHORT_DATE: &str = "%Y%m%d"; +pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; + +// ---- Constants used in AWSv4 signatures ---- + +pub const X_AMZ_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-algorithm"); +pub const X_AMZ_CREDENTIAL: HeaderName = HeaderName::from_static("x-amz-credential"); +pub const X_AMZ_DATE: HeaderName = 
HeaderName::from_static("x-amz-date"); +pub const X_AMZ_EXPIRES: HeaderName = HeaderName::from_static("x-amz-expires"); +pub const X_AMZ_SIGNEDHEADERS: HeaderName = HeaderName::from_static("x-amz-signedheaders"); +pub const X_AMZ_SIGNATURE: HeaderName = HeaderName::from_static("x-amz-signature"); +pub const X_AMZ_CONTENT_SHA256: HeaderName = HeaderName::from_static("x-amz-content-sha256"); +pub const X_AMZ_TRAILER: HeaderName = HeaderName::from_static("x-amz-trailer"); + +/// Result of `sha256("")` +pub(crate) const EMPTY_STRING_HEX_DIGEST: &str = + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; + +// Signature calculation algorithm +pub const AWS4_HMAC_SHA256: &str = "AWS4-HMAC-SHA256"; +type HmacSha256 = Hmac<Sha256>; + +// Possible values for x-amz-content-sha256, in addition to the actual sha256 +pub const UNSIGNED_PAYLOAD: &str = "UNSIGNED-PAYLOAD"; +pub const STREAMING_UNSIGNED_PAYLOAD_TRAILER: &str = "STREAMING-UNSIGNED-PAYLOAD-TRAILER"; +pub const STREAMING_AWS4_HMAC_SHA256_PAYLOAD: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"; + +// Used in the computation of StringToSign +pub const AWS4_HMAC_SHA256_PAYLOAD: &str = "AWS4-HMAC-SHA256-PAYLOAD"; + +// ---- enums to describe stuff going on in signature calculation ---- + +#[derive(Debug)] +pub enum ContentSha256Header { + UnsignedPayload, + Sha256Checksum(Hash), + StreamingPayload { trailer: bool, signed: bool }, +} + +// ---- top-level functions ---- + +pub struct VerifiedRequest { + pub request: Request<streaming::ReqBody>, + pub access_key: Key, + pub content_sha256_header: ContentSha256Header, +} + +pub async fn verify_request( + garage: &Garage, + mut req: Request<IncomingBody>, + service: &'static str, +) -> Result<VerifiedRequest, Error> { + let checked_signature = payload::check_payload_signature(&garage, &mut req, service).await?; + + let request = streaming::parse_streaming_body( + req, + &checked_signature, + &garage.config.s3_api.s3_region, + service, + )?; + + let access_key = 
checked_signature + .key + .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?; + + Ok(VerifiedRequest { + request, + access_key, + content_sha256_header: checked_signature.content_sha256_header, + }) +} + +pub fn signing_hmac( + datetime: &DateTime<Utc>, + secret_key: &str, + region: &str, + service: &str, +) -> Result<HmacSha256, crypto_common::InvalidLength> { + let secret = String::from("AWS4") + secret_key; + let mut date_hmac = HmacSha256::new_from_slice(secret.as_bytes())?; + date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes()); + let mut region_hmac = HmacSha256::new_from_slice(&date_hmac.finalize().into_bytes())?; + region_hmac.update(region.as_bytes()); + let mut service_hmac = HmacSha256::new_from_slice(®ion_hmac.finalize().into_bytes())?; + service_hmac.update(service.as_bytes()); + let mut signing_hmac = HmacSha256::new_from_slice(&service_hmac.finalize().into_bytes())?; + signing_hmac.update(b"aws4_request"); + let hmac = HmacSha256::new_from_slice(&signing_hmac.finalize().into_bytes())?; + Ok(hmac) +} + +pub fn compute_scope(datetime: &DateTime<Utc>, region: &str, service: &str) -> String { + format!( + "{}/{}/{}/aws4_request", + datetime.format(SHORT_DATE), + region, + service + ) +} diff --git a/src/api/signature/payload.rs b/src/api/common/signature/payload.rs index 9e5a6043..2d5f8603 100644 --- a/src/api/signature/payload.rs +++ b/src/api/common/signature/payload.rs @@ -13,23 +13,9 @@ use garage_util::data::Hash; use garage_model::garage::Garage; use garage_model::key_table::*; -use super::LONG_DATETIME; -use super::{compute_scope, signing_hmac}; +use super::*; use crate::encoding::uri_encode; -use crate::signature::error::*; - -pub const X_AMZ_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-algorithm"); -pub const X_AMZ_CREDENTIAL: HeaderName = HeaderName::from_static("x-amz-credential"); -pub const X_AMZ_DATE: HeaderName = HeaderName::from_static("x-amz-date"); -pub const X_AMZ_EXPIRES: 
HeaderName = HeaderName::from_static("x-amz-expires"); -pub const X_AMZ_SIGNEDHEADERS: HeaderName = HeaderName::from_static("x-amz-signedheaders"); -pub const X_AMZ_SIGNATURE: HeaderName = HeaderName::from_static("x-amz-signature"); -pub const X_AMZ_CONTENT_SH256: HeaderName = HeaderName::from_static("x-amz-content-sha256"); - -pub const AWS4_HMAC_SHA256: &str = "AWS4-HMAC-SHA256"; -pub const UNSIGNED_PAYLOAD: &str = "UNSIGNED-PAYLOAD"; -pub const STREAMING_AWS4_HMAC_SHA256_PAYLOAD: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"; pub type QueryMap = HeaderMap<QueryValue>; pub struct QueryValue { @@ -39,11 +25,18 @@ pub struct QueryValue { value: String, } +#[derive(Debug)] +pub struct CheckedSignature { + pub key: Option<Key>, + pub content_sha256_header: ContentSha256Header, + pub signature_header: Option<String>, +} + pub async fn check_payload_signature( garage: &Garage, request: &mut Request<IncomingBody>, service: &'static str, -) -> Result<(Option<Key>, Option<Hash>), Error> { +) -> Result<CheckedSignature, Error> { let query = parse_query_map(request.uri())?; if query.contains_key(&X_AMZ_ALGORITHM) { @@ -57,17 +50,46 @@ pub async fn check_payload_signature( // Unsigned (anonymous) request let content_sha256 = request .headers() - .get("x-amz-content-sha256") - .filter(|c| c.as_bytes() != UNSIGNED_PAYLOAD.as_bytes()); - if let Some(content_sha256) = content_sha256 { - let sha256 = hex::decode(content_sha256) - .ok() - .and_then(|bytes| Hash::try_from(&bytes)) - .ok_or_bad_request("Invalid content sha256 hash")?; - Ok((None, Some(sha256))) + .get(X_AMZ_CONTENT_SHA256) + .map(|x| x.to_str()) + .transpose()?; + Ok(CheckedSignature { + key: None, + content_sha256_header: parse_x_amz_content_sha256(content_sha256)?, + signature_header: None, + }) + } +} + +fn parse_x_amz_content_sha256(header: Option<&str>) -> Result<ContentSha256Header, Error> { + let header = match header { + Some(x) => x, + None => return Ok(ContentSha256Header::UnsignedPayload), + }; + if header 
== UNSIGNED_PAYLOAD { + Ok(ContentSha256Header::UnsignedPayload) + } else if let Some(rest) = header.strip_prefix("STREAMING-") { + let (trailer, algo) = if let Some(rest2) = rest.strip_suffix("-TRAILER") { + (true, rest2) } else { - Ok((None, None)) - } + (false, rest) + }; + let signed = match algo { + AWS4_HMAC_SHA256_PAYLOAD => true, + UNSIGNED_PAYLOAD => false, + _ => { + return Err(Error::bad_request( + "invalid or unsupported x-amz-content-sha256", + )) + } + }; + Ok(ContentSha256Header::StreamingPayload { trailer, signed }) + } else { + let sha256 = hex::decode(header) + .ok() + .and_then(|bytes| Hash::try_from(&bytes)) + .ok_or_bad_request("Invalid content sha256 hash")?; + Ok(ContentSha256Header::Sha256Checksum(sha256)) } } @@ -76,7 +98,7 @@ async fn check_standard_signature( service: &'static str, request: &Request<IncomingBody>, query: QueryMap, -) -> Result<(Option<Key>, Option<Hash>), Error> { +) -> Result<CheckedSignature, Error> { let authorization = Authorization::parse_header(request.headers())?; // Verify that all necessary request headers are included in signed_headers @@ -108,18 +130,13 @@ async fn check_standard_signature( let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?; - let content_sha256 = if authorization.content_sha256 == UNSIGNED_PAYLOAD { - None - } else if authorization.content_sha256 == STREAMING_AWS4_HMAC_SHA256_PAYLOAD { - let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?; - Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?) - } else { - let bytes = hex::decode(authorization.content_sha256) - .ok_or_bad_request("Invalid content sha256 hash")?; - Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid content sha256 hash")?) 
- }; + let content_sha256_header = parse_x_amz_content_sha256(Some(&authorization.content_sha256))?; - Ok((Some(key), content_sha256)) + Ok(CheckedSignature { + key: Some(key), + content_sha256_header, + signature_header: Some(authorization.signature), + }) } async fn check_presigned_signature( @@ -127,7 +144,7 @@ async fn check_presigned_signature( service: &'static str, request: &mut Request<IncomingBody>, mut query: QueryMap, -) -> Result<(Option<Key>, Option<Hash>), Error> { +) -> Result<CheckedSignature, Error> { let algorithm = query.get(&X_AMZ_ALGORITHM).unwrap(); let authorization = Authorization::parse_presigned(&algorithm.value, &query)?; @@ -193,7 +210,11 @@ async fn check_presigned_signature( // Presigned URLs always use UNSIGNED-PAYLOAD, // so there is no sha256 hash to return. - Ok((Some(key), None)) + Ok(CheckedSignature { + key: Some(key), + content_sha256_header: ContentSha256Header::UnsignedPayload, + signature_header: Some(authorization.signature), + }) } pub fn parse_query_map(uri: &http::uri::Uri) -> Result<QueryMap, Error> { @@ -442,7 +463,7 @@ impl Authorization { .to_string(); let content_sha256 = headers - .get(X_AMZ_CONTENT_SH256) + .get(X_AMZ_CONTENT_SHA256) .ok_or_bad_request("Missing X-Amz-Content-Sha256 field")?; let date = headers @@ -518,7 +539,7 @@ impl Authorization { }) } - pub(crate) fn parse_form(params: &HeaderMap) -> Result<Self, Error> { + pub fn parse_form(params: &HeaderMap) -> Result<Self, Error> { let algorithm = params .get(X_AMZ_ALGORITHM) .ok_or_bad_request("Missing X-Amz-Algorithm header")? 
diff --git a/src/api/common/signature/streaming.rs b/src/api/common/signature/streaming.rs new file mode 100644 index 00000000..64362727 --- /dev/null +++ b/src/api/common/signature/streaming.rs @@ -0,0 +1,618 @@ +use std::pin::Pin; +use std::sync::Mutex; + +use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; +use futures::prelude::*; +use futures::task; +use hmac::Mac; +use http::header::{HeaderMap, HeaderValue, CONTENT_ENCODING}; +use hyper::body::{Bytes, Frame, Incoming as IncomingBody}; +use hyper::Request; + +use garage_util::data::Hash; + +use super::*; + +use crate::helpers::body_stream; +use crate::signature::checksum::*; +use crate::signature::payload::CheckedSignature; + +pub use crate::signature::body::ReqBody; + +pub fn parse_streaming_body( + mut req: Request<IncomingBody>, + checked_signature: &CheckedSignature, + region: &str, + service: &str, +) -> Result<Request<ReqBody>, Error> { + debug!( + "Content signature mode: {:?}", + checked_signature.content_sha256_header + ); + + match checked_signature.content_sha256_header { + ContentSha256Header::StreamingPayload { signed, trailer } => { + // Sanity checks + if !signed && !trailer { + return Err(Error::bad_request( + "STREAMING-UNSIGNED-PAYLOAD without trailer is not a valid combination", + )); + } + + // Remove the aws-chunked component in the content-encoding: header + // Note: this header is not properly sent by minio client, so don't fail + // if it is absent from the request. 
+ if let Some(content_encoding) = req.headers_mut().remove(CONTENT_ENCODING) { + if let Some(rest) = content_encoding.as_bytes().strip_prefix(b"aws-chunked,") { + req.headers_mut() + .insert(CONTENT_ENCODING, HeaderValue::from_bytes(rest).unwrap()); + } else if content_encoding != "aws-chunked" { + return Err(Error::bad_request( + "content-encoding does not contain aws-chunked for STREAMING-*-PAYLOAD", + )); + } + } + + // If trailer header is announced, add the calculation of the requested checksum + let mut checksummer = Checksummer::init(&Default::default(), false); + let trailer_algorithm = if trailer { + let algo = Some( + request_trailer_checksum_algorithm(req.headers())? + .ok_or_bad_request("Missing x-amz-trailer header")?, + ); + checksummer = checksummer.add(algo); + algo + } else { + None + }; + + // For signed variants, determine signing parameters + let sign_params = if signed { + let signature = checked_signature + .signature_header + .clone() + .ok_or_bad_request("No signature provided")?; + let signature = hex::decode(signature) + .ok() + .and_then(|bytes| Hash::try_from(&bytes)) + .ok_or_bad_request("Invalid signature")?; + + let secret_key = checked_signature + .key + .as_ref() + .ok_or_bad_request("Cannot sign streaming payload without signing key")? + .state + .as_option() + .ok_or_internal_error("Deleted key state")? + .secret_key + .to_string(); + + let date = req + .headers() + .get(X_AMZ_DATE) + .ok_or_bad_request("Missing X-Amz-Date field")? 
+ .to_str()?; + let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) + .ok_or_bad_request("Invalid date")?; + let date: DateTime<Utc> = Utc.from_utc_datetime(&date); + + let scope = compute_scope(&date, region, service); + let signing_hmac = + crate::signature::signing_hmac(&date, &secret_key, region, service) + .ok_or_internal_error("Unable to build signing HMAC")?; + + Some(SignParams { + datetime: date, + scope, + signing_hmac, + previous_signature: signature, + }) + } else { + None + }; + + Ok(req.map(move |body| { + let stream = body_stream::<_, Error>(body); + + let signed_payload_stream = + StreamingPayloadStream::new(stream, sign_params, trailer).map_err(Error::from); + ReqBody { + stream: Mutex::new(signed_payload_stream.boxed()), + checksummer, + expected_checksums: Default::default(), + trailer_algorithm, + } + })) + } + _ => Ok(req.map(|body| { + let expected_checksums = ExpectedChecksums { + sha256: match &checked_signature.content_sha256_header { + ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256), + _ => None, + }, + ..Default::default() + }; + let checksummer = Checksummer::init(&expected_checksums, false); + + let stream = http_body_util::BodyStream::new(body).map_err(Error::from); + ReqBody { + stream: Mutex::new(stream.boxed()), + checksummer, + expected_checksums, + trailer_algorithm: None, + } + })), + } +} + +fn compute_streaming_payload_signature( + signing_hmac: &HmacSha256, + date: DateTime<Utc>, + scope: &str, + previous_signature: Hash, + content_sha256: Hash, +) -> Result<Hash, StreamingPayloadError> { + let string_to_sign = [ + AWS4_HMAC_SHA256_PAYLOAD, + &date.format(LONG_DATETIME).to_string(), + scope, + &hex::encode(previous_signature), + EMPTY_STRING_HEX_DIGEST, + &hex::encode(content_sha256), + ] + .join("\n"); + + let mut hmac = signing_hmac.clone(); + hmac.update(string_to_sign.as_bytes()); + + Hash::try_from(&hmac.finalize().into_bytes()) + .ok_or_else(|| StreamingPayloadError::Message("Could 
not build signature".into())) +} + +fn compute_streaming_trailer_signature( + signing_hmac: &HmacSha256, + date: DateTime<Utc>, + scope: &str, + previous_signature: Hash, + trailer_sha256: Hash, +) -> Result<Hash, StreamingPayloadError> { + let string_to_sign = [ + AWS4_HMAC_SHA256_PAYLOAD, + &date.format(LONG_DATETIME).to_string(), + scope, + &hex::encode(previous_signature), + &hex::encode(trailer_sha256), + ] + .join("\n"); + + let mut hmac = signing_hmac.clone(); + hmac.update(string_to_sign.as_bytes()); + + Hash::try_from(&hmac.finalize().into_bytes()) + .ok_or_else(|| StreamingPayloadError::Message("Could not build signature".into())) +} + +mod payload { + use http::{HeaderName, HeaderValue}; + + use garage_util::data::Hash; + + use nom::bytes::streaming::{tag, take_while}; + use nom::character::streaming::hex_digit1; + use nom::combinator::{map_res, opt}; + use nom::number::streaming::hex_u32; + + macro_rules! try_parse { + ($expr:expr) => { + $expr.map_err(|e| e.map(Error::Parser))? 
+ }; + } + + pub enum Error<I> { + Parser(nom::error::Error<I>), + BadSignature, + } + + impl<I> Error<I> { + pub fn description(&self) -> &str { + match *self { + Error::Parser(ref e) => e.code.description(), + Error::BadSignature => "Bad signature", + } + } + } + + #[derive(Debug, Clone)] + pub struct ChunkHeader { + pub size: usize, + pub signature: Option<Hash>, + } + + impl ChunkHeader { + pub fn parse_signed(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + let (input, size) = try_parse!(hex_u32(input)); + let (input, _) = try_parse!(tag(";")(input)); + + let (input, _) = try_parse!(tag("chunk-signature=")(input)); + let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input)); + let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?; + + let (input, _) = try_parse!(tag("\r\n")(input)); + + let header = ChunkHeader { + size: size as usize, + signature: Some(signature), + }; + + Ok((input, header)) + } + + pub fn parse_unsigned(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + let (input, size) = try_parse!(hex_u32(input)); + let (input, _) = try_parse!(tag("\r\n")(input)); + + let header = ChunkHeader { + size: size as usize, + signature: None, + }; + + Ok((input, header)) + } + } + + #[derive(Debug, Clone)] + pub struct TrailerChunk { + pub header_name: HeaderName, + pub header_value: HeaderValue, + pub signature: Option<Hash>, + } + + impl TrailerChunk { + fn parse_content(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + let (input, header_name) = try_parse!(map_res( + take_while(|c: u8| c.is_ascii_alphanumeric() || c == b'-'), + HeaderName::from_bytes + )(input)); + let (input, _) = try_parse!(tag(b":")(input)); + let (input, header_value) = try_parse!(map_res( + take_while(|c: u8| c.is_ascii_alphanumeric() || b"+/=".contains(&c)), + HeaderValue::from_bytes + )(input)); + + // Possible '\n' after the header value, depends on clients + // 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + let (input, _) = try_parse!(opt(tag(b"\n"))(input)); + + let (input, _) = try_parse!(tag(b"\r\n")(input)); + + Ok(( + input, + TrailerChunk { + header_name, + header_value, + signature: None, + }, + )) + } + pub fn parse_signed(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + let (input, trailer) = Self::parse_content(input)?; + + let (input, _) = try_parse!(tag(b"x-amz-trailer-signature:")(input)); + let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input)); + let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?; + let (input, _) = try_parse!(tag(b"\r\n")(input)); + + Ok(( + input, + TrailerChunk { + signature: Some(signature), + ..trailer + }, + )) + } + pub fn parse_unsigned(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + let (input, trailer) = Self::parse_content(input)?; + let (input, _) = try_parse!(tag(b"\r\n")(input)); + + Ok((input, trailer)) + } + } +} + +#[derive(Debug)] +pub enum StreamingPayloadError { + Stream(Error), + InvalidSignature, + Message(String), +} + +impl StreamingPayloadError { + fn message(msg: &str) -> Self { + StreamingPayloadError::Message(msg.into()) + } +} + +impl From<StreamingPayloadError> for Error { + fn from(err: StreamingPayloadError) -> Self { + match err { + StreamingPayloadError::Stream(e) => e, + StreamingPayloadError::InvalidSignature => { + Error::bad_request("Invalid payload signature") + } + StreamingPayloadError::Message(e) => { + Error::bad_request(format!("Chunk format error: {}", e)) + } + } + } +} + +impl<I> From<payload::Error<I>> for StreamingPayloadError { + fn from(err: payload::Error<I>) -> Self { + Self::message(err.description()) + } +} + +impl<I> From<nom::error::Error<I>> for StreamingPayloadError { + fn from(err: nom::error::Error<I>) -> Self { + Self::message(err.code.description()) + } +} + +enum StreamingPayloadChunk { + Chunk { + header: 
payload::ChunkHeader, + data: Bytes, + }, + Trailer(payload::TrailerChunk), +} + +struct SignParams { + datetime: DateTime<Utc>, + scope: String, + signing_hmac: HmacSha256, + previous_signature: Hash, +} + +#[pin_project::pin_project] +pub struct StreamingPayloadStream<S> +where + S: Stream<Item = Result<Bytes, Error>>, +{ + #[pin] + stream: S, + buf: bytes::BytesMut, + signing: Option<SignParams>, + has_trailer: bool, + done: bool, +} + +impl<S> StreamingPayloadStream<S> +where + S: Stream<Item = Result<Bytes, Error>>, +{ + fn new(stream: S, signing: Option<SignParams>, has_trailer: bool) -> Self { + Self { + stream, + buf: bytes::BytesMut::new(), + signing, + has_trailer, + done: false, + } + } + + fn parse_next( + input: &[u8], + is_signed: bool, + has_trailer: bool, + ) -> nom::IResult<&[u8], StreamingPayloadChunk, StreamingPayloadError> { + use nom::bytes::streaming::{tag, take}; + + macro_rules! try_parse { + ($expr:expr) => { + $expr.map_err(nom::Err::convert)? + }; + } + + let (input, header) = if is_signed { + try_parse!(payload::ChunkHeader::parse_signed(input)) + } else { + try_parse!(payload::ChunkHeader::parse_unsigned(input)) + }; + + // 0-sized chunk is the last + if header.size == 0 { + if has_trailer { + let (input, trailer) = if is_signed { + try_parse!(payload::TrailerChunk::parse_signed(input)) + } else { + try_parse!(payload::TrailerChunk::parse_unsigned(input)) + }; + return Ok((input, StreamingPayloadChunk::Trailer(trailer))); + } else { + return Ok(( + input, + StreamingPayloadChunk::Chunk { + header, + data: Bytes::new(), + }, + )); + } + } + + let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input)); + let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input)); + + let data = Bytes::from(data.to_vec()); + + Ok((input, StreamingPayloadChunk::Chunk { header, data })) + } +} + +impl<S> Stream for StreamingPayloadStream<S> +where + S: Stream<Item = Result<Bytes, Error>> + Unpin, +{ + type Item 
= Result<Frame<Bytes>, StreamingPayloadError>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll<Option<Self::Item>> { + use std::task::Poll; + + let mut this = self.project(); + + if *this.done { + return Poll::Ready(None); + } + + loop { + let (input, payload) = + match Self::parse_next(this.buf, this.signing.is_some(), *this.has_trailer) { + Ok(res) => res, + Err(nom::Err::Incomplete(_)) => { + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + continue; + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(StreamingPayloadError::Stream(e)))) + } + None => { + return Poll::Ready(Some(Err(StreamingPayloadError::message( + "Unexpected EOF", + )))); + } + } + } + Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => { + return Poll::Ready(Some(Err(e))) + } + }; + + match payload { + StreamingPayloadChunk::Chunk { data, header } => { + if let Some(signing) = this.signing.as_mut() { + let data_sha256sum = sha256sum(&data); + + let expected_signature = compute_streaming_payload_signature( + &signing.signing_hmac, + signing.datetime, + &signing.scope, + signing.previous_signature, + data_sha256sum, + )?; + + if header.signature.unwrap() != expected_signature { + return Poll::Ready(Some(Err(StreamingPayloadError::InvalidSignature))); + } + + signing.previous_signature = header.signature.unwrap(); + } + + *this.buf = input.into(); + + // 0-sized chunk is the last + if data.is_empty() { + // if there was a trailer, it would have been returned by the parser + assert!(!*this.has_trailer); + *this.done = true; + return Poll::Ready(None); + } + + return Poll::Ready(Some(Ok(Frame::data(data)))); + } + StreamingPayloadChunk::Trailer(trailer) => { + trace!( + "In StreamingPayloadStream::poll_next: got trailer {:?}", + trailer + ); + + if let Some(signing) = this.signing.as_mut() { + let data = [ + trailer.header_name.as_ref(), + &b":"[..], + trailer.header_value.as_ref(), + 
&b"\n"[..], + ] + .concat(); + let trailer_sha256sum = sha256sum(&data); + + let expected_signature = compute_streaming_trailer_signature( + &signing.signing_hmac, + signing.datetime, + &signing.scope, + signing.previous_signature, + trailer_sha256sum, + )?; + + if trailer.signature.unwrap() != expected_signature { + return Poll::Ready(Some(Err(StreamingPayloadError::InvalidSignature))); + } + } + + *this.buf = input.into(); + *this.done = true; + + let mut trailers_map = HeaderMap::new(); + trailers_map.insert(trailer.header_name, trailer.header_value); + + return Poll::Ready(Some(Ok(Frame::trailers(trailers_map)))); + } + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.stream.size_hint() + } +} + +#[cfg(test)] +mod tests { + use futures::prelude::*; + + use super::{SignParams, StreamingPayloadError, StreamingPayloadStream}; + + #[tokio::test] + async fn test_interrupted_signed_payload_stream() { + use chrono::{DateTime, Utc}; + + use garage_util::data::Hash; + + let datetime = DateTime::parse_from_rfc3339("2021-12-13T13:12:42+01:00") // TODO UNIX 0 + .unwrap() + .with_timezone(&Utc); + let secret_key = "test"; + let region = "test"; + let scope = crate::signature::compute_scope(&datetime, region, "s3"); + let signing_hmac = + crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap(); + + let data: &[&[u8]] = &[b"1"]; + let body = futures::stream::iter(data.iter().map(|block| Ok(block.to_vec().into()))); + + let seed_signature = Hash::default(); + + let mut stream = StreamingPayloadStream::new( + body, + Some(SignParams { + signing_hmac, + datetime, + scope, + previous_signature: seed_signature, + }), + false, + ); + + assert!(stream.try_next().await.is_err()); + match stream.try_next().await { + Err(StreamingPayloadError::Message(msg)) if msg == "Unexpected EOF" => {} + item => panic!( + "Unexpected result, expected early EOF error, got {:?}", + item + ), + } + } +} diff --git a/src/api/k2v/Cargo.toml 
b/src/api/k2v/Cargo.toml new file mode 100644 index 00000000..e3ebedca --- /dev/null +++ b/src/api/k2v/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "garage_api_k2v" +version = "1.0.1" +authors = ["Alex Auvolat <alex@adnab.me>"] +edition = "2018" +license = "AGPL-3.0" +description = "K2V API server crate for the Garage object store" +repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_model = { workspace = true, features = [ "k2v" ] } +garage_table.workspace = true +garage_util = { workspace = true, features = [ "k2v" ] } +garage_api_common.workspace = true + +base64.workspace = true +err-derive.workspace = true +tracing.workspace = true + +futures.workspace = true +tokio.workspace = true +http.workspace = true +http-body-util.workspace = true +hyper = { workspace = true, default-features = false, features = ["server", "http1"] } +percent-encoding.workspace = true +url.workspace = true + +serde.workspace = true +serde_json.workspace = true + +opentelemetry.workspace = true diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index f2a3942e..de5775da 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -1,7 +1,5 @@ use std::sync::Arc; -use async_trait::async_trait; - use hyper::{body::Incoming as IncomingBody, Method, Request, Response}; use tokio::sync::watch; @@ -12,26 +10,25 @@ use garage_util::socket_address::UnixOrTCPSocketAddress; use garage_model::garage::Garage; -use crate::generic_server::*; -use crate::k2v::error::*; - -use crate::signature::verify_request; +use garage_api_common::cors::*; +use garage_api_common::generic_server::*; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_request; -use crate::helpers::*; -use crate::k2v::batch::*; -use crate::k2v::index::*; -use crate::k2v::item::*; -use 
crate::k2v::router::Endpoint; -use crate::s3::cors::*; +use crate::batch::*; +use crate::error::*; +use crate::index::*; +use crate::item::*; +use crate::router::Endpoint; -pub use crate::signature::streaming::ReqBody; +pub use garage_api_common::signature::streaming::ReqBody; pub type ResBody = BoxBody<Error>; pub struct K2VApiServer { garage: Arc<Garage>, } -pub(crate) struct K2VApiEndpoint { +pub struct K2VApiEndpoint { bucket_name: String, endpoint: Endpoint, } @@ -49,7 +46,6 @@ impl K2VApiServer { } } -#[async_trait] impl ApiHandler for K2VApiServer { const API_NAME: &'static str = "k2v"; const API_NAME_DISPLAY: &'static str = "K2V"; @@ -85,16 +81,20 @@ impl ApiHandler for K2VApiServer { return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } - let (req, api_key, _content_sha256) = verify_request(&garage, req, "k2v").await?; + let verified_request = verify_request(&garage, req, "k2v").await?; + let req = verified_request.request; + let api_key = verified_request.access_key; let bucket_id = garage .bucket_helper() .resolve_bucket(&bucket_name, &api_key) - .await?; + .await + .map_err(pass_helper_error)?; let bucket = garage .bucket_helper() .get_existing_bucket(bucket_id) - .await?; + .await + .map_err(helper_error_as_internal)?; let bucket_params = bucket.state.into_option().unwrap(); let allowed = match endpoint.authorization_type() { diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 02b7ae8b..7a03d836 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -4,13 +4,14 @@ use serde::{Deserialize, Serialize}; use garage_table::{EnumerationOrder, TableSchema}; -use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; -use crate::helpers::*; -use crate::k2v::api_server::{ReqBody, ResBody}; -use crate::k2v::error::*; -use crate::k2v::range::read_range; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::item::parse_causality_token; +use 
crate::range::read_range; pub async fn handle_insert_batch( ctx: ReqCtx, @@ -19,11 +20,11 @@ pub async fn handle_insert_batch( let ReqCtx { garage, bucket_id, .. } = &ctx; - let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?; + let items = req.into_body().json::<Vec<InsertBatchItem>>().await?; let mut items2 = vec![]; for it in items { - let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?; + let ct = it.ct.map(|s| parse_causality_token(&s)).transpose()?; let v = match it.v { Some(vs) => DvvsValue::Value( BASE64_STANDARD @@ -46,7 +47,7 @@ pub async fn handle_read_batch( ctx: ReqCtx, req: Request<ReqBody>, ) -> Result<Response<ResBody>, Error> { - let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?; + let queries = req.into_body().json::<Vec<ReadBatchQuery>>().await?; let resp_results = futures::future::join_all( queries @@ -140,7 +141,7 @@ pub async fn handle_delete_batch( ctx: ReqCtx, req: Request<ReqBody>, ) -> Result<Response<ResBody>, Error> { - let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?; + let queries = req.into_body().json::<Vec<DeleteBatchQuery>>().await?; let resp_results = futures::future::join_all( queries @@ -261,7 +262,7 @@ pub(crate) async fn handle_poll_range( } = ctx; use garage_model::k2v::sub::PollRange; - let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?; + let query = req.into_body().json::<PollRangeQuery>().await?; let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000; @@ -281,7 +282,8 @@ pub(crate) async fn handle_poll_range( query.seen_marker, timeout_msec, ) - .await?; + .await + .map_err(pass_helper_error)?; if let Some((items, seen_marker)) = resp { let resp = PollRangeResponse { diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs index 16479227..257ff893 100644 --- a/src/api/k2v/error.rs +++ b/src/api/k2v/error.rs @@ -2,24 +2,31 @@ use err_derive::Error; use hyper::header::HeaderValue; use hyper::{HeaderMap, 
StatusCode}; -use crate::common_error::CommonError; -pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; -use crate::generic_server::ApiError; -use crate::helpers::*; -use crate::signature::error::Error as SignatureError; +use garage_api_common::common_error::{commonErrorDerivative, CommonError}; +pub(crate) use garage_api_common::common_error::{helper_error_as_internal, pass_helper_error}; +pub use garage_api_common::common_error::{ + CommonErrorDerivative, OkOrBadRequest, OkOrInternalError, +}; +use garage_api_common::generic_server::ApiError; +use garage_api_common::helpers::*; +use garage_api_common::signature::error::Error as SignatureError; /// Errors of this crate #[derive(Debug, Error)] pub enum Error { #[error(display = "{}", _0)] /// Error from common error - Common(CommonError), + Common(#[error(source)] CommonError), // Category: cannot process /// Authorization Header Malformed #[error(display = "Authorization header malformed, unexpected scope: {}", _0)] AuthorizationHeaderMalformed(String), + /// The provided digest (checksum) value was invalid + #[error(display = "Invalid digest: {}", _0)] + InvalidDigest(String), + /// The object requested don't exists #[error(display = "Key not found")] NoSuchKey, @@ -28,6 +35,10 @@ pub enum Error { #[error(display = "Invalid base64: {}", _0)] InvalidBase64(#[error(source)] base64::DecodeError), + /// Invalid causality token + #[error(display = "Invalid causality token")] + InvalidCausalityToken, + /// The client asked for an invalid return format (invalid Accept header) #[error(display = "Not acceptable: {}", _0)] NotAcceptable(String), @@ -37,16 +48,7 @@ pub enum Error { InvalidUtf8Str(#[error(source)] std::str::Utf8Error), } -impl<T> From<T> for Error -where - CommonError: From<T>, -{ - fn from(err: T) -> Self { - Error::Common(CommonError::from(err)) - } -} - -impl CommonErrorDerivative for Error {} +commonErrorDerivative!(Error); impl From<SignatureError> for Error { fn 
from(err: SignatureError) -> Self { @@ -56,6 +58,7 @@ impl From<SignatureError> for Error { Self::AuthorizationHeaderMalformed(c) } SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), + SignatureError::InvalidDigest(d) => Self::InvalidDigest(d), } } } @@ -72,6 +75,8 @@ impl Error { Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", Error::InvalidBase64(_) => "InvalidBase64", Error::InvalidUtf8Str(_) => "InvalidUtf8String", + Error::InvalidCausalityToken => "CausalityToken", + Error::InvalidDigest(_) => "InvalidDigest", } } } @@ -85,7 +90,9 @@ impl ApiError for Error { Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE, Error::AuthorizationHeaderMalformed(_) | Error::InvalidBase64(_) - | Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST, + | Error::InvalidUtf8Str(_) + | Error::InvalidDigest(_) + | Error::InvalidCausalityToken => StatusCode::BAD_REQUEST, } } diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index e3397238..fbfaad98 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -5,10 +5,11 @@ use garage_table::util::*; use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; -use crate::helpers::*; -use crate::k2v::api_server::ResBody; -use crate::k2v::error::*; -use crate::k2v::range::read_range; +use garage_api_common::helpers::*; + +use crate::api_server::ResBody; +use crate::error::*; +use crate::range::read_range; pub async fn handle_read_index( ctx: ReqCtx, diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index af3af4e4..0fb945d2 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -6,9 +6,10 @@ use hyper::{Request, Response, StatusCode}; use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; -use crate::helpers::*; -use crate::k2v::api_server::{ReqBody, ResBody}; -use crate::k2v::error::*; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; pub const X_GARAGE_CAUSALITY_TOKEN: &str = 
"X-Garage-Causality-Token"; @@ -18,6 +19,10 @@ pub enum ReturnFormat { Either, } +pub(crate) fn parse_causality_token(s: &str) -> Result<CausalContext, Error> { + CausalContext::parse(s).ok_or(Error::InvalidCausalityToken) +} + impl ReturnFormat { pub fn from(req: &Request<ReqBody>) -> Result<Self, Error> { let accept = match req.headers().get(header::ACCEPT) { @@ -136,12 +141,10 @@ pub async fn handle_insert_item( .get(X_GARAGE_CAUSALITY_TOKEN) .map(|s| s.to_str()) .transpose()? - .map(CausalContext::parse_helper) + .map(parse_causality_token) .transpose()?; - let body = http_body_util::BodyExt::collect(req.into_body()) - .await? - .to_bytes(); + let body = req.into_body().collect().await?; let value = DvvsValue::Value(body.to_vec()); @@ -176,7 +179,7 @@ pub async fn handle_delete_item( .get(X_GARAGE_CAUSALITY_TOKEN) .map(|s| s.to_str()) .transpose()? - .map(CausalContext::parse_helper) + .map(parse_causality_token) .transpose()?; let value = DvvsValue::Deleted; diff --git a/src/api/k2v/mod.rs b/src/api/k2v/lib.rs index b6a8c5cf..334ae46b 100644 --- a/src/api/k2v/mod.rs +++ b/src/api/k2v/lib.rs @@ -1,3 +1,6 @@ +#[macro_use] +extern crate tracing; + pub mod api_server; mod error; mod router; diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs index bb9d3be5..eb4738db 100644 --- a/src/api/k2v/range.rs +++ b/src/api/k2v/range.rs @@ -7,8 +7,9 @@ use std::sync::Arc; use garage_table::replication::TableShardedReplication; use garage_table::*; -use crate::helpers::key_after_prefix; -use crate::k2v::error::*; +use garage_api_common::helpers::key_after_prefix; + +use crate::error::*; /// Read range in a Garage table. 
/// Returns (entries, more?, nextStart) diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs index 1cc58be5..a04b0f81 100644 --- a/src/api/k2v/router.rs +++ b/src/api/k2v/router.rs @@ -1,11 +1,11 @@ -use crate::k2v::error::*; +use crate::error::*; use std::borrow::Cow; use hyper::{Method, Request}; -use crate::helpers::Authorization; -use crate::router_macros::{generateQueryParameters, router_match}; +use garage_api_common::helpers::Authorization; +use garage_api_common::router_macros::{generateQueryParameters, router_match}; router_match! {@func diff --git a/src/api/lib.rs b/src/api/lib.rs deleted file mode 100644 index 370dfd7a..00000000 --- a/src/api/lib.rs +++ /dev/null @@ -1,17 +0,0 @@ -//! Crate for serving a S3 compatible API -#[macro_use] -extern crate tracing; - -pub mod common_error; - -mod encoding; -pub mod generic_server; -pub mod helpers; -mod router_macros; -/// This mode is public only to help testing. Don't expect stability here -pub mod signature; - -pub mod admin; -#[cfg(feature = "k2v")] -pub mod k2v; -pub mod s3; diff --git a/src/api/Cargo.toml b/src/api/s3/Cargo.toml index 85b78a5b..387e45db 100644 --- a/src/api/Cargo.toml +++ b/src/api/s3/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "garage_api" +name = "garage_api_s3" version = "1.0.1" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" @@ -20,30 +20,24 @@ garage_block.workspace = true garage_net.workspace = true garage_util.workspace = true garage_rpc.workspace = true +garage_api_common.workspace = true aes-gcm.workspace = true -argon2.workspace = true async-compression.workspace = true -async-trait.workspace = true base64.workspace = true bytes.workspace = true chrono.workspace = true crc32fast.workspace = true crc32c.workspace = true -crypto-common.workspace = true err-derive.workspace = true hex.workspace = true -hmac.workspace = true -idna.workspace = true tracing.workspace = true md-5.workspace = true -nom.workspace = true pin-project.workspace = true sha1.workspace = 
true sha2.workspace = true futures.workspace = true -futures-util.workspace = true tokio.workspace = true tokio-stream.workspace = true tokio-util.workspace = true @@ -54,21 +48,13 @@ httpdate.workspace = true http-range.workspace = true http-body-util.workspace = true hyper = { workspace = true, default-features = false, features = ["server", "http1"] } -hyper-util.workspace = true multer.workspace = true percent-encoding.workspace = true roxmltree.workspace = true url.workspace = true serde.workspace = true -serde_bytes.workspace = true serde_json.workspace = true quick-xml.workspace = true opentelemetry.workspace = true -opentelemetry-prometheus = { workspace = true, optional = true } -prometheus = { workspace = true, optional = true } - -[features] -k2v = [ "garage_util/k2v", "garage_model/k2v" ] -metrics = [ "opentelemetry-prometheus", "prometheus" ] diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 1737af33..e26c2b65 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -1,7 +1,5 @@ use std::sync::Arc; -use async_trait::async_trait; - use hyper::header; use hyper::{body::Incoming as IncomingBody, Request, Response}; use tokio::sync::watch; @@ -14,33 +12,33 @@ use garage_util::socket_address::UnixOrTCPSocketAddress; use garage_model::garage::Garage; use garage_model::key_table::Key; -use crate::generic_server::*; -use crate::s3::error::*; - -use crate::signature::verify_request; - -use crate::helpers::*; -use crate::s3::bucket::*; -use crate::s3::copy::*; -use crate::s3::cors::*; -use crate::s3::delete::*; -use crate::s3::get::*; -use crate::s3::lifecycle::*; -use crate::s3::list::*; -use crate::s3::multipart::*; -use crate::s3::post_object::handle_post_object; -use crate::s3::put::*; -use crate::s3::router::Endpoint; -use crate::s3::website::*; - -pub use crate::signature::streaming::ReqBody; +use garage_api_common::cors::*; +use garage_api_common::generic_server::*; +use garage_api_common::helpers::*; +use 
garage_api_common::signature::verify_request; + +use crate::bucket::*; +use crate::copy::*; +use crate::cors::*; +use crate::delete::*; +use crate::error::*; +use crate::get::*; +use crate::lifecycle::*; +use crate::list::*; +use crate::multipart::*; +use crate::post_object::handle_post_object; +use crate::put::*; +use crate::router::Endpoint; +use crate::website::*; + +pub use garage_api_common::signature::streaming::ReqBody; pub type ResBody = BoxBody<Error>; pub struct S3ApiServer { garage: Arc<Garage>, } -pub(crate) struct S3ApiEndpoint { +pub struct S3ApiEndpoint { bucket_name: Option<String>, endpoint: Endpoint, } @@ -70,7 +68,6 @@ impl S3ApiServer { } } -#[async_trait] impl ApiHandler for S3ApiServer { const API_NAME: &'static str = "s3"; const API_NAME_DISPLAY: &'static str = "S3"; @@ -124,7 +121,9 @@ impl ApiHandler for S3ApiServer { return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } - let (req, api_key, content_sha256) = verify_request(&garage, req, "s3").await?; + let verified_request = verify_request(&garage, req, "s3").await?; + let req = verified_request.request; + let api_key = verified_request.access_key; let bucket_name = match bucket_name { None => { @@ -137,20 +136,14 @@ impl ApiHandler for S3ApiServer { // Special code path for CreateBucket API endpoint if let Endpoint::CreateBucket {} = endpoint { - return handle_create_bucket( - &garage, - req, - content_sha256, - &api_key.key_id, - bucket_name, - ) - .await; + return handle_create_bucket(&garage, req, &api_key.key_id, bucket_name).await; } let bucket_id = garage .bucket_helper() .resolve_bucket(&bucket_name, &api_key) - .await?; + .await + .map_err(pass_helper_error)?; let bucket = garage .bucket_helper() .get_existing_bucket(bucket_id) @@ -181,7 +174,7 @@ impl ApiHandler for S3ApiServer { let resp = match endpoint { Endpoint::HeadObject { key, part_number, .. 
- } => handle_head(ctx, &req, &key, part_number).await, + } => handle_head(ctx, &req.map(|_| ()), &key, part_number).await, Endpoint::GetObject { key, part_number, @@ -201,20 +194,20 @@ impl ApiHandler for S3ApiServer { response_content_type, response_expires, }; - handle_get(ctx, &req, &key, part_number, overrides).await + handle_get(ctx, &req.map(|_| ()), &key, part_number, overrides).await } Endpoint::UploadPart { key, part_number, upload_id, - } => handle_put_part(ctx, req, &key, part_number, &upload_id, content_sha256).await, + } => handle_put_part(ctx, req, &key, part_number, &upload_id).await, Endpoint::CopyObject { key } => handle_copy(ctx, &req, &key).await, Endpoint::UploadPartCopy { key, part_number, upload_id, } => handle_upload_part_copy(ctx, &req, &key, part_number, &upload_id).await, - Endpoint::PutObject { key } => handle_put(ctx, req, &key, content_sha256).await, + Endpoint::PutObject { key } => handle_put(ctx, req, &key).await, Endpoint::AbortMultipartUpload { key, upload_id } => { handle_abort_multipart_upload(ctx, &key, &upload_id).await } @@ -223,7 +216,7 @@ impl ApiHandler for S3ApiServer { handle_create_multipart_upload(ctx, &req, &key).await } Endpoint::CompleteMultipartUpload { key, upload_id } => { - handle_complete_multipart_upload(ctx, req, &key, &upload_id, content_sha256).await + handle_complete_multipart_upload(ctx, req, &key, &upload_id).await } Endpoint::CreateBucket {} => unreachable!(), Endpoint::HeadBucket {} => { @@ -319,7 +312,6 @@ impl ApiHandler for S3ApiServer { } => { let query = ListPartsQuery { bucket_name: ctx.bucket_name.clone(), - bucket_id, key, upload_id, part_number_marker: part_number_marker.map(|p| p.min(10000)), @@ -327,17 +319,15 @@ impl ApiHandler for S3ApiServer { }; handle_list_parts(ctx, req, &query).await } - Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await, + Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req).await, Endpoint::GetBucketWebsite {} => 
handle_get_website(ctx).await, - Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req, content_sha256).await, + Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req).await, Endpoint::DeleteBucketWebsite {} => handle_delete_website(ctx).await, Endpoint::GetBucketCors {} => handle_get_cors(ctx).await, - Endpoint::PutBucketCors {} => handle_put_cors(ctx, req, content_sha256).await, + Endpoint::PutBucketCors {} => handle_put_cors(ctx, req).await, Endpoint::DeleteBucketCors {} => handle_delete_cors(ctx).await, Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(ctx).await, - Endpoint::PutBucketLifecycleConfiguration {} => { - handle_put_lifecycle(ctx, req, content_sha256).await - } + Endpoint::PutBucketLifecycleConfiguration {} => handle_put_lifecycle(ctx, req).await, Endpoint::DeleteBucketLifecycle {} => handle_delete_lifecycle(ctx).await, endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), }; diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 6a12aa9c..3a09e769 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use garage_model::bucket_alias_table::*; @@ -10,15 +9,14 @@ use garage_model::key_table::Key; use garage_model::permission::BucketKeyPerm; use garage_table::util::*; use garage_util::crdt::*; -use garage_util::data::*; use garage_util::time::*; -use crate::common_error::CommonError; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::error::*; -use crate::s3::xml as s3_xml; -use crate::signature::verify_signed_content; +use garage_api_common::common_error::CommonError; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::xml as s3_xml; pub fn handle_get_bucket_location(ctx: ReqCtx) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, .. 
} = ctx; @@ -121,15 +119,10 @@ pub async fn handle_list_buckets( pub async fn handle_create_bucket( garage: &Garage, req: Request<ReqBody>, - content_sha256: Option<Hash>, api_key_id: &String, bucket_name: String, ) -> Result<Response<ResBody>, Error> { - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let cmd = parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?; diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index e375a714..ff8019e6 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -20,15 +20,16 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::checksum::*; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; -use crate::s3::get::full_object_byte_stream; -use crate::s3::multipart; -use crate::s3::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult}; -use crate::s3::xml::{self as s3_xml, xmlns_tag}; +use garage_api_common::helpers::*; +use garage_api_common::signature::checksum::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::encryption::EncryptionParams; +use crate::error::*; +use crate::get::full_object_byte_stream; +use crate::multipart; +use crate::put::{extract_metadata_headers, save_stream, ChecksumMode, SaveStreamResult}; +use crate::xml::{self as s3_xml, xmlns_tag}; // -------- CopyObject --------- @@ -72,7 +73,7 @@ pub async fn handle_copy( let dest_object_meta = ObjectVersionMetaInner { headers: match req.headers().get("x-amz-metadata-directive") { Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => { - get_headers(req.headers())? + extract_metadata_headers(req.headers())? 
} _ => source_object_meta_inner.into_owned().headers, }, @@ -655,7 +656,8 @@ async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, let source_bucket_id = garage .bucket_helper() .resolve_bucket(&source_bucket.to_string(), api_key) - .await?; + .await + .map_err(pass_helper_error)?; if !api_key.allow_read(&source_bucket_id) { return Err(Error::forbidden(format!( @@ -861,7 +863,7 @@ pub struct CopyPartResult { #[cfg(test)] mod tests { use super::*; - use crate::s3::xml::to_xml_with_header; + use crate::xml::to_xml_with_header; #[test] fn copy_object_result() -> Result<(), Error> { diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index 173b7ffe..fcfdb934 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -1,29 +1,16 @@ use quick_xml::de::from_reader; -use std::sync::Arc; -use http::header::{ - ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, - ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, -}; -use hyper::{ - body::Body, body::Incoming as IncomingBody, header::HeaderName, Method, Request, Response, - StatusCode, -}; - -use http_body_util::BodyExt; +use hyper::{header::HeaderName, Method, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use crate::common_error::CommonError; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::error::*; -use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; -use crate::signature::verify_signed_content; +use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule}; + +use garage_api_common::helpers::*; -use garage_model::bucket_table::{Bucket, BucketParams, CorsRule as GarageCorsRule}; -use garage_model::garage::Garage; -use garage_util::data::*; +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; pub async fn handle_get_cors(ctx: ReqCtx) -> 
Result<Response<ResBody>, Error> { let ReqCtx { bucket_params, .. } = ctx; @@ -68,7 +55,6 @@ pub async fn handle_delete_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> pub async fn handle_put_cors( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -77,11 +63,7 @@ pub async fn handle_put_cors( .. } = ctx; - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let conf: CorsConfiguration = from_reader(&body as &[u8])?; conf.validate()?; @@ -99,154 +81,6 @@ pub async fn handle_put_cors( .body(empty_body())?) } -pub async fn handle_options_api( - garage: Arc<Garage>, - req: &Request<IncomingBody>, - bucket_name: Option<String>, -) -> Result<Response<EmptyBody>, CommonError> { - // FIXME: CORS rules of buckets with local aliases are - // not taken into account. - - // If the bucket name is a global bucket name, - // we try to apply the CORS rules of that bucket. - // If a user has a local bucket name that has - // the same name, its CORS rules won't be applied - // and will be shadowed by the rules of the globally - // existing bucket (but this is inevitable because - // OPTIONS calls are not auhtenticated). - if let Some(bn) = bucket_name { - let helper = garage.bucket_helper(); - let bucket_id = helper.resolve_global_bucket_name(&bn).await?; - if let Some(id) = bucket_id { - let bucket = garage.bucket_helper().get_existing_bucket(id).await?; - let bucket_params = bucket.state.into_option().unwrap(); - handle_options_for_bucket(req, &bucket_params) - } else { - // If there is a bucket name in the request, but that name - // does not correspond to a global alias for a bucket, - // then it's either a non-existing bucket or a local bucket. 
- // We have no way of knowing, because the request is not - // authenticated and thus we can't resolve local aliases. - // We take the permissive approach of allowing everything, - // because we don't want to prevent web apps that use - // local bucket names from making API calls. - Ok(Response::builder() - .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") - .header(ACCESS_CONTROL_ALLOW_METHODS, "*") - .status(StatusCode::OK) - .body(EmptyBody::new())?) - } - } else { - // If there is no bucket name in the request, - // we are doing a ListBuckets call, which we want to allow - // for all origins. - Ok(Response::builder() - .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") - .header(ACCESS_CONTROL_ALLOW_METHODS, "GET") - .status(StatusCode::OK) - .body(EmptyBody::new())?) - } -} - -pub fn handle_options_for_bucket( - req: &Request<IncomingBody>, - bucket_params: &BucketParams, -) -> Result<Response<EmptyBody>, CommonError> { - let origin = req - .headers() - .get("Origin") - .ok_or_bad_request("Missing Origin header")? - .to_str()?; - let request_method = req - .headers() - .get(ACCESS_CONTROL_REQUEST_METHOD) - .ok_or_bad_request("Missing Access-Control-Request-Method header")? 
- .to_str()?; - let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) { - Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(), - None => vec![], - }; - - if let Some(cors_config) = bucket_params.cors_config.get() { - let matching_rule = cors_config - .iter() - .find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter())); - if let Some(rule) = matching_rule { - let mut resp = Response::builder() - .status(StatusCode::OK) - .body(EmptyBody::new())?; - add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?; - return Ok(resp); - } - } - - Err(CommonError::Forbidden( - "This CORS request is not allowed.".into(), - )) -} - -pub fn find_matching_cors_rule<'a>( - bucket_params: &'a BucketParams, - req: &Request<impl Body>, -) -> Result<Option<&'a GarageCorsRule>, Error> { - if let Some(cors_config) = bucket_params.cors_config.get() { - if let Some(origin) = req.headers().get("Origin") { - let origin = origin.to_str()?; - let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) { - Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(), - None => vec![], - }; - return Ok(cors_config.iter().find(|rule| { - cors_rule_matches(rule, origin, req.method().as_ref(), request_headers.iter()) - })); - } - } - Ok(None) -} - -fn cors_rule_matches<'a, HI, S>( - rule: &GarageCorsRule, - origin: &'a str, - method: &'a str, - mut request_headers: HI, -) -> bool -where - HI: Iterator<Item = S>, - S: AsRef<str>, -{ - rule.allow_origins.iter().any(|x| x == "*" || x == origin) - && rule.allow_methods.iter().any(|x| x == "*" || x == method) - && request_headers.all(|h| { - rule.allow_headers - .iter() - .any(|x| x == "*" || x == h.as_ref()) - }) -} - -pub fn add_cors_headers( - resp: &mut Response<impl Body>, - rule: &GarageCorsRule, -) -> Result<(), http::header::InvalidHeaderValue> { - let h = resp.headers_mut(); - h.insert( - ACCESS_CONTROL_ALLOW_ORIGIN, - 
rule.allow_origins.join(", ").parse()?, - ); - h.insert( - ACCESS_CONTROL_ALLOW_METHODS, - rule.allow_methods.join(", ").parse()?, - ); - h.insert( - ACCESS_CONTROL_ALLOW_HEADERS, - rule.allow_headers.join(", ").parse()?, - ); - h.insert( - ACCESS_CONTROL_EXPOSE_HEADERS, - rule.expose_headers.join(", ").parse()?, - ); - Ok(()) -} - // ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ---- #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index 57f6f948..d785b9d8 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -1,16 +1,15 @@ -use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use garage_util::data::*; use garage_model::s3::object_table::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::error::*; -use crate::s3::put::next_timestamp; -use crate::s3::xml as s3_xml; -use crate::signature::verify_signed_content; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::put::next_timestamp; +use crate::xml as s3_xml; async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), Error> { let ReqCtx { @@ -67,13 +66,8 @@ pub async fn handle_delete(ctx: ReqCtx, key: &str) -> Result<Response<ResBody>, pub async fn handle_delete_objects( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?; diff --git a/src/api/s3/encryption.rs b/src/api/s3/encryption.rs index 2e6ed65c..fa7285ca 100644 --- 
a/src/api/s3/encryption.rs +++ b/src/api/s3/encryption.rs @@ -28,9 +28,10 @@ use garage_util::migrate::Migrate; use garage_model::garage::Garage; use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionMetaInner}; -use crate::common_error::*; -use crate::s3::checksum::Md5Checksum; -use crate::s3::error::Error; +use garage_api_common::common_error::*; +use garage_api_common::signature::checksum::Md5Checksum; + +use crate::error::Error; const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-server-side-encryption-customer-algorithm"); diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index 2855e0b3..6d4b7a11 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -4,19 +4,30 @@ use err_derive::Error; use hyper::header::HeaderValue; use hyper::{HeaderMap, StatusCode}; -use crate::common_error::CommonError; -pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; -use crate::generic_server::ApiError; -use crate::helpers::*; -use crate::s3::xml as s3_xml; -use crate::signature::error::Error as SignatureError; +use garage_model::helper::error::Error as HelperError; + +pub(crate) use garage_api_common::common_error::pass_helper_error; + +use garage_api_common::common_error::{ + commonErrorDerivative, helper_error_as_internal, CommonError, +}; + +pub use garage_api_common::common_error::{ + CommonErrorDerivative, OkOrBadRequest, OkOrInternalError, +}; + +use garage_api_common::generic_server::ApiError; +use garage_api_common::helpers::*; +use garage_api_common::signature::error::Error as SignatureError; + +use crate::xml as s3_xml; /// Errors of this crate #[derive(Debug, Error)] pub enum Error { #[error(display = "{}", _0)] /// Error from common error - Common(CommonError), + Common(#[error(source)] CommonError), // Category: cannot process /// Authorization Header Malformed @@ -69,7 +80,7 @@ pub enum Error { #[error(display = "Invalid encryption algorithm: {:?}, 
should be AES256", _0)] InvalidEncryptionAlgorithm(String), - /// The client sent invalid XML data + /// The provided digest (checksum) value was invalid #[error(display = "Invalid digest: {}", _0)] InvalidDigest(String), @@ -78,17 +89,16 @@ pub enum Error { NotImplemented(String), } -impl<T> From<T> for Error -where - CommonError: From<T>, -{ - fn from(err: T) -> Self { - Error::Common(CommonError::from(err)) +commonErrorDerivative!(Error); + +// Helper errors are always passed as internal errors by default. +// To pass the specific error code back to the client, use `pass_helper_error`. +impl From<HelperError> for Error { + fn from(err: HelperError) -> Error { + Error::Common(helper_error_as_internal(err)) } } -impl CommonErrorDerivative for Error {} - impl From<roxmltree::Error> for Error { fn from(err: roxmltree::Error) -> Self { Self::InvalidXml(format!("{}", err)) @@ -109,6 +119,7 @@ impl From<SignatureError> for Error { Self::AuthorizationHeaderMalformed(c) } SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), + SignatureError::InvalidDigest(d) => Self::InvalidDigest(d), } } } diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index f5d3cf11..bcb72cc3 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -12,7 +12,7 @@ use http::header::{ CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, RANGE, }; -use hyper::{body::Body, Request, Response, StatusCode}; +use hyper::{Request, Response, StatusCode}; use tokio::sync::mpsc; use garage_net::stream::ByteStream; @@ -25,11 +25,12 @@ use garage_model::garage::Garage; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::*; -use crate::s3::api_server::ResBody; -use crate::s3::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE}; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; +use garage_api_common::helpers::*; +use 
garage_api_common::signature::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE}; + +use crate::api_server::ResBody; +use crate::encryption::EncryptionParams; +use crate::error::*; const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; @@ -68,14 +69,11 @@ fn object_headers( // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html let mut headers_by_name = BTreeMap::new(); for (name, value) in meta_inner.headers.iter() { - match headers_by_name.get_mut(name) { - None => { - headers_by_name.insert(name, vec![value.as_str()]); - } - Some(headers) => { - headers.push(value.as_str()); - } - } + let name_lower = name.to_ascii_lowercase(); + headers_by_name + .entry(name_lower) + .or_insert(vec![]) + .push(value.as_str()); } for (name, values) in headers_by_name { @@ -120,7 +118,7 @@ fn getobject_override_headers( fn try_answer_cached( version: &ObjectVersion, version_meta: &ObjectVersionMeta, - req: &Request<impl Body>, + req: &Request<()>, ) -> Option<Response<ResBody>> { // <trinity> It is possible, and is even usually the case, [that both If-None-Match and // If-Modified-Since] are present in a request. 
In this situation If-None-Match takes @@ -159,7 +157,7 @@ fn try_answer_cached( /// Handle HEAD request pub async fn handle_head( ctx: ReqCtx, - req: &Request<impl Body>, + req: &Request<()>, key: &str, part_number: Option<u64>, ) -> Result<Response<ResBody>, Error> { @@ -169,7 +167,7 @@ pub async fn handle_head( /// Handle HEAD request for website pub async fn handle_head_without_ctx( garage: Arc<Garage>, - req: &Request<impl Body>, + req: &Request<()>, bucket_id: Uuid, key: &str, part_number: Option<u64>, @@ -280,7 +278,7 @@ pub async fn handle_head_without_ctx( /// Handle GET request pub async fn handle_get( ctx: ReqCtx, - req: &Request<impl Body>, + req: &Request<()>, key: &str, part_number: Option<u64>, overrides: GetObjectOverrides, @@ -291,7 +289,7 @@ pub async fn handle_get( /// Handle GET request pub async fn handle_get_without_ctx( garage: Arc<Garage>, - req: &Request<impl Body>, + req: &Request<()>, bucket_id: Uuid, key: &str, part_number: Option<u64>, @@ -342,7 +340,12 @@ pub async fn handle_get_without_ctx( enc, &headers, pn, - checksum_mode, + ChecksumMode { + // TODO: for multipart uploads, checksums of each part should be stored + // so that we can return the corresponding checksum here + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + enabled: false, + }, ) .await } @@ -356,7 +359,12 @@ pub async fn handle_get_without_ctx( &headers, range.start, range.start + range.length, - checksum_mode, + ChecksumMode { + // TODO: for range queries that align with part boundaries, + // we should return the saved checksum of the part + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + enabled: false, + }, ) .await } @@ -579,7 +587,7 @@ async fn handle_get_part( } fn parse_range_header( - req: &Request<impl Body>, + req: &Request<()>, total_size: u64, ) -> Result<Option<http_range::HttpRange>, Error> { let range = match req.headers().get(RANGE) { @@ -620,7 +628,7 @@ struct 
ChecksumMode { enabled: bool, } -fn checksum_mode(req: &Request<impl Body>) -> ChecksumMode { +fn checksum_mode(req: &Request<()>) -> ChecksumMode { ChecksumMode { enabled: req .headers() diff --git a/src/api/s3/mod.rs b/src/api/s3/lib.rs index b9bb1a6f..83f684f8 100644 --- a/src/api/s3/mod.rs +++ b/src/api/s3/lib.rs @@ -1,3 +1,6 @@ +#[macro_use] +extern crate tracing; + pub mod api_server; pub mod error; @@ -11,9 +14,8 @@ mod list; mod multipart; mod post_object; mod put; -mod website; +pub mod website; -mod checksum; mod encryption; mod router; pub mod xml; diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index 7eb1c2cb..c140494e 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -1,21 +1,19 @@ use quick_xml::de::from_reader; -use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::error::*; -use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; -use crate::signature::verify_signed_content; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use garage_model::bucket_table::{ parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration, LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule, }; -use garage_util::data::*; pub async fn handle_get_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> { let ReqCtx { bucket_params, .. } = ctx; @@ -55,7 +53,6 @@ pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, E pub async fn handle_put_lifecycle( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -64,11 +61,7 @@ pub async fn handle_put_lifecycle( .. 
} = ctx; - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let conf: LifecycleConfiguration = from_reader(&body as &[u8])?; let config = conf diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 68d6cbe6..94c2c895 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -13,13 +13,14 @@ use garage_model::s3::object_table::*; use garage_table::EnumerationOrder; -use crate::encoding::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; -use crate::s3::multipart as s3_multipart; -use crate::s3::xml as s3_xml; +use garage_api_common::encoding::*; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::encryption::EncryptionParams; +use crate::error::*; +use crate::multipart as s3_multipart; +use crate::xml as s3_xml; const DUMMY_NAME: &str = "Dummy Key"; const DUMMY_KEY: &str = "GKDummyKey"; @@ -53,7 +54,6 @@ pub struct ListMultipartUploadsQuery { #[derive(Debug)] pub struct ListPartsQuery { pub bucket_name: String, - pub bucket_id: Uuid, pub key: String, pub upload_id: String, pub part_number_marker: Option<u64>, @@ -1244,10 +1244,8 @@ mod tests { #[test] fn test_fetch_part_info() -> Result<(), Error> { - let uuid = Uuid::from([0x08; 32]); let mut query = ListPartsQuery { bucket_name: "a".to_string(), - bucket_id: uuid, key: "a".to_string(), upload_id: "xx".to_string(), part_number_marker: None, diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 3db3e8aa..d6eb26cb 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -1,13 +1,20 @@ use std::collections::HashMap; -use std::convert::TryInto; +use std::convert::{TryFrom, TryInto}; +use std::hash::Hasher; use std::sync::Arc; use base64::prelude::*; +use crc32c::Crc32cHasher as 
Crc32c; +use crc32fast::Hasher as Crc32; use futures::prelude::*; use hyper::{Request, Response}; +use md5::{Digest, Md5}; +use sha1::Sha1; +use sha2::Sha256; use garage_table::*; use garage_util::data::*; +use garage_util::error::OkOrMessage; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; @@ -15,14 +22,14 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::checksum::*; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; -use crate::s3::put::*; -use crate::s3::xml as s3_xml; -use crate::signature::verify_signed_content; +use garage_api_common::helpers::*; +use garage_api_common::signature::checksum::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::encryption::EncryptionParams; +use crate::error::*; +use crate::put::*; +use crate::xml as s3_xml; // ---- @@ -42,7 +49,7 @@ pub async fn handle_create_multipart_upload( let upload_id = gen_uuid(); let timestamp = next_timestamp(existing_object.as_ref()); - let headers = get_headers(req.headers())?; + let headers = extract_metadata_headers(req.headers())?; let meta = ObjectVersionMetaInner { headers, checksum: None, @@ -93,7 +100,6 @@ pub async fn handle_put_part( key: &str, part_number: u64, upload_id: &str, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, .. 
} = &ctx; @@ -104,17 +110,30 @@ pub async fn handle_put_part( Some(x) => Some(x.to_str()?.to_string()), None => None, }, - sha256: content_sha256, + sha256: None, extra: request_checksum_value(req.headers())?, }; - // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let (req_head, req_body) = req.into_parts(); - let stream = body_stream(req_body); + let (req_head, mut req_body) = req.into_parts(); + + // Before we stream the body, configure the needed checksums. + req_body.add_expected_checksums(expected_checksums.clone()); + // TODO: avoid parsing encryption headers twice... + if !EncryptionParams::new_from_headers(&garage, &req_head.headers)?.is_encrypted() { + // For non-encrypted objects, we need to compute the md5sum in all cases + // (even if content-md5 is not set), because it is used as an etag of the + // part, which is in turn used in the etag computation of the whole object + req_body.add_md5(); + } + + let (stream, stream_checksums) = req_body.streaming_with_checksums(); + let stream = stream.map_err(Error::from); + let mut chunker = StreamChunker::new(stream, garage.config.block_size); + // Read first chuck, and at the same time try to get object to see if it exists let ((_, object_version, mut mpu), first_block) = futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?; @@ -171,21 +190,21 @@ pub async fn handle_put_part( garage.version_table.insert(&version).await?; // Copy data to version - let checksummer = - Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm); - let (total_size, checksums, _) = read_and_put_blocks( + let (total_size, _, _) = read_and_put_blocks( &ctx, &version, encryption, part_number, first_block, - &mut chunker, - checksummer, + chunker, + Checksummer::new(), ) .await?; - // Verify that checksums map - checksums.verify(&expected_checksums)?; + // Verify that checksums match + let checksums = stream_checksums + .await + 
.ok_or_internal_error("checksum calculation")??; // Store part etag in version let etag = encryption.etag_from_md5(&checksums.md5); @@ -247,7 +266,6 @@ pub async fn handle_complete_multipart_upload( req: Request<ReqBody>, key: &str, upload_id: &str, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -259,11 +277,7 @@ pub async fn handle_complete_multipart_upload( let expected_checksum = request_checksum_value(&req_head.headers)?; - let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req_body.collect().await?; let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let body_list_of_parts = parse_complete_multipart_upload_body(&body_xml) @@ -429,7 +443,16 @@ pub async fn handle_complete_multipart_upload( // Send response saying ok we're done let result = s3_xml::CompleteMultipartUploadResult { xmlns: (), - location: None, + // FIXME: the location returned is not always correct: + // - we always return https, but maybe some people do http + // - if root_domain is not specified, a full URL is not returned + location: garage + .config + .s3_api + .root_domain + .as_ref() + .map(|rd| s3_xml::Value(format!("https://{}.{}/{}", bucket_name, rd, key))) + .or(Some(s3_xml::Value(format!("/{}/{}", bucket_name, key)))), bucket: s3_xml::Value(bucket_name.to_string()), key: s3_xml::Value(key), etag: s3_xml::Value(format!("\"{}\"", etag)), @@ -592,3 +615,99 @@ fn parse_complete_multipart_upload_body( Some(parts) } + +// ====== checksummer ==== + +#[derive(Default)] +pub(crate) struct MultipartChecksummer { + pub md5: Md5, + pub extra: Option<MultipartExtraChecksummer>, +} + +pub(crate) enum MultipartExtraChecksummer { + Crc32(Crc32), + Crc32c(Crc32c), + Sha1(Sha1), + Sha256(Sha256), +} + +impl MultipartChecksummer { + pub(crate) fn init(algo: Option<ChecksumAlgorithm>) -> Self { + 
Self { + md5: Md5::new(), + extra: match algo { + None => None, + Some(ChecksumAlgorithm::Crc32) => { + Some(MultipartExtraChecksummer::Crc32(Crc32::new())) + } + Some(ChecksumAlgorithm::Crc32c) => { + Some(MultipartExtraChecksummer::Crc32c(Crc32c::default())) + } + Some(ChecksumAlgorithm::Sha1) => Some(MultipartExtraChecksummer::Sha1(Sha1::new())), + Some(ChecksumAlgorithm::Sha256) => { + Some(MultipartExtraChecksummer::Sha256(Sha256::new())) + } + }, + } + } + + pub(crate) fn update( + &mut self, + etag: &str, + checksum: Option<ChecksumValue>, + ) -> Result<(), Error> { + self.md5 + .update(&hex::decode(&etag).ok_or_message("invalid etag hex")?); + match (&mut self.extra, checksum) { + (None, _) => (), + ( + Some(MultipartExtraChecksummer::Crc32(ref mut crc32)), + Some(ChecksumValue::Crc32(x)), + ) => { + crc32.update(&x); + } + ( + Some(MultipartExtraChecksummer::Crc32c(ref mut crc32c)), + Some(ChecksumValue::Crc32c(x)), + ) => { + crc32c.write(&x); + } + (Some(MultipartExtraChecksummer::Sha1(ref mut sha1)), Some(ChecksumValue::Sha1(x))) => { + sha1.update(&x); + } + ( + Some(MultipartExtraChecksummer::Sha256(ref mut sha256)), + Some(ChecksumValue::Sha256(x)), + ) => { + sha256.update(&x); + } + (Some(_), b) => { + return Err(Error::internal_error(format!( + "part checksum was not computed correctly, got: {:?}", + b + ))) + } + } + Ok(()) + } + + pub(crate) fn finalize(self) -> (Md5Checksum, Option<ChecksumValue>) { + let md5 = self.md5.finalize()[..].try_into().unwrap(); + let extra = match self.extra { + None => None, + Some(MultipartExtraChecksummer::Crc32(crc32)) => { + Some(ChecksumValue::Crc32(u32::to_be_bytes(crc32.finalize()))) + } + Some(MultipartExtraChecksummer::Crc32c(crc32c)) => Some(ChecksumValue::Crc32c( + u32::to_be_bytes(u32::try_from(crc32c.finish()).unwrap()), + )), + Some(MultipartExtraChecksummer::Sha1(sha1)) => { + Some(ChecksumValue::Sha1(sha1.finalize()[..].try_into().unwrap())) + } + Some(MultipartExtraChecksummer::Sha256(sha256)) => 
Some(ChecksumValue::Sha256( + sha256.finalize()[..].try_into().unwrap(), + )), + }; + (md5, extra) + } +} diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index 725f3847..b9bccae6 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -16,15 +16,16 @@ use serde::Deserialize; use garage_model::garage::Garage; use garage_model::s3::object_table::*; -use crate::helpers::*; -use crate::s3::api_server::ResBody; -use crate::s3::checksum::*; -use crate::s3::cors::*; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; -use crate::s3::put::{get_headers, save_stream, ChecksumMode}; -use crate::s3::xml as s3_xml; -use crate::signature::payload::{verify_v4, Authorization}; +use garage_api_common::cors::*; +use garage_api_common::helpers::*; +use garage_api_common::signature::checksum::*; +use garage_api_common::signature::payload::{verify_v4, Authorization}; + +use crate::api_server::ResBody; +use crate::encryption::EncryptionParams; +use crate::error::*; +use crate::put::{extract_metadata_headers, save_stream, ChecksumMode}; +use crate::xml as s3_xml; pub async fn handle_post_object( garage: Arc<Garage>, @@ -107,7 +108,8 @@ pub async fn handle_post_object( let bucket_id = garage .bucket_helper() .resolve_bucket(&bucket_name, &api_key) - .await?; + .await + .map_err(pass_helper_error)?; if !api_key.allow_write(&bucket_id) { return Err(Error::forbidden("Operation is not allowed for this key.")); @@ -214,8 +216,9 @@ pub async fn handle_post_object( // if we ever start supporting ACLs, we likely want to map "acl" to x-amz-acl" somewhere // around here to make sure the rest of the machinery takes our acl into account. - let headers = get_headers(&params)?; + let headers = extract_metadata_headers(&params)?; + let checksum_algorithm = request_checksum_algorithm(&params)?; let expected_checksums = ExpectedChecksums { md5: params .get("content-md5") @@ -223,7 +226,9 @@ pub async fn handle_post_object( .transpose()? 
.map(str::to_string), sha256: None, - extra: request_checksum_algorithm_value(&params)?, + extra: checksum_algorithm + .map(|algo| extract_checksum_value(&params, algo)) + .transpose()?, }; let meta = ObjectVersionMetaInner { diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 1e3b1b44..830a7998 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -30,11 +30,14 @@ use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::checksum::*; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; +use garage_api_common::helpers::*; +use garage_api_common::signature::body::StreamingChecksumReceiver; +use garage_api_common::signature::checksum::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::encryption::EncryptionParams; +use crate::error::*; +use crate::website::X_AMZ_WEBSITE_REDIRECT_LOCATION; const PUT_BLOCKS_MAX_PARALLEL: usize = 3; @@ -47,6 +50,10 @@ pub(crate) struct SaveStreamResult { pub(crate) enum ChecksumMode<'a> { Verify(&'a ExpectedChecksums), + VerifyFrom { + checksummer: StreamingChecksumReceiver, + trailer_algo: Option<ChecksumAlgorithm>, + }, Calculate(Option<ChecksumAlgorithm>), } @@ -54,10 +61,9 @@ pub async fn handle_put( ctx: ReqCtx, req: Request<ReqBody>, key: &String, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { // Retrieve interesting headers from request - let headers = get_headers(req.headers())?; + let headers = extract_metadata_headers(req.headers())?; debug!("Object headers: {:?}", headers); let expected_checksums = ExpectedChecksums { @@ -65,9 +71,10 @@ pub async fn handle_put( Some(x) => Some(x.to_str()?.to_string()), None => None, }, - sha256: content_sha256, + sha256: None, extra: request_checksum_value(req.headers())?, }; + let trailer_checksum_algorithm = request_trailer_checksum_algorithm(req.headers())?; let meta = 
ObjectVersionMetaInner { headers, @@ -77,7 +84,19 @@ pub async fn handle_put( // Determine whether object should be encrypted, and if so the key let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; - let stream = body_stream(req.into_body()); + // The request body is a special ReqBody object (see garage_api_common::signature::body) + // which supports calculating checksums while streaming the data. + // Before we start streaming, we configure it to calculate all the checksums we need. + let mut req_body = req.into_body(); + req_body.add_expected_checksums(expected_checksums.clone()); + if !encryption.is_encrypted() { + // For non-encrypted objects, we need to compute the md5sum in all cases + // (even if content-md5 is not set), because it is used as the object etag + req_body.add_md5(); + } + + let (stream, checksummer) = req_body.streaming_with_checksums(); + let stream = stream.map_err(Error::from); let res = save_stream( &ctx, @@ -85,7 +104,10 @@ pub async fn handle_put( encryption, stream, key, - ChecksumMode::Verify(&expected_checksums), + ChecksumMode::VerifyFrom { + checksummer, + trailer_algo: trailer_checksum_algorithm, + }, ) .await?; @@ -121,10 +143,15 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let version_uuid = gen_uuid(); let version_timestamp = next_timestamp(existing_object.as_ref()); - let mut checksummer = match checksum_mode { + let mut checksummer = match &checksum_mode { ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()), ChecksumMode::Calculate(algo) => { - Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(algo) + Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(*algo) + } + ChecksumMode::VerifyFrom { .. 
} => { + // Checksums are calculated by the garage_api_common::signature module + // so here we can just have an empty checksummer that does nothing + Checksummer::new() } }; @@ -132,7 +159,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // as "inline data". We can then return immediately. if first_block.len() < INLINE_THRESHOLD { checksummer.update(&first_block); - let checksums = checksummer.finalize(); + let mut checksums = checksummer.finalize(); match checksum_mode { ChecksumMode::Verify(expected) => { @@ -141,6 +168,18 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ChecksumMode::Calculate(algo) => { meta.checksum = checksums.extract(algo); } + ChecksumMode::VerifyFrom { + checksummer, + trailer_algo, + } => { + drop(chunker); + checksums = checksummer + .await + .ok_or_internal_error("checksum calculation")??; + if let Some(algo) = trailer_algo { + meta.checksum = checksums.extract(Some(algo)); + } + } }; let size = first_block.len() as u64; @@ -212,13 +251,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data - let (total_size, checksums, first_block_hash) = read_and_put_blocks( + let (total_size, mut checksums, first_block_hash) = read_and_put_blocks( ctx, &version, encryption, 1, first_block, - &mut chunker, + chunker, checksummer, ) .await?; @@ -231,6 +270,17 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ChecksumMode::Calculate(algo) => { meta.checksum = checksums.extract(algo); } + ChecksumMode::VerifyFrom { + checksummer, + trailer_algo, + } => { + checksums = checksummer + .await + .ok_or_internal_error("checksum calculation")??; + if let Some(algo) = trailer_algo { + meta.checksum = checksums.extract(Some(algo)); + } + } }; // Verify quotas are respsected @@ -331,7 +381,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, 
Error>> + encryption: EncryptionParams, part_number: u64, first_block: Bytes, - chunker: &mut StreamChunker<S>, + mut chunker: StreamChunker<S>, checksummer: Checksummer, ) -> Result<(u64, Checksums, Hash), Error> { let tracer = opentelemetry::global::tracer("garage"); @@ -600,7 +650,9 @@ impl Drop for InterruptedCleanup { // ============ helpers ============ -pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<HeaderList, Error> { +pub(crate) fn extract_metadata_headers( + headers: &HeaderMap<HeaderValue>, +) -> Result<HeaderList, Error> { let mut ret = Vec::new(); // Preserve standard headers @@ -622,10 +674,22 @@ pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<HeaderList for (name, value) in headers.iter() { if name.as_str().starts_with("x-amz-meta-") { ret.push(( - name.to_string(), + name.as_str().to_ascii_lowercase(), std::str::from_utf8(value.as_bytes())?.to_string(), )); } + if name == X_AMZ_WEBSITE_REDIRECT_LOCATION { + let value = std::str::from_utf8(value.as_bytes())?.to_string(); + if !(value.starts_with("/") + || value.starts_with("http://") + || value.starts_with("https://")) + { + return Err(Error::bad_request(format!( + "Invalid {X_AMZ_WEBSITE_REDIRECT_LOCATION} header", + ))); + } + ret.push((X_AMZ_WEBSITE_REDIRECT_LOCATION.to_string(), value)); + } } Ok(ret) diff --git a/src/api/s3/router.rs b/src/api/s3/router.rs index e7ac1d77..e3f58490 100644 --- a/src/api/s3/router.rs +++ b/src/api/s3/router.rs @@ -3,9 +3,10 @@ use std::borrow::Cow; use hyper::header::HeaderValue; use hyper::{HeaderMap, Method, Request}; -use crate::helpers::Authorization; -use crate::router_macros::{generateQueryParameters, router_match}; -use crate::s3::error::*; +use garage_api_common::helpers::Authorization; +use garage_api_common::router_macros::{generateQueryParameters, router_match}; + +use crate::error::*; router_match! 
{@func @@ -351,6 +352,18 @@ impl Endpoint { _ => return Err(Error::bad_request("Unknown method")), }; + if let Some(x_id) = query.x_id.take() { + if x_id != res.name() { + // I think AWS ignores the x-id parameter. + // Let's make this at least be a warnin to help debugging. + warn!( + "x-id ({}) does not match parsed endpoint ({})", + x_id, + res.name() + ); + } + } + if let Some(message) = query.nonempty_message() { debug!("Unused query parameter: {}", message) } @@ -695,7 +708,8 @@ generateQueryParameters! { "uploadId" => upload_id, "upload-id-marker" => upload_id_marker, "versionId" => version_id, - "version-id-marker" => version_id_marker + "version-id-marker" => version_id_marker, + "x-id" => x_id ] } diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index fa36bc32..03cc01d8 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -1,17 +1,18 @@ use quick_xml::de::from_reader; -use http_body_util::BodyExt; -use hyper::{Request, Response, StatusCode}; +use hyper::{header::HeaderName, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::error::*; -use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; -use crate::signature::verify_signed_content; - use garage_model::bucket_table::*; -use garage_util::data::*; + +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; + +pub const X_AMZ_WEBSITE_REDIRECT_LOCATION: HeaderName = + HeaderName::from_static("x-amz-website-redirect-location"); pub async fn handle_get_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> { let ReqCtx { bucket_params, .. 
} = ctx; @@ -60,7 +61,6 @@ pub async fn handle_delete_website(ctx: ReqCtx) -> Result<Response<ResBody>, Err pub async fn handle_put_website( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -69,11 +69,7 @@ pub async fn handle_put_website( .. } = ctx; - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let conf: WebsiteConfiguration = from_reader(&body as &[u8])?; conf.validate()?; diff --git a/src/api/s3/xml.rs b/src/api/s3/xml.rs index 1e569ade..e8af3ec0 100644 --- a/src/api/s3/xml.rs +++ b/src/api/s3/xml.rs @@ -1,7 +1,7 @@ use quick_xml::se::to_string; use serde::{Deserialize, Serialize, Serializer}; -use crate::s3::error::Error as ApiError; +use crate::error::Error as ApiError; pub fn to_xml_with_header<T: Serialize>(x: &T) -> Result<String, ApiError> { let mut xml = r#"<?xml version="1.0" encoding="UTF-8"?>"#.to_string(); diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs deleted file mode 100644 index 6514da43..00000000 --- a/src/api/signature/mod.rs +++ /dev/null @@ -1,78 +0,0 @@ -use chrono::{DateTime, Utc}; -use hmac::{Hmac, Mac}; -use sha2::Sha256; - -use hyper::{body::Incoming as IncomingBody, Request}; - -use garage_model::garage::Garage; -use garage_model::key_table::Key; -use garage_util::data::{sha256sum, Hash}; - -use error::*; - -pub mod error; -pub mod payload; -pub mod streaming; - -pub const SHORT_DATE: &str = "%Y%m%d"; -pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; - -type HmacSha256 = Hmac<Sha256>; - -pub async fn verify_request( - garage: &Garage, - mut req: Request<IncomingBody>, - service: &'static str, -) -> Result<(Request<streaming::ReqBody>, Key, Option<Hash>), Error> { - let (api_key, mut content_sha256) = - payload::check_payload_signature(&garage, &mut req, 
service).await?; - let api_key = - api_key.ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?; - - let req = streaming::parse_streaming_body( - &api_key, - req, - &mut content_sha256, - &garage.config.s3_api.s3_region, - service, - )?; - - Ok((req, api_key, content_sha256)) -} - -pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> { - if expected_sha256 != sha256sum(body) { - return Err(Error::bad_request( - "Request content hash does not match signed hash".to_string(), - )); - } - Ok(()) -} - -pub fn signing_hmac( - datetime: &DateTime<Utc>, - secret_key: &str, - region: &str, - service: &str, -) -> Result<HmacSha256, crypto_common::InvalidLength> { - let secret = String::from("AWS4") + secret_key; - let mut date_hmac = HmacSha256::new_from_slice(secret.as_bytes())?; - date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes()); - let mut region_hmac = HmacSha256::new_from_slice(&date_hmac.finalize().into_bytes())?; - region_hmac.update(region.as_bytes()); - let mut service_hmac = HmacSha256::new_from_slice(&region_hmac.finalize().into_bytes())?; - service_hmac.update(service.as_bytes()); - let mut signing_hmac = HmacSha256::new_from_slice(&service_hmac.finalize().into_bytes())?; - signing_hmac.update(b"aws4_request"); - let hmac = HmacSha256::new_from_slice(&signing_hmac.finalize().into_bytes())?; - Ok(hmac) -} - -pub fn compute_scope(datetime: &DateTime<Utc>, region: &str, service: &str) -> String { - format!( - "{}/{}/{}/aws4_request", - datetime.format(SHORT_DATE), - region, - service - ) -} diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs deleted file mode 100644 index e223d1b1..00000000 --- a/src/api/signature/streaming.rs +++ /dev/null @@ -1,373 +0,0 @@ -use std::pin::Pin; - -use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; -use futures::prelude::*; -use futures::task; -use garage_model::key_table::Key; -use hmac::Mac; -use http_body_util::StreamBody; -use 
hyper::body::{Bytes, Incoming as IncomingBody}; -use hyper::Request; - -use garage_util::data::Hash; - -use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME}; - -use crate::helpers::*; -use crate::signature::error::*; -use crate::signature::payload::{ - STREAMING_AWS4_HMAC_SHA256_PAYLOAD, X_AMZ_CONTENT_SH256, X_AMZ_DATE, -}; - -pub const AWS4_HMAC_SHA256_PAYLOAD: &str = "AWS4-HMAC-SHA256-PAYLOAD"; - -pub type ReqBody = BoxBody<Error>; - -pub fn parse_streaming_body( - api_key: &Key, - req: Request<IncomingBody>, - content_sha256: &mut Option<Hash>, - region: &str, - service: &str, -) -> Result<Request<ReqBody>, Error> { - match req.headers().get(X_AMZ_CONTENT_SH256) { - Some(header) if header == STREAMING_AWS4_HMAC_SHA256_PAYLOAD => { - let signature = content_sha256 - .take() - .ok_or_bad_request("No signature provided")?; - - let secret_key = &api_key - .state - .as_option() - .ok_or_internal_error("Deleted key state")? - .secret_key; - - let date = req - .headers() - .get(X_AMZ_DATE) - .ok_or_bad_request("Missing X-Amz-Date field")? 
- .to_str()?; - let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) - .ok_or_bad_request("Invalid date")?; - let date: DateTime<Utc> = Utc.from_utc_datetime(&date); - - let scope = compute_scope(&date, region, service); - let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service) - .ok_or_internal_error("Unable to build signing HMAC")?; - - Ok(req.map(move |body| { - let stream = body_stream::<_, Error>(body); - let signed_payload_stream = - SignedPayloadStream::new(stream, signing_hmac, date, &scope, signature) - .map(|x| x.map(hyper::body::Frame::data)) - .map_err(Error::from); - ReqBody::new(StreamBody::new(signed_payload_stream)) - })) - } - _ => Ok(req.map(|body| ReqBody::new(http_body_util::BodyExt::map_err(body, Error::from)))), - } -} - -/// Result of `sha256("")` -const EMPTY_STRING_HEX_DIGEST: &str = - "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; - -fn compute_streaming_payload_signature( - signing_hmac: &HmacSha256, - date: DateTime<Utc>, - scope: &str, - previous_signature: Hash, - content_sha256: Hash, -) -> Result<Hash, Error> { - let string_to_sign = [ - AWS4_HMAC_SHA256_PAYLOAD, - &date.format(LONG_DATETIME).to_string(), - scope, - &hex::encode(previous_signature), - EMPTY_STRING_HEX_DIGEST, - &hex::encode(content_sha256), - ] - .join("\n"); - - let mut hmac = signing_hmac.clone(); - hmac.update(string_to_sign.as_bytes()); - - Ok(Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")?) 
-} - -mod payload { - use garage_util::data::Hash; - - pub enum Error<I> { - Parser(nom::error::Error<I>), - BadSignature, - } - - impl<I> Error<I> { - pub fn description(&self) -> &str { - match *self { - Error::Parser(ref e) => e.code.description(), - Error::BadSignature => "Bad signature", - } - } - } - - #[derive(Debug, Clone)] - pub struct Header { - pub size: usize, - pub signature: Hash, - } - - impl Header { - pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { - use nom::bytes::streaming::tag; - use nom::character::streaming::hex_digit1; - use nom::combinator::map_res; - use nom::number::streaming::hex_u32; - - macro_rules! try_parse { - ($expr:expr) => { - $expr.map_err(|e| e.map(Error::Parser))? - }; - } - - let (input, size) = try_parse!(hex_u32(input)); - let (input, _) = try_parse!(tag(";")(input)); - - let (input, _) = try_parse!(tag("chunk-signature=")(input)); - let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input)); - let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?; - - let (input, _) = try_parse!(tag("\r\n")(input)); - - let header = Header { - size: size as usize, - signature, - }; - - Ok((input, header)) - } - } -} - -#[derive(Debug)] -pub enum SignedPayloadStreamError { - Stream(Error), - InvalidSignature, - Message(String), -} - -impl SignedPayloadStreamError { - fn message(msg: &str) -> Self { - SignedPayloadStreamError::Message(msg.into()) - } -} - -impl From<SignedPayloadStreamError> for Error { - fn from(err: SignedPayloadStreamError) -> Self { - match err { - SignedPayloadStreamError::Stream(e) => e, - SignedPayloadStreamError::InvalidSignature => { - Error::bad_request("Invalid payload signature") - } - SignedPayloadStreamError::Message(e) => { - Error::bad_request(format!("Chunk format error: {}", e)) - } - } - } -} - -impl<I> From<payload::Error<I>> for SignedPayloadStreamError { - fn from(err: payload::Error<I>) -> Self { - Self::message(err.description()) - } 
-} - -impl<I> From<nom::error::Error<I>> for SignedPayloadStreamError { - fn from(err: nom::error::Error<I>) -> Self { - Self::message(err.code.description()) - } -} - -struct SignedPayload { - header: payload::Header, - data: Bytes, -} - -#[pin_project::pin_project] -pub struct SignedPayloadStream<S> -where - S: Stream<Item = Result<Bytes, Error>>, -{ - #[pin] - stream: S, - buf: bytes::BytesMut, - datetime: DateTime<Utc>, - scope: String, - signing_hmac: HmacSha256, - previous_signature: Hash, -} - -impl<S> SignedPayloadStream<S> -where - S: Stream<Item = Result<Bytes, Error>>, -{ - pub fn new( - stream: S, - signing_hmac: HmacSha256, - datetime: DateTime<Utc>, - scope: &str, - seed_signature: Hash, - ) -> Self { - Self { - stream, - buf: bytes::BytesMut::new(), - datetime, - scope: scope.into(), - signing_hmac, - previous_signature: seed_signature, - } - } - - fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> { - use nom::bytes::streaming::{tag, take}; - - macro_rules! try_parse { - ($expr:expr) => { - $expr.map_err(nom::Err::convert)? 
- }; - } - - let (input, header) = try_parse!(payload::Header::parse(input)); - - // 0-sized chunk is the last - if header.size == 0 { - return Ok(( - input, - SignedPayload { - header, - data: Bytes::new(), - }, - )); - } - - let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input)); - let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input)); - - let data = Bytes::from(data.to_vec()); - - Ok((input, SignedPayload { header, data })) - } -} - -impl<S> Stream for SignedPayloadStream<S> -where - S: Stream<Item = Result<Bytes, Error>> + Unpin, -{ - type Item = Result<Bytes, SignedPayloadStreamError>; - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> task::Poll<Option<Self::Item>> { - use std::task::Poll; - - let mut this = self.project(); - - loop { - let (input, payload) = match Self::parse_next(this.buf) { - Ok(res) => res, - Err(nom::Err::Incomplete(_)) => { - match futures::ready!(this.stream.as_mut().poll_next(cx)) { - Some(Ok(bytes)) => { - this.buf.extend(bytes); - continue; - } - Some(Err(e)) => { - return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))) - } - None => { - return Poll::Ready(Some(Err(SignedPayloadStreamError::message( - "Unexpected EOF", - )))); - } - } - } - Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => { - return Poll::Ready(Some(Err(e))) - } - }; - - // 0-sized chunk is the last - if payload.data.is_empty() { - return Poll::Ready(None); - } - - let data_sha256sum = sha256sum(&payload.data); - - let expected_signature = compute_streaming_payload_signature( - this.signing_hmac, - *this.datetime, - this.scope, - *this.previous_signature, - data_sha256sum, - ) - .map_err(|e| { - SignedPayloadStreamError::Message(format!("Could not build signature: {}", e)) - })?; - - if payload.header.signature != expected_signature { - return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature))); - } - - *this.buf = input.into(); - 
*this.previous_signature = payload.header.signature; - - return Poll::Ready(Some(Ok(payload.data))); - } - } - - fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() - } -} - -#[cfg(test)] -mod tests { - use futures::prelude::*; - - use super::{SignedPayloadStream, SignedPayloadStreamError}; - - #[tokio::test] - async fn test_interrupted_signed_payload_stream() { - use chrono::{DateTime, Utc}; - - use garage_util::data::Hash; - - let datetime = DateTime::parse_from_rfc3339("2021-12-13T13:12:42+01:00") // TODO UNIX 0 - .unwrap() - .with_timezone(&Utc); - let secret_key = "test"; - let region = "test"; - let scope = crate::signature::compute_scope(&datetime, region, "s3"); - let signing_hmac = - crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap(); - - let data: &[&[u8]] = &[b"1"]; - let body = futures::stream::iter(data.iter().map(|block| Ok(block.to_vec().into()))); - - let seed_signature = Hash::default(); - - let mut stream = - SignedPayloadStream::new(body, signing_hmac, datetime, &scope, seed_signature); - - assert!(stream.try_next().await.is_err()); - match stream.try_next().await { - Err(SignedPayloadStreamError::Message(msg)) if msg == "Unexpected EOF" => {} - item => panic!( - "Unexpected result, expected early EOF error, got {:?}", - item - ), - } - } -} |