diff options
Diffstat (limited to 'src')
87 files changed, 829 insertions, 597 deletions
diff --git a/src/api/admin/Cargo.toml b/src/api/admin/Cargo.toml new file mode 100644 index 00000000..adddf306 --- /dev/null +++ b/src/api/admin/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "garage_api_admin" +version = "1.0.1" +authors = ["Alex Auvolat <alex@adnab.me>"] +edition = "2018" +license = "AGPL-3.0" +description = "Admin API server crate for the Garage object store" +repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_model.workspace = true +garage_table.workspace = true +garage_util.workspace = true +garage_rpc.workspace = true +garage_api_common.workspace = true + +argon2.workspace = true +async-trait.workspace = true +err-derive.workspace = true +hex.workspace = true +tracing.workspace = true + +futures.workspace = true +tokio.workspace = true +http.workspace = true +hyper = { workspace = true, default-features = false, features = ["server", "http1"] } +url.workspace = true + +serde.workspace = true +serde_json.workspace = true + +opentelemetry.workspace = true +opentelemetry-prometheus = { workspace = true, optional = true } +prometheus = { workspace = true, optional = true } + +[features] +metrics = [ "opentelemetry-prometheus", "prometheus" ] diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 0e4565bb..6f0c474f 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::sync::Arc; use argon2::password_hash::PasswordHash; -use async_trait::async_trait; use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW}; use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; @@ -20,15 +19,15 @@ use garage_rpc::system::ClusterHealthStatus; use garage_util::error::Error as GarageError; use garage_util::socket_address::UnixOrTCPSocketAddress; -use crate::generic_server::*; +use garage_api_common::generic_server::*; +use garage_api_common::helpers::*; -use crate::admin::bucket::*; -use crate::admin::cluster::*; -use crate::admin::error::*; -use crate::admin::key::*; -use crate::admin::router_v0; -use crate::admin::router_v1::{Authorization, Endpoint}; -use crate::helpers::*; +use crate::bucket::*; +use crate::cluster::*; +use crate::error::*; +use crate::key::*; +use crate::router_v0; +use crate::router_v1::{Authorization, Endpoint}; pub type ResBody = BoxBody<Error>; @@ -221,7 +220,6 @@ impl AdminApiServer { } } -#[async_trait] impl ApiHandler for AdminApiServer { const API_NAME: &'static str = "admin"; const API_NAME_DISPLAY: &'static str = "Admin"; diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index ac3cba00..2537bfc9 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -17,11 +17,12 @@ use garage_model::permission::*; use garage_model::s3::mpu_table; use garage_model::s3::object_table::*; -use crate::admin::api_server::ResBody; -use crate::admin::error::*; -use crate::admin::key::ApiBucketKeyPerm; -use crate::common_error::CommonError; -use crate::helpers::*; +use garage_api_common::common_error::CommonError; +use garage_api_common::helpers::*; + +use crate::api_server::ResBody; +use crate::error::*; +use crate::key::ApiBucketKeyPerm; pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let buckets = garage diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 357ac600..ffa0fa71 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -12,9 +12,10 @@ use garage_rpc::layout; use garage_model::garage::Garage; -use crate::admin::api_server::ResBody; -use crate::admin::error::*; -use crate::helpers::{json_ok_response, parse_json_body}; +use garage_api_common::helpers::{json_ok_response, parse_json_body}; + +use crate::api_server::ResBody; +use crate::error::*; pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let layout = garage.system.cluster_layout(); diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs index 40d686e3..201f9b40 100644 --- a/src/api/admin/error.rs +++ b/src/api/admin/error.rs @@ -6,17 +6,19 @@ use hyper::{HeaderMap, StatusCode}; pub use garage_model::helper::error::Error as HelperError; -use crate::common_error::CommonError; -pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; -use crate::generic_server::ApiError; -use crate::helpers::*; +use garage_api_common::common_error::{commonErrorDerivative, CommonError}; +pub use garage_api_common::common_error::{ + CommonErrorDerivative, OkOrBadRequest, OkOrInternalError, +}; +use garage_api_common::generic_server::ApiError; +use garage_api_common::helpers::*; /// Errors of this crate #[derive(Debug, Error)] pub enum Error { #[error(display = "{}", _0)] /// Error from common error - Common(CommonError), + Common(#[error(source)] CommonError), // Category: cannot process /// The API access key does not exist @@ -31,14 +33,7 @@ pub enum Error { KeyAlreadyExists(String), } -impl<T> From<T> for Error -where - CommonError: From<T>, -{ - fn from(err: T) -> Self { - Error::Common(CommonError::from(err)) - } -} +commonErrorDerivative!(Error); /// FIXME: helper errors are transformed into their corresponding variants /// in the Error struct, but in many case a helper error should be considered @@ -53,8 +48,6 @@ impl From<HelperError> for Error { } } -impl CommonErrorDerivative for Error {} - impl Error { fn code(&self) -> &'static str { match self { diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs index 291b6d54..bebf3063 100644 --- a/src/api/admin/key.rs +++ b/src/api/admin/key.rs @@ -9,9 +9,10 @@ use garage_table::*; use garage_model::garage::Garage; use garage_model::key_table::*; -use crate::admin::api_server::ResBody; -use crate::admin::error::*; -use crate::helpers::*; +use garage_api_common::helpers::*; + +use crate::api_server::ResBody; +use crate::error::*; pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let res = garage diff --git a/src/api/admin/mod.rs b/src/api/admin/lib.rs index 43a8c59c..599e9b44 100644 --- a/src/api/admin/mod.rs +++ b/src/api/admin/lib.rs @@ -1,3 +1,6 @@ +#[macro_use] +extern crate tracing; + pub mod api_server; mod error; mod router_v0; diff --git a/src/api/admin/router_v0.rs b/src/api/admin/router_v0.rs index 68676445..9dd742ba 100644 --- a/src/api/admin/router_v0.rs +++ b/src/api/admin/router_v0.rs @@ -2,8 +2,9 @@ use std::borrow::Cow; use hyper::{Method, Request}; -use crate::admin::error::*; -use crate::router_macros::*; +use garage_api_common::router_macros::*; + +use crate::error::*; router_match! {@func diff --git a/src/api/admin/router_v1.rs b/src/api/admin/router_v1.rs index cc5ff2ec..0b4901ea 100644 --- a/src/api/admin/router_v1.rs +++ b/src/api/admin/router_v1.rs @@ -2,9 +2,10 @@ use std::borrow::Cow; use hyper::{Method, Request}; -use crate::admin::error::*; -use crate::admin::router_v0; -use crate::router_macros::*; +use garage_api_common::router_macros::*; + +use crate::error::*; +use crate::router_v0; pub enum Authorization { None, diff --git a/src/api/common/Cargo.toml b/src/api/common/Cargo.toml new file mode 100644 index 00000000..5b9cf479 --- /dev/null +++ b/src/api/common/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "garage_api_common" +version = "1.0.1" +authors = ["Alex Auvolat <alex@adnab.me>"] +edition = "2018" +license = "AGPL-3.0" +description = "Common functions for the API server crates for the Garage object store" +repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_model.workspace = true +garage_table.workspace = true +garage_util.workspace = true + +bytes.workspace = true +chrono.workspace = true +crypto-common.workspace = true +err-derive.workspace = true +hex.workspace = true +hmac.workspace = true +idna.workspace = true +tracing.workspace = true +nom.workspace = true +pin-project.workspace = true +sha2.workspace = true + +futures.workspace = true +tokio.workspace = true +http.workspace = true +http-body-util.workspace = true +hyper = { workspace = true, default-features = false, features = ["server", "http1"] } +hyper-util.workspace = true +url.workspace = true + +serde.workspace = true +serde_json.workspace = true + +opentelemetry.workspace = true diff --git a/src/api/common_error.rs b/src/api/common/common_error.rs index 0c8006dc..597a3511 100644 --- a/src/api/common_error.rs +++ b/src/api/common/common_error.rs @@ -57,6 +57,35 @@ pub enum CommonError { InvalidBucketName(String), } +#[macro_export] +macro_rules! commonErrorDerivative { + ( $error_struct: ident ) => { + impl From<garage_util::error::Error> for $error_struct { + fn from(err: garage_util::error::Error) -> Self { + Self::Common(CommonError::InternalError(err)) + } + } + impl From<http::Error> for $error_struct { + fn from(err: http::Error) -> Self { + Self::Common(CommonError::Http(err)) + } + } + impl From<hyper::Error> for $error_struct { + fn from(err: hyper::Error) -> Self { + Self::Common(CommonError::Hyper(err)) + } + } + impl From<hyper::header::ToStrError> for $error_struct { + fn from(err: hyper::header::ToStrError) -> Self { + Self::Common(CommonError::InvalidHeader(err)) + } + } + impl CommonErrorDerivative for $error_struct {} + }; +} + +pub use commonErrorDerivative; + impl CommonError { pub fn http_status_code(&self) -> StatusCode { match self { @@ -118,14 +147,14 @@ impl TryFrom<HelperError> for CommonError { /// This is used for helper functions that might return InvalidBucketName /// or NoSuchBucket for instance, and we want to pass that error /// up to our caller. -pub(crate) fn pass_helper_error(err: HelperError) -> CommonError { +pub fn pass_helper_error(err: HelperError) -> CommonError { match CommonError::try_from(err) { Ok(e) => e, Err(e) => panic!("Helper error `{}` should hot have happenned here", e), } } -pub(crate) fn helper_error_as_internal(err: HelperError) -> CommonError { +pub fn helper_error_as_internal(err: HelperError) -> CommonError { match err { HelperError::Internal(e) => CommonError::InternalError(e), e => CommonError::InternalError(GarageError::Message(e.to_string())), diff --git a/src/api/common/cors.rs b/src/api/common/cors.rs new file mode 100644 index 00000000..14369b56 --- /dev/null +++ b/src/api/common/cors.rs @@ -0,0 +1,170 @@ +use std::sync::Arc; + +use http::header::{ + ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, + ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, +}; +use hyper::{body::Body, body::Incoming as IncomingBody, Request, Response, StatusCode}; + +use garage_model::bucket_table::{BucketParams, CorsRule as GarageCorsRule}; +use garage_model::garage::Garage; + +use crate::common_error::{ + helper_error_as_internal, CommonError, OkOrBadRequest, OkOrInternalError, +}; +use crate::helpers::*; + +pub fn find_matching_cors_rule<'a>( + bucket_params: &'a BucketParams, + req: &Request<impl Body>, +) -> Result<Option<&'a GarageCorsRule>, CommonError> { + if let Some(cors_config) = bucket_params.cors_config.get() { + if let Some(origin) = req.headers().get("Origin") { + let origin = origin.to_str()?; + let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) { + Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(), + None => vec![], + }; + return Ok(cors_config.iter().find(|rule| { + cors_rule_matches(rule, origin, req.method().as_ref(), request_headers.iter()) + })); + } + } + Ok(None) +} + +pub fn cors_rule_matches<'a, HI, S>( + rule: &GarageCorsRule, + origin: &'a str, + method: &'a str, + mut request_headers: HI, +) -> bool +where + HI: Iterator<Item = S>, + S: AsRef<str>, +{ + rule.allow_origins.iter().any(|x| x == "*" || x == origin) + && rule.allow_methods.iter().any(|x| x == "*" || x == method) + && request_headers.all(|h| { + rule.allow_headers + .iter() + .any(|x| x == "*" || x == h.as_ref()) + }) +} + +pub fn add_cors_headers( + resp: &mut Response<impl Body>, + rule: &GarageCorsRule, +) -> Result<(), http::header::InvalidHeaderValue> { + let h = resp.headers_mut(); + h.insert( + ACCESS_CONTROL_ALLOW_ORIGIN, + rule.allow_origins.join(", ").parse()?, + ); + h.insert( + ACCESS_CONTROL_ALLOW_METHODS, + rule.allow_methods.join(", ").parse()?, + ); + h.insert( + ACCESS_CONTROL_ALLOW_HEADERS, + rule.allow_headers.join(", ").parse()?, + ); + h.insert( + ACCESS_CONTROL_EXPOSE_HEADERS, + rule.expose_headers.join(", ").parse()?, + ); + Ok(()) +} + +pub async fn handle_options_api( + garage: Arc<Garage>, + req: &Request<IncomingBody>, + bucket_name: Option<String>, +) -> Result<Response<EmptyBody>, CommonError> { + // FIXME: CORS rules of buckets with local aliases are + // not taken into account. + + // If the bucket name is a global bucket name, + // we try to apply the CORS rules of that bucket. + // If a user has a local bucket name that has + // the same name, its CORS rules won't be applied + // and will be shadowed by the rules of the globally + // existing bucket (but this is inevitable because + // OPTIONS calls are not auhtenticated). + if let Some(bn) = bucket_name { + let helper = garage.bucket_helper(); + let bucket_id = helper + .resolve_global_bucket_name(&bn) + .await + .map_err(helper_error_as_internal)?; + if let Some(id) = bucket_id { + let bucket = garage + .bucket_helper() + .get_existing_bucket(id) + .await + .map_err(helper_error_as_internal)?; + let bucket_params = bucket.state.into_option().unwrap(); + handle_options_for_bucket(req, &bucket_params) + } else { + // If there is a bucket name in the request, but that name + // does not correspond to a global alias for a bucket, + // then it's either a non-existing bucket or a local bucket. + // We have no way of knowing, because the request is not + // authenticated and thus we can't resolve local aliases. + // We take the permissive approach of allowing everything, + // because we don't want to prevent web apps that use + // local bucket names from making API calls. + Ok(Response::builder() + .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") + .header(ACCESS_CONTROL_ALLOW_METHODS, "*") + .status(StatusCode::OK) + .body(EmptyBody::new())?) + } + } else { + // If there is no bucket name in the request, + // we are doing a ListBuckets call, which we want to allow + // for all origins. + Ok(Response::builder() + .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") + .header(ACCESS_CONTROL_ALLOW_METHODS, "GET") + .status(StatusCode::OK) + .body(EmptyBody::new())?) + } +} + +pub fn handle_options_for_bucket( + req: &Request<IncomingBody>, + bucket_params: &BucketParams, +) -> Result<Response<EmptyBody>, CommonError> { + let origin = req + .headers() + .get("Origin") + .ok_or_bad_request("Missing Origin header")? + .to_str()?; + let request_method = req + .headers() + .get(ACCESS_CONTROL_REQUEST_METHOD) + .ok_or_bad_request("Missing Access-Control-Request-Method header")? + .to_str()?; + let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) { + Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(), + None => vec![], + }; + + if let Some(cors_config) = bucket_params.cors_config.get() { + let matching_rule = cors_config + .iter() + .find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter())); + if let Some(rule) = matching_rule { + let mut resp = Response::builder() + .status(StatusCode::OK) + .body(EmptyBody::new())?; + add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?; + return Ok(resp); + } + } + + Err(CommonError::Forbidden( + "This CORS request is not allowed.".into(), + )) +} diff --git a/src/api/encoding.rs b/src/api/common/encoding.rs index e286a784..e286a784 100644 --- a/src/api/encoding.rs +++ b/src/api/common/encoding.rs diff --git a/src/api/generic_server.rs b/src/api/common/generic_server.rs index 283abdd4..6ddc2ff2 100644 --- a/src/api/generic_server.rs +++ b/src/api/common/generic_server.rs @@ -4,8 +4,6 @@ use std::os::unix::fs::PermissionsExt; use std::sync::Arc; use std::time::Duration; -use async_trait::async_trait; - use futures::future::Future; use futures::stream::{futures_unordered::FuturesUnordered, StreamExt}; @@ -36,7 +34,7 @@ use garage_util::socket_address::UnixOrTCPSocketAddress; use crate::helpers::{BoxBody, ErrorBody}; -pub(crate) trait ApiEndpoint: Send + Sync + 'static { +pub trait ApiEndpoint: Send + Sync + 'static { fn name(&self) -> &'static str; fn add_span_attributes(&self, span: SpanRef<'_>); } @@ -47,8 +45,7 @@ pub trait ApiError: std::error::Error + Send + Sync + 'static { fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody; } -#[async_trait] -pub(crate) trait ApiHandler: Send + Sync + 'static { +pub trait ApiHandler: Send + Sync + 'static { const API_NAME: &'static str; const API_NAME_DISPLAY: &'static str; @@ -56,14 +53,14 @@ pub(crate) trait ApiHandler: Send + Sync + 'static { type Error: ApiError; fn parse_endpoint(&self, r: &Request<IncomingBody>) -> Result<Self::Endpoint, Self::Error>; - async fn handle( + fn handle( &self, req: Request<IncomingBody>, endpoint: Self::Endpoint, - ) -> Result<Response<BoxBody<Self::Error>>, Self::Error>; + ) -> impl Future<Output = Result<Response<BoxBody<Self::Error>>, Self::Error>> + Send; } -pub(crate) struct ApiServer<A: ApiHandler> { +pub struct ApiServer<A: ApiHandler> { region: String, api_handler: A, @@ -248,13 +245,11 @@ impl<A: ApiHandler> ApiServer<A> { // ==== helper functions ==== -#[async_trait] pub trait Accept: Send + Sync + 'static { type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static; - async fn accept(&self) -> std::io::Result<(Self::Stream, String)>; + fn accept(&self) -> impl Future<Output = std::io::Result<(Self::Stream, String)>> + Send; } -#[async_trait] impl Accept for TcpListener { type Stream = TcpStream; async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { @@ -266,7 +261,6 @@ impl Accept for TcpListener { pub struct UnixListenerOn(pub UnixListener, pub String); -#[async_trait] impl Accept for UnixListenerOn { type Stream = UnixStream; async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { diff --git a/src/api/helpers.rs b/src/api/common/helpers.rs index cf60005d..c8586de4 100644 --- a/src/api/helpers.rs +++ b/src/api/common/helpers.rs @@ -363,9 +363,9 @@ mod tests { } #[derive(Serialize)] -pub(crate) struct CustomApiErrorBody { - pub(crate) code: String, - pub(crate) message: String, - pub(crate) region: String, - pub(crate) path: String, +pub struct CustomApiErrorBody { + pub code: String, + pub message: String, + pub region: String, + pub path: String, } diff --git a/src/api/common/lib.rs b/src/api/common/lib.rs new file mode 100644 index 00000000..0e655a53 --- /dev/null +++ b/src/api/common/lib.rs @@ -0,0 +1,12 @@ +//! Crate for serving a S3 compatible API +#[macro_use] +extern crate tracing; + +pub mod common_error; + +pub mod cors; +pub mod encoding; +pub mod generic_server; +pub mod helpers; +pub mod router_macros; +pub mod signature; diff --git a/src/api/router_macros.rs b/src/api/common/router_macros.rs index 8f10a4f5..d9fe86db 100644 --- a/src/api/router_macros.rs +++ b/src/api/common/router_macros.rs @@ -1,5 +1,6 @@ /// This macro is used to generate very repetitive match {} blocks in this module /// It is _not_ made to be used anywhere else +#[macro_export] macro_rules! router_match { (@match $enum:expr , [ $($endpoint:ident,)* ]) => {{ // usage: router_match {@match my_enum, [ VariantWithField1, VariantWithField2 ..] } @@ -133,6 +134,7 @@ macro_rules! router_match { /// This macro is used to generate part of the code in this module. It must be called only one, and /// is useless outside of this module. +#[macro_export] macro_rules! generateQueryParameters { ( keywords: [ $($kw_param:expr => $kw_name: ident),* ], @@ -220,5 +222,5 @@ macro_rules! generateQueryParameters { } } -pub(crate) use generateQueryParameters; -pub(crate) use router_match; +pub use generateQueryParameters; +pub use router_match; diff --git a/src/api/signature/error.rs b/src/api/common/signature/error.rs index 2d92a072..2d92a072 100644 --- a/src/api/signature/error.rs +++ b/src/api/common/signature/error.rs diff --git a/src/api/signature/mod.rs b/src/api/common/signature/mod.rs index 6514da43..6514da43 100644 --- a/src/api/signature/mod.rs +++ b/src/api/common/signature/mod.rs diff --git a/src/api/signature/payload.rs b/src/api/common/signature/payload.rs index 9e5a6043..81541e4a 100644 --- a/src/api/signature/payload.rs +++ b/src/api/common/signature/payload.rs @@ -518,7 +518,7 @@ impl Authorization { }) } - pub(crate) fn parse_form(params: &HeaderMap) -> Result<Self, Error> { + pub fn parse_form(params: &HeaderMap) -> Result<Self, Error> { let algorithm = params .get(X_AMZ_ALGORITHM) .ok_or_bad_request("Missing X-Amz-Algorithm header")? diff --git a/src/api/signature/streaming.rs b/src/api/common/signature/streaming.rs index e223d1b1..e223d1b1 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/common/signature/streaming.rs diff --git a/src/api/k2v/Cargo.toml b/src/api/k2v/Cargo.toml new file mode 100644 index 00000000..e3ebedca --- /dev/null +++ b/src/api/k2v/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "garage_api_k2v" +version = "1.0.1" +authors = ["Alex Auvolat <alex@adnab.me>"] +edition = "2018" +license = "AGPL-3.0" +description = "K2V API server crate for the Garage object store" +repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_model = { workspace = true, features = [ "k2v" ] } +garage_table.workspace = true +garage_util = { workspace = true, features = [ "k2v" ] } +garage_api_common.workspace = true + +base64.workspace = true +err-derive.workspace = true +tracing.workspace = true + +futures.workspace = true +tokio.workspace = true +http.workspace = true +http-body-util.workspace = true +hyper = { workspace = true, default-features = false, features = ["server", "http1"] } +percent-encoding.workspace = true +url.workspace = true + +serde.workspace = true +serde_json.workspace = true + +opentelemetry.workspace = true diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index de6e5f06..eb276f5b 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -1,7 +1,5 @@ use std::sync::Arc; -use async_trait::async_trait; - use hyper::{body::Incoming as IncomingBody, Method, Request, Response}; use tokio::sync::watch; @@ -12,26 +10,25 @@ use garage_util::socket_address::UnixOrTCPSocketAddress; use garage_model::garage::Garage; -use crate::generic_server::*; -use crate::k2v::error::*; - -use crate::signature::verify_request; +use garage_api_common::cors::*; +use garage_api_common::generic_server::*; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_request; -use crate::helpers::*; -use crate::k2v::batch::*; -use crate::k2v::index::*; -use crate::k2v::item::*; -use crate::k2v::router::Endpoint; -use crate::s3::cors::*; +use crate::batch::*; +use crate::error::*; +use crate::index::*; +use crate::item::*; +use crate::router::Endpoint; -pub use crate::signature::streaming::ReqBody; +pub use garage_api_common::signature::streaming::ReqBody; pub type ResBody = BoxBody<Error>; pub struct K2VApiServer { garage: Arc<Garage>, } -pub(crate) struct K2VApiEndpoint { +pub struct K2VApiEndpoint { bucket_name: String, endpoint: Endpoint, } @@ -49,7 +46,6 @@ impl K2VApiServer { } } -#[async_trait] impl ApiHandler for K2VApiServer { const API_NAME: &'static str = "k2v"; const API_NAME_DISPLAY: &'static str = "K2V"; diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index e4d0b0e5..c284dbd4 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -6,11 +6,12 @@ use garage_table::{EnumerationOrder, TableSchema}; use garage_model::k2v::item_table::*; -use crate::helpers::*; -use crate::k2v::api_server::{ReqBody, ResBody}; -use crate::k2v::error::*; -use crate::k2v::item::parse_causality_token; -use crate::k2v::range::read_range; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::item::parse_causality_token; +use crate::range::read_range; pub async fn handle_insert_batch( ctx: ReqCtx, diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs index dbe4be2c..3cd0e6f7 100644 --- a/src/api/k2v/error.rs +++ b/src/api/k2v/error.rs @@ -2,19 +2,21 @@ use err_derive::Error; use hyper::header::HeaderValue; use hyper::{HeaderMap, StatusCode}; -use crate::common_error::CommonError; -pub(crate) use crate::common_error::{helper_error_as_internal, pass_helper_error}; -pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; -use crate::generic_server::ApiError; -use crate::helpers::*; -use crate::signature::error::Error as SignatureError; +use garage_api_common::common_error::{commonErrorDerivative, CommonError}; +pub(crate) use garage_api_common::common_error::{helper_error_as_internal, pass_helper_error}; +pub use garage_api_common::common_error::{ + CommonErrorDerivative, OkOrBadRequest, OkOrInternalError, +}; +use garage_api_common::generic_server::ApiError; +use garage_api_common::helpers::*; +use garage_api_common::signature::error::Error as SignatureError; /// Errors of this crate #[derive(Debug, Error)] pub enum Error { #[error(display = "{}", _0)] /// Error from common error - Common(CommonError), + Common(#[error(source)] CommonError), // Category: cannot process /// Authorization Header Malformed @@ -42,16 +44,7 @@ pub enum Error { InvalidUtf8Str(#[error(source)] std::str::Utf8Error), } -impl<T> From<T> for Error -where - CommonError: From<T>, -{ - fn from(err: T) -> Self { - Error::Common(CommonError::from(err)) - } -} - -impl CommonErrorDerivative for Error {} +commonErrorDerivative!(Error); impl From<SignatureError> for Error { fn from(err: SignatureError) -> Self { diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index e3397238..fbfaad98 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -5,10 +5,11 @@ use garage_table::util::*; use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; -use crate::helpers::*; -use crate::k2v::api_server::ResBody; -use crate::k2v::error::*; -use crate::k2v::range::read_range; +use garage_api_common::helpers::*; + +use crate::api_server::ResBody; +use crate::error::*; +use crate::range::read_range; pub async fn handle_read_index( ctx: ReqCtx, diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index 87371727..4e28b499 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -6,9 +6,10 @@ use hyper::{Request, Response, StatusCode}; use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; -use crate::helpers::*; -use crate::k2v::api_server::{ReqBody, ResBody}; -use crate::k2v::error::*; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; diff --git a/src/api/k2v/mod.rs b/src/api/k2v/lib.rs index b6a8c5cf..334ae46b 100644 --- a/src/api/k2v/mod.rs +++ b/src/api/k2v/lib.rs @@ -1,3 +1,6 @@ +#[macro_use] +extern crate tracing; + pub mod api_server; mod error; mod router; diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs index bb9d3be5..eb4738db 100644 --- a/src/api/k2v/range.rs +++ b/src/api/k2v/range.rs @@ -7,8 +7,9 @@ use std::sync::Arc; use garage_table::replication::TableShardedReplication; use garage_table::*; -use crate::helpers::key_after_prefix; -use crate::k2v::error::*; +use garage_api_common::helpers::key_after_prefix; + +use crate::error::*; /// Read range in a Garage table. /// Returns (entries, more?, nextStart) diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs index 1cc58be5..a04b0f81 100644 --- a/src/api/k2v/router.rs +++ b/src/api/k2v/router.rs @@ -1,11 +1,11 @@ -use crate::k2v::error::*; +use crate::error::*; use std::borrow::Cow; use hyper::{Method, Request}; -use crate::helpers::Authorization; -use crate::router_macros::{generateQueryParameters, router_match}; +use garage_api_common::helpers::Authorization; +use garage_api_common::router_macros::{generateQueryParameters, router_match}; router_match! {@func diff --git a/src/api/lib.rs b/src/api/lib.rs deleted file mode 100644 index 370dfd7a..00000000 --- a/src/api/lib.rs +++ /dev/null @@ -1,17 +0,0 @@ -//! Crate for serving a S3 compatible API -#[macro_use] -extern crate tracing; - -pub mod common_error; - -mod encoding; -pub mod generic_server; -pub mod helpers; -mod router_macros; -/// This mode is public only to help testing. Don't expect stability here -pub mod signature; - -pub mod admin; -#[cfg(feature = "k2v")] -pub mod k2v; -pub mod s3; diff --git a/src/api/Cargo.toml b/src/api/s3/Cargo.toml index 85b78a5b..387e45db 100644 --- a/src/api/Cargo.toml +++ b/src/api/s3/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "garage_api" +name = "garage_api_s3" version = "1.0.1" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" @@ -20,30 +20,24 @@ garage_block.workspace = true garage_net.workspace = true garage_util.workspace = true garage_rpc.workspace = true +garage_api_common.workspace = true aes-gcm.workspace = true -argon2.workspace = true async-compression.workspace = true -async-trait.workspace = true base64.workspace = true bytes.workspace = true chrono.workspace = true crc32fast.workspace = true crc32c.workspace = true -crypto-common.workspace = true err-derive.workspace = true hex.workspace = true -hmac.workspace = true -idna.workspace = true tracing.workspace = true md-5.workspace = true -nom.workspace = true pin-project.workspace = true sha1.workspace = true sha2.workspace = true futures.workspace = true -futures-util.workspace = true tokio.workspace = true tokio-stream.workspace = true tokio-util.workspace = true @@ -54,21 +48,13 @@ httpdate.workspace = true http-range.workspace = true http-body-util.workspace = true hyper = { workspace = true, default-features = false, features = ["server", "http1"] } -hyper-util.workspace = true multer.workspace = true percent-encoding.workspace = true roxmltree.workspace = true url.workspace = true serde.workspace = true -serde_bytes.workspace = true serde_json.workspace = true quick-xml.workspace = true opentelemetry.workspace = true -opentelemetry-prometheus = { workspace = true, optional = true } -prometheus = { workspace = true, optional = true } - -[features] -k2v = [ "garage_util/k2v", "garage_model/k2v" ] -metrics = [ "opentelemetry-prometheus", "prometheus" ] diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index f9dafa10..14fd03c3 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -1,7 +1,5 @@ use std::sync::Arc; -use async_trait::async_trait; - use hyper::header; use hyper::{body::Incoming as IncomingBody, Request, Response}; use tokio::sync::watch; @@ -14,33 +12,33 @@ use garage_util::socket_address::UnixOrTCPSocketAddress; use garage_model::garage::Garage; use garage_model::key_table::Key; -use crate::generic_server::*; -use crate::s3::error::*; - -use crate::signature::verify_request; - -use crate::helpers::*; -use crate::s3::bucket::*; -use crate::s3::copy::*; -use crate::s3::cors::*; -use crate::s3::delete::*; -use crate::s3::get::*; -use crate::s3::lifecycle::*; -use crate::s3::list::*; -use crate::s3::multipart::*; -use crate::s3::post_object::handle_post_object; -use crate::s3::put::*; -use crate::s3::router::Endpoint; -use crate::s3::website::*; - -pub use crate::signature::streaming::ReqBody; +use garage_api_common::cors::*; +use garage_api_common::generic_server::*; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_request; + +use crate::bucket::*; +use crate::copy::*; +use crate::cors::*; +use crate::delete::*; +use crate::error::*; +use crate::get::*; +use crate::lifecycle::*; +use crate::list::*; +use crate::multipart::*; +use crate::post_object::handle_post_object; +use crate::put::*; +use crate::router::Endpoint; +use crate::website::*; + +pub use garage_api_common::signature::streaming::ReqBody; pub type ResBody = BoxBody<Error>; pub struct S3ApiServer { garage: Arc<Garage>, } -pub(crate) struct S3ApiEndpoint { +pub struct S3ApiEndpoint { bucket_name: Option<String>, endpoint: Endpoint, } @@ -70,7 +68,6 @@ impl S3ApiServer { } } -#[async_trait] impl ApiHandler for S3ApiServer { const API_NAME: &'static str = "s3"; const API_NAME_DISPLAY: &'static str = "S3"; @@ -320,7 +317,6 @@ impl ApiHandler for S3ApiServer { } => { let query = ListPartsQuery { bucket_name: ctx.bucket_name.clone(), - bucket_id, key, upload_id, part_number_marker: part_number_marker.map(|p| p.min(10000)), diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 6a12aa9c..0a192ba6 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -13,12 +13,13 @@ use garage_util::crdt::*; use garage_util::data::*; use garage_util::time::*; -use crate::common_error::CommonError; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::error::*; -use crate::s3::xml as s3_xml; -use crate::signature::verify_signed_content; +use garage_api_common::common_error::CommonError; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_signed_content; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::xml as s3_xml; pub fn handle_get_bucket_location(ctx: ReqCtx) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, .. } = ctx; diff --git a/src/api/s3/checksum.rs b/src/api/s3/checksum.rs index c7527163..02fb55ec 100644 --- a/src/api/s3/checksum.rs +++ b/src/api/s3/checksum.rs @@ -15,7 +15,7 @@ use garage_util::error::OkOrMessage; use garage_model::s3::object_table::*; -use crate::s3::error::*; +use crate::error::*; pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-checksum-algorithm"); diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index b67ace88..07d50ea5 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -20,15 +20,16 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::checksum::*; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; -use crate::s3::get::full_object_byte_stream; -use crate::s3::multipart; -use crate::s3::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult}; -use crate::s3::xml::{self as s3_xml, xmlns_tag}; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::checksum::*; +use crate::encryption::EncryptionParams; +use crate::error::*; +use crate::get::full_object_byte_stream; +use crate::multipart; +use crate::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult}; +use crate::xml::{self as s3_xml, xmlns_tag}; // -------- CopyObject --------- @@ -862,7 +863,7 @@ pub struct CopyPartResult { #[cfg(test)] mod tests { use super::*; - use crate::s3::xml::to_xml_with_header; + use crate::xml::to_xml_with_header; #[test] fn copy_object_result() -> Result<(), Error> { diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index 32dcc0d5..625b84db 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -1,31 +1,21 @@ -use std::sync::Arc; - use quick_xml::de::from_reader; -use http::header::{ - ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, - ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, -}; -use hyper::{ - body::Body, body::Incoming as IncomingBody, header::HeaderName, Method, Request, Response, - StatusCode, -}; +use hyper::{header::HeaderName, Method, Request, Response, StatusCode}; use http_body_util::BodyExt; use serde::{Deserialize, Serialize}; -use crate::common_error::{helper_error_as_internal, CommonError}; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::error::*; -use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; -use crate::signature::verify_signed_content; - -use garage_model::bucket_table::{Bucket, BucketParams, CorsRule as GarageCorsRule}; -use garage_model::garage::Garage; +use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule}; use garage_util::data::*; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_signed_content; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; + pub async fn handle_get_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> { let ReqCtx { bucket_params, .. } = ctx; if let Some(cors) = bucket_params.cors_config.get() { @@ -100,161 +90,6 @@ pub async fn handle_put_cors( .body(empty_body())?) } -pub async fn handle_options_api( - garage: Arc<Garage>, - req: &Request<IncomingBody>, - bucket_name: Option<String>, -) -> Result<Response<EmptyBody>, CommonError> { - // FIXME: CORS rules of buckets with local aliases are - // not taken into account. - - // If the bucket name is a global bucket name, - // we try to apply the CORS rules of that bucket. - // If a user has a local bucket name that has - // the same name, its CORS rules won't be applied - // and will be shadowed by the rules of the globally - // existing bucket (but this is inevitable because - // OPTIONS calls are not auhtenticated). - if let Some(bn) = bucket_name { - let helper = garage.bucket_helper(); - let bucket_id = helper - .resolve_global_bucket_name(&bn) - .await - .map_err(helper_error_as_internal)?; - if let Some(id) = bucket_id { - let bucket = garage - .bucket_helper() - .get_existing_bucket(id) - .await - .map_err(helper_error_as_internal)?; - let bucket_params = bucket.state.into_option().unwrap(); - handle_options_for_bucket(req, &bucket_params) - } else { - // If there is a bucket name in the request, but that name - // does not correspond to a global alias for a bucket, - // then it's either a non-existing bucket or a local bucket. - // We have no way of knowing, because the request is not - // authenticated and thus we can't resolve local aliases. - // We take the permissive approach of allowing everything, - // because we don't want to prevent web apps that use - // local bucket names from making API calls. - Ok(Response::builder() - .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") - .header(ACCESS_CONTROL_ALLOW_METHODS, "*") - .status(StatusCode::OK) - .body(EmptyBody::new())?) - } - } else { - // If there is no bucket name in the request, - // we are doing a ListBuckets call, which we want to allow - // for all origins. - Ok(Response::builder() - .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") - .header(ACCESS_CONTROL_ALLOW_METHODS, "GET") - .status(StatusCode::OK) - .body(EmptyBody::new())?) - } -} - -pub fn handle_options_for_bucket( - req: &Request<IncomingBody>, - bucket_params: &BucketParams, -) -> Result<Response<EmptyBody>, CommonError> { - let origin = req - .headers() - .get("Origin") - .ok_or_bad_request("Missing Origin header")? - .to_str()?; - let request_method = req - .headers() - .get(ACCESS_CONTROL_REQUEST_METHOD) - .ok_or_bad_request("Missing Access-Control-Request-Method header")? - .to_str()?; - let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) { - Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(), - None => vec![], - }; - - if let Some(cors_config) = bucket_params.cors_config.get() { - let matching_rule = cors_config - .iter() - .find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter())); - if let Some(rule) = matching_rule { - let mut resp = Response::builder() - .status(StatusCode::OK) - .body(EmptyBody::new())?; - add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?; - return Ok(resp); - } - } - - Err(CommonError::Forbidden( - "This CORS request is not allowed.".into(), - )) -} - -pub fn find_matching_cors_rule<'a>( - bucket_params: &'a BucketParams, - req: &Request<impl Body>, -) -> Result<Option<&'a GarageCorsRule>, Error> { - if let Some(cors_config) = bucket_params.cors_config.get() { - if let Some(origin) = req.headers().get("Origin") { - let origin = origin.to_str()?; - let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) { - Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(), - None => vec![], - }; - return Ok(cors_config.iter().find(|rule| { - cors_rule_matches(rule, origin, req.method().as_ref(), request_headers.iter()) - })); - } - } - Ok(None) -} - -fn cors_rule_matches<'a, HI, S>( - rule: &GarageCorsRule, - origin: &'a str, - method: &'a str, - mut request_headers: HI, -) -> bool -where - HI: Iterator<Item = S>, - S: AsRef<str>, -{ - rule.allow_origins.iter().any(|x| x == "*" || x == origin) - && rule.allow_methods.iter().any(|x| x == "*" || x == method) - && request_headers.all(|h| { - rule.allow_headers - .iter() - .any(|x| x == "*" || x == h.as_ref()) - }) -} - -pub fn add_cors_headers( - resp: &mut Response<impl Body>, - rule: &GarageCorsRule, -) -> Result<(), http::header::InvalidHeaderValue> { - let h = resp.headers_mut(); - h.insert( - ACCESS_CONTROL_ALLOW_ORIGIN, - rule.allow_origins.join(", ").parse()?, - ); - h.insert( - ACCESS_CONTROL_ALLOW_METHODS, - rule.allow_methods.join(", ").parse()?, - ); - h.insert( - ACCESS_CONTROL_ALLOW_HEADERS, - rule.allow_headers.join(", ").parse()?, - ); - h.insert( - ACCESS_CONTROL_EXPOSE_HEADERS, - rule.expose_headers.join(", ").parse()?, - ); - Ok(()) -} - // ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ---- #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index 57f6f948..b799e67a 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -5,12 +5,13 @@ use garage_util::data::*; use garage_model::s3::object_table::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::error::*; -use crate::s3::put::next_timestamp; -use crate::s3::xml as s3_xml; -use crate::signature::verify_signed_content; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_signed_content; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::put::next_timestamp; +use crate::xml as s3_xml; async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), Error> { let ReqCtx { diff --git a/src/api/s3/encryption.rs b/src/api/s3/encryption.rs index 2e6ed65c..b38d7792 100644 --- a/src/api/s3/encryption.rs +++ b/src/api/s3/encryption.rs @@ -28,9 +28,10 @@ use garage_util::migrate::Migrate; use garage_model::garage::Garage; use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionMetaInner}; -use crate::common_error::*; -use crate::s3::checksum::Md5Checksum; -use crate::s3::error::Error; +use garage_api_common::common_error::*; + +use crate::checksum::Md5Checksum; +use crate::error::Error; const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-server-side-encryption-customer-algorithm"); diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index 22d2fe14..1bb8909c 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -6,20 +6,28 @@ use hyper::{HeaderMap, StatusCode}; use garage_model::helper::error::Error as HelperError; -pub(crate) use crate::common_error::pass_helper_error; -use crate::common_error::{helper_error_as_internal, CommonError}; -pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; -use crate::generic_server::ApiError; -use crate::helpers::*; -use crate::s3::xml as s3_xml; -use crate::signature::error::Error as SignatureError; +pub(crate) use garage_api_common::common_error::pass_helper_error; + +use garage_api_common::common_error::{ + commonErrorDerivative, helper_error_as_internal, CommonError, +}; + +pub use garage_api_common::common_error::{ + CommonErrorDerivative, OkOrBadRequest, OkOrInternalError, +}; + +use garage_api_common::generic_server::ApiError; +use garage_api_common::helpers::*; +use garage_api_common::signature::error::Error as SignatureError; + +use crate::xml as s3_xml; /// Errors of this crate #[derive(Debug, Error)] pub enum Error { #[error(display = "{}", _0)] /// Error from common error - Common(CommonError), + Common(#[error(source)] CommonError), // Category: cannot process /// Authorization Header Malformed @@ -81,14 +89,7 @@ pub enum Error { NotImplemented(String), } -impl<T> From<T> for Error -where - CommonError: From<T>, -{ - fn from(err: T) -> Self { - Error::Common(CommonError::from(err)) - } -} +commonErrorDerivative!(Error); // Helper errors are always passed as internal errors by default. // To pass the specific error code back to the client, use `pass_helper_error`. @@ -98,8 +99,6 @@ impl From<HelperError> for Error { } } -impl CommonErrorDerivative for Error {} - impl From<roxmltree::Error> for Error { fn from(err: roxmltree::Error) -> Self { Self::InvalidXml(format!("{}", err)) diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index f61aae11..c2393a51 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -25,11 +25,12 @@ use garage_model::garage::Garage; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::*; -use crate::s3::api_server::ResBody; -use crate::s3::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE}; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; +use garage_api_common::helpers::*; + +use crate::api_server::ResBody; +use crate::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE}; +use crate::encryption::EncryptionParams; +use crate::error::*; const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; diff --git a/src/api/s3/mod.rs b/src/api/s3/lib.rs index b9bb1a6f..fd99b443 100644 --- a/src/api/s3/mod.rs +++ b/src/api/s3/lib.rs @@ -1,3 +1,6 @@ +#[macro_use] +extern crate tracing; + pub mod api_server; pub mod error; diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index 7eb1c2cb..c35047ed 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -5,11 +5,12 @@ use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::error::*; -use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; -use crate::signature::verify_signed_content; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_signed_content; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use garage_model::bucket_table::{ parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration, diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 68d6cbe6..94c2c895 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -13,13 +13,14 @@ use garage_model::s3::object_table::*; use garage_table::EnumerationOrder; -use crate::encoding::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; -use crate::s3::multipart as s3_multipart; -use crate::s3::xml as s3_xml; +use garage_api_common::encoding::*; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::encryption::EncryptionParams; +use crate::error::*; +use crate::multipart as s3_multipart; +use crate::xml as s3_xml; const DUMMY_NAME: &str = "Dummy Key"; const DUMMY_KEY: &str = "GKDummyKey"; @@ -53,7 +54,6 @@ pub struct ListMultipartUploadsQuery { #[derive(Debug)] pub struct ListPartsQuery { pub bucket_name: String, - pub bucket_id: Uuid, pub key: String, pub upload_id: String, pub part_number_marker: Option<u64>, @@ -1244,10 +1244,8 @@ mod tests { #[test] fn test_fetch_part_info() -> Result<(), Error> { - let uuid = Uuid::from([0x08; 32]); let mut query = ListPartsQuery { bucket_name: "a".to_string(), - bucket_id: uuid, key: "a".to_string(), upload_id: "xx".to_string(), part_number_marker: None, diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 3db3e8aa..fa053df2 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -15,14 +15,15 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::checksum::*; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; -use crate::s3::put::*; -use crate::s3::xml as s3_xml; -use crate::signature::verify_signed_content; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_signed_content; + +use crate::api_server::{ReqBody, ResBody}; +use crate::checksum::*; +use crate::encryption::EncryptionParams; +use crate::error::*; +use crate::put::*; +use crate::xml as s3_xml; // ---- @@ -429,7 +430,16 @@ pub async fn handle_complete_multipart_upload( // Send response saying ok we're done let result = s3_xml::CompleteMultipartUploadResult { xmlns: (), - location: None, + // FIXME: the location returned is not always correct: + // - we always return https, but maybe some people do http + // - if root_domain is not specified, a full URL is not returned + location: garage + .config + .s3_api + .root_domain + .as_ref() + .map(|rd| s3_xml::Value(format!("https://{}.{}/{}", bucket_name, rd, key))) + .or(Some(s3_xml::Value(format!("/{}/{}", bucket_name, key)))), bucket: s3_xml::Value(bucket_name.to_string()), key: s3_xml::Value(key), etag: s3_xml::Value(format!("\"{}\"", etag)), diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index 5279ec6a..6c0e73d4 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -16,15 +16,16 @@ use serde::Deserialize; use garage_model::garage::Garage; use garage_model::s3::object_table::*; -use crate::helpers::*; -use crate::s3::api_server::ResBody; -use crate::s3::checksum::*; -use crate::s3::cors::*; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; -use crate::s3::put::{get_headers, save_stream, ChecksumMode}; -use crate::s3::xml as s3_xml; -use crate::signature::payload::{verify_v4, Authorization}; +use garage_api_common::cors::*; +use garage_api_common::helpers::*; +use garage_api_common::signature::payload::{verify_v4, Authorization}; + +use crate::api_server::ResBody; +use crate::checksum::*; +use crate::encryption::EncryptionParams; +use crate::error::*; +use crate::put::{get_headers, save_stream, ChecksumMode}; +use crate::xml as s3_xml; pub async fn handle_post_object( garage: Arc<Garage>, diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index bfb0dc9b..530b4e7b 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -30,11 +30,12 @@ use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::checksum::*; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; +use garage_api_common::helpers::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::checksum::*; +use crate::encryption::EncryptionParams; +use crate::error::*; const PUT_BLOCKS_MAX_PARALLEL: usize = 3; diff --git a/src/api/s3/router.rs b/src/api/s3/router.rs index e7ac1d77..e3f58490 100644 --- a/src/api/s3/router.rs +++ b/src/api/s3/router.rs @@ -3,9 +3,10 @@ use std::borrow::Cow; use hyper::header::HeaderValue; use hyper::{HeaderMap, Method, Request}; -use crate::helpers::Authorization; -use crate::router_macros::{generateQueryParameters, router_match}; -use crate::s3::error::*; +use garage_api_common::helpers::Authorization; +use garage_api_common::router_macros::{generateQueryParameters, router_match}; + +use crate::error::*; router_match! {@func @@ -351,6 +352,18 @@ impl Endpoint { _ => return Err(Error::bad_request("Unknown method")), }; + if let Some(x_id) = query.x_id.take() { + if x_id != res.name() { + // I think AWS ignores the x-id parameter. + // Let's make this at least be a warnin to help debugging. + warn!( + "x-id ({}) does not match parsed endpoint ({})", + x_id, + res.name() + ); + } + } + if let Some(message) = query.nonempty_message() { debug!("Unused query parameter: {}", message) } @@ -695,7 +708,8 @@ generateQueryParameters! { "uploadId" => upload_id, "upload-id-marker" => upload_id_marker, "versionId" => version_id, - "version-id-marker" => version_id_marker + "version-id-marker" => version_id_marker, + "x-id" => x_id ] } diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index fa36bc32..b55bb345 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -4,15 +4,16 @@ use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::error::*; -use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; -use crate::signature::verify_signed_content; - use garage_model::bucket_table::*; use garage_util::data::*; +use garage_api_common::helpers::*; +use garage_api_common::signature::verify_signed_content; + +use crate::api_server::{ReqBody, ResBody}; +use crate::error::*; +use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; + pub async fn handle_get_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> { let ReqCtx { bucket_params, .. } = ctx; if let Some(website) = bucket_params.website_config.get() { diff --git a/src/api/s3/xml.rs b/src/api/s3/xml.rs index 1e569ade..e8af3ec0 100644 --- a/src/api/s3/xml.rs +++ b/src/api/s3/xml.rs @@ -1,7 +1,7 @@ use quick_xml::se::to_string; use serde::{Deserialize, Serialize, Serializer}; -use crate::s3::error::Error as ApiError; +use crate::error::Error as ApiError; pub fn to_xml_with_header<T: Serialize>(x: &T) -> Result<String, ApiError> { let mut xml = r#"<?xml version="1.0" encoding="UTF-8"?>"#.to_string(); diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 1af4d7f5..3358a3e7 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -34,10 +34,8 @@ async-compression.workspace = true zstd.workspace = true serde.workspace = true -serde_bytes.workspace = true futures.workspace = true -futures-util.workspace = true tokio.workspace = true tokio-util.workspace = true diff --git a/src/block/manager.rs b/src/block/manager.rs index 40b177a2..41b2f02a 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use std::time::Duration; use arc_swap::{ArcSwap, ArcSwapOption}; -use async_trait::async_trait; use bytes::Bytes; use rand::prelude::*; use serde::{Deserialize, Serialize}; @@ -371,7 +370,7 @@ impl BlockManager { prevent_compression: bool, order_tag: Option<OrderTag>, ) -> Result<(), Error> { - let who = self.replication.write_sets(&hash); + let who = self.system.cluster_layout().current_storage_nodes_of(&hash); let compression_level = self.compression_level.filter(|_| !prevent_compression); let (header, bytes) = DataBlock::from_buffer(data, compression_level) @@ -397,7 +396,7 @@ impl BlockManager { .rpc_helper() .try_write_many_sets( &self.endpoint, - who.as_ref(), + &[who], put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) .with_drop_on_completion(permit) @@ -669,10 +668,12 @@ impl BlockManager { hash: &Hash, wrong_path: DataBlockPath, ) -> Result<usize, Error> { + let data = self.read_block_from(hash, &wrong_path).await?; self.lock_mutate(hash) .await - .fix_block_location(hash, wrong_path, self) - .await + .write_block_inner(hash, &data, self, Some(wrong_path)) + .await?; + Ok(data.as_parts_ref().1.len()) } async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> { @@ -688,7 +689,6 @@ impl BlockManager { } } -#[async_trait] impl StreamingEndpointHandler<BlockRpc> for BlockManager { async fn handle(self: &Arc<Self>, mut message: Req<BlockRpc>, _from: NodeID) -> Resp<BlockRpc> { match message.msg() { @@ -829,18 +829,6 @@ impl BlockManagerLocked { } Ok(()) } - - async fn fix_block_location( - &self, - hash: &Hash, - wrong_path: DataBlockPath, - mgr: &BlockManager, - ) -> Result<usize, Error> { - let data = mgr.read_block_from(hash, &wrong_path).await?; - self.write_block_inner(hash, &data, mgr, Some(wrong_path)) - .await?; - Ok(data.as_parts_ref().1.len()) - } } struct DeleteOnDrop(Option<PathBuf>); diff --git a/src/block/resync.rs b/src/block/resync.rs index 947c68de..b476a0b8 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -377,7 +377,10 @@ impl BlockResyncManager { info!("Resync block {:?}: offloading and deleting", hash); let existing_path = existing_path.unwrap(); - let mut who = manager.replication.storage_nodes(hash); + let mut who = manager + .system + .cluster_layout() + .current_storage_nodes_of(hash); if who.len() < manager.replication.write_quorum() { return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string())); } @@ -455,6 +458,25 @@ impl BlockResyncManager { } if rc.is_nonzero() && !exists { + // The refcount is > 0, and the block is not present locally. + // We might need to fetch it from another node. + + // First, check whether we are still supposed to store that + // block in the latest cluster layout version. + let storage_nodes = manager + .system + .cluster_layout() + .current_storage_nodes_of(&hash); + + if !storage_nodes.contains(&manager.system.id) { + info!( + "Resync block {:?}: block is absent with refcount > 0, but it will drop to zero after all metadata is synced. Not fetching the block.", + hash + ); + return Ok(()); + } + + // We know we need the block. Fetch it. info!( "Resync block {:?}: fetching absent but needed block (refcount > 0)", hash diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index 0a278bc0..3ef51fae 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -13,7 +13,6 @@ path = "lib.rs" [dependencies] err-derive.workspace = true -hexdump.workspace = true tracing.workspace = true heed = { workspace = true, optional = true } diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 483e33c0..c036f000 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -23,7 +23,9 @@ path = "tests/lib.rs" [dependencies] format_table.workspace = true garage_db.workspace = true -garage_api.workspace = true +garage_api_admin.workspace = true +garage_api_s3.workspace = true +garage_api_k2v = { workspace = true, optional = true } garage_block.workspace = true garage_model.workspace = true garage_net.workspace = true @@ -40,7 +42,6 @@ parse_duration.workspace = true hex.workspace = true tracing.workspace = true tracing-subscriber.workspace = true -rand.workspace = true async-trait.workspace = true sha1.workspace = true sodiumoxide.workspace = true @@ -48,21 +49,18 @@ structopt.workspace = true git-version.workspace = true serde.workspace = true -serde_bytes.workspace = true -toml.workspace = true futures.workspace = true -futures-util.workspace = true tokio.workspace = true opentelemetry.workspace = true opentelemetry-prometheus = { workspace = true, optional = true } opentelemetry-otlp = { workspace = true, optional = true } -prometheus = { workspace = true, optional = true } syslog-tracing = { workspace = true, optional = true } [dev-dependencies] -aws-config.workspace = true +garage_api_common.workspace = true + aws-sdk-s3.workspace = true chrono.workspace = true http.workspace = true @@ -84,7 +82,7 @@ k2v-client.workspace = true [features] default = [ "bundled-libs", "metrics", "lmdb", "sqlite", "k2v" ] -k2v = [ "garage_util/k2v", "garage_api/k2v" ] +k2v = [ "garage_util/k2v", "garage_api_k2v" ] # Database engines lmdb = [ "garage_model/lmdb" ] @@ -95,7 +93,7 @@ consul-discovery = [ "garage_rpc/consul-discovery" ] # Automatic registration and discovery via Kubernetes API kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ] # Prometheus exporter (/metrics endpoint). -metrics = [ "garage_api/metrics", "opentelemetry-prometheus", "prometheus" ] +metrics = [ "garage_api_admin/metrics", "opentelemetry-prometheus" ] # Exporter for the OpenTelemetry Collector. telemetry-otlp = [ "opentelemetry-otlp" ] # Logging to syslog diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index e2468143..3bbc2b86 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -4,9 +4,11 @@ mod key; use std::collections::HashMap; use std::fmt::Write; +use std::future::Future; use std::sync::Arc; -use async_trait::async_trait; +use futures::future::FutureExt; + use serde::{Deserialize, Serialize}; use format_table::format_table_to_string; @@ -144,7 +146,12 @@ impl AdminRpcHandler { async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> { if opt.all_nodes { let mut ret = String::new(); - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); + let mut all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); + for node in self.garage.system.get_known_nodes().iter() { + if node.is_up && !all_nodes.contains(&node.id) { + all_nodes.push(node.id); + } + } for node in all_nodes.iter() { let mut opt = opt.clone(); @@ -482,7 +489,7 @@ impl AdminRpcHandler { AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }), PRIO_NORMAL, ) - .await + .await? })) .await; @@ -495,7 +502,11 @@ impl AdminRpcHandler { ret.push(format!("{:?}\t{}", to, res_str)); } - Ok(AdminRpc::Ok(format_table_to_string(ret))) + if resps.iter().any(Result::is_err) { + Err(GarageError::Message(format_table_to_string(ret)).into()) + } else { + Ok(AdminRpc::Ok(format_table_to_string(ret))) + } } MetaOperation::Snapshot { all: false } => { garage_model::snapshot::async_snapshot_metadata(&self.garage).await?; @@ -505,22 +516,25 @@ impl AdminRpcHandler { } } -#[async_trait] impl EndpointHandler<AdminRpc> for AdminRpcHandler { - async fn handle( + fn handle( self: &Arc<Self>, message: &AdminRpc, _from: NodeID, - ) -> Result<AdminRpc, Error> { - match message { - AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await, - AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await, - AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, - AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, - AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await, - AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await, - AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await, - m => Err(GarageError::unexpected_rpc_message(m).into()), + ) -> impl Future<Output = Result<AdminRpc, Error>> + Send { + let self2 = self.clone(); + async move { + match message { + AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, + AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await, + AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt.clone()).await, + AdminRpc::Stats(opt) => self2.handle_stats(opt.clone()).await, + AdminRpc::Worker(wo) => self2.handle_worker_cmd(wo).await, + AdminRpc::BlockOperation(bo) => self2.handle_block_cmd(bo).await, + AdminRpc::MetaOperation(mo) => self2.handle_meta_cmd(mo).await, + m => Err(GarageError::unexpected_rpc_message(m).into()), + } } + .boxed() } } diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 2c5227d2..47883f97 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::sync::Arc; use std::time::Duration; @@ -93,17 +94,16 @@ pub async fn launch_online_repair( // ---- -#[async_trait] trait TableRepair: Send + Sync + 'static { type T: TableSchema; fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>; - async fn process( + fn process( &mut self, garage: &Garage, entry: <<Self as TableRepair>::T as TableSchema>::E, - ) -> Result<bool, Error>; + ) -> impl Future<Output = Result<bool, Error>> + Send; } struct TableRepairWorker<T: TableRepair> { @@ -174,7 +174,6 @@ impl<R: TableRepair> Worker for TableRepairWorker<R> { struct RepairVersions; -#[async_trait] impl TableRepair for RepairVersions { type T = VersionTable; @@ -221,7 +220,6 @@ impl TableRepair for RepairVersions { struct RepairBlockRefs; -#[async_trait] impl TableRepair for RepairBlockRefs { type T = BlockRefTable; @@ -257,7 +255,6 @@ impl TableRepair for RepairBlockRefs { struct RepairMpu; -#[async_trait] impl TableRepair for RepairMpu { type T = MultipartUploadTable; diff --git a/src/garage/server.rs b/src/garage/server.rs index 65bf34db..1dc86fd3 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -6,13 +6,13 @@ use garage_util::background::*; use garage_util::config::*; use garage_util::error::Error; -use garage_api::admin::api_server::AdminApiServer; -use garage_api::s3::api_server::S3ApiServer; +use garage_api_admin::api_server::AdminApiServer; +use garage_api_s3::api_server::S3ApiServer; use garage_model::garage::Garage; use garage_web::WebServer; #[cfg(feature = "k2v")] -use garage_api::k2v::api_server::K2VApiServer; +use garage_api_k2v::api_server::K2VApiServer; use crate::admin::*; use crate::secrets::{fill_secrets, Secrets}; @@ -113,7 +113,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er if let Some(web_config) = &config.s3_web { info!("Initializing web server..."); - let web_server = WebServer::new(garage.clone(), web_config.root_domain.clone()); + let web_server = WebServer::new(garage.clone(), &web_config); servers.push(( "Web", tokio::spawn(web_server.run(web_config.bind_addr.clone(), watch_cancel.clone())), diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs index 42368976..2db72e9f 100644 --- a/src/garage/tests/common/custom_requester.rs +++ b/src/garage/tests/common/custom_requester.rs @@ -15,7 +15,7 @@ use hyper_util::client::legacy::{connect::HttpConnector, Client}; use hyper_util::rt::TokioExecutor; use super::garage::{Instance, Key}; -use garage_api::signature; +use garage_api_common::signature; pub type Body = FullBody<hyper::body::Bytes>; diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs index db23d316..da6c624b 100644 --- a/src/garage/tests/common/garage.rs +++ b/src/garage/tests/common/garage.rs @@ -13,7 +13,6 @@ static GARAGE_TEST_SECRET: &str = #[derive(Debug, Default, Clone)] pub struct Key { - pub name: Option<String>, pub id: String, pub secret: String, } @@ -213,10 +212,7 @@ api_bind_addr = "127.0.0.1:{admin_port}" assert!(!key.id.is_empty(), "Invalid key: Key ID is empty"); assert!(!key.secret.is_empty(), "Invalid key: Key secret is empty"); - Key { - name: maybe_name.map(String::from), - ..key - } + key } } diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml index 694be1f8..bbd09b19 100644 --- a/src/k2v-client/Cargo.toml +++ b/src/k2v-client/Cargo.toml @@ -29,12 +29,11 @@ tokio.workspace = true # cli deps clap = { workspace = true, optional = true } format_table = { workspace = true, optional = true } -tracing = { workspace = true, optional = true } tracing-subscriber = { workspace = true, optional = true } [features] -cli = ["clap", "tokio/fs", "tokio/io-std", "tracing", "tracing-subscriber", "format_table"] +cli = ["clap", "tokio/fs", "tokio/io-std", "tracing-subscriber", "format_table"] [lib] path = "lib.rs" diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 12931a4c..b58ad43b 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -22,7 +22,6 @@ garage_util.workspace = true garage_net.workspace = true async-trait.workspace = true -arc-swap.workspace = true blake2.workspace = true chrono.workspace = true err-derive.workspace = true @@ -38,9 +37,7 @@ serde.workspace = true serde_bytes.workspace = true futures.workspace = true -futures-util.workspace = true tokio.workspace = true -opentelemetry.workspace = true [features] default = [ "lmdb", "sqlite" ] diff --git a/src/model/garage.rs b/src/model/garage.rs index 29e0bddd..11c0d90f 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -329,7 +329,7 @@ impl Garage { pub async fn locked_helper(&self) -> helper::locked::LockedHelper { let lock = self.bucket_lock.lock().await; - helper::locked::LockedHelper(self, lock) + helper::locked::LockedHelper(self, Some(lock)) } } diff --git a/src/model/helper/locked.rs b/src/model/helper/locked.rs index 43f4f363..482e91b0 100644 --- a/src/model/helper/locked.rs +++ b/src/model/helper/locked.rs @@ -27,9 +27,16 @@ use crate::permission::BucketKeyPerm; /// See issues: #649, #723 pub struct LockedHelper<'a>( pub(crate) &'a Garage, - pub(crate) tokio::sync::MutexGuard<'a, ()>, + pub(crate) Option<tokio::sync::MutexGuard<'a, ()>>, ); +impl<'a> Drop for LockedHelper<'a> { + fn drop(&mut self) { + // make it explicit that the mutexguard lives until here + drop(self.1.take()) + } +} + #[allow(clippy::ptr_arg)] impl<'a> LockedHelper<'a> { pub fn bucket(&self) -> BucketHelper<'a> { diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index a1bf6ee0..821f4549 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -10,7 +10,6 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex, MutexGuard}; use std::time::{Duration, Instant}; -use async_trait::async_trait; use futures::stream::FuturesUnordered; use futures::StreamExt; use serde::{Deserialize, Serialize}; @@ -537,7 +536,6 @@ impl K2VRpcHandler { } } -#[async_trait] impl EndpointHandler<K2VRpc> for K2VRpcHandler { async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> { match message { diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 38212a1c..bb10ba48 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -395,13 +395,13 @@ fn midnight_ts(date: NaiveDate, use_local_tz: bool) -> u64 { .expect("bad local midnight") .timestamp_millis() as u64; } - midnight.timestamp_millis() as u64 + midnight.and_utc().timestamp_millis() as u64 } fn next_date(ts: u64) -> NaiveDate { - NaiveDateTime::from_timestamp_millis(ts as i64) + DateTime::<Utc>::from_timestamp_millis(ts as i64) .expect("bad timestamp") - .date() + .date_naive() .succ_opt() .expect("no next day") } diff --git a/src/net/Cargo.toml b/src/net/Cargo.toml index 686aaaea..c0b47a6e 100644 --- a/src/net/Cargo.toml +++ b/src/net/Cargo.toml @@ -30,7 +30,6 @@ rand.workspace = true log.workspace = true arc-swap.workspace = true -async-trait.workspace = true err-derive.workspace = true bytes.workspace = true cfg-if.workspace = true diff --git a/src/net/client.rs b/src/net/client.rs index 607dd173..20e1dacd 100644 --- a/src/net/client.rs +++ b/src/net/client.rs @@ -6,7 +6,6 @@ use std::sync::{Arc, Mutex}; use std::task::Poll; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use bytes::Bytes; use log::{debug, error, trace}; @@ -220,7 +219,6 @@ impl ClientConn { impl SendLoop for ClientConn {} -#[async_trait] impl RecvLoop for ClientConn { fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) { trace!("ClientConn recv_handler {}", id); diff --git a/src/net/endpoint.rs b/src/net/endpoint.rs index 3cafafeb..d46acc42 100644 --- a/src/net/endpoint.rs +++ b/src/net/endpoint.rs @@ -1,8 +1,9 @@ +use std::future::Future; use std::marker::PhantomData; use std::sync::Arc; use arc_swap::ArcSwapOption; -use async_trait::async_trait; +use futures::future::{BoxFuture, FutureExt}; use crate::error::Error; use crate::message::*; @@ -14,19 +15,17 @@ use crate::netapp::*; /// attached to the response.. /// /// The handler object should be in an Arc, see `Endpoint::set_handler` -#[async_trait] pub trait StreamingEndpointHandler<M>: Send + Sync where M: Message, { - async fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> Resp<M>; + fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> impl Future<Output = Resp<M>> + Send; } /// If one simply wants to use an endpoint in a client fashion, /// without locally serving requests to that endpoint, /// use the unit type `()` as the handler type: /// it will panic if it is ever made to handle request. -#[async_trait] impl<M: Message> EndpointHandler<M> for () { async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response { panic!("This endpoint should not have a local handler."); @@ -38,15 +37,13 @@ impl<M: Message> EndpointHandler<M> for () { /// This trait should be implemented by an object of your application /// that can handle a message of type `M`, in the cases where it doesn't /// care about attached stream in the request nor in the response. -#[async_trait] pub trait EndpointHandler<M>: Send + Sync where M: Message, { - async fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> M::Response; + fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> impl Future<Output = M::Response> + Send; } -#[async_trait] impl<T, M> StreamingEndpointHandler<M> for T where T: EndpointHandler<M>, @@ -161,9 +158,8 @@ where pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>; -#[async_trait] pub(crate) trait GenericEndpoint { - async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error>; + fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture<Result<RespEnc, Error>>; fn drop_handler(&self); fn clone_endpoint(&self) -> DynEndpoint; } @@ -174,21 +170,23 @@ where M: Message, H: StreamingEndpointHandler<M>; -#[async_trait] impl<M, H> GenericEndpoint for EndpointArc<M, H> where M: Message, H: StreamingEndpointHandler<M> + 'static, { - async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error> { - match self.0.handler.load_full() { - None => Err(Error::NoHandler), - Some(h) => { - let req = Req::from_enc(req_enc)?; - let res = h.handle(req, from).await; - Ok(res.into_enc()?) + fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture<Result<RespEnc, Error>> { + async move { + match self.0.handler.load_full() { + None => Err(Error::NoHandler), + Some(h) => { + let req = Req::from_enc(req_enc)?; + let res = h.handle(req, from).await; + Ok(res.into_enc()?) + } } } + .boxed() } fn drop_handler(&self) { diff --git a/src/net/netapp.rs b/src/net/netapp.rs index 77e55774..36c6fc88 100644 --- a/src/net/netapp.rs +++ b/src/net/netapp.rs @@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock}; use log::{debug, error, info, trace, warn}; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::auth; @@ -457,7 +456,6 @@ impl NetApp { } } -#[async_trait] impl EndpointHandler<HelloMessage> for NetApp { async fn handle(self: &Arc<Self>, msg: &HelloMessage, from: NodeID) { debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg); diff --git a/src/net/peering.rs b/src/net/peering.rs index a8d271ec..08378a08 100644 --- a/src/net/peering.rs +++ b/src/net/peering.rs @@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use arc_swap::ArcSwap; -use async_trait::async_trait; use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; @@ -592,7 +591,6 @@ impl PeeringManager { } } -#[async_trait] impl EndpointHandler<PingMessage> for PeeringManager { async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage { let ping_resp = PingMessage { @@ -604,7 +602,6 @@ impl EndpointHandler<PingMessage> for PeeringManager { } } -#[async_trait] impl EndpointHandler<PeerListMessage> for PeeringManager { async fn handle( self: &Arc<Self>, diff --git a/src/net/recv.rs b/src/net/recv.rs index 0de7bef2..35a6d71a 100644 --- a/src/net/recv.rs +++ b/src/net/recv.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use async_trait::async_trait; use bytes::Bytes; use log::*; @@ -50,7 +49,6 @@ impl Drop for Sender { /// according to the protocol defined above: chunks of message in progress of being /// received are stored in a buffer, and when the last chunk of a message is received, /// the full message is passed to the receive handler. -#[async_trait] pub(crate) trait RecvLoop: Sync + 'static { fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream); fn cancel_handler(self: &Arc<Self>, _id: RequestID) {} diff --git a/src/net/send.rs b/src/net/send.rs index 1454eeb7..6f1ac02c 100644 --- a/src/net/send.rs +++ b/src/net/send.rs @@ -3,7 +3,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use log::*; @@ -273,7 +272,6 @@ impl DataFrame { /// /// The `.send_loop()` exits when the sending end of the channel is closed, /// or if there is an error at any time writing to the async writer. -#[async_trait] pub(crate) trait SendLoop: Sync { async fn send_loop<W>( self: Arc<Self>, diff --git a/src/net/server.rs b/src/net/server.rs index 36dccb2f..fb6c6366 100644 --- a/src/net/server.rs +++ b/src/net/server.rs @@ -3,7 +3,6 @@ use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use log::*; use futures::io::{AsyncReadExt, AsyncWriteExt}; @@ -174,7 +173,6 @@ impl ServerConn { impl SendLoop for ServerConn {} -#[async_trait] impl RecvLoop for ServerConn { fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) { let resp_send = match self.resp_send.load_full() { diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index acde0911..fcc1c304 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -15,12 +15,10 @@ path = "lib.rs" [dependencies] format_table.workspace = true -garage_db.workspace = true garage_util.workspace = true garage_net.workspace = true arc-swap.workspace = true -bytes.workspace = true bytesize.workspace = true gethostname.workspace = true hex.workspace = true @@ -46,9 +44,7 @@ reqwest = { workspace = true, optional = true } pnet_datalink.workspace = true futures.workspace = true -futures-util.workspace = true tokio.workspace = true -tokio-stream.workspace = true opentelemetry.workspace = true [features] diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 44c826f9..c08a5629 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -219,6 +219,11 @@ impl LayoutHelper { ret } + pub fn current_storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> { + let ver = self.current(); + ver.nodes_of(position, ver.replication_factor).collect() + } + pub fn trackers_hash(&self) -> Hash { self.trackers_hash } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index b8ca8120..2505c2ce 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -540,19 +540,73 @@ impl RpcHelper { // ---- functions not related to MAKING RPCs, but just determining to what nodes // they should be made and in which order ---- + /// Determine to what nodes, and in what order, requests to read a data block + /// should be sent. All nodes in the Vec returned by this function are tried + /// one by one until there is one that returns the block (in block/manager.rs). + /// + /// We want to have the best chance of finding the block in as few requests + /// as possible, and we want to avoid nodes that answer slowly. + /// + /// Note that when there are several active layout versions, the block might + /// be stored only by nodes of the latest version (in case of a block that was + /// written after the layout change), or only by nodes of the oldest active + /// version (for all blocks that were written before). So we have to try nodes + /// of all layout versions. We also want to try nodes of all layout versions + /// fast, so as to optimize the chance of finding the block fast. + /// + /// Therefore, the strategy is the following: + /// + /// 1. ask first all nodes of all currently active layout versions + /// -> ask the preferred node in all layout versions (older to newer), + /// then the second preferred onde in all verions, etc. + /// -> we start by the oldest active layout version first, because a majority + /// of blocks will have been saved before the layout change + /// 2. ask all nodes of historical layout versions, for blocks which have not + /// yet been transferred to their new storage nodes + /// + /// The preference order, for each layout version, is given by `request_order`, + /// based on factors such as nodes being in the same datacenter, + /// having low ping, etc. pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> { let layout = self.0.layout.read().unwrap(); - let mut ret = Vec::with_capacity(12); - let ver_iter = layout - .versions() - .iter() - .rev() - .chain(layout.inner().old_versions.iter().rev()); - for ver in ver_iter { - if ver.version > layout.sync_map_min() { - continue; + // Compute, for each layout version, the set of nodes that might store + // the block, and put them in their preferred order as of `request_order`. + let mut vernodes = layout.versions().iter().map(|ver| { + let nodes = ver.nodes_of(position, ver.replication_factor); + rpc_helper.request_order(layout.current(), nodes) + }); + + let mut ret = if layout.versions().len() == 1 { + // If we have only one active layout version, then these are the + // only nodes we ask in step 1 + vernodes.next().unwrap() + } else { + let vernodes = vernodes.collect::<Vec<_>>(); + + let mut nodes = Vec::<Uuid>::with_capacity(12); + for i in 0..layout.current().replication_factor { + for vn in vernodes.iter() { + if let Some(n) = vn.get(i) { + if !nodes.contains(&n) { + if *n == self.0.our_node_id { + // it's always fast (almost free) to ask locally, + // so always put that as first choice + nodes.insert(0, *n); + } else { + nodes.push(*n); + } + } + } + } } + + nodes + }; + + // Second step: add nodes of older layout versions + let old_ver_iter = layout.inner().old_versions.iter().rev(); + for ver in old_ver_iter { let nodes = ver.nodes_of(position, ver.replication_factor); for node in rpc_helper.request_order(layout.current(), nodes) { if !ret.contains(&node) { @@ -560,6 +614,7 @@ impl RpcHelper { } } } + ret } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 0fa68218..2a52ae5d 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -7,7 +7,6 @@ use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::{Duration, Instant}; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use futures::join; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::sign::ed25519; @@ -749,7 +748,6 @@ impl System { } } -#[async_trait] impl EndpointHandler<SystemRpc> for System { async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> { match msg { diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index e704cd3c..fad6ea08 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -22,7 +22,6 @@ opentelemetry.workspace = true async-trait.workspace = true arc-swap.workspace = true -bytes.workspace = true hex.workspace = true hexdump.workspace = true tracing.workspace = true diff --git a/src/table/gc.rs b/src/table/gc.rs index 9e060390..28ea119d 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; + use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -272,7 +273,6 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> { } } -#[async_trait] impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> { async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> { match message { diff --git a/src/table/sync.rs b/src/table/sync.rs index 234ee8ea..2d43b9fc 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -444,7 +444,6 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> { // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== -#[async_trait] impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> { async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> { match message { diff --git a/src/table/table.rs b/src/table/table.rs index ea8471d0..c96f4731 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -2,7 +2,6 @@ use std::borrow::Borrow; use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; -use async_trait::async_trait; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -204,6 +203,10 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> { entries_vec.push((write_sets, e_enc)); } + if entries_vec.is_empty() { + return Ok(()); + } + // Compute a deduplicated list of all of the write sets, // and compute an index from each node to the position of the sets in which // it takes part, to optimize the detection of a quorum. @@ -496,7 +499,6 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> { } } -#[async_trait] impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> { async fn handle( self: &Arc<Self>, diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index da3e39b8..fec5b1ed 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -20,9 +20,7 @@ garage_net.workspace = true arc-swap.workspace = true async-trait.workspace = true blake2.workspace = true -bytes.workspace = true bytesize.workspace = true -digest.workspace = true err-derive.workspace = true hexdump.workspace = true xxhash-rust.workspace = true diff --git a/src/util/config.rs b/src/util/config.rs index b4e2b008..73fc4ff4 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -183,6 +183,9 @@ pub struct WebConfig { pub bind_addr: UnixOrTCPSocketAddress, /// Suffix to remove from domain name to find bucket pub root_domain: String, + /// Whether to add the requested domain to exported Prometheus metrics + #[serde(default)] + pub add_host_to_metrics: bool, } /// Configuration for the admin and monitoring HTTP API diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml index d810d6f9..a0a3e566 100644 --- a/src/web/Cargo.toml +++ b/src/web/Cargo.toml @@ -14,7 +14,8 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_api.workspace = true +garage_api_common.workspace = true +garage_api_s3.workspace = true garage_model.workspace = true garage_util.workspace = true garage_table.workspace = true @@ -23,12 +24,9 @@ err-derive.workspace = true tracing.workspace = true percent-encoding.workspace = true -futures.workspace = true - http.workspace = true http-body-util.workspace = true hyper.workspace = true -hyper-util.workspace = true tokio.workspace = true diff --git a/src/web/error.rs b/src/web/error.rs index bd8f17b5..7e6d4542 100644 --- a/src/web/error.rs +++ b/src/web/error.rs @@ -2,14 +2,14 @@ use err_derive::Error; use hyper::header::HeaderValue; use hyper::{HeaderMap, StatusCode}; -use garage_api::generic_server::ApiError; +use garage_api_common::generic_server::ApiError; /// Errors of this crate #[derive(Debug, Error)] pub enum Error { /// An error received from the API crate #[error(display = "API error: {}", _0)] - ApiError(garage_api::s3::error::Error), + ApiError(garage_api_s3::error::Error), /// The file does not exist #[error(display = "Not found")] @@ -22,10 +22,10 @@ pub enum Error { impl<T> From<T> for Error where - garage_api::s3::error::Error: From<T>, + garage_api_s3::error::Error: From<T>, { fn from(err: T) -> Self { - Error::ApiError(garage_api::s3::error::Error::from(err)) + Error::ApiError(garage_api_s3::error::Error::from(err)) } } diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 69939f65..e73dab48 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -20,17 +20,20 @@ use opentelemetry::{ use crate::error::*; -use garage_api::generic_server::{server_loop, UnixListenerOn}; -use garage_api::helpers::*; -use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket}; -use garage_api::s3::error::{ +use garage_api_common::cors::{ + add_cors_headers, find_matching_cors_rule, handle_options_for_bucket, +}; +use garage_api_common::generic_server::{server_loop, UnixListenerOn}; +use garage_api_common::helpers::*; +use garage_api_s3::error::{ CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError, }; -use garage_api::s3::get::{handle_get_without_ctx, handle_head_without_ctx}; +use garage_api_s3::get::{handle_get_without_ctx, handle_head_without_ctx}; use garage_model::garage::Garage; use garage_table::*; +use garage_util::config::WebConfig; use garage_util::data::Uuid; use garage_util::error::Error as GarageError; use garage_util::forwarded_headers; @@ -67,16 +70,18 @@ pub struct WebServer { garage: Arc<Garage>, metrics: Arc<WebMetrics>, root_domain: String, + add_host_to_metrics: bool, } impl WebServer { /// Run a web server - pub fn new(garage: Arc<Garage>, root_domain: String) -> Arc<Self> { + pub fn new(garage: Arc<Garage>, config: &WebConfig) -> Arc<Self> { let metrics = Arc::new(WebMetrics::new()); Arc::new(WebServer { garage, metrics, - root_domain, + root_domain: config.root_domain.clone(), + add_host_to_metrics: config.add_host_to_metrics, }) } @@ -118,18 +123,27 @@ impl WebServer { req: Request<IncomingBody>, addr: String, ) -> Result<Response<BoxBody<Error>>, http::Error> { + let host_header = req + .headers() + .get(HOST) + .and_then(|x| x.to_str().ok()) + .unwrap_or("<unknown>") + .to_string(); + if let Ok(forwarded_for_ip_addr) = forwarded_headers::handle_forwarded_for_headers(req.headers()) { + // uri() below has a preceding '/', so no space with host info!( - "{} (via {}) {} {}", + "{} (via {}) {} {}{}", forwarded_for_ip_addr, addr, req.method(), + host_header, req.uri() ); } else { - info!("{} {} {}", addr, req.method(), req.uri()); + info!("{} {} {}{}", addr, req.method(), host_header, req.uri()); } // Lots of instrumentation @@ -138,12 +152,16 @@ impl WebServer { .span_builder(format!("Web {} request", req.method())) .with_trace_id(gen_trace_id()) .with_attributes(vec![ + KeyValue::new("host", format!("{}", host_header.clone())), KeyValue::new("method", format!("{}", req.method())), KeyValue::new("uri", req.uri().to_string()), ]) .start(&tracer); - let metrics_tags = &[KeyValue::new("method", req.method().to_string())]; + let mut metrics_tags = vec![KeyValue::new("method", req.method().to_string())]; + if self.add_host_to_metrics { + metrics_tags.push(KeyValue::new("host", host_header.clone())); + } // The actual handler let res = self @@ -158,25 +176,30 @@ impl WebServer { // Returning the result match res { Ok(res) => { - debug!("{} {} {}", req.method(), res.status(), req.uri()); + debug!( + "{} {} {}{}", + req.method(), + res.status(), + host_header, + req.uri() + ); Ok(res .map(|body| BoxBody::new(http_body_util::BodyExt::map_err(body, Error::from)))) } Err(error) => { info!( - "{} {} {} {}", + "{} {} {}{} {}", req.method(), error.http_status_code(), + host_header, req.uri(), error ); - self.metrics.error_counter.add( - 1, - &[ - metrics_tags[0].clone(), - KeyValue::new("status_code", error.http_status_code().to_string()), - ], - ); + metrics_tags.push(KeyValue::new( + "status_code", + error.http_status_code().to_string(), + )); + self.metrics.error_counter.add(1, &metrics_tags); Ok(error_to_res(error)) } } |