diff options
author | Alex Auvolat <alex@adnab.me> | 2024-02-13 11:24:56 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-02-13 11:36:28 +0100 |
commit | cf2af186fcc0c8f581a966454b6cd4720d3821f0 (patch) | |
tree | 37a978ba9ffb780fc828cff7b8ec93662d50884f /src | |
parent | db48dd3d6c1f9e86a62e9b8edfce2c1620bcd5f3 (diff) | |
parent | 823078b4cdaf93e09de0847c5eaa75beb7b26b7f (diff) | |
download | garage-cf2af186fcc0c8f581a966454b6cd4720d3821f0.tar.gz garage-cf2af186fcc0c8f581a966454b6cd4720d3821f0.zip |
Merge branch 'main' into next-0.10
Diffstat (limited to 'src')
70 files changed, 1866 insertions, 1168 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 15bf757e..9fb562a3 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -20,44 +20,46 @@ garage_block.workspace = true garage_util.workspace = true garage_rpc.workspace = true -async-trait = "0.1.7" -base64 = "0.21" -bytes = "1.0" -chrono = "0.4" -crypto-common = "0.1" -err-derive = "0.3" -hex = "0.4" -hmac = "0.12" -idna = "0.4" -tracing = "0.1" -md-5 = "0.10" -nom = "7.1" -sha2 = "0.10" +async-trait.workspace = true +base64.workspace = true +bytes.workspace = true +chrono.workspace = true +crypto-common.workspace = true +err-derive.workspace = true +hex.workspace = true +hmac.workspace = true +idna.workspace = true +tracing.workspace = true +md-5.workspace = true +nom.workspace = true +pin-project.workspace = true +sha2.workspace = true -futures = "0.3" -futures-util = "0.3" -pin-project = "1.0.12" -tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -tokio-stream = "0.1" +futures.workspace = true +futures-util.workspace = true +tokio.workspace = true +tokio-stream.workspace = true -form_urlencoded = "1.0.0" -http = "0.2" -httpdate = "1.0" -http-range = "0.1" -hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] } -hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] } -multer = "2.0" -percent-encoding = "2.1.0" -roxmltree = "0.18" -serde = { version = "1.0", features = ["derive"] } -serde_bytes = "0.11" -serde_json = "1.0" -quick-xml = { version = "0.26", features = [ "serialize" ] } -url = "2.3" +form_urlencoded.workspace = true +http.workspace = true +httpdate.workspace = true +http-range.workspace = true +http-body-util.workspace = true +hyper.workspace = true +hyper-util.workspace = true +multer.workspace = true +percent-encoding.workspace = true +roxmltree.workspace = true +url.workspace = true -opentelemetry = "0.17" -opentelemetry-prometheus = { version = "0.10", optional = true } -prometheus = { version = "0.13", optional = true } +serde.workspace = true +serde_bytes.workspace = true +serde_json.workspace = true +quick-xml.workspace = true + +opentelemetry.workspace = true +opentelemetry-prometheus = { workspace = true, optional = true } +prometheus = { workspace = true, optional = true } [features] k2v = [ "garage_util/k2v", "garage_model/k2v" ] diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 41a5e68c..2b9be24e 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -3,9 +3,9 @@ use std::sync::Arc; use async_trait::async_trait; -use futures::future::Future; use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW}; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; +use tokio::sync::watch; use opentelemetry::trace::SpanRef; @@ -27,7 +27,9 @@ use crate::admin::error::*; use crate::admin::key::*; use crate::admin::router_v0; use crate::admin::router_v1::{Authorization, Endpoint}; -use crate::helpers::host_to_bucket; +use crate::helpers::*; + +pub type ResBody = BoxBody<Error>; pub struct AdminApiServer { garage: Arc<Garage>, @@ -63,24 +65,27 @@ impl AdminApiServer { pub async fn run( self, bind_addr: UnixOrTCPSocketAddress, - shutdown_signal: impl Future<Output = ()>, + must_exit: watch::Receiver<bool>, ) -> Result<(), GarageError> { let region = self.garage.config.s3_api.s3_region.clone(); ApiServer::new(region, self) - .run_server(bind_addr, Some(0o220), shutdown_signal) + .run_server(bind_addr, Some(0o220), must_exit) .await } - fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> { + fn handle_options(&self, _req: &Request<IncomingBody>) -> Result<Response<ResBody>, Error> { Ok(Response::builder() .status(StatusCode::NO_CONTENT) .header(ALLOW, "OPTIONS, GET, POST") .header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST") .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") - .body(Body::empty())?) + .body(empty_body())?) } - async fn handle_check_domain(&self, req: Request<Body>) -> Result<Response<Body>, Error> { + async fn handle_check_domain( + &self, + req: Request<IncomingBody>, + ) -> Result<Response<ResBody>, Error> { let query_params: HashMap<String, String> = req .uri() .query() @@ -104,7 +109,7 @@ impl AdminApiServer { if self.check_domain(domain).await? { Ok(Response::builder() .status(StatusCode::OK) - .body(Body::from(format!( + .body(string_body(format!( "Domain '{domain}' is managed by Garage" )))?) } else { @@ -167,7 +172,7 @@ impl AdminApiServer { } } - fn handle_health(&self) -> Result<Response<Body>, Error> { + fn handle_health(&self) -> Result<Response<ResBody>, Error> { let health = self.garage.system.health(); let (status, status_str) = match health.status { @@ -189,10 +194,10 @@ impl AdminApiServer { Ok(Response::builder() .status(status) .header(http::header::CONTENT_TYPE, "text/plain") - .body(Body::from(status_str))?) + .body(string_body(status_str))?) } - fn handle_metrics(&self) -> Result<Response<Body>, Error> { + fn handle_metrics(&self) -> Result<Response<ResBody>, Error> { #[cfg(feature = "metrics")] { use opentelemetry::trace::Tracer; @@ -212,7 +217,7 @@ impl AdminApiServer { Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer))?) + .body(bytes_body(buffer.into()))?) } #[cfg(not(feature = "metrics"))] Err(Error::bad_request( @@ -229,7 +234,7 @@ impl ApiHandler for AdminApiServer { type Endpoint = Endpoint; type Error = Error; - fn parse_endpoint(&self, req: &Request<Body>) -> Result<Endpoint, Error> { + fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<Endpoint, Error> { if req.uri().path().starts_with("/v0/") { let endpoint_v0 = router_v0::Endpoint::from_request(req)?; Endpoint::from_v0(endpoint_v0) @@ -240,9 +245,9 @@ impl ApiHandler for AdminApiServer { async fn handle( &self, - req: Request<Body>, + req: Request<IncomingBody>, endpoint: Endpoint, - ) -> Result<Response<Body>, Error> { + ) -> Result<Response<ResBody>, Error> { let expected_auth_header = match endpoint.authorization_type() { Authorization::None => None, diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index 65929d61..a8718a9f 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_util::crdt::*; @@ -17,12 +17,13 @@ use garage_model::permission::*; use garage_model::s3::mpu_table; use garage_model::s3::object_table::*; +use crate::admin::api_server::ResBody; use crate::admin::error::*; use crate::admin::key::ApiBucketKeyPerm; use crate::common_error::CommonError; -use crate::helpers::{json_ok_response, parse_json_body}; +use crate::helpers::*; -pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { +pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let buckets = garage .bucket_table .get_range( @@ -90,7 +91,7 @@ pub async fn handle_get_bucket_info( garage: &Arc<Garage>, id: Option<String>, global_alias: Option<String>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let bucket_id = match (id, global_alias) { (Some(id), None) => parse_bucket_id(&id)?, (None, Some(ga)) => garage @@ -111,7 +112,7 @@ pub async fn handle_get_bucket_info( async fn bucket_info_results( garage: &Arc<Garage>, bucket_id: Uuid, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let bucket = garage .bucket_helper() .get_existing_bucket(bucket_id) @@ -268,9 +269,9 @@ struct GetBucketInfoKey { pub async fn handle_create_bucket( garage: &Arc<Garage>, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let req = parse_json_body::<CreateBucketRequest>(req).await?; + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { + let req = parse_json_body::<CreateBucketRequest, _, Error>(req).await?; if let Some(ga) = &req.global_alias { if !is_valid_bucket_name(ga) { @@ -360,7 +361,7 @@ struct CreateBucketLocalAlias { pub async fn handle_delete_bucket( garage: &Arc<Garage>, id: String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let helper = garage.bucket_helper(); let bucket_id = parse_bucket_id(&id)?; @@ -403,15 +404,15 @@ pub async fn handle_delete_bucket( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_update_bucket( garage: &Arc<Garage>, id: String, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let req = parse_json_body::<UpdateBucketRequest>(req).await?; + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { + let req = parse_json_body::<UpdateBucketRequest, _, Error>(req).await?; let bucket_id = parse_bucket_id(&id)?; let mut bucket = garage @@ -470,10 +471,10 @@ struct UpdateBucketWebsiteAccess { pub async fn handle_bucket_change_key_perm( garage: &Arc<Garage>, - req: Request<Body>, + req: Request<IncomingBody>, new_perm_flag: bool, -) -> Result<Response<Body>, Error> { - let req = parse_json_body::<BucketKeyPermChangeRequest>(req).await?; +) -> Result<Response<ResBody>, Error> { + let req = parse_json_body::<BucketKeyPermChangeRequest, _, Error>(req).await?; let bucket_id = parse_bucket_id(&req.bucket_id)?; @@ -526,7 +527,7 @@ pub async fn handle_global_alias_bucket( garage: &Arc<Garage>, bucket_id: String, alias: String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let bucket_id = parse_bucket_id(&bucket_id)?; garage @@ -541,7 +542,7 @@ pub async fn handle_global_unalias_bucket( garage: &Arc<Garage>, bucket_id: String, alias: String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let bucket_id = parse_bucket_id(&bucket_id)?; garage @@ -557,7 +558,7 @@ pub async fn handle_local_alias_bucket( bucket_id: String, access_key_id: String, alias: String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let bucket_id = parse_bucket_id(&bucket_id)?; garage @@ -573,7 +574,7 @@ pub async fn handle_local_unalias_bucket( bucket_id: String, access_key_id: String, alias: String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let bucket_id = parse_bucket_id(&bucket_id)?; garage diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 8677257d..8ce6c5ed 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; -use hyper::{Body, Request, Response}; +use hyper::{body::Incoming as IncomingBody, Request, Response}; use serde::{Deserialize, Serialize}; use garage_util::crdt::*; @@ -12,10 +12,11 @@ use garage_rpc::layout; use garage_model::garage::Garage; +use crate::admin::api_server::ResBody; use crate::admin::error::*; use crate::helpers::{json_ok_response, parse_json_body}; -pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { +pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let layout = garage.system.cluster_layout(); let mut nodes = garage .system @@ -110,7 +111,7 @@ pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response< Ok(json_ok_response(&res)?) } -pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { +pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { use garage_rpc::system::ClusterHealthStatus; let health = garage.system.health(); let health = ClusterHealth { @@ -132,9 +133,9 @@ pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response< pub async fn handle_connect_cluster_nodes( garage: &Arc<Garage>, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let req = parse_json_body::<Vec<String>>(req).await?; + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { + let req = parse_json_body::<Vec<String>, _, Error>(req).await?; let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node))) .await @@ -154,7 +155,7 @@ pub async fn handle_connect_cluster_nodes( Ok(json_ok_response(&res)?) } -pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { +pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let res = format_cluster_layout(&garage.system.cluster_layout()); Ok(json_ok_response(&res)?) @@ -290,9 +291,9 @@ struct NodeResp { pub async fn handle_update_cluster_layout( garage: &Arc<Garage>, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?; + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { + let updates = parse_json_body::<UpdateClusterLayoutRequest, _, Error>(req).await?; let mut layout = garage.system.cluster_layout().clone(); @@ -336,9 +337,9 @@ pub async fn handle_update_cluster_layout( pub async fn handle_apply_cluster_layout( garage: &Arc<Garage>, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let param = parse_json_body::<ApplyLayoutRequest>(req).await?; + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { + let param = parse_json_body::<ApplyLayoutRequest, _, Error>(req).await?; let layout = garage.system.cluster_layout().clone(); let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; @@ -356,7 +357,9 @@ pub async fn handle_apply_cluster_layout( Ok(json_ok_response(&res)?) } -pub async fn handle_revert_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { +pub async fn handle_revert_cluster_layout( + garage: &Arc<Garage>, +) -> Result<Response<ResBody>, Error> { let layout = garage.system.cluster_layout().clone(); let layout = layout.revert_staged_changes()?; garage diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs index ed1a07bd..2668b42d 100644 --- a/src/api/admin/error.rs +++ b/src/api/admin/error.rs @@ -1,13 +1,13 @@ use err_derive::Error; use hyper::header::HeaderValue; -use hyper::{Body, HeaderMap, StatusCode}; +use hyper::{HeaderMap, StatusCode}; pub use garage_model::helper::error::Error as HelperError; use crate::common_error::CommonError; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; use crate::generic_server::ApiError; -use crate::helpers::CustomApiErrorBody; +use crate::helpers::*; /// Errors of this crate #[derive(Debug, Error)] @@ -40,18 +40,6 @@ where impl CommonErrorDerivative for Error {} -impl From<HelperError> for Error { - fn from(err: HelperError) -> Self { - match err { - HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)), - HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)), - HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)), - HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)), - HelperError::NoSuchAccessKey(n) => Self::NoSuchAccessKey(n), - } - } -} - impl Error { fn code(&self) -> &'static str { match self { @@ -77,14 +65,14 @@ impl ApiError for Error { header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap()); } - fn http_body(&self, garage_region: &str, path: &str) -> Body { + fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody { let error = CustomApiErrorBody { code: self.code().to_string(), message: format!("{}", self), path: path.to_string(), region: garage_region.to_string(), }; - Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| { + let error_str = serde_json::to_string_pretty(&error).unwrap_or_else(|_| { r#" { "code": "InternalError", @@ -92,6 +80,7 @@ impl ApiError for Error { } "# .into() - })) + }); + error_body(error_str) } } diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs index 8d1c6890..1efaca16 100644 --- a/src/api/admin/key.rs +++ b/src/api/admin/key.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_table::*; @@ -9,10 +9,11 @@ use garage_table::*; use garage_model::garage::Garage; use garage_model::key_table::*; +use crate::admin::api_server::ResBody; use crate::admin::error::*; -use crate::helpers::{is_default, json_ok_response, parse_json_body}; +use crate::helpers::*; -pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { +pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let res = garage .key_table .get_range( @@ -45,7 +46,7 @@ pub async fn handle_get_key_info( id: Option<String>, search: Option<String>, show_secret_key: bool, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let key = if let Some(id) = id { garage.key_helper().get_existing_key(&id).await? } else if let Some(search) = search { @@ -62,9 +63,9 @@ pub async fn handle_get_key_info( pub async fn handle_create_key( garage: &Arc<Garage>, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let req = parse_json_body::<CreateKeyRequest>(req).await?; + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { + let req = parse_json_body::<CreateKeyRequest, _, Error>(req).await?; let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key")); garage.key_table.insert(&key).await?; @@ -80,9 +81,9 @@ struct CreateKeyRequest { pub async fn handle_import_key( garage: &Arc<Garage>, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let req = parse_json_body::<ImportKeyRequest>(req).await?; + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { + let req = parse_json_body::<ImportKeyRequest, _, Error>(req).await?; let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?; if prev_key.is_some() { @@ -111,9 +112,9 @@ struct ImportKeyRequest { pub async fn handle_update_key( garage: &Arc<Garage>, id: String, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let req = parse_json_body::<UpdateKeyRequest>(req).await?; + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { + let req = parse_json_body::<UpdateKeyRequest, _, Error>(req).await?; let mut key = garage.key_helper().get_existing_key(&id).await?; @@ -146,7 +147,10 @@ struct UpdateKeyRequest { deny: Option<KeyPerm>, } -pub async fn handle_delete_key(garage: &Arc<Garage>, id: String) -> Result<Response<Body>, Error> { +pub async fn handle_delete_key( + garage: &Arc<Garage>, + id: String, +) -> Result<Response<ResBody>, Error> { let mut key = garage.key_helper().get_existing_key(&id).await?; key.state.as_option().unwrap(); @@ -155,14 +159,14 @@ pub async fn handle_delete_key(garage: &Arc<Garage>, id: String) -> Result<Respo Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } async fn key_info_results( garage: &Arc<Garage>, key: Key, show_secret: bool, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let mut relevant_buckets = HashMap::new(); let key_state = key.state.as_option().unwrap(); diff --git a/src/api/common_error.rs b/src/api/common_error.rs index ecb22fd8..c47555d4 100644 --- a/src/api/common_error.rs +++ b/src/api/common_error.rs @@ -3,6 +3,8 @@ use hyper::StatusCode; use garage_util::error::Error as GarageError; +use garage_model::helper::error::Error as HelperError; + /// Errors of this crate #[derive(Debug, Error)] pub enum CommonError { @@ -28,6 +30,10 @@ pub enum CommonError { #[error(display = "Bad request: {}", _0)] BadRequest(String), + /// The client sent a header with invalid value + #[error(display = "Invalid header value: {}", _0)] + InvalidHeader(#[error(source)] hyper::header::ToStrError), + // ---- SPECIFIC ERROR CONDITIONS ---- // These have to be error codes referenced in the S3 spec here: // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList @@ -62,7 +68,9 @@ impl CommonError { CommonError::Forbidden(_) => StatusCode::FORBIDDEN, CommonError::NoSuchBucket(_) => StatusCode::NOT_FOUND, CommonError::BucketNotEmpty | CommonError::BucketAlreadyExists => StatusCode::CONFLICT, - CommonError::InvalidBucketName(_) => StatusCode::BAD_REQUEST, + CommonError::InvalidBucketName(_) | CommonError::InvalidHeader(_) => { + StatusCode::BAD_REQUEST + } } } @@ -80,6 +88,7 @@ impl CommonError { CommonError::BucketAlreadyExists => "BucketAlreadyExists", CommonError::BucketNotEmpty => "BucketNotEmpty", CommonError::InvalidBucketName(_) => "InvalidBucketName", + CommonError::InvalidHeader(_) => "InvalidHeaderValue", } } @@ -88,6 +97,18 @@ impl CommonError { } } +impl From<HelperError> for CommonError { + fn from(err: HelperError) -> Self { + match err { + HelperError::Internal(i) => Self::InternalError(i), + HelperError::BadRequest(b) => Self::BadRequest(b), + HelperError::InvalidBucketName(n) => Self::InvalidBucketName(n), + HelperError::NoSuchBucket(n) => Self::NoSuchBucket(n), + e => Self::bad_request(format!("{}", e)), + } + } +} + pub trait CommonErrorDerivative: From<CommonError> { fn internal_error<M: ToString>(msg: M) -> Self { Self::from(CommonError::InternalError(GarageError::Message( diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index fa346f48..9c49fdf3 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -1,3 +1,4 @@ +use std::convert::Infallible; use std::fs::{self, Permissions}; use std::os::unix::fs::PermissionsExt; use std::sync::Arc; @@ -5,16 +6,19 @@ use std::sync::Arc; use async_trait::async_trait; use futures::future::Future; +use futures::stream::{futures_unordered::FuturesUnordered, StreamExt}; +use http_body_util::BodyExt; use hyper::header::HeaderValue; -use hyper::server::conn::AddrStream; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Request, Response, Server}; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{body::Incoming as IncomingBody, Request, Response}; use hyper::{HeaderMap, StatusCode}; +use hyper_util::rt::TokioIo; -use hyperlocal::UnixServerExt; - -use tokio::net::UnixStream; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream}; +use tokio::sync::watch; use opentelemetry::{ global, @@ -28,6 +32,8 @@ use garage_util::forwarded_headers; use garage_util::metrics::{gen_trace_id, RecordDuration}; use garage_util::socket_address::UnixOrTCPSocketAddress; +use crate::helpers::{BoxBody, ErrorBody}; + pub(crate) trait ApiEndpoint: Send + Sync + 'static { fn name(&self) -> &'static str; fn add_span_attributes(&self, span: SpanRef<'_>); @@ -36,7 +42,7 @@ pub(crate) trait ApiEndpoint: Send + Sync + 'static { pub trait ApiError: std::error::Error + Send + Sync + 'static { fn http_status_code(&self) -> StatusCode; fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>); - fn http_body(&self, garage_region: &str, path: &str) -> Body; + fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody; } #[async_trait] @@ -47,12 +53,12 @@ pub(crate) trait ApiHandler: Send + Sync + 'static { type Endpoint: ApiEndpoint; type Error: ApiError; - fn parse_endpoint(&self, r: &Request<Body>) -> Result<Self::Endpoint, Self::Error>; + fn parse_endpoint(&self, r: &Request<IncomingBody>) -> Result<Self::Endpoint, Self::Error>; async fn handle( &self, - req: Request<Body>, + req: Request<IncomingBody>, endpoint: Self::Endpoint, - ) -> Result<Response<Body>, Self::Error>; + ) -> Result<Response<BoxBody<Self::Error>>, Self::Error>; } pub(crate) struct ApiServer<A: ApiHandler> { @@ -99,74 +105,42 @@ impl<A: ApiHandler> ApiServer<A> { self: Arc<Self>, bind_addr: UnixOrTCPSocketAddress, unix_bind_addr_mode: Option<u32>, - shutdown_signal: impl Future<Output = ()>, + must_exit: watch::Receiver<bool>, ) -> Result<(), GarageError> { - let tcp_service = make_service_fn(|conn: &AddrStream| { - let this = self.clone(); - - let client_addr = conn.remote_addr(); - async move { - Ok::<_, GarageError>(service_fn(move |req: Request<Body>| { - let this = this.clone(); - - this.handler(req, client_addr.to_string()) - })) - } - }); - - let unix_service = make_service_fn(|_: &UnixStream| { - let this = self.clone(); - - let path = bind_addr.to_string(); - async move { - Ok::<_, GarageError>(service_fn(move |req: Request<Body>| { - let this = this.clone(); - - this.handler(req, path.clone()) - })) - } - }); - - info!( - "{} API server listening on {}", - A::API_NAME_DISPLAY, - bind_addr - ); + let server_name = format!("{} API", A::API_NAME_DISPLAY); + info!("{} server listening on {}", server_name, bind_addr); match bind_addr { UnixOrTCPSocketAddress::TCPSocket(addr) => { - Server::bind(&addr) - .serve(tcp_service) - .with_graceful_shutdown(shutdown_signal) - .await? + let listener = TcpListener::bind(addr).await?; + + let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); + server_loop(server_name, listener, handler, must_exit).await } UnixOrTCPSocketAddress::UnixSocket(ref path) => { if path.exists() { fs::remove_file(path)? } - let bound = Server::bind_unix(path)?; + let listener = UnixListener::bind(path)?; + let listener = UnixListenerOn(listener, path.display().to_string()); fs::set_permissions( path, Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)), )?; - bound - .serve(unix_service) - .with_graceful_shutdown(shutdown_signal) - .await?; + let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); + server_loop(server_name, listener, handler, must_exit).await } - }; - - Ok(()) + } } async fn handler( self: Arc<Self>, - req: Request<Body>, + req: Request<IncomingBody>, addr: String, - ) -> Result<Response<Body>, GarageError> { + ) -> Result<Response<BoxBody<A::Error>>, http::Error> { let uri = req.uri().clone(); if let Ok(forwarded_for_ip_addr) = @@ -205,7 +179,7 @@ impl<A: ApiHandler> ApiServer<A> { Ok(x) } Err(e) => { - let body: Body = e.http_body(&self.region, uri.path()); + let body = e.http_body(&self.region, uri.path()); let mut http_error_builder = Response::builder().status(e.http_status_code()); if let Some(header_map) = http_error_builder.headers_mut() { @@ -219,12 +193,16 @@ impl<A: ApiHandler> ApiServer<A> { } else { info!("Response: error {}, {}", e.http_status_code(), e); } - Ok(http_error) + Ok(http_error + .map(|body| BoxBody::new(body.map_err(|_: Infallible| unreachable!())))) } } } - async fn handler_stage2(&self, req: Request<Body>) -> Result<Response<Body>, A::Error> { + async fn handler_stage2( + &self, + req: Request<IncomingBody>, + ) -> Result<Response<BoxBody<A::Error>>, A::Error> { let endpoint = self.api_handler.parse_endpoint(&req)?; debug!("Endpoint: {}", endpoint.name()); @@ -265,3 +243,123 @@ impl<A: ApiHandler> ApiServer<A> { res } } + +// ==== helper functions ==== + +#[async_trait] +pub trait Accept: Send + Sync + 'static { + type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static; + async fn accept(&self) -> std::io::Result<(Self::Stream, String)>; +} + +#[async_trait] +impl Accept for TcpListener { + type Stream = TcpStream; + async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { + self.accept() + .await + .map(|(stream, addr)| (stream, addr.to_string())) + } +} + +pub struct UnixListenerOn(pub UnixListener, pub String); + +#[async_trait] +impl Accept for UnixListenerOn { + type Stream = UnixStream; + async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { + self.0 + .accept() + .await + .map(|(stream, _addr)| (stream, self.1.clone())) + } +} + +pub async fn server_loop<A, H, F, E>( + server_name: String, + listener: A, + handler: H, + mut must_exit: watch::Receiver<bool>, +) -> Result<(), GarageError> +where + A: Accept, + H: Fn(Request<IncomingBody>, String) -> F + Send + Sync + Clone + 'static, + F: Future<Output = Result<Response<BoxBody<E>>, http::Error>> + Send + 'static, + E: Send + Sync + std::error::Error + 'static, +{ + let (conn_in, mut conn_out) = tokio::sync::mpsc::unbounded_channel(); + let connection_collector = tokio::spawn({ + let server_name = server_name.clone(); + async move { + let mut connections = FuturesUnordered::new(); + loop { + let collect_next = async { + if connections.is_empty() { + futures::future::pending().await + } else { + connections.next().await + } + }; + tokio::select! { + result = collect_next => { + trace!("{} server: HTTP connection finished: {:?}", server_name, result); + } + new_fut = conn_out.recv() => { + match new_fut { + Some(f) => connections.push(f), + None => break, + } + } + } + } + if !connections.is_empty() { + info!( + "{} server: {} connections still open", + server_name, + connections.len() + ); + while let Some(conn_res) = connections.next().await { + trace!( + "{} server: HTTP connection finished: {:?}", + server_name, + conn_res + ); + info!( + "{} server: {} connections still open", + server_name, + connections.len() + ); + } + } + } + }); + + while !*must_exit.borrow() { + let (stream, client_addr) = tokio::select! { + acc = listener.accept() => acc?, + _ = must_exit.changed() => continue, + }; + + let io = TokioIo::new(stream); + + let handler = handler.clone(); + let serve = move |req: Request<IncomingBody>| handler(req, client_addr.clone()); + + let fut = tokio::task::spawn(async move { + let io = Box::pin(io); + if let Err(e) = http1::Builder::new() + .serve_connection(io, service_fn(serve)) + .await + { + debug!("Error handling HTTP connection: {}", e); + } + }); + conn_in.send(fut)?; + } + + info!("{} server exiting", server_name); + drop(conn_in); + connection_collector.await?; + + Ok(()) +} diff --git a/src/api/helpers.rs b/src/api/helpers.rs index 1d55ebd5..5f488912 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,7 +1,17 @@ -use hyper::{Body, Request, Response}; +use std::convert::Infallible; + +use futures::{Stream, StreamExt, TryStreamExt}; + +use http_body_util::{BodyExt, Full as FullBody}; +use hyper::{ + body::{Body, Bytes}, + Request, Response, +}; use idna::domain_to_unicode; use serde::{Deserialize, Serialize}; +use garage_util::error::Error as GarageError; + use crate::common_error::{CommonError as Error, *}; /// What kind of authorization is required to perform a given action @@ -138,18 +148,64 @@ pub fn key_after_prefix(pfx: &str) -> Option<String> { None } -pub async fn parse_json_body<T: for<'de> Deserialize<'de>>(req: Request<Body>) -> Result<T, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; +// =============== body helpers ================= + +pub type EmptyBody = http_body_util::Empty<bytes::Bytes>; +pub type ErrorBody = FullBody<bytes::Bytes>; +pub type BoxBody<E> = http_body_util::combinators::BoxBody<bytes::Bytes, E>; + +pub fn string_body<E>(s: String) -> BoxBody<E> { + bytes_body(bytes::Bytes::from(s.into_bytes())) +} +pub fn bytes_body<E>(b: bytes::Bytes) -> BoxBody<E> { + BoxBody::new(FullBody::new(b).map_err(|_: Infallible| unreachable!())) +} +pub fn empty_body<E>() -> BoxBody<E> { + BoxBody::new(http_body_util::Empty::new().map_err(|_: Infallible| unreachable!())) +} +pub fn error_body(s: String) -> ErrorBody { + ErrorBody::from(bytes::Bytes::from(s.into_bytes())) +} + +pub async fn parse_json_body<T, B, E>(req: Request<B>) -> Result<T, E> +where + T: for<'de> Deserialize<'de>, + B: Body, + E: From<<B as Body>::Error> + From<Error>, +{ + let body = req.into_body().collect().await?.to_bytes(); let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; Ok(resp) } -pub fn json_ok_response<T: Serialize>(res: &T) -> Result<Response<Body>, Error> { - let resp_json = serde_json::to_string_pretty(res).map_err(garage_util::error::Error::from)?; +pub fn json_ok_response<E, T: Serialize>(res: &T) -> Result<Response<BoxBody<E>>, E> +where + E: From<Error>, +{ + let resp_json = serde_json::to_string_pretty(res) + .map_err(GarageError::from) + .map_err(Error::from)?; Ok(Response::builder() .status(hyper::StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/json") - .body(Body::from(resp_json))?) + .body(string_body(resp_json)) + .unwrap()) +} + +pub fn body_stream<B, E>(body: B) -> impl Stream<Item = Result<Bytes, E>> +where + B: Body<Data = Bytes>, + <B as Body>::Error: Into<E>, + E: From<Error>, +{ + let stream = http_body_util::BodyStream::new(body); + let stream = TryStreamExt::map_err(stream, Into::into); + stream.map(|x| { + x.and_then(|f| { + f.into_data() + .map_err(|_| E::from(Error::bad_request("non-data frame"))) + }) + }) } pub fn is_default<T: Default + PartialEq>(v: &T) -> bool { diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 3a032aba..e97da2af 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use async_trait::async_trait; -use futures::future::Future; -use hyper::{Body, Method, Request, Response}; +use hyper::{body::Incoming as IncomingBody, Method, Request, Response}; +use tokio::sync::watch; use opentelemetry::{trace::SpanRef, KeyValue}; @@ -25,6 +25,9 @@ use crate::k2v::item::*; use crate::k2v::router::Endpoint; use crate::s3::cors::*; +pub use crate::signature::streaming::ReqBody; +pub type ResBody = BoxBody<Error>; + pub struct K2VApiServer { garage: Arc<Garage>, } @@ -39,10 +42,10 @@ impl K2VApiServer { garage: Arc<Garage>, bind_addr: UnixOrTCPSocketAddress, s3_region: String, - shutdown_signal: impl Future<Output = ()>, + must_exit: watch::Receiver<bool>, ) -> Result<(), GarageError> { ApiServer::new(s3_region, K2VApiServer { garage }) - .run_server(bind_addr, None, shutdown_signal) + .run_server(bind_addr, None, must_exit) .await } } @@ -55,7 +58,7 @@ impl ApiHandler for K2VApiServer { type Endpoint = K2VApiEndpoint; type Error = Error; - fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> { + fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<K2VApiEndpoint, Error> { let (endpoint, bucket_name) = Endpoint::from_request(req)?; Ok(K2VApiEndpoint { @@ -66,9 +69,9 @@ impl ApiHandler for K2VApiServer { async fn handle( &self, - req: Request<Body>, + req: Request<IncomingBody>, endpoint: K2VApiEndpoint, - ) -> Result<Response<Body>, Error> { + ) -> Result<Response<ResBody>, Error> { let K2VApiEndpoint { bucket_name, endpoint, @@ -77,9 +80,10 @@ impl ApiHandler for K2VApiServer { // The OPTIONS method is procesed early, before we even check for an API key if let Endpoint::Options = endpoint { - return Ok(handle_options_s3api(garage, &req, Some(bucket_name)) + let options_res = handle_options_api(garage, &req, Some(bucket_name)) .await - .ok_or_bad_request("Error handling OPTIONS")?); + .ok_or_bad_request("Error handling OPTIONS")?; + return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?; diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 294380ea..ae2778b1 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use base64::prelude::*; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_util::data::*; @@ -13,15 +13,16 @@ use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; use crate::helpers::*; +use crate::k2v::api_server::{ReqBody, ResBody}; use crate::k2v::error::*; use crate::k2v::range::read_range; pub async fn handle_insert_batch( garage: Arc<Garage>, bucket_id: Uuid, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?; + req: Request<ReqBody>, +) -> Result<Response<ResBody>, Error> { + let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?; let mut items2 = vec![]; for it in items { @@ -41,15 +42,15 @@ pub async fn handle_insert_batch( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_read_batch( garage: Arc<Garage>, bucket_id: Uuid, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?; + req: Request<ReqBody>, +) -> Result<Response<ResBody>, Error> { + let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?; let resp_results = futures::future::join_all( queries @@ -139,9 +140,9 @@ async fn handle_read_batch_query( pub async fn handle_delete_batch( garage: Arc<Garage>, bucket_id: Uuid, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?; + req: Request<ReqBody>, +) -> Result<Response<ResBody>, Error> { + let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?; let resp_results = futures::future::join_all( queries @@ -253,11 +254,11 @@ pub(crate) async fn handle_poll_range( garage: Arc<Garage>, bucket_id: Uuid, partition_key: &str, - req: Request<Body>, -) -> Result<Response<Body>, Error> { + req: Request<ReqBody>, +) -> Result<Response<ResBody>, Error> { use garage_model::k2v::sub::PollRange; - let query = parse_json_body::<PollRangeQuery>(req).await?; + let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?; let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000; @@ -292,7 +293,7 @@ pub(crate) async fn handle_poll_range( } else { Ok(Response::builder() .status(StatusCode::NOT_MODIFIED) - .body(Body::empty())?) + .body(empty_body())?) } } diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs index 4eb017ab..16479227 100644 --- a/src/api/k2v/error.rs +++ b/src/api/k2v/error.rs @@ -1,13 +1,11 @@ use err_derive::Error; use hyper::header::HeaderValue; -use hyper::{Body, HeaderMap, StatusCode}; - -use garage_model::helper::error::Error as HelperError; +use hyper::{HeaderMap, StatusCode}; use crate::common_error::CommonError; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; use crate::generic_server::ApiError; -use crate::helpers::CustomApiErrorBody; +use crate::helpers::*; use crate::signature::error::Error as SignatureError; /// Errors of this crate @@ -30,10 +28,6 @@ pub enum Error { #[error(display = "Invalid base64: {}", _0)] InvalidBase64(#[error(source)] base64::DecodeError), - /// The client sent a header with invalid value - #[error(display = "Invalid header value: {}", _0)] - InvalidHeader(#[error(source)] hyper::header::ToStrError), - /// The client asked for an invalid return format (invalid Accept header) #[error(display = "Not acceptable: {}", _0)] NotAcceptable(String), @@ -54,18 +48,6 @@ where impl CommonErrorDerivative for Error {} -impl From<HelperError> for Error { - fn from(err: HelperError) -> Self { - match err { - HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)), - HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)), - HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)), - HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)), - e => Self::Common(CommonError::BadRequest(format!("{}", e))), - } - } -} - impl From<SignatureError> for Error { fn from(err: SignatureError) -> Self { match err { @@ -74,7 +56,6 @@ impl From<SignatureError> for Error { Self::AuthorizationHeaderMalformed(c) } SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), - SignatureError::InvalidHeader(h) => Self::InvalidHeader(h), } } } @@ -90,7 +71,6 @@ impl Error { Error::NotAcceptable(_) => "NotAcceptable", Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", Error::InvalidBase64(_) => "InvalidBase64", - Error::InvalidHeader(_) => "InvalidHeaderValue", Error::InvalidUtf8Str(_) => "InvalidUtf8String", } } @@ -105,7 +85,6 @@ impl ApiError for Error { Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE, Error::AuthorizationHeaderMalformed(_) | Error::InvalidBase64(_) - | Error::InvalidHeader(_) | Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST, } } @@ -115,14 +94,14 @@ impl ApiError for Error { header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap()); } - fn http_body(&self, garage_region: &str, path: &str) -> Body { + fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody { let error = CustomApiErrorBody { code: self.code().to_string(), message: format!("{}", self), path: path.to_string(), region: garage_region.to_string(), }; - Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| { + let error_str = serde_json::to_string_pretty(&error).unwrap_or_else(|_| { r#" { "code": "InternalError", @@ -130,6 +109,7 @@ impl ApiError for Error { } "# .into() - })) + }); + error_body(error_str) } } diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index e8cd1fba..291464ab 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use hyper::{Body, Response}; +use hyper::Response; use serde::Serialize; use garage_util::data::*; @@ -11,6 +11,7 @@ use garage_model::garage::Garage; use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; use crate::helpers::*; +use crate::k2v::api_server::ResBody; use crate::k2v::error::*; use crate::k2v::range::read_range; @@ -22,7 +23,7 @@ pub async fn handle_read_index( end: Option<String>, limit: Option<u64>, reverse: Option<bool>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let reverse = reverse.unwrap_or(false); let node_id_vec = garage @@ -71,7 +72,7 @@ pub async fn handle_read_index( next_start, }; - Ok(json_ok_response(&resp)?) + json_ok_response::<Error, _>(&resp) } #[derive(Serialize)] diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index e13a0f30..0c5931a1 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use base64::prelude::*; use http::header; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{Request, Response, StatusCode}; use garage_util::data::*; @@ -11,6 +11,8 @@ use garage_model::garage::Garage; use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; +use crate::helpers::*; +use crate::k2v::api_server::{ReqBody, ResBody}; use crate::k2v::error::*; pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; @@ -22,7 +24,7 @@ pub enum ReturnFormat { } impl ReturnFormat { - pub fn from(req: &Request<Body>) -> Result<Self, Error> { + pub fn from(req: &Request<ReqBody>) -> Result<Self, Error> { let accept = match req.headers().get(header::ACCEPT) { Some(a) => a.to_str()?, None => return Ok(Self::Json), @@ -40,7 +42,7 @@ impl ReturnFormat { } } - pub fn make_response(&self, item: &K2VItem) -> Result<Response<Body>, Error> { + pub fn make_response(&self, item: &K2VItem) -> Result<Response<ResBody>, Error> { let vals = item.values(); if vals.is_empty() { @@ -52,7 +54,7 @@ impl ReturnFormat { Self::Binary if vals.len() > 1 => Ok(Response::builder() .header(X_GARAGE_CAUSALITY_TOKEN, ct) .status(StatusCode::CONFLICT) - .body(Body::empty())?), + .body(empty_body())?), Self::Binary => { assert!(vals.len() == 1); Self::make_binary_response(ct, vals[0]) @@ -62,22 +64,22 @@ impl ReturnFormat { } } - fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<Body>, Error> { + fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<ResBody>, Error> { match v { DvvsValue::Deleted => Ok(Response::builder() .header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(header::CONTENT_TYPE, "application/octet-stream") .status(StatusCode::NO_CONTENT) - .body(Body::empty())?), + .body(empty_body())?), DvvsValue::Value(v) => Ok(Response::builder() .header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(header::CONTENT_TYPE, "application/octet-stream") .status(StatusCode::OK) - .body(Body::from(v.to_vec()))?), + .body(bytes_body(v.to_vec().into()))?), } } - fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<Body>, Error> { + fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<ResBody>, Error> { let items = v .iter() .map(|v| match v { @@ -91,7 +93,7 @@ impl ReturnFormat { .header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(header::CONTENT_TYPE, "application/json") .status(StatusCode::OK) - .body(Body::from(json_body))?) + .body(string_body(json_body))?) } } @@ -99,11 +101,11 @@ impl ReturnFormat { #[allow(clippy::ptr_arg)] pub async fn handle_read_item( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<ReqBody>, bucket_id: Uuid, partition_key: &str, sort_key: &String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let format = ReturnFormat::from(req)?; let item = garage @@ -124,11 +126,11 @@ pub async fn handle_read_item( pub async fn handle_insert_item( garage: Arc<Garage>, - req: Request<Body>, + req: Request<ReqBody>, bucket_id: Uuid, partition_key: &str, sort_key: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let causal_context = req .headers() .get(X_GARAGE_CAUSALITY_TOKEN) @@ -137,7 +139,10 @@ pub async fn handle_insert_item( .map(CausalContext::parse_helper) .transpose()?; - let body = hyper::body::to_bytes(req.into_body()).await?; + let body = http_body_util::BodyExt::collect(req.into_body()) + .await? + .to_bytes(); + let value = DvvsValue::Value(body.to_vec()); garage @@ -154,16 +159,16 @@ pub async fn handle_insert_item( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_delete_item( garage: Arc<Garage>, - req: Request<Body>, + req: Request<ReqBody>, bucket_id: Uuid, partition_key: &str, sort_key: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let causal_context = req .headers() .get(X_GARAGE_CAUSALITY_TOKEN) @@ -188,20 +193,20 @@ pub async fn handle_delete_item( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } /// Handle ReadItem request #[allow(clippy::ptr_arg)] pub async fn handle_poll_item( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<ReqBody>, bucket_id: Uuid, partition_key: String, sort_key: String, causality_token: String, timeout_secs: Option<u64>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let format = ReturnFormat::from(req)?; let causal_context = @@ -226,6 +231,6 @@ pub async fn handle_poll_item( } else { Ok(Response::builder() .status(StatusCode::NOT_MODIFIED) - .body(Body::empty())?) + .body(empty_body())?) } } diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 887839dd..7fac6261 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -2,9 +2,9 @@ use std::sync::Arc; use async_trait::async_trait; -use futures::future::Future; use hyper::header; -use hyper::{Body, Request, Response}; +use hyper::{body::Incoming as IncomingBody, Request, Response}; +use tokio::sync::watch; use opentelemetry::{trace::SpanRef, KeyValue}; @@ -34,6 +34,9 @@ use crate::s3::put::*; use crate::s3::router::Endpoint; use crate::s3::website::*; +pub use crate::signature::streaming::ReqBody; +pub type ResBody = BoxBody<Error>; + pub struct S3ApiServer { garage: Arc<Garage>, } @@ -48,19 +51,19 @@ impl S3ApiServer { garage: Arc<Garage>, addr: UnixOrTCPSocketAddress, s3_region: String, - shutdown_signal: impl Future<Output = ()>, + must_exit: watch::Receiver<bool>, ) -> Result<(), GarageError> { ApiServer::new(s3_region, S3ApiServer { garage }) - .run_server(addr, None, shutdown_signal) + .run_server(addr, None, must_exit) .await } async fn handle_request_without_bucket( &self, - _req: Request<Body>, + _req: Request<ReqBody>, api_key: Key, endpoint: Endpoint, - ) -> Result<Response<Body>, Error> { + ) -> Result<Response<ResBody>, Error> { match endpoint { Endpoint::ListBuckets => handle_list_buckets(&self.garage, &api_key).await, endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), @@ -76,7 +79,7 @@ impl ApiHandler for S3ApiServer { type Endpoint = S3ApiEndpoint; type Error = Error; - fn parse_endpoint(&self, req: &Request<Body>) -> Result<S3ApiEndpoint, Error> { + fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<S3ApiEndpoint, Error> { let authority = req .headers() .get(header::HOST) @@ -104,9 +107,9 @@ impl ApiHandler for S3ApiServer { async fn handle( &self, - req: Request<Body>, + req: Request<IncomingBody>, endpoint: S3ApiEndpoint, - ) -> Result<Response<Body>, Error> { + ) -> Result<Response<ResBody>, Error> { let S3ApiEndpoint { bucket_name, endpoint, @@ -118,7 +121,8 @@ impl ApiHandler for S3ApiServer { return handle_post_object(garage, req, bucket_name.unwrap()).await; } if let Endpoint::Options = endpoint { - return handle_options_s3api(garage, &req, bucket_name).await; + let options_res = handle_options_api(garage, &req, bucket_name).await?; + return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?; @@ -174,8 +178,26 @@ impl ApiHandler for S3ApiServer { key, part_number, .. } => handle_head(garage, &req, bucket_id, &key, part_number).await, Endpoint::GetObject { - key, part_number, .. - } => handle_get(garage, &req, bucket_id, &key, part_number).await, + key, + part_number, + response_cache_control, + response_content_disposition, + response_content_encoding, + response_content_language, + response_content_type, + response_expires, + .. + } => { + let overrides = GetObjectOverrides { + response_cache_control, + response_content_disposition, + response_content_encoding, + response_content_language, + response_content_type, + response_expires, + }; + handle_get(garage, &req, bucket_id, &key, part_number, overrides).await + } Endpoint::UploadPart { key, part_number, @@ -235,8 +257,7 @@ impl ApiHandler for S3ApiServer { } Endpoint::CreateBucket {} => unreachable!(), Endpoint::HeadBucket {} => { - let empty_body: Body = Body::from(vec![]); - let response = Response::builder().body(empty_body).unwrap(); + let response = Response::builder().body(empty_body()).unwrap(); Ok(response) } Endpoint::DeleteBucket {} => { @@ -257,7 +278,7 @@ impl ApiHandler for S3ApiServer { common: ListQueryCommon { bucket_name, bucket_id, - delimiter: delimiter.map(|d| d.to_string()), + delimiter, page_size: max_keys.unwrap_or(1000).clamp(1, 1000), prefix: prefix.unwrap_or_default(), urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), @@ -287,7 +308,7 @@ impl ApiHandler for S3ApiServer { common: ListQueryCommon { bucket_name, bucket_id, - delimiter: delimiter.map(|d| d.to_string()), + delimiter, page_size: max_keys.unwrap_or(1000).clamp(1, 1000), urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), prefix: prefix.unwrap_or_default(), @@ -320,7 +341,7 @@ impl ApiHandler for S3ApiServer { common: ListQueryCommon { bucket_name, bucket_id, - delimiter: delimiter.map(|d| d.to_string()), + delimiter, page_size: max_uploads.unwrap_or(1000).clamp(1, 1000), prefix: prefix.unwrap_or_default(), urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 733981e1..fa2f1b6d 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use garage_model::bucket_alias_table::*; use garage_model::bucket_table::Bucket; @@ -14,11 +15,13 @@ use garage_util::data::*; use garage_util::time::*; use crate::common_error::CommonError; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; -pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Error> { +pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<ResBody>, Error> { let loc = s3_xml::LocationConstraint { xmlns: (), region: garage.config.s3_api.s3_region.to_string(), @@ -27,10 +30,10 @@ pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } -pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> { +pub fn handle_get_bucket_versioning() -> Result<Response<ResBody>, Error> { let versioning = s3_xml::VersioningConfiguration { xmlns: (), status: None, @@ -40,10 +43,13 @@ pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> { Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } -pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Response<Body>, Error> { +pub async fn handle_list_buckets( + garage: &Garage, + api_key: &Key, +) -> Result<Response<ResBody>, Error> { let key_p = api_key.params().ok_or_internal_error( "Key should not be in deleted state at this point (in handle_list_buckets)", )?; @@ -109,17 +115,17 @@ pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Respo Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } pub async fn handle_create_bucket( garage: &Garage, - req: Request<Body>, + req: Request<ReqBody>, content_sha256: Option<Hash>, api_key: Key, bucket_name: String, -) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; +) -> Result<Response<ResBody>, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -194,7 +200,7 @@ pub async fn handle_create_bucket( Ok(Response::builder() .header("Location", format!("/{}", bucket_name)) - .body(Body::empty()) + .body(empty_body()) .unwrap()) } @@ -203,7 +209,7 @@ pub async fn handle_delete_bucket( bucket_id: Uuid, bucket_name: String, api_key: Key, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let key_params = api_key .params() .ok_or_internal_error("Key should not be deleted at this point")?; @@ -282,7 +288,7 @@ pub async fn handle_delete_bucket( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> { diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 68b4f0c9..ba9bfc88 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -6,7 +6,7 @@ use futures::{stream, stream::Stream, StreamExt}; use md5::{Digest as Md5Digest, Md5}; use bytes::Bytes; -use hyper::{Body, Request, Response}; +use hyper::{Request, Response}; use serde::Serialize; use garage_rpc::netapp::bytes_buf::BytesBuf; @@ -22,7 +22,8 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::parse_bucket_key; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::multipart; use crate::s3::put::get_headers; @@ -31,10 +32,10 @@ use crate::s3::xml::{self as s3_xml, xmlns_tag}; pub async fn handle_copy( garage: Arc<Garage>, api_key: &Key, - req: &Request<Body>, + req: &Request<ReqBody>, dest_bucket_id: Uuid, dest_key: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; let source_object = get_copy_source(&garage, api_key, req).await?; @@ -176,18 +177,18 @@ pub async fn handle_copy( "x-amz-copy-source-version-id", hex::encode(source_version.uuid), ) - .body(Body::from(xml))?) + .body(string_body(xml))?) } pub async fn handle_upload_part_copy( garage: Arc<Garage>, api_key: &Key, - req: &Request<Body>, + req: &Request<ReqBody>, dest_bucket_id: Uuid, dest_key: &str, part_number: u64, upload_id: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; let dest_upload_id = multipart::decode_upload_id(upload_id)?; @@ -432,13 +433,13 @@ pub async fn handle_upload_part_copy( "x-amz-copy-source-version-id", hex::encode(source_object_version.uuid), ) - .body(Body::from(resp_xml))?) + .body(string_body(resp_xml))?) } async fn get_copy_source( garage: &Garage, api_key: &Key, - req: &Request<Body>, + req: &Request<ReqBody>, ) -> Result<Object, Error> { let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?; let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?; @@ -501,7 +502,7 @@ struct CopyPreconditionHeaders { } impl CopyPreconditionHeaders { - fn parse(req: &Request<Body>) -> Result<Self, Error> { + fn parse(req: &Request<ReqBody>) -> Result<Self, Error> { Ok(Self { copy_source_if_match: req .headers() diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index 49097ad1..e069cae4 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -5,10 +5,18 @@ use http::header::{ ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, }; -use hyper::{header::HeaderName, Body, Method, Request, Response, StatusCode}; +use hyper::{ + body::Body, body::Incoming as IncomingBody, header::HeaderName, Method, Request, Response, + StatusCode, +}; + +use http_body_util::BodyExt; use serde::{Deserialize, Serialize}; +use crate::common_error::CommonError; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; @@ -17,7 +25,7 @@ use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule}; use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> { +pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<ResBody>, Error> { let param = bucket .params() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -34,18 +42,18 @@ pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> { Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } else { Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } } pub async fn handle_delete_cors( garage: Arc<Garage>, mut bucket: Bucket, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let param = bucket .params_mut() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -55,16 +63,16 @@ pub async fn handle_delete_cors( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_put_cors( garage: Arc<Garage>, mut bucket: Bucket, - req: Request<Body>, + req: Request<ReqBody>, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; +) -> Result<Response<ResBody>, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -84,14 +92,14 @@ pub async fn handle_put_cors( Ok(Response::builder() .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } -pub async fn handle_options_s3api( +pub async fn handle_options_api( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<IncomingBody>, bucket_name: Option<String>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<EmptyBody>, CommonError> { // FIXME: CORS rules of buckets with local aliases are // not taken into account. @@ -121,7 +129,7 @@ pub async fn handle_options_s3api( .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") .header(ACCESS_CONTROL_ALLOW_METHODS, "*") .status(StatusCode::OK) - .body(Body::empty())?) + .body(EmptyBody::new())?) } } else { // If there is no bucket name in the request, @@ -131,14 +139,14 @@ pub async fn handle_options_s3api( .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") .header(ACCESS_CONTROL_ALLOW_METHODS, "GET") .status(StatusCode::OK) - .body(Body::empty())?) + .body(EmptyBody::new())?) } } pub fn handle_options_for_bucket( - req: &Request<Body>, + req: &Request<IncomingBody>, bucket: &Bucket, -) -> Result<Response<Body>, Error> { +) -> Result<Response<EmptyBody>, CommonError> { let origin = req .headers() .get("Origin") @@ -161,18 +169,20 @@ pub fn handle_options_for_bucket( if let Some(rule) = matching_rule { let mut resp = Response::builder() .status(StatusCode::OK) - .body(Body::empty())?; + .body(EmptyBody::new())?; add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?; return Ok(resp); } } - Err(Error::forbidden("This CORS request is not allowed.")) + Err(CommonError::Forbidden( + "This CORS request is not allowed.".into(), + )) } pub fn find_matching_cors_rule<'a>( bucket: &'a Bucket, - req: &Request<Body>, + req: &Request<impl Body>, ) -> Result<Option<&'a GarageCorsRule>, Error> { if let Some(cors_config) = bucket.params().unwrap().cors_config.get() { if let Some(origin) = req.headers().get("Origin") { @@ -209,7 +219,7 @@ where } pub fn add_cors_headers( - resp: &mut Response<Body>, + resp: &mut Response<impl Body>, rule: &GarageCorsRule, ) -> Result<(), http::header::InvalidHeaderValue> { let h = resp.headers_mut(); diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index 1c491eac..3fb39147 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -1,12 +1,15 @@ use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use garage_util::data::*; use garage_model::garage::Garage; use garage_model::s3::object_table::*; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::put::next_timestamp; use crate::s3::xml as s3_xml; @@ -59,11 +62,11 @@ pub async fn handle_delete( garage: Arc<Garage>, bucket_id: Uuid, key: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { match handle_delete_internal(&garage, bucket_id, key).await { Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::from(vec![])) + .body(empty_body()) .unwrap()), Err(e) => Err(e), } @@ -72,10 +75,10 @@ pub async fn handle_delete( pub async fn handle_delete_objects( garage: Arc<Garage>, bucket_id: Uuid, - req: Request<Body>, + req: Request<ReqBody>, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; +) -> Result<Response<ResBody>, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -118,7 +121,7 @@ pub async fn handle_delete_objects( Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } struct DeleteRequest { diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index c50cff9f..f86c19a6 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -2,13 +2,12 @@ use std::convert::TryInto; use err_derive::Error; use hyper::header::HeaderValue; -use hyper::{Body, HeaderMap, StatusCode}; - -use garage_model::helper::error::Error as HelperError; +use hyper::{HeaderMap, StatusCode}; use crate::common_error::CommonError; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; use crate::generic_server::ApiError; +use crate::helpers::*; use crate::s3::xml as s3_xml; use crate::signature::error::Error as SignatureError; @@ -62,10 +61,6 @@ pub enum Error { #[error(display = "Invalid XML: {}", _0)] InvalidXml(String), - /// The client sent a header with invalid value - #[error(display = "Invalid header value: {}", _0)] - InvalidHeader(#[error(source)] hyper::header::ToStrError), - /// The client sent a range header with invalid value #[error(display = "Invalid HTTP range: {:?}", _0)] InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)), @@ -86,18 +81,6 @@ where impl CommonErrorDerivative for Error {} -impl From<HelperError> for Error { - fn from(err: HelperError) -> Self { - match err { - HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)), - HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)), - HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)), - HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)), - e => Self::bad_request(format!("{}", e)), - } - } -} - impl From<roxmltree::Error> for Error { fn from(err: roxmltree::Error) -> Self { Self::InvalidXml(format!("{}", err)) @@ -118,7 +101,6 @@ impl From<SignatureError> for Error { Self::AuthorizationHeaderMalformed(c) } SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), - SignatureError::InvalidHeader(h) => Self::InvalidHeader(h), } } } @@ -143,9 +125,7 @@ impl Error { Error::NotImplemented(_) => "NotImplemented", Error::InvalidXml(_) => "MalformedXML", Error::InvalidRange(_) => "InvalidRange", - Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) | Error::InvalidHeader(_) => { - "InvalidRequest" - } + Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest", } } } @@ -165,8 +145,7 @@ impl ApiError for Error { | Error::EntityTooSmall | Error::InvalidXml(_) | Error::InvalidUtf8Str(_) - | Error::InvalidUtf8String(_) - | Error::InvalidHeader(_) => StatusCode::BAD_REQUEST, + | Error::InvalidUtf8String(_) => StatusCode::BAD_REQUEST, } } @@ -189,22 +168,23 @@ impl ApiError for Error { } } - fn http_body(&self, garage_region: &str, path: &str) -> Body { + fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody { let error = s3_xml::Error { code: s3_xml::Value(self.aws_code().to_string()), message: s3_xml::Value(format!("{}", self)), resource: Some(s3_xml::Value(path.to_string())), region: Some(s3_xml::Value(garage_region.to_string())), }; - Body::from(s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| { + let error_str = s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| { r#" <?xml version="1.0" encoding="UTF-8"?> <Error> - <Code>InternalError</Code> - <Message>XML encoding of error failed</Message> + <Code>InternalError</Code> + <Message>XML encoding of error failed</Message> </Error> - "# + "# .into() - })) + }); + error_body(error_str) } } diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 5e682726..53f0a345 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -1,17 +1,20 @@ //! Function related to GET and HEAD requests +use std::convert::TryInto; use std::sync::Arc; use std::time::{Duration, UNIX_EPOCH}; use futures::future; use futures::stream::{self, StreamExt}; use http::header::{ - ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, - IF_NONE_MATCH, LAST_MODIFIED, RANGE, + ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, + CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH, + LAST_MODIFIED, RANGE, }; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Body, Request, Response, StatusCode}; use tokio::sync::mpsc; -use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag}; +use garage_block::manager::BlockStream; +use garage_rpc::rpc_helper::OrderTag; use garage_table::EmptyKey; use garage_util::data::*; use garage_util::error::OkOrMessage; @@ -20,10 +23,22 @@ use garage_model::garage::Garage; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; +use crate::helpers::*; +use crate::s3::api_server::ResBody; use crate::s3::error::*; const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; +#[derive(Default)] +pub struct GetObjectOverrides { + pub(crate) response_cache_control: Option<String>, + pub(crate) response_content_disposition: Option<String>, + pub(crate) response_content_encoding: Option<String>, + pub(crate) response_content_language: Option<String>, + pub(crate) response_content_type: Option<String>, + pub(crate) response_expires: Option<String>, +} + fn object_headers( version: &ObjectVersion, version_meta: &ObjectVersionMeta, @@ -49,11 +64,37 @@ fn object_headers( resp } +/// Override headers according to specific query parameters, see +/// section "Overriding response header values through the request" in +/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html +fn getobject_override_headers( + overrides: GetObjectOverrides, + resp: &mut http::response::Builder, +) -> Result<(), Error> { + // TODO: this only applies for signed requests, so when we support + // anonymous access in the future we will have to do a permission check here + let overrides = [ + (CACHE_CONTROL, overrides.response_cache_control), + (CONTENT_DISPOSITION, overrides.response_content_disposition), + (CONTENT_ENCODING, overrides.response_content_encoding), + (CONTENT_LANGUAGE, overrides.response_content_language), + (CONTENT_TYPE, overrides.response_content_type), + (EXPIRES, overrides.response_expires), + ]; + for (hdr, val_opt) in overrides { + if let Some(val) = val_opt { + let val = val.try_into().ok_or_bad_request("invalid header value")?; + resp.headers_mut().unwrap().insert(hdr, val); + } + } + Ok(()) +} + fn try_answer_cached( version: &ObjectVersion, version_meta: &ObjectVersionMeta, - req: &Request<Body>, -) -> Option<Response<Body>> { + req: &Request<impl Body>, +) -> Option<Response<ResBody>> { // <trinity> It is possible, and is even usually the case, [that both If-None-Match and // If-Modified-Since] are present in a request. In this situation If-None-Match takes // precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The rational @@ -80,7 +121,7 @@ fn try_answer_cached( Some( Response::builder() .status(StatusCode::NOT_MODIFIED) - .body(Body::empty()) + .body(empty_body()) .unwrap(), ) } else { @@ -91,11 +132,11 @@ fn try_answer_cached( /// Handle HEAD request pub async fn handle_head( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<impl Body>, bucket_id: Uuid, key: &str, part_number: Option<u64>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let object = garage .object_table .get(&bucket_id, &key.to_string()) @@ -138,7 +179,7 @@ pub async fn handle_head( ) .header(X_AMZ_MP_PARTS_COUNT, "1") .status(StatusCode::PARTIAL_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } ObjectVersionData::FirstBlock(_, _) => { let version = garage @@ -163,7 +204,7 @@ pub async fn handle_head( ) .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?)) .status(StatusCode::PARTIAL_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } _ => unreachable!(), } @@ -171,18 +212,19 @@ pub async fn handle_head( Ok(object_headers(object_version, version_meta) .header(CONTENT_LENGTH, format!("{}", version_meta.size)) .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } } /// Handle GET request pub async fn handle_get( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<impl Body>, bucket_id: Uuid, key: &str, part_number: Option<u64>, -) -> Result<Response<Body>, Error> { + overrides: GetObjectOverrides, +) -> Result<Response<ResBody>, Error> { let object = garage .object_table .get(&bucket_id, &key.to_string()) @@ -233,18 +275,18 @@ pub async fn handle_get( (None, None) => (), } - let resp_builder = object_headers(last_v, last_v_meta) + let mut resp_builder = object_headers(last_v, last_v_meta) .header(CONTENT_LENGTH, format!("{}", last_v_meta.size)) .status(StatusCode::OK); + getobject_override_headers(overrides, &mut resp_builder)?; match &last_v_data { ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_, bytes) => { - let body: Body = Body::from(bytes.to_vec()); - Ok(resp_builder.body(body)?) + Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { - let (tx, rx) = mpsc::channel(2); + let (tx, rx) = mpsc::channel::<BlockStream>(2); let order_stream = OrderTag::stream(); let first_block_hash = *first_block_hash; @@ -282,20 +324,12 @@ pub async fn handle_get( { Ok(()) => (), Err(e) => { - let err = std::io::Error::new( - std::io::ErrorKind::Other, - format!("Error while getting object data: {}", e), - ); - let _ = tx - .send(Box::pin(stream::once(future::ready(Err(err))))) - .await; + let _ = tx.send(error_stream_item(e)).await; } } }); - let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten(); - - let body = hyper::body::Body::wrap_stream(body_stream); + let body = response_body_from_block_stream(rx); Ok(resp_builder.body(body)?) } } @@ -308,7 +342,10 @@ async fn handle_get_range( version_meta: &ObjectVersionMeta, begin: u64, end: u64, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { + // Here we do not use getobject_override_headers because we don't + // want to add any overridden headers (those should not be added + // when returning PARTIAL_CONTENT) let resp_builder = object_headers(version, version_meta) .header(CONTENT_LENGTH, format!("{}", end - begin)) .header( @@ -321,7 +358,7 @@ async fn handle_get_range( ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_meta, bytes) => { if end as usize <= bytes.len() { - let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec()); + let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into()); Ok(resp_builder.body(body)?) } else { Err(Error::internal_error( @@ -348,7 +385,8 @@ async fn handle_get_part( version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, part_number: u64, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { + // Same as for get_range, no getobject_override_headers let resp_builder = object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT); @@ -364,7 +402,7 @@ async fn handle_get_part( format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()), ) .header(X_AMZ_MP_PARTS_COUNT, "1") - .body(Body::from(bytes.to_vec()))?) + .body(bytes_body(bytes.to_vec().into()))?) } ObjectVersionData::FirstBlock(_, _) => { let version = garage @@ -392,7 +430,7 @@ async fn handle_get_part( } fn parse_range_header( - req: &Request<Body>, + req: &Request<impl Body>, total_size: u64, ) -> Result<Option<http_range::HttpRange>, Error> { let range = match req.headers().get(RANGE) { @@ -434,7 +472,7 @@ fn body_from_blocks_range( all_blocks: &[(VersionBlockKey, VersionBlock)], begin: u64, end: u64, -) -> Body { +) -> ResBody { // We will store here the list of blocks that have an intersection with the requested // range, as well as their "true offset", which is their actual offset in the complete // file (whereas block.offset designates the offset of the block WITHIN THE PART @@ -456,17 +494,17 @@ fn body_from_blocks_range( } let order_stream = OrderTag::stream(); - let body_stream = futures::stream::iter(blocks) - .enumerate() - .map(move |(i, (block, block_offset))| { + let (tx, rx) = mpsc::channel::<BlockStream>(2); + + tokio::spawn(async move { + match async { let garage = garage.clone(); - async move { - garage + for (i, (block, block_offset)) in blocks.iter().enumerate() { + let block_stream = garage .block_manager .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) - .await - .unwrap_or_else(|e| error_stream(i, e)) - .scan(block_offset, move |chunk_offset, chunk| { + .await? + .scan(*block_offset, move |chunk_offset, chunk| { let r = match chunk { Ok(chunk_bytes) => { let chunk_len = chunk_bytes.len() as u64; @@ -502,20 +540,42 @@ fn body_from_blocks_range( }; futures::future::ready(r) }) - .filter_map(futures::future::ready) + .filter_map(futures::future::ready); + + let block_stream: BlockStream = Box::pin(block_stream); + tx.send(Box::pin(block_stream)) + .await + .ok_or_message("channel closed")?; } - }) - .buffered(2) - .flatten(); - hyper::body::Body::wrap_stream(body_stream) + Ok::<(), Error>(()) + } + .await + { + Ok(()) => (), + Err(e) => { + let _ = tx.send(error_stream_item(e)).await; + } + } + }); + + response_body_from_block_stream(rx) +} + +fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody { + let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) + .flatten() + .map(|x| { + x.map(hyper::body::Frame::data) + .map_err(|e| Error::from(garage_util::error::Error::from(e))) + }); + ResBody::new(http_body_util::StreamBody::new(body_stream)) } -fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream { - Box::pin(futures::stream::once(async move { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("Could not get block {}: {}", i, e), - )) - })) +fn error_stream_item<E: std::fmt::Display>(e: E) -> BlockStream { + let err = std::io::Error::new( + std::io::ErrorKind::Other, + format!("Error while getting object data: {}", e), + ); + Box::pin(stream::once(future::ready(Err(err)))) } diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index 1e7d6755..35757e8c 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -1,10 +1,13 @@ use quick_xml::de::from_reader; use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; @@ -16,7 +19,7 @@ use garage_model::bucket_table::{ use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<Body>, Error> { +pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<ResBody>, Error> { let param = bucket .params() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -27,18 +30,18 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<Body>, Err Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } else { Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } } pub async fn handle_delete_lifecycle( garage: Arc<Garage>, mut bucket: Bucket, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let param = bucket .params_mut() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -48,16 +51,16 @@ pub async fn handle_delete_lifecycle( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_put_lifecycle( garage: Arc<Garage>, mut bucket: Bucket, - req: Request<Body>, + req: Request<ReqBody>, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; +) -> Result<Response<ResBody>, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -77,7 +80,7 @@ pub async fn handle_put_lifecycle( Ok(Response::builder() .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } // ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ---- diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 1b9e8cd5..b832a4f4 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -3,7 +3,7 @@ use std::iter::{Iterator, Peekable}; use std::sync::Arc; use base64::prelude::*; -use hyper::{Body, Response}; +use hyper::Response; use garage_util::data::*; use garage_util::error::Error as GarageError; @@ -16,7 +16,8 @@ use garage_model::s3::object_table::*; use garage_table::EnumerationOrder; use crate::encoding::*; -use crate::helpers::key_after_prefix; +use crate::helpers::*; +use crate::s3::api_server::ResBody; use crate::s3::error::*; use crate::s3::multipart as s3_multipart; use crate::s3::xml as s3_xml; @@ -63,7 +64,7 @@ pub struct ListPartsQuery { pub async fn handle_list( garage: Arc<Garage>, query: &ListObjectsQuery, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let io = |bucket, key, count| { let t = &garage.object_table; async move { @@ -162,13 +163,13 @@ pub async fn handle_list( let xml = s3_xml::to_xml_with_header(&result)?; Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } pub async fn handle_list_multipart_upload( garage: Arc<Garage>, query: &ListMultipartUploadsQuery, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let io = |bucket, key, count| { let t = &garage.object_table; async move { @@ -264,13 +265,13 @@ pub async fn handle_list_multipart_upload( Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } pub async fn handle_list_parts( garage: Arc<Garage>, query: &ListPartsQuery, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { debug!("ListParts {:?}", query); let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?; @@ -319,7 +320,7 @@ pub async fn handle_list_parts( Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } /* diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 6b786318..b9d15b21 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::sync::Arc; use futures::prelude::*; -use hyper::body::Body; use hyper::{Request, Response}; use md5::{Digest as Md5Digest, Md5}; @@ -17,6 +16,8 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::put::*; use crate::s3::xml as s3_xml; @@ -26,11 +27,11 @@ use crate::signature::verify_signed_content; pub async fn handle_create_multipart_upload( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<ReqBody>, bucket_name: &str, bucket_id: Uuid, key: &String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let existing_object = garage.object_table.get(&bucket_id, &key).await?; let upload_id = gen_uuid(); @@ -65,18 +66,18 @@ pub async fn handle_create_multipart_upload( }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(Body::from(xml.into_bytes()))) + Ok(Response::new(string_body(xml))) } pub async fn handle_put_part( garage: Arc<Garage>, - req: Request<Body>, + req: Request<ReqBody>, bucket_id: Uuid, key: &str, part_number: u64, upload_id: &str, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let upload_id = decode_upload_id(upload_id)?; let content_md5 = match req.headers().get("content-md5") { @@ -87,8 +88,8 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let body = req.into_body().map_err(Error::from); - let mut chunker = StreamChunker::new(body, garage.config.block_size); + let stream = body_stream(req.into_body()); + let mut chunker = StreamChunker::new(stream, garage.config.block_size); let ((_, _, mut mpu), first_block) = futures::try_join!( get_upload(&garage, &bucket_id, &key, &upload_id), @@ -172,7 +173,7 @@ pub async fn handle_put_part( let response = Response::builder() .header("ETag", format!("\"{}\"", data_md5sum_hex)) - .body(Body::empty()) + .body(empty_body()) .unwrap(); Ok(response) } @@ -210,14 +211,16 @@ impl Drop for InterruptedCleanup { pub async fn handle_complete_multipart_upload( garage: Arc<Garage>, - req: Request<Body>, + req: Request<ReqBody>, bucket_name: &str, bucket: &Bucket, key: &str, upload_id: &str, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; +) -> Result<Response<ResBody>, Error> { + let body = http_body_util::BodyExt::collect(req.into_body()) + .await? + .to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -365,7 +368,7 @@ pub async fn handle_complete_multipart_upload( }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(Body::from(xml.into_bytes()))) + Ok(Response::new(string_body(xml))) } pub async fn handle_abort_multipart_upload( @@ -373,7 +376,7 @@ pub async fn handle_abort_multipart_upload( bucket_id: Uuid, key: &str, upload_id: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let upload_id = decode_upload_id(upload_id)?; let (_, mut object_version, _) = @@ -383,7 +386,7 @@ pub async fn handle_abort_multipart_upload( let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]); garage.object_table.insert(&final_object).await?; - Ok(Response::new(Body::from(vec![]))) + Ok(Response::new(empty_body())) } // ======== helpers ============ diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index 542b7a81..bca8d6c6 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::convert::TryInto; +use std::convert::{Infallible, TryInto}; use std::ops::RangeInclusive; use std::sync::Arc; use std::task::{Context, Poll}; @@ -9,12 +9,15 @@ use bytes::Bytes; use chrono::{DateTime, Duration, Utc}; use futures::{Stream, StreamExt}; use hyper::header::{self, HeaderMap, HeaderName, HeaderValue}; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use multer::{Constraints, Multipart, SizeLimit}; use serde::Deserialize; use garage_model::garage::Garage; +use crate::helpers::*; +use crate::s3::api_server::ResBody; +use crate::s3::cors::*; use crate::s3::error::*; use crate::s3::put::{get_headers, save_stream}; use crate::s3::xml as s3_xml; @@ -22,9 +25,9 @@ use crate::signature::payload::{parse_date, verify_v4}; pub async fn handle_post_object( garage: Arc<Garage>, - req: Request<Body>, + req: Request<IncomingBody>, bucket_name: String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let boundary = req .headers() .get(header::CONTENT_TYPE) @@ -41,7 +44,8 @@ pub async fn handle_post_object( ); let (head, body) = req.into_parts(); - let mut multipart = Multipart::with_constraints(body, boundary, constraints); + let stream = body_stream::<_, Error>(body); + let mut multipart = Multipart::with_constraints(stream, boundary, constraints); let mut params = HeaderMap::new(); let field = loop { @@ -242,7 +246,7 @@ pub async fn handle_post_object( let etag = format!("\"{}\"", md5); - let resp = if let Some(mut target) = params + let mut resp = if let Some(mut target) = params .get("success_action_redirect") .and_then(|h| h.to_str().ok()) .and_then(|u| url::Url::parse(u).ok()) @@ -258,12 +262,11 @@ pub async fn handle_post_object( .status(StatusCode::SEE_OTHER) .header(header::LOCATION, target.clone()) .header(header::ETAG, etag) - .body(target.into())? + .body(string_body(target))? } else { let path = head .uri - .into_parts() - .path_and_query + .path_and_query() .map(|paq| paq.path().to_string()) .unwrap_or_else(|| "/".to_string()); let authority = head @@ -290,7 +293,7 @@ pub async fn handle_post_object( .header(header::LOCATION, location.clone()) .header(header::ETAG, etag.clone()); match action { - "200" => builder.status(StatusCode::OK).body(Body::empty())?, + "200" => builder.status(StatusCode::OK).body(empty_body())?, "201" => { let xml = s3_xml::PostObject { xmlns: (), @@ -302,12 +305,21 @@ pub async fn handle_post_object( let body = s3_xml::to_xml_with_header(&xml)?; builder .status(StatusCode::CREATED) - .body(Body::from(body.into_bytes()))? + .body(string_body(body))? } - _ => builder.status(StatusCode::NO_CONTENT).body(Body::empty())?, + _ => builder.status(StatusCode::NO_CONTENT).body(empty_body())?, } }; + let matching_cors_rule = find_matching_cors_rule( + &bucket, + &Request::from_parts(head, empty_body::<Infallible>()), + )?; + if let Some(rule) = matching_cors_rule { + add_cors_headers(&mut resp, rule) + .ok_or_internal_error("Invalid bucket CORS configuration")?; + } + Ok(resp) } diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index d1c88a76..8902b14c 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -4,12 +4,13 @@ use std::sync::Arc; use base64::prelude::*; use futures::prelude::*; use futures::try_join; -use hyper::body::{Body, Bytes}; -use hyper::header::{HeaderMap, HeaderValue}; -use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; +use hyper::body::Bytes; +use hyper::header::{HeaderMap, HeaderValue}; +use hyper::{Request, Response}; + use opentelemetry::{ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, Context, @@ -30,15 +31,17 @@ use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; pub async fn handle_put( garage: Arc<Garage>, - req: Request<Body>, + req: Request<ReqBody>, bucket: &Bucket, key: &String, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { // Retrieve interesting headers from request let headers = get_headers(req.headers())?; debug!("Object headers: {:?}", headers); @@ -48,13 +51,12 @@ pub async fn handle_put( None => None, }; - let (_head, body) = req.into_parts(); - let body = body.map_err(Error::from); + let stream = body_stream(req.into_body()); save_stream( garage, headers, - body, + stream, bucket, key, content_md5, @@ -434,11 +436,11 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { } } -pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> { +pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<ResBody> { Response::builder() .header("x-amz-version-id", hex::encode(version_uuid)) .header("ETag", format!("\"{}\"", md5sum_hex)) - .body(Body::from(vec![])) + .body(empty_body()) .unwrap() } diff --git a/src/api/s3/router.rs b/src/api/s3/router.rs index 821b0e07..e7ac1d77 100644 --- a/src/api/s3/router.rs +++ b/src/api/s3/router.rs @@ -125,6 +125,12 @@ pub enum Endpoint { key: String, part_number: Option<u64>, version_id: Option<String>, + response_cache_control: Option<String>, + response_content_disposition: Option<String>, + response_content_encoding: Option<String>, + response_content_language: Option<String>, + response_content_type: Option<String>, + response_expires: Option<String>, }, GetObjectAcl { key: String, @@ -170,7 +176,7 @@ pub enum Endpoint { }, ListBuckets, ListMultipartUploads { - delimiter: Option<char>, + delimiter: Option<String>, encoding_type: Option<String>, key_marker: Option<String>, max_uploads: Option<usize>, @@ -178,7 +184,7 @@ pub enum Endpoint { upload_id_marker: Option<String>, }, ListObjects { - delimiter: Option<char>, + delimiter: Option<String>, encoding_type: Option<String>, marker: Option<String>, max_keys: Option<usize>, @@ -188,7 +194,7 @@ pub enum Endpoint { // This value should always be 2. It is not checked when constructing the struct list_type: String, continuation_token: Option<String>, - delimiter: Option<char>, + delimiter: Option<String>, encoding_type: Option<String>, fetch_owner: Option<bool>, max_keys: Option<usize>, @@ -196,7 +202,7 @@ pub enum Endpoint { start_after: Option<String>, }, ListObjectVersions { - delimiter: Option<char>, + delimiter: Option<String>, encoding_type: Option<String>, key_marker: Option<String>, max_keys: Option<u64>, @@ -358,7 +364,14 @@ impl Endpoint { (query.keyword.take().unwrap_or_default(), key, query, None), key: [ EMPTY if upload_id => ListParts (query::upload_id, opt_parse::max_parts, opt_parse::part_number_marker), - EMPTY => GetObject (query_opt::version_id, opt_parse::part_number), + EMPTY => GetObject (query_opt::version_id, + opt_parse::part_number, + query_opt::response_cache_control, + query_opt::response_content_disposition, + query_opt::response_content_encoding, + query_opt::response_content_language, + query_opt::response_content_type, + query_opt::response_expires), ACL => GetObjectAcl (query_opt::version_id), LEGAL_HOLD => GetObjectLegalHold (query_opt::version_id), RETENTION => GetObjectRetention (query_opt::version_id), @@ -671,6 +684,12 @@ generateQueryParameters! { "partNumber" => part_number, "part-number-marker" => part_number_marker, "prefix" => prefix, + "response-cache-control" => response_cache_control, + "response-content-disposition" => response_content_disposition, + "response-content-encoding" => response_content_encoding, + "response-content-language" => response_content_language, + "response-content-type" => response_content_type, + "response-expires" => response_expires, "select-type" => select_type, "start-after" => start_after, "uploadId" => upload_id, diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index 7f2ab925..1c1dbf20 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -1,9 +1,12 @@ use quick_xml::de::from_reader; use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; @@ -12,7 +15,7 @@ use garage_model::bucket_table::*; use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error> { +pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Error> { let param = bucket .params() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -33,18 +36,18 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } else { Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } } pub async fn handle_delete_website( garage: Arc<Garage>, mut bucket: Bucket, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let param = bucket .params_mut() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -54,16 +57,16 @@ pub async fn handle_delete_website( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_put_website( garage: Arc<Garage>, mut bucket: Bucket, - req: Request<Body>, + req: Request<ReqBody>, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; +) -> Result<Response<ResBody>, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -83,7 +86,7 @@ pub async fn handle_put_website( Ok(Response::builder() .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] diff --git a/src/api/signature/error.rs b/src/api/signature/error.rs index f0d7c816..2d92a072 100644 --- a/src/api/signature/error.rs +++ b/src/api/signature/error.rs @@ -18,10 +18,6 @@ pub enum Error { /// The request contained an invalid UTF-8 sequence in its path or in other parameters #[error(display = "Invalid UTF-8: {}", _0)] InvalidUtf8Str(#[error(source)] std::str::Utf8Error), - - /// The client sent a header with invalid value - #[error(display = "Invalid header value: {}", _0)] - InvalidHeader(#[error(source)] hyper::header::ToStrError), } impl<T> From<T> for Error diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs index b50fb3bb..423aad93 100644 --- a/src/api/signature/payload.rs +++ b/src/api/signature/payload.rs @@ -1,8 +1,8 @@ use std::collections::HashMap; -use chrono::{DateTime, Duration, NaiveDateTime, Utc}; +use chrono::{DateTime, Duration, NaiveDateTime, TimeZone, Utc}; use hmac::Mac; -use hyper::{Body, Method, Request}; +use hyper::{body::Incoming as IncomingBody, Method, Request}; use sha2::{Digest, Sha256}; use garage_table::*; @@ -20,7 +20,7 @@ use crate::signature::error::*; pub async fn check_payload_signature( garage: &Garage, service: &'static str, - request: &Request<Body>, + request: &Request<IncomingBody>, ) -> Result<(Option<Key>, Option<Hash>), Error> { let mut headers = HashMap::new(); for (key, val) in request.headers() { @@ -316,7 +316,7 @@ fn canonical_query_string(uri: &hyper::Uri) -> String { pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> { let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?; - Ok(DateTime::from_utc(date, Utc)) + Ok(Utc.from_utc_datetime(&date)) } pub async fn verify_v4( diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index c8358c4f..39147ca0 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -1,26 +1,30 @@ use std::pin::Pin; -use chrono::{DateTime, NaiveDateTime, Utc}; +use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use futures::prelude::*; use futures::task; use garage_model::key_table::Key; use hmac::Mac; -use hyper::body::Bytes; -use hyper::{Body, Request}; +use http_body_util::StreamBody; +use hyper::body::{Bytes, Incoming as IncomingBody}; +use hyper::Request; use garage_util::data::Hash; use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME}; +use crate::helpers::*; use crate::signature::error::*; +pub type ReqBody = BoxBody<Error>; + pub fn parse_streaming_body( api_key: &Key, - req: Request<Body>, + req: Request<IncomingBody>, content_sha256: &mut Option<Hash>, region: &str, service: &str, -) -> Result<Request<Body>, Error> { +) -> Result<Request<ReqBody>, Error> { match req.headers().get("x-amz-content-sha256") { Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { let signature = content_sha256 @@ -40,26 +44,22 @@ pub fn parse_streaming_body( .to_str()?; let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) .ok_or_bad_request("Invalid date")?; - let date: DateTime<Utc> = DateTime::from_utc(date, Utc); + let date: DateTime<Utc> = Utc.from_utc_datetime(&date); let scope = compute_scope(&date, region, service); let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service) .ok_or_internal_error("Unable to build signing HMAC")?; Ok(req.map(move |body| { - Body::wrap_stream( - SignedPayloadStream::new( - body.map_err(Error::from), - signing_hmac, - date, - &scope, - signature, - ) - .map_err(Error::from), - ) + let stream = body_stream::<_, Error>(body); + let signed_payload_stream = + SignedPayloadStream::new(stream, signing_hmac, date, &scope, signature) + .map(|x| x.map(hyper::body::Frame::data)) + .map_err(Error::from); + ReqBody::new(StreamBody::new(signed_payload_stream)) })) } - _ => Ok(req), + _ => Ok(req.map(|body| ReqBody::new(http_body_util::BodyExt::map_err(body, Error::from)))), } } diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index e4265cbe..7cf82ce6 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -19,26 +19,26 @@ garage_rpc.workspace = true garage_util.workspace = true garage_table.workspace = true -opentelemetry = "0.17" - -arc-swap = "1.5" -async-trait = "0.1.7" -bytes = "1.0" -bytesize = "1.2" -hex = "0.4" -tracing = "0.1" -rand = "0.8" - -async-compression = { version = "0.4", features = ["tokio", "zstd"] } -zstd = { version = "0.12", default-features = false } - -serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } -serde_bytes = "0.11" - -futures = "0.3" -futures-util = "0.3" -tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -tokio-util = { version = "0.7", features = ["io"] } +opentelemetry.workspace = true + +arc-swap.workspace = true +async-trait.workspace = true +bytes.workspace = true +bytesize.workspace = true +hex.workspace = true +tracing.workspace = true +rand.workspace = true + +async-compression.workspace = true +zstd.workspace = true + +serde.workspace = true +serde_bytes.workspace = true + +futures.workspace = true +futures-util.workspace = true +tokio.workspace = true +tokio-util.workspace = true [features] system-libs = [ "zstd/pkg-config" ] diff --git a/src/block/manager.rs b/src/block/manager.rs index bfd390ee..aae1ce45 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -53,6 +53,9 @@ pub const INLINE_THRESHOLD: usize = 3072; // to delete the block locally. pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); +pub type BlockStream = + Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>; + /// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] pub enum BlockRpc { @@ -326,10 +329,7 @@ impl BlockManager { &self, hash: &Hash, order_tag: Option<OrderTag>, - ) -> Result< - Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>, - Error, - > { + ) -> Result<BlockStream, Error> { let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; match header { DataBlockHeader::Plain => Ok(stream), diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index 530f1966..fddc5cca 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -12,24 +12,19 @@ readme = "../../README.md" path = "lib.rs" [dependencies] -err-derive = "0.3" -hexdump = "0.1" -tracing = "0.1" +err-derive.workspace = true +hexdump.workspace = true +tracing.workspace = true -heed = { version = "0.11", default-features = false, features = ["lmdb"], optional = true } -rusqlite = { version = "0.29", optional = true } -sled = { version = "0.34", optional = true } - -# cli deps -clap = { version = "4.1", optional = true, features = ["derive", "env"] } -pretty_env_logger = { version = "0.5", optional = true } +heed = { workspace = true, optional = true } +rusqlite = { workspace = true, optional = true } +sled = { workspace = true, optional = true } [dev-dependencies] -mktemp = "0.5" +mktemp.workspace = true [features] default = [ "sled", "lmdb", "sqlite" ] bundled-libs = [ "rusqlite?/bundled" ] -cli = ["clap", "pretty_env_logger"] lmdb = [ "heed" ] sqlite = [ "rusqlite" ] diff --git a/src/db/lib.rs b/src/db/lib.rs index fe44b01e..eef3e177 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -171,6 +171,48 @@ impl Db { } } +/// List of supported database engine types +/// +/// The `enum` holds list of *all* database engines that are are be supported by crate, no matter +/// if relevant feature is enabled or not. It allows us to distinguish between invalid engine +/// and valid engine, whose support is not enabled via feature flag. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Engine { + Lmdb, + Sqlite, + Sled, +} + +impl Engine { + /// Return variant name as static `&str` + pub fn as_str(&self) -> &'static str { + match self { + Self::Lmdb => "lmdb", + Self::Sqlite => "sqlite", + Self::Sled => "sled", + } + } +} + +impl std::fmt::Display for Engine { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + self.as_str().fmt(fmt) + } +} + +impl std::str::FromStr for Engine { + type Err = Error; + + fn from_str(text: &str) -> Result<Engine> { + match text { + "lmdb" | "heed" => Ok(Self::Lmdb), + "sqlite" | "sqlite3" | "rusqlite" => Ok(Self::Sqlite), + "sled" => Ok(Self::Sled), + kind => Err(Error(format!("Invalid DB engine: {}", kind).into())), + } + } +} + #[allow(clippy::len_without_is_empty)] impl Tree { #[inline] diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index dce7ea73..02a72502 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -31,48 +31,51 @@ garage_table.workspace = true garage_util.workspace = true garage_web.workspace = true -backtrace = "0.3" -bytes = "1.0" -bytesize = "1.2" -timeago = { version = "0.4", default-features = false } -parse_duration = "2.1" -hex = "0.4" -tracing = { version = "0.1" } -tracing-subscriber = { version = "0.3", features = ["env-filter"] } -rand = "0.8" -async-trait = "0.1.7" -sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } -git-version = "0.3.4" - -serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } -serde_bytes = "0.11" -structopt = { version = "0.3", default-features = false } -toml = "0.6" - -futures = "0.3" -futures-util = "0.3" -tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } - -netapp = "0.10" - -opentelemetry = { version = "0.17", features = [ "rt-tokio" ] } -opentelemetry-prometheus = { version = "0.10", optional = true } -opentelemetry-otlp = { version = "0.10", optional = true } -prometheus = { version = "0.13", optional = true } +backtrace.workspace = true +bytes.workspace = true +bytesize.workspace = true +timeago.workspace = true +parse_duration.workspace = true +hex.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +rand.workspace = true +async-trait.workspace = true +sodiumoxide.workspace = true +structopt.workspace = true +git-version.workspace = true + +serde.workspace = true +serde_bytes.workspace = true +toml.workspace = true + +futures.workspace = true +futures-util.workspace = true +tokio.workspace = true + +netapp.workspace = true + +opentelemetry.workspace = true +opentelemetry-prometheus = { workspace = true, optional = true } +opentelemetry-otlp = { workspace = true, optional = true } +prometheus = { workspace = true, optional = true } [dev-dependencies] -aws-config = "0.55.2" -aws-sdk-s3 = "0.28" -chrono = "0.4" -http = "0.2" -hmac = "0.12" -hyper = { version = "0.14", features = ["client", "http1", "runtime"] } -sha2 = "0.10" - -static_init = "1.0" -assert-json-diff = "2.0" -serde_json = "1.0" -base64 = "0.21" +aws-config.workspace = true +aws-sdk-s3.workspace = true +chrono.workspace = true +http.workspace = true +hmac.workspace = true +http-body-util.workspace = true +hyper.workspace = true +hyper-util.workspace = true +mktemp.workspace = true +sha2.workspace = true + +static_init.workspace = true +assert-json-diff.workspace = true +serde_json.workspace = true +base64.workspace = true k2v-client.workspace = true diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs index c4a45738..edeb88c0 100644 --- a/src/garage/admin/block.rs +++ b/src/garage/admin/block.rs @@ -25,8 +25,7 @@ impl AdminRpcHandler { } async fn handle_block_info(&self, hash: &String) -> Result<AdminRpc, Error> { - let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; - let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; + let hash = self.find_block_hash_by_prefix(hash)?; let refcount = self.garage.block_manager.get_block_rc(&hash)?; let block_refs = self .garage @@ -189,4 +188,48 @@ impl AdminRpcHandler { Ok(()) } + + // ---- helper function ---- + fn find_block_hash_by_prefix(&self, prefix: &str) -> Result<Hash, Error> { + if prefix.len() < 4 { + return Err(Error::BadRequest( + "Please specify at least 4 characters of the block hash".into(), + )); + } + + let prefix_bin = + hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?; + + let iter = self + .garage + .block_ref_table + .data + .store + .range(&prefix_bin[..]..) + .map_err(GarageError::from)?; + let mut found = None; + for item in iter { + let (k, _v) = item.map_err(GarageError::from)?; + let hash = Hash::try_from(&k[..32]).unwrap(); + if &hash.as_slice()[..prefix_bin.len()] != prefix_bin { + break; + } + if hex::encode(hash.as_slice()).starts_with(prefix) { + match &found { + Some(x) if *x == hash => (), + Some(_) => { + return Err(Error::BadRequest(format!( + "Several blocks match prefix `{}`", + prefix + ))); + } + None => { + found = Some(hash); + } + } + } + } + + found.ok_or_else(|| Error::BadRequest("No matching block found".into())) + } } diff --git a/src/garage/cli/convert_db.rs b/src/garage/cli/convert_db.rs index 3c6ce69c..6b854ccb 100644 --- a/src/garage/cli/convert_db.rs +++ b/src/garage/cli/convert_db.rs @@ -14,44 +14,73 @@ pub struct ConvertDbOpt { /// Input database engine (sled, lmdb or sqlite; limited by db engines /// enabled in this build) #[structopt(short = "a")] - input_engine: String, + input_engine: Engine, /// Output database path #[structopt(short = "o")] output_path: PathBuf, /// Output database engine #[structopt(short = "b")] - output_engine: String, + output_engine: Engine, + + #[structopt(flatten)] + db_open: OpenDbOpt, +} + +/// Overrides for database open operation +#[derive(StructOpt, Debug, Default)] +pub struct OpenDbOpt { + #[cfg(feature = "lmdb")] + #[structopt(flatten)] + lmdb: OpenLmdbOpt, +} + +/// Overrides for LMDB database open operation +#[cfg(feature = "lmdb")] +#[derive(StructOpt, Debug, Default)] +pub struct OpenLmdbOpt { + /// LMDB map size override + /// (supported suffixes: B, KiB, MiB, GiB, TiB, PiB) + #[cfg(feature = "lmdb")] + #[structopt(long = "lmdb-map-size", name = "bytes", display_order = 1_000)] + map_size: Option<bytesize::ByteSize>, } pub(crate) fn do_conversion(args: ConvertDbOpt) -> Result<()> { - let input = open_db(args.input_path, args.input_engine)?; - let output = open_db(args.output_path, args.output_engine)?; + if args.input_engine == args.output_engine { + return Err(Error("input and output database engine must differ".into())); + } + + let input = open_db(args.input_path, args.input_engine, &args.db_open)?; + let output = open_db(args.output_path, args.output_engine, &args.db_open)?; output.import(&input)?; Ok(()) } -fn open_db(path: PathBuf, engine: String) -> Result<Db> { - match engine.as_str() { +fn open_db(path: PathBuf, engine: Engine, open: &OpenDbOpt) -> Result<Db> { + match engine { #[cfg(feature = "sled")] - "sled" => { + Engine::Sled => { let db = sled_adapter::sled::Config::default().path(&path).open()?; Ok(sled_adapter::SledDb::init(db)) } #[cfg(feature = "sqlite")] - "sqlite" | "sqlite3" | "rusqlite" => { + Engine::Sqlite => { let db = sqlite_adapter::rusqlite::Connection::open(&path)?; - db.pragma_update(None, "journal_mode", &"WAL")?; - db.pragma_update(None, "synchronous", &"NORMAL")?; + db.pragma_update(None, "journal_mode", "WAL")?; + db.pragma_update(None, "synchronous", "NORMAL")?; Ok(sqlite_adapter::SqliteDb::init(db)) } #[cfg(feature = "lmdb")] - "lmdb" | "heed" => { + Engine::Lmdb => { std::fs::create_dir_all(&path).map_err(|e| { Error(format!("Unable to create LMDB data directory: {}", e).into()) })?; - let map_size = lmdb_adapter::recommended_map_size(); + let map_size = match open.lmdb.map_size { + Some(c) => c.as_u64() as usize, + None => lmdb_adapter::recommended_map_size(), + }; let mut env_builder = lmdb_adapter::heed::EnvOpenOptions::new(); env_builder.max_dbs(100); @@ -62,8 +91,13 @@ fn open_db(path: PathBuf, engine: String) -> Result<Db> { let db = env_builder.open(&path)?; Ok(lmdb_adapter::LmdbDb::init(db)) } - e => Err(Error( - format!("Invalid or unsupported DB engine: {}", e).into(), + + // Pattern is unreachable when all supported DB engines are compiled into binary. The allow + // attribute is added so that we won't have to change this match in case stop building + // support for one or more engines by default. + #[allow(unreachable_patterns)] + engine => Err(Error( + format!("Engine support not available in this build: {}", engine).into(), )), } } diff --git a/src/garage/cli/init.rs b/src/garage/cli/init.rs index 20813f1c..43ca5c09 100644 --- a/src/garage/cli/init.rs +++ b/src/garage/cli/init.rs @@ -43,7 +43,7 @@ pub fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> { idstr ); eprintln!( - "where <remote_node> is their own node identifier in the format: <pubkey>@<ip>:<port>" + "where <remote_node> is their own node identifier in the format: <full-node-id>@<ip>:<port>" ); eprintln!(); eprintln!("This node identifier can also be added as a bootstrap node in other node's garage.toml files:"); diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 6bc3da22..40e47ee1 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -64,7 +64,8 @@ pub enum Command { #[derive(StructOpt, Debug)] pub enum NodeOperation { - /// Print identifier (public key) of this Garage node + /// Print the full node ID (public key) of this Garage node, and its publicly reachable IP + /// address and port if they are specified in config file under `rpc_public_addr` #[structopt(name = "id", version = garage_version())] NodeId(NodeIdOpt), @@ -82,8 +83,9 @@ pub struct NodeIdOpt { #[derive(StructOpt, Debug)] pub struct ConnectNodeOpt { - /// Node public key and address, in the format: - /// `<public key hexadecimal>@<ip or hostname>:<port>` + /// Full node ID (public key) and IP address and port, in the format: + /// `<full node ID>@<ip or hostname>:<port>`. + /// You can retrieve this information on the target node using `garage node id`. pub(crate) node: String, } diff --git a/src/garage/main.rs b/src/garage/main.rs index 66403d05..1a6a6e32 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -7,6 +7,7 @@ extern crate tracing; mod admin; mod cli; mod repair; +mod secrets; mod server; #[cfg(feature = "telemetry-otlp")] mod tracing_setup; @@ -28,7 +29,6 @@ use structopt::StructOpt; use netapp::util::parse_and_resolve_peer_addr; use netapp::NetworkKey; -use garage_util::config::Config; use garage_util::error::*; use garage_rpc::system::*; @@ -38,6 +38,7 @@ use garage_model::helper::error::Error as HelperError; use admin::*; use cli::*; +use secrets::Secrets; #[derive(StructOpt, Debug)] #[structopt( @@ -45,8 +46,7 @@ use cli::*; about = "S3-compatible object store for self-hosted geo-distributed deployments" )] struct Opt { - /// Host to connect to for admin operations, in the format: - /// <public-key>@<ip>:<port> + /// Host to connect to for admin operations, in the format: <full-node-id>@<ip>:<port> #[structopt(short = "h", long = "rpc-host", env = "GARAGE_RPC_HOST")] pub rpc_host: Option<String>, @@ -66,24 +66,6 @@ struct Opt { cmd: Command, } -#[derive(StructOpt, Debug)] -pub struct Secrets { - /// RPC secret network key, used to replace rpc_secret in config.toml when running the - /// daemon or doing admin operations - #[structopt(short = "s", long = "rpc-secret", env = "GARAGE_RPC_SECRET")] - pub rpc_secret: Option<String>, - - /// Metrics API authentication token, replaces admin.metrics_token in config.toml when - /// running the Garage daemon - #[structopt(long = "admin-token", env = "GARAGE_ADMIN_TOKEN")] - pub admin_token: Option<String>, - - /// Metrics API authentication token, replaces admin.metrics_token in config.toml when - /// running the Garage daemon - #[structopt(long = "metrics-token", env = "GARAGE_METRICS_TOKEN")] - pub metrics_token: Option<String>, -} - #[tokio::main] async fn main() { // Initialize version and features info @@ -192,7 +174,9 @@ async fn main() { } async fn cli_command(opt: Opt) -> Result<(), Error> { - let config = if opt.secrets.rpc_secret.is_none() || opt.rpc_host.is_none() { + let config = if (opt.secrets.rpc_secret.is_none() && opt.secrets.rpc_secret_file.is_none()) + || opt.rpc_host.is_none() + { Some(garage_util::config::read_config(opt.config_file.clone()) .err_context(format!("Unable to read configuration file {}. Configuration file is needed because -h or -s is not provided on the command line.", opt.config_file.to_string_lossy()))?) } else { @@ -200,14 +184,19 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { }; // Find and parse network RPC secret - let net_key_hex_str = opt - .secrets - .rpc_secret - .as_ref() - .or_else(|| config.as_ref().and_then(|c| c.rpc_secret.as_ref())) - .ok_or("No RPC secret provided")?; + let mut rpc_secret = config.as_ref().and_then(|c| c.rpc_secret.clone()); + secrets::fill_secret( + &mut rpc_secret, + &config.as_ref().and_then(|c| c.rpc_secret_file.clone()), + &opt.secrets.rpc_secret, + &opt.secrets.rpc_secret_file, + "rpc_secret", + true, + )?; + + let net_key_hex_str = rpc_secret.ok_or("No RPC secret provided")?; let network_key = NetworkKey::from_slice( - &hex::decode(net_key_hex_str).err_context("Invalid RPC secret key (bad hex)")?[..], + &hex::decode(&net_key_hex_str).err_context("Invalid RPC secret key (bad hex)")?[..], ) .ok_or("Invalid RPC secret provided (wrong length)")?; @@ -218,7 +207,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { // Find and parse the address of the target host let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host { - let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <pubkey>@<IP or hostname>:<port>.", h))?; + let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <full node id>@<IP or hostname>:<port>.", h))?; (id, addrs[0], false) } else { let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir) @@ -248,7 +237,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { addr ); } - Err(e).err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?; + Err(e).err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct full-length node ID (public key).")?; } let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into()); @@ -261,16 +250,3 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { Ok(x) => Ok(x), } } - -fn fill_secrets(mut config: Config, secrets: Secrets) -> Config { - if secrets.rpc_secret.is_some() { - config.rpc_secret = secrets.rpc_secret; - } - if secrets.admin_token.is_some() { - config.admin.admin_token = secrets.admin_token; - } - if secrets.metrics_token.is_some() { - config.admin.metrics_token = secrets.metrics_token; - } - config -} diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs index f4edcf03..45024e71 100644 --- a/src/garage/repair/offline.rs +++ b/src/garage/repair/offline.rs @@ -6,7 +6,7 @@ use garage_util::error::*; use garage_model::garage::Garage; use crate::cli::structs::*; -use crate::{fill_secrets, Secrets}; +use crate::secrets::{fill_secrets, Secrets}; pub async fn offline_repair( config_file: PathBuf, @@ -20,7 +20,7 @@ pub async fn offline_repair( } info!("Loading configuration..."); - let config = fill_secrets(read_config(config_file)?, secrets); + let config = fill_secrets(read_config(config_file)?, secrets)?; info!("Initializing Garage main data store..."); let garage = Garage::new(config)?; diff --git a/src/garage/secrets.rs b/src/garage/secrets.rs new file mode 100644 index 00000000..c3d704aa --- /dev/null +++ b/src/garage/secrets.rs @@ -0,0 +1,320 @@ +use std::path::PathBuf; + +use structopt::StructOpt; + +use garage_util::config::Config; +use garage_util::error::Error; + +/// Structure for secret values or paths that are passed as CLI arguments or environment +/// variables, instead of in the config file. +#[derive(StructOpt, Debug, Default, Clone)] +pub struct Secrets { + /// Skip permission check on files containing secrets + #[cfg(unix)] + #[structopt( + long = "allow-world-readable-secrets", + env = "GARAGE_ALLOW_WORLD_READABLE_SECRETS" + )] + pub allow_world_readable_secrets: Option<bool>, + + /// RPC secret network key, used to replace rpc_secret in config.toml when running the + /// daemon or doing admin operations + #[structopt(short = "s", long = "rpc-secret", env = "GARAGE_RPC_SECRET")] + pub rpc_secret: Option<String>, + + /// RPC secret network key, used to replace rpc_secret in config.toml and rpc-secret + /// when running the daemon or doing admin operations + #[structopt(long = "rpc-secret-file", env = "GARAGE_RPC_SECRET_FILE")] + pub rpc_secret_file: Option<PathBuf>, + + /// Admin API authentication token, replaces admin.admin_token in config.toml when + /// running the Garage daemon + #[structopt(long = "admin-token", env = "GARAGE_ADMIN_TOKEN")] + pub admin_token: Option<String>, + + /// Admin API authentication token file path, replaces admin.admin_token in config.toml + /// and admin-token when running the Garage daemon + #[structopt(long = "admin-token-file", env = "GARAGE_ADMIN_TOKEN_FILE")] + pub admin_token_file: Option<PathBuf>, + + /// Metrics API authentication token, replaces admin.metrics_token in config.toml when + /// running the Garage daemon + #[structopt(long = "metrics-token", env = "GARAGE_METRICS_TOKEN")] + pub metrics_token: Option<String>, + + /// Metrics API authentication token file path, replaces admin.metrics_token in config.toml + /// and metrics-token when running the Garage daemon + #[structopt(long = "metrics-token-file", env = "GARAGE_METRICS_TOKEN_FILE")] + pub metrics_token_file: Option<PathBuf>, +} + +/// Single function to fill all secrets in the Config struct from their correct source (value +/// from config or CLI param or env variable or read from a file specified in config or CLI +/// param or env variable) +pub fn fill_secrets(mut config: Config, secrets: Secrets) -> Result<Config, Error> { + let allow_world_readable = secrets + .allow_world_readable_secrets + .unwrap_or(config.allow_world_readable_secrets); + + fill_secret( + &mut config.rpc_secret, + &config.rpc_secret_file, + &secrets.rpc_secret, + &secrets.rpc_secret_file, + "rpc_secret", + allow_world_readable, + )?; + + fill_secret( + &mut config.admin.admin_token, + &config.admin.admin_token_file, + &secrets.admin_token, + &secrets.admin_token_file, + "admin.admin_token", + allow_world_readable, + )?; + fill_secret( + &mut config.admin.metrics_token, + &config.admin.metrics_token_file, + &secrets.metrics_token, + &secrets.metrics_token_file, + "admin.metrics_token", + allow_world_readable, + )?; + + Ok(config) +} + +pub(crate) fn fill_secret( + config_secret: &mut Option<String>, + config_secret_file: &Option<PathBuf>, + cli_secret: &Option<String>, + cli_secret_file: &Option<PathBuf>, + name: &'static str, + allow_world_readable: bool, +) -> Result<(), Error> { + let cli_value = match (&cli_secret, &cli_secret_file) { + (Some(_), Some(_)) => { + return Err(format!("only one of `{}` and `{}_file` can be set", name, name).into()); + } + (Some(secret), None) => Some(secret.to_string()), + (None, Some(file)) => Some(read_secret_file(file, allow_world_readable)?), + (None, None) => None, + }; + + if let Some(val) = cli_value { + if config_secret.is_some() || config_secret_file.is_some() { + debug!("Overriding secret `{}` using value specified using CLI argument or environnement variable.", name); + } + + *config_secret = Some(val); + } else if let Some(file_path) = &config_secret_file { + if config_secret.is_some() { + return Err(format!("only one of `{}` and `{}_file` can be set", name, name).into()); + } + + *config_secret = Some(read_secret_file(file_path, allow_world_readable)?); + } + + Ok(()) +} + +fn read_secret_file(file_path: &PathBuf, allow_world_readable: bool) -> Result<String, Error> { + if !allow_world_readable { + #[cfg(unix)] + { + use std::os::unix::fs::MetadataExt; + let metadata = std::fs::metadata(file_path)?; + if metadata.mode() & 0o077 != 0 { + return Err(format!("File {} is world-readable! (mode: 0{:o}, expected 0600)\nRefusing to start until this is fixed, or environment variable GARAGE_ALLOW_WORLD_READABLE_SECRETS is set to true.", file_path.display(), metadata.mode()).into()); + } + } + } + + let secret_buf = std::fs::read_to_string(file_path)?; + + // trim_end: allows for use case such as `echo "$(openssl rand -hex 32)" > somefile`. + // also editors sometimes add a trailing newline + Ok(String::from(secret_buf.trim_end())) +} + +#[cfg(test)] +mod tests { + use std::fs::File; + use std::io::Write; + + use garage_util::config::read_config; + use garage_util::error::Error; + + use super::*; + + #[test] + fn test_rpc_secret_file_works() -> Result<(), Error> { + let path_secret = mktemp::Temp::new_file()?; + let mut file_secret = File::create(path_secret.as_path())?; + writeln!(file_secret, "foo")?; + drop(file_secret); + + let path_config = mktemp::Temp::new_file()?; + let mut file_config = File::create(path_config.as_path())?; + let path_secret_path = path_secret.as_path(); + writeln!( + file_config, + r#" + metadata_dir = "/tmp/garage/meta" + data_dir = "/tmp/garage/data" + replication_mode = "3" + rpc_bind_addr = "[::]:3901" + rpc_secret_file = "{}" + + [s3_api] + s3_region = "garage" + api_bind_addr = "[::]:3900" + "#, + path_secret_path.display() + )?; + drop(file_config); + + // Second configuration file, same as previous one + // except it allows world-readable secrets. + let path_config_allow_world_readable = mktemp::Temp::new_file()?; + let mut file_config_allow_world_readable = + File::create(path_config_allow_world_readable.as_path())?; + writeln!( + file_config_allow_world_readable, + r#" + metadata_dir = "/tmp/garage/meta" + data_dir = "/tmp/garage/data" + replication_mode = "3" + rpc_bind_addr = "[::]:3901" + rpc_secret_file = "{}" + allow_world_readable_secrets = true + + [s3_api] + s3_region = "garage" + api_bind_addr = "[::]:3900" + "#, + path_secret_path.display() + )?; + drop(file_config_allow_world_readable); + + let config = read_config(path_config.to_path_buf())?; + let config = fill_secrets(config, Secrets::default())?; + assert_eq!("foo", config.rpc_secret.unwrap()); + + // ---- Check non world-readable secrets config ---- + #[cfg(unix)] + { + let secrets_allow_world_readable = Secrets { + allow_world_readable_secrets: Some(true), + ..Default::default() + }; + let secrets_no_allow_world_readable = Secrets { + allow_world_readable_secrets: Some(false), + ..Default::default() + }; + + use std::os::unix::fs::PermissionsExt; + let metadata = std::fs::metadata(path_secret_path)?; + let mut perm = metadata.permissions(); + perm.set_mode(0o660); + std::fs::set_permissions(path_secret_path, perm)?; + + // Config file that just specifies the path + let config = read_config(path_config.to_path_buf())?; + assert!(fill_secrets(config, Secrets::default()).is_err()); + + let config = read_config(path_config.to_path_buf())?; + assert!(fill_secrets(config, secrets_allow_world_readable.clone()).is_ok()); + + let config = read_config(path_config.to_path_buf())?; + assert!(fill_secrets(config, secrets_no_allow_world_readable.clone()).is_err()); + + // Config file that also specifies to allow world_readable_secrets + let config = read_config(path_config_allow_world_readable.to_path_buf())?; + assert!(fill_secrets(config, Secrets::default()).is_ok()); + + let config = read_config(path_config_allow_world_readable.to_path_buf())?; + assert!(fill_secrets(config, secrets_allow_world_readable).is_ok()); + + let config = read_config(path_config_allow_world_readable.to_path_buf())?; + assert!(fill_secrets(config, secrets_no_allow_world_readable).is_err()); + } + + // ---- Check alternative secrets specified on CLI ---- + + let path_secret2 = mktemp::Temp::new_file()?; + let mut file_secret2 = File::create(path_secret2.as_path())?; + writeln!(file_secret2, "bar")?; + drop(file_secret2); + + let config = read_config(path_config.to_path_buf())?; + let config = fill_secrets( + config, + Secrets { + rpc_secret: Some("baz".into()), + ..Default::default() + }, + )?; + assert_eq!(config.rpc_secret.as_deref(), Some("baz")); + + let config = read_config(path_config.to_path_buf())?; + let config = fill_secrets( + config, + Secrets { + rpc_secret_file: Some(path_secret2.clone()), + ..Default::default() + }, + )?; + assert_eq!(config.rpc_secret.as_deref(), Some("bar")); + + let config = read_config(path_config.to_path_buf())?; + assert!(fill_secrets( + config, + Secrets { + rpc_secret: Some("baz".into()), + rpc_secret_file: Some(path_secret2.clone()), + ..Default::default() + } + ) + .is_err()); + + drop(path_secret); + drop(path_secret2); + drop(path_config); + drop(path_config_allow_world_readable); + + Ok(()) + } + + #[test] + fn test_rcp_secret_and_rpc_secret_file_cannot_be_set_both() -> Result<(), Error> { + let path_config = mktemp::Temp::new_file()?; + let mut file_config = File::create(path_config.as_path())?; + writeln!( + file_config, + r#" + metadata_dir = "/tmp/garage/meta" + data_dir = "/tmp/garage/data" + replication_mode = "3" + rpc_bind_addr = "[::]:3901" + rpc_secret= "dummy" + rpc_secret_file = "dummy" + + [s3_api] + s3_region = "garage" + api_bind_addr = "[::]:3900" + "# + )?; + let config = read_config(path_config.to_path_buf())?; + assert_eq!( + "only one of `rpc_secret` and `rpc_secret_file` can be set", + fill_secrets(config, Secrets::default()) + .unwrap_err() + .to_string() + ); + drop(path_config); + drop(file_config); + Ok(()) + } +} diff --git a/src/garage/server.rs b/src/garage/server.rs index 3ad10b72..51b06b8e 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -15,9 +15,9 @@ use garage_web::WebServer; use garage_api::k2v::api_server::K2VApiServer; use crate::admin::*; +use crate::secrets::{fill_secrets, Secrets}; #[cfg(feature = "telemetry-otlp")] use crate::tracing_setup::*; -use crate::{fill_secrets, Secrets}; async fn wait_from(mut chan: watch::Receiver<bool>) { while !*chan.borrow() { @@ -29,12 +29,19 @@ async fn wait_from(mut chan: watch::Receiver<bool>) { pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Error> { info!("Loading configuration..."); - let config = fill_secrets(read_config(config_file)?, secrets); + let config = fill_secrets(read_config(config_file)?, secrets)?; // ---- Initialize Garage internals ---- #[cfg(feature = "metrics")] - let metrics_exporter = opentelemetry_prometheus::exporter().init(); + let metrics_exporter = opentelemetry_prometheus::exporter() + .with_default_summary_quantiles(vec![0.25, 0.5, 0.75, 0.9, 0.95, 0.99]) + .with_default_histogram_boundaries(vec![ + 0.001, 0.0015, 0.002, 0.003, 0.005, 0.007, 0.01, 0.015, 0.02, 0.03, 0.05, 0.07, 0.1, + 0.15, 0.2, 0.3, 0.5, 0.7, 1., 1.5, 2., 3., 5., 7., 10., 15., 20., 30., 40., 50., 60., + 70., 100., + ]) + .init(); info!("Initializing Garage main data store..."); let garage = Garage::new(config.clone())?; @@ -81,7 +88,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er garage.clone(), s3_bind_addr.clone(), config.s3_api.s3_region.clone(), - wait_from(watch_cancel.clone()), + watch_cancel.clone(), )), )); } @@ -96,7 +103,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er garage.clone(), config.k2v_api.as_ref().unwrap().api_bind_addr.clone(), config.s3_api.s3_region.clone(), - wait_from(watch_cancel.clone()), + watch_cancel.clone(), )), )); } @@ -106,14 +113,10 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er if let Some(web_config) = &config.s3_web { info!("Initializing web server..."); + let web_server = WebServer::new(garage.clone(), web_config.root_domain.clone()); servers.push(( "Web", - tokio::spawn(WebServer::run( - garage.clone(), - web_config.bind_addr.clone(), - web_config.root_domain.clone(), - wait_from(watch_cancel.clone()), - )), + tokio::spawn(web_server.run(web_config.bind_addr.clone(), watch_cancel.clone())), )); } @@ -121,9 +124,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er info!("Launching Admin API server..."); servers.push(( "Admin", - tokio::spawn( - admin_server.run(admin_bind_addr.clone(), wait_from(watch_cancel.clone())), - ), + tokio::spawn(admin_server.run(admin_bind_addr.clone(), watch_cancel.clone())), )); } diff --git a/src/garage/tests/common/client.rs b/src/garage/tests/common/client.rs index ef4daa5d..ffa4cae8 100644 --- a/src/garage/tests/common/client.rs +++ b/src/garage/tests/common/client.rs @@ -1,3 +1,4 @@ +use aws_sdk_s3::config::BehaviorVersion; use aws_sdk_s3::config::Credentials; use aws_sdk_s3::{Client, Config}; @@ -11,6 +12,7 @@ pub fn build_client(key: &Key) -> Client { .endpoint_url(format!("http://127.0.0.1:{}", DEFAULT_PORT)) .region(super::REGION) .credentials_provider(credentials) + .behavior_version(BehaviorVersion::v2023_11_09()) .build(); Client::from_conf(config) diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs index 4133bb8b..e5f4cca1 100644 --- a/src/garage/tests/common/custom_requester.rs +++ b/src/garage/tests/common/custom_requester.rs @@ -5,12 +5,17 @@ use std::convert::TryFrom; use chrono::{offset::Utc, DateTime}; use hmac::{Hmac, Mac}; -use hyper::client::HttpConnector; -use hyper::{Body, Client, Method, Request, Response, Uri}; +use http_body_util::BodyExt; +use http_body_util::Full as FullBody; +use hyper::{Method, Request, Response, Uri}; +use hyper_util::client::legacy::{connect::HttpConnector, Client}; +use hyper_util::rt::TokioExecutor; use super::garage::{Instance, Key}; use garage_api::signature; +pub type Body = FullBody<hyper::body::Bytes>; + /// You should ever only use this to send requests AWS sdk won't send, /// like to reproduce behavior of unusual implementations found to be /// problematic. @@ -19,7 +24,7 @@ pub struct CustomRequester { key: Key, uri: Uri, service: &'static str, - client: Client<HttpConnector>, + client: Client<HttpConnector, Body>, } impl CustomRequester { @@ -28,7 +33,7 @@ impl CustomRequester { key: key.clone(), uri: instance.s3_uri(), service: "s3", - client: Client::new(), + client: Client::builder(TokioExecutor::new()).build_http(), } } @@ -37,7 +42,7 @@ impl CustomRequester { key: key.clone(), uri: instance.k2v_uri(), service: "k2v", - client: Client::new(), + client: Client::builder(TokioExecutor::new()).build_http(), } } @@ -139,7 +144,7 @@ impl<'a> RequestBuilder<'a> { self } - pub async fn send(&mut self) -> hyper::Result<Response<Body>> { + pub async fn send(&mut self) -> Result<Response<Body>, String> { // TODO this is a bit incorrect in that path and query params should be url-encoded and // aren't, but this is good enought for now. @@ -200,8 +205,8 @@ impl<'a> RequestBuilder<'a> { all_headers.insert("x-amz-content-sha256".to_owned(), body_sha.clone()); let mut signed_headers = all_headers - .iter() - .map(|(k, _)| k.as_ref()) + .keys() + .map(|k| k.as_ref()) .collect::<Vec<&str>>(); signed_headers.sort(); let signed_headers = signed_headers.join(";"); @@ -242,7 +247,22 @@ impl<'a> RequestBuilder<'a> { .method(self.method.clone()) .body(Body::from(body)) .unwrap(); - self.requester.client.request(request).await + + let result = self + .requester + .client + .request(request) + .await + .map_err(|err| format!("hyper client error: {}", err))?; + + let (head, body) = result.into_parts(); + let body = Body::new( + body.collect() + .await + .map_err(|err| format!("hyper client error in body.collect: {}", err))? + .to_bytes(), + ); + Ok(Response::from_parts(head, body)) } } diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs index 71de91bf..39554d4d 100644 --- a/src/garage/tests/k2v/batch.rs +++ b/src/garage/tests/k2v/batch.rs @@ -7,6 +7,7 @@ use base64::prelude::*; use serde_json::json; use crate::json_body; +use http_body_util::BodyExt; use hyper::{Method, StatusCode}; #[tokio::test] @@ -77,10 +78,7 @@ async fn test_batch() { .unwrap() .to_string(), ); - let res_body = hyper::body::to_bytes(res.into_body()) - .await - .unwrap() - .to_vec(); + let res_body = res.into_body().collect().await.unwrap().to_bytes(); assert_eq!(res_body, values.get(sk).unwrap().as_bytes()); } diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs index 20add889..5a347bd9 100644 --- a/src/garage/tests/k2v/item.rs +++ b/src/garage/tests/k2v/item.rs @@ -7,6 +7,7 @@ use base64::prelude::*; use serde_json::json; use crate::json_body; +use http_body_util::BodyExt; use hyper::{Method, StatusCode}; #[tokio::test] @@ -83,10 +84,7 @@ async fn test_items_and_indices() { .to_str() .unwrap() .to_string(); - let res_body = hyper::body::to_bytes(res.into_body()) - .await - .unwrap() - .to_vec(); + let res_body = res.into_body().collect().await.unwrap().to_bytes(); assert_eq!(res_body, content); // ReadIndex -- now there should be some stuff @@ -152,10 +150,7 @@ async fn test_items_and_indices() { res.headers().get("content-type").unwrap().to_str().unwrap(), "application/octet-stream" ); - let res_body = hyper::body::to_bytes(res.into_body()) - .await - .unwrap() - .to_vec(); + let res_body = res.into_body().collect().await.unwrap().to_bytes(); assert_eq!(res_body, content2); // ReadIndex -- now there should be some stuff @@ -394,10 +389,7 @@ async fn test_item_return_format() { .to_str() .unwrap() .to_string(); - let res_body = hyper::body::to_bytes(res.into_body()) - .await - .unwrap() - .to_vec(); + let res_body = res.into_body().collect().await.unwrap().to_bytes(); assert_eq!(res_body, single_value); // f1: not specified @@ -434,10 +426,7 @@ async fn test_item_return_format() { res.headers().get("content-type").unwrap().to_str().unwrap(), "application/octet-stream" ); - let res_body = hyper::body::to_bytes(res.into_body()) - .await - .unwrap() - .to_vec(); + let res_body = res.into_body().collect().await.unwrap().to_bytes(); assert_eq!(res_body, single_value); // f3: json diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs index 452317c2..277f8bc8 100644 --- a/src/garage/tests/k2v/poll.rs +++ b/src/garage/tests/k2v/poll.rs @@ -1,4 +1,5 @@ use base64::prelude::*; +use http_body_util::BodyExt; use hyper::{Method, StatusCode}; use std::time::Duration; @@ -47,11 +48,8 @@ async fn test_poll_item() { .unwrap() .to_string(); - let res2_body = hyper::body::to_bytes(res2.into_body()) - .await - .unwrap() - .to_vec(); - assert_eq!(res2_body, b"Initial value"); + let res2_body = res2.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(res2_body, b"Initial value"[..]); // Start poll operation let poll = { @@ -95,11 +93,8 @@ async fn test_poll_item() { assert_eq!(poll_res.status(), StatusCode::OK); - let poll_res_body = hyper::body::to_bytes(poll_res.into_body()) - .await - .unwrap() - .to_vec(); - assert_eq!(poll_res_body, b"New value"); + let poll_res_body = poll_res.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(poll_res_body, b"New value"[..]); } #[tokio::test] diff --git a/src/garage/tests/k2v/simple.rs b/src/garage/tests/k2v/simple.rs index 465fc24d..1017330d 100644 --- a/src/garage/tests/k2v/simple.rs +++ b/src/garage/tests/k2v/simple.rs @@ -1,5 +1,6 @@ use crate::common; +use http_body_util::BodyExt; use hyper::{Method, StatusCode}; #[tokio::test] @@ -32,9 +33,6 @@ async fn test_simple() { .unwrap(); assert_eq!(res2.status(), StatusCode::OK); - let res2_body = hyper::body::to_bytes(res2.into_body()) - .await - .unwrap() - .to_vec(); - assert_eq!(res2_body, b"Hello, world!"); + let res2_body = res2.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(res2_body, b"Hello, world!"[..]); } diff --git a/src/garage/tests/lib.rs b/src/garage/tests/lib.rs index ab92bc0a..ef370db3 100644 --- a/src/garage/tests/lib.rs +++ b/src/garage/tests/lib.rs @@ -11,15 +11,15 @@ mod k2v; #[cfg(feature = "k2v")] mod k2v_client; -use hyper::{Body, Response}; +use http_body_util::BodyExt; +use hyper::{body::Body, Response}; -pub async fn json_body(res: Response<Body>) -> serde_json::Value { - let res_body: serde_json::Value = serde_json::from_slice( - &hyper::body::to_bytes(res.into_body()) - .await - .unwrap() - .to_vec()[..], - ) - .unwrap(); +pub async fn json_body<B>(res: Response<B>) -> serde_json::Value +where + B: Body, + <B as Body>::Error: std::fmt::Debug, +{ + let body = res.into_body().collect().await.unwrap().to_bytes(); + let res_body: serde_json::Value = serde_json::from_slice(&body).unwrap(); res_body } diff --git a/src/garage/tests/s3/list.rs b/src/garage/tests/s3/list.rs index bb03f250..1b0c006d 100644 --- a/src/garage/tests/s3/list.rs +++ b/src/garage/tests/s3/list.rs @@ -613,3 +613,63 @@ async fn test_listmultipart() { assert!(r.common_prefixes.is_none()); } } + +#[tokio::test] +async fn test_multichar_delimiter() { + // Test case from dpape from issue #692 with reference results from Amazon + + let ctx = common::context(); + let bucket = ctx.create_bucket("multichardelim"); + + for k in [ + "a/", "a/b/", "a/b/c/", "a/b/c/d", "a/c/", "a/c/b/", "a/c/b/e", + ] { + ctx.client + .put_object() + .bucket(&bucket) + .key(k) + .send() + .await + .unwrap(); + } + + // With delimiter / + { + let r = ctx + .client + .list_objects_v2() + .bucket(&bucket) + .delimiter("/") + .send() + .await + .unwrap(); + + assert!(r.contents.is_none()); + + let common_prefixes = r.common_prefixes.unwrap(); + assert_eq!(common_prefixes.len(), 1); + assert_eq!(common_prefixes[0].prefix.as_deref().unwrap(), "a/"); + } + + // With delimiter b/ + { + let r = ctx + .client + .list_objects_v2() + .bucket(&bucket) + .delimiter("b/") + .send() + .await + .unwrap(); + + let contents = r.contents.unwrap(); + assert_eq!(contents.len(), 2); + assert_eq!(contents[0].key.as_deref().unwrap(), "a/"); + assert_eq!(contents[1].key.as_deref().unwrap(), "a/c/"); + + let common_prefixes = r.common_prefixes.unwrap(); + assert_eq!(common_prefixes.len(), 2); + assert_eq!(common_prefixes[0].prefix.as_deref().unwrap(), "a/b/"); + assert_eq!(common_prefixes[1].prefix.as_deref().unwrap(), "a/c/b/"); + } +} diff --git a/src/garage/tests/s3/multipart.rs b/src/garage/tests/s3/multipart.rs index 09ae5e5b..51c9df74 100644 --- a/src/garage/tests/s3/multipart.rs +++ b/src/garage/tests/s3/multipart.rs @@ -154,7 +154,7 @@ async fn test_multipart_upload() { .await .unwrap(); - assert_eq!(r.content_length, (SZ_5MB * 3) as i64); + assert_eq!(r.content_length.unwrap(), (SZ_5MB * 3) as i64); } { @@ -183,7 +183,7 @@ async fn test_multipart_upload() { .unwrap(); eprintln!("get_object with part_number = {}", part_number); - assert_eq!(o.content_length, SZ_5MB as i64); + assert_eq!(o.content_length.unwrap(), SZ_5MB as i64); assert_bytes_eq!(o.body, data); } } @@ -249,14 +249,14 @@ async fn test_uploadlistpart() { let ps = r.parts.unwrap(); assert_eq!(ps.len(), 1); - assert_eq!(ps[0].part_number, 2); + assert_eq!(ps[0].part_number.unwrap(), 2); let fp = &ps[0]; assert!(fp.last_modified.is_some()); assert_eq!( fp.e_tag.as_ref().unwrap(), "\"3366bb9dcf710d6801b5926467d02e19\"" ); - assert_eq!(fp.size, SZ_5MB as i64); + assert_eq!(fp.size.unwrap(), SZ_5MB as i64); } let p2 = ctx @@ -286,23 +286,23 @@ async fn test_uploadlistpart() { let ps = r.parts.unwrap(); assert_eq!(ps.len(), 2); - assert_eq!(ps[0].part_number, 1); + assert_eq!(ps[0].part_number.unwrap(), 1); let fp = &ps[0]; assert!(fp.last_modified.is_some()); assert_eq!( fp.e_tag.as_ref().unwrap(), "\"3c484266f9315485694556e6c693bfa2\"" ); - assert_eq!(fp.size, SZ_5MB as i64); + assert_eq!(fp.size.unwrap(), SZ_5MB as i64); - assert_eq!(ps[1].part_number, 2); + assert_eq!(ps[1].part_number.unwrap(), 2); let sp = &ps[1]; assert!(sp.last_modified.is_some()); assert_eq!( sp.e_tag.as_ref().unwrap(), "\"3366bb9dcf710d6801b5926467d02e19\"" ); - assert_eq!(sp.size, SZ_5MB as i64); + assert_eq!(sp.size.unwrap(), SZ_5MB as i64); } { @@ -320,14 +320,14 @@ async fn test_uploadlistpart() { assert!(r.part_number_marker.is_none()); assert_eq!(r.next_part_number_marker.as_deref(), Some("1")); - assert_eq!(r.max_parts, 1_i32); - assert!(r.is_truncated); + assert_eq!(r.max_parts.unwrap(), 1_i32); + assert!(r.is_truncated.unwrap()); assert_eq!(r.key.unwrap(), "a"); assert_eq!(r.upload_id.unwrap().as_str(), uid.as_str()); let parts = r.parts.unwrap(); assert_eq!(parts.len(), 1); let fp = &parts[0]; - assert_eq!(fp.part_number, 1); + assert_eq!(fp.part_number.unwrap(), 1); assert_eq!( fp.e_tag.as_ref().unwrap(), "\"3c484266f9315485694556e6c693bfa2\"" @@ -349,19 +349,19 @@ async fn test_uploadlistpart() { r2.part_number_marker.as_ref().unwrap(), r.next_part_number_marker.as_ref().unwrap() ); - assert_eq!(r2.max_parts, 1_i32); + assert_eq!(r2.max_parts.unwrap(), 1_i32); assert_eq!(r2.key.unwrap(), "a"); assert_eq!(r2.upload_id.unwrap().as_str(), uid.as_str()); let parts = r2.parts.unwrap(); assert_eq!(parts.len(), 1); let fp = &parts[0]; - assert_eq!(fp.part_number, 2); + assert_eq!(fp.part_number.unwrap(), 2); assert_eq!( fp.e_tag.as_ref().unwrap(), "\"3366bb9dcf710d6801b5926467d02e19\"" ); //assert!(r2.is_truncated); // WHY? (this was the test before) - assert!(!r2.is_truncated); + assert!(!r2.is_truncated.unwrap()); } let cmp = CompletedMultipartUpload::builder() @@ -411,7 +411,7 @@ async fn test_uploadlistpart() { .await .unwrap(); - assert_eq!(r.content_length, (SZ_5MB * 2) as i64); + assert_eq!(r.content_length.unwrap(), (SZ_5MB * 2) as i64); } } diff --git a/src/garage/tests/s3/objects.rs b/src/garage/tests/s3/objects.rs index 27697d45..ad5f63f1 100644 --- a/src/garage/tests/s3/objects.rs +++ b/src/garage/tests/s3/objects.rs @@ -50,9 +50,9 @@ async fn test_putobject() { // assert_eq!(o.version_id.unwrap(), _version); assert_eq!(o.content_type.unwrap(), content_type); assert!(o.last_modified.is_some()); - assert_eq!(o.content_length, 0); - assert_eq!(o.parts_count, 0); - assert_eq!(o.tag_count, 0); + assert_eq!(o.content_length.unwrap(), 0); + assert_eq!(o.parts_count, None); + assert_eq!(o.tag_count, None); } { @@ -86,9 +86,9 @@ async fn test_putobject() { assert_bytes_eq!(o.body, b"hi"); assert_eq!(o.e_tag.unwrap(), etag); assert!(o.last_modified.is_some()); - assert_eq!(o.content_length, 2); - assert_eq!(o.parts_count, 0); - assert_eq!(o.tag_count, 0); + assert_eq!(o.content_length.unwrap(), 2); + assert_eq!(o.parts_count, None); + assert_eq!(o.tag_count, None); } { @@ -119,9 +119,9 @@ async fn test_putobject() { assert_bytes_eq!(o.body, b""); assert_eq!(o.e_tag.unwrap(), etag); assert!(o.last_modified.is_some()); - assert_eq!(o.content_length, 0); - assert_eq!(o.parts_count, 0); - assert_eq!(o.tag_count, 0); + assert_eq!(o.content_length.unwrap(), 0); + assert_eq!(o.parts_count, None); + assert_eq!(o.tag_count, None); } } @@ -185,6 +185,30 @@ async fn test_getobject() { assert_eq!(o.content_range.unwrap().as_str(), "bytes 57-61/62"); assert_bytes_eq!(o.body, &BODY[57..]); } + { + let exp = aws_sdk_s3::primitives::DateTime::from_secs(10000000000); + let o = ctx + .client + .get_object() + .bucket(&bucket) + .key(STD_KEY) + .response_content_type("application/x-dummy-test") + .response_cache_control("ccdummy") + .response_content_disposition("cddummy") + .response_content_encoding("cedummy") + .response_content_language("cldummy") + .response_expires(exp) + .send() + .await + .unwrap(); + assert_eq!(o.content_type.unwrap().as_str(), "application/x-dummy-test"); + assert_eq!(o.cache_control.unwrap().as_str(), "ccdummy"); + assert_eq!(o.content_disposition.unwrap().as_str(), "cddummy"); + assert_eq!(o.content_encoding.unwrap().as_str(), "cedummy"); + assert_eq!(o.content_language.unwrap().as_str(), "cldummy"); + assert_eq!(o.expires.unwrap(), exp); + assert_bytes_eq!(o.body, &BODY[..]); + } } #[tokio::test] @@ -205,7 +229,7 @@ async fn test_deleteobject() { .await .unwrap(); if i > 0 { - to_del = to_del.objects(ObjectIdentifier::builder().key(k).build()); + to_del = to_del.objects(ObjectIdentifier::builder().key(k).build().unwrap()); } } @@ -223,7 +247,7 @@ async fn test_deleteobject() { .unwrap(); if i > 0 { - to_del = to_del.objects(ObjectIdentifier::builder().key(k).build()); + to_del = to_del.objects(ObjectIdentifier::builder().key(k).build().unwrap()); } } @@ -247,7 +271,7 @@ async fn test_deleteobject() { .client .delete_objects() .bucket(&bucket) - .delete(to_del.build()) + .delete(to_del.build().unwrap()) .send() .await .unwrap(); diff --git a/src/garage/tests/s3/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs index b7a1acae..224b9ed5 100644 --- a/src/garage/tests/s3/streaming_signature.rs +++ b/src/garage/tests/s3/streaming_signature.rs @@ -57,9 +57,9 @@ async fn test_putobject_streaming() { // assert_eq!(o.version_id.unwrap(), _version); assert_eq!(o.content_type.unwrap(), content_type); assert!(o.last_modified.is_some()); - assert_eq!(o.content_length, 0); - assert_eq!(o.parts_count, 0); - assert_eq!(o.tag_count, 0); + assert_eq!(o.content_length.unwrap(), 0); + assert_eq!(o.parts_count, None); + assert_eq!(o.tag_count, None); } { @@ -95,9 +95,9 @@ async fn test_putobject_streaming() { assert_bytes_eq!(o.body, BODY); assert_eq!(o.e_tag.unwrap(), etag); assert!(o.last_modified.is_some()); - assert_eq!(o.content_length, 62); - assert_eq!(o.parts_count, 0); - assert_eq!(o.tag_count, 0); + assert_eq!(o.content_length.unwrap(), 62); + assert_eq!(o.parts_count, None); + assert_eq!(o.tag_count, None); } } @@ -187,7 +187,7 @@ async fn test_put_website_streaming() { .await .unwrap(); - assert_eq!(o.index_document.unwrap().suffix.unwrap(), "home.html"); - assert_eq!(o.error_document.unwrap().key.unwrap(), "err/error.html"); + assert_eq!(o.index_document.unwrap().suffix, "home.html"); + assert_eq!(o.error_document.unwrap().key, "err/error.html"); } } diff --git a/src/garage/tests/s3/website.rs b/src/garage/tests/s3/website.rs index eeafb5fa..0cadc388 100644 --- a/src/garage/tests/s3/website.rs +++ b/src/garage/tests/s3/website.rs @@ -8,15 +8,18 @@ use aws_sdk_s3::{ types::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration}, }; use http::{Request, StatusCode}; -use hyper::{ - body::{to_bytes, Body}, - Client, -}; +use http_body_util::BodyExt; +use http_body_util::Full as FullBody; +use hyper::body::Bytes; +use hyper_util::client::legacy::Client; +use hyper_util::rt::TokioExecutor; use serde_json::json; const BODY: &[u8; 16] = b"<h1>bonjour</h1>"; const BODY_ERR: &[u8; 6] = b"erreur"; +pub type Body = FullBody<Bytes>; + #[tokio::test] async fn test_website() { const BCKT_NAME: &str = "my-website"; @@ -34,14 +37,14 @@ async fn test_website() { .await .unwrap(); - let client = Client::new(); + let client = Client::builder(TokioExecutor::new()).build_http(); let req = || { Request::builder() .method("GET") .uri(format!("http://127.0.0.1:{}/", ctx.garage.web_port)) .header("Host", format!("{}.web.garage", BCKT_NAME)) - .body(Body::empty()) + .body(Body::new(Bytes::new())) .unwrap() }; @@ -49,7 +52,7 @@ async fn test_website() { assert_eq!(resp.status(), StatusCode::NOT_FOUND); assert_ne!( - to_bytes(resp.body_mut()).await.unwrap().as_ref(), + BodyExt::collect(resp.into_body()).await.unwrap().to_bytes(), BODY.as_ref() ); /* check that we do not leak body */ @@ -58,10 +61,9 @@ async fn test_website() { .method("GET") .uri(format!( "http://127.0.0.1:{0}/check?domain={1}", - ctx.garage.admin_port, - BCKT_NAME.to_string() + ctx.garage.admin_port, BCKT_NAME )) - .body(Body::empty()) + .body(Body::new(Bytes::new())) .unwrap() }; @@ -87,7 +89,7 @@ async fn test_website() { resp = client.request(req()).await.unwrap(); assert_eq!(resp.status(), StatusCode::OK); assert_eq!( - to_bytes(resp.body_mut()).await.unwrap().as_ref(), + resp.into_body().collect().await.unwrap().to_bytes(), BODY.as_ref() ); @@ -103,14 +105,14 @@ async fn test_website() { "http://127.0.0.1:{0}/check?domain={1}", ctx.garage.admin_port, bname )) - .body(Body::empty()) + .body(Body::new(Bytes::new())) .unwrap() }; - let mut admin_resp = client.request(admin_req()).await.unwrap(); + let admin_resp = client.request(admin_req()).await.unwrap(); assert_eq!(admin_resp.status(), StatusCode::OK); assert_eq!( - to_bytes(admin_resp.body_mut()).await.unwrap().as_ref(), + admin_resp.into_body().collect().await.unwrap().to_bytes(), format!("Domain '{bname}' is managed by Garage").as_bytes() ); } @@ -124,7 +126,7 @@ async fn test_website() { resp = client.request(req()).await.unwrap(); assert_eq!(resp.status(), StatusCode::NOT_FOUND); assert_ne!( - to_bytes(resp.body_mut()).await.unwrap().as_ref(), + resp.into_body().collect().await.unwrap().to_bytes(), BODY.as_ref() ); /* check that we do not leak body */ @@ -133,10 +135,9 @@ async fn test_website() { .method("GET") .uri(format!( "http://127.0.0.1:{0}/check?domain={1}", - ctx.garage.admin_port, - BCKT_NAME.to_string() + ctx.garage.admin_port, BCKT_NAME )) - .body(Body::empty()) + .body(Body::new(Bytes::new())) .unwrap() }; @@ -181,8 +182,18 @@ async fn test_website_s3_api() { .unwrap(); let conf = WebsiteConfiguration::builder() - .index_document(IndexDocument::builder().suffix("home.html").build()) - .error_document(ErrorDocument::builder().key("err/error.html").build()) + .index_document( + IndexDocument::builder() + .suffix("home.html") + .build() + .unwrap(), + ) + .error_document( + ErrorDocument::builder() + .key("err/error.html") + .build() + .unwrap(), + ) .build(); ctx.client @@ -201,9 +212,11 @@ async fn test_website_s3_api() { .allowed_methods("GET") .allowed_methods("PUT") .allowed_origins("*") - .build(), + .build() + .unwrap(), ) - .build(); + .build() + .unwrap(); ctx.client .put_bucket_cors() @@ -222,24 +235,21 @@ async fn test_website_s3_api() { .await .unwrap(); - let main_rule = cors_res.cors_rules().unwrap().iter().next().unwrap(); + let main_rule = cors_res.cors_rules().iter().next().unwrap(); assert_eq!(main_rule.id.as_ref().unwrap(), "main-rule"); assert_eq!( main_rule.allowed_headers.as_ref().unwrap(), &vec!["*".to_string()] ); + assert_eq!(&main_rule.allowed_origins, &vec!["*".to_string()]); assert_eq!( - main_rule.allowed_origins.as_ref().unwrap(), - &vec!["*".to_string()] - ); - assert_eq!( - main_rule.allowed_methods.as_ref().unwrap(), + &main_rule.allowed_methods, &vec!["GET".to_string(), "PUT".to_string()] ); } - let client = Client::new(); + let client = Client::builder(TokioExecutor::new()).build_http(); // Test direct requests with CORS { @@ -248,10 +258,10 @@ async fn test_website_s3_api() { .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port)) .header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Origin", "https://example.com") - .body(Body::empty()) + .body(Body::new(Bytes::new())) .unwrap(); - let mut resp = client.request(req).await.unwrap(); + let resp = client.request(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::OK); assert_eq!( @@ -259,7 +269,7 @@ async fn test_website_s3_api() { "*" ); assert_eq!( - to_bytes(resp.body_mut()).await.unwrap().as_ref(), + resp.into_body().collect().await.unwrap().to_bytes(), BODY.as_ref() ); } @@ -273,14 +283,14 @@ async fn test_website_s3_api() { ctx.garage.web_port )) .header("Host", format!("{}.web.garage", BCKT_NAME)) - .body(Body::empty()) + .body(Body::new(Bytes::new())) .unwrap(); - let mut resp = client.request(req).await.unwrap(); + let resp = client.request(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::NOT_FOUND); assert_eq!( - to_bytes(resp.body_mut()).await.unwrap().as_ref(), + resp.into_body().collect().await.unwrap().to_bytes(), BODY_ERR.as_ref() ); } @@ -293,10 +303,10 @@ async fn test_website_s3_api() { .header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Origin", "https://example.com") .header("Access-Control-Request-Method", "PUT") - .body(Body::empty()) + .body(Body::new(Bytes::new())) .unwrap(); - let mut resp = client.request(req).await.unwrap(); + let resp = client.request(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::OK); assert_eq!( @@ -304,7 +314,7 @@ async fn test_website_s3_api() { "*" ); assert_ne!( - to_bytes(resp.body_mut()).await.unwrap().as_ref(), + resp.into_body().collect().await.unwrap().to_bytes(), BODY.as_ref() ); } @@ -317,14 +327,14 @@ async fn test_website_s3_api() { .header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Origin", "https://example.com") .header("Access-Control-Request-Method", "DELETE") - .body(Body::empty()) + .body(Body::new(Bytes::new())) .unwrap(); - let mut resp = client.request(req).await.unwrap(); + let resp = client.request(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::FORBIDDEN); assert_ne!( - to_bytes(resp.body_mut()).await.unwrap().as_ref(), + resp.into_body().collect().await.unwrap().to_bytes(), BODY.as_ref() ); } @@ -358,14 +368,14 @@ async fn test_website_s3_api() { .header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Origin", "https://example.com") .header("Access-Control-Request-Method", "PUT") - .body(Body::empty()) + .body(Body::new(Bytes::new())) .unwrap(); - let mut resp = client.request(req).await.unwrap(); + let resp = client.request(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::FORBIDDEN); assert_ne!( - to_bytes(resp.body_mut()).await.unwrap().as_ref(), + resp.into_body().collect().await.unwrap().to_bytes(), BODY.as_ref() ); } @@ -384,20 +394,15 @@ async fn test_website_s3_api() { .method("GET") .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port)) .header("Host", format!("{}.web.garage", BCKT_NAME)) - .body(Body::empty()) + .body(Body::new(Bytes::new())) .unwrap(); - let mut resp = client.request(req).await.unwrap(); + let resp = client.request(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::NOT_FOUND); - assert_ne!( - to_bytes(resp.body_mut()).await.unwrap().as_ref(), - BODY_ERR.as_ref() - ); - assert_ne!( - to_bytes(resp.body_mut()).await.unwrap().as_ref(), - BODY.as_ref() - ); + let resp_bytes = resp.into_body().collect().await.unwrap().to_bytes(); + assert_ne!(resp_bytes, BODY_ERR.as_ref()); + assert_ne!(resp_bytes, BODY.as_ref()); } } @@ -405,13 +410,13 @@ async fn test_website_s3_api() { async fn test_website_check_domain() { let ctx = common::context(); - let client = Client::new(); + let client = Client::builder(TokioExecutor::new()).build_http(); let admin_req = || { Request::builder() .method("GET") .uri(format!("http://127.0.0.1:{}/check", ctx.garage.admin_port)) - .body(Body::empty()) + .body(Body::new(Bytes::new())) .unwrap() }; @@ -435,7 +440,7 @@ async fn test_website_check_domain() { "http://127.0.0.1:{}/check?domain=", ctx.garage.admin_port )) - .body(Body::empty()) + .body(Body::new(Bytes::new())) .unwrap() }; @@ -459,7 +464,7 @@ async fn test_website_check_domain() { "http://127.0.0.1:{}/check?domain=foobar", ctx.garage.admin_port )) - .body(Body::empty()) + .body(Body::new(Bytes::new())) .unwrap() }; @@ -483,7 +488,7 @@ async fn test_website_check_domain() { "http://127.0.0.1:{}/check?domain=%E2%98%B9", ctx.garage.admin_port )) - .body(Body::empty()) + .body(Body::new(Bytes::new())) .unwrap() }; diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml index 2ccb9fe5..694be1f8 100644 --- a/src/k2v-client/Cargo.toml +++ b/src/k2v-client/Cargo.toml @@ -9,25 +9,28 @@ repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" readme = "../../README.md" [dependencies] -base64 = "0.21" -sha2 = "0.10" -hex = "0.4" -http = "0.2" -log = "0.4" -aws-sigv4 = "0.55" -percent-encoding = "2.2" -hyper = { version = "0.14", default-features = false, features = ["client", "http1", "http2"] } -hyper-rustls = { version = "0.24", features = ["http2"] } -serde = { version = "1.0", features = [ "derive" ] } -serde_json = "1.0" -thiserror = "1.0" -tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } +base64.workspace = true +sha2.workspace = true +hex.workspace = true +http.workspace = true +http-body-util.workspace = true +log.workspace = true +aws-sigv4.workspace = true +aws-sdk-config.workspace = true +percent-encoding.workspace = true +hyper = { workspace = true, default-features = false, features = ["http1", "http2"] } +hyper-util.workspace = true +hyper-rustls.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true +tokio.workspace = true # cli deps -clap = { version = "4.1", optional = true, features = ["derive", "env"] } +clap = { workspace = true, optional = true } format_table = { workspace = true, optional = true } -tracing = { version = "0.1", optional = true } -tracing-subscriber = { version = "0.3", optional = true, features = ["env-filter"] } +tracing = { workspace = true, optional = true } +tracing-subscriber = { workspace = true, optional = true } [features] diff --git a/src/k2v-client/error.rs b/src/k2v-client/error.rs index 564ce497..96f5674a 100644 --- a/src/k2v-client/error.rs +++ b/src/k2v-client/error.rs @@ -22,12 +22,14 @@ pub enum Error { Http(#[from] http::Error), #[error("hyper error: {0}")] Hyper(#[from] hyper::Error), + #[error("hyper client error: {0}")] + HyperClient(#[from] hyper_util::client::legacy::Error), #[error("invalid header: {0}")] Header(#[from] hyper::header::ToStrError), #[error("deserialization error: {0}")] Deserialization(#[from] serde_json::Error), #[error("invalid signature parameters: {0}")] - SignParameters(#[from] aws_sigv4::signing_params::BuildError), + SignParameters(#[from] aws_sigv4::sign::v4::signing_params::BuildError), #[error("could not sign request: {0}")] SignRequest(#[from] aws_sigv4::http_request::SigningError), #[error("request timed out")] diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs index 4aa7a20a..852274a7 100644 --- a/src/k2v-client/lib.rs +++ b/src/k2v-client/lib.rs @@ -9,11 +9,15 @@ use percent_encoding::{utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC}; use http::header::{ACCEPT, CONTENT_TYPE}; use http::status::StatusCode; use http::{HeaderName, HeaderValue, Request}; -use hyper::{body::Bytes, Body}; -use hyper::{client::connect::HttpConnector, Client as HttpClient}; +use http_body_util::{BodyExt, Full as FullBody}; +use hyper::body::Bytes; use hyper_rustls::HttpsConnector; +use hyper_util::client::legacy::{connect::HttpConnector, Client as HttpClient}; +use hyper_util::rt::TokioExecutor; -use aws_sigv4::http_request::{sign, SignableRequest, SigningParams, SigningSettings}; +use aws_sdk_config::config::Credentials; +use aws_sigv4::http_request::{sign, SignableBody, SignableRequest, SigningSettings}; +use aws_sigv4::sign::v4::SigningParams; use serde::de::Error as DeError; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -22,6 +26,8 @@ mod error; pub use error::Error; +pub type Body = FullBody<Bytes>; + const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300); const SERVICE: &str = "k2v"; @@ -53,19 +59,19 @@ pub struct K2vClientConfig { pub struct K2vClient { config: K2vClientConfig, user_agent: HeaderValue, - client: HttpClient<HttpsConnector<HttpConnector>>, + client: HttpClient<HttpsConnector<HttpConnector>, Body>, } impl K2vClient { /// Create a new K2V client. pub fn new(config: K2vClientConfig) -> Result<Self, Error> { let connector = hyper_rustls::HttpsConnectorBuilder::new() - .with_native_roots() + .with_native_roots()? .https_or_http() .enable_http1() .enable_http2() .build(); - let client = HttpClient::builder().build(connector); + let client = HttpClient::builder(TokioExecutor::new()).build(connector); let user_agent: std::borrow::Cow<str> = match &config.user_agent { Some(ua) => ua.into(), None => format!("k2v/{}", env!("CARGO_PKG_VERSION")).into(), @@ -363,21 +369,37 @@ impl K2vClient { // Sign request let signing_settings = SigningSettings::default(); + let identity = Credentials::new( + &self.config.aws_access_key_id, + &self.config.aws_secret_access_key, + None, + None, + "k2v-client", + ) + .into(); let signing_params = SigningParams::builder() - .access_key(&self.config.aws_access_key_id) - .secret_key(&self.config.aws_secret_access_key) + .identity(&identity) .region(&self.config.region) - .service_name(SERVICE) + .name(SERVICE) .time(SystemTime::now()) .settings(signing_settings) - .build()?; + .build()? + .into(); // Convert the HTTP request into a signable request - let signable_request = SignableRequest::from(&req); + let signable_request = SignableRequest::new( + req.method().as_str(), + req.uri().to_string(), + // TODO: get rid of Unwrap + req.headers() + .iter() + .map(|(x, y)| (x.as_str(), y.to_str().unwrap())), + SignableBody::Bytes(req.body().as_ref()), + )?; // Sign and then apply the signature to the request let (signing_instructions, _signature) = sign(signable_request, &signing_params)?.into_parts(); - signing_instructions.apply_to_request(&mut req); + signing_instructions.apply_to_request_http1x(&mut req); // Send and wait for timeout let res = tokio::select! { @@ -398,12 +420,16 @@ impl K2vClient { }; let body = match res.status { - StatusCode::OK => hyper::body::to_bytes(body).await?, + StatusCode::OK => BodyExt::collect(body).await?.to_bytes(), StatusCode::NO_CONTENT => Bytes::new(), StatusCode::NOT_FOUND => return Err(Error::NotFound), StatusCode::NOT_MODIFIED => Bytes::new(), s => { - let err_body = hyper::body::to_bytes(body).await.unwrap_or_default(); + let err_body = body + .collect() + .await + .map(|x| x.to_bytes()) + .unwrap_or_default(); let err_body_str = std::str::from_utf8(&err_body) .map(String::from) .unwrap_or_else(|_| BASE64_STANDARD.encode(&err_body)); @@ -451,7 +477,11 @@ impl K2vClient { } fn build_url<V: AsRef<str>>(&self, partition_key: Option<&str>, query: &[(&str, V)]) -> String { - let mut url = format!("{}/{}", self.config.endpoint, self.config.bucket); + let mut url = format!( + "{}/{}", + self.config.endpoint.trim_end_matches('/'), + self.config.bucket + ); if let Some(pk) = partition_key { url.push('/'); url.extend(utf8_percent_encode(pk, &PATH_ENCODE_SET)); diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 124b84b0..fd0abc3a 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -20,26 +20,26 @@ garage_table.workspace = true garage_block.workspace = true garage_util.workspace = true -async-trait = "0.1.7" -arc-swap = "1.0" -blake2 = "0.10" -chrono = "0.4" -err-derive = "0.3" -hex = "0.4" -base64 = "0.21" -tracing = "0.1" -rand = "0.8" -zstd = { version = "0.12", default-features = false } - -serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } -serde_bytes = "0.11" - -futures = "0.3" -futures-util = "0.3" -tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -opentelemetry = "0.17" - -netapp = "0.10" +async-trait.workspace = true +arc-swap.workspace = true +blake2.workspace = true +chrono.workspace = true +err-derive.workspace = true +hex.workspace = true +base64.workspace = true +tracing.workspace = true +rand.workspace = true +zstd.workspace = true + +serde.workspace = true +serde_bytes.workspace = true + +futures.workspace = true +futures-util.workspace = true +tokio.workspace = true +opentelemetry.workspace = true + +netapp.workspace = true [features] default = [ "sled", "lmdb", "sqlite" ] diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index efa3e27b..222cfc83 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -126,7 +126,7 @@ impl<'a> BucketHelper<'a> { } // Checks ok, add alias - let mut bucket_p = bucket.state.as_option_mut().unwrap(); + let bucket_p = bucket.state.as_option_mut().unwrap(); let alias_ts = increment_logical_clock_2( bucket_p.aliases.get_timestamp(alias_name), @@ -163,7 +163,7 @@ impl<'a> BucketHelper<'a> { alias_name: &String, ) -> Result<(), Error> { let mut bucket = self.get_existing_bucket(bucket_id).await?; - let mut bucket_state = bucket.state.as_option_mut().unwrap(); + let bucket_state = bucket.state.as_option_mut().unwrap(); let mut alias = self .0 @@ -245,7 +245,7 @@ impl<'a> BucketHelper<'a> { self.0.bucket_alias_table.insert(&alias).await?; } - if let Some(mut bucket_state) = bucket.state.as_option_mut() { + if let Some(bucket_state) = bucket.state.as_option_mut() { bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false); self.0.bucket_table.insert(&bucket).await?; } @@ -274,7 +274,7 @@ impl<'a> BucketHelper<'a> { let mut bucket = self.get_existing_bucket(bucket_id).await?; let mut key = key_helper.get_existing_key(key_id).await?; - let mut key_param = key.state.as_option_mut().unwrap(); + let key_param = key.state.as_option_mut().unwrap(); if let Some(Some(existing_alias)) = key_param.local_aliases.get(alias_name) { if *existing_alias != bucket_id { @@ -283,7 +283,7 @@ impl<'a> BucketHelper<'a> { } // Checks ok, add alias - let mut bucket_p = bucket.state.as_option_mut().unwrap(); + let bucket_p = bucket.state.as_option_mut().unwrap(); let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone()); // Calculate the timestamp to assign to this aliasing in the two local_aliases maps @@ -326,7 +326,7 @@ impl<'a> BucketHelper<'a> { let mut bucket = self.get_existing_bucket(bucket_id).await?; let mut key = key_helper.get_existing_key(key_id).await?; - let mut bucket_p = bucket.state.as_option_mut().unwrap(); + let bucket_p = bucket.state.as_option_mut().unwrap(); if key .state @@ -359,7 +359,7 @@ impl<'a> BucketHelper<'a> { } // Checks ok, remove alias - let mut key_param = key.state.as_option_mut().unwrap(); + let key_param = key.state.as_option_mut().unwrap(); let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone()); let alias_ts = increment_logical_clock_2( diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index e8702bf1..aa13ee7b 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -232,7 +232,7 @@ impl<T: CountedItem> IndexCounter<T> { let now = now_msec(); for (s, inc) in counts.iter() { - let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); + let ent = entry.values.entry(s.to_string()).or_insert((0, 0)); ent.0 = std::cmp::max(ent.0 + 1, now); ent.1 += *inc; } @@ -348,7 +348,7 @@ impl<T: CountedItem> IndexCounter<T> { }, }; for (s, v) in counts.iter() { - let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0)); + let tv = local_counter.values.entry(s.to_string()).or_insert((0, 0)); tv.0 = std::cmp::max(tv.0 + 1, now); tv.1 += v; } diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index e19f80a8..1b2867a5 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -18,38 +18,38 @@ format_table.workspace = true garage_db.workspace = true garage_util.workspace = true -arc-swap = "1.0" -bytes = "1.0" -bytesize = "1.1" -gethostname = "0.4" -hex = "0.4" -tracing = "0.1" -rand = "0.8" -itertools="0.10" -sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } -nix = { version = "0.27", default-features = false, features = ["fs"] } - -async-trait = "0.1.7" -serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } -serde_bytes = "0.11" -serde_json = "1.0" -err-derive = { version = "0.3", optional = true } +arc-swap.workspace = true +bytes.workspace = true +bytesize.workspace = true +gethostname.workspace = true +hex.workspace = true +tracing.workspace = true +rand.workspace = true +itertools.workspace = true +sodiumoxide.workspace = true +nix.workspace = true + +async-trait.workspace = true +serde.workspace = true +serde_bytes.workspace = true +serde_json.workspace = true +err-derive = { workspace = true, optional = true } # newer version requires rust edition 2021 -kube = { version = "0.75", default-features = false, features = ["runtime", "derive", "client", "rustls-tls"], optional = true } -k8s-openapi = { version = "0.16", features = ["v1_22"], optional = true } -schemars = { version = "0.8", optional = true } -reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls-manual-roots", "json"] } +kube = { workspace = true, optional = true } +k8s-openapi = { workspace = true, optional = true } +schemars = { workspace = true, optional = true } +reqwest = { workspace = true, optional = true } -pnet_datalink = "0.33" +pnet_datalink.workspace = true -futures = "0.3" -futures-util = "0.3" -tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -tokio-stream = { version = "0.1", features = ["net"] } -opentelemetry = "0.17" +futures.workspace = true +futures-util.workspace = true +tokio.workspace = true +tokio-stream.workspace = true +opentelemetry.workspace = true -netapp = { version = "0.10", features = ["telemetry"] } +netapp.workspace = true [features] kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ] diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index ab8d1112..71fd71b0 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -148,7 +148,7 @@ impl ConsulDiscovery { ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); } else { warn!( - "Could not process node spec from Consul: {:?} (invalid IP or public key)", + "Could not process node spec from Consul: {:?} (invalid IP address or node ID/pubkey)", ent ); } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 83cc6816..f22247c3 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -57,7 +57,7 @@ pub const SYSTEM_RPC_PATH: &str = "garage_rpc/system.rs/SystemRpc"; pub enum SystemRpc { /// Response to successfull advertisements Ok, - /// Request to connect to a specific node (in <pubkey>@<host>:<port> format) + /// Request to connect to a specific node (in <pubkey>@<host>:<port> format, pubkey = full-length node ID) Connect(String), /// Advertise Garage status. Answered with another AdvertiseStatus. /// Exchanged with every node on a regular basis. diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 62cffac7..cac17da6 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -18,19 +18,19 @@ garage_db.workspace = true garage_rpc.workspace = true garage_util.workspace = true -opentelemetry = "0.17" +opentelemetry.workspace = true -async-trait = "0.1.7" -arc-swap = "1.0" -bytes = "1.0" -hex = "0.4" -hexdump = "0.1" -tracing = "0.1" -rand = "0.8" +async-trait.workspace = true +arc-swap.workspace = true +bytes.workspace = true +hex.workspace = true +hexdump.workspace = true +tracing.workspace = true +rand.workspace = true -serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } -serde_bytes = "0.11" +serde.workspace = true +serde_bytes.workspace = true -futures = "0.3" -futures-util = "0.3" -tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } +futures.workspace = true +futures-util.workspace = true +tokio.workspace = true diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index afc4d3c3..d72245dd 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -16,42 +16,42 @@ path = "lib.rs" [dependencies] garage_db.workspace = true -arc-swap = "1.0" -async-trait = "0.1" -blake2 = "0.10" -bytes = "1.0" -bytesize = "1.2" -digest = "0.10" -err-derive = "0.3" -hexdump = "0.1" -xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } -hex = "0.4" -lazy_static = "1.4" -tracing = "0.1" -rand = "0.8" -sha2 = "0.10" - -chrono = "0.4" -rmp-serde = "1.1.2" -serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } -serde_json = "1.0" -toml = "0.6" - -futures = "0.3" -tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } - -netapp = "0.10" - -http = "0.2" -hyper = "0.14" - -opentelemetry = { version = "0.17", features = [ "rt-tokio", "metrics", "trace" ] } +arc-swap.workspace = true +async-trait.workspace = true +blake2.workspace = true +bytes.workspace = true +bytesize.workspace = true +digest.workspace = true +err-derive.workspace = true +hexdump.workspace = true +xxhash-rust.workspace = true +hex.workspace = true +lazy_static.workspace = true +tracing.workspace = true +rand.workspace = true +sha2.workspace = true + +chrono.workspace = true +rmp-serde.workspace = true +serde.workspace = true +serde_json.workspace = true +toml.workspace = true + +futures.workspace = true +tokio.workspace = true + +netapp.workspace = true + +http.workspace = true +hyper.workspace = true + +opentelemetry.workspace = true [build-dependencies] -rustc_version = "0.4.0" +rustc_version.workspace = true [dev-dependencies] -mktemp = "0.5" +mktemp.workspace = true [features] k2v = [] diff --git a/src/util/config.rs b/src/util/config.rs index ad5c8e1f..a9a72110 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -1,6 +1,5 @@ //! Contains type and functions related to Garage configuration file use std::convert::TryFrom; -use std::io::Read; use std::net::SocketAddr; use std::path::PathBuf; @@ -45,11 +44,15 @@ pub struct Config { )] pub compression_level: Option<i32>, + /// Skip the permission check of secret files. Useful when + /// POSIX ACLs (or more complex chmods) are used. + #[serde(default)] + pub allow_world_readable_secrets: bool, + /// RPC secret key: 32 bytes hex encoded pub rpc_secret: Option<String>, /// Optional file where RPC secret key is read from - pub rpc_secret_file: Option<String>, - + pub rpc_secret_file: Option<PathBuf>, /// Address to bind for RPC pub rpc_bind_addr: SocketAddr, /// Public IP address of this node @@ -163,12 +166,12 @@ pub struct AdminConfig { /// Bearer token to use to scrape metrics pub metrics_token: Option<String>, /// File to read metrics token from - pub metrics_token_file: Option<String>, + pub metrics_token_file: Option<PathBuf>, /// Bearer token to use to access Admin API endpoints pub admin_token: Option<String>, /// File to read admin token from - pub admin_token_file: Option<String>, + pub admin_token_file: Option<PathBuf>, /// OTLP server to where to export traces pub trace_sink: Option<String>, @@ -221,6 +224,13 @@ pub struct KubernetesDiscoveryConfig { pub skip_crd: bool, } +/// Read and parse configuration +pub fn read_config(config_file: PathBuf) -> Result<Config, Error> { + let config = std::fs::read_to_string(config_file)?; + + Ok(toml::from_str(&config)?) +} + fn default_db_engine() -> String { "lmdb".into() } @@ -235,68 +245,6 @@ fn default_block_size() -> usize { 1048576 } -/// Read and parse configuration -pub fn read_config(config_file: PathBuf) -> Result<Config, Error> { - let mut file = std::fs::OpenOptions::new() - .read(true) - .open(config_file.as_path())?; - - let mut config = String::new(); - file.read_to_string(&mut config)?; - - let mut parsed_config: Config = toml::from_str(&config)?; - - secret_from_file( - &mut parsed_config.rpc_secret, - &parsed_config.rpc_secret_file, - "rpc_secret", - )?; - secret_from_file( - &mut parsed_config.admin.metrics_token, - &parsed_config.admin.metrics_token_file, - "admin.metrics_token", - )?; - secret_from_file( - &mut parsed_config.admin.admin_token, - &parsed_config.admin.admin_token_file, - "admin.admin_token", - )?; - - Ok(parsed_config) -} - -fn secret_from_file( - secret: &mut Option<String>, - secret_file: &Option<String>, - name: &'static str, -) -> Result<(), Error> { - match (&secret, &secret_file) { - (_, None) => { - // no-op - } - (Some(_), Some(_)) => { - return Err(format!("only one of `{}` and `{}_file` can be set", name, name).into()); - } - (None, Some(file_path)) => { - #[cfg(unix)] - if std::env::var("GARAGE_ALLOW_WORLD_READABLE_SECRETS").as_deref() != Ok("true") { - use std::os::unix::fs::MetadataExt; - let metadata = std::fs::metadata(file_path)?; - if metadata.mode() & 0o077 != 0 { - return Err(format!("File {} is world-readable! (mode: 0{:o}, expected 0600)\nRefusing to start until this is fixed, or environment variable GARAGE_ALLOW_WORLD_READABLE_SECRETS is set to true.", file_path, metadata.mode()).into()); - } - } - let mut file = std::fs::OpenOptions::new().read(true).open(file_path)?; - let mut secret_buf = String::new(); - file.read_to_string(&mut secret_buf)?; - // trim_end: allows for use case such as `echo "$(openssl rand -hex 32)" > somefile`. - // also editors sometimes add a trailing newline - *secret = Some(String::from(secret_buf.trim_end())); - } - } - Ok(()) -} - fn default_compression() -> Option<i32> { Some(1) } @@ -425,83 +373,4 @@ mod tests { Ok(()) } - - #[test] - fn test_rpc_secret_file_works() -> Result<(), Error> { - let path_secret = mktemp::Temp::new_file()?; - let mut file_secret = File::create(path_secret.as_path())?; - writeln!(file_secret, "foo")?; - drop(file_secret); - - let path_config = mktemp::Temp::new_file()?; - let mut file_config = File::create(path_config.as_path())?; - let path_secret_path = path_secret.as_path(); - writeln!( - file_config, - r#" - metadata_dir = "/tmp/garage/meta" - data_dir = "/tmp/garage/data" - replication_mode = "3" - rpc_bind_addr = "[::]:3901" - rpc_secret_file = "{}" - - [s3_api] - s3_region = "garage" - api_bind_addr = "[::]:3900" - "#, - path_secret_path.display() - )?; - let config = super::read_config(path_config.to_path_buf())?; - assert_eq!("foo", config.rpc_secret.unwrap()); - - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - let metadata = std::fs::metadata(&path_secret_path)?; - let mut perm = metadata.permissions(); - perm.set_mode(0o660); - std::fs::set_permissions(&path_secret_path, perm)?; - - std::env::set_var("GARAGE_ALLOW_WORLD_READABLE_SECRETS", "false"); - assert!(super::read_config(path_config.to_path_buf()).is_err()); - - std::env::set_var("GARAGE_ALLOW_WORLD_READABLE_SECRETS", "true"); - assert!(super::read_config(path_config.to_path_buf()).is_ok()); - } - - drop(path_config); - drop(path_secret); - drop(file_config); - Ok(()) - } - - #[test] - fn test_rcp_secret_and_rpc_secret_file_cannot_be_set_both() -> Result<(), Error> { - let path_config = mktemp::Temp::new_file()?; - let mut file_config = File::create(path_config.as_path())?; - writeln!( - file_config, - r#" - metadata_dir = "/tmp/garage/meta" - data_dir = "/tmp/garage/data" - replication_mode = "3" - rpc_bind_addr = "[::]:3901" - rpc_secret= "dummy" - rpc_secret_file = "dummy" - - [s3_api] - s3_region = "garage" - api_bind_addr = "[::]:3900" - "# - )?; - assert_eq!( - "only one of `rpc_secret` and `rpc_secret_file` can be set", - super::read_config(path_config.to_path_buf()) - .unwrap_err() - .to_string() - ); - drop(path_config); - drop(file_config); - Ok(()) - } } diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml index 9f7720da..49549c9b 100644 --- a/src/web/Cargo.toml +++ b/src/web/Cargo.toml @@ -19,16 +19,17 @@ garage_model.workspace = true garage_util.workspace = true garage_table.workspace = true -err-derive = "0.3" -tracing = "0.1" -percent-encoding = "2.1.0" +err-derive.workspace = true +tracing.workspace = true +percent-encoding.workspace = true -futures = "0.3" +futures.workspace = true -http = "0.2" -hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] } -hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] } +http.workspace = true +http-body-util.workspace = true +hyper.workspace = true +hyper-util.workspace = true -tokio = { version = "1.0", default-features = false, features = ["net"] } +tokio.workspace = true -opentelemetry = "0.17" +opentelemetry.workspace = true diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 73780efb..0f9b5dc8 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -2,19 +2,15 @@ use std::fs::{self, Permissions}; use std::os::unix::prelude::PermissionsExt; use std::{convert::Infallible, sync::Arc}; -use futures::future::Future; +use tokio::net::{TcpListener, UnixListener}; +use tokio::sync::watch; use hyper::{ + body::Incoming as IncomingBody, header::{HeaderValue, HOST}, - server::conn::AddrStream, - service::{make_service_fn, service_fn}, - Body, Method, Request, Response, Server, StatusCode, + Method, Request, Response, StatusCode, }; -use hyperlocal::UnixServerExt; - -use tokio::net::UnixStream; - use opentelemetry::{ global, metrics::{Counter, ValueRecorder}, @@ -24,7 +20,8 @@ use opentelemetry::{ use crate::error::*; -use garage_api::helpers::{authority_to_host, host_to_bucket}; +use garage_api::generic_server::{server_loop, UnixListenerOn}; +use garage_api::helpers::*; use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket}; use garage_api::s3::error::{ CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError, @@ -74,78 +71,53 @@ pub struct WebServer { impl WebServer { /// Run a web server - pub async fn run( - garage: Arc<Garage>, - addr: UnixOrTCPSocketAddress, - root_domain: String, - shutdown_signal: impl Future<Output = ()>, - ) -> Result<(), GarageError> { + pub fn new(garage: Arc<Garage>, root_domain: String) -> Arc<Self> { let metrics = Arc::new(WebMetrics::new()); - let web_server = Arc::new(WebServer { + Arc::new(WebServer { garage, metrics, root_domain, - }); - - let tcp_service = make_service_fn(|conn: &AddrStream| { - let web_server = web_server.clone(); - - let client_addr = conn.remote_addr(); - async move { - Ok::<_, Error>(service_fn(move |req: Request<Body>| { - let web_server = web_server.clone(); - - web_server.handle_request(req, client_addr.to_string()) - })) - } - }); - - let unix_service = make_service_fn(|_: &UnixStream| { - let web_server = web_server.clone(); - - let path = addr.to_string(); - async move { - Ok::<_, Error>(service_fn(move |req: Request<Body>| { - let web_server = web_server.clone(); - - web_server.handle_request(req, path.clone()) - })) - } - }); + }) + } - info!("Web server listening on {}", addr); + pub async fn run( + self: Arc<Self>, + bind_addr: UnixOrTCPSocketAddress, + must_exit: watch::Receiver<bool>, + ) -> Result<(), GarageError> { + let server_name = "Web".into(); + info!("Web server listening on {}", bind_addr); - match addr { + match bind_addr { UnixOrTCPSocketAddress::TCPSocket(addr) => { - Server::bind(&addr) - .serve(tcp_service) - .with_graceful_shutdown(shutdown_signal) - .await? + let listener = TcpListener::bind(addr).await?; + + let handler = + move |stream, socketaddr| self.clone().handle_request(stream, socketaddr); + server_loop(server_name, listener, handler, must_exit).await } UnixOrTCPSocketAddress::UnixSocket(ref path) => { if path.exists() { fs::remove_file(path)? } - let bound = Server::bind_unix(path)?; + let listener = UnixListener::bind(path)?; + let listener = UnixListenerOn(listener, path.display().to_string()); fs::set_permissions(path, Permissions::from_mode(0o222))?; - bound - .serve(unix_service) - .with_graceful_shutdown(shutdown_signal) - .await?; + let handler = + move |stream, socketaddr| self.clone().handle_request(stream, socketaddr); + server_loop(server_name, listener, handler, must_exit).await } - }; - - Ok(()) + } } async fn handle_request( self: Arc<Self>, - req: Request<Body>, + req: Request<IncomingBody>, addr: String, - ) -> Result<Response<Body>, Infallible> { + ) -> Result<Response<BoxBody<Error>>, http::Error> { if let Ok(forwarded_for_ip_addr) = forwarded_headers::handle_forwarded_for_headers(req.headers()) { @@ -187,7 +159,8 @@ impl WebServer { match res { Ok(res) => { debug!("{} {} {}", req.method(), res.status(), req.uri()); - Ok(res) + Ok(res + .map(|body| BoxBody::new(http_body_util::BodyExt::map_err(body, Error::from)))) } Err(error) => { info!( @@ -220,7 +193,10 @@ impl WebServer { Ok(exists) } - async fn serve_file(self: &Arc<Self>, req: &Request<Body>) -> Result<Response<Body>, Error> { + async fn serve_file( + self: &Arc<Self>, + req: &Request<IncomingBody>, + ) -> Result<Response<BoxBody<ApiError>>, Error> { // Get http authority string (eg. [::1]:3902 or garage.tld:80) let authority = req .headers() @@ -267,9 +243,21 @@ impl WebServer { ); let ret_doc = match *req.method() { - Method::OPTIONS => handle_options_for_bucket(req, &bucket), - Method::HEAD => handle_head(self.garage.clone(), req, bucket_id, &key, None).await, - Method::GET => handle_get(self.garage.clone(), req, bucket_id, &key, None).await, + Method::OPTIONS => handle_options_for_bucket(req, &bucket) + .map_err(ApiError::from) + .map(|res| res.map(|_empty_body: EmptyBody| empty_body())), + Method::HEAD => handle_head(self.garage.clone(), &req, bucket_id, &key, None).await, + Method::GET => { + handle_get( + self.garage.clone(), + &req, + bucket_id, + &key, + None, + Default::default(), + ) + .await + } _ => Err(ApiError::bad_request("HTTP method not supported")), }; @@ -281,7 +269,7 @@ impl WebServer { Ok(Response::builder() .status(StatusCode::FOUND) .header("Location", url) - .body(Body::empty()) + .body(empty_body()) .unwrap()) } _ => ret_doc, @@ -310,10 +298,18 @@ impl WebServer { // Create a fake HTTP request with path = the error document let req2 = Request::builder() .uri(format!("http://{}/{}", host, &error_document)) - .body(Body::empty()) + .body(empty_body::<Infallible>()) .unwrap(); - match handle_get(self.garage.clone(), &req2, bucket_id, &error_document, None).await + match handle_get( + self.garage.clone(), + &req2, + bucket_id, + &error_document, + None, + Default::default(), + ) + .await { Ok(mut error_doc) => { // The error won't be logged back in handle_request, @@ -358,7 +354,7 @@ impl WebServer { } } -fn error_to_res(e: Error) -> Response<Body> { +fn error_to_res(e: Error) -> Response<BoxBody<Error>> { // If we are here, it is either that: // - there was an error before trying to get the requested URL // from the bucket (e.g. bucket not found) @@ -366,7 +362,7 @@ fn error_to_res(e: Error) -> Response<Body> { // was a HEAD request or we couldn't get the error document) // We do NOT enter this code path when returning the bucket's // error document (this is handled in serve_file) - let body = Body::from(format!("{}\n", e)); + let body = string_body(format!("{}\n", e)); let mut http_error = Response::new(body); *http_error.status_mut() = e.http_status_code(); e.add_headers(http_error.headers_mut()); |