aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-02-13 11:24:56 +0100
committerAlex Auvolat <alex@adnab.me>2024-02-13 11:36:28 +0100
commitcf2af186fcc0c8f581a966454b6cd4720d3821f0 (patch)
tree37a978ba9ffb780fc828cff7b8ec93662d50884f /src
parentdb48dd3d6c1f9e86a62e9b8edfce2c1620bcd5f3 (diff)
parent823078b4cdaf93e09de0847c5eaa75beb7b26b7f (diff)
downloadgarage-cf2af186fcc0c8f581a966454b6cd4720d3821f0.tar.gz
garage-cf2af186fcc0c8f581a966454b6cd4720d3821f0.zip
Merge branch 'main' into next-0.10
Diffstat (limited to 'src')
-rw-r--r--src/api/Cargo.toml72
-rw-r--r--src/api/admin/api_server.rs37
-rw-r--r--src/api/admin/bucket.rs41
-rw-r--r--src/api/admin/cluster.rs31
-rw-r--r--src/api/admin/error.rs23
-rw-r--r--src/api/admin/key.rs36
-rw-r--r--src/api/common_error.rs23
-rw-r--r--src/api/generic_server.rs216
-rw-r--r--src/api/helpers.rs68
-rw-r--r--src/api/k2v/api_server.rs22
-rw-r--r--src/api/k2v/batch.rs31
-rw-r--r--src/api/k2v/error.rs32
-rw-r--r--src/api/k2v/index.rs7
-rw-r--r--src/api/k2v/item.rs47
-rw-r--r--src/api/s3/api_server.rs55
-rw-r--r--src/api/s3/bucket.rs32
-rw-r--r--src/api/s3/copy.rs21
-rw-r--r--src/api/s3/cors.rs52
-rw-r--r--src/api/s3/delete.rs17
-rw-r--r--src/api/s3/error.rs42
-rw-r--r--src/api/s3/get.rs168
-rw-r--r--src/api/s3/lifecycle.rs23
-rw-r--r--src/api/s3/list.rs17
-rw-r--r--src/api/s3/multipart.rs33
-rw-r--r--src/api/s3/post_object.rs36
-rw-r--r--src/api/s3/put.rs22
-rw-r--r--src/api/s3/router.rs29
-rw-r--r--src/api/s3/website.rs23
-rw-r--r--src/api/signature/error.rs4
-rw-r--r--src/api/signature/payload.rs8
-rw-r--r--src/api/signature/streaming.rs34
-rw-r--r--src/block/Cargo.toml40
-rw-r--r--src/block/manager.rs8
-rw-r--r--src/db/Cargo.toml19
-rw-r--r--src/db/lib.rs42
-rw-r--r--src/garage/Cargo.toml83
-rw-r--r--src/garage/admin/block.rs47
-rw-r--r--src/garage/cli/convert_db.rs62
-rw-r--r--src/garage/cli/init.rs2
-rw-r--r--src/garage/cli/structs.rs8
-rw-r--r--src/garage/main.rs64
-rw-r--r--src/garage/repair/offline.rs4
-rw-r--r--src/garage/secrets.rs320
-rw-r--r--src/garage/server.rs29
-rw-r--r--src/garage/tests/common/client.rs2
-rw-r--r--src/garage/tests/common/custom_requester.rs38
-rw-r--r--src/garage/tests/k2v/batch.rs6
-rw-r--r--src/garage/tests/k2v/item.rs21
-rw-r--r--src/garage/tests/k2v/poll.rs15
-rw-r--r--src/garage/tests/k2v/simple.rs8
-rw-r--r--src/garage/tests/lib.rs18
-rw-r--r--src/garage/tests/s3/list.rs60
-rw-r--r--src/garage/tests/s3/multipart.rs30
-rw-r--r--src/garage/tests/s3/objects.rs48
-rw-r--r--src/garage/tests/s3/streaming_signature.rs16
-rw-r--r--src/garage/tests/s3/website.rs123
-rw-r--r--src/k2v-client/Cargo.toml35
-rw-r--r--src/k2v-client/error.rs4
-rw-r--r--src/k2v-client/lib.rs60
-rw-r--r--src/model/Cargo.toml40
-rw-r--r--src/model/helper/bucket.rs14
-rw-r--r--src/model/index_counter.rs4
-rw-r--r--src/rpc/Cargo.toml54
-rw-r--r--src/rpc/consul.rs2
-rw-r--r--src/rpc/system.rs2
-rw-r--r--src/table/Cargo.toml26
-rw-r--r--src/util/Cargo.toml64
-rw-r--r--src/util/config.rs161
-rw-r--r--src/web/Cargo.toml19
-rw-r--r--src/web/web_server.rs134
70 files changed, 1866 insertions, 1168 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 15bf757e..9fb562a3 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -20,44 +20,46 @@ garage_block.workspace = true
garage_util.workspace = true
garage_rpc.workspace = true
-async-trait = "0.1.7"
-base64 = "0.21"
-bytes = "1.0"
-chrono = "0.4"
-crypto-common = "0.1"
-err-derive = "0.3"
-hex = "0.4"
-hmac = "0.12"
-idna = "0.4"
-tracing = "0.1"
-md-5 = "0.10"
-nom = "7.1"
-sha2 = "0.10"
+async-trait.workspace = true
+base64.workspace = true
+bytes.workspace = true
+chrono.workspace = true
+crypto-common.workspace = true
+err-derive.workspace = true
+hex.workspace = true
+hmac.workspace = true
+idna.workspace = true
+tracing.workspace = true
+md-5.workspace = true
+nom.workspace = true
+pin-project.workspace = true
+sha2.workspace = true
-futures = "0.3"
-futures-util = "0.3"
-pin-project = "1.0.12"
-tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-tokio-stream = "0.1"
+futures.workspace = true
+futures-util.workspace = true
+tokio.workspace = true
+tokio-stream.workspace = true
-form_urlencoded = "1.0.0"
-http = "0.2"
-httpdate = "1.0"
-http-range = "0.1"
-hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
-hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] }
-multer = "2.0"
-percent-encoding = "2.1.0"
-roxmltree = "0.18"
-serde = { version = "1.0", features = ["derive"] }
-serde_bytes = "0.11"
-serde_json = "1.0"
-quick-xml = { version = "0.26", features = [ "serialize" ] }
-url = "2.3"
+form_urlencoded.workspace = true
+http.workspace = true
+httpdate.workspace = true
+http-range.workspace = true
+http-body-util.workspace = true
+hyper.workspace = true
+hyper-util.workspace = true
+multer.workspace = true
+percent-encoding.workspace = true
+roxmltree.workspace = true
+url.workspace = true
-opentelemetry = "0.17"
-opentelemetry-prometheus = { version = "0.10", optional = true }
-prometheus = { version = "0.13", optional = true }
+serde.workspace = true
+serde_bytes.workspace = true
+serde_json.workspace = true
+quick-xml.workspace = true
+
+opentelemetry.workspace = true
+opentelemetry-prometheus = { workspace = true, optional = true }
+prometheus = { workspace = true, optional = true }
[features]
k2v = [ "garage_util/k2v", "garage_model/k2v" ]
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 41a5e68c..2b9be24e 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -3,9 +3,9 @@ use std::sync::Arc;
use async_trait::async_trait;
-use futures::future::Future;
use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
+use tokio::sync::watch;
use opentelemetry::trace::SpanRef;
@@ -27,7 +27,9 @@ use crate::admin::error::*;
use crate::admin::key::*;
use crate::admin::router_v0;
use crate::admin::router_v1::{Authorization, Endpoint};
-use crate::helpers::host_to_bucket;
+use crate::helpers::*;
+
+pub type ResBody = BoxBody<Error>;
pub struct AdminApiServer {
garage: Arc<Garage>,
@@ -63,24 +65,27 @@ impl AdminApiServer {
pub async fn run(
self,
bind_addr: UnixOrTCPSocketAddress,
- shutdown_signal: impl Future<Output = ()>,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
let region = self.garage.config.s3_api.s3_region.clone();
ApiServer::new(region, self)
- .run_server(bind_addr, Some(0o220), shutdown_signal)
+ .run_server(bind_addr, Some(0o220), must_exit)
.await
}
- fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> {
+ fn handle_options(&self, _req: &Request<IncomingBody>) -> Result<Response<ResBody>, Error> {
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
.header(ALLOW, "OPTIONS, GET, POST")
.header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST")
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
- .body(Body::empty())?)
+ .body(empty_body())?)
}
- async fn handle_check_domain(&self, req: Request<Body>) -> Result<Response<Body>, Error> {
+ async fn handle_check_domain(
+ &self,
+ req: Request<IncomingBody>,
+ ) -> Result<Response<ResBody>, Error> {
let query_params: HashMap<String, String> = req
.uri()
.query()
@@ -104,7 +109,7 @@ impl AdminApiServer {
if self.check_domain(domain).await? {
Ok(Response::builder()
.status(StatusCode::OK)
- .body(Body::from(format!(
+ .body(string_body(format!(
"Domain '{domain}' is managed by Garage"
)))?)
} else {
@@ -167,7 +172,7 @@ impl AdminApiServer {
}
}
- fn handle_health(&self) -> Result<Response<Body>, Error> {
+ fn handle_health(&self) -> Result<Response<ResBody>, Error> {
let health = self.garage.system.health();
let (status, status_str) = match health.status {
@@ -189,10 +194,10 @@ impl AdminApiServer {
Ok(Response::builder()
.status(status)
.header(http::header::CONTENT_TYPE, "text/plain")
- .body(Body::from(status_str))?)
+ .body(string_body(status_str))?)
}
- fn handle_metrics(&self) -> Result<Response<Body>, Error> {
+ fn handle_metrics(&self) -> Result<Response<ResBody>, Error> {
#[cfg(feature = "metrics")]
{
use opentelemetry::trace::Tracer;
@@ -212,7 +217,7 @@ impl AdminApiServer {
Ok(Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, encoder.format_type())
- .body(Body::from(buffer))?)
+ .body(bytes_body(buffer.into()))?)
}
#[cfg(not(feature = "metrics"))]
Err(Error::bad_request(
@@ -229,7 +234,7 @@ impl ApiHandler for AdminApiServer {
type Endpoint = Endpoint;
type Error = Error;
- fn parse_endpoint(&self, req: &Request<Body>) -> Result<Endpoint, Error> {
+ fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<Endpoint, Error> {
if req.uri().path().starts_with("/v0/") {
let endpoint_v0 = router_v0::Endpoint::from_request(req)?;
Endpoint::from_v0(endpoint_v0)
@@ -240,9 +245,9 @@ impl ApiHandler for AdminApiServer {
async fn handle(
&self,
- req: Request<Body>,
+ req: Request<IncomingBody>,
endpoint: Endpoint,
- ) -> Result<Response<Body>, Error> {
+ ) -> Result<Response<ResBody>, Error> {
let expected_auth_header =
match endpoint.authorization_type() {
Authorization::None => None,
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
index 65929d61..a8718a9f 100644
--- a/src/api/admin/bucket.rs
+++ b/src/api/admin/bucket.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_util::crdt::*;
@@ -17,12 +17,13 @@ use garage_model::permission::*;
use garage_model::s3::mpu_table;
use garage_model::s3::object_table::*;
+use crate::admin::api_server::ResBody;
use crate::admin::error::*;
use crate::admin::key::ApiBucketKeyPerm;
use crate::common_error::CommonError;
-use crate::helpers::{json_ok_response, parse_json_body};
+use crate::helpers::*;
-pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
let buckets = garage
.bucket_table
.get_range(
@@ -90,7 +91,7 @@ pub async fn handle_get_bucket_info(
garage: &Arc<Garage>,
id: Option<String>,
global_alias: Option<String>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let bucket_id = match (id, global_alias) {
(Some(id), None) => parse_bucket_id(&id)?,
(None, Some(ga)) => garage
@@ -111,7 +112,7 @@ pub async fn handle_get_bucket_info(
async fn bucket_info_results(
garage: &Arc<Garage>,
bucket_id: Uuid,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let bucket = garage
.bucket_helper()
.get_existing_bucket(bucket_id)
@@ -268,9 +269,9 @@ struct GetBucketInfoKey {
pub async fn handle_create_bucket(
garage: &Arc<Garage>,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let req = parse_json_body::<CreateBucketRequest>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let req = parse_json_body::<CreateBucketRequest, _, Error>(req).await?;
if let Some(ga) = &req.global_alias {
if !is_valid_bucket_name(ga) {
@@ -360,7 +361,7 @@ struct CreateBucketLocalAlias {
pub async fn handle_delete_bucket(
garage: &Arc<Garage>,
id: String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let helper = garage.bucket_helper();
let bucket_id = parse_bucket_id(&id)?;
@@ -403,15 +404,15 @@ pub async fn handle_delete_bucket(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_update_bucket(
garage: &Arc<Garage>,
id: String,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let req = parse_json_body::<UpdateBucketRequest>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let req = parse_json_body::<UpdateBucketRequest, _, Error>(req).await?;
let bucket_id = parse_bucket_id(&id)?;
let mut bucket = garage
@@ -470,10 +471,10 @@ struct UpdateBucketWebsiteAccess {
pub async fn handle_bucket_change_key_perm(
garage: &Arc<Garage>,
- req: Request<Body>,
+ req: Request<IncomingBody>,
new_perm_flag: bool,
-) -> Result<Response<Body>, Error> {
- let req = parse_json_body::<BucketKeyPermChangeRequest>(req).await?;
+) -> Result<Response<ResBody>, Error> {
+ let req = parse_json_body::<BucketKeyPermChangeRequest, _, Error>(req).await?;
let bucket_id = parse_bucket_id(&req.bucket_id)?;
@@ -526,7 +527,7 @@ pub async fn handle_global_alias_bucket(
garage: &Arc<Garage>,
bucket_id: String,
alias: String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let bucket_id = parse_bucket_id(&bucket_id)?;
garage
@@ -541,7 +542,7 @@ pub async fn handle_global_unalias_bucket(
garage: &Arc<Garage>,
bucket_id: String,
alias: String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let bucket_id = parse_bucket_id(&bucket_id)?;
garage
@@ -557,7 +558,7 @@ pub async fn handle_local_alias_bucket(
bucket_id: String,
access_key_id: String,
alias: String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let bucket_id = parse_bucket_id(&bucket_id)?;
garage
@@ -573,7 +574,7 @@ pub async fn handle_local_unalias_bucket(
bucket_id: String,
access_key_id: String,
alias: String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let bucket_id = parse_bucket_id(&bucket_id)?;
garage
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index 8677257d..8ce6c5ed 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
-use hyper::{Body, Request, Response};
+use hyper::{body::Incoming as IncomingBody, Request, Response};
use serde::{Deserialize, Serialize};
use garage_util::crdt::*;
@@ -12,10 +12,11 @@ use garage_rpc::layout;
use garage_model::garage::Garage;
+use crate::admin::api_server::ResBody;
use crate::admin::error::*;
use crate::helpers::{json_ok_response, parse_json_body};
-pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
let layout = garage.system.cluster_layout();
let mut nodes = garage
.system
@@ -110,7 +111,7 @@ pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<
Ok(json_ok_response(&res)?)
}
-pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
use garage_rpc::system::ClusterHealthStatus;
let health = garage.system.health();
let health = ClusterHealth {
@@ -132,9 +133,9 @@ pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response<
pub async fn handle_connect_cluster_nodes(
garage: &Arc<Garage>,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let req = parse_json_body::<Vec<String>>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let req = parse_json_body::<Vec<String>, _, Error>(req).await?;
let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node)))
.await
@@ -154,7 +155,7 @@ pub async fn handle_connect_cluster_nodes(
Ok(json_ok_response(&res)?)
}
-pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
let res = format_cluster_layout(&garage.system.cluster_layout());
Ok(json_ok_response(&res)?)
@@ -290,9 +291,9 @@ struct NodeResp {
pub async fn handle_update_cluster_layout(
garage: &Arc<Garage>,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let updates = parse_json_body::<UpdateClusterLayoutRequest, _, Error>(req).await?;
let mut layout = garage.system.cluster_layout().clone();
@@ -336,9 +337,9 @@ pub async fn handle_update_cluster_layout(
pub async fn handle_apply_cluster_layout(
garage: &Arc<Garage>,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let param = parse_json_body::<ApplyLayoutRequest>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let param = parse_json_body::<ApplyLayoutRequest, _, Error>(req).await?;
let layout = garage.system.cluster_layout().clone();
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
@@ -356,7 +357,9 @@ pub async fn handle_apply_cluster_layout(
Ok(json_ok_response(&res)?)
}
-pub async fn handle_revert_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+pub async fn handle_revert_cluster_layout(
+ garage: &Arc<Garage>,
+) -> Result<Response<ResBody>, Error> {
let layout = garage.system.cluster_layout().clone();
let layout = layout.revert_staged_changes()?;
garage
diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs
index ed1a07bd..2668b42d 100644
--- a/src/api/admin/error.rs
+++ b/src/api/admin/error.rs
@@ -1,13 +1,13 @@
use err_derive::Error;
use hyper::header::HeaderValue;
-use hyper::{Body, HeaderMap, StatusCode};
+use hyper::{HeaderMap, StatusCode};
pub use garage_model::helper::error::Error as HelperError;
use crate::common_error::CommonError;
pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
use crate::generic_server::ApiError;
-use crate::helpers::CustomApiErrorBody;
+use crate::helpers::*;
/// Errors of this crate
#[derive(Debug, Error)]
@@ -40,18 +40,6 @@ where
impl CommonErrorDerivative for Error {}
-impl From<HelperError> for Error {
- fn from(err: HelperError) -> Self {
- match err {
- HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
- HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
- HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
- HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
- HelperError::NoSuchAccessKey(n) => Self::NoSuchAccessKey(n),
- }
- }
-}
-
impl Error {
fn code(&self) -> &'static str {
match self {
@@ -77,14 +65,14 @@ impl ApiError for Error {
header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap());
}
- fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody {
let error = CustomApiErrorBody {
code: self.code().to_string(),
message: format!("{}", self),
path: path.to_string(),
region: garage_region.to_string(),
};
- Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
+ let error_str = serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
r#"
{
"code": "InternalError",
@@ -92,6 +80,7 @@ impl ApiError for Error {
}
"#
.into()
- }))
+ });
+ error_body(error_str)
}
}
diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs
index 8d1c6890..1efaca16 100644
--- a/src/api/admin/key.rs
+++ b/src/api/admin/key.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_table::*;
@@ -9,10 +9,11 @@ use garage_table::*;
use garage_model::garage::Garage;
use garage_model::key_table::*;
+use crate::admin::api_server::ResBody;
use crate::admin::error::*;
-use crate::helpers::{is_default, json_ok_response, parse_json_body};
+use crate::helpers::*;
-pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
+pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
let res = garage
.key_table
.get_range(
@@ -45,7 +46,7 @@ pub async fn handle_get_key_info(
id: Option<String>,
search: Option<String>,
show_secret_key: bool,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let key = if let Some(id) = id {
garage.key_helper().get_existing_key(&id).await?
} else if let Some(search) = search {
@@ -62,9 +63,9 @@ pub async fn handle_get_key_info(
pub async fn handle_create_key(
garage: &Arc<Garage>,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let req = parse_json_body::<CreateKeyRequest>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let req = parse_json_body::<CreateKeyRequest, _, Error>(req).await?;
let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key"));
garage.key_table.insert(&key).await?;
@@ -80,9 +81,9 @@ struct CreateKeyRequest {
pub async fn handle_import_key(
garage: &Arc<Garage>,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let req = parse_json_body::<ImportKeyRequest>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let req = parse_json_body::<ImportKeyRequest, _, Error>(req).await?;
let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?;
if prev_key.is_some() {
@@ -111,9 +112,9 @@ struct ImportKeyRequest {
pub async fn handle_update_key(
garage: &Arc<Garage>,
id: String,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let req = parse_json_body::<UpdateKeyRequest>(req).await?;
+ req: Request<IncomingBody>,
+) -> Result<Response<ResBody>, Error> {
+ let req = parse_json_body::<UpdateKeyRequest, _, Error>(req).await?;
let mut key = garage.key_helper().get_existing_key(&id).await?;
@@ -146,7 +147,10 @@ struct UpdateKeyRequest {
deny: Option<KeyPerm>,
}
-pub async fn handle_delete_key(garage: &Arc<Garage>, id: String) -> Result<Response<Body>, Error> {
+pub async fn handle_delete_key(
+ garage: &Arc<Garage>,
+ id: String,
+) -> Result<Response<ResBody>, Error> {
let mut key = garage.key_helper().get_existing_key(&id).await?;
key.state.as_option().unwrap();
@@ -155,14 +159,14 @@ pub async fn handle_delete_key(garage: &Arc<Garage>, id: String) -> Result<Respo
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
async fn key_info_results(
garage: &Arc<Garage>,
key: Key,
show_secret: bool,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let mut relevant_buckets = HashMap::new();
let key_state = key.state.as_option().unwrap();
diff --git a/src/api/common_error.rs b/src/api/common_error.rs
index ecb22fd8..c47555d4 100644
--- a/src/api/common_error.rs
+++ b/src/api/common_error.rs
@@ -3,6 +3,8 @@ use hyper::StatusCode;
use garage_util::error::Error as GarageError;
+use garage_model::helper::error::Error as HelperError;
+
/// Errors of this crate
#[derive(Debug, Error)]
pub enum CommonError {
@@ -28,6 +30,10 @@ pub enum CommonError {
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
+ /// The client sent a header with invalid value
+ #[error(display = "Invalid header value: {}", _0)]
+ InvalidHeader(#[error(source)] hyper::header::ToStrError),
+
// ---- SPECIFIC ERROR CONDITIONS ----
// These have to be error codes referenced in the S3 spec here:
// https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
@@ -62,7 +68,9 @@ impl CommonError {
CommonError::Forbidden(_) => StatusCode::FORBIDDEN,
CommonError::NoSuchBucket(_) => StatusCode::NOT_FOUND,
CommonError::BucketNotEmpty | CommonError::BucketAlreadyExists => StatusCode::CONFLICT,
- CommonError::InvalidBucketName(_) => StatusCode::BAD_REQUEST,
+ CommonError::InvalidBucketName(_) | CommonError::InvalidHeader(_) => {
+ StatusCode::BAD_REQUEST
+ }
}
}
@@ -80,6 +88,7 @@ impl CommonError {
CommonError::BucketAlreadyExists => "BucketAlreadyExists",
CommonError::BucketNotEmpty => "BucketNotEmpty",
CommonError::InvalidBucketName(_) => "InvalidBucketName",
+ CommonError::InvalidHeader(_) => "InvalidHeaderValue",
}
}
@@ -88,6 +97,18 @@ impl CommonError {
}
}
+impl From<HelperError> for CommonError {
+ fn from(err: HelperError) -> Self {
+ match err {
+ HelperError::Internal(i) => Self::InternalError(i),
+ HelperError::BadRequest(b) => Self::BadRequest(b),
+ HelperError::InvalidBucketName(n) => Self::InvalidBucketName(n),
+ HelperError::NoSuchBucket(n) => Self::NoSuchBucket(n),
+ e => Self::bad_request(format!("{}", e)),
+ }
+ }
+}
+
pub trait CommonErrorDerivative: From<CommonError> {
fn internal_error<M: ToString>(msg: M) -> Self {
Self::from(CommonError::InternalError(GarageError::Message(
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs
index fa346f48..9c49fdf3 100644
--- a/src/api/generic_server.rs
+++ b/src/api/generic_server.rs
@@ -1,3 +1,4 @@
+use std::convert::Infallible;
use std::fs::{self, Permissions};
use std::os::unix::fs::PermissionsExt;
use std::sync::Arc;
@@ -5,16 +6,19 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures::future::Future;
+use futures::stream::{futures_unordered::FuturesUnordered, StreamExt};
+use http_body_util::BodyExt;
use hyper::header::HeaderValue;
-use hyper::server::conn::AddrStream;
-use hyper::service::{make_service_fn, service_fn};
-use hyper::{Body, Request, Response, Server};
+use hyper::server::conn::http1;
+use hyper::service::service_fn;
+use hyper::{body::Incoming as IncomingBody, Request, Response};
use hyper::{HeaderMap, StatusCode};
+use hyper_util::rt::TokioIo;
-use hyperlocal::UnixServerExt;
-
-use tokio::net::UnixStream;
+use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
+use tokio::sync::watch;
use opentelemetry::{
global,
@@ -28,6 +32,8 @@ use garage_util::forwarded_headers;
use garage_util::metrics::{gen_trace_id, RecordDuration};
use garage_util::socket_address::UnixOrTCPSocketAddress;
+use crate::helpers::{BoxBody, ErrorBody};
+
pub(crate) trait ApiEndpoint: Send + Sync + 'static {
fn name(&self) -> &'static str;
fn add_span_attributes(&self, span: SpanRef<'_>);
@@ -36,7 +42,7 @@ pub(crate) trait ApiEndpoint: Send + Sync + 'static {
pub trait ApiError: std::error::Error + Send + Sync + 'static {
fn http_status_code(&self) -> StatusCode;
fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>);
- fn http_body(&self, garage_region: &str, path: &str) -> Body;
+ fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody;
}
#[async_trait]
@@ -47,12 +53,12 @@ pub(crate) trait ApiHandler: Send + Sync + 'static {
type Endpoint: ApiEndpoint;
type Error: ApiError;
- fn parse_endpoint(&self, r: &Request<Body>) -> Result<Self::Endpoint, Self::Error>;
+ fn parse_endpoint(&self, r: &Request<IncomingBody>) -> Result<Self::Endpoint, Self::Error>;
async fn handle(
&self,
- req: Request<Body>,
+ req: Request<IncomingBody>,
endpoint: Self::Endpoint,
- ) -> Result<Response<Body>, Self::Error>;
+ ) -> Result<Response<BoxBody<Self::Error>>, Self::Error>;
}
pub(crate) struct ApiServer<A: ApiHandler> {
@@ -99,74 +105,42 @@ impl<A: ApiHandler> ApiServer<A> {
self: Arc<Self>,
bind_addr: UnixOrTCPSocketAddress,
unix_bind_addr_mode: Option<u32>,
- shutdown_signal: impl Future<Output = ()>,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
- let tcp_service = make_service_fn(|conn: &AddrStream| {
- let this = self.clone();
-
- let client_addr = conn.remote_addr();
- async move {
- Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
- let this = this.clone();
-
- this.handler(req, client_addr.to_string())
- }))
- }
- });
-
- let unix_service = make_service_fn(|_: &UnixStream| {
- let this = self.clone();
-
- let path = bind_addr.to_string();
- async move {
- Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
- let this = this.clone();
-
- this.handler(req, path.clone())
- }))
- }
- });
-
- info!(
- "{} API server listening on {}",
- A::API_NAME_DISPLAY,
- bind_addr
- );
+ let server_name = format!("{} API", A::API_NAME_DISPLAY);
+ info!("{} server listening on {}", server_name, bind_addr);
match bind_addr {
UnixOrTCPSocketAddress::TCPSocket(addr) => {
- Server::bind(&addr)
- .serve(tcp_service)
- .with_graceful_shutdown(shutdown_signal)
- .await?
+ let listener = TcpListener::bind(addr).await?;
+
+ let handler = move |request, socketaddr| self.clone().handler(request, socketaddr);
+ server_loop(server_name, listener, handler, must_exit).await
}
UnixOrTCPSocketAddress::UnixSocket(ref path) => {
if path.exists() {
fs::remove_file(path)?
}
- let bound = Server::bind_unix(path)?;
+ let listener = UnixListener::bind(path)?;
+ let listener = UnixListenerOn(listener, path.display().to_string());
fs::set_permissions(
path,
Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)),
)?;
- bound
- .serve(unix_service)
- .with_graceful_shutdown(shutdown_signal)
- .await?;
+ let handler = move |request, socketaddr| self.clone().handler(request, socketaddr);
+ server_loop(server_name, listener, handler, must_exit).await
}
- };
-
- Ok(())
+ }
}
async fn handler(
self: Arc<Self>,
- req: Request<Body>,
+ req: Request<IncomingBody>,
addr: String,
- ) -> Result<Response<Body>, GarageError> {
+ ) -> Result<Response<BoxBody<A::Error>>, http::Error> {
let uri = req.uri().clone();
if let Ok(forwarded_for_ip_addr) =
@@ -205,7 +179,7 @@ impl<A: ApiHandler> ApiServer<A> {
Ok(x)
}
Err(e) => {
- let body: Body = e.http_body(&self.region, uri.path());
+ let body = e.http_body(&self.region, uri.path());
let mut http_error_builder = Response::builder().status(e.http_status_code());
if let Some(header_map) = http_error_builder.headers_mut() {
@@ -219,12 +193,16 @@ impl<A: ApiHandler> ApiServer<A> {
} else {
info!("Response: error {}, {}", e.http_status_code(), e);
}
- Ok(http_error)
+ Ok(http_error
+ .map(|body| BoxBody::new(body.map_err(|_: Infallible| unreachable!()))))
}
}
}
- async fn handler_stage2(&self, req: Request<Body>) -> Result<Response<Body>, A::Error> {
+ async fn handler_stage2(
+ &self,
+ req: Request<IncomingBody>,
+ ) -> Result<Response<BoxBody<A::Error>>, A::Error> {
let endpoint = self.api_handler.parse_endpoint(&req)?;
debug!("Endpoint: {}", endpoint.name());
@@ -265,3 +243,123 @@ impl<A: ApiHandler> ApiServer<A> {
res
}
}
+
+// ==== helper functions ====
+
+#[async_trait]
+pub trait Accept: Send + Sync + 'static {
+ type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static;
+ async fn accept(&self) -> std::io::Result<(Self::Stream, String)>;
+}
+
+#[async_trait]
+impl Accept for TcpListener {
+ type Stream = TcpStream;
+ async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
+ self.accept()
+ .await
+ .map(|(stream, addr)| (stream, addr.to_string()))
+ }
+}
+
+pub struct UnixListenerOn(pub UnixListener, pub String);
+
+#[async_trait]
+impl Accept for UnixListenerOn {
+ type Stream = UnixStream;
+ async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
+ self.0
+ .accept()
+ .await
+ .map(|(stream, _addr)| (stream, self.1.clone()))
+ }
+}
+
+pub async fn server_loop<A, H, F, E>(
+ server_name: String,
+ listener: A,
+ handler: H,
+ mut must_exit: watch::Receiver<bool>,
+) -> Result<(), GarageError>
+where
+ A: Accept,
+ H: Fn(Request<IncomingBody>, String) -> F + Send + Sync + Clone + 'static,
+ F: Future<Output = Result<Response<BoxBody<E>>, http::Error>> + Send + 'static,
+ E: Send + Sync + std::error::Error + 'static,
+{
+ let (conn_in, mut conn_out) = tokio::sync::mpsc::unbounded_channel();
+ let connection_collector = tokio::spawn({
+ let server_name = server_name.clone();
+ async move {
+ let mut connections = FuturesUnordered::new();
+ loop {
+ let collect_next = async {
+ if connections.is_empty() {
+ futures::future::pending().await
+ } else {
+ connections.next().await
+ }
+ };
+ tokio::select! {
+ result = collect_next => {
+ trace!("{} server: HTTP connection finished: {:?}", server_name, result);
+ }
+ new_fut = conn_out.recv() => {
+ match new_fut {
+ Some(f) => connections.push(f),
+ None => break,
+ }
+ }
+ }
+ }
+ if !connections.is_empty() {
+ info!(
+ "{} server: {} connections still open",
+ server_name,
+ connections.len()
+ );
+ while let Some(conn_res) = connections.next().await {
+ trace!(
+ "{} server: HTTP connection finished: {:?}",
+ server_name,
+ conn_res
+ );
+ info!(
+ "{} server: {} connections still open",
+ server_name,
+ connections.len()
+ );
+ }
+ }
+ }
+ });
+
+ while !*must_exit.borrow() {
+ let (stream, client_addr) = tokio::select! {
+ acc = listener.accept() => acc?,
+ _ = must_exit.changed() => continue,
+ };
+
+ let io = TokioIo::new(stream);
+
+ let handler = handler.clone();
+ let serve = move |req: Request<IncomingBody>| handler(req, client_addr.clone());
+
+ let fut = tokio::task::spawn(async move {
+ let io = Box::pin(io);
+ if let Err(e) = http1::Builder::new()
+ .serve_connection(io, service_fn(serve))
+ .await
+ {
+ debug!("Error handling HTTP connection: {}", e);
+ }
+ });
+ conn_in.send(fut)?;
+ }
+
+ info!("{} server exiting", server_name);
+ drop(conn_in);
+ connection_collector.await?;
+
+ Ok(())
+}
diff --git a/src/api/helpers.rs b/src/api/helpers.rs
index 1d55ebd5..5f488912 100644
--- a/src/api/helpers.rs
+++ b/src/api/helpers.rs
@@ -1,7 +1,17 @@
-use hyper::{Body, Request, Response};
+use std::convert::Infallible;
+
+use futures::{Stream, StreamExt, TryStreamExt};
+
+use http_body_util::{BodyExt, Full as FullBody};
+use hyper::{
+ body::{Body, Bytes},
+ Request, Response,
+};
use idna::domain_to_unicode;
use serde::{Deserialize, Serialize};
+use garage_util::error::Error as GarageError;
+
use crate::common_error::{CommonError as Error, *};
/// What kind of authorization is required to perform a given action
@@ -138,18 +148,64 @@ pub fn key_after_prefix(pfx: &str) -> Option<String> {
None
}
-pub async fn parse_json_body<T: for<'de> Deserialize<'de>>(req: Request<Body>) -> Result<T, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+// =============== body helpers =================
+
+pub type EmptyBody = http_body_util::Empty<bytes::Bytes>;
+pub type ErrorBody = FullBody<bytes::Bytes>;
+pub type BoxBody<E> = http_body_util::combinators::BoxBody<bytes::Bytes, E>;
+
+pub fn string_body<E>(s: String) -> BoxBody<E> {
+ bytes_body(bytes::Bytes::from(s.into_bytes()))
+}
+pub fn bytes_body<E>(b: bytes::Bytes) -> BoxBody<E> {
+ BoxBody::new(FullBody::new(b).map_err(|_: Infallible| unreachable!()))
+}
+pub fn empty_body<E>() -> BoxBody<E> {
+ BoxBody::new(http_body_util::Empty::new().map_err(|_: Infallible| unreachable!()))
+}
+pub fn error_body(s: String) -> ErrorBody {
+ ErrorBody::from(bytes::Bytes::from(s.into_bytes()))
+}
+
+pub async fn parse_json_body<T, B, E>(req: Request<B>) -> Result<T, E>
+where
+ T: for<'de> Deserialize<'de>,
+ B: Body,
+ E: From<<B as Body>::Error> + From<Error>,
+{
+ let body = req.into_body().collect().await?.to_bytes();
let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
Ok(resp)
}
-pub fn json_ok_response<T: Serialize>(res: &T) -> Result<Response<Body>, Error> {
- let resp_json = serde_json::to_string_pretty(res).map_err(garage_util::error::Error::from)?;
+pub fn json_ok_response<E, T: Serialize>(res: &T) -> Result<Response<BoxBody<E>>, E>
+where
+ E: From<Error>,
+{
+ let resp_json = serde_json::to_string_pretty(res)
+ .map_err(GarageError::from)
+ .map_err(Error::from)?;
Ok(Response::builder()
.status(hyper::StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json")
- .body(Body::from(resp_json))?)
+ .body(string_body(resp_json))
+ .unwrap())
+}
+
+pub fn body_stream<B, E>(body: B) -> impl Stream<Item = Result<Bytes, E>>
+where
+ B: Body<Data = Bytes>,
+ <B as Body>::Error: Into<E>,
+ E: From<Error>,
+{
+ let stream = http_body_util::BodyStream::new(body);
+ let stream = TryStreamExt::map_err(stream, Into::into);
+ stream.map(|x| {
+ x.and_then(|f| {
+ f.into_data()
+ .map_err(|_| E::from(Error::bad_request("non-data frame")))
+ })
+ })
}
pub fn is_default<T: Default + PartialEq>(v: &T) -> bool {
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
index 3a032aba..e97da2af 100644
--- a/src/api/k2v/api_server.rs
+++ b/src/api/k2v/api_server.rs
@@ -2,8 +2,8 @@ use std::sync::Arc;
use async_trait::async_trait;
-use futures::future::Future;
-use hyper::{Body, Method, Request, Response};
+use hyper::{body::Incoming as IncomingBody, Method, Request, Response};
+use tokio::sync::watch;
use opentelemetry::{trace::SpanRef, KeyValue};
@@ -25,6 +25,9 @@ use crate::k2v::item::*;
use crate::k2v::router::Endpoint;
use crate::s3::cors::*;
+pub use crate::signature::streaming::ReqBody;
+pub type ResBody = BoxBody<Error>;
+
pub struct K2VApiServer {
garage: Arc<Garage>,
}
@@ -39,10 +42,10 @@ impl K2VApiServer {
garage: Arc<Garage>,
bind_addr: UnixOrTCPSocketAddress,
s3_region: String,
- shutdown_signal: impl Future<Output = ()>,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
ApiServer::new(s3_region, K2VApiServer { garage })
- .run_server(bind_addr, None, shutdown_signal)
+ .run_server(bind_addr, None, must_exit)
.await
}
}
@@ -55,7 +58,7 @@ impl ApiHandler for K2VApiServer {
type Endpoint = K2VApiEndpoint;
type Error = Error;
- fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> {
+ fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<K2VApiEndpoint, Error> {
let (endpoint, bucket_name) = Endpoint::from_request(req)?;
Ok(K2VApiEndpoint {
@@ -66,9 +69,9 @@ impl ApiHandler for K2VApiServer {
async fn handle(
&self,
- req: Request<Body>,
+ req: Request<IncomingBody>,
endpoint: K2VApiEndpoint,
- ) -> Result<Response<Body>, Error> {
+ ) -> Result<Response<ResBody>, Error> {
let K2VApiEndpoint {
bucket_name,
endpoint,
@@ -77,9 +80,10 @@ impl ApiHandler for K2VApiServer {
// The OPTIONS method is procesed early, before we even check for an API key
if let Endpoint::Options = endpoint {
- return Ok(handle_options_s3api(garage, &req, Some(bucket_name))
+ let options_res = handle_options_api(garage, &req, Some(bucket_name))
.await
- .ok_or_bad_request("Error handling OPTIONS")?);
+ .ok_or_bad_request("Error handling OPTIONS")?;
+ return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
}
let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?;
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index 294380ea..ae2778b1 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -1,7 +1,7 @@
use std::sync::Arc;
use base64::prelude::*;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_util::data::*;
@@ -13,15 +13,16 @@ use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
use crate::helpers::*;
+use crate::k2v::api_server::{ReqBody, ResBody};
use crate::k2v::error::*;
use crate::k2v::range::read_range;
pub async fn handle_insert_batch(
garage: Arc<Garage>,
bucket_id: Uuid,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?;
+ req: Request<ReqBody>,
+) -> Result<Response<ResBody>, Error> {
+ let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
let mut items2 = vec![];
for it in items {
@@ -41,15 +42,15 @@ pub async fn handle_insert_batch(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_read_batch(
garage: Arc<Garage>,
bucket_id: Uuid,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?;
+ req: Request<ReqBody>,
+) -> Result<Response<ResBody>, Error> {
+ let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?;
let resp_results = futures::future::join_all(
queries
@@ -139,9 +140,9 @@ async fn handle_read_batch_query(
pub async fn handle_delete_batch(
garage: Arc<Garage>,
bucket_id: Uuid,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
- let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?;
+ req: Request<ReqBody>,
+) -> Result<Response<ResBody>, Error> {
+ let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?;
let resp_results = futures::future::join_all(
queries
@@ -253,11 +254,11 @@ pub(crate) async fn handle_poll_range(
garage: Arc<Garage>,
bucket_id: Uuid,
partition_key: &str,
- req: Request<Body>,
-) -> Result<Response<Body>, Error> {
+ req: Request<ReqBody>,
+) -> Result<Response<ResBody>, Error> {
use garage_model::k2v::sub::PollRange;
- let query = parse_json_body::<PollRangeQuery>(req).await?;
+ let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?;
let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000;
@@ -292,7 +293,7 @@ pub(crate) async fn handle_poll_range(
} else {
Ok(Response::builder()
.status(StatusCode::NOT_MODIFIED)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs
index 4eb017ab..16479227 100644
--- a/src/api/k2v/error.rs
+++ b/src/api/k2v/error.rs
@@ -1,13 +1,11 @@
use err_derive::Error;
use hyper::header::HeaderValue;
-use hyper::{Body, HeaderMap, StatusCode};
-
-use garage_model::helper::error::Error as HelperError;
+use hyper::{HeaderMap, StatusCode};
use crate::common_error::CommonError;
pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
use crate::generic_server::ApiError;
-use crate::helpers::CustomApiErrorBody;
+use crate::helpers::*;
use crate::signature::error::Error as SignatureError;
/// Errors of this crate
@@ -30,10 +28,6 @@ pub enum Error {
#[error(display = "Invalid base64: {}", _0)]
InvalidBase64(#[error(source)] base64::DecodeError),
- /// The client sent a header with invalid value
- #[error(display = "Invalid header value: {}", _0)]
- InvalidHeader(#[error(source)] hyper::header::ToStrError),
-
/// The client asked for an invalid return format (invalid Accept header)
#[error(display = "Not acceptable: {}", _0)]
NotAcceptable(String),
@@ -54,18 +48,6 @@ where
impl CommonErrorDerivative for Error {}
-impl From<HelperError> for Error {
- fn from(err: HelperError) -> Self {
- match err {
- HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
- HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
- HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
- HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
- e => Self::Common(CommonError::BadRequest(format!("{}", e))),
- }
- }
-}
-
impl From<SignatureError> for Error {
fn from(err: SignatureError) -> Self {
match err {
@@ -74,7 +56,6 @@ impl From<SignatureError> for Error {
Self::AuthorizationHeaderMalformed(c)
}
SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
- SignatureError::InvalidHeader(h) => Self::InvalidHeader(h),
}
}
}
@@ -90,7 +71,6 @@ impl Error {
Error::NotAcceptable(_) => "NotAcceptable",
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
Error::InvalidBase64(_) => "InvalidBase64",
- Error::InvalidHeader(_) => "InvalidHeaderValue",
Error::InvalidUtf8Str(_) => "InvalidUtf8String",
}
}
@@ -105,7 +85,6 @@ impl ApiError for Error {
Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE,
Error::AuthorizationHeaderMalformed(_)
| Error::InvalidBase64(_)
- | Error::InvalidHeader(_)
| Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST,
}
}
@@ -115,14 +94,14 @@ impl ApiError for Error {
header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap());
}
- fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody {
let error = CustomApiErrorBody {
code: self.code().to_string(),
message: format!("{}", self),
path: path.to_string(),
region: garage_region.to_string(),
};
- Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
+ let error_str = serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
r#"
{
"code": "InternalError",
@@ -130,6 +109,7 @@ impl ApiError for Error {
}
"#
.into()
- }))
+ });
+ error_body(error_str)
}
}
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index e8cd1fba..291464ab 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;
-use hyper::{Body, Response};
+use hyper::Response;
use serde::Serialize;
use garage_util::data::*;
@@ -11,6 +11,7 @@ use garage_model::garage::Garage;
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
use crate::helpers::*;
+use crate::k2v::api_server::ResBody;
use crate::k2v::error::*;
use crate::k2v::range::read_range;
@@ -22,7 +23,7 @@ pub async fn handle_read_index(
end: Option<String>,
limit: Option<u64>,
reverse: Option<bool>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let reverse = reverse.unwrap_or(false);
let node_id_vec = garage
@@ -71,7 +72,7 @@ pub async fn handle_read_index(
next_start,
};
- Ok(json_ok_response(&resp)?)
+ json_ok_response::<Error, _>(&resp)
}
#[derive(Serialize)]
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index e13a0f30..0c5931a1 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use base64::prelude::*;
use http::header;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{Request, Response, StatusCode};
use garage_util::data::*;
@@ -11,6 +11,8 @@ use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
+use crate::helpers::*;
+use crate::k2v::api_server::{ReqBody, ResBody};
use crate::k2v::error::*;
pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
@@ -22,7 +24,7 @@ pub enum ReturnFormat {
}
impl ReturnFormat {
- pub fn from(req: &Request<Body>) -> Result<Self, Error> {
+ pub fn from(req: &Request<ReqBody>) -> Result<Self, Error> {
let accept = match req.headers().get(header::ACCEPT) {
Some(a) => a.to_str()?,
None => return Ok(Self::Json),
@@ -40,7 +42,7 @@ impl ReturnFormat {
}
}
- pub fn make_response(&self, item: &K2VItem) -> Result<Response<Body>, Error> {
+ pub fn make_response(&self, item: &K2VItem) -> Result<Response<ResBody>, Error> {
let vals = item.values();
if vals.is_empty() {
@@ -52,7 +54,7 @@ impl ReturnFormat {
Self::Binary if vals.len() > 1 => Ok(Response::builder()
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
.status(StatusCode::CONFLICT)
- .body(Body::empty())?),
+ .body(empty_body())?),
Self::Binary => {
assert!(vals.len() == 1);
Self::make_binary_response(ct, vals[0])
@@ -62,22 +64,22 @@ impl ReturnFormat {
}
}
- fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<Body>, Error> {
+ fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<ResBody>, Error> {
match v {
DvvsValue::Deleted => Ok(Response::builder()
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
.header(header::CONTENT_TYPE, "application/octet-stream")
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?),
+ .body(empty_body())?),
DvvsValue::Value(v) => Ok(Response::builder()
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
.header(header::CONTENT_TYPE, "application/octet-stream")
.status(StatusCode::OK)
- .body(Body::from(v.to_vec()))?),
+ .body(bytes_body(v.to_vec().into()))?),
}
}
- fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<Body>, Error> {
+ fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<ResBody>, Error> {
let items = v
.iter()
.map(|v| match v {
@@ -91,7 +93,7 @@ impl ReturnFormat {
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
.header(header::CONTENT_TYPE, "application/json")
.status(StatusCode::OK)
- .body(Body::from(json_body))?)
+ .body(string_body(json_body))?)
}
}
@@ -99,11 +101,11 @@ impl ReturnFormat {
#[allow(clippy::ptr_arg)]
pub async fn handle_read_item(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
bucket_id: Uuid,
partition_key: &str,
sort_key: &String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let format = ReturnFormat::from(req)?;
let item = garage
@@ -124,11 +126,11 @@ pub async fn handle_read_item(
pub async fn handle_insert_item(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_id: Uuid,
partition_key: &str,
sort_key: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let causal_context = req
.headers()
.get(X_GARAGE_CAUSALITY_TOKEN)
@@ -137,7 +139,10 @@ pub async fn handle_insert_item(
.map(CausalContext::parse_helper)
.transpose()?;
- let body = hyper::body::to_bytes(req.into_body()).await?;
+ let body = http_body_util::BodyExt::collect(req.into_body())
+ .await?
+ .to_bytes();
+
let value = DvvsValue::Value(body.to_vec());
garage
@@ -154,16 +159,16 @@ pub async fn handle_insert_item(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_delete_item(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_id: Uuid,
partition_key: &str,
sort_key: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let causal_context = req
.headers()
.get(X_GARAGE_CAUSALITY_TOKEN)
@@ -188,20 +193,20 @@ pub async fn handle_delete_item(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
/// Handle ReadItem request
#[allow(clippy::ptr_arg)]
pub async fn handle_poll_item(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
bucket_id: Uuid,
partition_key: String,
sort_key: String,
causality_token: String,
timeout_secs: Option<u64>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let format = ReturnFormat::from(req)?;
let causal_context =
@@ -226,6 +231,6 @@ pub async fn handle_poll_item(
} else {
Ok(Response::builder()
.status(StatusCode::NOT_MODIFIED)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index 887839dd..7fac6261 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -2,9 +2,9 @@ use std::sync::Arc;
use async_trait::async_trait;
-use futures::future::Future;
use hyper::header;
-use hyper::{Body, Request, Response};
+use hyper::{body::Incoming as IncomingBody, Request, Response};
+use tokio::sync::watch;
use opentelemetry::{trace::SpanRef, KeyValue};
@@ -34,6 +34,9 @@ use crate::s3::put::*;
use crate::s3::router::Endpoint;
use crate::s3::website::*;
+pub use crate::signature::streaming::ReqBody;
+pub type ResBody = BoxBody<Error>;
+
pub struct S3ApiServer {
garage: Arc<Garage>,
}
@@ -48,19 +51,19 @@ impl S3ApiServer {
garage: Arc<Garage>,
addr: UnixOrTCPSocketAddress,
s3_region: String,
- shutdown_signal: impl Future<Output = ()>,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
ApiServer::new(s3_region, S3ApiServer { garage })
- .run_server(addr, None, shutdown_signal)
+ .run_server(addr, None, must_exit)
.await
}
async fn handle_request_without_bucket(
&self,
- _req: Request<Body>,
+ _req: Request<ReqBody>,
api_key: Key,
endpoint: Endpoint,
- ) -> Result<Response<Body>, Error> {
+ ) -> Result<Response<ResBody>, Error> {
match endpoint {
Endpoint::ListBuckets => handle_list_buckets(&self.garage, &api_key).await,
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
@@ -76,7 +79,7 @@ impl ApiHandler for S3ApiServer {
type Endpoint = S3ApiEndpoint;
type Error = Error;
- fn parse_endpoint(&self, req: &Request<Body>) -> Result<S3ApiEndpoint, Error> {
+ fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<S3ApiEndpoint, Error> {
let authority = req
.headers()
.get(header::HOST)
@@ -104,9 +107,9 @@ impl ApiHandler for S3ApiServer {
async fn handle(
&self,
- req: Request<Body>,
+ req: Request<IncomingBody>,
endpoint: S3ApiEndpoint,
- ) -> Result<Response<Body>, Error> {
+ ) -> Result<Response<ResBody>, Error> {
let S3ApiEndpoint {
bucket_name,
endpoint,
@@ -118,7 +121,8 @@ impl ApiHandler for S3ApiServer {
return handle_post_object(garage, req, bucket_name.unwrap()).await;
}
if let Endpoint::Options = endpoint {
- return handle_options_s3api(garage, &req, bucket_name).await;
+ let options_res = handle_options_api(garage, &req, bucket_name).await?;
+ return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
}
let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?;
@@ -174,8 +178,26 @@ impl ApiHandler for S3ApiServer {
key, part_number, ..
} => handle_head(garage, &req, bucket_id, &key, part_number).await,
Endpoint::GetObject {
- key, part_number, ..
- } => handle_get(garage, &req, bucket_id, &key, part_number).await,
+ key,
+ part_number,
+ response_cache_control,
+ response_content_disposition,
+ response_content_encoding,
+ response_content_language,
+ response_content_type,
+ response_expires,
+ ..
+ } => {
+ let overrides = GetObjectOverrides {
+ response_cache_control,
+ response_content_disposition,
+ response_content_encoding,
+ response_content_language,
+ response_content_type,
+ response_expires,
+ };
+ handle_get(garage, &req, bucket_id, &key, part_number, overrides).await
+ }
Endpoint::UploadPart {
key,
part_number,
@@ -235,8 +257,7 @@ impl ApiHandler for S3ApiServer {
}
Endpoint::CreateBucket {} => unreachable!(),
Endpoint::HeadBucket {} => {
- let empty_body: Body = Body::from(vec![]);
- let response = Response::builder().body(empty_body).unwrap();
+ let response = Response::builder().body(empty_body()).unwrap();
Ok(response)
}
Endpoint::DeleteBucket {} => {
@@ -257,7 +278,7 @@ impl ApiHandler for S3ApiServer {
common: ListQueryCommon {
bucket_name,
bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
+ delimiter,
page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
prefix: prefix.unwrap_or_default(),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
@@ -287,7 +308,7 @@ impl ApiHandler for S3ApiServer {
common: ListQueryCommon {
bucket_name,
bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
+ delimiter,
page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
prefix: prefix.unwrap_or_default(),
@@ -320,7 +341,7 @@ impl ApiHandler for S3ApiServer {
common: ListQueryCommon {
bucket_name,
bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
+ delimiter,
page_size: max_uploads.unwrap_or(1000).clamp(1, 1000),
prefix: prefix.unwrap_or_default(),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
index 733981e1..fa2f1b6d 100644
--- a/src/api/s3/bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -1,7 +1,8 @@
use std::collections::HashMap;
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use http_body_util::BodyExt;
+use hyper::{Request, Response, StatusCode};
use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::Bucket;
@@ -14,11 +15,13 @@ use garage_util::data::*;
use garage_util::time::*;
use crate::common_error::CommonError;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
-pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Error> {
+pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<ResBody>, Error> {
let loc = s3_xml::LocationConstraint {
xmlns: (),
region: garage.config.s3_api.s3_region.to_string(),
@@ -27,10 +30,10 @@ pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>,
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
+ .body(string_body(xml))?)
}
-pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> {
+pub fn handle_get_bucket_versioning() -> Result<Response<ResBody>, Error> {
let versioning = s3_xml::VersioningConfiguration {
xmlns: (),
status: None,
@@ -40,10 +43,13 @@ pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> {
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
+ .body(string_body(xml))?)
}
-pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Response<Body>, Error> {
+pub async fn handle_list_buckets(
+ garage: &Garage,
+ api_key: &Key,
+) -> Result<Response<ResBody>, Error> {
let key_p = api_key.params().ok_or_internal_error(
"Key should not be in deleted state at this point (in handle_list_buckets)",
)?;
@@ -109,17 +115,17 @@ pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Respo
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
}
pub async fn handle_create_bucket(
garage: &Garage,
- req: Request<Body>,
+ req: Request<ReqBody>,
content_sha256: Option<Hash>,
api_key: Key,
bucket_name: String,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -194,7 +200,7 @@ pub async fn handle_create_bucket(
Ok(Response::builder()
.header("Location", format!("/{}", bucket_name))
- .body(Body::empty())
+ .body(empty_body())
.unwrap())
}
@@ -203,7 +209,7 @@ pub async fn handle_delete_bucket(
bucket_id: Uuid,
bucket_name: String,
api_key: Key,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let key_params = api_key
.params()
.ok_or_internal_error("Key should not be deleted at this point")?;
@@ -282,7 +288,7 @@ pub async fn handle_delete_bucket(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> {
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 68b4f0c9..ba9bfc88 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -6,7 +6,7 @@ use futures::{stream, stream::Stream, StreamExt};
use md5::{Digest as Md5Digest, Md5};
use bytes::Bytes;
-use hyper::{Body, Request, Response};
+use hyper::{Request, Response};
use serde::Serialize;
use garage_rpc::netapp::bytes_buf::BytesBuf;
@@ -22,7 +22,8 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use crate::helpers::parse_bucket_key;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::multipart;
use crate::s3::put::get_headers;
@@ -31,10 +32,10 @@ use crate::s3::xml::{self as s3_xml, xmlns_tag};
pub async fn handle_copy(
garage: Arc<Garage>,
api_key: &Key,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
dest_bucket_id: Uuid,
dest_key: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
let source_object = get_copy_source(&garage, api_key, req).await?;
@@ -176,18 +177,18 @@ pub async fn handle_copy(
"x-amz-copy-source-version-id",
hex::encode(source_version.uuid),
)
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
}
pub async fn handle_upload_part_copy(
garage: Arc<Garage>,
api_key: &Key,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
dest_bucket_id: Uuid,
dest_key: &str,
part_number: u64,
upload_id: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
let dest_upload_id = multipart::decode_upload_id(upload_id)?;
@@ -432,13 +433,13 @@ pub async fn handle_upload_part_copy(
"x-amz-copy-source-version-id",
hex::encode(source_object_version.uuid),
)
- .body(Body::from(resp_xml))?)
+ .body(string_body(resp_xml))?)
}
async fn get_copy_source(
garage: &Garage,
api_key: &Key,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
) -> Result<Object, Error> {
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
@@ -501,7 +502,7 @@ struct CopyPreconditionHeaders {
}
impl CopyPreconditionHeaders {
- fn parse(req: &Request<Body>) -> Result<Self, Error> {
+ fn parse(req: &Request<ReqBody>) -> Result<Self, Error> {
Ok(Self {
copy_source_if_match: req
.headers()
diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs
index 49097ad1..e069cae4 100644
--- a/src/api/s3/cors.rs
+++ b/src/api/s3/cors.rs
@@ -5,10 +5,18 @@ use http::header::{
ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN,
ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD,
};
-use hyper::{header::HeaderName, Body, Method, Request, Response, StatusCode};
+use hyper::{
+ body::Body, body::Incoming as IncomingBody, header::HeaderName, Method, Request, Response,
+ StatusCode,
+};
+
+use http_body_util::BodyExt;
use serde::{Deserialize, Serialize};
+use crate::common_error::CommonError;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
@@ -17,7 +25,7 @@ use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
use garage_model::garage::Garage;
use garage_util::data::*;
-pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> {
+pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
let param = bucket
.params()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -34,18 +42,18 @@ pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> {
Ok(Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/xml")
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
} else {
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
pub async fn handle_delete_cors(
garage: Arc<Garage>,
mut bucket: Bucket,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let param = bucket
.params_mut()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -55,16 +63,16 @@ pub async fn handle_delete_cors(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_put_cors(
garage: Arc<Garage>,
mut bucket: Bucket,
- req: Request<Body>,
+ req: Request<ReqBody>,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -84,14 +92,14 @@ pub async fn handle_put_cors(
Ok(Response::builder()
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
-pub async fn handle_options_s3api(
+pub async fn handle_options_api(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<IncomingBody>,
bucket_name: Option<String>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<EmptyBody>, CommonError> {
// FIXME: CORS rules of buckets with local aliases are
// not taken into account.
@@ -121,7 +129,7 @@ pub async fn handle_options_s3api(
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.header(ACCESS_CONTROL_ALLOW_METHODS, "*")
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(EmptyBody::new())?)
}
} else {
// If there is no bucket name in the request,
@@ -131,14 +139,14 @@ pub async fn handle_options_s3api(
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.header(ACCESS_CONTROL_ALLOW_METHODS, "GET")
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(EmptyBody::new())?)
}
}
pub fn handle_options_for_bucket(
- req: &Request<Body>,
+ req: &Request<IncomingBody>,
bucket: &Bucket,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<EmptyBody>, CommonError> {
let origin = req
.headers()
.get("Origin")
@@ -161,18 +169,20 @@ pub fn handle_options_for_bucket(
if let Some(rule) = matching_rule {
let mut resp = Response::builder()
.status(StatusCode::OK)
- .body(Body::empty())?;
+ .body(EmptyBody::new())?;
add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?;
return Ok(resp);
}
}
- Err(Error::forbidden("This CORS request is not allowed."))
+ Err(CommonError::Forbidden(
+ "This CORS request is not allowed.".into(),
+ ))
}
pub fn find_matching_cors_rule<'a>(
bucket: &'a Bucket,
- req: &Request<Body>,
+ req: &Request<impl Body>,
) -> Result<Option<&'a GarageCorsRule>, Error> {
if let Some(cors_config) = bucket.params().unwrap().cors_config.get() {
if let Some(origin) = req.headers().get("Origin") {
@@ -209,7 +219,7 @@ where
}
pub fn add_cors_headers(
- resp: &mut Response<Body>,
+ resp: &mut Response<impl Body>,
rule: &GarageCorsRule,
) -> Result<(), http::header::InvalidHeaderValue> {
let h = resp.headers_mut();
diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs
index 1c491eac..3fb39147 100644
--- a/src/api/s3/delete.rs
+++ b/src/api/s3/delete.rs
@@ -1,12 +1,15 @@
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use http_body_util::BodyExt;
+use hyper::{Request, Response, StatusCode};
use garage_util::data::*;
use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::put::next_timestamp;
use crate::s3::xml as s3_xml;
@@ -59,11 +62,11 @@ pub async fn handle_delete(
garage: Arc<Garage>,
bucket_id: Uuid,
key: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
match handle_delete_internal(&garage, bucket_id, key).await {
Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::from(vec![]))
+ .body(empty_body())
.unwrap()),
Err(e) => Err(e),
}
@@ -72,10 +75,10 @@ pub async fn handle_delete(
pub async fn handle_delete_objects(
garage: Arc<Garage>,
bucket_id: Uuid,
- req: Request<Body>,
+ req: Request<ReqBody>,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -118,7 +121,7 @@ pub async fn handle_delete_objects(
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
}
struct DeleteRequest {
diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs
index c50cff9f..f86c19a6 100644
--- a/src/api/s3/error.rs
+++ b/src/api/s3/error.rs
@@ -2,13 +2,12 @@ use std::convert::TryInto;
use err_derive::Error;
use hyper::header::HeaderValue;
-use hyper::{Body, HeaderMap, StatusCode};
-
-use garage_model::helper::error::Error as HelperError;
+use hyper::{HeaderMap, StatusCode};
use crate::common_error::CommonError;
pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
use crate::generic_server::ApiError;
+use crate::helpers::*;
use crate::s3::xml as s3_xml;
use crate::signature::error::Error as SignatureError;
@@ -62,10 +61,6 @@ pub enum Error {
#[error(display = "Invalid XML: {}", _0)]
InvalidXml(String),
- /// The client sent a header with invalid value
- #[error(display = "Invalid header value: {}", _0)]
- InvalidHeader(#[error(source)] hyper::header::ToStrError),
-
/// The client sent a range header with invalid value
#[error(display = "Invalid HTTP range: {:?}", _0)]
InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)),
@@ -86,18 +81,6 @@ where
impl CommonErrorDerivative for Error {}
-impl From<HelperError> for Error {
- fn from(err: HelperError) -> Self {
- match err {
- HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
- HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
- HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
- HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
- e => Self::bad_request(format!("{}", e)),
- }
- }
-}
-
impl From<roxmltree::Error> for Error {
fn from(err: roxmltree::Error) -> Self {
Self::InvalidXml(format!("{}", err))
@@ -118,7 +101,6 @@ impl From<SignatureError> for Error {
Self::AuthorizationHeaderMalformed(c)
}
SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
- SignatureError::InvalidHeader(h) => Self::InvalidHeader(h),
}
}
}
@@ -143,9 +125,7 @@ impl Error {
Error::NotImplemented(_) => "NotImplemented",
Error::InvalidXml(_) => "MalformedXML",
Error::InvalidRange(_) => "InvalidRange",
- Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) | Error::InvalidHeader(_) => {
- "InvalidRequest"
- }
+ Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest",
}
}
}
@@ -165,8 +145,7 @@ impl ApiError for Error {
| Error::EntityTooSmall
| Error::InvalidXml(_)
| Error::InvalidUtf8Str(_)
- | Error::InvalidUtf8String(_)
- | Error::InvalidHeader(_) => StatusCode::BAD_REQUEST,
+ | Error::InvalidUtf8String(_) => StatusCode::BAD_REQUEST,
}
}
@@ -189,22 +168,23 @@ impl ApiError for Error {
}
}
- fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody {
let error = s3_xml::Error {
code: s3_xml::Value(self.aws_code().to_string()),
message: s3_xml::Value(format!("{}", self)),
resource: Some(s3_xml::Value(path.to_string())),
region: Some(s3_xml::Value(garage_region.to_string())),
};
- Body::from(s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| {
+ let error_str = s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| {
r#"
<?xml version="1.0" encoding="UTF-8"?>
<Error>
- <Code>InternalError</Code>
- <Message>XML encoding of error failed</Message>
+ <Code>InternalError</Code>
+ <Message>XML encoding of error failed</Message>
</Error>
- "#
+ "#
.into()
- }))
+ });
+ error_body(error_str)
}
}
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 5e682726..53f0a345 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -1,17 +1,20 @@
//! Function related to GET and HEAD requests
+use std::convert::TryInto;
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
use futures::future;
use futures::stream::{self, StreamExt};
use http::header::{
- ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
- IF_NONE_MATCH, LAST_MODIFIED, RANGE,
+ ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE,
+ CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH,
+ LAST_MODIFIED, RANGE,
};
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{body::Body, Request, Response, StatusCode};
use tokio::sync::mpsc;
-use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag};
+use garage_block::manager::BlockStream;
+use garage_rpc::rpc_helper::OrderTag;
use garage_table::EmptyKey;
use garage_util::data::*;
use garage_util::error::OkOrMessage;
@@ -20,10 +23,22 @@ use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
+use crate::helpers::*;
+use crate::s3::api_server::ResBody;
use crate::s3::error::*;
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
+#[derive(Default)]
+pub struct GetObjectOverrides {
+ pub(crate) response_cache_control: Option<String>,
+ pub(crate) response_content_disposition: Option<String>,
+ pub(crate) response_content_encoding: Option<String>,
+ pub(crate) response_content_language: Option<String>,
+ pub(crate) response_content_type: Option<String>,
+ pub(crate) response_expires: Option<String>,
+}
+
fn object_headers(
version: &ObjectVersion,
version_meta: &ObjectVersionMeta,
@@ -49,11 +64,37 @@ fn object_headers(
resp
}
+/// Override headers according to specific query parameters, see
+/// section "Overriding response header values through the request" in
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
+fn getobject_override_headers(
+ overrides: GetObjectOverrides,
+ resp: &mut http::response::Builder,
+) -> Result<(), Error> {
+ // TODO: this only applies for signed requests, so when we support
+ // anonymous access in the future we will have to do a permission check here
+ let overrides = [
+ (CACHE_CONTROL, overrides.response_cache_control),
+ (CONTENT_DISPOSITION, overrides.response_content_disposition),
+ (CONTENT_ENCODING, overrides.response_content_encoding),
+ (CONTENT_LANGUAGE, overrides.response_content_language),
+ (CONTENT_TYPE, overrides.response_content_type),
+ (EXPIRES, overrides.response_expires),
+ ];
+ for (hdr, val_opt) in overrides {
+ if let Some(val) = val_opt {
+ let val = val.try_into().ok_or_bad_request("invalid header value")?;
+ resp.headers_mut().unwrap().insert(hdr, val);
+ }
+ }
+ Ok(())
+}
+
fn try_answer_cached(
version: &ObjectVersion,
version_meta: &ObjectVersionMeta,
- req: &Request<Body>,
-) -> Option<Response<Body>> {
+ req: &Request<impl Body>,
+) -> Option<Response<ResBody>> {
// <trinity> It is possible, and is even usually the case, [that both If-None-Match and
// If-Modified-Since] are present in a request. In this situation If-None-Match takes
// precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The rational
@@ -80,7 +121,7 @@ fn try_answer_cached(
Some(
Response::builder()
.status(StatusCode::NOT_MODIFIED)
- .body(Body::empty())
+ .body(empty_body())
.unwrap(),
)
} else {
@@ -91,11 +132,11 @@ fn try_answer_cached(
/// Handle HEAD request
pub async fn handle_head(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<impl Body>,
bucket_id: Uuid,
key: &str,
part_number: Option<u64>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let object = garage
.object_table
.get(&bucket_id, &key.to_string())
@@ -138,7 +179,7 @@ pub async fn handle_head(
)
.header(X_AMZ_MP_PARTS_COUNT, "1")
.status(StatusCode::PARTIAL_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
@@ -163,7 +204,7 @@ pub async fn handle_head(
)
.header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?))
.status(StatusCode::PARTIAL_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
_ => unreachable!(),
}
@@ -171,18 +212,19 @@ pub async fn handle_head(
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
/// Handle GET request
pub async fn handle_get(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<impl Body>,
bucket_id: Uuid,
key: &str,
part_number: Option<u64>,
-) -> Result<Response<Body>, Error> {
+ overrides: GetObjectOverrides,
+) -> Result<Response<ResBody>, Error> {
let object = garage
.object_table
.get(&bucket_id, &key.to_string())
@@ -233,18 +275,18 @@ pub async fn handle_get(
(None, None) => (),
}
- let resp_builder = object_headers(last_v, last_v_meta)
+ let mut resp_builder = object_headers(last_v, last_v_meta)
.header(CONTENT_LENGTH, format!("{}", last_v_meta.size))
.status(StatusCode::OK);
+ getobject_override_headers(overrides, &mut resp_builder)?;
match &last_v_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_, bytes) => {
- let body: Body = Body::from(bytes.to_vec());
- Ok(resp_builder.body(body)?)
+ Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
- let (tx, rx) = mpsc::channel(2);
+ let (tx, rx) = mpsc::channel::<BlockStream>(2);
let order_stream = OrderTag::stream();
let first_block_hash = *first_block_hash;
@@ -282,20 +324,12 @@ pub async fn handle_get(
{
Ok(()) => (),
Err(e) => {
- let err = std::io::Error::new(
- std::io::ErrorKind::Other,
- format!("Error while getting object data: {}", e),
- );
- let _ = tx
- .send(Box::pin(stream::once(future::ready(Err(err)))))
- .await;
+ let _ = tx.send(error_stream_item(e)).await;
}
}
});
- let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten();
-
- let body = hyper::body::Body::wrap_stream(body_stream);
+ let body = response_body_from_block_stream(rx);
Ok(resp_builder.body(body)?)
}
}
@@ -308,7 +342,10 @@ async fn handle_get_range(
version_meta: &ObjectVersionMeta,
begin: u64,
end: u64,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
+ // Here we do not use getobject_override_headers because we don't
+ // want to add any overridden headers (those should not be added
+ // when returning PARTIAL_CONTENT)
let resp_builder = object_headers(version, version_meta)
.header(CONTENT_LENGTH, format!("{}", end - begin))
.header(
@@ -321,7 +358,7 @@ async fn handle_get_range(
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, bytes) => {
if end as usize <= bytes.len() {
- let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec());
+ let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into());
Ok(resp_builder.body(body)?)
} else {
Err(Error::internal_error(
@@ -348,7 +385,8 @@ async fn handle_get_part(
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
part_number: u64,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
+ // Same as for get_range, no getobject_override_headers
let resp_builder =
object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT);
@@ -364,7 +402,7 @@ async fn handle_get_part(
format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()),
)
.header(X_AMZ_MP_PARTS_COUNT, "1")
- .body(Body::from(bytes.to_vec()))?)
+ .body(bytes_body(bytes.to_vec().into()))?)
}
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
@@ -392,7 +430,7 @@ async fn handle_get_part(
}
fn parse_range_header(
- req: &Request<Body>,
+ req: &Request<impl Body>,
total_size: u64,
) -> Result<Option<http_range::HttpRange>, Error> {
let range = match req.headers().get(RANGE) {
@@ -434,7 +472,7 @@ fn body_from_blocks_range(
all_blocks: &[(VersionBlockKey, VersionBlock)],
begin: u64,
end: u64,
-) -> Body {
+) -> ResBody {
// We will store here the list of blocks that have an intersection with the requested
// range, as well as their "true offset", which is their actual offset in the complete
// file (whereas block.offset designates the offset of the block WITHIN THE PART
@@ -456,17 +494,17 @@ fn body_from_blocks_range(
}
let order_stream = OrderTag::stream();
- let body_stream = futures::stream::iter(blocks)
- .enumerate()
- .map(move |(i, (block, block_offset))| {
+ let (tx, rx) = mpsc::channel::<BlockStream>(2);
+
+ tokio::spawn(async move {
+ match async {
let garage = garage.clone();
- async move {
- garage
+ for (i, (block, block_offset)) in blocks.iter().enumerate() {
+ let block_stream = garage
.block_manager
.rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
- .await
- .unwrap_or_else(|e| error_stream(i, e))
- .scan(block_offset, move |chunk_offset, chunk| {
+ .await?
+ .scan(*block_offset, move |chunk_offset, chunk| {
let r = match chunk {
Ok(chunk_bytes) => {
let chunk_len = chunk_bytes.len() as u64;
@@ -502,20 +540,42 @@ fn body_from_blocks_range(
};
futures::future::ready(r)
})
- .filter_map(futures::future::ready)
+ .filter_map(futures::future::ready);
+
+ let block_stream: BlockStream = Box::pin(block_stream);
+ tx.send(Box::pin(block_stream))
+ .await
+ .ok_or_message("channel closed")?;
}
- })
- .buffered(2)
- .flatten();
- hyper::body::Body::wrap_stream(body_stream)
+ Ok::<(), Error>(())
+ }
+ .await
+ {
+ Ok(()) => (),
+ Err(e) => {
+ let _ = tx.send(error_stream_item(e)).await;
+ }
+ }
+ });
+
+ response_body_from_block_stream(rx)
+}
+
+fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody {
+ let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
+ .flatten()
+ .map(|x| {
+ x.map(hyper::body::Frame::data)
+ .map_err(|e| Error::from(garage_util::error::Error::from(e)))
+ });
+ ResBody::new(http_body_util::StreamBody::new(body_stream))
}
-fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream {
- Box::pin(futures::stream::once(async move {
- Err(std::io::Error::new(
- std::io::ErrorKind::Other,
- format!("Could not get block {}: {}", i, e),
- ))
- }))
+fn error_stream_item<E: std::fmt::Display>(e: E) -> BlockStream {
+ let err = std::io::Error::new(
+ std::io::ErrorKind::Other,
+ format!("Error while getting object data: {}", e),
+ );
+ Box::pin(stream::once(future::ready(Err(err))))
}
diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs
index 1e7d6755..35757e8c 100644
--- a/src/api/s3/lifecycle.rs
+++ b/src/api/s3/lifecycle.rs
@@ -1,10 +1,13 @@
use quick_xml::de::from_reader;
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use http_body_util::BodyExt;
+use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
@@ -16,7 +19,7 @@ use garage_model::bucket_table::{
use garage_model::garage::Garage;
use garage_util::data::*;
-pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<Body>, Error> {
+pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
let param = bucket
.params()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -27,18 +30,18 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<Body>, Err
Ok(Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/xml")
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
} else {
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
pub async fn handle_delete_lifecycle(
garage: Arc<Garage>,
mut bucket: Bucket,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let param = bucket
.params_mut()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -48,16 +51,16 @@ pub async fn handle_delete_lifecycle(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_put_lifecycle(
garage: Arc<Garage>,
mut bucket: Bucket,
- req: Request<Body>,
+ req: Request<ReqBody>,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -77,7 +80,7 @@ pub async fn handle_put_lifecycle(
Ok(Response::builder()
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
// ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ----
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index 1b9e8cd5..b832a4f4 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -3,7 +3,7 @@ use std::iter::{Iterator, Peekable};
use std::sync::Arc;
use base64::prelude::*;
-use hyper::{Body, Response};
+use hyper::Response;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
@@ -16,7 +16,8 @@ use garage_model::s3::object_table::*;
use garage_table::EnumerationOrder;
use crate::encoding::*;
-use crate::helpers::key_after_prefix;
+use crate::helpers::*;
+use crate::s3::api_server::ResBody;
use crate::s3::error::*;
use crate::s3::multipart as s3_multipart;
use crate::s3::xml as s3_xml;
@@ -63,7 +64,7 @@ pub struct ListPartsQuery {
pub async fn handle_list(
garage: Arc<Garage>,
query: &ListObjectsQuery,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
@@ -162,13 +163,13 @@ pub async fn handle_list(
let xml = s3_xml::to_xml_with_header(&result)?;
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
+ .body(string_body(xml))?)
}
pub async fn handle_list_multipart_upload(
garage: Arc<Garage>,
query: &ListMultipartUploadsQuery,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
@@ -264,13 +265,13 @@ pub async fn handle_list_multipart_upload(
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
+ .body(string_body(xml))?)
}
pub async fn handle_list_parts(
garage: Arc<Garage>,
query: &ListPartsQuery,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
debug!("ListParts {:?}", query);
let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?;
@@ -319,7 +320,7 @@ pub async fn handle_list_parts(
Ok(Response::builder()
.header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
+ .body(string_body(xml))?)
}
/*
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 6b786318..b9d15b21 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use futures::prelude::*;
-use hyper::body::Body;
use hyper::{Request, Response};
use md5::{Digest as Md5Digest, Md5};
@@ -17,6 +16,8 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::put::*;
use crate::s3::xml as s3_xml;
@@ -26,11 +27,11 @@ use crate::signature::verify_signed_content;
pub async fn handle_create_multipart_upload(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
bucket_name: &str,
bucket_id: Uuid,
key: &String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
let upload_id = gen_uuid();
@@ -65,18 +66,18 @@ pub async fn handle_create_multipart_upload(
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(Body::from(xml.into_bytes())))
+ Ok(Response::new(string_body(xml)))
}
pub async fn handle_put_part(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_id: Uuid,
key: &str,
part_number: u64,
upload_id: &str,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let upload_id = decode_upload_id(upload_id)?;
let content_md5 = match req.headers().get("content-md5") {
@@ -87,8 +88,8 @@ pub async fn handle_put_part(
// Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string();
- let body = req.into_body().map_err(Error::from);
- let mut chunker = StreamChunker::new(body, garage.config.block_size);
+ let stream = body_stream(req.into_body());
+ let mut chunker = StreamChunker::new(stream, garage.config.block_size);
let ((_, _, mut mpu), first_block) = futures::try_join!(
get_upload(&garage, &bucket_id, &key, &upload_id),
@@ -172,7 +173,7 @@ pub async fn handle_put_part(
let response = Response::builder()
.header("ETag", format!("\"{}\"", data_md5sum_hex))
- .body(Body::empty())
+ .body(empty_body())
.unwrap();
Ok(response)
}
@@ -210,14 +211,16 @@ impl Drop for InterruptedCleanup {
pub async fn handle_complete_multipart_upload(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_name: &str,
bucket: &Bucket,
key: &str,
upload_id: &str,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = http_body_util::BodyExt::collect(req.into_body())
+ .await?
+ .to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -365,7 +368,7 @@ pub async fn handle_complete_multipart_upload(
};
let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::new(Body::from(xml.into_bytes())))
+ Ok(Response::new(string_body(xml)))
}
pub async fn handle_abort_multipart_upload(
@@ -373,7 +376,7 @@ pub async fn handle_abort_multipart_upload(
bucket_id: Uuid,
key: &str,
upload_id: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let upload_id = decode_upload_id(upload_id)?;
let (_, mut object_version, _) =
@@ -383,7 +386,7 @@ pub async fn handle_abort_multipart_upload(
let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
- Ok(Response::new(Body::from(vec![])))
+ Ok(Response::new(empty_body()))
}
// ======== helpers ============
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index 542b7a81..bca8d6c6 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -1,5 +1,5 @@
use std::collections::HashMap;
-use std::convert::TryInto;
+use std::convert::{Infallible, TryInto};
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -9,12 +9,15 @@ use bytes::Bytes;
use chrono::{DateTime, Duration, Utc};
use futures::{Stream, StreamExt};
use hyper::header::{self, HeaderMap, HeaderName, HeaderValue};
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
use multer::{Constraints, Multipart, SizeLimit};
use serde::Deserialize;
use garage_model::garage::Garage;
+use crate::helpers::*;
+use crate::s3::api_server::ResBody;
+use crate::s3::cors::*;
use crate::s3::error::*;
use crate::s3::put::{get_headers, save_stream};
use crate::s3::xml as s3_xml;
@@ -22,9 +25,9 @@ use crate::signature::payload::{parse_date, verify_v4};
pub async fn handle_post_object(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<IncomingBody>,
bucket_name: String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let boundary = req
.headers()
.get(header::CONTENT_TYPE)
@@ -41,7 +44,8 @@ pub async fn handle_post_object(
);
let (head, body) = req.into_parts();
- let mut multipart = Multipart::with_constraints(body, boundary, constraints);
+ let stream = body_stream::<_, Error>(body);
+ let mut multipart = Multipart::with_constraints(stream, boundary, constraints);
let mut params = HeaderMap::new();
let field = loop {
@@ -242,7 +246,7 @@ pub async fn handle_post_object(
let etag = format!("\"{}\"", md5);
- let resp = if let Some(mut target) = params
+ let mut resp = if let Some(mut target) = params
.get("success_action_redirect")
.and_then(|h| h.to_str().ok())
.and_then(|u| url::Url::parse(u).ok())
@@ -258,12 +262,11 @@ pub async fn handle_post_object(
.status(StatusCode::SEE_OTHER)
.header(header::LOCATION, target.clone())
.header(header::ETAG, etag)
- .body(target.into())?
+ .body(string_body(target))?
} else {
let path = head
.uri
- .into_parts()
- .path_and_query
+ .path_and_query()
.map(|paq| paq.path().to_string())
.unwrap_or_else(|| "/".to_string());
let authority = head
@@ -290,7 +293,7 @@ pub async fn handle_post_object(
.header(header::LOCATION, location.clone())
.header(header::ETAG, etag.clone());
match action {
- "200" => builder.status(StatusCode::OK).body(Body::empty())?,
+ "200" => builder.status(StatusCode::OK).body(empty_body())?,
"201" => {
let xml = s3_xml::PostObject {
xmlns: (),
@@ -302,12 +305,21 @@ pub async fn handle_post_object(
let body = s3_xml::to_xml_with_header(&xml)?;
builder
.status(StatusCode::CREATED)
- .body(Body::from(body.into_bytes()))?
+ .body(string_body(body))?
}
- _ => builder.status(StatusCode::NO_CONTENT).body(Body::empty())?,
+ _ => builder.status(StatusCode::NO_CONTENT).body(empty_body())?,
}
};
+ let matching_cors_rule = find_matching_cors_rule(
+ &bucket,
+ &Request::from_parts(head, empty_body::<Infallible>()),
+ )?;
+ if let Some(rule) = matching_cors_rule {
+ add_cors_headers(&mut resp, rule)
+ .ok_or_internal_error("Invalid bucket CORS configuration")?;
+ }
+
Ok(resp)
}
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index d1c88a76..8902b14c 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -4,12 +4,13 @@ use std::sync::Arc;
use base64::prelude::*;
use futures::prelude::*;
use futures::try_join;
-use hyper::body::{Body, Bytes};
-use hyper::header::{HeaderMap, HeaderValue};
-use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
+use hyper::body::Bytes;
+use hyper::header::{HeaderMap, HeaderValue};
+use hyper::{Request, Response};
+
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
Context,
@@ -30,15 +31,17 @@ use garage_model::s3::block_ref_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
pub async fn handle_put(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket: &Bucket,
key: &String,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
// Retrieve interesting headers from request
let headers = get_headers(req.headers())?;
debug!("Object headers: {:?}", headers);
@@ -48,13 +51,12 @@ pub async fn handle_put(
None => None,
};
- let (_head, body) = req.into_parts();
- let body = body.map_err(Error::from);
+ let stream = body_stream(req.into_body());
save_stream(
garage,
headers,
- body,
+ stream,
bucket,
key,
content_md5,
@@ -434,11 +436,11 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
}
}
-pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> {
+pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<ResBody> {
Response::builder()
.header("x-amz-version-id", hex::encode(version_uuid))
.header("ETag", format!("\"{}\"", md5sum_hex))
- .body(Body::from(vec![]))
+ .body(empty_body())
.unwrap()
}
diff --git a/src/api/s3/router.rs b/src/api/s3/router.rs
index 821b0e07..e7ac1d77 100644
--- a/src/api/s3/router.rs
+++ b/src/api/s3/router.rs
@@ -125,6 +125,12 @@ pub enum Endpoint {
key: String,
part_number: Option<u64>,
version_id: Option<String>,
+ response_cache_control: Option<String>,
+ response_content_disposition: Option<String>,
+ response_content_encoding: Option<String>,
+ response_content_language: Option<String>,
+ response_content_type: Option<String>,
+ response_expires: Option<String>,
},
GetObjectAcl {
key: String,
@@ -170,7 +176,7 @@ pub enum Endpoint {
},
ListBuckets,
ListMultipartUploads {
- delimiter: Option<char>,
+ delimiter: Option<String>,
encoding_type: Option<String>,
key_marker: Option<String>,
max_uploads: Option<usize>,
@@ -178,7 +184,7 @@ pub enum Endpoint {
upload_id_marker: Option<String>,
},
ListObjects {
- delimiter: Option<char>,
+ delimiter: Option<String>,
encoding_type: Option<String>,
marker: Option<String>,
max_keys: Option<usize>,
@@ -188,7 +194,7 @@ pub enum Endpoint {
// This value should always be 2. It is not checked when constructing the struct
list_type: String,
continuation_token: Option<String>,
- delimiter: Option<char>,
+ delimiter: Option<String>,
encoding_type: Option<String>,
fetch_owner: Option<bool>,
max_keys: Option<usize>,
@@ -196,7 +202,7 @@ pub enum Endpoint {
start_after: Option<String>,
},
ListObjectVersions {
- delimiter: Option<char>,
+ delimiter: Option<String>,
encoding_type: Option<String>,
key_marker: Option<String>,
max_keys: Option<u64>,
@@ -358,7 +364,14 @@ impl Endpoint {
(query.keyword.take().unwrap_or_default(), key, query, None),
key: [
EMPTY if upload_id => ListParts (query::upload_id, opt_parse::max_parts, opt_parse::part_number_marker),
- EMPTY => GetObject (query_opt::version_id, opt_parse::part_number),
+ EMPTY => GetObject (query_opt::version_id,
+ opt_parse::part_number,
+ query_opt::response_cache_control,
+ query_opt::response_content_disposition,
+ query_opt::response_content_encoding,
+ query_opt::response_content_language,
+ query_opt::response_content_type,
+ query_opt::response_expires),
ACL => GetObjectAcl (query_opt::version_id),
LEGAL_HOLD => GetObjectLegalHold (query_opt::version_id),
RETENTION => GetObjectRetention (query_opt::version_id),
@@ -671,6 +684,12 @@ generateQueryParameters! {
"partNumber" => part_number,
"part-number-marker" => part_number_marker,
"prefix" => prefix,
+ "response-cache-control" => response_cache_control,
+ "response-content-disposition" => response_content_disposition,
+ "response-content-encoding" => response_content_encoding,
+ "response-content-language" => response_content_language,
+ "response-content-type" => response_content_type,
+ "response-expires" => response_expires,
"select-type" => select_type,
"start-after" => start_after,
"uploadId" => upload_id,
diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs
index 7f2ab925..1c1dbf20 100644
--- a/src/api/s3/website.rs
+++ b/src/api/s3/website.rs
@@ -1,9 +1,12 @@
use quick_xml::de::from_reader;
use std::sync::Arc;
-use hyper::{Body, Request, Response, StatusCode};
+use http_body_util::BodyExt;
+use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
+use crate::helpers::*;
+use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
@@ -12,7 +15,7 @@ use garage_model::bucket_table::*;
use garage_model::garage::Garage;
use garage_util::data::*;
-pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error> {
+pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
let param = bucket
.params()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -33,18 +36,18 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error
Ok(Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/xml")
- .body(Body::from(xml))?)
+ .body(string_body(xml))?)
} else {
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}
pub async fn handle_delete_website(
garage: Arc<Garage>,
mut bucket: Bucket,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let param = bucket
.params_mut()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
@@ -54,16 +57,16 @@ pub async fn handle_delete_website(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_put_website(
garage: Arc<Garage>,
mut bucket: Bucket,
- req: Request<Body>,
+ req: Request<ReqBody>,
content_sha256: Option<Hash>,
-) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
+) -> Result<Response<ResBody>, Error> {
+ let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
@@ -83,7 +86,7 @@ pub async fn handle_put_website(
Ok(Response::builder()
.status(StatusCode::OK)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
diff --git a/src/api/signature/error.rs b/src/api/signature/error.rs
index f0d7c816..2d92a072 100644
--- a/src/api/signature/error.rs
+++ b/src/api/signature/error.rs
@@ -18,10 +18,6 @@ pub enum Error {
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)]
InvalidUtf8Str(#[error(source)] std::str::Utf8Error),
-
- /// The client sent a header with invalid value
- #[error(display = "Invalid header value: {}", _0)]
- InvalidHeader(#[error(source)] hyper::header::ToStrError),
}
impl<T> From<T> for Error
diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs
index b50fb3bb..423aad93 100644
--- a/src/api/signature/payload.rs
+++ b/src/api/signature/payload.rs
@@ -1,8 +1,8 @@
use std::collections::HashMap;
-use chrono::{DateTime, Duration, NaiveDateTime, Utc};
+use chrono::{DateTime, Duration, NaiveDateTime, TimeZone, Utc};
use hmac::Mac;
-use hyper::{Body, Method, Request};
+use hyper::{body::Incoming as IncomingBody, Method, Request};
use sha2::{Digest, Sha256};
use garage_table::*;
@@ -20,7 +20,7 @@ use crate::signature::error::*;
pub async fn check_payload_signature(
garage: &Garage,
service: &'static str,
- request: &Request<Body>,
+ request: &Request<IncomingBody>,
) -> Result<(Option<Key>, Option<Hash>), Error> {
let mut headers = HashMap::new();
for (key, val) in request.headers() {
@@ -316,7 +316,7 @@ fn canonical_query_string(uri: &hyper::Uri) -> String {
pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> {
let date: NaiveDateTime =
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
- Ok(DateTime::from_utc(date, Utc))
+ Ok(Utc.from_utc_datetime(&date))
}
pub async fn verify_v4(
diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs
index c8358c4f..39147ca0 100644
--- a/src/api/signature/streaming.rs
+++ b/src/api/signature/streaming.rs
@@ -1,26 +1,30 @@
use std::pin::Pin;
-use chrono::{DateTime, NaiveDateTime, Utc};
+use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use futures::prelude::*;
use futures::task;
use garage_model::key_table::Key;
use hmac::Mac;
-use hyper::body::Bytes;
-use hyper::{Body, Request};
+use http_body_util::StreamBody;
+use hyper::body::{Bytes, Incoming as IncomingBody};
+use hyper::Request;
use garage_util::data::Hash;
use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME};
+use crate::helpers::*;
use crate::signature::error::*;
+pub type ReqBody = BoxBody<Error>;
+
pub fn parse_streaming_body(
api_key: &Key,
- req: Request<Body>,
+ req: Request<IncomingBody>,
content_sha256: &mut Option<Hash>,
region: &str,
service: &str,
-) -> Result<Request<Body>, Error> {
+) -> Result<Request<ReqBody>, Error> {
match req.headers().get("x-amz-content-sha256") {
Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
let signature = content_sha256
@@ -40,26 +44,22 @@ pub fn parse_streaming_body(
.to_str()?;
let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
.ok_or_bad_request("Invalid date")?;
- let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
+ let date: DateTime<Utc> = Utc.from_utc_datetime(&date);
let scope = compute_scope(&date, region, service);
let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service)
.ok_or_internal_error("Unable to build signing HMAC")?;
Ok(req.map(move |body| {
- Body::wrap_stream(
- SignedPayloadStream::new(
- body.map_err(Error::from),
- signing_hmac,
- date,
- &scope,
- signature,
- )
- .map_err(Error::from),
- )
+ let stream = body_stream::<_, Error>(body);
+ let signed_payload_stream =
+ SignedPayloadStream::new(stream, signing_hmac, date, &scope, signature)
+ .map(|x| x.map(hyper::body::Frame::data))
+ .map_err(Error::from);
+ ReqBody::new(StreamBody::new(signed_payload_stream))
}))
}
- _ => Ok(req),
+ _ => Ok(req.map(|body| ReqBody::new(http_body_util::BodyExt::map_err(body, Error::from)))),
}
}
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index e4265cbe..7cf82ce6 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -19,26 +19,26 @@ garage_rpc.workspace = true
garage_util.workspace = true
garage_table.workspace = true
-opentelemetry = "0.17"
-
-arc-swap = "1.5"
-async-trait = "0.1.7"
-bytes = "1.0"
-bytesize = "1.2"
-hex = "0.4"
-tracing = "0.1"
-rand = "0.8"
-
-async-compression = { version = "0.4", features = ["tokio", "zstd"] }
-zstd = { version = "0.12", default-features = false }
-
-serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
-serde_bytes = "0.11"
-
-futures = "0.3"
-futures-util = "0.3"
-tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-tokio-util = { version = "0.7", features = ["io"] }
+opentelemetry.workspace = true
+
+arc-swap.workspace = true
+async-trait.workspace = true
+bytes.workspace = true
+bytesize.workspace = true
+hex.workspace = true
+tracing.workspace = true
+rand.workspace = true
+
+async-compression.workspace = true
+zstd.workspace = true
+
+serde.workspace = true
+serde_bytes.workspace = true
+
+futures.workspace = true
+futures-util.workspace = true
+tokio.workspace = true
+tokio-util.workspace = true
[features]
system-libs = [ "zstd/pkg-config" ]
diff --git a/src/block/manager.rs b/src/block/manager.rs
index bfd390ee..aae1ce45 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -53,6 +53,9 @@ pub const INLINE_THRESHOLD: usize = 3072;
// to delete the block locally.
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
+pub type BlockStream =
+ Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>;
+
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
pub enum BlockRpc {
@@ -326,10 +329,7 @@ impl BlockManager {
&self,
hash: &Hash,
order_tag: Option<OrderTag>,
- ) -> Result<
- Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>,
- Error,
- > {
+ ) -> Result<BlockStream, Error> {
let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
match header {
DataBlockHeader::Plain => Ok(stream),
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index 530f1966..fddc5cca 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -12,24 +12,19 @@ readme = "../../README.md"
path = "lib.rs"
[dependencies]
-err-derive = "0.3"
-hexdump = "0.1"
-tracing = "0.1"
+err-derive.workspace = true
+hexdump.workspace = true
+tracing.workspace = true
-heed = { version = "0.11", default-features = false, features = ["lmdb"], optional = true }
-rusqlite = { version = "0.29", optional = true }
-sled = { version = "0.34", optional = true }
-
-# cli deps
-clap = { version = "4.1", optional = true, features = ["derive", "env"] }
-pretty_env_logger = { version = "0.5", optional = true }
+heed = { workspace = true, optional = true }
+rusqlite = { workspace = true, optional = true }
+sled = { workspace = true, optional = true }
[dev-dependencies]
-mktemp = "0.5"
+mktemp.workspace = true
[features]
default = [ "sled", "lmdb", "sqlite" ]
bundled-libs = [ "rusqlite?/bundled" ]
-cli = ["clap", "pretty_env_logger"]
lmdb = [ "heed" ]
sqlite = [ "rusqlite" ]
diff --git a/src/db/lib.rs b/src/db/lib.rs
index fe44b01e..eef3e177 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -171,6 +171,48 @@ impl Db {
}
}
+/// List of supported database engine types
+///
+/// The `enum` holds list of *all* database engines that are are be supported by crate, no matter
+/// if relevant feature is enabled or not. It allows us to distinguish between invalid engine
+/// and valid engine, whose support is not enabled via feature flag.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum Engine {
+ Lmdb,
+ Sqlite,
+ Sled,
+}
+
+impl Engine {
+ /// Return variant name as static `&str`
+ pub fn as_str(&self) -> &'static str {
+ match self {
+ Self::Lmdb => "lmdb",
+ Self::Sqlite => "sqlite",
+ Self::Sled => "sled",
+ }
+ }
+}
+
+impl std::fmt::Display for Engine {
+ fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
+ self.as_str().fmt(fmt)
+ }
+}
+
+impl std::str::FromStr for Engine {
+ type Err = Error;
+
+ fn from_str(text: &str) -> Result<Engine> {
+ match text {
+ "lmdb" | "heed" => Ok(Self::Lmdb),
+ "sqlite" | "sqlite3" | "rusqlite" => Ok(Self::Sqlite),
+ "sled" => Ok(Self::Sled),
+ kind => Err(Error(format!("Invalid DB engine: {}", kind).into())),
+ }
+ }
+}
+
#[allow(clippy::len_without_is_empty)]
impl Tree {
#[inline]
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index dce7ea73..02a72502 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -31,48 +31,51 @@ garage_table.workspace = true
garage_util.workspace = true
garage_web.workspace = true
-backtrace = "0.3"
-bytes = "1.0"
-bytesize = "1.2"
-timeago = { version = "0.4", default-features = false }
-parse_duration = "2.1"
-hex = "0.4"
-tracing = { version = "0.1" }
-tracing-subscriber = { version = "0.3", features = ["env-filter"] }
-rand = "0.8"
-async-trait = "0.1.7"
-sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
-git-version = "0.3.4"
-
-serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
-serde_bytes = "0.11"
-structopt = { version = "0.3", default-features = false }
-toml = "0.6"
-
-futures = "0.3"
-futures-util = "0.3"
-tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-
-netapp = "0.10"
-
-opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
-opentelemetry-prometheus = { version = "0.10", optional = true }
-opentelemetry-otlp = { version = "0.10", optional = true }
-prometheus = { version = "0.13", optional = true }
+backtrace.workspace = true
+bytes.workspace = true
+bytesize.workspace = true
+timeago.workspace = true
+parse_duration.workspace = true
+hex.workspace = true
+tracing.workspace = true
+tracing-subscriber.workspace = true
+rand.workspace = true
+async-trait.workspace = true
+sodiumoxide.workspace = true
+structopt.workspace = true
+git-version.workspace = true
+
+serde.workspace = true
+serde_bytes.workspace = true
+toml.workspace = true
+
+futures.workspace = true
+futures-util.workspace = true
+tokio.workspace = true
+
+netapp.workspace = true
+
+opentelemetry.workspace = true
+opentelemetry-prometheus = { workspace = true, optional = true }
+opentelemetry-otlp = { workspace = true, optional = true }
+prometheus = { workspace = true, optional = true }
[dev-dependencies]
-aws-config = "0.55.2"
-aws-sdk-s3 = "0.28"
-chrono = "0.4"
-http = "0.2"
-hmac = "0.12"
-hyper = { version = "0.14", features = ["client", "http1", "runtime"] }
-sha2 = "0.10"
-
-static_init = "1.0"
-assert-json-diff = "2.0"
-serde_json = "1.0"
-base64 = "0.21"
+aws-config.workspace = true
+aws-sdk-s3.workspace = true
+chrono.workspace = true
+http.workspace = true
+hmac.workspace = true
+http-body-util.workspace = true
+hyper.workspace = true
+hyper-util.workspace = true
+mktemp.workspace = true
+sha2.workspace = true
+
+static_init.workspace = true
+assert-json-diff.workspace = true
+serde_json.workspace = true
+base64.workspace = true
k2v-client.workspace = true
diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs
index c4a45738..edeb88c0 100644
--- a/src/garage/admin/block.rs
+++ b/src/garage/admin/block.rs
@@ -25,8 +25,7 @@ impl AdminRpcHandler {
}
async fn handle_block_info(&self, hash: &String) -> Result<AdminRpc, Error> {
- let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
- let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let hash = self.find_block_hash_by_prefix(hash)?;
let refcount = self.garage.block_manager.get_block_rc(&hash)?;
let block_refs = self
.garage
@@ -189,4 +188,48 @@ impl AdminRpcHandler {
Ok(())
}
+
+ // ---- helper function ----
+ fn find_block_hash_by_prefix(&self, prefix: &str) -> Result<Hash, Error> {
+ if prefix.len() < 4 {
+ return Err(Error::BadRequest(
+ "Please specify at least 4 characters of the block hash".into(),
+ ));
+ }
+
+ let prefix_bin =
+ hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?;
+
+ let iter = self
+ .garage
+ .block_ref_table
+ .data
+ .store
+ .range(&prefix_bin[..]..)
+ .map_err(GarageError::from)?;
+ let mut found = None;
+ for item in iter {
+ let (k, _v) = item.map_err(GarageError::from)?;
+ let hash = Hash::try_from(&k[..32]).unwrap();
+ if &hash.as_slice()[..prefix_bin.len()] != prefix_bin {
+ break;
+ }
+ if hex::encode(hash.as_slice()).starts_with(prefix) {
+ match &found {
+ Some(x) if *x == hash => (),
+ Some(_) => {
+ return Err(Error::BadRequest(format!(
+ "Several blocks match prefix `{}`",
+ prefix
+ )));
+ }
+ None => {
+ found = Some(hash);
+ }
+ }
+ }
+ }
+
+ found.ok_or_else(|| Error::BadRequest("No matching block found".into()))
+ }
}
diff --git a/src/garage/cli/convert_db.rs b/src/garage/cli/convert_db.rs
index 3c6ce69c..6b854ccb 100644
--- a/src/garage/cli/convert_db.rs
+++ b/src/garage/cli/convert_db.rs
@@ -14,44 +14,73 @@ pub struct ConvertDbOpt {
/// Input database engine (sled, lmdb or sqlite; limited by db engines
/// enabled in this build)
#[structopt(short = "a")]
- input_engine: String,
+ input_engine: Engine,
/// Output database path
#[structopt(short = "o")]
output_path: PathBuf,
/// Output database engine
#[structopt(short = "b")]
- output_engine: String,
+ output_engine: Engine,
+
+ #[structopt(flatten)]
+ db_open: OpenDbOpt,
+}
+
+/// Overrides for database open operation
+#[derive(StructOpt, Debug, Default)]
+pub struct OpenDbOpt {
+ #[cfg(feature = "lmdb")]
+ #[structopt(flatten)]
+ lmdb: OpenLmdbOpt,
+}
+
+/// Overrides for LMDB database open operation
+#[cfg(feature = "lmdb")]
+#[derive(StructOpt, Debug, Default)]
+pub struct OpenLmdbOpt {
+ /// LMDB map size override
+ /// (supported suffixes: B, KiB, MiB, GiB, TiB, PiB)
+ #[cfg(feature = "lmdb")]
+ #[structopt(long = "lmdb-map-size", name = "bytes", display_order = 1_000)]
+ map_size: Option<bytesize::ByteSize>,
}
pub(crate) fn do_conversion(args: ConvertDbOpt) -> Result<()> {
- let input = open_db(args.input_path, args.input_engine)?;
- let output = open_db(args.output_path, args.output_engine)?;
+ if args.input_engine == args.output_engine {
+ return Err(Error("input and output database engine must differ".into()));
+ }
+
+ let input = open_db(args.input_path, args.input_engine, &args.db_open)?;
+ let output = open_db(args.output_path, args.output_engine, &args.db_open)?;
output.import(&input)?;
Ok(())
}
-fn open_db(path: PathBuf, engine: String) -> Result<Db> {
- match engine.as_str() {
+fn open_db(path: PathBuf, engine: Engine, open: &OpenDbOpt) -> Result<Db> {
+ match engine {
#[cfg(feature = "sled")]
- "sled" => {
+ Engine::Sled => {
let db = sled_adapter::sled::Config::default().path(&path).open()?;
Ok(sled_adapter::SledDb::init(db))
}
#[cfg(feature = "sqlite")]
- "sqlite" | "sqlite3" | "rusqlite" => {
+ Engine::Sqlite => {
let db = sqlite_adapter::rusqlite::Connection::open(&path)?;
- db.pragma_update(None, "journal_mode", &"WAL")?;
- db.pragma_update(None, "synchronous", &"NORMAL")?;
+ db.pragma_update(None, "journal_mode", "WAL")?;
+ db.pragma_update(None, "synchronous", "NORMAL")?;
Ok(sqlite_adapter::SqliteDb::init(db))
}
#[cfg(feature = "lmdb")]
- "lmdb" | "heed" => {
+ Engine::Lmdb => {
std::fs::create_dir_all(&path).map_err(|e| {
Error(format!("Unable to create LMDB data directory: {}", e).into())
})?;
- let map_size = lmdb_adapter::recommended_map_size();
+ let map_size = match open.lmdb.map_size {
+ Some(c) => c.as_u64() as usize,
+ None => lmdb_adapter::recommended_map_size(),
+ };
let mut env_builder = lmdb_adapter::heed::EnvOpenOptions::new();
env_builder.max_dbs(100);
@@ -62,8 +91,13 @@ fn open_db(path: PathBuf, engine: String) -> Result<Db> {
let db = env_builder.open(&path)?;
Ok(lmdb_adapter::LmdbDb::init(db))
}
- e => Err(Error(
- format!("Invalid or unsupported DB engine: {}", e).into(),
+
+ // Pattern is unreachable when all supported DB engines are compiled into binary. The allow
+ // attribute is added so that we won't have to change this match in case stop building
+ // support for one or more engines by default.
+ #[allow(unreachable_patterns)]
+ engine => Err(Error(
+ format!("Engine support not available in this build: {}", engine).into(),
)),
}
}
diff --git a/src/garage/cli/init.rs b/src/garage/cli/init.rs
index 20813f1c..43ca5c09 100644
--- a/src/garage/cli/init.rs
+++ b/src/garage/cli/init.rs
@@ -43,7 +43,7 @@ pub fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> {
idstr
);
eprintln!(
- "where <remote_node> is their own node identifier in the format: <pubkey>@<ip>:<port>"
+ "where <remote_node> is their own node identifier in the format: <full-node-id>@<ip>:<port>"
);
eprintln!();
eprintln!("This node identifier can also be added as a bootstrap node in other node's garage.toml files:");
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 6bc3da22..40e47ee1 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -64,7 +64,8 @@ pub enum Command {
#[derive(StructOpt, Debug)]
pub enum NodeOperation {
- /// Print identifier (public key) of this Garage node
+ /// Print the full node ID (public key) of this Garage node, and its publicly reachable IP
+ /// address and port if they are specified in config file under `rpc_public_addr`
#[structopt(name = "id", version = garage_version())]
NodeId(NodeIdOpt),
@@ -82,8 +83,9 @@ pub struct NodeIdOpt {
#[derive(StructOpt, Debug)]
pub struct ConnectNodeOpt {
- /// Node public key and address, in the format:
- /// `<public key hexadecimal>@<ip or hostname>:<port>`
+ /// Full node ID (public key) and IP address and port, in the format:
+ /// `<full node ID>@<ip or hostname>:<port>`.
+ /// You can retrieve this information on the target node using `garage node id`.
pub(crate) node: String,
}
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 66403d05..1a6a6e32 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -7,6 +7,7 @@ extern crate tracing;
mod admin;
mod cli;
mod repair;
+mod secrets;
mod server;
#[cfg(feature = "telemetry-otlp")]
mod tracing_setup;
@@ -28,7 +29,6 @@ use structopt::StructOpt;
use netapp::util::parse_and_resolve_peer_addr;
use netapp::NetworkKey;
-use garage_util::config::Config;
use garage_util::error::*;
use garage_rpc::system::*;
@@ -38,6 +38,7 @@ use garage_model::helper::error::Error as HelperError;
use admin::*;
use cli::*;
+use secrets::Secrets;
#[derive(StructOpt, Debug)]
#[structopt(
@@ -45,8 +46,7 @@ use cli::*;
about = "S3-compatible object store for self-hosted geo-distributed deployments"
)]
struct Opt {
- /// Host to connect to for admin operations, in the format:
- /// <public-key>@<ip>:<port>
+ /// Host to connect to for admin operations, in the format: <full-node-id>@<ip>:<port>
#[structopt(short = "h", long = "rpc-host", env = "GARAGE_RPC_HOST")]
pub rpc_host: Option<String>,
@@ -66,24 +66,6 @@ struct Opt {
cmd: Command,
}
-#[derive(StructOpt, Debug)]
-pub struct Secrets {
- /// RPC secret network key, used to replace rpc_secret in config.toml when running the
- /// daemon or doing admin operations
- #[structopt(short = "s", long = "rpc-secret", env = "GARAGE_RPC_SECRET")]
- pub rpc_secret: Option<String>,
-
- /// Metrics API authentication token, replaces admin.metrics_token in config.toml when
- /// running the Garage daemon
- #[structopt(long = "admin-token", env = "GARAGE_ADMIN_TOKEN")]
- pub admin_token: Option<String>,
-
- /// Metrics API authentication token, replaces admin.metrics_token in config.toml when
- /// running the Garage daemon
- #[structopt(long = "metrics-token", env = "GARAGE_METRICS_TOKEN")]
- pub metrics_token: Option<String>,
-}
-
#[tokio::main]
async fn main() {
// Initialize version and features info
@@ -192,7 +174,9 @@ async fn main() {
}
async fn cli_command(opt: Opt) -> Result<(), Error> {
- let config = if opt.secrets.rpc_secret.is_none() || opt.rpc_host.is_none() {
+ let config = if (opt.secrets.rpc_secret.is_none() && opt.secrets.rpc_secret_file.is_none())
+ || opt.rpc_host.is_none()
+ {
Some(garage_util::config::read_config(opt.config_file.clone())
.err_context(format!("Unable to read configuration file {}. Configuration file is needed because -h or -s is not provided on the command line.", opt.config_file.to_string_lossy()))?)
} else {
@@ -200,14 +184,19 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
};
// Find and parse network RPC secret
- let net_key_hex_str = opt
- .secrets
- .rpc_secret
- .as_ref()
- .or_else(|| config.as_ref().and_then(|c| c.rpc_secret.as_ref()))
- .ok_or("No RPC secret provided")?;
+ let mut rpc_secret = config.as_ref().and_then(|c| c.rpc_secret.clone());
+ secrets::fill_secret(
+ &mut rpc_secret,
+ &config.as_ref().and_then(|c| c.rpc_secret_file.clone()),
+ &opt.secrets.rpc_secret,
+ &opt.secrets.rpc_secret_file,
+ "rpc_secret",
+ true,
+ )?;
+
+ let net_key_hex_str = rpc_secret.ok_or("No RPC secret provided")?;
let network_key = NetworkKey::from_slice(
- &hex::decode(net_key_hex_str).err_context("Invalid RPC secret key (bad hex)")?[..],
+ &hex::decode(&net_key_hex_str).err_context("Invalid RPC secret key (bad hex)")?[..],
)
.ok_or("Invalid RPC secret provided (wrong length)")?;
@@ -218,7 +207,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
// Find and parse the address of the target host
let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host {
- let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <pubkey>@<IP or hostname>:<port>.", h))?;
+ let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <full node id>@<IP or hostname>:<port>.", h))?;
(id, addrs[0], false)
} else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
@@ -248,7 +237,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
addr
);
}
- Err(e).err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?;
+ Err(e).err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct full-length node ID (public key).")?;
}
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
@@ -261,16 +250,3 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
Ok(x) => Ok(x),
}
}
-
-fn fill_secrets(mut config: Config, secrets: Secrets) -> Config {
- if secrets.rpc_secret.is_some() {
- config.rpc_secret = secrets.rpc_secret;
- }
- if secrets.admin_token.is_some() {
- config.admin.admin_token = secrets.admin_token;
- }
- if secrets.metrics_token.is_some() {
- config.admin.metrics_token = secrets.metrics_token;
- }
- config
-}
diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs
index f4edcf03..45024e71 100644
--- a/src/garage/repair/offline.rs
+++ b/src/garage/repair/offline.rs
@@ -6,7 +6,7 @@ use garage_util::error::*;
use garage_model::garage::Garage;
use crate::cli::structs::*;
-use crate::{fill_secrets, Secrets};
+use crate::secrets::{fill_secrets, Secrets};
pub async fn offline_repair(
config_file: PathBuf,
@@ -20,7 +20,7 @@ pub async fn offline_repair(
}
info!("Loading configuration...");
- let config = fill_secrets(read_config(config_file)?, secrets);
+ let config = fill_secrets(read_config(config_file)?, secrets)?;
info!("Initializing Garage main data store...");
let garage = Garage::new(config)?;
diff --git a/src/garage/secrets.rs b/src/garage/secrets.rs
new file mode 100644
index 00000000..c3d704aa
--- /dev/null
+++ b/src/garage/secrets.rs
@@ -0,0 +1,320 @@
+use std::path::PathBuf;
+
+use structopt::StructOpt;
+
+use garage_util::config::Config;
+use garage_util::error::Error;
+
+/// Structure for secret values or paths that are passed as CLI arguments or environment
+/// variables, instead of in the config file.
+#[derive(StructOpt, Debug, Default, Clone)]
+pub struct Secrets {
+ /// Skip permission check on files containing secrets
+ #[cfg(unix)]
+ #[structopt(
+ long = "allow-world-readable-secrets",
+ env = "GARAGE_ALLOW_WORLD_READABLE_SECRETS"
+ )]
+ pub allow_world_readable_secrets: Option<bool>,
+
+ /// RPC secret network key, used to replace rpc_secret in config.toml when running the
+ /// daemon or doing admin operations
+ #[structopt(short = "s", long = "rpc-secret", env = "GARAGE_RPC_SECRET")]
+ pub rpc_secret: Option<String>,
+
+ /// RPC secret network key, used to replace rpc_secret in config.toml and rpc-secret
+ /// when running the daemon or doing admin operations
+ #[structopt(long = "rpc-secret-file", env = "GARAGE_RPC_SECRET_FILE")]
+ pub rpc_secret_file: Option<PathBuf>,
+
+ /// Admin API authentication token, replaces admin.admin_token in config.toml when
+ /// running the Garage daemon
+ #[structopt(long = "admin-token", env = "GARAGE_ADMIN_TOKEN")]
+ pub admin_token: Option<String>,
+
+ /// Admin API authentication token file path, replaces admin.admin_token in config.toml
+ /// and admin-token when running the Garage daemon
+ #[structopt(long = "admin-token-file", env = "GARAGE_ADMIN_TOKEN_FILE")]
+ pub admin_token_file: Option<PathBuf>,
+
+ /// Metrics API authentication token, replaces admin.metrics_token in config.toml when
+ /// running the Garage daemon
+ #[structopt(long = "metrics-token", env = "GARAGE_METRICS_TOKEN")]
+ pub metrics_token: Option<String>,
+
+ /// Metrics API authentication token file path, replaces admin.metrics_token in config.toml
+ /// and metrics-token when running the Garage daemon
+ #[structopt(long = "metrics-token-file", env = "GARAGE_METRICS_TOKEN_FILE")]
+ pub metrics_token_file: Option<PathBuf>,
+}
+
+/// Single function to fill all secrets in the Config struct from their correct source (value
+/// from config or CLI param or env variable or read from a file specified in config or CLI
+/// param or env variable)
+pub fn fill_secrets(mut config: Config, secrets: Secrets) -> Result<Config, Error> {
+ let allow_world_readable = secrets
+ .allow_world_readable_secrets
+ .unwrap_or(config.allow_world_readable_secrets);
+
+ fill_secret(
+ &mut config.rpc_secret,
+ &config.rpc_secret_file,
+ &secrets.rpc_secret,
+ &secrets.rpc_secret_file,
+ "rpc_secret",
+ allow_world_readable,
+ )?;
+
+ fill_secret(
+ &mut config.admin.admin_token,
+ &config.admin.admin_token_file,
+ &secrets.admin_token,
+ &secrets.admin_token_file,
+ "admin.admin_token",
+ allow_world_readable,
+ )?;
+ fill_secret(
+ &mut config.admin.metrics_token,
+ &config.admin.metrics_token_file,
+ &secrets.metrics_token,
+ &secrets.metrics_token_file,
+ "admin.metrics_token",
+ allow_world_readable,
+ )?;
+
+ Ok(config)
+}
+
+pub(crate) fn fill_secret(
+ config_secret: &mut Option<String>,
+ config_secret_file: &Option<PathBuf>,
+ cli_secret: &Option<String>,
+ cli_secret_file: &Option<PathBuf>,
+ name: &'static str,
+ allow_world_readable: bool,
+) -> Result<(), Error> {
+ let cli_value = match (&cli_secret, &cli_secret_file) {
+ (Some(_), Some(_)) => {
+ return Err(format!("only one of `{}` and `{}_file` can be set", name, name).into());
+ }
+ (Some(secret), None) => Some(secret.to_string()),
+ (None, Some(file)) => Some(read_secret_file(file, allow_world_readable)?),
+ (None, None) => None,
+ };
+
+ if let Some(val) = cli_value {
+ if config_secret.is_some() || config_secret_file.is_some() {
+ debug!("Overriding secret `{}` using value specified using CLI argument or environnement variable.", name);
+ }
+
+ *config_secret = Some(val);
+ } else if let Some(file_path) = &config_secret_file {
+ if config_secret.is_some() {
+ return Err(format!("only one of `{}` and `{}_file` can be set", name, name).into());
+ }
+
+ *config_secret = Some(read_secret_file(file_path, allow_world_readable)?);
+ }
+
+ Ok(())
+}
+
+fn read_secret_file(file_path: &PathBuf, allow_world_readable: bool) -> Result<String, Error> {
+ if !allow_world_readable {
+ #[cfg(unix)]
+ {
+ use std::os::unix::fs::MetadataExt;
+ let metadata = std::fs::metadata(file_path)?;
+ if metadata.mode() & 0o077 != 0 {
+ return Err(format!("File {} is world-readable! (mode: 0{:o}, expected 0600)\nRefusing to start until this is fixed, or environment variable GARAGE_ALLOW_WORLD_READABLE_SECRETS is set to true.", file_path.display(), metadata.mode()).into());
+ }
+ }
+ }
+
+ let secret_buf = std::fs::read_to_string(file_path)?;
+
+ // trim_end: allows for use case such as `echo "$(openssl rand -hex 32)" > somefile`.
+ // also editors sometimes add a trailing newline
+ Ok(String::from(secret_buf.trim_end()))
+}
+
+#[cfg(test)]
+mod tests {
+ use std::fs::File;
+ use std::io::Write;
+
+ use garage_util::config::read_config;
+ use garage_util::error::Error;
+
+ use super::*;
+
+ #[test]
+ fn test_rpc_secret_file_works() -> Result<(), Error> {
+ let path_secret = mktemp::Temp::new_file()?;
+ let mut file_secret = File::create(path_secret.as_path())?;
+ writeln!(file_secret, "foo")?;
+ drop(file_secret);
+
+ let path_config = mktemp::Temp::new_file()?;
+ let mut file_config = File::create(path_config.as_path())?;
+ let path_secret_path = path_secret.as_path();
+ writeln!(
+ file_config,
+ r#"
+ metadata_dir = "/tmp/garage/meta"
+ data_dir = "/tmp/garage/data"
+ replication_mode = "3"
+ rpc_bind_addr = "[::]:3901"
+ rpc_secret_file = "{}"
+
+ [s3_api]
+ s3_region = "garage"
+ api_bind_addr = "[::]:3900"
+ "#,
+ path_secret_path.display()
+ )?;
+ drop(file_config);
+
+ // Second configuration file, same as previous one
+ // except it allows world-readable secrets.
+ let path_config_allow_world_readable = mktemp::Temp::new_file()?;
+ let mut file_config_allow_world_readable =
+ File::create(path_config_allow_world_readable.as_path())?;
+ writeln!(
+ file_config_allow_world_readable,
+ r#"
+ metadata_dir = "/tmp/garage/meta"
+ data_dir = "/tmp/garage/data"
+ replication_mode = "3"
+ rpc_bind_addr = "[::]:3901"
+ rpc_secret_file = "{}"
+ allow_world_readable_secrets = true
+
+ [s3_api]
+ s3_region = "garage"
+ api_bind_addr = "[::]:3900"
+ "#,
+ path_secret_path.display()
+ )?;
+ drop(file_config_allow_world_readable);
+
+ let config = read_config(path_config.to_path_buf())?;
+ let config = fill_secrets(config, Secrets::default())?;
+ assert_eq!("foo", config.rpc_secret.unwrap());
+
+ // ---- Check non world-readable secrets config ----
+ #[cfg(unix)]
+ {
+ let secrets_allow_world_readable = Secrets {
+ allow_world_readable_secrets: Some(true),
+ ..Default::default()
+ };
+ let secrets_no_allow_world_readable = Secrets {
+ allow_world_readable_secrets: Some(false),
+ ..Default::default()
+ };
+
+ use std::os::unix::fs::PermissionsExt;
+ let metadata = std::fs::metadata(path_secret_path)?;
+ let mut perm = metadata.permissions();
+ perm.set_mode(0o660);
+ std::fs::set_permissions(path_secret_path, perm)?;
+
+ // Config file that just specifies the path
+ let config = read_config(path_config.to_path_buf())?;
+ assert!(fill_secrets(config, Secrets::default()).is_err());
+
+ let config = read_config(path_config.to_path_buf())?;
+ assert!(fill_secrets(config, secrets_allow_world_readable.clone()).is_ok());
+
+ let config = read_config(path_config.to_path_buf())?;
+ assert!(fill_secrets(config, secrets_no_allow_world_readable.clone()).is_err());
+
+ // Config file that also specifies to allow world_readable_secrets
+ let config = read_config(path_config_allow_world_readable.to_path_buf())?;
+ assert!(fill_secrets(config, Secrets::default()).is_ok());
+
+ let config = read_config(path_config_allow_world_readable.to_path_buf())?;
+ assert!(fill_secrets(config, secrets_allow_world_readable).is_ok());
+
+ let config = read_config(path_config_allow_world_readable.to_path_buf())?;
+ assert!(fill_secrets(config, secrets_no_allow_world_readable).is_err());
+ }
+
+ // ---- Check alternative secrets specified on CLI ----
+
+ let path_secret2 = mktemp::Temp::new_file()?;
+ let mut file_secret2 = File::create(path_secret2.as_path())?;
+ writeln!(file_secret2, "bar")?;
+ drop(file_secret2);
+
+ let config = read_config(path_config.to_path_buf())?;
+ let config = fill_secrets(
+ config,
+ Secrets {
+ rpc_secret: Some("baz".into()),
+ ..Default::default()
+ },
+ )?;
+ assert_eq!(config.rpc_secret.as_deref(), Some("baz"));
+
+ let config = read_config(path_config.to_path_buf())?;
+ let config = fill_secrets(
+ config,
+ Secrets {
+ rpc_secret_file: Some(path_secret2.clone()),
+ ..Default::default()
+ },
+ )?;
+ assert_eq!(config.rpc_secret.as_deref(), Some("bar"));
+
+ let config = read_config(path_config.to_path_buf())?;
+ assert!(fill_secrets(
+ config,
+ Secrets {
+ rpc_secret: Some("baz".into()),
+ rpc_secret_file: Some(path_secret2.clone()),
+ ..Default::default()
+ }
+ )
+ .is_err());
+
+ drop(path_secret);
+ drop(path_secret2);
+ drop(path_config);
+ drop(path_config_allow_world_readable);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_rcp_secret_and_rpc_secret_file_cannot_be_set_both() -> Result<(), Error> {
+ let path_config = mktemp::Temp::new_file()?;
+ let mut file_config = File::create(path_config.as_path())?;
+ writeln!(
+ file_config,
+ r#"
+ metadata_dir = "/tmp/garage/meta"
+ data_dir = "/tmp/garage/data"
+ replication_mode = "3"
+ rpc_bind_addr = "[::]:3901"
+ rpc_secret= "dummy"
+ rpc_secret_file = "dummy"
+
+ [s3_api]
+ s3_region = "garage"
+ api_bind_addr = "[::]:3900"
+ "#
+ )?;
+ let config = read_config(path_config.to_path_buf())?;
+ assert_eq!(
+ "only one of `rpc_secret` and `rpc_secret_file` can be set",
+ fill_secrets(config, Secrets::default())
+ .unwrap_err()
+ .to_string()
+ );
+ drop(path_config);
+ drop(file_config);
+ Ok(())
+ }
+}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 3ad10b72..51b06b8e 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -15,9 +15,9 @@ use garage_web::WebServer;
use garage_api::k2v::api_server::K2VApiServer;
use crate::admin::*;
+use crate::secrets::{fill_secrets, Secrets};
#[cfg(feature = "telemetry-otlp")]
use crate::tracing_setup::*;
-use crate::{fill_secrets, Secrets};
async fn wait_from(mut chan: watch::Receiver<bool>) {
while !*chan.borrow() {
@@ -29,12 +29,19 @@ async fn wait_from(mut chan: watch::Receiver<bool>) {
pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Error> {
info!("Loading configuration...");
- let config = fill_secrets(read_config(config_file)?, secrets);
+ let config = fill_secrets(read_config(config_file)?, secrets)?;
// ---- Initialize Garage internals ----
#[cfg(feature = "metrics")]
- let metrics_exporter = opentelemetry_prometheus::exporter().init();
+ let metrics_exporter = opentelemetry_prometheus::exporter()
+ .with_default_summary_quantiles(vec![0.25, 0.5, 0.75, 0.9, 0.95, 0.99])
+ .with_default_histogram_boundaries(vec![
+ 0.001, 0.0015, 0.002, 0.003, 0.005, 0.007, 0.01, 0.015, 0.02, 0.03, 0.05, 0.07, 0.1,
+ 0.15, 0.2, 0.3, 0.5, 0.7, 1., 1.5, 2., 3., 5., 7., 10., 15., 20., 30., 40., 50., 60.,
+ 70., 100.,
+ ])
+ .init();
info!("Initializing Garage main data store...");
let garage = Garage::new(config.clone())?;
@@ -81,7 +88,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
garage.clone(),
s3_bind_addr.clone(),
config.s3_api.s3_region.clone(),
- wait_from(watch_cancel.clone()),
+ watch_cancel.clone(),
)),
));
}
@@ -96,7 +103,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
garage.clone(),
config.k2v_api.as_ref().unwrap().api_bind_addr.clone(),
config.s3_api.s3_region.clone(),
- wait_from(watch_cancel.clone()),
+ watch_cancel.clone(),
)),
));
}
@@ -106,14 +113,10 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
if let Some(web_config) = &config.s3_web {
info!("Initializing web server...");
+ let web_server = WebServer::new(garage.clone(), web_config.root_domain.clone());
servers.push((
"Web",
- tokio::spawn(WebServer::run(
- garage.clone(),
- web_config.bind_addr.clone(),
- web_config.root_domain.clone(),
- wait_from(watch_cancel.clone()),
- )),
+ tokio::spawn(web_server.run(web_config.bind_addr.clone(), watch_cancel.clone())),
));
}
@@ -121,9 +124,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
info!("Launching Admin API server...");
servers.push((
"Admin",
- tokio::spawn(
- admin_server.run(admin_bind_addr.clone(), wait_from(watch_cancel.clone())),
- ),
+ tokio::spawn(admin_server.run(admin_bind_addr.clone(), watch_cancel.clone())),
));
}
diff --git a/src/garage/tests/common/client.rs b/src/garage/tests/common/client.rs
index ef4daa5d..ffa4cae8 100644
--- a/src/garage/tests/common/client.rs
+++ b/src/garage/tests/common/client.rs
@@ -1,3 +1,4 @@
+use aws_sdk_s3::config::BehaviorVersion;
use aws_sdk_s3::config::Credentials;
use aws_sdk_s3::{Client, Config};
@@ -11,6 +12,7 @@ pub fn build_client(key: &Key) -> Client {
.endpoint_url(format!("http://127.0.0.1:{}", DEFAULT_PORT))
.region(super::REGION)
.credentials_provider(credentials)
+ .behavior_version(BehaviorVersion::v2023_11_09())
.build();
Client::from_conf(config)
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index 4133bb8b..e5f4cca1 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -5,12 +5,17 @@ use std::convert::TryFrom;
use chrono::{offset::Utc, DateTime};
use hmac::{Hmac, Mac};
-use hyper::client::HttpConnector;
-use hyper::{Body, Client, Method, Request, Response, Uri};
+use http_body_util::BodyExt;
+use http_body_util::Full as FullBody;
+use hyper::{Method, Request, Response, Uri};
+use hyper_util::client::legacy::{connect::HttpConnector, Client};
+use hyper_util::rt::TokioExecutor;
use super::garage::{Instance, Key};
use garage_api::signature;
+pub type Body = FullBody<hyper::body::Bytes>;
+
/// You should ever only use this to send requests AWS sdk won't send,
/// like to reproduce behavior of unusual implementations found to be
/// problematic.
@@ -19,7 +24,7 @@ pub struct CustomRequester {
key: Key,
uri: Uri,
service: &'static str,
- client: Client<HttpConnector>,
+ client: Client<HttpConnector, Body>,
}
impl CustomRequester {
@@ -28,7 +33,7 @@ impl CustomRequester {
key: key.clone(),
uri: instance.s3_uri(),
service: "s3",
- client: Client::new(),
+ client: Client::builder(TokioExecutor::new()).build_http(),
}
}
@@ -37,7 +42,7 @@ impl CustomRequester {
key: key.clone(),
uri: instance.k2v_uri(),
service: "k2v",
- client: Client::new(),
+ client: Client::builder(TokioExecutor::new()).build_http(),
}
}
@@ -139,7 +144,7 @@ impl<'a> RequestBuilder<'a> {
self
}
- pub async fn send(&mut self) -> hyper::Result<Response<Body>> {
+ pub async fn send(&mut self) -> Result<Response<Body>, String> {
// TODO this is a bit incorrect in that path and query params should be url-encoded and
// aren't, but this is good enought for now.
@@ -200,8 +205,8 @@ impl<'a> RequestBuilder<'a> {
all_headers.insert("x-amz-content-sha256".to_owned(), body_sha.clone());
let mut signed_headers = all_headers
- .iter()
- .map(|(k, _)| k.as_ref())
+ .keys()
+ .map(|k| k.as_ref())
.collect::<Vec<&str>>();
signed_headers.sort();
let signed_headers = signed_headers.join(";");
@@ -242,7 +247,22 @@ impl<'a> RequestBuilder<'a> {
.method(self.method.clone())
.body(Body::from(body))
.unwrap();
- self.requester.client.request(request).await
+
+ let result = self
+ .requester
+ .client
+ .request(request)
+ .await
+ .map_err(|err| format!("hyper client error: {}", err))?;
+
+ let (head, body) = result.into_parts();
+ let body = Body::new(
+ body.collect()
+ .await
+ .map_err(|err| format!("hyper client error in body.collect: {}", err))?
+ .to_bytes(),
+ );
+ Ok(Response::from_parts(head, body))
}
}
diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs
index 71de91bf..39554d4d 100644
--- a/src/garage/tests/k2v/batch.rs
+++ b/src/garage/tests/k2v/batch.rs
@@ -7,6 +7,7 @@ use base64::prelude::*;
use serde_json::json;
use crate::json_body;
+use http_body_util::BodyExt;
use hyper::{Method, StatusCode};
#[tokio::test]
@@ -77,10 +78,7 @@ async fn test_batch() {
.unwrap()
.to_string(),
);
- let res_body = hyper::body::to_bytes(res.into_body())
- .await
- .unwrap()
- .to_vec();
+ let res_body = res.into_body().collect().await.unwrap().to_bytes();
assert_eq!(res_body, values.get(sk).unwrap().as_bytes());
}
diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs
index 20add889..5a347bd9 100644
--- a/src/garage/tests/k2v/item.rs
+++ b/src/garage/tests/k2v/item.rs
@@ -7,6 +7,7 @@ use base64::prelude::*;
use serde_json::json;
use crate::json_body;
+use http_body_util::BodyExt;
use hyper::{Method, StatusCode};
#[tokio::test]
@@ -83,10 +84,7 @@ async fn test_items_and_indices() {
.to_str()
.unwrap()
.to_string();
- let res_body = hyper::body::to_bytes(res.into_body())
- .await
- .unwrap()
- .to_vec();
+ let res_body = res.into_body().collect().await.unwrap().to_bytes();
assert_eq!(res_body, content);
// ReadIndex -- now there should be some stuff
@@ -152,10 +150,7 @@ async fn test_items_and_indices() {
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/octet-stream"
);
- let res_body = hyper::body::to_bytes(res.into_body())
- .await
- .unwrap()
- .to_vec();
+ let res_body = res.into_body().collect().await.unwrap().to_bytes();
assert_eq!(res_body, content2);
// ReadIndex -- now there should be some stuff
@@ -394,10 +389,7 @@ async fn test_item_return_format() {
.to_str()
.unwrap()
.to_string();
- let res_body = hyper::body::to_bytes(res.into_body())
- .await
- .unwrap()
- .to_vec();
+ let res_body = res.into_body().collect().await.unwrap().to_bytes();
assert_eq!(res_body, single_value);
// f1: not specified
@@ -434,10 +426,7 @@ async fn test_item_return_format() {
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/octet-stream"
);
- let res_body = hyper::body::to_bytes(res.into_body())
- .await
- .unwrap()
- .to_vec();
+ let res_body = res.into_body().collect().await.unwrap().to_bytes();
assert_eq!(res_body, single_value);
// f3: json
diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs
index 452317c2..277f8bc8 100644
--- a/src/garage/tests/k2v/poll.rs
+++ b/src/garage/tests/k2v/poll.rs
@@ -1,4 +1,5 @@
use base64::prelude::*;
+use http_body_util::BodyExt;
use hyper::{Method, StatusCode};
use std::time::Duration;
@@ -47,11 +48,8 @@ async fn test_poll_item() {
.unwrap()
.to_string();
- let res2_body = hyper::body::to_bytes(res2.into_body())
- .await
- .unwrap()
- .to_vec();
- assert_eq!(res2_body, b"Initial value");
+ let res2_body = res2.into_body().collect().await.unwrap().to_bytes();
+ assert_eq!(res2_body, b"Initial value"[..]);
// Start poll operation
let poll = {
@@ -95,11 +93,8 @@ async fn test_poll_item() {
assert_eq!(poll_res.status(), StatusCode::OK);
- let poll_res_body = hyper::body::to_bytes(poll_res.into_body())
- .await
- .unwrap()
- .to_vec();
- assert_eq!(poll_res_body, b"New value");
+ let poll_res_body = poll_res.into_body().collect().await.unwrap().to_bytes();
+ assert_eq!(poll_res_body, b"New value"[..]);
}
#[tokio::test]
diff --git a/src/garage/tests/k2v/simple.rs b/src/garage/tests/k2v/simple.rs
index 465fc24d..1017330d 100644
--- a/src/garage/tests/k2v/simple.rs
+++ b/src/garage/tests/k2v/simple.rs
@@ -1,5 +1,6 @@
use crate::common;
+use http_body_util::BodyExt;
use hyper::{Method, StatusCode};
#[tokio::test]
@@ -32,9 +33,6 @@ async fn test_simple() {
.unwrap();
assert_eq!(res2.status(), StatusCode::OK);
- let res2_body = hyper::body::to_bytes(res2.into_body())
- .await
- .unwrap()
- .to_vec();
- assert_eq!(res2_body, b"Hello, world!");
+ let res2_body = res2.into_body().collect().await.unwrap().to_bytes();
+ assert_eq!(res2_body, b"Hello, world!"[..]);
}
diff --git a/src/garage/tests/lib.rs b/src/garage/tests/lib.rs
index ab92bc0a..ef370db3 100644
--- a/src/garage/tests/lib.rs
+++ b/src/garage/tests/lib.rs
@@ -11,15 +11,15 @@ mod k2v;
#[cfg(feature = "k2v")]
mod k2v_client;
-use hyper::{Body, Response};
+use http_body_util::BodyExt;
+use hyper::{body::Body, Response};
-pub async fn json_body(res: Response<Body>) -> serde_json::Value {
- let res_body: serde_json::Value = serde_json::from_slice(
- &hyper::body::to_bytes(res.into_body())
- .await
- .unwrap()
- .to_vec()[..],
- )
- .unwrap();
+pub async fn json_body<B>(res: Response<B>) -> serde_json::Value
+where
+ B: Body,
+ <B as Body>::Error: std::fmt::Debug,
+{
+ let body = res.into_body().collect().await.unwrap().to_bytes();
+ let res_body: serde_json::Value = serde_json::from_slice(&body).unwrap();
res_body
}
diff --git a/src/garage/tests/s3/list.rs b/src/garage/tests/s3/list.rs
index bb03f250..1b0c006d 100644
--- a/src/garage/tests/s3/list.rs
+++ b/src/garage/tests/s3/list.rs
@@ -613,3 +613,63 @@ async fn test_listmultipart() {
assert!(r.common_prefixes.is_none());
}
}
+
+#[tokio::test]
+async fn test_multichar_delimiter() {
+ // Test case from dpape from issue #692 with reference results from Amazon
+
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("multichardelim");
+
+ for k in [
+ "a/", "a/b/", "a/b/c/", "a/b/c/d", "a/c/", "a/c/b/", "a/c/b/e",
+ ] {
+ ctx.client
+ .put_object()
+ .bucket(&bucket)
+ .key(k)
+ .send()
+ .await
+ .unwrap();
+ }
+
+ // With delimiter /
+ {
+ let r = ctx
+ .client
+ .list_objects_v2()
+ .bucket(&bucket)
+ .delimiter("/")
+ .send()
+ .await
+ .unwrap();
+
+ assert!(r.contents.is_none());
+
+ let common_prefixes = r.common_prefixes.unwrap();
+ assert_eq!(common_prefixes.len(), 1);
+ assert_eq!(common_prefixes[0].prefix.as_deref().unwrap(), "a/");
+ }
+
+ // With delimiter b/
+ {
+ let r = ctx
+ .client
+ .list_objects_v2()
+ .bucket(&bucket)
+ .delimiter("b/")
+ .send()
+ .await
+ .unwrap();
+
+ let contents = r.contents.unwrap();
+ assert_eq!(contents.len(), 2);
+ assert_eq!(contents[0].key.as_deref().unwrap(), "a/");
+ assert_eq!(contents[1].key.as_deref().unwrap(), "a/c/");
+
+ let common_prefixes = r.common_prefixes.unwrap();
+ assert_eq!(common_prefixes.len(), 2);
+ assert_eq!(common_prefixes[0].prefix.as_deref().unwrap(), "a/b/");
+ assert_eq!(common_prefixes[1].prefix.as_deref().unwrap(), "a/c/b/");
+ }
+}
diff --git a/src/garage/tests/s3/multipart.rs b/src/garage/tests/s3/multipart.rs
index 09ae5e5b..51c9df74 100644
--- a/src/garage/tests/s3/multipart.rs
+++ b/src/garage/tests/s3/multipart.rs
@@ -154,7 +154,7 @@ async fn test_multipart_upload() {
.await
.unwrap();
- assert_eq!(r.content_length, (SZ_5MB * 3) as i64);
+ assert_eq!(r.content_length.unwrap(), (SZ_5MB * 3) as i64);
}
{
@@ -183,7 +183,7 @@ async fn test_multipart_upload() {
.unwrap();
eprintln!("get_object with part_number = {}", part_number);
- assert_eq!(o.content_length, SZ_5MB as i64);
+ assert_eq!(o.content_length.unwrap(), SZ_5MB as i64);
assert_bytes_eq!(o.body, data);
}
}
@@ -249,14 +249,14 @@ async fn test_uploadlistpart() {
let ps = r.parts.unwrap();
assert_eq!(ps.len(), 1);
- assert_eq!(ps[0].part_number, 2);
+ assert_eq!(ps[0].part_number.unwrap(), 2);
let fp = &ps[0];
assert!(fp.last_modified.is_some());
assert_eq!(
fp.e_tag.as_ref().unwrap(),
"\"3366bb9dcf710d6801b5926467d02e19\""
);
- assert_eq!(fp.size, SZ_5MB as i64);
+ assert_eq!(fp.size.unwrap(), SZ_5MB as i64);
}
let p2 = ctx
@@ -286,23 +286,23 @@ async fn test_uploadlistpart() {
let ps = r.parts.unwrap();
assert_eq!(ps.len(), 2);
- assert_eq!(ps[0].part_number, 1);
+ assert_eq!(ps[0].part_number.unwrap(), 1);
let fp = &ps[0];
assert!(fp.last_modified.is_some());
assert_eq!(
fp.e_tag.as_ref().unwrap(),
"\"3c484266f9315485694556e6c693bfa2\""
);
- assert_eq!(fp.size, SZ_5MB as i64);
+ assert_eq!(fp.size.unwrap(), SZ_5MB as i64);
- assert_eq!(ps[1].part_number, 2);
+ assert_eq!(ps[1].part_number.unwrap(), 2);
let sp = &ps[1];
assert!(sp.last_modified.is_some());
assert_eq!(
sp.e_tag.as_ref().unwrap(),
"\"3366bb9dcf710d6801b5926467d02e19\""
);
- assert_eq!(sp.size, SZ_5MB as i64);
+ assert_eq!(sp.size.unwrap(), SZ_5MB as i64);
}
{
@@ -320,14 +320,14 @@ async fn test_uploadlistpart() {
assert!(r.part_number_marker.is_none());
assert_eq!(r.next_part_number_marker.as_deref(), Some("1"));
- assert_eq!(r.max_parts, 1_i32);
- assert!(r.is_truncated);
+ assert_eq!(r.max_parts.unwrap(), 1_i32);
+ assert!(r.is_truncated.unwrap());
assert_eq!(r.key.unwrap(), "a");
assert_eq!(r.upload_id.unwrap().as_str(), uid.as_str());
let parts = r.parts.unwrap();
assert_eq!(parts.len(), 1);
let fp = &parts[0];
- assert_eq!(fp.part_number, 1);
+ assert_eq!(fp.part_number.unwrap(), 1);
assert_eq!(
fp.e_tag.as_ref().unwrap(),
"\"3c484266f9315485694556e6c693bfa2\""
@@ -349,19 +349,19 @@ async fn test_uploadlistpart() {
r2.part_number_marker.as_ref().unwrap(),
r.next_part_number_marker.as_ref().unwrap()
);
- assert_eq!(r2.max_parts, 1_i32);
+ assert_eq!(r2.max_parts.unwrap(), 1_i32);
assert_eq!(r2.key.unwrap(), "a");
assert_eq!(r2.upload_id.unwrap().as_str(), uid.as_str());
let parts = r2.parts.unwrap();
assert_eq!(parts.len(), 1);
let fp = &parts[0];
- assert_eq!(fp.part_number, 2);
+ assert_eq!(fp.part_number.unwrap(), 2);
assert_eq!(
fp.e_tag.as_ref().unwrap(),
"\"3366bb9dcf710d6801b5926467d02e19\""
);
//assert!(r2.is_truncated); // WHY? (this was the test before)
- assert!(!r2.is_truncated);
+ assert!(!r2.is_truncated.unwrap());
}
let cmp = CompletedMultipartUpload::builder()
@@ -411,7 +411,7 @@ async fn test_uploadlistpart() {
.await
.unwrap();
- assert_eq!(r.content_length, (SZ_5MB * 2) as i64);
+ assert_eq!(r.content_length.unwrap(), (SZ_5MB * 2) as i64);
}
}
diff --git a/src/garage/tests/s3/objects.rs b/src/garage/tests/s3/objects.rs
index 27697d45..ad5f63f1 100644
--- a/src/garage/tests/s3/objects.rs
+++ b/src/garage/tests/s3/objects.rs
@@ -50,9 +50,9 @@ async fn test_putobject() {
// assert_eq!(o.version_id.unwrap(), _version);
assert_eq!(o.content_type.unwrap(), content_type);
assert!(o.last_modified.is_some());
- assert_eq!(o.content_length, 0);
- assert_eq!(o.parts_count, 0);
- assert_eq!(o.tag_count, 0);
+ assert_eq!(o.content_length.unwrap(), 0);
+ assert_eq!(o.parts_count, None);
+ assert_eq!(o.tag_count, None);
}
{
@@ -86,9 +86,9 @@ async fn test_putobject() {
assert_bytes_eq!(o.body, b"hi");
assert_eq!(o.e_tag.unwrap(), etag);
assert!(o.last_modified.is_some());
- assert_eq!(o.content_length, 2);
- assert_eq!(o.parts_count, 0);
- assert_eq!(o.tag_count, 0);
+ assert_eq!(o.content_length.unwrap(), 2);
+ assert_eq!(o.parts_count, None);
+ assert_eq!(o.tag_count, None);
}
{
@@ -119,9 +119,9 @@ async fn test_putobject() {
assert_bytes_eq!(o.body, b"");
assert_eq!(o.e_tag.unwrap(), etag);
assert!(o.last_modified.is_some());
- assert_eq!(o.content_length, 0);
- assert_eq!(o.parts_count, 0);
- assert_eq!(o.tag_count, 0);
+ assert_eq!(o.content_length.unwrap(), 0);
+ assert_eq!(o.parts_count, None);
+ assert_eq!(o.tag_count, None);
}
}
@@ -185,6 +185,30 @@ async fn test_getobject() {
assert_eq!(o.content_range.unwrap().as_str(), "bytes 57-61/62");
assert_bytes_eq!(o.body, &BODY[57..]);
}
+ {
+ let exp = aws_sdk_s3::primitives::DateTime::from_secs(10000000000);
+ let o = ctx
+ .client
+ .get_object()
+ .bucket(&bucket)
+ .key(STD_KEY)
+ .response_content_type("application/x-dummy-test")
+ .response_cache_control("ccdummy")
+ .response_content_disposition("cddummy")
+ .response_content_encoding("cedummy")
+ .response_content_language("cldummy")
+ .response_expires(exp)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(o.content_type.unwrap().as_str(), "application/x-dummy-test");
+ assert_eq!(o.cache_control.unwrap().as_str(), "ccdummy");
+ assert_eq!(o.content_disposition.unwrap().as_str(), "cddummy");
+ assert_eq!(o.content_encoding.unwrap().as_str(), "cedummy");
+ assert_eq!(o.content_language.unwrap().as_str(), "cldummy");
+ assert_eq!(o.expires.unwrap(), exp);
+ assert_bytes_eq!(o.body, &BODY[..]);
+ }
}
#[tokio::test]
@@ -205,7 +229,7 @@ async fn test_deleteobject() {
.await
.unwrap();
if i > 0 {
- to_del = to_del.objects(ObjectIdentifier::builder().key(k).build());
+ to_del = to_del.objects(ObjectIdentifier::builder().key(k).build().unwrap());
}
}
@@ -223,7 +247,7 @@ async fn test_deleteobject() {
.unwrap();
if i > 0 {
- to_del = to_del.objects(ObjectIdentifier::builder().key(k).build());
+ to_del = to_del.objects(ObjectIdentifier::builder().key(k).build().unwrap());
}
}
@@ -247,7 +271,7 @@ async fn test_deleteobject() {
.client
.delete_objects()
.bucket(&bucket)
- .delete(to_del.build())
+ .delete(to_del.build().unwrap())
.send()
.await
.unwrap();
diff --git a/src/garage/tests/s3/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs
index b7a1acae..224b9ed5 100644
--- a/src/garage/tests/s3/streaming_signature.rs
+++ b/src/garage/tests/s3/streaming_signature.rs
@@ -57,9 +57,9 @@ async fn test_putobject_streaming() {
// assert_eq!(o.version_id.unwrap(), _version);
assert_eq!(o.content_type.unwrap(), content_type);
assert!(o.last_modified.is_some());
- assert_eq!(o.content_length, 0);
- assert_eq!(o.parts_count, 0);
- assert_eq!(o.tag_count, 0);
+ assert_eq!(o.content_length.unwrap(), 0);
+ assert_eq!(o.parts_count, None);
+ assert_eq!(o.tag_count, None);
}
{
@@ -95,9 +95,9 @@ async fn test_putobject_streaming() {
assert_bytes_eq!(o.body, BODY);
assert_eq!(o.e_tag.unwrap(), etag);
assert!(o.last_modified.is_some());
- assert_eq!(o.content_length, 62);
- assert_eq!(o.parts_count, 0);
- assert_eq!(o.tag_count, 0);
+ assert_eq!(o.content_length.unwrap(), 62);
+ assert_eq!(o.parts_count, None);
+ assert_eq!(o.tag_count, None);
}
}
@@ -187,7 +187,7 @@ async fn test_put_website_streaming() {
.await
.unwrap();
- assert_eq!(o.index_document.unwrap().suffix.unwrap(), "home.html");
- assert_eq!(o.error_document.unwrap().key.unwrap(), "err/error.html");
+ assert_eq!(o.index_document.unwrap().suffix, "home.html");
+ assert_eq!(o.error_document.unwrap().key, "err/error.html");
}
}
diff --git a/src/garage/tests/s3/website.rs b/src/garage/tests/s3/website.rs
index eeafb5fa..0cadc388 100644
--- a/src/garage/tests/s3/website.rs
+++ b/src/garage/tests/s3/website.rs
@@ -8,15 +8,18 @@ use aws_sdk_s3::{
types::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration},
};
use http::{Request, StatusCode};
-use hyper::{
- body::{to_bytes, Body},
- Client,
-};
+use http_body_util::BodyExt;
+use http_body_util::Full as FullBody;
+use hyper::body::Bytes;
+use hyper_util::client::legacy::Client;
+use hyper_util::rt::TokioExecutor;
use serde_json::json;
const BODY: &[u8; 16] = b"<h1>bonjour</h1>";
const BODY_ERR: &[u8; 6] = b"erreur";
+pub type Body = FullBody<Bytes>;
+
#[tokio::test]
async fn test_website() {
const BCKT_NAME: &str = "my-website";
@@ -34,14 +37,14 @@ async fn test_website() {
.await
.unwrap();
- let client = Client::new();
+ let client = Client::builder(TokioExecutor::new()).build_http();
let req = || {
Request::builder()
.method("GET")
.uri(format!("http://127.0.0.1:{}/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME))
- .body(Body::empty())
+ .body(Body::new(Bytes::new()))
.unwrap()
};
@@ -49,7 +52,7 @@ async fn test_website() {
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_ne!(
- to_bytes(resp.body_mut()).await.unwrap().as_ref(),
+ BodyExt::collect(resp.into_body()).await.unwrap().to_bytes(),
BODY.as_ref()
); /* check that we do not leak body */
@@ -58,10 +61,9 @@ async fn test_website() {
.method("GET")
.uri(format!(
"http://127.0.0.1:{0}/check?domain={1}",
- ctx.garage.admin_port,
- BCKT_NAME.to_string()
+ ctx.garage.admin_port, BCKT_NAME
))
- .body(Body::empty())
+ .body(Body::new(Bytes::new()))
.unwrap()
};
@@ -87,7 +89,7 @@ async fn test_website() {
resp = client.request(req()).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
- to_bytes(resp.body_mut()).await.unwrap().as_ref(),
+ resp.into_body().collect().await.unwrap().to_bytes(),
BODY.as_ref()
);
@@ -103,14 +105,14 @@ async fn test_website() {
"http://127.0.0.1:{0}/check?domain={1}",
ctx.garage.admin_port, bname
))
- .body(Body::empty())
+ .body(Body::new(Bytes::new()))
.unwrap()
};
- let mut admin_resp = client.request(admin_req()).await.unwrap();
+ let admin_resp = client.request(admin_req()).await.unwrap();
assert_eq!(admin_resp.status(), StatusCode::OK);
assert_eq!(
- to_bytes(admin_resp.body_mut()).await.unwrap().as_ref(),
+ admin_resp.into_body().collect().await.unwrap().to_bytes(),
format!("Domain '{bname}' is managed by Garage").as_bytes()
);
}
@@ -124,7 +126,7 @@ async fn test_website() {
resp = client.request(req()).await.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_ne!(
- to_bytes(resp.body_mut()).await.unwrap().as_ref(),
+ resp.into_body().collect().await.unwrap().to_bytes(),
BODY.as_ref()
); /* check that we do not leak body */
@@ -133,10 +135,9 @@ async fn test_website() {
.method("GET")
.uri(format!(
"http://127.0.0.1:{0}/check?domain={1}",
- ctx.garage.admin_port,
- BCKT_NAME.to_string()
+ ctx.garage.admin_port, BCKT_NAME
))
- .body(Body::empty())
+ .body(Body::new(Bytes::new()))
.unwrap()
};
@@ -181,8 +182,18 @@ async fn test_website_s3_api() {
.unwrap();
let conf = WebsiteConfiguration::builder()
- .index_document(IndexDocument::builder().suffix("home.html").build())
- .error_document(ErrorDocument::builder().key("err/error.html").build())
+ .index_document(
+ IndexDocument::builder()
+ .suffix("home.html")
+ .build()
+ .unwrap(),
+ )
+ .error_document(
+ ErrorDocument::builder()
+ .key("err/error.html")
+ .build()
+ .unwrap(),
+ )
.build();
ctx.client
@@ -201,9 +212,11 @@ async fn test_website_s3_api() {
.allowed_methods("GET")
.allowed_methods("PUT")
.allowed_origins("*")
- .build(),
+ .build()
+ .unwrap(),
)
- .build();
+ .build()
+ .unwrap();
ctx.client
.put_bucket_cors()
@@ -222,24 +235,21 @@ async fn test_website_s3_api() {
.await
.unwrap();
- let main_rule = cors_res.cors_rules().unwrap().iter().next().unwrap();
+ let main_rule = cors_res.cors_rules().iter().next().unwrap();
assert_eq!(main_rule.id.as_ref().unwrap(), "main-rule");
assert_eq!(
main_rule.allowed_headers.as_ref().unwrap(),
&vec!["*".to_string()]
);
+ assert_eq!(&main_rule.allowed_origins, &vec!["*".to_string()]);
assert_eq!(
- main_rule.allowed_origins.as_ref().unwrap(),
- &vec!["*".to_string()]
- );
- assert_eq!(
- main_rule.allowed_methods.as_ref().unwrap(),
+ &main_rule.allowed_methods,
&vec!["GET".to_string(), "PUT".to_string()]
);
}
- let client = Client::new();
+ let client = Client::builder(TokioExecutor::new()).build_http();
// Test direct requests with CORS
{
@@ -248,10 +258,10 @@ async fn test_website_s3_api() {
.uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
- .body(Body::empty())
+ .body(Body::new(Bytes::new()))
.unwrap();
- let mut resp = client.request(req).await.unwrap();
+ let resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
@@ -259,7 +269,7 @@ async fn test_website_s3_api() {
"*"
);
assert_eq!(
- to_bytes(resp.body_mut()).await.unwrap().as_ref(),
+ resp.into_body().collect().await.unwrap().to_bytes(),
BODY.as_ref()
);
}
@@ -273,14 +283,14 @@ async fn test_website_s3_api() {
ctx.garage.web_port
))
.header("Host", format!("{}.web.garage", BCKT_NAME))
- .body(Body::empty())
+ .body(Body::new(Bytes::new()))
.unwrap();
- let mut resp = client.request(req).await.unwrap();
+ let resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_eq!(
- to_bytes(resp.body_mut()).await.unwrap().as_ref(),
+ resp.into_body().collect().await.unwrap().to_bytes(),
BODY_ERR.as_ref()
);
}
@@ -293,10 +303,10 @@ async fn test_website_s3_api() {
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
.header("Access-Control-Request-Method", "PUT")
- .body(Body::empty())
+ .body(Body::new(Bytes::new()))
.unwrap();
- let mut resp = client.request(req).await.unwrap();
+ let resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
@@ -304,7 +314,7 @@ async fn test_website_s3_api() {
"*"
);
assert_ne!(
- to_bytes(resp.body_mut()).await.unwrap().as_ref(),
+ resp.into_body().collect().await.unwrap().to_bytes(),
BODY.as_ref()
);
}
@@ -317,14 +327,14 @@ async fn test_website_s3_api() {
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
.header("Access-Control-Request-Method", "DELETE")
- .body(Body::empty())
+ .body(Body::new(Bytes::new()))
.unwrap();
- let mut resp = client.request(req).await.unwrap();
+ let resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
assert_ne!(
- to_bytes(resp.body_mut()).await.unwrap().as_ref(),
+ resp.into_body().collect().await.unwrap().to_bytes(),
BODY.as_ref()
);
}
@@ -358,14 +368,14 @@ async fn test_website_s3_api() {
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
.header("Access-Control-Request-Method", "PUT")
- .body(Body::empty())
+ .body(Body::new(Bytes::new()))
.unwrap();
- let mut resp = client.request(req).await.unwrap();
+ let resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
assert_ne!(
- to_bytes(resp.body_mut()).await.unwrap().as_ref(),
+ resp.into_body().collect().await.unwrap().to_bytes(),
BODY.as_ref()
);
}
@@ -384,20 +394,15 @@ async fn test_website_s3_api() {
.method("GET")
.uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME))
- .body(Body::empty())
+ .body(Body::new(Bytes::new()))
.unwrap();
- let mut resp = client.request(req).await.unwrap();
+ let resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
- assert_ne!(
- to_bytes(resp.body_mut()).await.unwrap().as_ref(),
- BODY_ERR.as_ref()
- );
- assert_ne!(
- to_bytes(resp.body_mut()).await.unwrap().as_ref(),
- BODY.as_ref()
- );
+ let resp_bytes = resp.into_body().collect().await.unwrap().to_bytes();
+ assert_ne!(resp_bytes, BODY_ERR.as_ref());
+ assert_ne!(resp_bytes, BODY.as_ref());
}
}
@@ -405,13 +410,13 @@ async fn test_website_s3_api() {
async fn test_website_check_domain() {
let ctx = common::context();
- let client = Client::new();
+ let client = Client::builder(TokioExecutor::new()).build_http();
let admin_req = || {
Request::builder()
.method("GET")
.uri(format!("http://127.0.0.1:{}/check", ctx.garage.admin_port))
- .body(Body::empty())
+ .body(Body::new(Bytes::new()))
.unwrap()
};
@@ -435,7 +440,7 @@ async fn test_website_check_domain() {
"http://127.0.0.1:{}/check?domain=",
ctx.garage.admin_port
))
- .body(Body::empty())
+ .body(Body::new(Bytes::new()))
.unwrap()
};
@@ -459,7 +464,7 @@ async fn test_website_check_domain() {
"http://127.0.0.1:{}/check?domain=foobar",
ctx.garage.admin_port
))
- .body(Body::empty())
+ .body(Body::new(Bytes::new()))
.unwrap()
};
@@ -483,7 +488,7 @@ async fn test_website_check_domain() {
"http://127.0.0.1:{}/check?domain=%E2%98%B9",
ctx.garage.admin_port
))
- .body(Body::empty())
+ .body(Body::new(Bytes::new()))
.unwrap()
};
diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml
index 2ccb9fe5..694be1f8 100644
--- a/src/k2v-client/Cargo.toml
+++ b/src/k2v-client/Cargo.toml
@@ -9,25 +9,28 @@ repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
readme = "../../README.md"
[dependencies]
-base64 = "0.21"
-sha2 = "0.10"
-hex = "0.4"
-http = "0.2"
-log = "0.4"
-aws-sigv4 = "0.55"
-percent-encoding = "2.2"
-hyper = { version = "0.14", default-features = false, features = ["client", "http1", "http2"] }
-hyper-rustls = { version = "0.24", features = ["http2"] }
-serde = { version = "1.0", features = [ "derive" ] }
-serde_json = "1.0"
-thiserror = "1.0"
-tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+base64.workspace = true
+sha2.workspace = true
+hex.workspace = true
+http.workspace = true
+http-body-util.workspace = true
+log.workspace = true
+aws-sigv4.workspace = true
+aws-sdk-config.workspace = true
+percent-encoding.workspace = true
+hyper = { workspace = true, default-features = false, features = ["http1", "http2"] }
+hyper-util.workspace = true
+hyper-rustls.workspace = true
+serde.workspace = true
+serde_json.workspace = true
+thiserror.workspace = true
+tokio.workspace = true
# cli deps
-clap = { version = "4.1", optional = true, features = ["derive", "env"] }
+clap = { workspace = true, optional = true }
format_table = { workspace = true, optional = true }
-tracing = { version = "0.1", optional = true }
-tracing-subscriber = { version = "0.3", optional = true, features = ["env-filter"] }
+tracing = { workspace = true, optional = true }
+tracing-subscriber = { workspace = true, optional = true }
[features]
diff --git a/src/k2v-client/error.rs b/src/k2v-client/error.rs
index 564ce497..96f5674a 100644
--- a/src/k2v-client/error.rs
+++ b/src/k2v-client/error.rs
@@ -22,12 +22,14 @@ pub enum Error {
Http(#[from] http::Error),
#[error("hyper error: {0}")]
Hyper(#[from] hyper::Error),
+ #[error("hyper client error: {0}")]
+ HyperClient(#[from] hyper_util::client::legacy::Error),
#[error("invalid header: {0}")]
Header(#[from] hyper::header::ToStrError),
#[error("deserialization error: {0}")]
Deserialization(#[from] serde_json::Error),
#[error("invalid signature parameters: {0}")]
- SignParameters(#[from] aws_sigv4::signing_params::BuildError),
+ SignParameters(#[from] aws_sigv4::sign::v4::signing_params::BuildError),
#[error("could not sign request: {0}")]
SignRequest(#[from] aws_sigv4::http_request::SigningError),
#[error("request timed out")]
diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs
index 4aa7a20a..852274a7 100644
--- a/src/k2v-client/lib.rs
+++ b/src/k2v-client/lib.rs
@@ -9,11 +9,15 @@ use percent_encoding::{utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC};
use http::header::{ACCEPT, CONTENT_TYPE};
use http::status::StatusCode;
use http::{HeaderName, HeaderValue, Request};
-use hyper::{body::Bytes, Body};
-use hyper::{client::connect::HttpConnector, Client as HttpClient};
+use http_body_util::{BodyExt, Full as FullBody};
+use hyper::body::Bytes;
use hyper_rustls::HttpsConnector;
+use hyper_util::client::legacy::{connect::HttpConnector, Client as HttpClient};
+use hyper_util::rt::TokioExecutor;
-use aws_sigv4::http_request::{sign, SignableRequest, SigningParams, SigningSettings};
+use aws_sdk_config::config::Credentials;
+use aws_sigv4::http_request::{sign, SignableBody, SignableRequest, SigningSettings};
+use aws_sigv4::sign::v4::SigningParams;
use serde::de::Error as DeError;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -22,6 +26,8 @@ mod error;
pub use error::Error;
+pub type Body = FullBody<Bytes>;
+
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300);
const SERVICE: &str = "k2v";
@@ -53,19 +59,19 @@ pub struct K2vClientConfig {
pub struct K2vClient {
config: K2vClientConfig,
user_agent: HeaderValue,
- client: HttpClient<HttpsConnector<HttpConnector>>,
+ client: HttpClient<HttpsConnector<HttpConnector>, Body>,
}
impl K2vClient {
/// Create a new K2V client.
pub fn new(config: K2vClientConfig) -> Result<Self, Error> {
let connector = hyper_rustls::HttpsConnectorBuilder::new()
- .with_native_roots()
+ .with_native_roots()?
.https_or_http()
.enable_http1()
.enable_http2()
.build();
- let client = HttpClient::builder().build(connector);
+ let client = HttpClient::builder(TokioExecutor::new()).build(connector);
let user_agent: std::borrow::Cow<str> = match &config.user_agent {
Some(ua) => ua.into(),
None => format!("k2v/{}", env!("CARGO_PKG_VERSION")).into(),
@@ -363,21 +369,37 @@ impl K2vClient {
// Sign request
let signing_settings = SigningSettings::default();
+ let identity = Credentials::new(
+ &self.config.aws_access_key_id,
+ &self.config.aws_secret_access_key,
+ None,
+ None,
+ "k2v-client",
+ )
+ .into();
let signing_params = SigningParams::builder()
- .access_key(&self.config.aws_access_key_id)
- .secret_key(&self.config.aws_secret_access_key)
+ .identity(&identity)
.region(&self.config.region)
- .service_name(SERVICE)
+ .name(SERVICE)
.time(SystemTime::now())
.settings(signing_settings)
- .build()?;
+ .build()?
+ .into();
// Convert the HTTP request into a signable request
- let signable_request = SignableRequest::from(&req);
+ let signable_request = SignableRequest::new(
+ req.method().as_str(),
+ req.uri().to_string(),
+ // TODO: get rid of Unwrap
+ req.headers()
+ .iter()
+ .map(|(x, y)| (x.as_str(), y.to_str().unwrap())),
+ SignableBody::Bytes(req.body().as_ref()),
+ )?;
// Sign and then apply the signature to the request
let (signing_instructions, _signature) =
sign(signable_request, &signing_params)?.into_parts();
- signing_instructions.apply_to_request(&mut req);
+ signing_instructions.apply_to_request_http1x(&mut req);
// Send and wait for timeout
let res = tokio::select! {
@@ -398,12 +420,16 @@ impl K2vClient {
};
let body = match res.status {
- StatusCode::OK => hyper::body::to_bytes(body).await?,
+ StatusCode::OK => BodyExt::collect(body).await?.to_bytes(),
StatusCode::NO_CONTENT => Bytes::new(),
StatusCode::NOT_FOUND => return Err(Error::NotFound),
StatusCode::NOT_MODIFIED => Bytes::new(),
s => {
- let err_body = hyper::body::to_bytes(body).await.unwrap_or_default();
+ let err_body = body
+ .collect()
+ .await
+ .map(|x| x.to_bytes())
+ .unwrap_or_default();
let err_body_str = std::str::from_utf8(&err_body)
.map(String::from)
.unwrap_or_else(|_| BASE64_STANDARD.encode(&err_body));
@@ -451,7 +477,11 @@ impl K2vClient {
}
fn build_url<V: AsRef<str>>(&self, partition_key: Option<&str>, query: &[(&str, V)]) -> String {
- let mut url = format!("{}/{}", self.config.endpoint, self.config.bucket);
+ let mut url = format!(
+ "{}/{}",
+ self.config.endpoint.trim_end_matches('/'),
+ self.config.bucket
+ );
if let Some(pk) = partition_key {
url.push('/');
url.extend(utf8_percent_encode(pk, &PATH_ENCODE_SET));
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 124b84b0..fd0abc3a 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -20,26 +20,26 @@ garage_table.workspace = true
garage_block.workspace = true
garage_util.workspace = true
-async-trait = "0.1.7"
-arc-swap = "1.0"
-blake2 = "0.10"
-chrono = "0.4"
-err-derive = "0.3"
-hex = "0.4"
-base64 = "0.21"
-tracing = "0.1"
-rand = "0.8"
-zstd = { version = "0.12", default-features = false }
-
-serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
-serde_bytes = "0.11"
-
-futures = "0.3"
-futures-util = "0.3"
-tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-opentelemetry = "0.17"
-
-netapp = "0.10"
+async-trait.workspace = true
+arc-swap.workspace = true
+blake2.workspace = true
+chrono.workspace = true
+err-derive.workspace = true
+hex.workspace = true
+base64.workspace = true
+tracing.workspace = true
+rand.workspace = true
+zstd.workspace = true
+
+serde.workspace = true
+serde_bytes.workspace = true
+
+futures.workspace = true
+futures-util.workspace = true
+tokio.workspace = true
+opentelemetry.workspace = true
+
+netapp.workspace = true
[features]
default = [ "sled", "lmdb", "sqlite" ]
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index efa3e27b..222cfc83 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -126,7 +126,7 @@ impl<'a> BucketHelper<'a> {
}
// Checks ok, add alias
- let mut bucket_p = bucket.state.as_option_mut().unwrap();
+ let bucket_p = bucket.state.as_option_mut().unwrap();
let alias_ts = increment_logical_clock_2(
bucket_p.aliases.get_timestamp(alias_name),
@@ -163,7 +163,7 @@ impl<'a> BucketHelper<'a> {
alias_name: &String,
) -> Result<(), Error> {
let mut bucket = self.get_existing_bucket(bucket_id).await?;
- let mut bucket_state = bucket.state.as_option_mut().unwrap();
+ let bucket_state = bucket.state.as_option_mut().unwrap();
let mut alias = self
.0
@@ -245,7 +245,7 @@ impl<'a> BucketHelper<'a> {
self.0.bucket_alias_table.insert(&alias).await?;
}
- if let Some(mut bucket_state) = bucket.state.as_option_mut() {
+ if let Some(bucket_state) = bucket.state.as_option_mut() {
bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
}
@@ -274,7 +274,7 @@ impl<'a> BucketHelper<'a> {
let mut bucket = self.get_existing_bucket(bucket_id).await?;
let mut key = key_helper.get_existing_key(key_id).await?;
- let mut key_param = key.state.as_option_mut().unwrap();
+ let key_param = key.state.as_option_mut().unwrap();
if let Some(Some(existing_alias)) = key_param.local_aliases.get(alias_name) {
if *existing_alias != bucket_id {
@@ -283,7 +283,7 @@ impl<'a> BucketHelper<'a> {
}
// Checks ok, add alias
- let mut bucket_p = bucket.state.as_option_mut().unwrap();
+ let bucket_p = bucket.state.as_option_mut().unwrap();
let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone());
// Calculate the timestamp to assign to this aliasing in the two local_aliases maps
@@ -326,7 +326,7 @@ impl<'a> BucketHelper<'a> {
let mut bucket = self.get_existing_bucket(bucket_id).await?;
let mut key = key_helper.get_existing_key(key_id).await?;
- let mut bucket_p = bucket.state.as_option_mut().unwrap();
+ let bucket_p = bucket.state.as_option_mut().unwrap();
if key
.state
@@ -359,7 +359,7 @@ impl<'a> BucketHelper<'a> {
}
// Checks ok, remove alias
- let mut key_param = key.state.as_option_mut().unwrap();
+ let key_param = key.state.as_option_mut().unwrap();
let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone());
let alias_ts = increment_logical_clock_2(
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index e8702bf1..aa13ee7b 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -232,7 +232,7 @@ impl<T: CountedItem> IndexCounter<T> {
let now = now_msec();
for (s, inc) in counts.iter() {
- let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
+ let ent = entry.values.entry(s.to_string()).or_insert((0, 0));
ent.0 = std::cmp::max(ent.0 + 1, now);
ent.1 += *inc;
}
@@ -348,7 +348,7 @@ impl<T: CountedItem> IndexCounter<T> {
},
};
for (s, v) in counts.iter() {
- let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0));
+ let tv = local_counter.values.entry(s.to_string()).or_insert((0, 0));
tv.0 = std::cmp::max(tv.0 + 1, now);
tv.1 += v;
}
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index e19f80a8..1b2867a5 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -18,38 +18,38 @@ format_table.workspace = true
garage_db.workspace = true
garage_util.workspace = true
-arc-swap = "1.0"
-bytes = "1.0"
-bytesize = "1.1"
-gethostname = "0.4"
-hex = "0.4"
-tracing = "0.1"
-rand = "0.8"
-itertools="0.10"
-sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
-nix = { version = "0.27", default-features = false, features = ["fs"] }
-
-async-trait = "0.1.7"
-serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
-serde_bytes = "0.11"
-serde_json = "1.0"
-err-derive = { version = "0.3", optional = true }
+arc-swap.workspace = true
+bytes.workspace = true
+bytesize.workspace = true
+gethostname.workspace = true
+hex.workspace = true
+tracing.workspace = true
+rand.workspace = true
+itertools.workspace = true
+sodiumoxide.workspace = true
+nix.workspace = true
+
+async-trait.workspace = true
+serde.workspace = true
+serde_bytes.workspace = true
+serde_json.workspace = true
+err-derive = { workspace = true, optional = true }
# newer version requires rust edition 2021
-kube = { version = "0.75", default-features = false, features = ["runtime", "derive", "client", "rustls-tls"], optional = true }
-k8s-openapi = { version = "0.16", features = ["v1_22"], optional = true }
-schemars = { version = "0.8", optional = true }
-reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls-manual-roots", "json"] }
+kube = { workspace = true, optional = true }
+k8s-openapi = { workspace = true, optional = true }
+schemars = { workspace = true, optional = true }
+reqwest = { workspace = true, optional = true }
-pnet_datalink = "0.33"
+pnet_datalink.workspace = true
-futures = "0.3"
-futures-util = "0.3"
-tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-tokio-stream = { version = "0.1", features = ["net"] }
-opentelemetry = "0.17"
+futures.workspace = true
+futures-util.workspace = true
+tokio.workspace = true
+tokio-stream.workspace = true
+opentelemetry.workspace = true
-netapp = { version = "0.10", features = ["telemetry"] }
+netapp.workspace = true
[features]
kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ]
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs
index ab8d1112..71fd71b0 100644
--- a/src/rpc/consul.rs
+++ b/src/rpc/consul.rs
@@ -148,7 +148,7 @@ impl ConsulDiscovery {
ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
} else {
warn!(
- "Could not process node spec from Consul: {:?} (invalid IP or public key)",
+ "Could not process node spec from Consul: {:?} (invalid IP address or node ID/pubkey)",
ent
);
}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 83cc6816..f22247c3 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -57,7 +57,7 @@ pub const SYSTEM_RPC_PATH: &str = "garage_rpc/system.rs/SystemRpc";
pub enum SystemRpc {
/// Response to successfull advertisements
Ok,
- /// Request to connect to a specific node (in <pubkey>@<host>:<port> format)
+ /// Request to connect to a specific node (in <pubkey>@<host>:<port> format, pubkey = full-length node ID)
Connect(String),
/// Advertise Garage status. Answered with another AdvertiseStatus.
/// Exchanged with every node on a regular basis.
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 62cffac7..cac17da6 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -18,19 +18,19 @@ garage_db.workspace = true
garage_rpc.workspace = true
garage_util.workspace = true
-opentelemetry = "0.17"
+opentelemetry.workspace = true
-async-trait = "0.1.7"
-arc-swap = "1.0"
-bytes = "1.0"
-hex = "0.4"
-hexdump = "0.1"
-tracing = "0.1"
-rand = "0.8"
+async-trait.workspace = true
+arc-swap.workspace = true
+bytes.workspace = true
+hex.workspace = true
+hexdump.workspace = true
+tracing.workspace = true
+rand.workspace = true
-serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
-serde_bytes = "0.11"
+serde.workspace = true
+serde_bytes.workspace = true
-futures = "0.3"
-futures-util = "0.3"
-tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+futures.workspace = true
+futures-util.workspace = true
+tokio.workspace = true
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index afc4d3c3..d72245dd 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -16,42 +16,42 @@ path = "lib.rs"
[dependencies]
garage_db.workspace = true
-arc-swap = "1.0"
-async-trait = "0.1"
-blake2 = "0.10"
-bytes = "1.0"
-bytesize = "1.2"
-digest = "0.10"
-err-derive = "0.3"
-hexdump = "0.1"
-xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
-hex = "0.4"
-lazy_static = "1.4"
-tracing = "0.1"
-rand = "0.8"
-sha2 = "0.10"
-
-chrono = "0.4"
-rmp-serde = "1.1.2"
-serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
-serde_json = "1.0"
-toml = "0.6"
-
-futures = "0.3"
-tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-
-netapp = "0.10"
-
-http = "0.2"
-hyper = "0.14"
-
-opentelemetry = { version = "0.17", features = [ "rt-tokio", "metrics", "trace" ] }
+arc-swap.workspace = true
+async-trait.workspace = true
+blake2.workspace = true
+bytes.workspace = true
+bytesize.workspace = true
+digest.workspace = true
+err-derive.workspace = true
+hexdump.workspace = true
+xxhash-rust.workspace = true
+hex.workspace = true
+lazy_static.workspace = true
+tracing.workspace = true
+rand.workspace = true
+sha2.workspace = true
+
+chrono.workspace = true
+rmp-serde.workspace = true
+serde.workspace = true
+serde_json.workspace = true
+toml.workspace = true
+
+futures.workspace = true
+tokio.workspace = true
+
+netapp.workspace = true
+
+http.workspace = true
+hyper.workspace = true
+
+opentelemetry.workspace = true
[build-dependencies]
-rustc_version = "0.4.0"
+rustc_version.workspace = true
[dev-dependencies]
-mktemp = "0.5"
+mktemp.workspace = true
[features]
k2v = []
diff --git a/src/util/config.rs b/src/util/config.rs
index ad5c8e1f..a9a72110 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -1,6 +1,5 @@
//! Contains type and functions related to Garage configuration file
use std::convert::TryFrom;
-use std::io::Read;
use std::net::SocketAddr;
use std::path::PathBuf;
@@ -45,11 +44,15 @@ pub struct Config {
)]
pub compression_level: Option<i32>,
+ /// Skip the permission check of secret files. Useful when
+ /// POSIX ACLs (or more complex chmods) are used.
+ #[serde(default)]
+ pub allow_world_readable_secrets: bool,
+
/// RPC secret key: 32 bytes hex encoded
pub rpc_secret: Option<String>,
/// Optional file where RPC secret key is read from
- pub rpc_secret_file: Option<String>,
-
+ pub rpc_secret_file: Option<PathBuf>,
/// Address to bind for RPC
pub rpc_bind_addr: SocketAddr,
/// Public IP address of this node
@@ -163,12 +166,12 @@ pub struct AdminConfig {
/// Bearer token to use to scrape metrics
pub metrics_token: Option<String>,
/// File to read metrics token from
- pub metrics_token_file: Option<String>,
+ pub metrics_token_file: Option<PathBuf>,
/// Bearer token to use to access Admin API endpoints
pub admin_token: Option<String>,
/// File to read admin token from
- pub admin_token_file: Option<String>,
+ pub admin_token_file: Option<PathBuf>,
/// OTLP server to where to export traces
pub trace_sink: Option<String>,
@@ -221,6 +224,13 @@ pub struct KubernetesDiscoveryConfig {
pub skip_crd: bool,
}
+/// Read and parse configuration
+pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
+ let config = std::fs::read_to_string(config_file)?;
+
+ Ok(toml::from_str(&config)?)
+}
+
fn default_db_engine() -> String {
"lmdb".into()
}
@@ -235,68 +245,6 @@ fn default_block_size() -> usize {
1048576
}
-/// Read and parse configuration
-pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
- let mut file = std::fs::OpenOptions::new()
- .read(true)
- .open(config_file.as_path())?;
-
- let mut config = String::new();
- file.read_to_string(&mut config)?;
-
- let mut parsed_config: Config = toml::from_str(&config)?;
-
- secret_from_file(
- &mut parsed_config.rpc_secret,
- &parsed_config.rpc_secret_file,
- "rpc_secret",
- )?;
- secret_from_file(
- &mut parsed_config.admin.metrics_token,
- &parsed_config.admin.metrics_token_file,
- "admin.metrics_token",
- )?;
- secret_from_file(
- &mut parsed_config.admin.admin_token,
- &parsed_config.admin.admin_token_file,
- "admin.admin_token",
- )?;
-
- Ok(parsed_config)
-}
-
-fn secret_from_file(
- secret: &mut Option<String>,
- secret_file: &Option<String>,
- name: &'static str,
-) -> Result<(), Error> {
- match (&secret, &secret_file) {
- (_, None) => {
- // no-op
- }
- (Some(_), Some(_)) => {
- return Err(format!("only one of `{}` and `{}_file` can be set", name, name).into());
- }
- (None, Some(file_path)) => {
- #[cfg(unix)]
- if std::env::var("GARAGE_ALLOW_WORLD_READABLE_SECRETS").as_deref() != Ok("true") {
- use std::os::unix::fs::MetadataExt;
- let metadata = std::fs::metadata(file_path)?;
- if metadata.mode() & 0o077 != 0 {
- return Err(format!("File {} is world-readable! (mode: 0{:o}, expected 0600)\nRefusing to start until this is fixed, or environment variable GARAGE_ALLOW_WORLD_READABLE_SECRETS is set to true.", file_path, metadata.mode()).into());
- }
- }
- let mut file = std::fs::OpenOptions::new().read(true).open(file_path)?;
- let mut secret_buf = String::new();
- file.read_to_string(&mut secret_buf)?;
- // trim_end: allows for use case such as `echo "$(openssl rand -hex 32)" > somefile`.
- // also editors sometimes add a trailing newline
- *secret = Some(String::from(secret_buf.trim_end()));
- }
- }
- Ok(())
-}
-
fn default_compression() -> Option<i32> {
Some(1)
}
@@ -425,83 +373,4 @@ mod tests {
Ok(())
}
-
- #[test]
- fn test_rpc_secret_file_works() -> Result<(), Error> {
- let path_secret = mktemp::Temp::new_file()?;
- let mut file_secret = File::create(path_secret.as_path())?;
- writeln!(file_secret, "foo")?;
- drop(file_secret);
-
- let path_config = mktemp::Temp::new_file()?;
- let mut file_config = File::create(path_config.as_path())?;
- let path_secret_path = path_secret.as_path();
- writeln!(
- file_config,
- r#"
- metadata_dir = "/tmp/garage/meta"
- data_dir = "/tmp/garage/data"
- replication_mode = "3"
- rpc_bind_addr = "[::]:3901"
- rpc_secret_file = "{}"
-
- [s3_api]
- s3_region = "garage"
- api_bind_addr = "[::]:3900"
- "#,
- path_secret_path.display()
- )?;
- let config = super::read_config(path_config.to_path_buf())?;
- assert_eq!("foo", config.rpc_secret.unwrap());
-
- #[cfg(unix)]
- {
- use std::os::unix::fs::PermissionsExt;
- let metadata = std::fs::metadata(&path_secret_path)?;
- let mut perm = metadata.permissions();
- perm.set_mode(0o660);
- std::fs::set_permissions(&path_secret_path, perm)?;
-
- std::env::set_var("GARAGE_ALLOW_WORLD_READABLE_SECRETS", "false");
- assert!(super::read_config(path_config.to_path_buf()).is_err());
-
- std::env::set_var("GARAGE_ALLOW_WORLD_READABLE_SECRETS", "true");
- assert!(super::read_config(path_config.to_path_buf()).is_ok());
- }
-
- drop(path_config);
- drop(path_secret);
- drop(file_config);
- Ok(())
- }
-
- #[test]
- fn test_rcp_secret_and_rpc_secret_file_cannot_be_set_both() -> Result<(), Error> {
- let path_config = mktemp::Temp::new_file()?;
- let mut file_config = File::create(path_config.as_path())?;
- writeln!(
- file_config,
- r#"
- metadata_dir = "/tmp/garage/meta"
- data_dir = "/tmp/garage/data"
- replication_mode = "3"
- rpc_bind_addr = "[::]:3901"
- rpc_secret= "dummy"
- rpc_secret_file = "dummy"
-
- [s3_api]
- s3_region = "garage"
- api_bind_addr = "[::]:3900"
- "#
- )?;
- assert_eq!(
- "only one of `rpc_secret` and `rpc_secret_file` can be set",
- super::read_config(path_config.to_path_buf())
- .unwrap_err()
- .to_string()
- );
- drop(path_config);
- drop(file_config);
- Ok(())
- }
}
diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml
index 9f7720da..49549c9b 100644
--- a/src/web/Cargo.toml
+++ b/src/web/Cargo.toml
@@ -19,16 +19,17 @@ garage_model.workspace = true
garage_util.workspace = true
garage_table.workspace = true
-err-derive = "0.3"
-tracing = "0.1"
-percent-encoding = "2.1.0"
+err-derive.workspace = true
+tracing.workspace = true
+percent-encoding.workspace = true
-futures = "0.3"
+futures.workspace = true
-http = "0.2"
-hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
-hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] }
+http.workspace = true
+http-body-util.workspace = true
+hyper.workspace = true
+hyper-util.workspace = true
-tokio = { version = "1.0", default-features = false, features = ["net"] }
+tokio.workspace = true
-opentelemetry = "0.17"
+opentelemetry.workspace = true
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index 73780efb..0f9b5dc8 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -2,19 +2,15 @@ use std::fs::{self, Permissions};
use std::os::unix::prelude::PermissionsExt;
use std::{convert::Infallible, sync::Arc};
-use futures::future::Future;
+use tokio::net::{TcpListener, UnixListener};
+use tokio::sync::watch;
use hyper::{
+ body::Incoming as IncomingBody,
header::{HeaderValue, HOST},
- server::conn::AddrStream,
- service::{make_service_fn, service_fn},
- Body, Method, Request, Response, Server, StatusCode,
+ Method, Request, Response, StatusCode,
};
-use hyperlocal::UnixServerExt;
-
-use tokio::net::UnixStream;
-
use opentelemetry::{
global,
metrics::{Counter, ValueRecorder},
@@ -24,7 +20,8 @@ use opentelemetry::{
use crate::error::*;
-use garage_api::helpers::{authority_to_host, host_to_bucket};
+use garage_api::generic_server::{server_loop, UnixListenerOn};
+use garage_api::helpers::*;
use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket};
use garage_api::s3::error::{
CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError,
@@ -74,78 +71,53 @@ pub struct WebServer {
impl WebServer {
/// Run a web server
- pub async fn run(
- garage: Arc<Garage>,
- addr: UnixOrTCPSocketAddress,
- root_domain: String,
- shutdown_signal: impl Future<Output = ()>,
- ) -> Result<(), GarageError> {
+ pub fn new(garage: Arc<Garage>, root_domain: String) -> Arc<Self> {
let metrics = Arc::new(WebMetrics::new());
- let web_server = Arc::new(WebServer {
+ Arc::new(WebServer {
garage,
metrics,
root_domain,
- });
-
- let tcp_service = make_service_fn(|conn: &AddrStream| {
- let web_server = web_server.clone();
-
- let client_addr = conn.remote_addr();
- async move {
- Ok::<_, Error>(service_fn(move |req: Request<Body>| {
- let web_server = web_server.clone();
-
- web_server.handle_request(req, client_addr.to_string())
- }))
- }
- });
-
- let unix_service = make_service_fn(|_: &UnixStream| {
- let web_server = web_server.clone();
-
- let path = addr.to_string();
- async move {
- Ok::<_, Error>(service_fn(move |req: Request<Body>| {
- let web_server = web_server.clone();
-
- web_server.handle_request(req, path.clone())
- }))
- }
- });
+ })
+ }
- info!("Web server listening on {}", addr);
+ pub async fn run(
+ self: Arc<Self>,
+ bind_addr: UnixOrTCPSocketAddress,
+ must_exit: watch::Receiver<bool>,
+ ) -> Result<(), GarageError> {
+ let server_name = "Web".into();
+ info!("Web server listening on {}", bind_addr);
- match addr {
+ match bind_addr {
UnixOrTCPSocketAddress::TCPSocket(addr) => {
- Server::bind(&addr)
- .serve(tcp_service)
- .with_graceful_shutdown(shutdown_signal)
- .await?
+ let listener = TcpListener::bind(addr).await?;
+
+ let handler =
+ move |stream, socketaddr| self.clone().handle_request(stream, socketaddr);
+ server_loop(server_name, listener, handler, must_exit).await
}
UnixOrTCPSocketAddress::UnixSocket(ref path) => {
if path.exists() {
fs::remove_file(path)?
}
- let bound = Server::bind_unix(path)?;
+ let listener = UnixListener::bind(path)?;
+ let listener = UnixListenerOn(listener, path.display().to_string());
fs::set_permissions(path, Permissions::from_mode(0o222))?;
- bound
- .serve(unix_service)
- .with_graceful_shutdown(shutdown_signal)
- .await?;
+ let handler =
+ move |stream, socketaddr| self.clone().handle_request(stream, socketaddr);
+ server_loop(server_name, listener, handler, must_exit).await
}
- };
-
- Ok(())
+ }
}
async fn handle_request(
self: Arc<Self>,
- req: Request<Body>,
+ req: Request<IncomingBody>,
addr: String,
- ) -> Result<Response<Body>, Infallible> {
+ ) -> Result<Response<BoxBody<Error>>, http::Error> {
if let Ok(forwarded_for_ip_addr) =
forwarded_headers::handle_forwarded_for_headers(req.headers())
{
@@ -187,7 +159,8 @@ impl WebServer {
match res {
Ok(res) => {
debug!("{} {} {}", req.method(), res.status(), req.uri());
- Ok(res)
+ Ok(res
+ .map(|body| BoxBody::new(http_body_util::BodyExt::map_err(body, Error::from))))
}
Err(error) => {
info!(
@@ -220,7 +193,10 @@ impl WebServer {
Ok(exists)
}
- async fn serve_file(self: &Arc<Self>, req: &Request<Body>) -> Result<Response<Body>, Error> {
+ async fn serve_file(
+ self: &Arc<Self>,
+ req: &Request<IncomingBody>,
+ ) -> Result<Response<BoxBody<ApiError>>, Error> {
// Get http authority string (eg. [::1]:3902 or garage.tld:80)
let authority = req
.headers()
@@ -267,9 +243,21 @@ impl WebServer {
);
let ret_doc = match *req.method() {
- Method::OPTIONS => handle_options_for_bucket(req, &bucket),
- Method::HEAD => handle_head(self.garage.clone(), req, bucket_id, &key, None).await,
- Method::GET => handle_get(self.garage.clone(), req, bucket_id, &key, None).await,
+ Method::OPTIONS => handle_options_for_bucket(req, &bucket)
+ .map_err(ApiError::from)
+ .map(|res| res.map(|_empty_body: EmptyBody| empty_body())),
+ Method::HEAD => handle_head(self.garage.clone(), &req, bucket_id, &key, None).await,
+ Method::GET => {
+ handle_get(
+ self.garage.clone(),
+ &req,
+ bucket_id,
+ &key,
+ None,
+ Default::default(),
+ )
+ .await
+ }
_ => Err(ApiError::bad_request("HTTP method not supported")),
};
@@ -281,7 +269,7 @@ impl WebServer {
Ok(Response::builder()
.status(StatusCode::FOUND)
.header("Location", url)
- .body(Body::empty())
+ .body(empty_body())
.unwrap())
}
_ => ret_doc,
@@ -310,10 +298,18 @@ impl WebServer {
// Create a fake HTTP request with path = the error document
let req2 = Request::builder()
.uri(format!("http://{}/{}", host, &error_document))
- .body(Body::empty())
+ .body(empty_body::<Infallible>())
.unwrap();
- match handle_get(self.garage.clone(), &req2, bucket_id, &error_document, None).await
+ match handle_get(
+ self.garage.clone(),
+ &req2,
+ bucket_id,
+ &error_document,
+ None,
+ Default::default(),
+ )
+ .await
{
Ok(mut error_doc) => {
// The error won't be logged back in handle_request,
@@ -358,7 +354,7 @@ impl WebServer {
}
}
-fn error_to_res(e: Error) -> Response<Body> {
+fn error_to_res(e: Error) -> Response<BoxBody<Error>> {
// If we are here, it is either that:
// - there was an error before trying to get the requested URL
// from the bucket (e.g. bucket not found)
@@ -366,7 +362,7 @@ fn error_to_res(e: Error) -> Response<Body> {
// was a HEAD request or we couldn't get the error document)
// We do NOT enter this code path when returning the bucket's
// error document (this is handled in serve_file)
- let body = Body::from(format!("{}\n", e));
+ let body = string_body(format!("{}\n", e));
let mut http_error = Response::new(body);
*http_error.status_mut() = e.http_status_code();
e.add_headers(http_error.headers_mut());