Diffstat (limited to 'src')
-rw-r--r-- src/api/admin/Cargo.toml | 43
-rw-r--r-- src/api/admin/api_server.rs | 18
-rw-r--r-- src/api/admin/bucket.rs | 11
-rw-r--r-- src/api/admin/cluster.rs | 7
-rw-r--r-- src/api/admin/error.rs | 23
-rw-r--r-- src/api/admin/key.rs | 7
-rw-r--r-- src/api/admin/lib.rs (renamed from src/api/admin/mod.rs) | 3
-rw-r--r-- src/api/admin/router_v0.rs | 5
-rw-r--r-- src/api/admin/router_v1.rs | 7
-rw-r--r-- src/api/common/Cargo.toml | 44
-rw-r--r-- src/api/common/common_error.rs (renamed from src/api/common_error.rs) | 33
-rw-r--r-- src/api/common/cors.rs | 170
-rw-r--r-- src/api/common/encoding.rs (renamed from src/api/encoding.rs) | 0
-rw-r--r-- src/api/common/generic_server.rs (renamed from src/api/generic_server.rs) | 18
-rw-r--r-- src/api/common/helpers.rs (renamed from src/api/helpers.rs) | 10
-rw-r--r-- src/api/common/lib.rs | 12
-rw-r--r-- src/api/common/router_macros.rs (renamed from src/api/router_macros.rs) | 6
-rw-r--r-- src/api/common/signature/error.rs (renamed from src/api/signature/error.rs) | 0
-rw-r--r-- src/api/common/signature/mod.rs (renamed from src/api/signature/mod.rs) | 0
-rw-r--r-- src/api/common/signature/payload.rs (renamed from src/api/signature/payload.rs) | 2
-rw-r--r-- src/api/common/signature/streaming.rs (renamed from src/api/signature/streaming.rs) | 0
-rw-r--r-- src/api/k2v/Cargo.toml | 37
-rw-r--r-- src/api/k2v/api_server.rs | 26
-rw-r--r-- src/api/k2v/batch.rs | 11
-rw-r--r-- src/api/k2v/error.rs | 27
-rw-r--r-- src/api/k2v/index.rs | 9
-rw-r--r-- src/api/k2v/item.rs | 7
-rw-r--r-- src/api/k2v/lib.rs (renamed from src/api/k2v/mod.rs) | 3
-rw-r--r-- src/api/k2v/range.rs | 5
-rw-r--r-- src/api/k2v/router.rs | 6
-rw-r--r-- src/api/lib.rs | 17
-rw-r--r-- src/api/s3/Cargo.toml (renamed from src/api/Cargo.toml) | 18
-rw-r--r-- src/api/s3/api_server.rs | 46
-rw-r--r-- src/api/s3/bucket.rs | 13
-rw-r--r-- src/api/s3/checksum.rs | 2
-rw-r--r-- src/api/s3/copy.rs | 21
-rw-r--r-- src/api/s3/cors.rs | 183
-rw-r--r-- src/api/s3/delete.rs | 13
-rw-r--r-- src/api/s3/encryption.rs | 7
-rw-r--r-- src/api/s3/error.rs | 35
-rw-r--r-- src/api/s3/get.rs | 11
-rw-r--r-- src/api/s3/lib.rs (renamed from src/api/s3/mod.rs) | 3
-rw-r--r-- src/api/s3/lifecycle.rs | 11
-rw-r--r-- src/api/s3/list.rs | 18
-rw-r--r-- src/api/s3/multipart.rs | 28
-rw-r--r-- src/api/s3/post_object.rs | 19
-rw-r--r-- src/api/s3/put.rs | 11
-rw-r--r-- src/api/s3/router.rs | 22
-rw-r--r-- src/api/s3/website.rs | 13
-rw-r--r-- src/api/s3/xml.rs | 2
-rw-r--r-- src/block/Cargo.toml | 2
-rw-r--r-- src/block/manager.rs | 24
-rw-r--r-- src/block/resync.rs | 24
-rw-r--r-- src/db/Cargo.toml | 1
-rw-r--r-- src/garage/Cargo.toml | 16
-rw-r--r-- src/garage/admin/mod.rs | 46
-rw-r--r-- src/garage/repair/online.rs | 9
-rw-r--r-- src/garage/server.rs | 8
-rw-r--r-- src/garage/tests/common/custom_requester.rs | 2
-rw-r--r-- src/garage/tests/common/garage.rs | 6
-rw-r--r-- src/k2v-client/Cargo.toml | 3
-rw-r--r-- src/model/Cargo.toml | 3
-rw-r--r-- src/model/garage.rs | 2
-rw-r--r-- src/model/helper/locked.rs | 9
-rw-r--r-- src/model/k2v/rpc.rs | 2
-rw-r--r-- src/model/s3/lifecycle_worker.rs | 6
-rw-r--r-- src/net/Cargo.toml | 1
-rw-r--r-- src/net/client.rs | 2
-rw-r--r-- src/net/endpoint.rs | 32
-rw-r--r-- src/net/netapp.rs | 2
-rw-r--r-- src/net/peering.rs | 3
-rw-r--r-- src/net/recv.rs | 2
-rw-r--r-- src/net/send.rs | 2
-rw-r--r-- src/net/server.rs | 2
-rw-r--r-- src/rpc/Cargo.toml | 4
-rw-r--r-- src/rpc/layout/helper.rs | 5
-rw-r--r-- src/rpc/rpc_helper.rs | 73
-rw-r--r-- src/rpc/system.rs | 2
-rw-r--r-- src/table/Cargo.toml | 1
-rw-r--r-- src/table/gc.rs | 2
-rw-r--r-- src/table/sync.rs | 1
-rw-r--r-- src/table/table.rs | 6
-rw-r--r-- src/util/Cargo.toml | 2
-rw-r--r-- src/util/config.rs | 3
-rw-r--r-- src/web/Cargo.toml | 6
-rw-r--r-- src/web/error.rs | 8
-rw-r--r-- src/web/web_server.rs | 61
87 files changed, 829 insertions, 597 deletions
diff --git a/src/api/admin/Cargo.toml b/src/api/admin/Cargo.toml
new file mode 100644
index 00000000..adddf306
--- /dev/null
+++ b/src/api/admin/Cargo.toml
@@ -0,0 +1,43 @@
+[package]
+name = "garage_api_admin"
+version = "1.0.1"
+authors = ["Alex Auvolat <alex@adnab.me>"]
+edition = "2018"
+license = "AGPL-3.0"
+description = "Admin API server crate for the Garage object store"
+repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
+
+[lib]
+path = "lib.rs"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+garage_model.workspace = true
+garage_table.workspace = true
+garage_util.workspace = true
+garage_rpc.workspace = true
+garage_api_common.workspace = true
+
+argon2.workspace = true
+async-trait.workspace = true
+err-derive.workspace = true
+hex.workspace = true
+tracing.workspace = true
+
+futures.workspace = true
+tokio.workspace = true
+http.workspace = true
+hyper = { workspace = true, default-features = false, features = ["server", "http1"] }
+url.workspace = true
+
+serde.workspace = true
+serde_json.workspace = true
+
+opentelemetry.workspace = true
+opentelemetry-prometheus = { workspace = true, optional = true }
+prometheus = { workspace = true, optional = true }
+
+[features]
+metrics = [ "opentelemetry-prometheus", "prometheus" ]
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 0e4565bb..6f0c474f 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use argon2::password_hash::PasswordHash;
-use async_trait::async_trait;
use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
@@ -20,15 +19,15 @@ use garage_rpc::system::ClusterHealthStatus;
use garage_util::error::Error as GarageError;
use garage_util::socket_address::UnixOrTCPSocketAddress;
-use crate::generic_server::*;
+use garage_api_common::generic_server::*;
+use garage_api_common::helpers::*;
-use crate::admin::bucket::*;
-use crate::admin::cluster::*;
-use crate::admin::error::*;
-use crate::admin::key::*;
-use crate::admin::router_v0;
-use crate::admin::router_v1::{Authorization, Endpoint};
-use crate::helpers::*;
+use crate::bucket::*;
+use crate::cluster::*;
+use crate::error::*;
+use crate::key::*;
+use crate::router_v0;
+use crate::router_v1::{Authorization, Endpoint};
pub type ResBody = BoxBody<Error>;
@@ -221,7 +220,6 @@ impl AdminApiServer {
}
}
-#[async_trait]
impl ApiHandler for AdminApiServer {
const API_NAME: &'static str = "admin";
const API_NAME_DISPLAY: &'static str = "Admin";
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
index ac3cba00..2537bfc9 100644
--- a/src/api/admin/bucket.rs
+++ b/src/api/admin/bucket.rs
@@ -17,11 +17,12 @@ use garage_model::permission::*;
use garage_model::s3::mpu_table;
use garage_model::s3::object_table::*;
-use crate::admin::api_server::ResBody;
-use crate::admin::error::*;
-use crate::admin::key::ApiBucketKeyPerm;
-use crate::common_error::CommonError;
-use crate::helpers::*;
+use garage_api_common::common_error::CommonError;
+use garage_api_common::helpers::*;
+
+use crate::api_server::ResBody;
+use crate::error::*;
+use crate::key::ApiBucketKeyPerm;
pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
let buckets = garage
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index 357ac600..ffa0fa71 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -12,9 +12,10 @@ use garage_rpc::layout;
use garage_model::garage::Garage;
-use crate::admin::api_server::ResBody;
-use crate::admin::error::*;
-use crate::helpers::{json_ok_response, parse_json_body};
+use garage_api_common::helpers::{json_ok_response, parse_json_body};
+
+use crate::api_server::ResBody;
+use crate::error::*;
pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
let layout = garage.system.cluster_layout();
diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs
index 40d686e3..201f9b40 100644
--- a/src/api/admin/error.rs
+++ b/src/api/admin/error.rs
@@ -6,17 +6,19 @@ use hyper::{HeaderMap, StatusCode};
pub use garage_model::helper::error::Error as HelperError;
-use crate::common_error::CommonError;
-pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
-use crate::generic_server::ApiError;
-use crate::helpers::*;
+use garage_api_common::common_error::{commonErrorDerivative, CommonError};
+pub use garage_api_common::common_error::{
+ CommonErrorDerivative, OkOrBadRequest, OkOrInternalError,
+};
+use garage_api_common::generic_server::ApiError;
+use garage_api_common::helpers::*;
/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
#[error(display = "{}", _0)]
/// Error from common error
- Common(CommonError),
+ Common(#[error(source)] CommonError),
// Category: cannot process
/// The API access key does not exist
@@ -31,14 +33,7 @@ pub enum Error {
KeyAlreadyExists(String),
}
-impl<T> From<T> for Error
-where
- CommonError: From<T>,
-{
- fn from(err: T) -> Self {
- Error::Common(CommonError::from(err))
- }
-}
+commonErrorDerivative!(Error);
/// FIXME: helper errors are transformed into their corresponding variants
/// in the Error struct, but in many cases a helper error should be considered
@@ -53,8 +48,6 @@ impl From<HelperError> for Error {
}
}
-impl CommonErrorDerivative for Error {}
-
impl Error {
fn code(&self) -> &'static str {
match self {
diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs
index 291b6d54..bebf3063 100644
--- a/src/api/admin/key.rs
+++ b/src/api/admin/key.rs
@@ -9,9 +9,10 @@ use garage_table::*;
use garage_model::garage::Garage;
use garage_model::key_table::*;
-use crate::admin::api_server::ResBody;
-use crate::admin::error::*;
-use crate::helpers::*;
+use garage_api_common::helpers::*;
+
+use crate::api_server::ResBody;
+use crate::error::*;
pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
let res = garage
diff --git a/src/api/admin/mod.rs b/src/api/admin/lib.rs
index 43a8c59c..599e9b44 100644
--- a/src/api/admin/mod.rs
+++ b/src/api/admin/lib.rs
@@ -1,3 +1,6 @@
+#[macro_use]
+extern crate tracing;
+
pub mod api_server;
mod error;
mod router_v0;
diff --git a/src/api/admin/router_v0.rs b/src/api/admin/router_v0.rs
index 68676445..9dd742ba 100644
--- a/src/api/admin/router_v0.rs
+++ b/src/api/admin/router_v0.rs
@@ -2,8 +2,9 @@ use std::borrow::Cow;
use hyper::{Method, Request};
-use crate::admin::error::*;
-use crate::router_macros::*;
+use garage_api_common::router_macros::*;
+
+use crate::error::*;
router_match! {@func
diff --git a/src/api/admin/router_v1.rs b/src/api/admin/router_v1.rs
index cc5ff2ec..0b4901ea 100644
--- a/src/api/admin/router_v1.rs
+++ b/src/api/admin/router_v1.rs
@@ -2,9 +2,10 @@ use std::borrow::Cow;
use hyper::{Method, Request};
-use crate::admin::error::*;
-use crate::admin::router_v0;
-use crate::router_macros::*;
+use garage_api_common::router_macros::*;
+
+use crate::error::*;
+use crate::router_v0;
pub enum Authorization {
None,
diff --git a/src/api/common/Cargo.toml b/src/api/common/Cargo.toml
new file mode 100644
index 00000000..5b9cf479
--- /dev/null
+++ b/src/api/common/Cargo.toml
@@ -0,0 +1,44 @@
+[package]
+name = "garage_api_common"
+version = "1.0.1"
+authors = ["Alex Auvolat <alex@adnab.me>"]
+edition = "2018"
+license = "AGPL-3.0"
+description = "Common functions for the API server crates for the Garage object store"
+repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
+
+[lib]
+path = "lib.rs"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+garage_model.workspace = true
+garage_table.workspace = true
+garage_util.workspace = true
+
+bytes.workspace = true
+chrono.workspace = true
+crypto-common.workspace = true
+err-derive.workspace = true
+hex.workspace = true
+hmac.workspace = true
+idna.workspace = true
+tracing.workspace = true
+nom.workspace = true
+pin-project.workspace = true
+sha2.workspace = true
+
+futures.workspace = true
+tokio.workspace = true
+http.workspace = true
+http-body-util.workspace = true
+hyper = { workspace = true, default-features = false, features = ["server", "http1"] }
+hyper-util.workspace = true
+url.workspace = true
+
+serde.workspace = true
+serde_json.workspace = true
+
+opentelemetry.workspace = true
diff --git a/src/api/common_error.rs b/src/api/common/common_error.rs
index 0c8006dc..597a3511 100644
--- a/src/api/common_error.rs
+++ b/src/api/common/common_error.rs
@@ -57,6 +57,35 @@ pub enum CommonError {
InvalidBucketName(String),
}
+#[macro_export]
+macro_rules! commonErrorDerivative {
+ ( $error_struct: ident ) => {
+ impl From<garage_util::error::Error> for $error_struct {
+ fn from(err: garage_util::error::Error) -> Self {
+ Self::Common(CommonError::InternalError(err))
+ }
+ }
+ impl From<http::Error> for $error_struct {
+ fn from(err: http::Error) -> Self {
+ Self::Common(CommonError::Http(err))
+ }
+ }
+ impl From<hyper::Error> for $error_struct {
+ fn from(err: hyper::Error) -> Self {
+ Self::Common(CommonError::Hyper(err))
+ }
+ }
+ impl From<hyper::header::ToStrError> for $error_struct {
+ fn from(err: hyper::header::ToStrError) -> Self {
+ Self::Common(CommonError::InvalidHeader(err))
+ }
+ }
+ impl CommonErrorDerivative for $error_struct {}
+ };
+}
+
+pub use commonErrorDerivative;
+
impl CommonError {
pub fn http_status_code(&self) -> StatusCode {
match self {
@@ -118,14 +147,14 @@ impl TryFrom<HelperError> for CommonError {
/// This is used for helper functions that might return InvalidBucketName
/// or NoSuchBucket for instance, and we want to pass that error
/// up to our caller.
-pub(crate) fn pass_helper_error(err: HelperError) -> CommonError {
+pub fn pass_helper_error(err: HelperError) -> CommonError {
match CommonError::try_from(err) {
Ok(e) => e,
Err(e) => panic!("Helper error `{}` should not have happened here", e),
}
}
-pub(crate) fn helper_error_as_internal(err: HelperError) -> CommonError {
+pub fn helper_error_as_internal(err: HelperError) -> CommonError {
match err {
HelperError::Internal(e) => CommonError::InternalError(e),
e => CommonError::InternalError(GarageError::Message(e.to_string())),
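The `commonErrorDerivative!` macro introduced above replaces the blanket `impl<T> From<T> for Error where CommonError: From<T>` that each API error enum previously declared, by generating the concrete `From` impls instead. Below is a minimal, self-contained sketch of the same pattern; the type names and the set of wrapped errors are simplified stand-ins for illustration, not the real Garage definitions:

```rust
#[derive(Debug)]
enum CommonError {
    Internal(String),
}

// Stand-in for `commonErrorDerivative!`: generate the concrete `From` impls
// that wrap lower-level errors into the crate-local error's Common variant.
macro_rules! common_error_derivative {
    ($error_struct:ident) => {
        impl From<CommonError> for $error_struct {
            fn from(err: CommonError) -> Self {
                Self::Common(err)
            }
        }
        impl From<std::io::Error> for $error_struct {
            fn from(err: std::io::Error) -> Self {
                Self::Common(CommonError::Internal(err.to_string()))
            }
        }
    };
}

#[derive(Debug)]
enum ApiError {
    Common(CommonError),
}

common_error_derivative!(ApiError);

fn main() {
    // `?` and `.into()` keep working exactly as with the old blanket impl.
    let err: ApiError = std::io::Error::new(std::io::ErrorKind::Other, "disk failure").into();
    println!("{:?}", err);
}
```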
diff --git a/src/api/common/cors.rs b/src/api/common/cors.rs
new file mode 100644
index 00000000..14369b56
--- /dev/null
+++ b/src/api/common/cors.rs
@@ -0,0 +1,170 @@
+use std::sync::Arc;
+
+use http::header::{
+ ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN,
+ ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD,
+};
+use hyper::{body::Body, body::Incoming as IncomingBody, Request, Response, StatusCode};
+
+use garage_model::bucket_table::{BucketParams, CorsRule as GarageCorsRule};
+use garage_model::garage::Garage;
+
+use crate::common_error::{
+ helper_error_as_internal, CommonError, OkOrBadRequest, OkOrInternalError,
+};
+use crate::helpers::*;
+
+pub fn find_matching_cors_rule<'a>(
+ bucket_params: &'a BucketParams,
+ req: &Request<impl Body>,
+) -> Result<Option<&'a GarageCorsRule>, CommonError> {
+ if let Some(cors_config) = bucket_params.cors_config.get() {
+ if let Some(origin) = req.headers().get("Origin") {
+ let origin = origin.to_str()?;
+ let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {
+ Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(),
+ None => vec![],
+ };
+ return Ok(cors_config.iter().find(|rule| {
+ cors_rule_matches(rule, origin, req.method().as_ref(), request_headers.iter())
+ }));
+ }
+ }
+ Ok(None)
+}
+
+pub fn cors_rule_matches<'a, HI, S>(
+ rule: &GarageCorsRule,
+ origin: &'a str,
+ method: &'a str,
+ mut request_headers: HI,
+) -> bool
+where
+ HI: Iterator<Item = S>,
+ S: AsRef<str>,
+{
+ rule.allow_origins.iter().any(|x| x == "*" || x == origin)
+ && rule.allow_methods.iter().any(|x| x == "*" || x == method)
+ && request_headers.all(|h| {
+ rule.allow_headers
+ .iter()
+ .any(|x| x == "*" || x == h.as_ref())
+ })
+}
+
+pub fn add_cors_headers(
+ resp: &mut Response<impl Body>,
+ rule: &GarageCorsRule,
+) -> Result<(), http::header::InvalidHeaderValue> {
+ let h = resp.headers_mut();
+ h.insert(
+ ACCESS_CONTROL_ALLOW_ORIGIN,
+ rule.allow_origins.join(", ").parse()?,
+ );
+ h.insert(
+ ACCESS_CONTROL_ALLOW_METHODS,
+ rule.allow_methods.join(", ").parse()?,
+ );
+ h.insert(
+ ACCESS_CONTROL_ALLOW_HEADERS,
+ rule.allow_headers.join(", ").parse()?,
+ );
+ h.insert(
+ ACCESS_CONTROL_EXPOSE_HEADERS,
+ rule.expose_headers.join(", ").parse()?,
+ );
+ Ok(())
+}
+
+pub async fn handle_options_api(
+ garage: Arc<Garage>,
+ req: &Request<IncomingBody>,
+ bucket_name: Option<String>,
+) -> Result<Response<EmptyBody>, CommonError> {
+ // FIXME: CORS rules of buckets with local aliases are
+ // not taken into account.
+
+ // If the bucket name is a global bucket name,
+ // we try to apply the CORS rules of that bucket.
+ // If a user has a local bucket name that has
+ // the same name, its CORS rules won't be applied
+ // and will be shadowed by the rules of the globally
+ // existing bucket (but this is inevitable because
+ // OPTIONS calls are not authenticated).
+ if let Some(bn) = bucket_name {
+ let helper = garage.bucket_helper();
+ let bucket_id = helper
+ .resolve_global_bucket_name(&bn)
+ .await
+ .map_err(helper_error_as_internal)?;
+ if let Some(id) = bucket_id {
+ let bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(id)
+ .await
+ .map_err(helper_error_as_internal)?;
+ let bucket_params = bucket.state.into_option().unwrap();
+ handle_options_for_bucket(req, &bucket_params)
+ } else {
+ // If there is a bucket name in the request, but that name
+ // does not correspond to a global alias for a bucket,
+ // then it's either a non-existing bucket or a local bucket.
+ // We have no way of knowing, because the request is not
+ // authenticated and thus we can't resolve local aliases.
+ // We take the permissive approach of allowing everything,
+ // because we don't want to prevent web apps that use
+ // local bucket names from making API calls.
+ Ok(Response::builder()
+ .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
+ .header(ACCESS_CONTROL_ALLOW_METHODS, "*")
+ .status(StatusCode::OK)
+ .body(EmptyBody::new())?)
+ }
+ } else {
+ // If there is no bucket name in the request,
+ // we are doing a ListBuckets call, which we want to allow
+ // for all origins.
+ Ok(Response::builder()
+ .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
+ .header(ACCESS_CONTROL_ALLOW_METHODS, "GET")
+ .status(StatusCode::OK)
+ .body(EmptyBody::new())?)
+ }
+}
+
+pub fn handle_options_for_bucket(
+ req: &Request<IncomingBody>,
+ bucket_params: &BucketParams,
+) -> Result<Response<EmptyBody>, CommonError> {
+ let origin = req
+ .headers()
+ .get("Origin")
+ .ok_or_bad_request("Missing Origin header")?
+ .to_str()?;
+ let request_method = req
+ .headers()
+ .get(ACCESS_CONTROL_REQUEST_METHOD)
+ .ok_or_bad_request("Missing Access-Control-Request-Method header")?
+ .to_str()?;
+ let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {
+ Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(),
+ None => vec![],
+ };
+
+ if let Some(cors_config) = bucket_params.cors_config.get() {
+ let matching_rule = cors_config
+ .iter()
+ .find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter()));
+ if let Some(rule) = matching_rule {
+ let mut resp = Response::builder()
+ .status(StatusCode::OK)
+ .body(EmptyBody::new())?;
+ add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?;
+ return Ok(resp);
+ }
+ }
+
+ Err(CommonError::Forbidden(
+ "This CORS request is not allowed.".into(),
+ ))
+}
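As a reference for the matching semantics of `cors_rule_matches` above: a rule applies when the request's Origin and method are each allowed, either explicitly or via the `*` wildcard, and every header named in `Access-Control-Request-Headers` is also allowed. Here is a self-contained sketch using a simplified rule type (the real code operates on `garage_model::bucket_table::CorsRule`):

```rust
struct CorsRule {
    allow_origins: Vec<String>,
    allow_methods: Vec<String>,
    allow_headers: Vec<String>,
}

// Same predicate as `cors_rule_matches`, over the simplified rule type:
// origin and method must each match (exactly or via "*"), and every
// requested header must be allowed.
fn rule_matches<'a>(
    rule: &CorsRule,
    origin: &str,
    method: &str,
    mut request_headers: impl Iterator<Item = &'a str>,
) -> bool {
    rule.allow_origins.iter().any(|x| x == "*" || x == origin)
        && rule.allow_methods.iter().any(|x| x == "*" || x == method)
        && request_headers.all(|h| rule.allow_headers.iter().any(|x| x == "*" || x == h))
}

fn main() {
    let rule = CorsRule {
        allow_origins: vec!["https://app.example.com".into()],
        allow_methods: vec!["GET".into(), "PUT".into()],
        allow_headers: vec!["*".into()],
    };
    // Allowed: listed origin, listed method, wildcard headers.
    assert!(rule_matches(
        &rule,
        "https://app.example.com",
        "PUT",
        ["content-type", "x-amz-date"].into_iter(),
    ));
    // Rejected: origin not listed.
    assert!(!rule_matches(
        &rule,
        "https://other.example",
        "GET",
        std::iter::empty::<&str>(),
    ));
}
```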
diff --git a/src/api/encoding.rs b/src/api/common/encoding.rs
index e286a784..e286a784 100644
--- a/src/api/encoding.rs
+++ b/src/api/common/encoding.rs
diff --git a/src/api/generic_server.rs b/src/api/common/generic_server.rs
index 283abdd4..6ddc2ff2 100644
--- a/src/api/generic_server.rs
+++ b/src/api/common/generic_server.rs
@@ -4,8 +4,6 @@ use std::os::unix::fs::PermissionsExt;
use std::sync::Arc;
use std::time::Duration;
-use async_trait::async_trait;
-
use futures::future::Future;
use futures::stream::{futures_unordered::FuturesUnordered, StreamExt};
@@ -36,7 +34,7 @@ use garage_util::socket_address::UnixOrTCPSocketAddress;
use crate::helpers::{BoxBody, ErrorBody};
-pub(crate) trait ApiEndpoint: Send + Sync + 'static {
+pub trait ApiEndpoint: Send + Sync + 'static {
fn name(&self) -> &'static str;
fn add_span_attributes(&self, span: SpanRef<'_>);
}
@@ -47,8 +45,7 @@ pub trait ApiError: std::error::Error + Send + Sync + 'static {
fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody;
}
-#[async_trait]
-pub(crate) trait ApiHandler: Send + Sync + 'static {
+pub trait ApiHandler: Send + Sync + 'static {
const API_NAME: &'static str;
const API_NAME_DISPLAY: &'static str;
@@ -56,14 +53,14 @@ pub(crate) trait ApiHandler: Send + Sync + 'static {
type Error: ApiError;
fn parse_endpoint(&self, r: &Request<IncomingBody>) -> Result<Self::Endpoint, Self::Error>;
- async fn handle(
+ fn handle(
&self,
req: Request<IncomingBody>,
endpoint: Self::Endpoint,
- ) -> Result<Response<BoxBody<Self::Error>>, Self::Error>;
+ ) -> impl Future<Output = Result<Response<BoxBody<Self::Error>>, Self::Error>> + Send;
}
-pub(crate) struct ApiServer<A: ApiHandler> {
+pub struct ApiServer<A: ApiHandler> {
region: String,
api_handler: A,
@@ -248,13 +245,11 @@ impl<A: ApiHandler> ApiServer<A> {
// ==== helper functions ====
-#[async_trait]
pub trait Accept: Send + Sync + 'static {
type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static;
- async fn accept(&self) -> std::io::Result<(Self::Stream, String)>;
+ fn accept(&self) -> impl Future<Output = std::io::Result<(Self::Stream, String)>> + Send;
}
-#[async_trait]
impl Accept for TcpListener {
type Stream = TcpStream;
async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
@@ -266,7 +261,6 @@ impl Accept for TcpListener {
pub struct UnixListenerOn(pub UnixListener, pub String);
-#[async_trait]
impl Accept for UnixListenerOn {
type Stream = UnixStream;
async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
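The `generic_server.rs` changes above drop the `async-trait` dependency: trait methods are now declared with a return-position `impl Future<Output = …> + Send` (stable since Rust 1.75), while implementations keep writing plain `async fn`. A minimal sketch of the pattern, using hypothetical names rather than the real `ApiHandler` and `Accept` traits, and assuming the tokio runtime already used elsewhere in the project:

```rust
use std::future::Future;

// Trait method declared with an explicit `impl Future + Send` return type,
// so no boxing and no #[async_trait] attribute is needed.
trait Handler: Send + Sync + 'static {
    fn handle(&self, req: String) -> impl Future<Output = String> + Send;
}

struct Echo;

impl Handler for Echo {
    // The implementation can still be a plain `async fn`; the compiler
    // checks that the returned future satisfies the `Send` bound above.
    async fn handle(&self, req: String) -> String {
        format!("echo: {}", req)
    }
}

#[tokio::main]
async fn main() {
    let h = Echo;
    println!("{}", h.handle("hello".into()).await);
}
```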
diff --git a/src/api/helpers.rs b/src/api/common/helpers.rs
index cf60005d..c8586de4 100644
--- a/src/api/helpers.rs
+++ b/src/api/common/helpers.rs
@@ -363,9 +363,9 @@ mod tests {
}
#[derive(Serialize)]
-pub(crate) struct CustomApiErrorBody {
- pub(crate) code: String,
- pub(crate) message: String,
- pub(crate) region: String,
- pub(crate) path: String,
+pub struct CustomApiErrorBody {
+ pub code: String,
+ pub message: String,
+ pub region: String,
+ pub path: String,
}
diff --git a/src/api/common/lib.rs b/src/api/common/lib.rs
new file mode 100644
index 00000000..0e655a53
--- /dev/null
+++ b/src/api/common/lib.rs
@@ -0,0 +1,12 @@
+//! Crate for serving a S3 compatible API
+#[macro_use]
+extern crate tracing;
+
+pub mod common_error;
+
+pub mod cors;
+pub mod encoding;
+pub mod generic_server;
+pub mod helpers;
+pub mod router_macros;
+pub mod signature;
diff --git a/src/api/router_macros.rs b/src/api/common/router_macros.rs
index 8f10a4f5..d9fe86db 100644
--- a/src/api/router_macros.rs
+++ b/src/api/common/router_macros.rs
@@ -1,5 +1,6 @@
/// This macro is used to generate very repetitive match {} blocks in this module
/// It is _not_ made to be used anywhere else
+#[macro_export]
macro_rules! router_match {
(@match $enum:expr , [ $($endpoint:ident,)* ]) => {{
// usage: router_match {@match my_enum, [ VariantWithField1, VariantWithField2 ..] }
@@ -133,6 +134,7 @@ macro_rules! router_match {
/// This macro is used to generate part of the code in this module. It must be called only once, and
/// is useless outside of this module.
+#[macro_export]
macro_rules! generateQueryParameters {
(
keywords: [ $($kw_param:expr => $kw_name: ident),* ],
@@ -220,5 +222,5 @@ macro_rules! generateQueryParameters {
}
}
-pub(crate) use generateQueryParameters;
-pub(crate) use router_match;
+pub use generateQueryParameters;
+pub use router_match;
diff --git a/src/api/signature/error.rs b/src/api/common/signature/error.rs
index 2d92a072..2d92a072 100644
--- a/src/api/signature/error.rs
+++ b/src/api/common/signature/error.rs
diff --git a/src/api/signature/mod.rs b/src/api/common/signature/mod.rs
index 6514da43..6514da43 100644
--- a/src/api/signature/mod.rs
+++ b/src/api/common/signature/mod.rs
diff --git a/src/api/signature/payload.rs b/src/api/common/signature/payload.rs
index 9e5a6043..81541e4a 100644
--- a/src/api/signature/payload.rs
+++ b/src/api/common/signature/payload.rs
@@ -518,7 +518,7 @@ impl Authorization {
})
}
- pub(crate) fn parse_form(params: &HeaderMap) -> Result<Self, Error> {
+ pub fn parse_form(params: &HeaderMap) -> Result<Self, Error> {
let algorithm = params
.get(X_AMZ_ALGORITHM)
.ok_or_bad_request("Missing X-Amz-Algorithm header")?
diff --git a/src/api/signature/streaming.rs b/src/api/common/signature/streaming.rs
index e223d1b1..e223d1b1 100644
--- a/src/api/signature/streaming.rs
+++ b/src/api/common/signature/streaming.rs
diff --git a/src/api/k2v/Cargo.toml b/src/api/k2v/Cargo.toml
new file mode 100644
index 00000000..e3ebedca
--- /dev/null
+++ b/src/api/k2v/Cargo.toml
@@ -0,0 +1,37 @@
+[package]
+name = "garage_api_k2v"
+version = "1.0.1"
+authors = ["Alex Auvolat <alex@adnab.me>"]
+edition = "2018"
+license = "AGPL-3.0"
+description = "K2V API server crate for the Garage object store"
+repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
+
+[lib]
+path = "lib.rs"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+garage_model = { workspace = true, features = [ "k2v" ] }
+garage_table.workspace = true
+garage_util = { workspace = true, features = [ "k2v" ] }
+garage_api_common.workspace = true
+
+base64.workspace = true
+err-derive.workspace = true
+tracing.workspace = true
+
+futures.workspace = true
+tokio.workspace = true
+http.workspace = true
+http-body-util.workspace = true
+hyper = { workspace = true, default-features = false, features = ["server", "http1"] }
+percent-encoding.workspace = true
+url.workspace = true
+
+serde.workspace = true
+serde_json.workspace = true
+
+opentelemetry.workspace = true
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
index de6e5f06..eb276f5b 100644
--- a/src/api/k2v/api_server.rs
+++ b/src/api/k2v/api_server.rs
@@ -1,7 +1,5 @@
use std::sync::Arc;
-use async_trait::async_trait;
-
use hyper::{body::Incoming as IncomingBody, Method, Request, Response};
use tokio::sync::watch;
@@ -12,26 +10,25 @@ use garage_util::socket_address::UnixOrTCPSocketAddress;
use garage_model::garage::Garage;
-use crate::generic_server::*;
-use crate::k2v::error::*;
-
-use crate::signature::verify_request;
+use garage_api_common::cors::*;
+use garage_api_common::generic_server::*;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_request;
-use crate::helpers::*;
-use crate::k2v::batch::*;
-use crate::k2v::index::*;
-use crate::k2v::item::*;
-use crate::k2v::router::Endpoint;
-use crate::s3::cors::*;
+use crate::batch::*;
+use crate::error::*;
+use crate::index::*;
+use crate::item::*;
+use crate::router::Endpoint;
-pub use crate::signature::streaming::ReqBody;
+pub use garage_api_common::signature::streaming::ReqBody;
pub type ResBody = BoxBody<Error>;
pub struct K2VApiServer {
garage: Arc<Garage>,
}
-pub(crate) struct K2VApiEndpoint {
+pub struct K2VApiEndpoint {
bucket_name: String,
endpoint: Endpoint,
}
@@ -49,7 +46,6 @@ impl K2VApiServer {
}
}
-#[async_trait]
impl ApiHandler for K2VApiServer {
const API_NAME: &'static str = "k2v";
const API_NAME_DISPLAY: &'static str = "K2V";
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index e4d0b0e5..c284dbd4 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -6,11 +6,12 @@ use garage_table::{EnumerationOrder, TableSchema};
use garage_model::k2v::item_table::*;
-use crate::helpers::*;
-use crate::k2v::api_server::{ReqBody, ResBody};
-use crate::k2v::error::*;
-use crate::k2v::item::parse_causality_token;
-use crate::k2v::range::read_range;
+use garage_api_common::helpers::*;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::error::*;
+use crate::item::parse_causality_token;
+use crate::range::read_range;
pub async fn handle_insert_batch(
ctx: ReqCtx,
diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs
index dbe4be2c..3cd0e6f7 100644
--- a/src/api/k2v/error.rs
+++ b/src/api/k2v/error.rs
@@ -2,19 +2,21 @@ use err_derive::Error;
use hyper::header::HeaderValue;
use hyper::{HeaderMap, StatusCode};
-use crate::common_error::CommonError;
-pub(crate) use crate::common_error::{helper_error_as_internal, pass_helper_error};
-pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
-use crate::generic_server::ApiError;
-use crate::helpers::*;
-use crate::signature::error::Error as SignatureError;
+use garage_api_common::common_error::{commonErrorDerivative, CommonError};
+pub(crate) use garage_api_common::common_error::{helper_error_as_internal, pass_helper_error};
+pub use garage_api_common::common_error::{
+ CommonErrorDerivative, OkOrBadRequest, OkOrInternalError,
+};
+use garage_api_common::generic_server::ApiError;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::error::Error as SignatureError;
/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
#[error(display = "{}", _0)]
/// Error from common error
- Common(CommonError),
+ Common(#[error(source)] CommonError),
// Category: cannot process
/// Authorization Header Malformed
@@ -42,16 +44,7 @@ pub enum Error {
InvalidUtf8Str(#[error(source)] std::str::Utf8Error),
}
-impl<T> From<T> for Error
-where
- CommonError: From<T>,
-{
- fn from(err: T) -> Self {
- Error::Common(CommonError::from(err))
- }
-}
-
-impl CommonErrorDerivative for Error {}
+commonErrorDerivative!(Error);
impl From<SignatureError> for Error {
fn from(err: SignatureError) -> Self {
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index e3397238..fbfaad98 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -5,10 +5,11 @@ use garage_table::util::*;
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
-use crate::helpers::*;
-use crate::k2v::api_server::ResBody;
-use crate::k2v::error::*;
-use crate::k2v::range::read_range;
+use garage_api_common::helpers::*;
+
+use crate::api_server::ResBody;
+use crate::error::*;
+use crate::range::read_range;
pub async fn handle_read_index(
ctx: ReqCtx,
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index 87371727..4e28b499 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -6,9 +6,10 @@ use hyper::{Request, Response, StatusCode};
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
-use crate::helpers::*;
-use crate::k2v::api_server::{ReqBody, ResBody};
-use crate::k2v::error::*;
+use garage_api_common::helpers::*;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::error::*;
pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
diff --git a/src/api/k2v/mod.rs b/src/api/k2v/lib.rs
index b6a8c5cf..334ae46b 100644
--- a/src/api/k2v/mod.rs
+++ b/src/api/k2v/lib.rs
@@ -1,3 +1,6 @@
+#[macro_use]
+extern crate tracing;
+
pub mod api_server;
mod error;
mod router;
diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs
index bb9d3be5..eb4738db 100644
--- a/src/api/k2v/range.rs
+++ b/src/api/k2v/range.rs
@@ -7,8 +7,9 @@ use std::sync::Arc;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
-use crate::helpers::key_after_prefix;
-use crate::k2v::error::*;
+use garage_api_common::helpers::key_after_prefix;
+
+use crate::error::*;
/// Read range in a Garage table.
/// Returns (entries, more?, nextStart)
diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs
index 1cc58be5..a04b0f81 100644
--- a/src/api/k2v/router.rs
+++ b/src/api/k2v/router.rs
@@ -1,11 +1,11 @@
-use crate::k2v::error::*;
+use crate::error::*;
use std::borrow::Cow;
use hyper::{Method, Request};
-use crate::helpers::Authorization;
-use crate::router_macros::{generateQueryParameters, router_match};
+use garage_api_common::helpers::Authorization;
+use garage_api_common::router_macros::{generateQueryParameters, router_match};
router_match! {@func
diff --git a/src/api/lib.rs b/src/api/lib.rs
deleted file mode 100644
index 370dfd7a..00000000
--- a/src/api/lib.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-//! Crate for serving a S3 compatible API
-#[macro_use]
-extern crate tracing;
-
-pub mod common_error;
-
-mod encoding;
-pub mod generic_server;
-pub mod helpers;
-mod router_macros;
-/// This mode is public only to help testing. Don't expect stability here
-pub mod signature;
-
-pub mod admin;
-#[cfg(feature = "k2v")]
-pub mod k2v;
-pub mod s3;
diff --git a/src/api/Cargo.toml b/src/api/s3/Cargo.toml
index 85b78a5b..387e45db 100644
--- a/src/api/Cargo.toml
+++ b/src/api/s3/Cargo.toml
@@ -1,5 +1,5 @@
[package]
-name = "garage_api"
+name = "garage_api_s3"
version = "1.0.1"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
@@ -20,30 +20,24 @@ garage_block.workspace = true
garage_net.workspace = true
garage_util.workspace = true
garage_rpc.workspace = true
+garage_api_common.workspace = true
aes-gcm.workspace = true
-argon2.workspace = true
async-compression.workspace = true
-async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
chrono.workspace = true
crc32fast.workspace = true
crc32c.workspace = true
-crypto-common.workspace = true
err-derive.workspace = true
hex.workspace = true
-hmac.workspace = true
-idna.workspace = true
tracing.workspace = true
md-5.workspace = true
-nom.workspace = true
pin-project.workspace = true
sha1.workspace = true
sha2.workspace = true
futures.workspace = true
-futures-util.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
@@ -54,21 +48,13 @@ httpdate.workspace = true
http-range.workspace = true
http-body-util.workspace = true
hyper = { workspace = true, default-features = false, features = ["server", "http1"] }
-hyper-util.workspace = true
multer.workspace = true
percent-encoding.workspace = true
roxmltree.workspace = true
url.workspace = true
serde.workspace = true
-serde_bytes.workspace = true
serde_json.workspace = true
quick-xml.workspace = true
opentelemetry.workspace = true
-opentelemetry-prometheus = { workspace = true, optional = true }
-prometheus = { workspace = true, optional = true }
-
-[features]
-k2v = [ "garage_util/k2v", "garage_model/k2v" ]
-metrics = [ "opentelemetry-prometheus", "prometheus" ]
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index f9dafa10..14fd03c3 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -1,7 +1,5 @@
use std::sync::Arc;
-use async_trait::async_trait;
-
use hyper::header;
use hyper::{body::Incoming as IncomingBody, Request, Response};
use tokio::sync::watch;
@@ -14,33 +12,33 @@ use garage_util::socket_address::UnixOrTCPSocketAddress;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
-use crate::generic_server::*;
-use crate::s3::error::*;
-
-use crate::signature::verify_request;
-
-use crate::helpers::*;
-use crate::s3::bucket::*;
-use crate::s3::copy::*;
-use crate::s3::cors::*;
-use crate::s3::delete::*;
-use crate::s3::get::*;
-use crate::s3::lifecycle::*;
-use crate::s3::list::*;
-use crate::s3::multipart::*;
-use crate::s3::post_object::handle_post_object;
-use crate::s3::put::*;
-use crate::s3::router::Endpoint;
-use crate::s3::website::*;
-
-pub use crate::signature::streaming::ReqBody;
+use garage_api_common::cors::*;
+use garage_api_common::generic_server::*;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_request;
+
+use crate::bucket::*;
+use crate::copy::*;
+use crate::cors::*;
+use crate::delete::*;
+use crate::error::*;
+use crate::get::*;
+use crate::lifecycle::*;
+use crate::list::*;
+use crate::multipart::*;
+use crate::post_object::handle_post_object;
+use crate::put::*;
+use crate::router::Endpoint;
+use crate::website::*;
+
+pub use garage_api_common::signature::streaming::ReqBody;
pub type ResBody = BoxBody<Error>;
pub struct S3ApiServer {
garage: Arc<Garage>,
}
-pub(crate) struct S3ApiEndpoint {
+pub struct S3ApiEndpoint {
bucket_name: Option<String>,
endpoint: Endpoint,
}
@@ -70,7 +68,6 @@ impl S3ApiServer {
}
}
-#[async_trait]
impl ApiHandler for S3ApiServer {
const API_NAME: &'static str = "s3";
const API_NAME_DISPLAY: &'static str = "S3";
@@ -320,7 +317,6 @@ impl ApiHandler for S3ApiServer {
} => {
let query = ListPartsQuery {
bucket_name: ctx.bucket_name.clone(),
- bucket_id,
key,
upload_id,
part_number_marker: part_number_marker.map(|p| p.min(10000)),
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
index 6a12aa9c..0a192ba6 100644
--- a/src/api/s3/bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -13,12 +13,13 @@ use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::time::*;
-use crate::common_error::CommonError;
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::error::*;
-use crate::s3::xml as s3_xml;
-use crate::signature::verify_signed_content;
+use garage_api_common::common_error::CommonError;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_signed_content;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::error::*;
+use crate::xml as s3_xml;
pub fn handle_get_bucket_location(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx { garage, .. } = ctx;
diff --git a/src/api/s3/checksum.rs b/src/api/s3/checksum.rs
index c7527163..02fb55ec 100644
--- a/src/api/s3/checksum.rs
+++ b/src/api/s3/checksum.rs
@@ -15,7 +15,7 @@ use garage_util::error::OkOrMessage;
use garage_model::s3::object_table::*;
-use crate::s3::error::*;
+use crate::error::*;
pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName =
HeaderName::from_static("x-amz-checksum-algorithm");
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index b67ace88..07d50ea5 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -20,15 +20,16 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::checksum::*;
-use crate::s3::encryption::EncryptionParams;
-use crate::s3::error::*;
-use crate::s3::get::full_object_byte_stream;
-use crate::s3::multipart;
-use crate::s3::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult};
-use crate::s3::xml::{self as s3_xml, xmlns_tag};
+use garage_api_common::helpers::*;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::checksum::*;
+use crate::encryption::EncryptionParams;
+use crate::error::*;
+use crate::get::full_object_byte_stream;
+use crate::multipart;
+use crate::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult};
+use crate::xml::{self as s3_xml, xmlns_tag};
// -------- CopyObject ---------
@@ -862,7 +863,7 @@ pub struct CopyPartResult {
#[cfg(test)]
mod tests {
use super::*;
- use crate::s3::xml::to_xml_with_header;
+ use crate::xml::to_xml_with_header;
#[test]
fn copy_object_result() -> Result<(), Error> {
diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs
index 32dcc0d5..625b84db 100644
--- a/src/api/s3/cors.rs
+++ b/src/api/s3/cors.rs
@@ -1,31 +1,21 @@
-use std::sync::Arc;
-
use quick_xml::de::from_reader;
-use http::header::{
- ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN,
- ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD,
-};
-use hyper::{
- body::Body, body::Incoming as IncomingBody, header::HeaderName, Method, Request, Response,
- StatusCode,
-};
+use hyper::{header::HeaderName, Method, Request, Response, StatusCode};
use http_body_util::BodyExt;
use serde::{Deserialize, Serialize};
-use crate::common_error::{helper_error_as_internal, CommonError};
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::error::*;
-use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
-use crate::signature::verify_signed_content;
-
-use garage_model::bucket_table::{Bucket, BucketParams, CorsRule as GarageCorsRule};
-use garage_model::garage::Garage;
+use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
use garage_util::data::*;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_signed_content;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::error::*;
+use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
+
pub async fn handle_get_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx { bucket_params, .. } = ctx;
if let Some(cors) = bucket_params.cors_config.get() {
@@ -100,161 +90,6 @@ pub async fn handle_put_cors(
.body(empty_body())?)
}
-pub async fn handle_options_api(
- garage: Arc<Garage>,
- req: &Request<IncomingBody>,
- bucket_name: Option<String>,
-) -> Result<Response<EmptyBody>, CommonError> {
- // FIXME: CORS rules of buckets with local aliases are
- // not taken into account.
-
- // If the bucket name is a global bucket name,
- // we try to apply the CORS rules of that bucket.
- // If a user has a local bucket name that has
- // the same name, its CORS rules won't be applied
- // and will be shadowed by the rules of the globally
- // existing bucket (but this is inevitable because
- // OPTIONS calls are not authenticated).
- if let Some(bn) = bucket_name {
- let helper = garage.bucket_helper();
- let bucket_id = helper
- .resolve_global_bucket_name(&bn)
- .await
- .map_err(helper_error_as_internal)?;
- if let Some(id) = bucket_id {
- let bucket = garage
- .bucket_helper()
- .get_existing_bucket(id)
- .await
- .map_err(helper_error_as_internal)?;
- let bucket_params = bucket.state.into_option().unwrap();
- handle_options_for_bucket(req, &bucket_params)
- } else {
- // If there is a bucket name in the request, but that name
- // does not correspond to a global alias for a bucket,
- // then it's either a non-existing bucket or a local bucket.
- // We have no way of knowing, because the request is not
- // authenticated and thus we can't resolve local aliases.
- // We take the permissive approach of allowing everything,
- // because we don't want to prevent web apps that use
- // local bucket names from making API calls.
- Ok(Response::builder()
- .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
- .header(ACCESS_CONTROL_ALLOW_METHODS, "*")
- .status(StatusCode::OK)
- .body(EmptyBody::new())?)
- }
- } else {
- // If there is no bucket name in the request,
- // we are doing a ListBuckets call, which we want to allow
- // for all origins.
- Ok(Response::builder()
- .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
- .header(ACCESS_CONTROL_ALLOW_METHODS, "GET")
- .status(StatusCode::OK)
- .body(EmptyBody::new())?)
- }
-}
-
-pub fn handle_options_for_bucket(
- req: &Request<IncomingBody>,
- bucket_params: &BucketParams,
-) -> Result<Response<EmptyBody>, CommonError> {
- let origin = req
- .headers()
- .get("Origin")
- .ok_or_bad_request("Missing Origin header")?
- .to_str()?;
- let request_method = req
- .headers()
- .get(ACCESS_CONTROL_REQUEST_METHOD)
- .ok_or_bad_request("Missing Access-Control-Request-Method header")?
- .to_str()?;
- let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {
- Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(),
- None => vec![],
- };
-
- if let Some(cors_config) = bucket_params.cors_config.get() {
- let matching_rule = cors_config
- .iter()
- .find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter()));
- if let Some(rule) = matching_rule {
- let mut resp = Response::builder()
- .status(StatusCode::OK)
- .body(EmptyBody::new())?;
- add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?;
- return Ok(resp);
- }
- }
-
- Err(CommonError::Forbidden(
- "This CORS request is not allowed.".into(),
- ))
-}
-
-pub fn find_matching_cors_rule<'a>(
- bucket_params: &'a BucketParams,
- req: &Request<impl Body>,
-) -> Result<Option<&'a GarageCorsRule>, Error> {
- if let Some(cors_config) = bucket_params.cors_config.get() {
- if let Some(origin) = req.headers().get("Origin") {
- let origin = origin.to_str()?;
- let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {
- Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(),
- None => vec![],
- };
- return Ok(cors_config.iter().find(|rule| {
- cors_rule_matches(rule, origin, req.method().as_ref(), request_headers.iter())
- }));
- }
- }
- Ok(None)
-}
-
-fn cors_rule_matches<'a, HI, S>(
- rule: &GarageCorsRule,
- origin: &'a str,
- method: &'a str,
- mut request_headers: HI,
-) -> bool
-where
- HI: Iterator<Item = S>,
- S: AsRef<str>,
-{
- rule.allow_origins.iter().any(|x| x == "*" || x == origin)
- && rule.allow_methods.iter().any(|x| x == "*" || x == method)
- && request_headers.all(|h| {
- rule.allow_headers
- .iter()
- .any(|x| x == "*" || x == h.as_ref())
- })
-}
-
-pub fn add_cors_headers(
- resp: &mut Response<impl Body>,
- rule: &GarageCorsRule,
-) -> Result<(), http::header::InvalidHeaderValue> {
- let h = resp.headers_mut();
- h.insert(
- ACCESS_CONTROL_ALLOW_ORIGIN,
- rule.allow_origins.join(", ").parse()?,
- );
- h.insert(
- ACCESS_CONTROL_ALLOW_METHODS,
- rule.allow_methods.join(", ").parse()?,
- );
- h.insert(
- ACCESS_CONTROL_ALLOW_HEADERS,
- rule.allow_headers.join(", ").parse()?,
- );
- h.insert(
- ACCESS_CONTROL_EXPOSE_HEADERS,
- rule.expose_headers.join(", ").parse()?,
- );
- Ok(())
-}
-
// ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ----
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs
index 57f6f948..b799e67a 100644
--- a/src/api/s3/delete.rs
+++ b/src/api/s3/delete.rs
@@ -5,12 +5,13 @@ use garage_util::data::*;
use garage_model::s3::object_table::*;
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::error::*;
-use crate::s3::put::next_timestamp;
-use crate::s3::xml as s3_xml;
-use crate::signature::verify_signed_content;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_signed_content;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::error::*;
+use crate::put::next_timestamp;
+use crate::xml as s3_xml;
async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), Error> {
let ReqCtx {
diff --git a/src/api/s3/encryption.rs b/src/api/s3/encryption.rs
index 2e6ed65c..b38d7792 100644
--- a/src/api/s3/encryption.rs
+++ b/src/api/s3/encryption.rs
@@ -28,9 +28,10 @@ use garage_util::migrate::Migrate;
use garage_model::garage::Garage;
use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionMetaInner};
-use crate::common_error::*;
-use crate::s3::checksum::Md5Checksum;
-use crate::s3::error::Error;
+use garage_api_common::common_error::*;
+
+use crate::checksum::Md5Checksum;
+use crate::error::Error;
const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName =
HeaderName::from_static("x-amz-server-side-encryption-customer-algorithm");
diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs
index 22d2fe14..1bb8909c 100644
--- a/src/api/s3/error.rs
+++ b/src/api/s3/error.rs
@@ -6,20 +6,28 @@ use hyper::{HeaderMap, StatusCode};
use garage_model::helper::error::Error as HelperError;
-pub(crate) use crate::common_error::pass_helper_error;
-use crate::common_error::{helper_error_as_internal, CommonError};
-pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
-use crate::generic_server::ApiError;
-use crate::helpers::*;
-use crate::s3::xml as s3_xml;
-use crate::signature::error::Error as SignatureError;
+pub(crate) use garage_api_common::common_error::pass_helper_error;
+
+use garage_api_common::common_error::{
+ commonErrorDerivative, helper_error_as_internal, CommonError,
+};
+
+pub use garage_api_common::common_error::{
+ CommonErrorDerivative, OkOrBadRequest, OkOrInternalError,
+};
+
+use garage_api_common::generic_server::ApiError;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::error::Error as SignatureError;
+
+use crate::xml as s3_xml;
/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
#[error(display = "{}", _0)]
/// Error from common error
- Common(CommonError),
+ Common(#[error(source)] CommonError),
// Category: cannot process
/// Authorization Header Malformed
@@ -81,14 +89,7 @@ pub enum Error {
NotImplemented(String),
}
-impl<T> From<T> for Error
-where
- CommonError: From<T>,
-{
- fn from(err: T) -> Self {
- Error::Common(CommonError::from(err))
- }
-}
+commonErrorDerivative!(Error);
// Helper errors are always passed as internal errors by default.
// To pass the specific error code back to the client, use `pass_helper_error`.
@@ -98,8 +99,6 @@ impl From<HelperError> for Error {
}
}
-impl CommonErrorDerivative for Error {}
-
impl From<roxmltree::Error> for Error {
fn from(err: roxmltree::Error) -> Self {
Self::InvalidXml(format!("{}", err))
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index f61aae11..c2393a51 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -25,11 +25,12 @@ use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use crate::helpers::*;
-use crate::s3::api_server::ResBody;
-use crate::s3::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE};
-use crate::s3::encryption::EncryptionParams;
-use crate::s3::error::*;
+use garage_api_common::helpers::*;
+
+use crate::api_server::ResBody;
+use crate::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE};
+use crate::encryption::EncryptionParams;
+use crate::error::*;
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
diff --git a/src/api/s3/mod.rs b/src/api/s3/lib.rs
index b9bb1a6f..fd99b443 100644
--- a/src/api/s3/mod.rs
+++ b/src/api/s3/lib.rs
@@ -1,3 +1,6 @@
+#[macro_use]
+extern crate tracing;
+
pub mod api_server;
pub mod error;
diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs
index 7eb1c2cb..c35047ed 100644
--- a/src/api/s3/lifecycle.rs
+++ b/src/api/s3/lifecycle.rs
@@ -5,11 +5,12 @@ use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::error::*;
-use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
-use crate::signature::verify_signed_content;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_signed_content;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::error::*;
+use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use garage_model::bucket_table::{
parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration,
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index 68d6cbe6..94c2c895 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -13,13 +13,14 @@ use garage_model::s3::object_table::*;
use garage_table::EnumerationOrder;
-use crate::encoding::*;
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::encryption::EncryptionParams;
-use crate::s3::error::*;
-use crate::s3::multipart as s3_multipart;
-use crate::s3::xml as s3_xml;
+use garage_api_common::encoding::*;
+use garage_api_common::helpers::*;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::encryption::EncryptionParams;
+use crate::error::*;
+use crate::multipart as s3_multipart;
+use crate::xml as s3_xml;
const DUMMY_NAME: &str = "Dummy Key";
const DUMMY_KEY: &str = "GKDummyKey";
@@ -53,7 +54,6 @@ pub struct ListMultipartUploadsQuery {
#[derive(Debug)]
pub struct ListPartsQuery {
pub bucket_name: String,
- pub bucket_id: Uuid,
pub key: String,
pub upload_id: String,
pub part_number_marker: Option<u64>,
@@ -1244,10 +1244,8 @@ mod tests {
#[test]
fn test_fetch_part_info() -> Result<(), Error> {
- let uuid = Uuid::from([0x08; 32]);
let mut query = ListPartsQuery {
bucket_name: "a".to_string(),
- bucket_id: uuid,
key: "a".to_string(),
upload_id: "xx".to_string(),
part_number_marker: None,
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 3db3e8aa..fa053df2 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -15,14 +15,15 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::checksum::*;
-use crate::s3::encryption::EncryptionParams;
-use crate::s3::error::*;
-use crate::s3::put::*;
-use crate::s3::xml as s3_xml;
-use crate::signature::verify_signed_content;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_signed_content;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::checksum::*;
+use crate::encryption::EncryptionParams;
+use crate::error::*;
+use crate::put::*;
+use crate::xml as s3_xml;
// ----
@@ -429,7 +430,16 @@ pub async fn handle_complete_multipart_upload(
// Send response saying ok we're done
let result = s3_xml::CompleteMultipartUploadResult {
xmlns: (),
- location: None,
+ // FIXME: the location returned is not always correct:
+ // - we always return https, but maybe some people do http
+ // - if root_domain is not specified, a full URL is not returned
+ location: garage
+ .config
+ .s3_api
+ .root_domain
+ .as_ref()
+ .map(|rd| s3_xml::Value(format!("https://{}.{}/{}", bucket_name, rd, key)))
+ .or(Some(s3_xml::Value(format!("/{}/{}", bucket_name, key)))),
bucket: s3_xml::Value(bucket_name.to_string()),
key: s3_xml::Value(key),
etag: s3_xml::Value(format!("\"{}\"", etag)),
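The new `location` field is derived from `s3_api.root_domain` when it is configured, and falls back to an absolute path otherwise; as the FIXME notes, the scheme is always assumed to be https. A small illustration of the resulting values, with made-up domain, bucket, and key values:

```rust
// Illustration only: mirrors the fallback logic of the `location` field above.
fn complete_mpu_location(root_domain: Option<&str>, bucket: &str, key: &str) -> String {
    match root_domain {
        // Scheme is always https, matching the FIXME above.
        Some(rd) => format!("https://{}.{}/{}", bucket, rd, key),
        None => format!("/{}/{}", bucket, key),
    }
}

fn main() {
    assert_eq!(
        complete_mpu_location(Some("s3.example.com"), "photos", "a/b.jpg"),
        "https://photos.s3.example.com/a/b.jpg"
    );
    assert_eq!(
        complete_mpu_location(None, "photos", "a/b.jpg"),
        "/photos/a/b.jpg"
    );
}
```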
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index 5279ec6a..6c0e73d4 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -16,15 +16,16 @@ use serde::Deserialize;
use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
-use crate::helpers::*;
-use crate::s3::api_server::ResBody;
-use crate::s3::checksum::*;
-use crate::s3::cors::*;
-use crate::s3::encryption::EncryptionParams;
-use crate::s3::error::*;
-use crate::s3::put::{get_headers, save_stream, ChecksumMode};
-use crate::s3::xml as s3_xml;
-use crate::signature::payload::{verify_v4, Authorization};
+use garage_api_common::cors::*;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::payload::{verify_v4, Authorization};
+
+use crate::api_server::ResBody;
+use crate::checksum::*;
+use crate::encryption::EncryptionParams;
+use crate::error::*;
+use crate::put::{get_headers, save_stream, ChecksumMode};
+use crate::xml as s3_xml;
pub async fn handle_post_object(
garage: Arc<Garage>,
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index bfb0dc9b..530b4e7b 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -30,11 +30,12 @@ use garage_model::s3::block_ref_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::checksum::*;
-use crate::s3::encryption::EncryptionParams;
-use crate::s3::error::*;
+use garage_api_common::helpers::*;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::checksum::*;
+use crate::encryption::EncryptionParams;
+use crate::error::*;
const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
diff --git a/src/api/s3/router.rs b/src/api/s3/router.rs
index e7ac1d77..e3f58490 100644
--- a/src/api/s3/router.rs
+++ b/src/api/s3/router.rs
@@ -3,9 +3,10 @@ use std::borrow::Cow;
use hyper::header::HeaderValue;
use hyper::{HeaderMap, Method, Request};
-use crate::helpers::Authorization;
-use crate::router_macros::{generateQueryParameters, router_match};
-use crate::s3::error::*;
+use garage_api_common::helpers::Authorization;
+use garage_api_common::router_macros::{generateQueryParameters, router_match};
+
+use crate::error::*;
router_match! {@func
@@ -351,6 +352,18 @@ impl Endpoint {
_ => return Err(Error::bad_request("Unknown method")),
};
+ if let Some(x_id) = query.x_id.take() {
+ if x_id != res.name() {
+ // I think AWS ignores the x-id parameter.
+ // Let's at least make this a warning to help debugging.
+ warn!(
+ "x-id ({}) does not match parsed endpoint ({})",
+ x_id,
+ res.name()
+ );
+ }
+ }
+
if let Some(message) = query.nonempty_message() {
debug!("Unused query parameter: {}", message)
}
@@ -695,7 +708,8 @@ generateQueryParameters! {
"uploadId" => upload_id,
"upload-id-marker" => upload_id_marker,
"versionId" => version_id,
- "version-id-marker" => version_id_marker
+ "version-id-marker" => version_id_marker,
+ "x-id" => x_id
]
}
diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs
index fa36bc32..b55bb345 100644
--- a/src/api/s3/website.rs
+++ b/src/api/s3/website.rs
@@ -4,15 +4,16 @@ use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::error::*;
-use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
-use crate::signature::verify_signed_content;
-
use garage_model::bucket_table::*;
use garage_util::data::*;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_signed_content;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::error::*;
+use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
+
pub async fn handle_get_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx { bucket_params, .. } = ctx;
if let Some(website) = bucket_params.website_config.get() {
diff --git a/src/api/s3/xml.rs b/src/api/s3/xml.rs
index 1e569ade..e8af3ec0 100644
--- a/src/api/s3/xml.rs
+++ b/src/api/s3/xml.rs
@@ -1,7 +1,7 @@
use quick_xml::se::to_string;
use serde::{Deserialize, Serialize, Serializer};
-use crate::s3::error::Error as ApiError;
+use crate::error::Error as ApiError;
pub fn to_xml_with_header<T: Serialize>(x: &T) -> Result<String, ApiError> {
let mut xml = r#"<?xml version="1.0" encoding="UTF-8"?>"#.to_string();
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 1af4d7f5..3358a3e7 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -34,10 +34,8 @@ async-compression.workspace = true
zstd.workspace = true
serde.workspace = true
-serde_bytes.workspace = true
futures.workspace = true
-futures-util.workspace = true
tokio.workspace = true
tokio-util.workspace = true
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 40b177a2..41b2f02a 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -4,7 +4,6 @@ use std::sync::Arc;
use std::time::Duration;
use arc_swap::{ArcSwap, ArcSwapOption};
-use async_trait::async_trait;
use bytes::Bytes;
use rand::prelude::*;
use serde::{Deserialize, Serialize};
@@ -371,7 +370,7 @@ impl BlockManager {
prevent_compression: bool,
order_tag: Option<OrderTag>,
) -> Result<(), Error> {
- let who = self.replication.write_sets(&hash);
+ let who = self.system.cluster_layout().current_storage_nodes_of(&hash);
let compression_level = self.compression_level.filter(|_| !prevent_compression);
let (header, bytes) = DataBlock::from_buffer(data, compression_level)
@@ -397,7 +396,7 @@ impl BlockManager {
.rpc_helper()
.try_write_many_sets(
&self.endpoint,
- who.as_ref(),
+ &[who],
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_drop_on_completion(permit)
@@ -669,10 +668,12 @@ impl BlockManager {
hash: &Hash,
wrong_path: DataBlockPath,
) -> Result<usize, Error> {
+ let data = self.read_block_from(hash, &wrong_path).await?;
self.lock_mutate(hash)
.await
- .fix_block_location(hash, wrong_path, self)
- .await
+ .write_block_inner(hash, &data, self, Some(wrong_path))
+ .await?;
+ Ok(data.as_parts_ref().1.len())
}
async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
@@ -688,7 +689,6 @@ impl BlockManager {
}
}
-#[async_trait]
impl StreamingEndpointHandler<BlockRpc> for BlockManager {
async fn handle(self: &Arc<Self>, mut message: Req<BlockRpc>, _from: NodeID) -> Resp<BlockRpc> {
match message.msg() {
@@ -829,18 +829,6 @@ impl BlockManagerLocked {
}
Ok(())
}
-
- async fn fix_block_location(
- &self,
- hash: &Hash,
- wrong_path: DataBlockPath,
- mgr: &BlockManager,
- ) -> Result<usize, Error> {
- let data = mgr.read_block_from(hash, &wrong_path).await?;
- self.write_block_inner(hash, &data, mgr, Some(wrong_path))
- .await?;
- Ok(data.as_parts_ref().1.len())
- }
}
struct DeleteOnDrop(Option<PathBuf>);
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 947c68de..b476a0b8 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -377,7 +377,10 @@ impl BlockResyncManager {
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
- let mut who = manager.replication.storage_nodes(hash);
+ let mut who = manager
+ .system
+ .cluster_layout()
+ .current_storage_nodes_of(hash);
if who.len() < manager.replication.write_quorum() {
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
}
@@ -455,6 +458,25 @@ impl BlockResyncManager {
}
if rc.is_nonzero() && !exists {
+ // The refcount is > 0, and the block is not present locally.
+ // We might need to fetch it from another node.
+
+ // First, check whether we are still supposed to store that
+ // block in the latest cluster layout version.
+ let storage_nodes = manager
+ .system
+ .cluster_layout()
+ .current_storage_nodes_of(&hash);
+
+ if !storage_nodes.contains(&manager.system.id) {
+ info!(
+ "Resync block {:?}: block is absent with refcount > 0, but it will drop to zero after all metadata is synced. Not fetching the block.",
+ hash
+ );
+ return Ok(());
+ }
+
+ // We know we need the block. Fetch it.
info!(
"Resync block {:?}: fetching absent but needed block (refcount > 0)",
hash
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index 0a278bc0..3ef51fae 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -13,7 +13,6 @@ path = "lib.rs"
[dependencies]
err-derive.workspace = true
-hexdump.workspace = true
tracing.workspace = true
heed = { workspace = true, optional = true }
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 483e33c0..c036f000 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -23,7 +23,9 @@ path = "tests/lib.rs"
[dependencies]
format_table.workspace = true
garage_db.workspace = true
-garage_api.workspace = true
+garage_api_admin.workspace = true
+garage_api_s3.workspace = true
+garage_api_k2v = { workspace = true, optional = true }
garage_block.workspace = true
garage_model.workspace = true
garage_net.workspace = true
@@ -40,7 +42,6 @@ parse_duration.workspace = true
hex.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
-rand.workspace = true
async-trait.workspace = true
sha1.workspace = true
sodiumoxide.workspace = true
@@ -48,21 +49,18 @@ structopt.workspace = true
git-version.workspace = true
serde.workspace = true
-serde_bytes.workspace = true
-toml.workspace = true
futures.workspace = true
-futures-util.workspace = true
tokio.workspace = true
opentelemetry.workspace = true
opentelemetry-prometheus = { workspace = true, optional = true }
opentelemetry-otlp = { workspace = true, optional = true }
-prometheus = { workspace = true, optional = true }
syslog-tracing = { workspace = true, optional = true }
[dev-dependencies]
-aws-config.workspace = true
+garage_api_common.workspace = true
+
aws-sdk-s3.workspace = true
chrono.workspace = true
http.workspace = true
@@ -84,7 +82,7 @@ k2v-client.workspace = true
[features]
default = [ "bundled-libs", "metrics", "lmdb", "sqlite", "k2v" ]
-k2v = [ "garage_util/k2v", "garage_api/k2v" ]
+k2v = [ "garage_util/k2v", "garage_api_k2v" ]
# Database engines
lmdb = [ "garage_model/lmdb" ]
@@ -95,7 +93,7 @@ consul-discovery = [ "garage_rpc/consul-discovery" ]
# Automatic registration and discovery via Kubernetes API
kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ]
# Prometheus exporter (/metrics endpoint).
-metrics = [ "garage_api/metrics", "opentelemetry-prometheus", "prometheus" ]
+metrics = [ "garage_api_admin/metrics", "opentelemetry-prometheus" ]
# Exporter for the OpenTelemetry Collector.
telemetry-otlp = [ "opentelemetry-otlp" ]
# Logging to syslog
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index e2468143..3bbc2b86 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -4,9 +4,11 @@ mod key;
use std::collections::HashMap;
use std::fmt::Write;
+use std::future::Future;
use std::sync::Arc;
-use async_trait::async_trait;
+use futures::future::FutureExt;
+
use serde::{Deserialize, Serialize};
use format_table::format_table_to_string;
@@ -144,7 +146,12 @@ impl AdminRpcHandler {
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
if opt.all_nodes {
let mut ret = String::new();
- let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ let mut all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ for node in self.garage.system.get_known_nodes().iter() {
+ if node.is_up && !all_nodes.contains(&node.id) {
+ all_nodes.push(node.id);
+ }
+ }
for node in all_nodes.iter() {
let mut opt = opt.clone();
@@ -482,7 +489,7 @@ impl AdminRpcHandler {
AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }),
PRIO_NORMAL,
)
- .await
+ .await?
}))
.await;
@@ -495,7 +502,11 @@ impl AdminRpcHandler {
ret.push(format!("{:?}\t{}", to, res_str));
}
- Ok(AdminRpc::Ok(format_table_to_string(ret)))
+ if resps.iter().any(Result::is_err) {
+ Err(GarageError::Message(format_table_to_string(ret)).into())
+ } else {
+ Ok(AdminRpc::Ok(format_table_to_string(ret)))
+ }
}
MetaOperation::Snapshot { all: false } => {
garage_model::snapshot::async_snapshot_metadata(&self.garage).await?;
@@ -505,22 +516,25 @@ impl AdminRpcHandler {
}
}
-#[async_trait]
impl EndpointHandler<AdminRpc> for AdminRpcHandler {
- async fn handle(
+ fn handle(
self: &Arc<Self>,
message: &AdminRpc,
_from: NodeID,
- ) -> Result<AdminRpc, Error> {
- match message {
- AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
- AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
- AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
- AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
- AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
- AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
- AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
- m => Err(GarageError::unexpected_rpc_message(m).into()),
+ ) -> impl Future<Output = Result<AdminRpc, Error>> + Send {
+ let self2 = self.clone();
+ async move {
+ match message {
+ AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
+ AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await,
+ AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt.clone()).await,
+ AdminRpc::Stats(opt) => self2.handle_stats(opt.clone()).await,
+ AdminRpc::Worker(wo) => self2.handle_worker_cmd(wo).await,
+ AdminRpc::BlockOperation(bo) => self2.handle_block_cmd(bo).await,
+ AdminRpc::MetaOperation(mo) => self2.handle_meta_cmd(mo).await,
+ m => Err(GarageError::unexpected_rpc_message(m).into()),
+ }
}
+ .boxed()
}
}
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index 2c5227d2..47883f97 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -1,3 +1,4 @@
+use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
@@ -93,17 +94,16 @@ pub async fn launch_online_repair(
// ----
-#[async_trait]
trait TableRepair: Send + Sync + 'static {
type T: TableSchema;
fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>;
- async fn process(
+ fn process(
&mut self,
garage: &Garage,
entry: <<Self as TableRepair>::T as TableSchema>::E,
- ) -> Result<bool, Error>;
+ ) -> impl Future<Output = Result<bool, Error>> + Send;
}
struct TableRepairWorker<T: TableRepair> {
@@ -174,7 +174,6 @@ impl<R: TableRepair> Worker for TableRepairWorker<R> {
struct RepairVersions;
-#[async_trait]
impl TableRepair for RepairVersions {
type T = VersionTable;
@@ -221,7 +220,6 @@ impl TableRepair for RepairVersions {
struct RepairBlockRefs;
-#[async_trait]
impl TableRepair for RepairBlockRefs {
type T = BlockRefTable;
@@ -257,7 +255,6 @@ impl TableRepair for RepairBlockRefs {
struct RepairMpu;
-#[async_trait]
impl TableRepair for RepairMpu {
type T = MultipartUploadTable;
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 65bf34db..1dc86fd3 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -6,13 +6,13 @@ use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
-use garage_api::admin::api_server::AdminApiServer;
-use garage_api::s3::api_server::S3ApiServer;
+use garage_api_admin::api_server::AdminApiServer;
+use garage_api_s3::api_server::S3ApiServer;
use garage_model::garage::Garage;
use garage_web::WebServer;
#[cfg(feature = "k2v")]
-use garage_api::k2v::api_server::K2VApiServer;
+use garage_api_k2v::api_server::K2VApiServer;
use crate::admin::*;
use crate::secrets::{fill_secrets, Secrets};
@@ -113,7 +113,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
if let Some(web_config) = &config.s3_web {
info!("Initializing web server...");
- let web_server = WebServer::new(garage.clone(), web_config.root_domain.clone());
+ let web_server = WebServer::new(garage.clone(), &web_config);
servers.push((
"Web",
tokio::spawn(web_server.run(web_config.bind_addr.clone(), watch_cancel.clone())),
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index 42368976..2db72e9f 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -15,7 +15,7 @@ use hyper_util::client::legacy::{connect::HttpConnector, Client};
use hyper_util::rt::TokioExecutor;
use super::garage::{Instance, Key};
-use garage_api::signature;
+use garage_api_common::signature;
pub type Body = FullBody<hyper::body::Bytes>;
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index db23d316..da6c624b 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -13,7 +13,6 @@ static GARAGE_TEST_SECRET: &str =
#[derive(Debug, Default, Clone)]
pub struct Key {
- pub name: Option<String>,
pub id: String,
pub secret: String,
}
@@ -213,10 +212,7 @@ api_bind_addr = "127.0.0.1:{admin_port}"
assert!(!key.id.is_empty(), "Invalid key: Key ID is empty");
assert!(!key.secret.is_empty(), "Invalid key: Key secret is empty");
- Key {
- name: maybe_name.map(String::from),
- ..key
- }
+ key
}
}
diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml
index 694be1f8..bbd09b19 100644
--- a/src/k2v-client/Cargo.toml
+++ b/src/k2v-client/Cargo.toml
@@ -29,12 +29,11 @@ tokio.workspace = true
# cli deps
clap = { workspace = true, optional = true }
format_table = { workspace = true, optional = true }
-tracing = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, optional = true }
[features]
-cli = ["clap", "tokio/fs", "tokio/io-std", "tracing", "tracing-subscriber", "format_table"]
+cli = ["clap", "tokio/fs", "tokio/io-std", "tracing-subscriber", "format_table"]
[lib]
path = "lib.rs"
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 12931a4c..b58ad43b 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -22,7 +22,6 @@ garage_util.workspace = true
garage_net.workspace = true
async-trait.workspace = true
-arc-swap.workspace = true
blake2.workspace = true
chrono.workspace = true
err-derive.workspace = true
@@ -38,9 +37,7 @@ serde.workspace = true
serde_bytes.workspace = true
futures.workspace = true
-futures-util.workspace = true
tokio.workspace = true
-opentelemetry.workspace = true
[features]
default = [ "lmdb", "sqlite" ]
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 29e0bddd..11c0d90f 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -329,7 +329,7 @@ impl Garage {
pub async fn locked_helper(&self) -> helper::locked::LockedHelper {
let lock = self.bucket_lock.lock().await;
- helper::locked::LockedHelper(self, lock)
+ helper::locked::LockedHelper(self, Some(lock))
}
}
diff --git a/src/model/helper/locked.rs b/src/model/helper/locked.rs
index 43f4f363..482e91b0 100644
--- a/src/model/helper/locked.rs
+++ b/src/model/helper/locked.rs
@@ -27,9 +27,16 @@ use crate::permission::BucketKeyPerm;
/// See issues: #649, #723
pub struct LockedHelper<'a>(
pub(crate) &'a Garage,
- pub(crate) tokio::sync::MutexGuard<'a, ()>,
+ pub(crate) Option<tokio::sync::MutexGuard<'a, ()>>,
);
+impl<'a> Drop for LockedHelper<'a> {
+ fn drop(&mut self) {
+ // make it explicit that the mutexguard lives until here
+ drop(self.1.take())
+ }
+}
+
#[allow(clippy::ptr_arg)]
impl<'a> LockedHelper<'a> {
pub fn bucket(&self) -> BucketHelper<'a> {
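The Option wrapper plus the explicit Drop impl pin down exactly when the bucket lock is released, instead of relying on implicit field drop order. A minimal sketch of the same pattern, using std::sync::Mutex instead of tokio's for brevity (State, LockedHelper and bump are illustrative names, not the actual garage_model types):

use std::sync::{Mutex, MutexGuard};

struct State {
    // Lock that serializes all "locked helper" operations (stands in for the
    // bucket lock); the data it protects lives elsewhere in State.
    op_lock: Mutex<()>,
    counter: Mutex<u64>,
}

// Same shape as LockedHelper: a reference to the shared state plus the guard,
// wrapped in an Option so that Drop can release it explicitly.
struct LockedHelper<'a>(&'a State, Option<MutexGuard<'a, ()>>);

impl<'a> Drop for LockedHelper<'a> {
    fn drop(&mut self) {
        // Make it explicit that the guard lives until the helper is dropped.
        drop(self.1.take());
    }
}

impl State {
    fn locked_helper(&self) -> LockedHelper<'_> {
        LockedHelper(self, Some(self.op_lock.lock().unwrap()))
    }
}

impl<'a> LockedHelper<'a> {
    fn bump(&self) -> u64 {
        // Any operation performed through the helper runs with op_lock held.
        let mut c = self.0.counter.lock().unwrap();
        *c += 1;
        *c
    }
}

fn main() {
    let state = State {
        op_lock: Mutex::new(()),
        counter: Mutex::new(0),
    };
    let helper = state.locked_helper();
    println!("counter = {}", helper.bump());
    drop(helper); // op_lock is released here, in LockedHelper::drop
    assert!(state.op_lock.try_lock().is_ok());
}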
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index a1bf6ee0..821f4549 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -10,7 +10,6 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};
-use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
@@ -537,7 +536,6 @@ impl K2VRpcHandler {
}
}
-#[async_trait]
impl EndpointHandler<K2VRpc> for K2VRpcHandler {
async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> {
match message {
diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index 38212a1c..bb10ba48 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -395,13 +395,13 @@ fn midnight_ts(date: NaiveDate, use_local_tz: bool) -> u64 {
.expect("bad local midnight")
.timestamp_millis() as u64;
}
- midnight.timestamp_millis() as u64
+ midnight.and_utc().timestamp_millis() as u64
}
fn next_date(ts: u64) -> NaiveDate {
- NaiveDateTime::from_timestamp_millis(ts as i64)
+ DateTime::<Utc>::from_timestamp_millis(ts as i64)
.expect("bad timestamp")
- .date()
+ .date_naive()
.succ_opt()
.expect("no next day")
}
diff --git a/src/net/Cargo.toml b/src/net/Cargo.toml
index 686aaaea..c0b47a6e 100644
--- a/src/net/Cargo.toml
+++ b/src/net/Cargo.toml
@@ -30,7 +30,6 @@ rand.workspace = true
log.workspace = true
arc-swap.workspace = true
-async-trait.workspace = true
err-derive.workspace = true
bytes.workspace = true
cfg-if.workspace = true
diff --git a/src/net/client.rs b/src/net/client.rs
index 607dd173..20e1dacd 100644
--- a/src/net/client.rs
+++ b/src/net/client.rs
@@ -6,7 +6,6 @@ use std::sync::{Arc, Mutex};
use std::task::Poll;
use arc_swap::ArcSwapOption;
-use async_trait::async_trait;
use bytes::Bytes;
use log::{debug, error, trace};
@@ -220,7 +219,6 @@ impl ClientConn {
impl SendLoop for ClientConn {}
-#[async_trait]
impl RecvLoop for ClientConn {
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) {
trace!("ClientConn recv_handler {}", id);
diff --git a/src/net/endpoint.rs b/src/net/endpoint.rs
index 3cafafeb..d46acc42 100644
--- a/src/net/endpoint.rs
+++ b/src/net/endpoint.rs
@@ -1,8 +1,9 @@
+use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
use arc_swap::ArcSwapOption;
-use async_trait::async_trait;
+use futures::future::{BoxFuture, FutureExt};
use crate::error::Error;
use crate::message::*;
@@ -14,19 +15,17 @@ use crate::netapp::*;
/// attached to the response..
///
/// The handler object should be in an Arc, see `Endpoint::set_handler`
-#[async_trait]
pub trait StreamingEndpointHandler<M>: Send + Sync
where
M: Message,
{
- async fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> Resp<M>;
+ fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> impl Future<Output = Resp<M>> + Send;
}
/// If one simply wants to use an endpoint in a client fashion,
/// without locally serving requests to that endpoint,
/// use the unit type `()` as the handler type:
/// it will panic if it is ever made to handle request.
-#[async_trait]
impl<M: Message> EndpointHandler<M> for () {
async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response {
panic!("This endpoint should not have a local handler.");
@@ -38,15 +37,13 @@ impl<M: Message> EndpointHandler<M> for () {
/// This trait should be implemented by an object of your application
/// that can handle a message of type `M`, in the cases where it doesn't
/// care about attached stream in the request nor in the response.
-#[async_trait]
pub trait EndpointHandler<M>: Send + Sync
where
M: Message,
{
- async fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> M::Response;
+ fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> impl Future<Output = M::Response> + Send;
}
-#[async_trait]
impl<T, M> StreamingEndpointHandler<M> for T
where
T: EndpointHandler<M>,
@@ -161,9 +158,8 @@ where
pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>;
-#[async_trait]
pub(crate) trait GenericEndpoint {
- async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error>;
+ fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture<Result<RespEnc, Error>>;
fn drop_handler(&self);
fn clone_endpoint(&self) -> DynEndpoint;
}
@@ -174,21 +170,23 @@ where
M: Message,
H: StreamingEndpointHandler<M>;
-#[async_trait]
impl<M, H> GenericEndpoint for EndpointArc<M, H>
where
M: Message,
H: StreamingEndpointHandler<M> + 'static,
{
- async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error> {
- match self.0.handler.load_full() {
- None => Err(Error::NoHandler),
- Some(h) => {
- let req = Req::from_enc(req_enc)?;
- let res = h.handle(req, from).await;
- Ok(res.into_enc()?)
+ fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture<Result<RespEnc, Error>> {
+ async move {
+ match self.0.handler.load_full() {
+ None => Err(Error::NoHandler),
+ Some(h) => {
+ let req = Req::from_enc(req_enc)?;
+ let res = h.handle(req, from).await;
+ Ok(res.into_enc()?)
+ }
}
}
+ .boxed()
}
fn drop_handler(&self) {
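This hunk, like the async_trait removals throughout the patch, switches the handler traits to native async fn in traits: the public traits declare fn handle(...) -> impl Future<Output = ...> + Send, implementors keep writing a plain async fn, and the internal object-safe GenericEndpoint returns a BoxFuture built with .boxed(). A minimal standalone sketch of both flavors (Handler, DynHandler and Echo are illustrative names, not garage_net types; it assumes only the futures crate):

use std::future::Future;
use std::sync::Arc;

use futures::executor::block_on;
use futures::future::{BoxFuture, FutureExt};

// Non-object-safe handler trait: the `impl Future + Send` return type replaces
// what #[async_trait] used to generate.
pub trait Handler<M>: Send + Sync {
    fn handle(self: &Arc<Self>, msg: &M, from: u64) -> impl Future<Output = String> + Send;
}

// Object-safe counterpart (like GenericEndpoint): the future is boxed so the
// trait can be used behind `dyn`.
pub trait DynHandler: Send + Sync {
    fn handle_dyn(&self, msg: &str, from: u64) -> BoxFuture<'_, String>;
}

struct Echo;

impl Handler<String> for Echo {
    // A plain `async fn` satisfies the `impl Future<Output = String> + Send`
    // signature, as long as the produced future really is Send.
    async fn handle(self: &Arc<Self>, msg: &String, from: u64) -> String {
        format!("echo to {}: {}", from, msg)
    }
}

impl DynHandler for Echo {
    fn handle_dyn(&self, msg: &str, from: u64) -> BoxFuture<'_, String> {
        let msg = msg.to_string();
        // Same shape as GenericEndpoint::handle: an async block, then `.boxed()`.
        async move { format!("echo to {}: {}", from, msg) }.boxed()
    }
}

fn main() {
    let h = Arc::new(Echo);
    println!("{}", block_on(h.handle(&"hello".to_string(), 1)));

    let d: Box<dyn DynHandler> = Box::new(Echo);
    println!("{}", block_on(d.handle_dyn("hello", 2)));
}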
diff --git a/src/net/netapp.rs b/src/net/netapp.rs
index 77e55774..36c6fc88 100644
--- a/src/net/netapp.rs
+++ b/src/net/netapp.rs
@@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock};
use log::{debug, error, info, trace, warn};
use arc_swap::ArcSwapOption;
-use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::auth;
@@ -457,7 +456,6 @@ impl NetApp {
}
}
-#[async_trait]
impl EndpointHandler<HelloMessage> for NetApp {
async fn handle(self: &Arc<Self>, msg: &HelloMessage, from: NodeID) {
debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg);
diff --git a/src/net/peering.rs b/src/net/peering.rs
index a8d271ec..08378a08 100644
--- a/src/net/peering.rs
+++ b/src/net/peering.rs
@@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use arc_swap::ArcSwap;
-use async_trait::async_trait;
use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize};
@@ -592,7 +591,6 @@ impl PeeringManager {
}
}
-#[async_trait]
impl EndpointHandler<PingMessage> for PeeringManager {
async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage {
let ping_resp = PingMessage {
@@ -604,7 +602,6 @@ impl EndpointHandler<PingMessage> for PeeringManager {
}
}
-#[async_trait]
impl EndpointHandler<PeerListMessage> for PeeringManager {
async fn handle(
self: &Arc<Self>,
diff --git a/src/net/recv.rs b/src/net/recv.rs
index 0de7bef2..35a6d71a 100644
--- a/src/net/recv.rs
+++ b/src/net/recv.rs
@@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
-use async_trait::async_trait;
use bytes::Bytes;
use log::*;
@@ -50,7 +49,6 @@ impl Drop for Sender {
/// according to the protocol defined above: chunks of message in progress of being
/// received are stored in a buffer, and when the last chunk of a message is received,
/// the full message is passed to the receive handler.
-#[async_trait]
pub(crate) trait RecvLoop: Sync + 'static {
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream);
fn cancel_handler(self: &Arc<Self>, _id: RequestID) {}
diff --git a/src/net/send.rs b/src/net/send.rs
index 1454eeb7..6f1ac02c 100644
--- a/src/net/send.rs
+++ b/src/net/send.rs
@@ -3,7 +3,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
-use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};
use log::*;
@@ -273,7 +272,6 @@ impl DataFrame {
///
/// The `.send_loop()` exits when the sending end of the channel is closed,
/// or if there is an error at any time writing to the async writer.
-#[async_trait]
pub(crate) trait SendLoop: Sync {
async fn send_loop<W>(
self: Arc<Self>,
diff --git a/src/net/server.rs b/src/net/server.rs
index 36dccb2f..fb6c6366 100644
--- a/src/net/server.rs
+++ b/src/net/server.rs
@@ -3,7 +3,6 @@ use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use arc_swap::ArcSwapOption;
-use async_trait::async_trait;
use log::*;
use futures::io::{AsyncReadExt, AsyncWriteExt};
@@ -174,7 +173,6 @@ impl ServerConn {
impl SendLoop for ServerConn {}
-#[async_trait]
impl RecvLoop for ServerConn {
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) {
let resp_send = match self.resp_send.load_full() {
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index acde0911..fcc1c304 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -15,12 +15,10 @@ path = "lib.rs"
[dependencies]
format_table.workspace = true
-garage_db.workspace = true
garage_util.workspace = true
garage_net.workspace = true
arc-swap.workspace = true
-bytes.workspace = true
bytesize.workspace = true
gethostname.workspace = true
hex.workspace = true
@@ -46,9 +44,7 @@ reqwest = { workspace = true, optional = true }
pnet_datalink.workspace = true
futures.workspace = true
-futures-util.workspace = true
tokio.workspace = true
-tokio-stream.workspace = true
opentelemetry.workspace = true
[features]
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
index 44c826f9..c08a5629 100644
--- a/src/rpc/layout/helper.rs
+++ b/src/rpc/layout/helper.rs
@@ -219,6 +219,11 @@ impl LayoutHelper {
ret
}
+ pub fn current_storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
+ let ver = self.current();
+ ver.nodes_of(position, ver.replication_factor).collect()
+ }
+
pub fn trackers_hash(&self) -> Hash {
self.trackers_hash
}
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index b8ca8120..2505c2ce 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -540,19 +540,73 @@ impl RpcHelper {
// ---- functions not related to MAKING RPCs, but just determining to what nodes
// they should be made and in which order ----
+ /// Determine to what nodes, and in what order, requests to read a data block
+ /// should be sent. All nodes in the Vec returned by this function are tried
+ /// one by one until there is one that returns the block (in block/manager.rs).
+ ///
+ /// We want to have the best chance of finding the block in as few requests
+ /// as possible, and we want to avoid nodes that answer slowly.
+ ///
+ /// Note that when there are several active layout versions, the block might
+ /// be stored only by nodes of the latest version (in case of a block that was
+ /// written after the layout change), or only by nodes of the oldest active
+ /// version (for all blocks that were written before). So we have to try nodes
+ /// of all layout versions. We also want to try nodes of all layout versions
+ /// early, so as to maximize the chance of finding the block quickly.
+ ///
+ /// Therefore, the strategy is the following:
+ ///
+ /// 1. ask first all nodes of all currently active layout versions
+ /// -> ask the preferred node in all layout versions (older to newer),
+ /// then the second preferred node in all versions, etc.
+ /// -> we start by the oldest active layout version first, because a majority
+ /// of blocks will have been saved before the layout change
+ /// 2. ask all nodes of historical layout versions, for blocks which have not
+ /// yet been transferred to their new storage nodes
+ ///
+ /// The preference order, for each layout version, is given by `request_order`,
+ /// based on factors such as nodes being in the same datacenter,
+ /// having low ping, etc.
pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> {
let layout = self.0.layout.read().unwrap();
- let mut ret = Vec::with_capacity(12);
- let ver_iter = layout
- .versions()
- .iter()
- .rev()
- .chain(layout.inner().old_versions.iter().rev());
- for ver in ver_iter {
- if ver.version > layout.sync_map_min() {
- continue;
+ // Compute, for each layout version, the set of nodes that might store
+ // the block, and put them in their preferred order as given by `request_order`.
+ let mut vernodes = layout.versions().iter().map(|ver| {
+ let nodes = ver.nodes_of(position, ver.replication_factor);
+ rpc_helper.request_order(layout.current(), nodes)
+ });
+
+ let mut ret = if layout.versions().len() == 1 {
+ // If we have only one active layout version, then these are the
+ // only nodes we ask in step 1
+ vernodes.next().unwrap()
+ } else {
+ let vernodes = vernodes.collect::<Vec<_>>();
+
+ let mut nodes = Vec::<Uuid>::with_capacity(12);
+ for i in 0..layout.current().replication_factor {
+ for vn in vernodes.iter() {
+ if let Some(n) = vn.get(i) {
+ if !nodes.contains(&n) {
+ if *n == self.0.our_node_id {
+ // it's always fast (almost free) to ask locally,
+ // so always put that as first choice
+ nodes.insert(0, *n);
+ } else {
+ nodes.push(*n);
+ }
+ }
+ }
+ }
}
+
+ nodes
+ };
+
+ // Second step: add nodes of older layout versions
+ let old_ver_iter = layout.inner().old_versions.iter().rev();
+ for ver in old_ver_iter {
let nodes = ver.nodes_of(position, ver.replication_factor);
for node in rpc_helper.request_order(layout.current(), nodes) {
if !ret.contains(&node) {
@@ -560,6 +614,7 @@ impl RpcHelper {
}
}
}
+
ret
}
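As an aid to reading the interleaving loop above, here is a small self-contained toy re-implementation of the idea behind step 1 (string node names instead of garage types; the real code additionally bounds the rank by the current layout's replication factor): nodes are taken rank by rank across the active layout versions, oldest version first, duplicates are skipped, and the local node is hoisted to the front when it is encountered.

fn interleave_read_order(per_version: &[Vec<&'static str>], local: &str) -> Vec<&'static str> {
    let max_rank = per_version.iter().map(|v| v.len()).max().unwrap_or(0);
    let mut out: Vec<&'static str> = Vec::new();
    // Take the rank-i node of every active layout version (oldest to newest)
    // before moving on to rank i+1, skipping nodes already selected.
    for i in 0..max_rank {
        for v in per_version {
            if let Some(&n) = v.get(i) {
                if !out.contains(&n) {
                    if n == local {
                        // Asking locally is almost free, so it always goes first.
                        out.insert(0, n);
                    } else {
                        out.push(n);
                    }
                }
            }
        }
    }
    out
}

fn main() {
    // Two active layout versions: the old one stores the block on A, B, C,
    // the new one on B, D, E (both lists already sorted by preference).
    let order = interleave_read_order(&[vec!["A", "B", "C"], vec!["B", "D", "E"]], "E");
    // Rank 0 yields A and B; rank 1 yields D (B is deduplicated); rank 2 yields
    // C and the local node E, which is hoisted to the front.
    assert_eq!(order, vec!["E", "A", "B", "D", "C"]);
    println!("{:?}", order);
}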
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 0fa68218..2a52ae5d 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -7,7 +7,6 @@ use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::{Duration, Instant};
use arc_swap::ArcSwapOption;
-use async_trait::async_trait;
use futures::join;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::sign::ed25519;
@@ -749,7 +748,6 @@ impl System {
}
}
-#[async_trait]
impl EndpointHandler<SystemRpc> for System {
async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
match msg {
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index e704cd3c..fad6ea08 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -22,7 +22,6 @@ opentelemetry.workspace = true
async-trait.workspace = true
arc-swap.workspace = true
-bytes.workspace = true
hex.workspace = true
hexdump.workspace = true
tracing.workspace = true
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 9e060390..28ea119d 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
+
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -272,7 +273,6 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
}
}
-#[async_trait]
impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> {
async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
match message {
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 234ee8ea..2d43b9fc 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -444,7 +444,6 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
-#[async_trait]
impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> {
async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
match message {
diff --git a/src/table/table.rs b/src/table/table.rs
index ea8471d0..c96f4731 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -2,7 +2,6 @@ use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
-use async_trait::async_trait;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -204,6 +203,10 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
entries_vec.push((write_sets, e_enc));
}
+ if entries_vec.is_empty() {
+ return Ok(());
+ }
+
// Compute a deduplicated list of all of the write sets,
// and compute an index from each node to the position of the sets in which
// it takes part, to optimize the detection of a quorum.
@@ -496,7 +499,6 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
}
}
-#[async_trait]
impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> {
async fn handle(
self: &Arc<Self>,
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index da3e39b8..fec5b1ed 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -20,9 +20,7 @@ garage_net.workspace = true
arc-swap.workspace = true
async-trait.workspace = true
blake2.workspace = true
-bytes.workspace = true
bytesize.workspace = true
-digest.workspace = true
err-derive.workspace = true
hexdump.workspace = true
xxhash-rust.workspace = true
diff --git a/src/util/config.rs b/src/util/config.rs
index b4e2b008..73fc4ff4 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -183,6 +183,9 @@ pub struct WebConfig {
pub bind_addr: UnixOrTCPSocketAddress,
/// Suffix to remove from domain name to find bucket
pub root_domain: String,
+ /// Whether to add the requested domain to exported Prometheus metrics
+ #[serde(default)]
+ pub add_host_to_metrics: bool,
}
/// Configuration for the admin and monitoring HTTP API
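Because the new field carries #[serde(default)], existing configuration files that do not mention add_host_to_metrics still parse, and the flag is false. A minimal standalone sketch of that behaviour (WebSection is an illustrative stand-in for the real config struct, not the actual WebConfig; it assumes the serde derive and toml crates):

use serde::Deserialize;

// Illustrative stand-in for the [s3_web] section, with only the fields needed
// to show how #[serde(default)] behaves.
#[derive(Debug, Deserialize)]
struct WebSection {
    root_domain: String,
    #[serde(default)]
    add_host_to_metrics: bool,
}

fn main() {
    // Old config without the new key: the field defaults to false.
    let old: WebSection = toml::from_str(r#"root_domain = ".web.example.com""#).unwrap();
    assert!(!old.add_host_to_metrics);

    // New config opting in to per-host metrics labels.
    let new: WebSection = toml::from_str(
        r#"
        root_domain = ".web.example.com"
        add_host_to_metrics = true
        "#,
    )
    .unwrap();
    assert!(new.add_host_to_metrics);
    println!("{:?} / {:?}", old, new);
}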
diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml
index d810d6f9..a0a3e566 100644
--- a/src/web/Cargo.toml
+++ b/src/web/Cargo.toml
@@ -14,7 +14,8 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_api.workspace = true
+garage_api_common.workspace = true
+garage_api_s3.workspace = true
garage_model.workspace = true
garage_util.workspace = true
garage_table.workspace = true
@@ -23,12 +24,9 @@ err-derive.workspace = true
tracing.workspace = true
percent-encoding.workspace = true
-futures.workspace = true
-
http.workspace = true
http-body-util.workspace = true
hyper.workspace = true
-hyper-util.workspace = true
tokio.workspace = true
diff --git a/src/web/error.rs b/src/web/error.rs
index bd8f17b5..7e6d4542 100644
--- a/src/web/error.rs
+++ b/src/web/error.rs
@@ -2,14 +2,14 @@ use err_derive::Error;
use hyper::header::HeaderValue;
use hyper::{HeaderMap, StatusCode};
-use garage_api::generic_server::ApiError;
+use garage_api_common::generic_server::ApiError;
/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
/// An error received from the API crate
#[error(display = "API error: {}", _0)]
- ApiError(garage_api::s3::error::Error),
+ ApiError(garage_api_s3::error::Error),
/// The file does not exist
#[error(display = "Not found")]
@@ -22,10 +22,10 @@ pub enum Error {
impl<T> From<T> for Error
where
- garage_api::s3::error::Error: From<T>,
+ garage_api_s3::error::Error: From<T>,
{
fn from(err: T) -> Self {
- Error::ApiError(garage_api::s3::error::Error::from(err))
+ Error::ApiError(garage_api_s3::error::Error::from(err))
}
}
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index 69939f65..e73dab48 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -20,17 +20,20 @@ use opentelemetry::{
use crate::error::*;
-use garage_api::generic_server::{server_loop, UnixListenerOn};
-use garage_api::helpers::*;
-use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket};
-use garage_api::s3::error::{
+use garage_api_common::cors::{
+ add_cors_headers, find_matching_cors_rule, handle_options_for_bucket,
+};
+use garage_api_common::generic_server::{server_loop, UnixListenerOn};
+use garage_api_common::helpers::*;
+use garage_api_s3::error::{
CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError,
};
-use garage_api::s3::get::{handle_get_without_ctx, handle_head_without_ctx};
+use garage_api_s3::get::{handle_get_without_ctx, handle_head_without_ctx};
use garage_model::garage::Garage;
use garage_table::*;
+use garage_util::config::WebConfig;
use garage_util::data::Uuid;
use garage_util::error::Error as GarageError;
use garage_util::forwarded_headers;
@@ -67,16 +70,18 @@ pub struct WebServer {
garage: Arc<Garage>,
metrics: Arc<WebMetrics>,
root_domain: String,
+ add_host_to_metrics: bool,
}
impl WebServer {
/// Run a web server
- pub fn new(garage: Arc<Garage>, root_domain: String) -> Arc<Self> {
+ pub fn new(garage: Arc<Garage>, config: &WebConfig) -> Arc<Self> {
let metrics = Arc::new(WebMetrics::new());
Arc::new(WebServer {
garage,
metrics,
- root_domain,
+ root_domain: config.root_domain.clone(),
+ add_host_to_metrics: config.add_host_to_metrics,
})
}
@@ -118,18 +123,27 @@ impl WebServer {
req: Request<IncomingBody>,
addr: String,
) -> Result<Response<BoxBody<Error>>, http::Error> {
+ let host_header = req
+ .headers()
+ .get(HOST)
+ .and_then(|x| x.to_str().ok())
+ .unwrap_or("<unknown>")
+ .to_string();
+
if let Ok(forwarded_for_ip_addr) =
forwarded_headers::handle_forwarded_for_headers(req.headers())
{
+ // uri() below has a preceding '/', so no space with host
info!(
- "{} (via {}) {} {}",
+ "{} (via {}) {} {}{}",
forwarded_for_ip_addr,
addr,
req.method(),
+ host_header,
req.uri()
);
} else {
- info!("{} {} {}", addr, req.method(), req.uri());
+ info!("{} {} {}{}", addr, req.method(), host_header, req.uri());
}
// Lots of instrumentation
@@ -138,12 +152,16 @@ impl WebServer {
.span_builder(format!("Web {} request", req.method()))
.with_trace_id(gen_trace_id())
.with_attributes(vec![
+ KeyValue::new("host", format!("{}", host_header.clone())),
KeyValue::new("method", format!("{}", req.method())),
KeyValue::new("uri", req.uri().to_string()),
])
.start(&tracer);
- let metrics_tags = &[KeyValue::new("method", req.method().to_string())];
+ let mut metrics_tags = vec![KeyValue::new("method", req.method().to_string())];
+ if self.add_host_to_metrics {
+ metrics_tags.push(KeyValue::new("host", host_header.clone()));
+ }
// The actual handler
let res = self
@@ -158,25 +176,30 @@ impl WebServer {
// Returning the result
match res {
Ok(res) => {
- debug!("{} {} {}", req.method(), res.status(), req.uri());
+ debug!(
+ "{} {} {}{}",
+ req.method(),
+ res.status(),
+ host_header,
+ req.uri()
+ );
Ok(res
.map(|body| BoxBody::new(http_body_util::BodyExt::map_err(body, Error::from))))
}
Err(error) => {
info!(
- "{} {} {} {}",
+ "{} {} {}{} {}",
req.method(),
error.http_status_code(),
+ host_header,
req.uri(),
error
);
- self.metrics.error_counter.add(
- 1,
- &[
- metrics_tags[0].clone(),
- KeyValue::new("status_code", error.http_status_code().to_string()),
- ],
- );
+ metrics_tags.push(KeyValue::new(
+ "status_code",
+ error.http_status_code().to_string(),
+ ));
+ self.metrics.error_counter.add(1, &metrics_tags);
Ok(error_to_res(error))
}
}