diff options
Diffstat (limited to 'src')
83 files changed, 739 insertions, 556 deletions
diff --git a/src/api/admin/Cargo.toml b/src/api/admin/Cargo.toml new file mode 100644 index 00000000..adddf306 --- /dev/null +++ b/src/api/admin/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "garage_api_admin" +version = "1.0.1" +authors = ["Alex Auvolat <alex@adnab.me>"] +edition = "2018" +license = "AGPL-3.0" +description = "Admin API server crate for the Garage object store" +repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_model.workspace = true +garage_table.workspace = true +garage_util.workspace = true +garage_rpc.workspace = true +garage_api_common.workspace = true + +argon2.workspace = true +async-trait.workspace = true +err-derive.workspace = true +hex.workspace = true +tracing.workspace = true + +futures.workspace = true +tokio.workspace = true +http.workspace = true +hyper = { workspace = true, default-features = false, features = ["server", "http1"] } +url.workspace = true + +serde.workspace = true +serde_json.workspace = true + +opentelemetry.workspace = true +opentelemetry-prometheus = { workspace = true, optional = true } +prometheus = { workspace = true, optional = true } + +[features] +metrics = [ "opentelemetry-prometheus", "prometheus" ] diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 0e4565bb..6f0c474f 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::sync::Arc; use argon2::password_hash::PasswordHash; -use async_trait::async_trait; use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW}; use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; @@ -20,15 +19,15 @@ use garage_rpc::system::ClusterHealthStatus; use garage_util::error::Error as GarageError; use garage_util::socket_address::UnixOrTCPSocketAddress; -use crate::generic_server::*; +use garage_api_common::generic_server::*; +use garage_api_common::helpers::*; -use crate::admin::bucket::*; -use crate::admin::cluster::*; -use crate::admin::error::*; -use crate::admin::key::*; -use crate::admin::router_v0; -use crate::admin::router_v1::{Authorization, Endpoint}; -use crate::helpers::*; +use crate::bucket::*; +use crate::cluster::*; +use crate::error::*; +use crate::key::*; +use crate::router_v0; +use crate::router_v1::{Authorization, Endpoint}; pub type ResBody = BoxBody<Error>; @@ -221,7 +220,6 @@ impl AdminApiServer { } } -#[async_trait] impl ApiHandler for AdminApiServer { const API_NAME: &'static str = "admin"; const API_NAME_DISPLAY: &'static str = "Admin"; diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index ac3cba00..2537bfc9 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -17,11 +17,12 @@ use garage_model::permission::*; use garage_model::s3::mpu_table; use garage_model::s3::object_table::*; -use crate::admin::api_server::ResBody; -use crate::admin::error::*; -use crate::admin::key::ApiBucketKeyPerm; -use crate::common_error::CommonError; -use crate::helpers::*; +use garage_api_common::common_error::CommonError; +use garage_api_common::helpers::*; + +use crate::api_server::ResBody; +use crate::error::*; +use crate::key::ApiBucketKeyPerm; pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let buckets = garage diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 357ac600..ffa0fa71 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -12,9 +12,10 @@ use garage_rpc::layout; use garage_model::garage::Garage; -use crate::admin::api_server::ResBody; -use crate::admin::error::*; -use crate::helpers::{json_ok_response, parse_json_body}; +use garage_api_common::helpers::{json_ok_response, parse_json_body}; + +use crate::api_server::ResBody; +use crate::error::*; pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let layout = garage.system.cluster_layout(); diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs index 2668b42d..201f9b40 100644 --- a/src/api/admin/error.rs +++ b/src/api/admin/error.rs @@ -1,20 +1,24 @@ +use std::convert::TryFrom; + use err_derive::Error; use hyper::header::HeaderValue; use hyper::{HeaderMap, StatusCode}; pub use garage_model::helper::error::Error as HelperError; -use crate::common_error::CommonError; -pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; -use crate::generic_server::ApiError; -use crate::helpers::*; +use garage_api_common::common_error::{commonErrorDerivative, CommonError}; +pub use garage_api_common::common_error::{ + CommonErrorDerivative, OkOrBadRequest, OkOrInternalError, +}; +use garage_api_common::generic_server::ApiError; +use garage_api_common::helpers::*; /// Errors of this crate #[derive(Debug, Error)] pub enum Error { #[error(display = "{}", _0)] /// Error from common error - Common(CommonError), + Common(#[error(source)] CommonError), // Category: cannot process /// The API access key does not exist @@ -29,17 +33,21 @@ pub enum Error { KeyAlreadyExists(String), } -impl<T> From<T> for Error -where - CommonError: From<T>, -{ - fn from(err: T) -> Self { - Error::Common(CommonError::from(err)) +commonErrorDerivative!(Error); + +/// FIXME: helper errors are transformed into their corresponding variants +/// in the Error struct, but in many case a helper error should be considered +/// an internal error. +impl From<HelperError> for Error { + fn from(err: HelperError) -> Error { + match CommonError::try_from(err) { + Ok(ce) => Self::Common(ce), + Err(HelperError::NoSuchAccessKey(k)) => Self::NoSuchAccessKey(k), + Err(_) => unreachable!(), + } } } -impl CommonErrorDerivative for Error {} - impl Error { fn code(&self) -> &'static str { match self { diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs index 291b6d54..bebf3063 100644 --- a/src/api/admin/key.rs +++ b/src/api/admin/key.rs @@ -9,9 +9,10 @@ use garage_table::*; use garage_model::garage::Garage; use garage_model::key_table::*; -use crate::admin::api_server::ResBody; -use crate::admin::error::*; -use crate::helpers::*; +use garage_api_common::helpers::*; + +use crate::api_server::ResBody; +use crate::error::*; pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let res = garage diff --git a/src/api/admin/mod.rs b/src/api/admin/lib.rs index 43a8c59c..599e9b44 100644 --- a/src/api/admin/mod.rs +++ b/src/api/admin/lib.rs @@ -1,3 +1,6 @@ +#[macro_use] +extern crate tracing; + pub mod api_server; mod error; mod router_v0; diff --git a/src/api/admin/router_v0.rs b/src/api/admin/router_v0.rs index 68676445..9dd742ba 100644 --- a/src/api/admin/router_v0.rs +++ b/src/api/admin/router_v0.rs @@ -2,8 +2,9 @@ use std::borrow::Cow; use hyper::{Method, Request}; -use crate::admin::error::*; -use crate::router_macros::*; +use garage_api_common::router_macros::*; + +use crate::error::*; router_match! {@func diff --git a/src/api/admin/router_v1.rs b/src/api/admin/router_v1.rs index cc5ff2ec..0b4901ea 100644 --- a/src/api/admin/router_v1.rs +++ b/src/api/admin/router_v1.rs @@ -2,9 +2,10 @@ use std::borrow::Cow; use hyper::{Method, Request}; -use crate::admin::error::*; -use crate::admin::router_v0; -use crate::router_macros::*; +use garage_api_common::router_macros::*; + +use crate::error::*; +use crate::router_v0; pub enum Authorization { None, diff --git a/src/api/common/Cargo.toml b/src/api/common/Cargo.toml new file mode 100644 index 00000000..5b9cf479 --- /dev/null +++ b/src/api/common/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "garage_api_common" +version = "1.0.1" +authors = ["Alex Auvolat <alex@adnab.me>"] +edition = "2018" +license = "AGPL-3.0" +description = "Common functions for the API server crates for the Garage object store" +repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_model.workspace = true +garage_table.workspace = true +garage_util.workspace = true + +bytes.workspace = true +chrono.workspace = true +crypto-common.workspace = true +err-derive.workspace = true +hex.workspace = true +hmac.workspace = true +idna.workspace = true +tracing.workspace = true +nom.workspace = true +pin-project.workspace = true +sha2.workspace = true + +futures.workspace = true +tokio.workspace = true +http.workspace = true +http-body-util.workspace = true +hyper = { workspace = true, default-features = false, features = ["server", "http1"] } +hyper-util.workspace = true +url.workspace = true + +serde.workspace = true +serde_json.workspace = true + +opentelemetry.workspace = true diff --git a/src/api/common_error.rs b/src/api/common/common_error.rs index c47555d4..597a3511 100644 --- a/src/api/common_error.rs +++ b/src/api/common/common_error.rs @@ -1,3 +1,5 @@ +use std::convert::TryFrom; + use err_derive::Error; use hyper::StatusCode; @@ -55,6 +57,35 @@ pub enum CommonError { InvalidBucketName(String), } +#[macro_export] +macro_rules! commonErrorDerivative { + ( $error_struct: ident ) => { + impl From<garage_util::error::Error> for $error_struct { + fn from(err: garage_util::error::Error) -> Self { + Self::Common(CommonError::InternalError(err)) + } + } + impl From<http::Error> for $error_struct { + fn from(err: http::Error) -> Self { + Self::Common(CommonError::Http(err)) + } + } + impl From<hyper::Error> for $error_struct { + fn from(err: hyper::Error) -> Self { + Self::Common(CommonError::Hyper(err)) + } + } + impl From<hyper::header::ToStrError> for $error_struct { + fn from(err: hyper::header::ToStrError) -> Self { + Self::Common(CommonError::InvalidHeader(err)) + } + } + impl CommonErrorDerivative for $error_struct {} + }; +} + +pub use commonErrorDerivative; + impl CommonError { pub fn http_status_code(&self) -> StatusCode { match self { @@ -97,18 +128,39 @@ impl CommonError { } } -impl From<HelperError> for CommonError { - fn from(err: HelperError) -> Self { +impl TryFrom<HelperError> for CommonError { + type Error = HelperError; + + fn try_from(err: HelperError) -> Result<Self, HelperError> { match err { - HelperError::Internal(i) => Self::InternalError(i), - HelperError::BadRequest(b) => Self::BadRequest(b), - HelperError::InvalidBucketName(n) => Self::InvalidBucketName(n), - HelperError::NoSuchBucket(n) => Self::NoSuchBucket(n), - e => Self::bad_request(format!("{}", e)), + HelperError::Internal(i) => Ok(Self::InternalError(i)), + HelperError::BadRequest(b) => Ok(Self::BadRequest(b)), + HelperError::InvalidBucketName(n) => Ok(Self::InvalidBucketName(n)), + HelperError::NoSuchBucket(n) => Ok(Self::NoSuchBucket(n)), + e => Err(e), } } } +/// This function converts HelperErrors into CommonErrors, +/// for variants that exist in CommonError. +/// This is used for helper functions that might return InvalidBucketName +/// or NoSuchBucket for instance, and we want to pass that error +/// up to our caller. +pub fn pass_helper_error(err: HelperError) -> CommonError { + match CommonError::try_from(err) { + Ok(e) => e, + Err(e) => panic!("Helper error `{}` should hot have happenned here", e), + } +} + +pub fn helper_error_as_internal(err: HelperError) -> CommonError { + match err { + HelperError::Internal(e) => CommonError::InternalError(e), + e => CommonError::InternalError(GarageError::Message(e.to_string())), + } +} + pub trait CommonErrorDerivative: From<CommonError> { fn internal_error<M: ToString>(msg: M) -> Self { Self::from(CommonError::InternalError(GarageError::Message( diff --git a/src/api/common/cors.rs b/src/api/common/cors.rs new file mode 100644 index 00000000..14369b56 --- /dev/null +++ b/src/api/common/cors.rs @@ -0,0 +1,170 @@ +use std::sync::Arc; + +use http::header::{ + ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, + ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, +}; +use hyper::{body::Body, body::Incoming as IncomingBody, Request, Response, StatusCode}; + +use garage_model::bucket_table::{BucketParams, CorsRule as GarageCorsRule}; +use garage_model::garage::Garage; + +use crate::common_error::{ + helper_error_as_internal, CommonError, OkOrBadRequest, OkOrInternalError, +}; +use crate::helpers::*; + +pub fn find_matching_cors_rule<'a>( + bucket_params: &'a BucketParams, + req: &Request<impl Body>, +) -> Result<Option<&'a GarageCorsRule>, CommonError> { + if let Some(cors_config) = bucket_params.cors_config.get() { + if let Some(origin) = req.headers().get("Origin") { + let origin = origin.to_str()?; + let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) { + Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(), + None => vec![], + }; + return Ok(cors_config.iter().find(|rule| { + cors_rule_matches(rule, origin, req.method().as_ref(), request_headers.iter()) + })); + } + } + Ok(None) +} + +pub fn cors_rule_matches<'a, HI, S>( + rule: &GarageCorsRule, + origin: &'a str, + method: &'a str, + mut request_headers: HI, +) -> bool +where + HI: Iterator<Item = S>, + S: AsRef<str>, +{ + rule.allow_origins.iter().any(|x| x == "*" || x == origin) + && rule.allow_methods.iter().any(|x| x == "*" || x == method) + && request_headers.all(|h| { + rule.allow_headers + .iter() + .any(|x| x == "*" || x == h.as_ref()) + }) +} + +pub fn add_cors_headers( + resp: &mut Response<impl Body>, + rule: &GarageCorsRule, +) -> Result<(), http::header::InvalidHeaderValue> { + let h = resp.headers_mut(); + h.insert( + ACCESS_CONTROL_ALLOW_ORIGIN, + rule.allow_origins.join(", ").parse()?, + ); + h.insert( + ACCESS_CONTROL_ALLOW_METHODS, + rule.allow_methods.join(", ").parse()?, + ); + h.insert( + ACCESS_CONTROL_ALLOW_HEADERS, + rule.allow_headers.join(", ").parse()?, + ); + h.insert( + ACCESS_CONTROL_EXPOSE_HEADERS, + rule.expose_headers.join(", ").parse()?, + ); + Ok(()) +} + +pub async fn handle_options_api( + garage: Arc<Garage>, + req: &Request<IncomingBody>, + bucket_name: Option<String>, +) -> Result<Response<EmptyBody>, CommonError> { + // FIXME: CORS rules of buckets with local aliases are + // not taken into account. + + // If the bucket name is a global bucket name, + // we try to apply the CORS rules of that bucket. + // If a user has a local bucket name that has + // the same name, its CORS rules won't be applied + // and will be shadowed by the rules of the globally + // existing bucket (but this is inevitable because + // OPTIONS calls are not auhtenticated). + if let Some(bn) = bucket_name { + let helper = garage.bucket_helper(); + let bucket_id = helper + .resolve_global_bucket_name(&bn) + .await + .map_err(helper_error_as_internal)?; + if let Some(id) = bucket_id { + let bucket = garage + .bucket_helper() + .get_existing_bucket(id) + .await + .map_err(helper_error_as_internal)?; + let bucket_params = bucket.state.into_option().unwrap(); + handle_options_for_bucket(req, &bucket_params) + } else { + // If there is a bucket name in the request, but that name + // does not correspond to a global alias for a bucket, + // then it's either a non-existing bucket or a local bucket. + // We have no way of knowing, because the request is not + // authenticated and thus we can't resolve local aliases. + // We take the permissive approach of allowing everything, + // because we don't want to prevent web apps that use + // local bucket names from making API calls. + Ok(Response::builder() + .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") + .header(ACCESS_CONTROL_ALLOW_METHODS, "*") + .status(StatusCode::OK) + .body(EmptyBody::new())?) + } + } else { + // If there is no bucket name in the request, + // we are doing a ListBuckets call, which we want to allow + // for all origins. + Ok(Response::builder() + .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") + .header(ACCESS_CONTROL_ALLOW_METHODS, "GET") + .status(StatusCode::OK) + .body(EmptyBody::new())?) + } +} + +pub fn handle_options_for_bucket( + req: &Request<IncomingBody>, + bucket_params: &BucketParams, +) -> Result<Response<EmptyBody>, CommonError> { + let origin = req + .headers() + .get("Origin") + .ok_or_bad_request("Missing Origin header")? + .to_str()?; + let request_method = req + .headers() + .get(ACCESS_CONTROL_REQUEST_METHOD) + .ok_or_bad_request("Missing Access-Control-Request-Method header")? + .to_str()?; + let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) { + Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(), + None => vec![], + }; + + if let Some(cors_config) = bucket_params.cors_config.get() { + let matching_rule = cors_config + .iter() + .find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter())); + if let Some(rule) = matching_rule { + let mut resp = Response::builder() + .status(StatusCode::OK) + .body(EmptyBody::new())?; + add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?; + return Ok(resp); + } + } + + Err(CommonError::Forbidden( + "This CORS request is not allowed.".into(), + )) +} diff --git a/src/api/encoding.rs b/src/api/common/encoding.rs index e286a784..e286a784 100644 --- a/src/api/encoding.rs +++ b/src/api/common/encoding.rs diff --git a/src/api/generic_server.rs b/src/api/common/generic_server.rs index 283abdd4..6ddc2ff2 100644 --- a/src/api/generic_server.rs +++ b/src/api/common/generic_server.rs @@ -4,8 +4,6 @@ use std::os::unix::fs::PermissionsExt; use std::sync::Arc; use std::time::Duration; -use async_trait::async_trait; - use futures::future::Future; use futures::stream::{futures_unordered::FuturesUnordered, StreamExt}; @@ -36,7 +34,7 @@ use garage_util::socket_address::UnixOrTCPSocketAddress; use crate::helpers::{BoxBody, ErrorBody}; -pub(crate) trait ApiEndpoint: Send + Sync + 'static { +pub trait ApiEndpoint: Send + Sync + 'static { fn name(&self) -> &'static str; fn add_span_attributes(&self, span: SpanRef<'_>); } @@ -47,8 +45,7 @@ pub trait ApiError: std::error::Error + Send + Sync + 'static { fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody; } -#[async_trait] -pub(crate) trait ApiHandler: Send + Sync + 'static { +pub trait ApiHandler: Send + Sync + 'static { const API_NAME: &'static str; const API_NAME_DISPLAY: &'static str; @@ -56,14 +53,14 @@ pub(crate) trait ApiHandler: Send + Sync + 'static { type Error: ApiError; fn parse_endpoint(&self, r: &Request<IncomingBody>) -> Result<Self::Endpoint, Self::Error>; - async fn handle( + fn handle( &self, req: Request<IncomingBody>, endpoint: Self::Endpoint, - ) -> Result<Response<BoxBody<Self::Error>>, Self::Error>; + ) -> impl Future<Output = Result<Response<BoxBody<Self::Error>>, Self::Error>> + Send; } -pub(crate) struct ApiServer<A: ApiHandler> { +pub struct ApiServer<A: ApiHandler> { region: String, api_handler: A, @@ -248,13 +245,11 @@ impl<A: ApiHandler> ApiServer<A> { // ==== helper functions ==== -#[async_trait] pub trait Accept: Send + Sync + 'static { type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static; - async fn accept(&self) -> std::io::Result<(Self::Stream, String)>; + fn accept(&self) -> impl Future<Output = std::io::Result<(Self::Stream, String)>> + Send; } -#[async_trait] impl Accept for TcpListener { type Stream = TcpStream; async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { @@ -266,7 +261,6 @@ impl Accept for TcpListener { pub struct UnixListenerOn(pub UnixListener, pub String); -#[async_trait] impl Accept for UnixListenerOn { type Stream = UnixStream; async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { diff --git a/src/api/helpers.rs b/src/api/common/helpers.rs index cf60005d..c8586de4 100644 --- a/src/api/helpers.rs +++ b/src/api/common/helpers.rs @@ -363,9 +363,9 @@ mod tests { } #[derive(Serialize)] -pub(crate) struct CustomApiErrorBody { - pub(crate) code: String, - pub(crate) message: String, - pub(crate) region: String, - pub(crate) path: String, +pub struct CustomApiErrorBody { + pub code: String, + pub message: String, + pub region: String, + pub path: String, } diff --git a/src/api/common/lib.rs b/src/api/common/lib.rs new file mode 100644 index 00000000..0e655a53 --- /dev/null +++ b/src/api/common/lib.rs @@ -0,0 +1,12 @@ +//! Crate for serving a S3 compatible API +#[macro_use] +extern crate tracing; + +pub mod common_error; + +pub mod cors; +pub mod encoding; +pub mod generic_server; +pub mod helpers; +pub mod router_macros; +pub mod signature; diff --git a/src/api/router_macros.rs b/src/api/common/router_macros.rs index 8f10a4f5..d9fe86db 100644 --- a/src/api/router_macros.rs +++ b/src/api/common/router_macros.rs @@ -1,5 +1,6 @@ /// This macro is used to generate very repetitive match {} blocks in this module /// It is _not_ made to be used anywhere else +#[macro_export] macro_rules! router_match { (@match $enum:expr , [ $($endpoint:ident,)* ]) => {{ // usage: router_match {@match my_enum, [ VariantWithField1, VariantWithField2 ..] } @@ -133,6 +134,7 @@ macro_rules! router_match { /// This macro is used to generate part of the code in this module. It must be called only one, and /// is useless outside of this module. +#[macro_export] macro_rules! generateQueryParameters { ( keywords: [ $($kw_param:expr => $kw_name: ident),* ], @@ -220,5 +222,5 @@ macro_rules! generateQueryParameters { } } -pub(crate) use generateQueryParameters; -pub(crate) use router_match; +pub use generateQueryParameters; +pub use router_match; diff --git a/src/api/signature/error.rs b/src/api/common/signature/error.rs index 2d92a072..2d92a072 100644 --- a/src/api/signature/error.rs +++ b/src/api/common/signature/error.rs diff --git a/src/api/signature/mod.rs b/src/api/common/signature/mod.rs index 6514da43..6514da43 100644 --- a/src/api/signature/mod.rs +++ b/src/api/common/signature/mod.rs diff --git a/src/api/signature/payload.rs b/src/api/common/signature/payload.rs index 9e5a6043..81541e4a 100644 --- a/src/api/signature/payload.rs +++ b/src/api/common/signature/payload.rs @@ -518,7 +518,7 @@ impl Authorization { }) } - pub(crate) fn parse_form(params: &HeaderMap) -> Result<Self, Error> { + pub fn parse_form(params: &HeaderMap) -> Result<Self, Error> { let algorithm = params .get(X_AMZ_ALGORITHM) .ok_or_bad_request("Missing X-Amz-Algorithm header")? diff --git a/src/api/signature/streaming.rs b/src/api/common/signature/streaming.rs index e223d1b1..e223d1b1 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/common/signature/streaming.rs diff --git a/src/api/k2v/Cargo.toml b/src/api/k2v/Cargo.toml new file mode 100644 index 00000000..e3ebedca --- /dev/null +++ b/src/api/k2v/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "garage_api_k2v" +version = "1.0.1" +authors = ["Alex Auvolat <alex@adnab.me>"] +edition = "2018" +license = "AGPL-3.0" +description = "K2V API server crate for the Garage object store" +repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_model = { workspace = true, features = [ "k2v" ] } +garage_table.workspace = true +garage_util = { workspace = true, features = [ "k2v" ] } +garage_api_common.workspace = true + +base64.workspace = true +err-derive.workspace = true +tracing.workspace = true + +futures.workspace = true +tokio.workspace = true +http.workspace = true +http-body-util.workspace = true +hyper = { workspace = true, default-features = false, features = ["server", "http1"] } +percent-encoding.workspace = true +url.workspace = true + +serde.workspace = true +serde_json.workspace = true + +opentelemetry.workspace = true diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index f2a3942e..eb276f5b 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -1,7 +1,5 @@ use std::sync::Arc; -use async_trait::async_trait; - use hyper::{body::Incoming as IncomingBody, Method, Request, Response}; use tokio::sync::watch; @@ -12,26 +10,25 @@ use garage_util::socket_address::UnixOrTCPSocketAddress; use garage_model::garage::Garage; -use crate::generic_server::*; -use crate::k2v::error::*; - -use crate::signature::verify_request; +use garage_api_common::cors::*; +use garage_api_common::generic_server::*; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_request; -use crate::helpers::*; -use crate::k2v::batch::*; -use crate::k2v::index::*; -use crate::k2v::item::*; -use crate::k2v::router::Endpoint; -use crate::s3::cors::*; +use crate::batch::*; +use crate::error::*; +use crate::index::*; +use crate::item::*; +use crate::router::Endpoint; -pub use crate::signature::streaming::ReqBody; +pub use garage_api_common::signature::streaming::ReqBody; pub type ResBody = BoxBody<Error>; pub struct K2VApiServer { garage: Arc<Garage>, } -pub(crate) struct K2VApiEndpoint { +pub struct K2VApiEndpoint { bucket_name: String, endpoint: Endpoint, } @@ -49,7 +46,6 @@ impl K2VApiServer { } } -#[async_trait] impl ApiHandler for K2VApiServer { const API_NAME: &'static str = "k2v"; const API_NAME_DISPLAY: &'static str = "K2V"; @@ -90,11 +86,13 @@ impl ApiHandler for K2VApiServer { let bucket_id = garage .bucket_helper() .resolve_bucket(&bucket_name, &api_key) - .await?; + .await + .map_err(pass_helper_error)?; let bucket = garage .bucket_helper() .get_existing_bucket(bucket_id) - .await?; + .await + .map_err(helper_error_as_internal)?; let bucket_params = bucket.state.into_option().unwrap(); let allowed = match endpoint.authorization_type() { diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 02b7ae8b..c284dbd4 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -4,13 +4,14 @@ use serde::{Deserialize, Serialize}; use garage_table::{EnumerationOrder, TableSchema}; -use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; -use crate::helpers::*; -use crate::k2v::api_server::{ReqBody, ResBody}; -use crate::k2v::error::*; -use crate::k2v::range::read_range; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::item::parse_causality_token; +use crate::range::read_range; pub async fn handle_insert_batch( ctx: ReqCtx, @@ -23,7 +24,7 @@ pub async fn handle_insert_batch( let mut items2 = vec![]; for it in items { - let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?; + let ct = it.ct.map(|s| parse_causality_token(&s)).transpose()?; let v = match it.v { Some(vs) => DvvsValue::Value( BASE64_STANDARD @@ -281,7 +282,8 @@ pub(crate) async fn handle_poll_range( query.seen_marker, timeout_msec, ) - .await?; + .await + .map_err(pass_helper_error)?; if let Some((items, seen_marker)) = resp { let resp = PollRangeResponse { diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs index 16479227..3cd0e6f7 100644 --- a/src/api/k2v/error.rs +++ b/src/api/k2v/error.rs @@ -2,18 +2,21 @@ use err_derive::Error; use hyper::header::HeaderValue; use hyper::{HeaderMap, StatusCode}; -use crate::common_error::CommonError; -pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; -use crate::generic_server::ApiError; -use crate::helpers::*; -use crate::signature::error::Error as SignatureError; +use garage_api_common::common_error::{commonErrorDerivative, CommonError}; +pub(crate) use garage_api_common::common_error::{helper_error_as_internal, pass_helper_error}; +pub use garage_api_common::common_error::{ + CommonErrorDerivative, OkOrBadRequest, OkOrInternalError, +}; +use garage_api_common::generic_server::ApiError; +use garage_api_common::helpers::*; +use garage_api_common::signature::error::Error as SignatureError; /// Errors of this crate #[derive(Debug, Error)] pub enum Error { #[error(display = "{}", _0)] /// Error from common error - Common(CommonError), + Common(#[error(source)] CommonError), // Category: cannot process /// Authorization Header Malformed @@ -28,6 +31,10 @@ pub enum Error { #[error(display = "Invalid base64: {}", _0)] InvalidBase64(#[error(source)] base64::DecodeError), + /// Invalid causality token + #[error(display = "Invalid causality token")] + InvalidCausalityToken, + /// The client asked for an invalid return format (invalid Accept header) #[error(display = "Not acceptable: {}", _0)] NotAcceptable(String), @@ -37,16 +44,7 @@ pub enum Error { InvalidUtf8Str(#[error(source)] std::str::Utf8Error), } -impl<T> From<T> for Error -where - CommonError: From<T>, -{ - fn from(err: T) -> Self { - Error::Common(CommonError::from(err)) - } -} - -impl CommonErrorDerivative for Error {} +commonErrorDerivative!(Error); impl From<SignatureError> for Error { fn from(err: SignatureError) -> Self { @@ -72,6 +70,7 @@ impl Error { Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", Error::InvalidBase64(_) => "InvalidBase64", Error::InvalidUtf8Str(_) => "InvalidUtf8String", + Error::InvalidCausalityToken => "CausalityToken", } } } @@ -85,7 +84,8 @@ impl ApiError for Error { Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE, Error::AuthorizationHeaderMalformed(_) | Error::InvalidBase64(_) - | Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST, + | Error::InvalidUtf8Str(_) + | Error::InvalidCausalityToken => StatusCode::BAD_REQUEST, } } diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index e3397238..fbfaad98 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -5,10 +5,11 @@ use garage_table::util::*; use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; -use crate::helpers::*; -use crate::k2v::api_server::ResBody; -use crate::k2v::error::*; -use crate::k2v::range::read_range; +use garage_api_common::helpers::*; + +use crate::api_server::ResBody; +use crate::error::*; +use crate::range::read_range; pub async fn handle_read_index( ctx: ReqCtx, diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index af3af4e4..4e28b499 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -6,9 +6,10 @@ use hyper::{Request, Response, StatusCode}; use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; -use crate::helpers::*; -use crate::k2v::api_server::{ReqBody, ResBody}; -use crate::k2v::error::*; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; @@ -18,6 +19,10 @@ pub enum ReturnFormat { Either, } +pub(crate) fn parse_causality_token(s: &str) -> Result<CausalContext, Error> { + CausalContext::parse(s).ok_or(Error::InvalidCausalityToken) +} + impl ReturnFormat { pub fn from(req: &Request<ReqBody>) -> Result<Self, Error> { let accept = match req.headers().get(header::ACCEPT) { @@ -136,7 +141,7 @@ pub async fn handle_insert_item( .get(X_GARAGE_CAUSALITY_TOKEN) .map(|s| s.to_str()) .transpose()? - .map(CausalContext::parse_helper) + .map(parse_causality_token) .transpose()?; let body = http_body_util::BodyExt::collect(req.into_body()) @@ -176,7 +181,7 @@ pub async fn handle_delete_item( .get(X_GARAGE_CAUSALITY_TOKEN) .map(|s| s.to_str()) .transpose()? - .map(CausalContext::parse_helper) + .map(parse_causality_token) .transpose()?; let value = DvvsValue::Deleted; diff --git a/src/api/k2v/mod.rs b/src/api/k2v/lib.rs index b6a8c5cf..334ae46b 100644 --- a/src/api/k2v/mod.rs +++ b/src/api/k2v/lib.rs @@ -1,3 +1,6 @@ +#[macro_use] +extern crate tracing; + pub mod api_server; mod error; mod router; diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs index bb9d3be5..eb4738db 100644 --- a/src/api/k2v/range.rs +++ b/src/api/k2v/range.rs @@ -7,8 +7,9 @@ use std::sync::Arc; use garage_table::replication::TableShardedReplication; use garage_table::*; -use crate::helpers::key_after_prefix; -use crate::k2v::error::*; +use garage_api_common::helpers::key_after_prefix; + +use crate::error::*; /// Read range in a Garage table. /// Returns (entries, more?, nextStart) diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs index 1cc58be5..a04b0f81 100644 --- a/src/api/k2v/router.rs +++ b/src/api/k2v/router.rs @@ -1,11 +1,11 @@ -use crate::k2v::error::*; +use crate::error::*; use std::borrow::Cow; use hyper::{Method, Request}; -use crate::helpers::Authorization; -use crate::router_macros::{generateQueryParameters, router_match}; +use garage_api_common::helpers::Authorization; +use garage_api_common::router_macros::{generateQueryParameters, router_match}; router_match! {@func diff --git a/src/api/lib.rs b/src/api/lib.rs deleted file mode 100644 index 370dfd7a..00000000 --- a/src/api/lib.rs +++ /dev/null @@ -1,17 +0,0 @@ -//! Crate for serving a S3 compatible API -#[macro_use] -extern crate tracing; - -pub mod common_error; - -mod encoding; -pub mod generic_server; -pub mod helpers; -mod router_macros; -/// This mode is public only to help testing. Don't expect stability here -pub mod signature; - -pub mod admin; -#[cfg(feature = "k2v")] -pub mod k2v; -pub mod s3; diff --git a/src/api/Cargo.toml b/src/api/s3/Cargo.toml index 85b78a5b..387e45db 100644 --- a/src/api/Cargo.toml +++ b/src/api/s3/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "garage_api" +name = "garage_api_s3" version = "1.0.1" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" @@ -20,30 +20,24 @@ garage_block.workspace = true garage_net.workspace = true garage_util.workspace = true garage_rpc.workspace = true +garage_api_common.workspace = true aes-gcm.workspace = true -argon2.workspace = true async-compression.workspace = true -async-trait.workspace = true base64.workspace = true bytes.workspace = true chrono.workspace = true crc32fast.workspace = true crc32c.workspace = true -crypto-common.workspace = true err-derive.workspace = true hex.workspace = true -hmac.workspace = true -idna.workspace = true tracing.workspace = true md-5.workspace = true -nom.workspace = true pin-project.workspace = true sha1.workspace = true sha2.workspace = true futures.workspace = true -futures-util.workspace = true tokio.workspace = true tokio-stream.workspace = true tokio-util.workspace = true @@ -54,21 +48,13 @@ httpdate.workspace = true http-range.workspace = true http-body-util.workspace = true hyper = { workspace = true, default-features = false, features = ["server", "http1"] } -hyper-util.workspace = true multer.workspace = true percent-encoding.workspace = true roxmltree.workspace = true url.workspace = true serde.workspace = true -serde_bytes.workspace = true serde_json.workspace = true quick-xml.workspace = true opentelemetry.workspace = true -opentelemetry-prometheus = { workspace = true, optional = true } -prometheus = { workspace = true, optional = true } - -[features] -k2v = [ "garage_util/k2v", "garage_model/k2v" ] -metrics = [ "opentelemetry-prometheus", "prometheus" ] diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 1737af33..bf48bba1 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -1,7 +1,5 @@ use std::sync::Arc; -use async_trait::async_trait; - use hyper::header; use hyper::{body::Incoming as IncomingBody, Request, Response}; use tokio::sync::watch; @@ -14,33 +12,33 @@ use garage_util::socket_address::UnixOrTCPSocketAddress; use garage_model::garage::Garage; use garage_model::key_table::Key; -use crate::generic_server::*; -use crate::s3::error::*; - -use crate::signature::verify_request; - -use crate::helpers::*; -use crate::s3::bucket::*; -use crate::s3::copy::*; -use crate::s3::cors::*; -use crate::s3::delete::*; -use crate::s3::get::*; -use crate::s3::lifecycle::*; -use crate::s3::list::*; -use crate::s3::multipart::*; -use crate::s3::post_object::handle_post_object; -use crate::s3::put::*; -use crate::s3::router::Endpoint; -use crate::s3::website::*; - -pub use crate::signature::streaming::ReqBody; +use garage_api_common::cors::*; +use garage_api_common::generic_server::*; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_request; + +use crate::bucket::*; +use crate::copy::*; +use crate::cors::*; +use crate::delete::*; +use crate::error::*; +use crate::get::*; +use crate::lifecycle::*; +use crate::list::*; +use crate::multipart::*; +use crate::post_object::handle_post_object; +use crate::put::*; +use crate::router::Endpoint; +use crate::website::*; + +pub use garage_api_common::signature::streaming::ReqBody; pub type ResBody = BoxBody<Error>; pub struct S3ApiServer { garage: Arc<Garage>, } -pub(crate) struct S3ApiEndpoint { +pub struct S3ApiEndpoint { bucket_name: Option<String>, endpoint: Endpoint, } @@ -70,7 +68,6 @@ impl S3ApiServer { } } -#[async_trait] impl ApiHandler for S3ApiServer { const API_NAME: &'static str = "s3"; const API_NAME_DISPLAY: &'static str = "S3"; @@ -150,7 +147,8 @@ impl ApiHandler for S3ApiServer { let bucket_id = garage .bucket_helper() .resolve_bucket(&bucket_name, &api_key) - .await?; + .await + .map_err(pass_helper_error)?; let bucket = garage .bucket_helper() .get_existing_bucket(bucket_id) diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 6a12aa9c..0a192ba6 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -13,12 +13,13 @@ use garage_util::crdt::*; use garage_util::data::*; use garage_util::time::*; -use crate::common_error::CommonError; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::error::*; -use crate::s3::xml as s3_xml; -use crate::signature::verify_signed_content; +use garage_api_common::common_error::CommonError; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_signed_content; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::xml as s3_xml; pub fn handle_get_bucket_location(ctx: ReqCtx) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, .. } = ctx; diff --git a/src/api/s3/checksum.rs b/src/api/s3/checksum.rs index c7527163..02fb55ec 100644 --- a/src/api/s3/checksum.rs +++ b/src/api/s3/checksum.rs @@ -15,7 +15,7 @@ use garage_util::error::OkOrMessage; use garage_model::s3::object_table::*; -use crate::s3::error::*; +use crate::error::*; pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-checksum-algorithm"); diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index e375a714..07d50ea5 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -20,15 +20,16 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::checksum::*; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; -use crate::s3::get::full_object_byte_stream; -use crate::s3::multipart; -use crate::s3::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult}; -use crate::s3::xml::{self as s3_xml, xmlns_tag}; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::checksum::*; +use crate::encryption::EncryptionParams; +use crate::error::*; +use crate::get::full_object_byte_stream; +use crate::multipart; +use crate::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult}; +use crate::xml::{self as s3_xml, xmlns_tag}; // -------- CopyObject --------- @@ -655,7 +656,8 @@ async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, let source_bucket_id = garage .bucket_helper() .resolve_bucket(&source_bucket.to_string(), api_key) - .await?; + .await + .map_err(pass_helper_error)?; if !api_key.allow_read(&source_bucket_id) { return Err(Error::forbidden(format!( @@ -861,7 +863,7 @@ pub struct CopyPartResult { #[cfg(test)] mod tests { use super::*; - use crate::s3::xml::to_xml_with_header; + use crate::xml::to_xml_with_header; #[test] fn copy_object_result() -> Result<(), Error> { diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index 173b7ffe..625b84db 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -1,30 +1,21 @@ use quick_xml::de::from_reader; -use std::sync::Arc; -use http::header::{ - ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, - ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, -}; -use hyper::{ - body::Body, body::Incoming as IncomingBody, header::HeaderName, Method, Request, Response, - StatusCode, -}; +use hyper::{header::HeaderName, Method, Request, Response, StatusCode}; use http_body_util::BodyExt; use serde::{Deserialize, Serialize}; -use crate::common_error::CommonError; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::error::*; -use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; -use crate::signature::verify_signed_content; - -use garage_model::bucket_table::{Bucket, BucketParams, CorsRule as GarageCorsRule}; -use garage_model::garage::Garage; +use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule}; use garage_util::data::*; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_signed_content; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; + pub async fn handle_get_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> { let ReqCtx { bucket_params, .. } = ctx; if let Some(cors) = bucket_params.cors_config.get() { @@ -99,154 +90,6 @@ pub async fn handle_put_cors( .body(empty_body())?) } -pub async fn handle_options_api( - garage: Arc<Garage>, - req: &Request<IncomingBody>, - bucket_name: Option<String>, -) -> Result<Response<EmptyBody>, CommonError> { - // FIXME: CORS rules of buckets with local aliases are - // not taken into account. - - // If the bucket name is a global bucket name, - // we try to apply the CORS rules of that bucket. - // If a user has a local bucket name that has - // the same name, its CORS rules won't be applied - // and will be shadowed by the rules of the globally - // existing bucket (but this is inevitable because - // OPTIONS calls are not auhtenticated). - if let Some(bn) = bucket_name { - let helper = garage.bucket_helper(); - let bucket_id = helper.resolve_global_bucket_name(&bn).await?; - if let Some(id) = bucket_id { - let bucket = garage.bucket_helper().get_existing_bucket(id).await?; - let bucket_params = bucket.state.into_option().unwrap(); - handle_options_for_bucket(req, &bucket_params) - } else { - // If there is a bucket name in the request, but that name - // does not correspond to a global alias for a bucket, - // then it's either a non-existing bucket or a local bucket. - // We have no way of knowing, because the request is not - // authenticated and thus we can't resolve local aliases. - // We take the permissive approach of allowing everything, - // because we don't want to prevent web apps that use - // local bucket names from making API calls. - Ok(Response::builder() - .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") - .header(ACCESS_CONTROL_ALLOW_METHODS, "*") - .status(StatusCode::OK) - .body(EmptyBody::new())?) - } - } else { - // If there is no bucket name in the request, - // we are doing a ListBuckets call, which we want to allow - // for all origins. - Ok(Response::builder() - .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") - .header(ACCESS_CONTROL_ALLOW_METHODS, "GET") - .status(StatusCode::OK) - .body(EmptyBody::new())?) - } -} - -pub fn handle_options_for_bucket( - req: &Request<IncomingBody>, - bucket_params: &BucketParams, -) -> Result<Response<EmptyBody>, CommonError> { - let origin = req - .headers() - .get("Origin") - .ok_or_bad_request("Missing Origin header")? - .to_str()?; - let request_method = req - .headers() - .get(ACCESS_CONTROL_REQUEST_METHOD) - .ok_or_bad_request("Missing Access-Control-Request-Method header")? - .to_str()?; - let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) { - Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(), - None => vec![], - }; - - if let Some(cors_config) = bucket_params.cors_config.get() { - let matching_rule = cors_config - .iter() - .find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter())); - if let Some(rule) = matching_rule { - let mut resp = Response::builder() - .status(StatusCode::OK) - .body(EmptyBody::new())?; - add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?; - return Ok(resp); - } - } - - Err(CommonError::Forbidden( - "This CORS request is not allowed.".into(), - )) -} - -pub fn find_matching_cors_rule<'a>( - bucket_params: &'a BucketParams, - req: &Request<impl Body>, -) -> Result<Option<&'a GarageCorsRule>, Error> { - if let Some(cors_config) = bucket_params.cors_config.get() { - if let Some(origin) = req.headers().get("Origin") { - let origin = origin.to_str()?; - let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) { - Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(), - None => vec![], - }; - return Ok(cors_config.iter().find(|rule| { - cors_rule_matches(rule, origin, req.method().as_ref(), request_headers.iter()) - })); - } - } - Ok(None) -} - -fn cors_rule_matches<'a, HI, S>( - rule: &GarageCorsRule, - origin: &'a str, - method: &'a str, - mut request_headers: HI, -) -> bool -where - HI: Iterator<Item = S>, - S: AsRef<str>, -{ - rule.allow_origins.iter().any(|x| x == "*" || x == origin) - && rule.allow_methods.iter().any(|x| x == "*" || x == method) - && request_headers.all(|h| { - rule.allow_headers - .iter() - .any(|x| x == "*" || x == h.as_ref()) - }) -} - -pub fn add_cors_headers( - resp: &mut Response<impl Body>, - rule: &GarageCorsRule, -) -> Result<(), http::header::InvalidHeaderValue> { - let h = resp.headers_mut(); - h.insert( - ACCESS_CONTROL_ALLOW_ORIGIN, - rule.allow_origins.join(", ").parse()?, - ); - h.insert( - ACCESS_CONTROL_ALLOW_METHODS, - rule.allow_methods.join(", ").parse()?, - ); - h.insert( - ACCESS_CONTROL_ALLOW_HEADERS, - rule.allow_headers.join(", ").parse()?, - ); - h.insert( - ACCESS_CONTROL_EXPOSE_HEADERS, - rule.expose_headers.join(", ").parse()?, - ); - Ok(()) -} - // ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ---- #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index 57f6f948..b799e67a 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -5,12 +5,13 @@ use garage_util::data::*; use garage_model::s3::object_table::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::error::*; -use crate::s3::put::next_timestamp; -use crate::s3::xml as s3_xml; -use crate::signature::verify_signed_content; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_signed_content; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::put::next_timestamp; +use crate::xml as s3_xml; async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), Error> { let ReqCtx { diff --git a/src/api/s3/encryption.rs b/src/api/s3/encryption.rs index 2e6ed65c..b38d7792 100644 --- a/src/api/s3/encryption.rs +++ b/src/api/s3/encryption.rs @@ -28,9 +28,10 @@ use garage_util::migrate::Migrate; use garage_model::garage::Garage; use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionMetaInner}; -use crate::common_error::*; -use crate::s3::checksum::Md5Checksum; -use crate::s3::error::Error; +use garage_api_common::common_error::*; + +use crate::checksum::Md5Checksum; +use crate::error::Error; const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-server-side-encryption-customer-algorithm"); diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index 2855e0b3..1bb8909c 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -4,19 +4,30 @@ use err_derive::Error; use hyper::header::HeaderValue; use hyper::{HeaderMap, StatusCode}; -use crate::common_error::CommonError; -pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; -use crate::generic_server::ApiError; -use crate::helpers::*; -use crate::s3::xml as s3_xml; -use crate::signature::error::Error as SignatureError; +use garage_model::helper::error::Error as HelperError; + +pub(crate) use garage_api_common::common_error::pass_helper_error; + +use garage_api_common::common_error::{ + commonErrorDerivative, helper_error_as_internal, CommonError, +}; + +pub use garage_api_common::common_error::{ + CommonErrorDerivative, OkOrBadRequest, OkOrInternalError, +}; + +use garage_api_common::generic_server::ApiError; +use garage_api_common::helpers::*; +use garage_api_common::signature::error::Error as SignatureError; + +use crate::xml as s3_xml; /// Errors of this crate #[derive(Debug, Error)] pub enum Error { #[error(display = "{}", _0)] /// Error from common error - Common(CommonError), + Common(#[error(source)] CommonError), // Category: cannot process /// Authorization Header Malformed @@ -78,17 +89,16 @@ pub enum Error { NotImplemented(String), } -impl<T> From<T> for Error -where - CommonError: From<T>, -{ - fn from(err: T) -> Self { - Error::Common(CommonError::from(err)) +commonErrorDerivative!(Error); + +// Helper errors are always passed as internal errors by default. +// To pass the specific error code back to the client, use `pass_helper_error`. +impl From<HelperError> for Error { + fn from(err: HelperError) -> Error { + Error::Common(helper_error_as_internal(err)) } } -impl CommonErrorDerivative for Error {} - impl From<roxmltree::Error> for Error { fn from(err: roxmltree::Error) -> Self { Self::InvalidXml(format!("{}", err)) diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index f5d3cf11..c2393a51 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -25,11 +25,12 @@ use garage_model::garage::Garage; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::*; -use crate::s3::api_server::ResBody; -use crate::s3::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE}; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; +use garage_api_common::helpers::*; + +use crate::api_server::ResBody; +use crate::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE}; +use crate::encryption::EncryptionParams; +use crate::error::*; const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; @@ -68,14 +69,11 @@ fn object_headers( // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html let mut headers_by_name = BTreeMap::new(); for (name, value) in meta_inner.headers.iter() { - match headers_by_name.get_mut(name) { - None => { - headers_by_name.insert(name, vec![value.as_str()]); - } - Some(headers) => { - headers.push(value.as_str()); - } - } + let name_lower = name.to_ascii_lowercase(); + headers_by_name + .entry(name_lower) + .or_insert(vec![]) + .push(value.as_str()); } for (name, values) in headers_by_name { diff --git a/src/api/s3/mod.rs b/src/api/s3/lib.rs index b9bb1a6f..fd99b443 100644 --- a/src/api/s3/mod.rs +++ b/src/api/s3/lib.rs @@ -1,3 +1,6 @@ +#[macro_use] +extern crate tracing; + pub mod api_server; pub mod error; diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index 7eb1c2cb..c35047ed 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -5,11 +5,12 @@ use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::error::*; -use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; -use crate::signature::verify_signed_content; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_signed_content; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use garage_model::bucket_table::{ parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration, diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 68d6cbe6..a5cc03b0 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -13,13 +13,14 @@ use garage_model::s3::object_table::*; use garage_table::EnumerationOrder; -use crate::encoding::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; -use crate::s3::multipart as s3_multipart; -use crate::s3::xml as s3_xml; +use garage_api_common::encoding::*; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::encryption::EncryptionParams; +use crate::error::*; +use crate::multipart as s3_multipart; +use crate::xml as s3_xml; const DUMMY_NAME: &str = "Dummy Key"; const DUMMY_KEY: &str = "GKDummyKey"; diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 3db3e8aa..fe39fc93 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -15,14 +15,15 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::checksum::*; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; -use crate::s3::put::*; -use crate::s3::xml as s3_xml; -use crate::signature::verify_signed_content; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_signed_content; + +use crate::api_server::{ReqBody, ResBody}; +use crate::checksum::*; +use crate::encryption::EncryptionParams; +use crate::error::*; +use crate::put::*; +use crate::xml as s3_xml; // ---- diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index 725f3847..6c0e73d4 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -16,15 +16,16 @@ use serde::Deserialize; use garage_model::garage::Garage; use garage_model::s3::object_table::*; -use crate::helpers::*; -use crate::s3::api_server::ResBody; -use crate::s3::checksum::*; -use crate::s3::cors::*; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; -use crate::s3::put::{get_headers, save_stream, ChecksumMode}; -use crate::s3::xml as s3_xml; -use crate::signature::payload::{verify_v4, Authorization}; +use garage_api_common::cors::*; +use garage_api_common::helpers::*; +use garage_api_common::signature::payload::{verify_v4, Authorization}; + +use crate::api_server::ResBody; +use crate::checksum::*; +use crate::encryption::EncryptionParams; +use crate::error::*; +use crate::put::{get_headers, save_stream, ChecksumMode}; +use crate::xml as s3_xml; pub async fn handle_post_object( garage: Arc<Garage>, @@ -107,7 +108,8 @@ pub async fn handle_post_object( let bucket_id = garage .bucket_helper() .resolve_bucket(&bucket_name, &api_key) - .await?; + .await + .map_err(pass_helper_error)?; if !api_key.allow_write(&bucket_id) { return Err(Error::forbidden("Operation is not allowed for this key.")); diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 1e3b1b44..530b4e7b 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -30,11 +30,12 @@ use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::checksum::*; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::checksum::*; +use crate::encryption::EncryptionParams; +use crate::error::*; const PUT_BLOCKS_MAX_PARALLEL: usize = 3; @@ -622,7 +623,7 @@ pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<HeaderList for (name, value) in headers.iter() { if name.as_str().starts_with("x-amz-meta-") { ret.push(( - name.to_string(), + name.as_str().to_ascii_lowercase(), std::str::from_utf8(value.as_bytes())?.to_string(), )); } diff --git a/src/api/s3/router.rs b/src/api/s3/router.rs index e7ac1d77..9de84b2b 100644 --- a/src/api/s3/router.rs +++ b/src/api/s3/router.rs @@ -3,9 +3,10 @@ use std::borrow::Cow; use hyper::header::HeaderValue; use hyper::{HeaderMap, Method, Request}; -use crate::helpers::Authorization; -use crate::router_macros::{generateQueryParameters, router_match}; -use crate::s3::error::*; +use garage_api_common::helpers::Authorization; +use garage_api_common::router_macros::{generateQueryParameters, router_match}; + +use crate::error::*; router_match! {@func diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index fa36bc32..b55bb345 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -4,15 +4,16 @@ use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::error::*; -use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; -use crate::signature::verify_signed_content; - use garage_model::bucket_table::*; use garage_util::data::*; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_signed_content; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; + pub async fn handle_get_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> { let ReqCtx { bucket_params, .. } = ctx; if let Some(website) = bucket_params.website_config.get() { diff --git a/src/api/s3/xml.rs b/src/api/s3/xml.rs index 1e569ade..e8af3ec0 100644 --- a/src/api/s3/xml.rs +++ b/src/api/s3/xml.rs @@ -1,7 +1,7 @@ use quick_xml::se::to_string; use serde::{Deserialize, Serialize, Serializer}; -use crate::s3::error::Error as ApiError; +use crate::error::Error as ApiError; pub fn to_xml_with_header<T: Serialize>(x: &T) -> Result<String, ApiError> { let mut xml = r#"<?xml version="1.0" encoding="UTF-8"?>"#.to_string(); diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 1af4d7f5..3358a3e7 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -34,10 +34,8 @@ async-compression.workspace = true zstd.workspace = true serde.workspace = true -serde_bytes.workspace = true futures.workspace = true -futures-util.workspace = true tokio.workspace = true tokio-util.workspace = true diff --git a/src/block/manager.rs b/src/block/manager.rs index 40b177a2..537e1fc1 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use std::time::Duration; use arc_swap::{ArcSwap, ArcSwapOption}; -use async_trait::async_trait; use bytes::Bytes; use rand::prelude::*; use serde::{Deserialize, Serialize}; @@ -688,7 +687,6 @@ impl BlockManager { } } -#[async_trait] impl StreamingEndpointHandler<BlockRpc> for BlockManager { async fn handle(self: &Arc<Self>, mut message: Req<BlockRpc>, _from: NodeID) -> Resp<BlockRpc> { match message.msg() { diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index 0a278bc0..3ef51fae 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -13,7 +13,6 @@ path = "lib.rs" [dependencies] err-derive.workspace = true -hexdump.workspace = true tracing.workspace = true heed = { workspace = true, optional = true } diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index f3aa35d1..ce6412b6 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -144,9 +144,12 @@ impl IDb for SqliteDb { let percent = (p.pagecount - p.remaining) * 100 / p.pagecount; info!("Sqlite snapshot progress: {}%", percent); } + std::fs::create_dir_all(to)?; + let mut path = to.clone(); + path.push("db.sqlite"); self.db .get()? - .backup(rusqlite::DatabaseName::Main, to, Some(progress))?; + .backup(rusqlite::DatabaseName::Main, path, Some(progress))?; Ok(()) } diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 483e33c0..c036f000 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -23,7 +23,9 @@ path = "tests/lib.rs" [dependencies] format_table.workspace = true garage_db.workspace = true -garage_api.workspace = true +garage_api_admin.workspace = true +garage_api_s3.workspace = true +garage_api_k2v = { workspace = true, optional = true } garage_block.workspace = true garage_model.workspace = true garage_net.workspace = true @@ -40,7 +42,6 @@ parse_duration.workspace = true hex.workspace = true tracing.workspace = true tracing-subscriber.workspace = true -rand.workspace = true async-trait.workspace = true sha1.workspace = true sodiumoxide.workspace = true @@ -48,21 +49,18 @@ structopt.workspace = true git-version.workspace = true serde.workspace = true -serde_bytes.workspace = true -toml.workspace = true futures.workspace = true -futures-util.workspace = true tokio.workspace = true opentelemetry.workspace = true opentelemetry-prometheus = { workspace = true, optional = true } opentelemetry-otlp = { workspace = true, optional = true } -prometheus = { workspace = true, optional = true } syslog-tracing = { workspace = true, optional = true } [dev-dependencies] -aws-config.workspace = true +garage_api_common.workspace = true + aws-sdk-s3.workspace = true chrono.workspace = true http.workspace = true @@ -84,7 +82,7 @@ k2v-client.workspace = true [features] default = [ "bundled-libs", "metrics", "lmdb", "sqlite", "k2v" ] -k2v = [ "garage_util/k2v", "garage_api/k2v" ] +k2v = [ "garage_util/k2v", "garage_api_k2v" ] # Database engines lmdb = [ "garage_model/lmdb" ] @@ -95,7 +93,7 @@ consul-discovery = [ "garage_rpc/consul-discovery" ] # Automatic registration and discovery via Kubernetes API kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ] # Prometheus exporter (/metrics endpoint). -metrics = [ "garage_api/metrics", "opentelemetry-prometheus", "prometheus" ] +metrics = [ "garage_api_admin/metrics", "opentelemetry-prometheus" ] # Exporter for the OpenTelemetry Collector. telemetry-otlp = [ "opentelemetry-otlp" ] # Logging to syslog diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index e2468143..ea414b56 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -4,9 +4,11 @@ mod key; use std::collections::HashMap; use std::fmt::Write; +use std::future::Future; use std::sync::Arc; -use async_trait::async_trait; +use futures::future::FutureExt; + use serde::{Deserialize, Serialize}; use format_table::format_table_to_string; @@ -505,22 +507,25 @@ impl AdminRpcHandler { } } -#[async_trait] impl EndpointHandler<AdminRpc> for AdminRpcHandler { - async fn handle( + fn handle( self: &Arc<Self>, message: &AdminRpc, _from: NodeID, - ) -> Result<AdminRpc, Error> { - match message { - AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await, - AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await, - AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, - AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, - AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await, - AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await, - AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await, - m => Err(GarageError::unexpected_rpc_message(m).into()), + ) -> impl Future<Output = Result<AdminRpc, Error>> + Send { + let self2 = self.clone(); + async move { + match message { + AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, + AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await, + AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt.clone()).await, + AdminRpc::Stats(opt) => self2.handle_stats(opt.clone()).await, + AdminRpc::Worker(wo) => self2.handle_worker_cmd(wo).await, + AdminRpc::BlockOperation(bo) => self2.handle_block_cmd(bo).await, + AdminRpc::MetaOperation(mo) => self2.handle_meta_cmd(mo).await, + m => Err(GarageError::unexpected_rpc_message(m).into()), + } } + .boxed() } } diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 2c5227d2..47883f97 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::sync::Arc; use std::time::Duration; @@ -93,17 +94,16 @@ pub async fn launch_online_repair( // ---- -#[async_trait] trait TableRepair: Send + Sync + 'static { type T: TableSchema; fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>; - async fn process( + fn process( &mut self, garage: &Garage, entry: <<Self as TableRepair>::T as TableSchema>::E, - ) -> Result<bool, Error>; + ) -> impl Future<Output = Result<bool, Error>> + Send; } struct TableRepairWorker<T: TableRepair> { @@ -174,7 +174,6 @@ impl<R: TableRepair> Worker for TableRepairWorker<R> { struct RepairVersions; -#[async_trait] impl TableRepair for RepairVersions { type T = VersionTable; @@ -221,7 +220,6 @@ impl TableRepair for RepairVersions { struct RepairBlockRefs; -#[async_trait] impl TableRepair for RepairBlockRefs { type T = BlockRefTable; @@ -257,7 +255,6 @@ impl TableRepair for RepairBlockRefs { struct RepairMpu; -#[async_trait] impl TableRepair for RepairMpu { type T = MultipartUploadTable; diff --git a/src/garage/server.rs b/src/garage/server.rs index 65bf34db..9e58fa6d 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -6,13 +6,13 @@ use garage_util::background::*; use garage_util::config::*; use garage_util::error::Error; -use garage_api::admin::api_server::AdminApiServer; -use garage_api::s3::api_server::S3ApiServer; +use garage_api_admin::api_server::AdminApiServer; +use garage_api_s3::api_server::S3ApiServer; use garage_model::garage::Garage; use garage_web::WebServer; #[cfg(feature = "k2v")] -use garage_api::k2v::api_server::K2VApiServer; +use garage_api_k2v::api_server::K2VApiServer; use crate::admin::*; use crate::secrets::{fill_secrets, Secrets}; diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs index 42368976..2db72e9f 100644 --- a/src/garage/tests/common/custom_requester.rs +++ b/src/garage/tests/common/custom_requester.rs @@ -15,7 +15,7 @@ use hyper_util::client::legacy::{connect::HttpConnector, Client}; use hyper_util::rt::TokioExecutor; use super::garage::{Instance, Key}; -use garage_api::signature; +use garage_api_common::signature; pub type Body = FullBody<hyper::body::Bytes>; diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml index 694be1f8..bbd09b19 100644 --- a/src/k2v-client/Cargo.toml +++ b/src/k2v-client/Cargo.toml @@ -29,12 +29,11 @@ tokio.workspace = true # cli deps clap = { workspace = true, optional = true } format_table = { workspace = true, optional = true } -tracing = { workspace = true, optional = true } tracing-subscriber = { workspace = true, optional = true } [features] -cli = ["clap", "tokio/fs", "tokio/io-std", "tracing", "tracing-subscriber", "format_table"] +cli = ["clap", "tokio/fs", "tokio/io-std", "tracing-subscriber", "format_table"] [lib] path = "lib.rs" diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 12931a4c..b58ad43b 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -22,7 +22,6 @@ garage_util.workspace = true garage_net.workspace = true async-trait.workspace = true -arc-swap.workspace = true blake2.workspace = true chrono.workspace = true err-derive.workspace = true @@ -38,9 +37,7 @@ serde.workspace = true serde_bytes.workspace = true futures.workspace = true -futures-util.workspace = true tokio.workspace = true -opentelemetry.workspace = true [features] default = [ "lmdb", "sqlite" ] diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs index c80ebd39..7d311ede 100644 --- a/src/model/k2v/causality.rs +++ b/src/model/k2v/causality.rs @@ -16,8 +16,6 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; -use crate::helper::error::{Error as HelperError, OkOrBadRequest}; - /// Node IDs used in K2V are u64 integers that are the abbreviation /// of full Garage node IDs which are 256-bit UUIDs. pub type K2VNodeId = u64; @@ -99,10 +97,6 @@ impl CausalContext { Some(ret) } - pub fn parse_helper(s: &str) -> Result<Self, HelperError> { - Self::parse(s).ok_or_bad_request("Invalid causality token") - } - /// Check if this causal context contains newer items than another one pub fn is_newer_than(&self, other: &Self) -> bool { vclock_gt(&self.vector_clock, &other.vector_clock) diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index a1bf6ee0..821f4549 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -10,7 +10,6 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex, MutexGuard}; use std::time::{Duration, Instant}; -use async_trait::async_trait; use futures::stream::FuturesUnordered; use futures::StreamExt; use serde::{Deserialize, Serialize}; @@ -537,7 +536,6 @@ impl K2VRpcHandler { } } -#[async_trait] impl EndpointHandler<K2VRpc> for K2VRpcHandler { async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> { match message { diff --git a/src/model/snapshot.rs b/src/model/snapshot.rs index 87756f60..8e8995f9 100644 --- a/src/model/snapshot.rs +++ b/src/model/snapshot.rs @@ -41,8 +41,14 @@ pub fn snapshot_metadata(garage: &Garage) -> Result<(), Error> { } }; - let mut snapshots_dir = garage.config.metadata_dir.clone(); - snapshots_dir.push("snapshots"); + let snapshots_dir = match &garage.config.metadata_snapshots_dir { + Some(d) => d.clone(), + None => { + let mut default_snapshots_dir = garage.config.metadata_dir.clone(); + default_snapshots_dir.push("snapshots"); + default_snapshots_dir + } + }; fs::create_dir_all(&snapshots_dir)?; let mut new_path = snapshots_dir.clone(); diff --git a/src/net/Cargo.toml b/src/net/Cargo.toml index 686aaaea..c0b47a6e 100644 --- a/src/net/Cargo.toml +++ b/src/net/Cargo.toml @@ -30,7 +30,6 @@ rand.workspace = true log.workspace = true arc-swap.workspace = true -async-trait.workspace = true err-derive.workspace = true bytes.workspace = true cfg-if.workspace = true diff --git a/src/net/client.rs b/src/net/client.rs index 607dd173..20e1dacd 100644 --- a/src/net/client.rs +++ b/src/net/client.rs @@ -6,7 +6,6 @@ use std::sync::{Arc, Mutex}; use std::task::Poll; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use bytes::Bytes; use log::{debug, error, trace}; @@ -220,7 +219,6 @@ impl ClientConn { impl SendLoop for ClientConn {} -#[async_trait] impl RecvLoop for ClientConn { fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) { trace!("ClientConn recv_handler {}", id); diff --git a/src/net/endpoint.rs b/src/net/endpoint.rs index 3cafafeb..d46acc42 100644 --- a/src/net/endpoint.rs +++ b/src/net/endpoint.rs @@ -1,8 +1,9 @@ +use std::future::Future; use std::marker::PhantomData; use std::sync::Arc; use arc_swap::ArcSwapOption; -use async_trait::async_trait; +use futures::future::{BoxFuture, FutureExt}; use crate::error::Error; use crate::message::*; @@ -14,19 +15,17 @@ use crate::netapp::*; /// attached to the response.. /// /// The handler object should be in an Arc, see `Endpoint::set_handler` -#[async_trait] pub trait StreamingEndpointHandler<M>: Send + Sync where M: Message, { - async fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> Resp<M>; + fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> impl Future<Output = Resp<M>> + Send; } /// If one simply wants to use an endpoint in a client fashion, /// without locally serving requests to that endpoint, /// use the unit type `()` as the handler type: /// it will panic if it is ever made to handle request. -#[async_trait] impl<M: Message> EndpointHandler<M> for () { async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response { panic!("This endpoint should not have a local handler."); @@ -38,15 +37,13 @@ impl<M: Message> EndpointHandler<M> for () { /// This trait should be implemented by an object of your application /// that can handle a message of type `M`, in the cases where it doesn't /// care about attached stream in the request nor in the response. -#[async_trait] pub trait EndpointHandler<M>: Send + Sync where M: Message, { - async fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> M::Response; + fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> impl Future<Output = M::Response> + Send; } -#[async_trait] impl<T, M> StreamingEndpointHandler<M> for T where T: EndpointHandler<M>, @@ -161,9 +158,8 @@ where pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>; -#[async_trait] pub(crate) trait GenericEndpoint { - async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error>; + fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture<Result<RespEnc, Error>>; fn drop_handler(&self); fn clone_endpoint(&self) -> DynEndpoint; } @@ -174,21 +170,23 @@ where M: Message, H: StreamingEndpointHandler<M>; -#[async_trait] impl<M, H> GenericEndpoint for EndpointArc<M, H> where M: Message, H: StreamingEndpointHandler<M> + 'static, { - async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error> { - match self.0.handler.load_full() { - None => Err(Error::NoHandler), - Some(h) => { - let req = Req::from_enc(req_enc)?; - let res = h.handle(req, from).await; - Ok(res.into_enc()?) + fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture<Result<RespEnc, Error>> { + async move { + match self.0.handler.load_full() { + None => Err(Error::NoHandler), + Some(h) => { + let req = Req::from_enc(req_enc)?; + let res = h.handle(req, from).await; + Ok(res.into_enc()?) + } } } + .boxed() } fn drop_handler(&self) { diff --git a/src/net/netapp.rs b/src/net/netapp.rs index 77e55774..36c6fc88 100644 --- a/src/net/netapp.rs +++ b/src/net/netapp.rs @@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock}; use log::{debug, error, info, trace, warn}; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::auth; @@ -457,7 +456,6 @@ impl NetApp { } } -#[async_trait] impl EndpointHandler<HelloMessage> for NetApp { async fn handle(self: &Arc<Self>, msg: &HelloMessage, from: NodeID) { debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg); diff --git a/src/net/peering.rs b/src/net/peering.rs index a8d271ec..08378a08 100644 --- a/src/net/peering.rs +++ b/src/net/peering.rs @@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use arc_swap::ArcSwap; -use async_trait::async_trait; use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; @@ -592,7 +591,6 @@ impl PeeringManager { } } -#[async_trait] impl EndpointHandler<PingMessage> for PeeringManager { async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage { let ping_resp = PingMessage { @@ -604,7 +602,6 @@ impl EndpointHandler<PingMessage> for PeeringManager { } } -#[async_trait] impl EndpointHandler<PeerListMessage> for PeeringManager { async fn handle( self: &Arc<Self>, diff --git a/src/net/recv.rs b/src/net/recv.rs index 0de7bef2..35a6d71a 100644 --- a/src/net/recv.rs +++ b/src/net/recv.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use async_trait::async_trait; use bytes::Bytes; use log::*; @@ -50,7 +49,6 @@ impl Drop for Sender { /// according to the protocol defined above: chunks of message in progress of being /// received are stored in a buffer, and when the last chunk of a message is received, /// the full message is passed to the receive handler. -#[async_trait] pub(crate) trait RecvLoop: Sync + 'static { fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream); fn cancel_handler(self: &Arc<Self>, _id: RequestID) {} diff --git a/src/net/send.rs b/src/net/send.rs index 1454eeb7..6f1ac02c 100644 --- a/src/net/send.rs +++ b/src/net/send.rs @@ -3,7 +3,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use log::*; @@ -273,7 +272,6 @@ impl DataFrame { /// /// The `.send_loop()` exits when the sending end of the channel is closed, /// or if there is an error at any time writing to the async writer. -#[async_trait] pub(crate) trait SendLoop: Sync { async fn send_loop<W>( self: Arc<Self>, diff --git a/src/net/server.rs b/src/net/server.rs index 36dccb2f..fb6c6366 100644 --- a/src/net/server.rs +++ b/src/net/server.rs @@ -3,7 +3,6 @@ use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use log::*; use futures::io::{AsyncReadExt, AsyncWriteExt}; @@ -174,7 +173,6 @@ impl ServerConn { impl SendLoop for ServerConn {} -#[async_trait] impl RecvLoop for ServerConn { fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) { let resp_send = match self.resp_send.load_full() { diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index acde0911..fcc1c304 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -15,12 +15,10 @@ path = "lib.rs" [dependencies] format_table.workspace = true -garage_db.workspace = true garage_util.workspace = true garage_net.workspace = true arc-swap.workspace = true -bytes.workspace = true bytesize.workspace = true gethostname.workspace = true hex.workspace = true @@ -46,9 +44,7 @@ reqwest = { workspace = true, optional = true } pnet_datalink.workspace = true futures.workspace = true -futures-util.workspace = true tokio.workspace = true -tokio-stream.workspace = true opentelemetry.workspace = true [features] diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 0fa68218..2a52ae5d 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -7,7 +7,6 @@ use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::{Duration, Instant}; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use futures::join; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::sign::ed25519; @@ -749,7 +748,6 @@ impl System { } } -#[async_trait] impl EndpointHandler<SystemRpc> for System { async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> { match msg { diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index e704cd3c..fad6ea08 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -22,7 +22,6 @@ opentelemetry.workspace = true async-trait.workspace = true arc-swap.workspace = true -bytes.workspace = true hex.workspace = true hexdump.workspace = true tracing.workspace = true diff --git a/src/table/gc.rs b/src/table/gc.rs index 9e060390..28ea119d 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; + use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -272,7 +273,6 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> { } } -#[async_trait] impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> { async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> { match message { diff --git a/src/table/sync.rs b/src/table/sync.rs index 234ee8ea..2d43b9fc 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -444,7 +444,6 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> { // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== -#[async_trait] impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> { async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> { match message { diff --git a/src/table/table.rs b/src/table/table.rs index ea8471d0..c96f4731 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -2,7 +2,6 @@ use std::borrow::Borrow; use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; -use async_trait::async_trait; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -204,6 +203,10 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> { entries_vec.push((write_sets, e_enc)); } + if entries_vec.is_empty() { + return Ok(()); + } + // Compute a deduplicated list of all of the write sets, // and compute an index from each node to the position of the sets in which // it takes part, to optimize the detection of a quorum. @@ -496,7 +499,6 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> { } } -#[async_trait] impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> { async fn handle( self: &Arc<Self>, diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index da3e39b8..fec5b1ed 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -20,9 +20,7 @@ garage_net.workspace = true arc-swap.workspace = true async-trait.workspace = true blake2.workspace = true -bytes.workspace = true bytesize.workspace = true -digest.workspace = true err-derive.workspace = true hexdump.workspace = true xxhash-rust.workspace = true diff --git a/src/util/config.rs b/src/util/config.rs index 01f7350a..b4e2b008 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -31,6 +31,9 @@ pub struct Config { #[serde(default)] pub use_local_tz: bool, + /// Optional directory where metadata snapshots will be store + pub metadata_snapshots_dir: Option<PathBuf>, + /// Automatic snapshot interval for metadata #[serde(default)] pub metadata_auto_snapshot_interval: Option<String>, diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml index d810d6f9..a0a3e566 100644 --- a/src/web/Cargo.toml +++ b/src/web/Cargo.toml @@ -14,7 +14,8 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_api.workspace = true +garage_api_common.workspace = true +garage_api_s3.workspace = true garage_model.workspace = true garage_util.workspace = true garage_table.workspace = true @@ -23,12 +24,9 @@ err-derive.workspace = true tracing.workspace = true percent-encoding.workspace = true -futures.workspace = true - http.workspace = true http-body-util.workspace = true hyper.workspace = true -hyper-util.workspace = true tokio.workspace = true diff --git a/src/web/error.rs b/src/web/error.rs index bd8f17b5..7e6d4542 100644 --- a/src/web/error.rs +++ b/src/web/error.rs @@ -2,14 +2,14 @@ use err_derive::Error; use hyper::header::HeaderValue; use hyper::{HeaderMap, StatusCode}; -use garage_api::generic_server::ApiError; +use garage_api_common::generic_server::ApiError; /// Errors of this crate #[derive(Debug, Error)] pub enum Error { /// An error received from the API crate #[error(display = "API error: {}", _0)] - ApiError(garage_api::s3::error::Error), + ApiError(garage_api_s3::error::Error), /// The file does not exist #[error(display = "Not found")] @@ -22,10 +22,10 @@ pub enum Error { impl<T> From<T> for Error where - garage_api::s3::error::Error: From<T>, + garage_api_s3::error::Error: From<T>, { fn from(err: T) -> Self { - Error::ApiError(garage_api::s3::error::Error::from(err)) + Error::ApiError(garage_api_s3::error::Error::from(err)) } } diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 69939f65..48dcb5b1 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -20,13 +20,15 @@ use opentelemetry::{ use crate::error::*; -use garage_api::generic_server::{server_loop, UnixListenerOn}; -use garage_api::helpers::*; -use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket}; -use garage_api::s3::error::{ +use garage_api_common::cors::{ + add_cors_headers, find_matching_cors_rule, handle_options_for_bucket, +}; +use garage_api_common::generic_server::{server_loop, UnixListenerOn}; +use garage_api_common::helpers::*; +use garage_api_s3::error::{ CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError, }; -use garage_api::s3::get::{handle_get_without_ctx, handle_head_without_ctx}; +use garage_api_s3::get::{handle_get_without_ctx, handle_head_without_ctx}; use garage_model::garage::Garage; |