From ac04934daefe48ac4f41d22b9129d1fe2ce44833 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Oct 2023 10:29:03 +0200 Subject: s3 api: add missing CORS headers to PostObject responses (fix #609) --- src/api/s3/post_object.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) (limited to 'src/api') diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index 542b7a81..f9eccb7f 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -15,6 +15,7 @@ use serde::Deserialize; use garage_model::garage::Garage; +use crate::s3::cors::*; use crate::s3::error::*; use crate::s3::put::{get_headers, save_stream}; use crate::s3::xml as s3_xml; @@ -242,7 +243,7 @@ pub async fn handle_post_object( let etag = format!("\"{}\"", md5); - let resp = if let Some(mut target) = params + let mut resp = if let Some(mut target) = params .get("success_action_redirect") .and_then(|h| h.to_str().ok()) .and_then(|u| url::Url::parse(u).ok()) @@ -262,8 +263,7 @@ pub async fn handle_post_object( } else { let path = head .uri - .into_parts() - .path_and_query + .path_and_query() .map(|paq| paq.path().to_string()) .unwrap_or_else(|| "/".to_string()); let authority = head @@ -308,6 +308,13 @@ pub async fn handle_post_object( } }; + let matching_cors_rule = + find_matching_cors_rule(&bucket, &Request::from_parts(head, Body::empty()))?; + if let Some(rule) = matching_cors_rule { + add_cors_headers(&mut resp, rule) + .ok_or_internal_error("Invalid bucket CORS configuration")?; + } + Ok(resp) } -- cgit v1.2.3 From 50643e61bfb4ef0782e4364ed368bdfa62be7e5e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Jan 2024 10:47:33 +0100 Subject: Bump version to 0.8.5 --- src/api/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/api') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index cb9e2e55..43167fdb 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_api" -version = "0.8.4" +version = "0.8.5" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" -- cgit v1.2.3 From ee57dd922b9c396298473b41e4046c8d00ee77d5 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Jan 2024 16:28:17 +0100 Subject: Bump version to 0.9.1 --- src/api/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/api') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index e8cbc1c8..5a667359 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_api" -version = "0.9.0" +version = "0.9.1" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" -- cgit v1.2.3 From fe1af5d98b8e6c87f39f1d94586e61bff100e7d2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Feb 2024 13:02:02 +0100 Subject: [dep-upgrade-202402] refactor dependencies: move all as workspace deps --- src/api/Cargo.toml | 71 +++++++++++++++++++++++++++--------------------------- 1 file changed, 36 insertions(+), 35 deletions(-) (limited to 'src/api') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 5a667359..7d6ad4af 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -20,44 +20,45 @@ garage_block.workspace = true garage_util.workspace = true garage_rpc.workspace = true -async-trait = "0.1.7" -base64 = "0.21" -bytes = "1.0" -chrono = "0.4" -crypto-common = "0.1" -err-derive = "0.3" -hex = "0.4" -hmac = "0.12" -idna = "0.4" -tracing = "0.1" -md-5 = "0.10" -nom = "7.1" -sha2 = "0.10" +async-trait.workspace = true +base64.workspace = true 
+bytes.workspace = true +chrono.workspace = true +crypto-common.workspace = true +err-derive.workspace = true +hex.workspace = true +hmac.workspace = true +idna.workspace = true +tracing.workspace = true +md-5.workspace = true +nom.workspace = true +pin-project.workspace = true +sha2.workspace = true -futures = "0.3" -futures-util = "0.3" -pin-project = "1.0.12" -tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -tokio-stream = "0.1" +futures.workspace = true +futures-util.workspace = true +tokio.workspace = true +tokio-stream.workspace = true -form_urlencoded = "1.0.0" -http = "0.2" -httpdate = "1.0" -http-range = "0.1" -hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] } -hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] } -multer = "2.0" -percent-encoding = "2.1.0" -roxmltree = "0.18" -serde = { version = "1.0", features = ["derive"] } -serde_bytes = "0.11" -serde_json = "1.0" -quick-xml = { version = "0.26", features = [ "serialize" ] } -url = "2.3" +form_urlencoded.workspace = true +http.workspace = true +httpdate.workspace = true +http-range.workspace = true +hyper.workspace = true +hyperlocal.workspace = true +multer.workspace = true +percent-encoding.workspace = true +roxmltree.workspace = true +url.workspace = true -opentelemetry = "0.17" -opentelemetry-prometheus = { version = "0.10", optional = true } -prometheus = { version = "0.13", optional = true } +serde.workspace = true +serde_bytes.workspace = true +serde_json.workspace = true +quick-xml.workspace = true + +opentelemetry.workspace = true +opentelemetry-prometheus = { workspace = true, optional = true } +prometheus = { workspace = true, optional = true } [features] k2v = [ "garage_util/k2v", "garage_model/k2v" ] -- cgit v1.2.3 From 6e69a1fffc715c752a399750c1e26aa46683dbb2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Feb 2024 14:44:12 +0100 Subject: [dep-upgrade-202402] prepare migration to http/hyper 1.0 --- src/api/helpers.rs | 4 ++-- src/api/k2v/item.rs | 5 +++-- src/api/s3/bucket.rs | 4 ++-- src/api/s3/cors.rs | 4 ++-- src/api/s3/delete.rs | 4 ++-- src/api/s3/lifecycle.rs | 4 ++-- src/api/s3/multipart.rs | 8 ++++---- src/api/s3/website.rs | 4 ++-- 8 files changed, 19 insertions(+), 18 deletions(-) (limited to 'src/api') diff --git a/src/api/helpers.rs b/src/api/helpers.rs index 1d55ebd5..8efaa231 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,4 +1,4 @@ -use hyper::{Body, Request, Response}; +use hyper::{body::HttpBody, Body, Request, Response}; use idna::domain_to_unicode; use serde::{Deserialize, Serialize}; @@ -139,7 +139,7 @@ pub fn key_after_prefix(pfx: &str) -> Option { } pub async fn parse_json_body Deserialize<'de>>(req: Request) -> Result { - let body = hyper::body::to_bytes(req.into_body()).await?; + let body = req.into_body().collect().await?.to_bytes(); let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; Ok(resp) } diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index e13a0f30..33f4da53 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use base64::prelude::*; use http::header; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; use garage_util::data::*; @@ -137,7 +137,8 @@ pub async fn handle_insert_item( .map(CausalContext::parse_helper) .transpose()?; - let body = 
hyper::body::to_bytes(req.into_body()).await?; + let body = req.into_body().collect().await?.to_bytes(); + let value = DvvsValue::Value(body.to_vec()); garage diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 733981e1..a2437524 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; use garage_model::bucket_alias_table::*; use garage_model::bucket_table::Bucket; @@ -119,7 +119,7 @@ pub async fn handle_create_bucket( api_key: Key, bucket_name: String, ) -> Result, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; + let body = req.into_body().collect().await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index 49097ad1..d2bcf125 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -5,7 +5,7 @@ use http::header::{ ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, }; -use hyper::{header::HeaderName, Body, Method, Request, Response, StatusCode}; +use hyper::{body::HttpBody, header::HeaderName, Body, Method, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; @@ -64,7 +64,7 @@ pub async fn handle_put_cors( req: Request, content_sha256: Option, ) -> Result, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; + let body = req.into_body().collect().await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index 1c491eac..685ce004 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; use garage_util::data::*; @@ -75,7 +75,7 @@ pub async fn handle_delete_objects( req: Request, content_sha256: Option, ) -> Result, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; + let body = req.into_body().collect().await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index 1e7d6755..ae8fbc37 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -1,7 +1,7 @@ use quick_xml::de::from_reader; use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; @@ -57,7 +57,7 @@ pub async fn handle_put_lifecycle( req: Request, content_sha256: Option, ) -> Result, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; + let body = req.into_body().collect().await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 6b786318..96c4d044 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -1,9 +1,9 @@ use std::collections::HashMap; use std::sync::Arc; -use futures::prelude::*; +use futures::{prelude::*, TryStreamExt}; use hyper::body::Body; -use hyper::{Request, Response}; +use hyper::{body::HttpBody, Request, Response}; use 
md5::{Digest as Md5Digest, Md5}; use garage_table::*; @@ -87,7 +87,7 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let body = req.into_body().map_err(Error::from); + let body = TryStreamExt::map_err(req.into_body(), Error::from); let mut chunker = StreamChunker::new(body, garage.config.block_size); let ((_, _, mut mpu), first_block) = futures::try_join!( @@ -217,7 +217,7 @@ pub async fn handle_complete_multipart_upload( upload_id: &str, content_sha256: Option, ) -> Result, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; + let body = HttpBody::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index 7f2ab925..f754ff1b 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -1,7 +1,7 @@ use quick_xml::de::from_reader; use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use crate::s3::error::*; @@ -63,7 +63,7 @@ pub async fn handle_put_website( req: Request, content_sha256: Option, ) -> Result, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; + let body = req.into_body().collect().await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; -- cgit v1.2.3 From 0bb5b77530ad432e4c77f13b395fe74613812337 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Feb 2024 18:49:54 +0100 Subject: [dep-upgrade-202402] wip: port to http/hyper crates v1 --- src/api/Cargo.toml | 3 +- src/api/admin/api_server.rs | 31 ++++---- src/api/admin/bucket.rs | 35 ++++----- src/api/admin/cluster.rs | 25 ++++--- src/api/admin/error.rs | 11 +-- src/api/admin/key.rs | 30 ++++---- src/api/generic_server.rs | 113 +++++++++++++++------------- src/api/helpers.rs | 27 ++++++- src/api/s3/api_server.rs | 18 +++-- src/api/s3/bucket.rs | 32 ++++---- src/api/s3/copy.rs | 21 +++--- src/api/s3/cors.rs | 45 ++++++----- src/api/s3/delete.rs | 17 +++-- src/api/s3/error.rs | 16 ++-- src/api/s3/get.rs | 166 +++++++++++++++++++++++------------------ src/api/s3/lifecycle.rs | 23 +++--- src/api/s3/list.rs | 17 +++-- src/api/s3/multipart.rs | 38 ++++++---- src/api/s3/post_object.rs | 32 +++++--- src/api/s3/put.rs | 20 +++-- src/api/s3/website.rs | 23 +++--- src/api/signature/payload.rs | 4 +- src/api/signature/streaming.rs | 32 ++++---- 23 files changed, 446 insertions(+), 333 deletions(-) (limited to 'src/api') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 7d6ad4af..011a64d8 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -44,8 +44,9 @@ form_urlencoded.workspace = true http.workspace = true httpdate.workspace = true http-range.workspace = true +http-body-util.workspace = true hyper.workspace = true -hyperlocal.workspace = true +hyper-util.workspace = true multer.workspace = true percent-encoding.workspace = true roxmltree.workspace = true diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 0ce3ca0d..d5e1c777 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use futures::future::Future; use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW}; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Incoming as 
IncomingBody, Request, Response, StatusCode}; use opentelemetry::trace::SpanRef; @@ -27,7 +27,9 @@ use crate::admin::error::*; use crate::admin::key::*; use crate::admin::router_v0; use crate::admin::router_v1::{Authorization, Endpoint}; -use crate::helpers::host_to_bucket; +use crate::helpers::*; + +pub type ResBody = BoxBody; pub struct AdminApiServer { garage: Arc, @@ -71,16 +73,19 @@ impl AdminApiServer { .await } - fn handle_options(&self, _req: &Request) -> Result, Error> { + fn handle_options(&self, _req: &Request) -> Result, Error> { Ok(Response::builder() .status(StatusCode::NO_CONTENT) .header(ALLOW, "OPTIONS, GET, POST") .header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST") .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") - .body(Body::empty())?) + .body(empty_body())?) } - async fn handle_check_domain(&self, req: Request) -> Result, Error> { + async fn handle_check_domain( + &self, + req: Request, + ) -> Result, Error> { let query_params: HashMap = req .uri() .query() @@ -104,7 +109,7 @@ impl AdminApiServer { if self.check_domain(domain).await? { Ok(Response::builder() .status(StatusCode::OK) - .body(Body::from(format!( + .body(string_body(format!( "Domain '{domain}' is managed by Garage" )))?) } else { @@ -167,7 +172,7 @@ impl AdminApiServer { } } - fn handle_health(&self) -> Result, Error> { + fn handle_health(&self) -> Result, Error> { let health = self.garage.system.health(); let (status, status_str) = match health.status { @@ -189,10 +194,10 @@ impl AdminApiServer { Ok(Response::builder() .status(status) .header(http::header::CONTENT_TYPE, "text/plain") - .body(Body::from(status_str))?) + .body(string_body(status_str))?) } - fn handle_metrics(&self) -> Result, Error> { + fn handle_metrics(&self) -> Result, Error> { #[cfg(feature = "metrics")] { use opentelemetry::trace::Tracer; @@ -212,7 +217,7 @@ impl AdminApiServer { Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer))?) + .body(bytes_body(buffer.into()))?) 
} #[cfg(not(feature = "metrics"))] Err(Error::bad_request( @@ -229,7 +234,7 @@ impl ApiHandler for AdminApiServer { type Endpoint = Endpoint; type Error = Error; - fn parse_endpoint(&self, req: &Request) -> Result { + fn parse_endpoint(&self, req: &Request) -> Result { if req.uri().path().starts_with("/v0/") { let endpoint_v0 = router_v0::Endpoint::from_request(req)?; Endpoint::from_v0(endpoint_v0) @@ -240,9 +245,9 @@ impl ApiHandler for AdminApiServer { async fn handle( &self, - req: Request, + req: Request, endpoint: Endpoint, - ) -> Result, Error> { + ) -> Result, Error> { let expected_auth_header = match endpoint.authorization_type() { Authorization::None => None, diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index 17f46c30..0bfb87c5 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_util::crdt::*; @@ -17,12 +17,13 @@ use garage_model::permission::*; use garage_model::s3::mpu_table; use garage_model::s3::object_table::*; +use crate::admin::api_server::ResBody; use crate::admin::error::*; use crate::admin::key::ApiBucketKeyPerm; use crate::common_error::CommonError; -use crate::helpers::{json_ok_response, parse_json_body}; +use crate::helpers::*; -pub async fn handle_list_buckets(garage: &Arc) -> Result, Error> { +pub async fn handle_list_buckets(garage: &Arc) -> Result, Error> { let buckets = garage .bucket_table .get_range( @@ -90,7 +91,7 @@ pub async fn handle_get_bucket_info( garage: &Arc, id: Option, global_alias: Option, -) -> Result, Error> { +) -> Result, Error> { let bucket_id = match (id, global_alias) { (Some(id), None) => parse_bucket_id(&id)?, (None, Some(ga)) => garage @@ -111,7 +112,7 @@ pub async fn handle_get_bucket_info( async fn bucket_info_results( garage: &Arc, bucket_id: Uuid, -) -> Result, Error> { +) -> Result, Error> { let bucket = garage .bucket_helper() .get_existing_bucket(bucket_id) @@ -268,8 +269,8 @@ struct GetBucketInfoKey { pub async fn handle_create_bucket( garage: &Arc, - req: Request, -) -> Result, Error> { + req: Request, +) -> Result, Error> { let req = parse_json_body::(req).await?; if let Some(ga) = &req.global_alias { @@ -360,7 +361,7 @@ struct CreateBucketLocalAlias { pub async fn handle_delete_bucket( garage: &Arc, id: String, -) -> Result, Error> { +) -> Result, Error> { let helper = garage.bucket_helper(); let bucket_id = parse_bucket_id(&id)?; @@ -403,14 +404,14 @@ pub async fn handle_delete_bucket( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) 
} pub async fn handle_update_bucket( garage: &Arc, id: String, - req: Request, -) -> Result, Error> { + req: Request, +) -> Result, Error> { let req = parse_json_body::(req).await?; let bucket_id = parse_bucket_id(&id)?; @@ -470,9 +471,9 @@ struct UpdateBucketWebsiteAccess { pub async fn handle_bucket_change_key_perm( garage: &Arc, - req: Request, + req: Request, new_perm_flag: bool, -) -> Result, Error> { +) -> Result, Error> { let req = parse_json_body::(req).await?; let bucket_id = parse_bucket_id(&req.bucket_id)?; @@ -526,7 +527,7 @@ pub async fn handle_global_alias_bucket( garage: &Arc, bucket_id: String, alias: String, -) -> Result, Error> { +) -> Result, Error> { let bucket_id = parse_bucket_id(&bucket_id)?; garage @@ -541,7 +542,7 @@ pub async fn handle_global_unalias_bucket( garage: &Arc, bucket_id: String, alias: String, -) -> Result, Error> { +) -> Result, Error> { let bucket_id = parse_bucket_id(&bucket_id)?; garage @@ -557,7 +558,7 @@ pub async fn handle_local_alias_bucket( bucket_id: String, access_key_id: String, alias: String, -) -> Result, Error> { +) -> Result, Error> { let bucket_id = parse_bucket_id(&bucket_id)?; garage @@ -573,7 +574,7 @@ pub async fn handle_local_unalias_bucket( bucket_id: String, access_key_id: String, alias: String, -) -> Result, Error> { +) -> Result, Error> { let bucket_id = parse_bucket_id(&bucket_id)?; garage diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index c8107b82..1ec8d6de 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -1,7 +1,7 @@ use std::net::SocketAddr; use std::sync::Arc; -use hyper::{Body, Request, Response}; +use hyper::{body::Incoming as IncomingBody, Request, Response}; use serde::{Deserialize, Serialize}; use garage_util::crdt::*; @@ -11,10 +11,11 @@ use garage_rpc::layout; use garage_model::garage::Garage; +use crate::admin::api_server::ResBody; use crate::admin::error::*; use crate::helpers::{json_ok_response, parse_json_body}; -pub async fn handle_get_cluster_status(garage: &Arc) -> Result, Error> { +pub async fn handle_get_cluster_status(garage: &Arc) -> Result, Error> { let res = GetClusterStatusResponse { node: hex::encode(garage.system.id), garage_version: garage_util::version::garage_version(), @@ -39,7 +40,7 @@ pub async fn handle_get_cluster_status(garage: &Arc) -> Result) -> Result, Error> { +pub async fn handle_get_cluster_health(garage: &Arc) -> Result, Error> { use garage_rpc::system::ClusterHealthStatus; let health = garage.system.health(); let health = ClusterHealth { @@ -61,8 +62,8 @@ pub async fn handle_get_cluster_health(garage: &Arc) -> Result, - req: Request, -) -> Result, Error> { + req: Request, +) -> Result, Error> { let req = parse_json_body::>(req).await?; let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node))) @@ -83,7 +84,7 @@ pub async fn handle_connect_cluster_nodes( Ok(json_ok_response(&res)?) } -pub async fn handle_get_cluster_layout(garage: &Arc) -> Result, Error> { +pub async fn handle_get_cluster_layout(garage: &Arc) -> Result, Error> { let res = format_cluster_layout(&garage.system.get_cluster_layout()); Ok(json_ok_response(&res)?) 
@@ -203,8 +204,8 @@ struct KnownNodeResp { pub async fn handle_update_cluster_layout( garage: &Arc, - req: Request, -) -> Result, Error> { + req: Request, +) -> Result, Error> { let updates = parse_json_body::(req).await?; let mut layout = garage.system.get_cluster_layout(); @@ -243,8 +244,8 @@ pub async fn handle_update_cluster_layout( pub async fn handle_apply_cluster_layout( garage: &Arc, - req: Request, -) -> Result, Error> { + req: Request, +) -> Result, Error> { let param = parse_json_body::(req).await?; let layout = garage.system.get_cluster_layout(); @@ -261,8 +262,8 @@ pub async fn handle_apply_cluster_layout( pub async fn handle_revert_cluster_layout( garage: &Arc, - req: Request, -) -> Result, Error> { + req: Request, +) -> Result, Error> { let param = parse_json_body::(req).await?; let layout = garage.system.get_cluster_layout(); diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs index ed1a07bd..98cc7a9e 100644 --- a/src/api/admin/error.rs +++ b/src/api/admin/error.rs @@ -1,13 +1,13 @@ use err_derive::Error; use hyper::header::HeaderValue; -use hyper::{Body, HeaderMap, StatusCode}; +use hyper::{HeaderMap, StatusCode}; pub use garage_model::helper::error::Error as HelperError; use crate::common_error::CommonError; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; use crate::generic_server::ApiError; -use crate::helpers::CustomApiErrorBody; +use crate::helpers::{BytesBody, CustomApiErrorBody}; /// Errors of this crate #[derive(Debug, Error)] @@ -77,14 +77,14 @@ impl ApiError for Error { header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap()); } - fn http_body(&self, garage_region: &str, path: &str) -> Body { + fn http_body(&self, garage_region: &str, path: &str) -> BytesBody { let error = CustomApiErrorBody { code: self.code().to_string(), message: format!("{}", self), path: path.to_string(), region: garage_region.to_string(), }; - Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| { + let error_str = serde_json::to_string_pretty(&error).unwrap_or_else(|_| { r#" { "code": "InternalError", @@ -92,6 +92,7 @@ impl ApiError for Error { } "# .into() - })) + }); + BytesBody::from(bytes::Bytes::from(error_str.into_bytes())) } } diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs index 8d1c6890..3e5d2cab 100644 --- a/src/api/admin/key.rs +++ b/src/api/admin/key.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_table::*; @@ -9,10 +9,11 @@ use garage_table::*; use garage_model::garage::Garage; use garage_model::key_table::*; +use crate::admin::api_server::ResBody; use crate::admin::error::*; -use crate::helpers::{is_default, json_ok_response, parse_json_body}; +use crate::helpers::*; -pub async fn handle_list_keys(garage: &Arc) -> Result, Error> { +pub async fn handle_list_keys(garage: &Arc) -> Result, Error> { let res = garage .key_table .get_range( @@ -45,7 +46,7 @@ pub async fn handle_get_key_info( id: Option, search: Option, show_secret_key: bool, -) -> Result, Error> { +) -> Result, Error> { let key = if let Some(id) = id { garage.key_helper().get_existing_key(&id).await? 
} else if let Some(search) = search { @@ -62,8 +63,8 @@ pub async fn handle_get_key_info( pub async fn handle_create_key( garage: &Arc, - req: Request, -) -> Result, Error> { + req: Request, +) -> Result, Error> { let req = parse_json_body::(req).await?; let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key")); @@ -80,8 +81,8 @@ struct CreateKeyRequest { pub async fn handle_import_key( garage: &Arc, - req: Request, -) -> Result, Error> { + req: Request, +) -> Result, Error> { let req = parse_json_body::(req).await?; let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?; @@ -111,8 +112,8 @@ struct ImportKeyRequest { pub async fn handle_update_key( garage: &Arc, id: String, - req: Request, -) -> Result, Error> { + req: Request, +) -> Result, Error> { let req = parse_json_body::(req).await?; let mut key = garage.key_helper().get_existing_key(&id).await?; @@ -146,7 +147,10 @@ struct UpdateKeyRequest { deny: Option, } -pub async fn handle_delete_key(garage: &Arc, id: String) -> Result, Error> { +pub async fn handle_delete_key( + garage: &Arc, + id: String, +) -> Result, Error> { let mut key = garage.key_helper().get_existing_key(&id).await?; key.state.as_option().unwrap(); @@ -155,14 +159,14 @@ pub async fn handle_delete_key(garage: &Arc, id: String) -> Result, key: Key, show_secret: bool, -) -> Result, Error> { +) -> Result, Error> { let mut relevant_buckets = HashMap::new(); let key_state = key.state.as_option().unwrap(); diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index fa346f48..832f2da3 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -6,15 +6,16 @@ use async_trait::async_trait; use futures::future::Future; +use http_body_util::BodyExt; use hyper::header::HeaderValue; -use hyper::server::conn::AddrStream; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Request, Response, Server}; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{body::Incoming as IncomingBody, Request, Response}; use hyper::{HeaderMap, StatusCode}; +use hyper_util::rt::TokioIo; -use hyperlocal::UnixServerExt; - -use tokio::net::UnixStream; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::{TcpListener, UnixListener}; use opentelemetry::{ global, @@ -28,6 +29,8 @@ use garage_util::forwarded_headers; use garage_util::metrics::{gen_trace_id, RecordDuration}; use garage_util::socket_address::UnixOrTCPSocketAddress; +use crate::helpers::{BoxBody, BytesBody}; + pub(crate) trait ApiEndpoint: Send + Sync + 'static { fn name(&self) -> &'static str; fn add_span_attributes(&self, span: SpanRef<'_>); @@ -36,7 +39,7 @@ pub(crate) trait ApiEndpoint: Send + Sync + 'static { pub trait ApiError: std::error::Error + Send + Sync + 'static { fn http_status_code(&self) -> StatusCode; fn add_http_headers(&self, header_map: &mut HeaderMap); - fn http_body(&self, garage_region: &str, path: &str) -> Body; + fn http_body(&self, garage_region: &str, path: &str) -> BytesBody; } #[async_trait] @@ -47,12 +50,12 @@ pub(crate) trait ApiHandler: Send + Sync + 'static { type Endpoint: ApiEndpoint; type Error: ApiError; - fn parse_endpoint(&self, r: &Request) -> Result; + fn parse_endpoint(&self, r: &Request) -> Result; async fn handle( &self, - req: Request, + req: Request, endpoint: Self::Endpoint, - ) -> Result, Self::Error>; + ) -> Result>, Self::Error>; } pub(crate) struct ApiServer { @@ -101,72 +104,79 @@ impl ApiServer { unix_bind_addr_mode: Option, shutdown_signal: impl Future, ) -> Result<(), GarageError> { - let 
tcp_service = make_service_fn(|conn: &AddrStream| { - let this = self.clone(); - - let client_addr = conn.remote_addr(); - async move { - Ok::<_, GarageError>(service_fn(move |req: Request| { - let this = this.clone(); - - this.handler(req, client_addr.to_string()) - })) - } - }); - - let unix_service = make_service_fn(|_: &UnixStream| { - let this = self.clone(); - - let path = bind_addr.to_string(); - async move { - Ok::<_, GarageError>(service_fn(move |req: Request| { - let this = this.clone(); - - this.handler(req, path.clone()) - })) - } - }); - info!( "{} API server listening on {}", A::API_NAME_DISPLAY, bind_addr ); + tokio::pin!(shutdown_signal); + match bind_addr { UnixOrTCPSocketAddress::TCPSocket(addr) => { - Server::bind(&addr) - .serve(tcp_service) - .with_graceful_shutdown(shutdown_signal) - .await? + let listener = TcpListener::bind(addr).await?; + + loop { + let (stream, client_addr) = tokio::select! { + acc = listener.accept() => acc?, + _ = &mut shutdown_signal => break, + }; + + self.launch_handler(stream, client_addr.to_string()); + } } UnixOrTCPSocketAddress::UnixSocket(ref path) => { if path.exists() { fs::remove_file(path)? } - let bound = Server::bind_unix(path)?; + let listener = UnixListener::bind(path)?; fs::set_permissions( path, Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)), )?; - bound - .serve(unix_service) - .with_graceful_shutdown(shutdown_signal) - .await?; + loop { + let (stream, _) = tokio::select! { + acc = listener.accept() => acc?, + _ = &mut shutdown_signal => break, + }; + + self.launch_handler(stream, path.display().to_string()); + } } }; Ok(()) } + fn launch_handler(self: &Arc, stream: S, client_addr: String) + where + S: AsyncRead + AsyncWrite + Send + Sync + 'static, + { + let this = self.clone(); + let io = TokioIo::new(stream); + + let serve = + move |req: Request| this.clone().handler(req, client_addr.to_string()); + + tokio::task::spawn(async move { + let io = Box::pin(io); + if let Err(e) = http1::Builder::new() + .serve_connection(io, service_fn(serve)) + .await + { + debug!("Error handling HTTP connection: {}", e); + } + }); + } + async fn handler( self: Arc, - req: Request, + req: Request, addr: String, - ) -> Result, GarageError> { + ) -> Result>, GarageError> { let uri = req.uri().clone(); if let Ok(forwarded_for_ip_addr) = @@ -205,7 +215,7 @@ impl ApiServer { Ok(x) } Err(e) => { - let body: Body = e.http_body(&self.region, uri.path()); + let body = e.http_body(&self.region, uri.path()); let mut http_error_builder = Response::builder().status(e.http_status_code()); if let Some(header_map) = http_error_builder.headers_mut() { @@ -219,12 +229,15 @@ impl ApiServer { } else { info!("Response: error {}, {}", e.http_status_code(), e); } - Ok(http_error) + Ok(http_error.map(|body| BoxBody::new(body.map_err(|_| unreachable!())))) } } } - async fn handler_stage2(&self, req: Request) -> Result, A::Error> { + async fn handler_stage2( + &self, + req: Request, + ) -> Result>, A::Error> { let endpoint = self.api_handler.parse_endpoint(&req)?; debug!("Endpoint: {}", endpoint.name()); diff --git a/src/api/helpers.rs b/src/api/helpers.rs index 8efaa231..541b2def 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,4 +1,5 @@ -use hyper::{body::HttpBody, Body, Request, Response}; +use http_body_util::{BodyExt, Full as FullBody}; +use hyper::{body::Incoming as IncomingBody, Request, Response}; use idna::domain_to_unicode; use serde::{Deserialize, Serialize}; @@ -138,18 +139,36 @@ pub fn key_after_prefix(pfx: &str) -> Option { None 
} -pub async fn parse_json_body Deserialize<'de>>(req: Request) -> Result { +// =============== body helpers ================= + +pub type BytesBody = FullBody; +pub type BoxBody = http_body_util::combinators::BoxBody; + +pub fn string_body(s: String) -> BoxBody { + bytes_body(bytes::Bytes::from(s.into_bytes())) +} +pub fn bytes_body(b: bytes::Bytes) -> BoxBody { + BoxBody::new(FullBody::new(b).map_err(|_| unreachable!())) +} +pub fn empty_body() -> BoxBody { + BoxBody::new(http_body_util::Empty::new().map_err(|_| unreachable!())) +} + +pub async fn parse_json_body(req: Request) -> Result +where + T: for<'de> Deserialize<'de>, +{ let body = req.into_body().collect().await?.to_bytes(); let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; Ok(resp) } -pub fn json_ok_response(res: &T) -> Result, Error> { +pub fn json_ok_response(res: &T) -> Result>, Error> { let resp_json = serde_json::to_string_pretty(res).map_err(garage_util::error::Error::from)?; Ok(Response::builder() .status(hyper::StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/json") - .body(Body::from(resp_json))?) + .body(string_body(resp_json))?) } pub fn is_default(v: &T) -> bool { diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 887839dd..7717fd49 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use futures::future::Future; use hyper::header; -use hyper::{Body, Request, Response}; +use hyper::{body::Incoming as IncomingBody, Request, Response}; use opentelemetry::{trace::SpanRef, KeyValue}; @@ -34,6 +34,9 @@ use crate::s3::put::*; use crate::s3::router::Endpoint; use crate::s3::website::*; +pub use crate::signature::streaming::ReqBody; +pub type ResBody = BoxBody; + pub struct S3ApiServer { garage: Arc, } @@ -57,10 +60,10 @@ impl S3ApiServer { async fn handle_request_without_bucket( &self, - _req: Request, + _req: Request, api_key: Key, endpoint: Endpoint, - ) -> Result, Error> { + ) -> Result, Error> { match endpoint { Endpoint::ListBuckets => handle_list_buckets(&self.garage, &api_key).await, endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), @@ -76,7 +79,7 @@ impl ApiHandler for S3ApiServer { type Endpoint = S3ApiEndpoint; type Error = Error; - fn parse_endpoint(&self, req: &Request) -> Result { + fn parse_endpoint(&self, req: &Request) -> Result { let authority = req .headers() .get(header::HOST) @@ -104,9 +107,9 @@ impl ApiHandler for S3ApiServer { async fn handle( &self, - req: Request, + req: Request, endpoint: S3ApiEndpoint, - ) -> Result, Error> { + ) -> Result, Error> { let S3ApiEndpoint { bucket_name, endpoint, @@ -235,8 +238,7 @@ impl ApiHandler for S3ApiServer { } Endpoint::CreateBucket {} => unreachable!(), Endpoint::HeadBucket {} => { - let empty_body: Body = Body::from(vec![]); - let response = Response::builder().body(empty_body).unwrap(); + let response = Response::builder().body(empty_body()).unwrap(); Ok(response) } Endpoint::DeleteBucket {} => { diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index a2437524..fa2f1b6d 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; use std::sync::Arc; -use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use garage_model::bucket_alias_table::*; use garage_model::bucket_table::Bucket; @@ -14,11 +15,13 @@ use garage_util::data::*; use garage_util::time::*; use 
crate::common_error::CommonError; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; -pub fn handle_get_bucket_location(garage: Arc) -> Result, Error> { +pub fn handle_get_bucket_location(garage: Arc) -> Result, Error> { let loc = s3_xml::LocationConstraint { xmlns: (), region: garage.config.s3_api.s3_region.to_string(), @@ -27,10 +30,10 @@ pub fn handle_get_bucket_location(garage: Arc) -> Result, Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } -pub fn handle_get_bucket_versioning() -> Result, Error> { +pub fn handle_get_bucket_versioning() -> Result, Error> { let versioning = s3_xml::VersioningConfiguration { xmlns: (), status: None, @@ -40,10 +43,13 @@ pub fn handle_get_bucket_versioning() -> Result, Error> { Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } -pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result, Error> { +pub async fn handle_list_buckets( + garage: &Garage, + api_key: &Key, +) -> Result, Error> { let key_p = api_key.params().ok_or_internal_error( "Key should not be in deleted state at this point (in handle_list_buckets)", )?; @@ -109,17 +115,17 @@ pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result, + req: Request, content_sha256: Option, api_key: Key, bucket_name: String, -) -> Result, Error> { - let body = req.into_body().collect().await?.to_bytes(); +) -> Result, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -194,7 +200,7 @@ pub async fn handle_create_bucket( Ok(Response::builder() .header("Location", format!("/{}", bucket_name)) - .body(Body::empty()) + .body(empty_body()) .unwrap()) } @@ -203,7 +209,7 @@ pub async fn handle_delete_bucket( bucket_id: Uuid, bucket_name: String, api_key: Key, -) -> Result, Error> { +) -> Result, Error> { let key_params = api_key .params() .ok_or_internal_error("Key should not be deleted at this point")?; @@ -282,7 +288,7 @@ pub async fn handle_delete_bucket( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) 
} fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option> { diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 68b4f0c9..ba9bfc88 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -6,7 +6,7 @@ use futures::{stream, stream::Stream, StreamExt}; use md5::{Digest as Md5Digest, Md5}; use bytes::Bytes; -use hyper::{Body, Request, Response}; +use hyper::{Request, Response}; use serde::Serialize; use garage_rpc::netapp::bytes_buf::BytesBuf; @@ -22,7 +22,8 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::parse_bucket_key; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::multipart; use crate::s3::put::get_headers; @@ -31,10 +32,10 @@ use crate::s3::xml::{self as s3_xml, xmlns_tag}; pub async fn handle_copy( garage: Arc, api_key: &Key, - req: &Request, + req: &Request, dest_bucket_id: Uuid, dest_key: &str, -) -> Result, Error> { +) -> Result, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; let source_object = get_copy_source(&garage, api_key, req).await?; @@ -176,18 +177,18 @@ pub async fn handle_copy( "x-amz-copy-source-version-id", hex::encode(source_version.uuid), ) - .body(Body::from(xml))?) + .body(string_body(xml))?) } pub async fn handle_upload_part_copy( garage: Arc, api_key: &Key, - req: &Request, + req: &Request, dest_bucket_id: Uuid, dest_key: &str, part_number: u64, upload_id: &str, -) -> Result, Error> { +) -> Result, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; let dest_upload_id = multipart::decode_upload_id(upload_id)?; @@ -432,13 +433,13 @@ pub async fn handle_upload_part_copy( "x-amz-copy-source-version-id", hex::encode(source_object_version.uuid), ) - .body(Body::from(resp_xml))?) + .body(string_body(resp_xml))?) 
} async fn get_copy_source( garage: &Garage, api_key: &Key, - req: &Request, + req: &Request, ) -> Result { let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?; let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?; @@ -501,7 +502,7 @@ struct CopyPreconditionHeaders { } impl CopyPreconditionHeaders { - fn parse(req: &Request) -> Result { + fn parse(req: &Request) -> Result { Ok(Self { copy_source_if_match: req .headers() diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index d2bcf125..4b8754a9 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -5,10 +5,17 @@ use http::header::{ ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, }; -use hyper::{body::HttpBody, header::HeaderName, Body, Method, Request, Response, StatusCode}; +use hyper::{ + body::Body, body::Incoming as IncomingBody, header::HeaderName, Method, Request, Response, + StatusCode, +}; + +use http_body_util::BodyExt; use serde::{Deserialize, Serialize}; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; @@ -17,7 +24,7 @@ use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule}; use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_cors(bucket: &Bucket) -> Result, Error> { +pub async fn handle_get_cors(bucket: &Bucket) -> Result, Error> { let param = bucket .params() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -34,18 +41,18 @@ pub async fn handle_get_cors(bucket: &Bucket) -> Result, Error> { Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } else { Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } } pub async fn handle_delete_cors( garage: Arc, mut bucket: Bucket, -) -> Result, Error> { +) -> Result, Error> { let param = bucket .params_mut() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -55,16 +62,16 @@ pub async fn handle_delete_cors( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_put_cors( garage: Arc, mut bucket: Bucket, - req: Request, + req: Request, content_sha256: Option, -) -> Result, Error> { - let body = req.into_body().collect().await?.to_bytes(); +) -> Result, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -84,14 +91,14 @@ pub async fn handle_put_cors( Ok(Response::builder() .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_options_s3api( garage: Arc, - req: &Request, + req: &Request, bucket_name: Option, -) -> Result, Error> { +) -> Result, Error> { // FIXME: CORS rules of buckets with local aliases are // not taken into account. @@ -121,7 +128,7 @@ pub async fn handle_options_s3api( .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") .header(ACCESS_CONTROL_ALLOW_METHODS, "*") .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) 
} } else { // If there is no bucket name in the request, @@ -131,14 +138,14 @@ pub async fn handle_options_s3api( .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") .header(ACCESS_CONTROL_ALLOW_METHODS, "GET") .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } } pub fn handle_options_for_bucket( - req: &Request, + req: &Request, bucket: &Bucket, -) -> Result, Error> { +) -> Result, Error> { let origin = req .headers() .get("Origin") @@ -161,7 +168,7 @@ pub fn handle_options_for_bucket( if let Some(rule) = matching_rule { let mut resp = Response::builder() .status(StatusCode::OK) - .body(Body::empty())?; + .body(empty_body())?; add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?; return Ok(resp); } @@ -172,7 +179,7 @@ pub fn handle_options_for_bucket( pub fn find_matching_cors_rule<'a>( bucket: &'a Bucket, - req: &Request, + req: &Request, ) -> Result, Error> { if let Some(cors_config) = bucket.params().unwrap().cors_config.get() { if let Some(origin) = req.headers().get("Origin") { @@ -209,7 +216,7 @@ where } pub fn add_cors_headers( - resp: &mut Response, + resp: &mut Response, rule: &GarageCorsRule, ) -> Result<(), http::header::InvalidHeaderValue> { let h = resp.headers_mut(); diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index 685ce004..3fb39147 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -1,12 +1,15 @@ use std::sync::Arc; -use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use garage_util::data::*; use garage_model::garage::Garage; use garage_model::s3::object_table::*; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::put::next_timestamp; use crate::s3::xml as s3_xml; @@ -59,11 +62,11 @@ pub async fn handle_delete( garage: Arc, bucket_id: Uuid, key: &str, -) -> Result, Error> { +) -> Result, Error> { match handle_delete_internal(&garage, bucket_id, key).await { Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::from(vec![])) + .body(empty_body()) .unwrap()), Err(e) => Err(e), } @@ -72,10 +75,10 @@ pub async fn handle_delete( pub async fn handle_delete_objects( garage: Arc, bucket_id: Uuid, - req: Request, + req: Request, content_sha256: Option, -) -> Result, Error> { - let body = req.into_body().collect().await?.to_bytes(); +) -> Result, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -118,7 +121,7 @@ pub async fn handle_delete_objects( Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) 
} struct DeleteRequest { diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index c50cff9f..8afe4954 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -2,13 +2,14 @@ use std::convert::TryInto; use err_derive::Error; use hyper::header::HeaderValue; -use hyper::{Body, HeaderMap, StatusCode}; +use hyper::{HeaderMap, StatusCode}; use garage_model::helper::error::Error as HelperError; use crate::common_error::CommonError; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; use crate::generic_server::ApiError; +use crate::helpers::*; use crate::s3::xml as s3_xml; use crate::signature::error::Error as SignatureError; @@ -189,22 +190,23 @@ impl ApiError for Error { } } - fn http_body(&self, garage_region: &str, path: &str) -> Body { + fn http_body(&self, garage_region: &str, path: &str) -> BytesBody { let error = s3_xml::Error { code: s3_xml::Value(self.aws_code().to_string()), message: s3_xml::Value(format!("{}", self)), resource: Some(s3_xml::Value(path.to_string())), region: Some(s3_xml::Value(garage_region.to_string())), }; - Body::from(s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| { + let error_str = s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| { r#" - InternalError - XML encoding of error failed + InternalError + XML encoding of error failed - "# + "# .into() - })) + }); + BytesBody::from(bytes::Bytes::from(error_str.into_bytes())) } } diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 5e682726..71f0b158 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -8,7 +8,7 @@ use http::header::{ ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, RANGE, }; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Body, Request, Response, StatusCode}; use tokio::sync::mpsc; use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag}; @@ -20,6 +20,8 @@ use garage_model::garage::Garage; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; +use crate::helpers::*; +use crate::s3::api_server::ResBody; use crate::s3::error::*; const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; @@ -52,8 +54,8 @@ fn object_headers( fn try_answer_cached( version: &ObjectVersion, version_meta: &ObjectVersionMeta, - req: &Request, -) -> Option> { + req: &Request, +) -> Option> { // It is possible, and is even usually the case, [that both If-None-Match and // If-Modified-Since] are present in a request. In this situation If-None-Match takes // precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The rational @@ -80,7 +82,7 @@ fn try_answer_cached( Some( Response::builder() .status(StatusCode::NOT_MODIFIED) - .body(Body::empty()) + .body(empty_body()) .unwrap(), ) } else { @@ -91,11 +93,11 @@ fn try_answer_cached( /// Handle HEAD request pub async fn handle_head( garage: Arc, - req: &Request, + req: &Request, bucket_id: Uuid, key: &str, part_number: Option, -) -> Result, Error> { +) -> Result, Error> { let object = garage .object_table .get(&bucket_id, &key.to_string()) @@ -138,7 +140,7 @@ pub async fn handle_head( ) .header(X_AMZ_MP_PARTS_COUNT, "1") .status(StatusCode::PARTIAL_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } ObjectVersionData::FirstBlock(_, _) => { let version = garage @@ -163,7 +165,7 @@ pub async fn handle_head( ) .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?)) .status(StatusCode::PARTIAL_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) 
} _ => unreachable!(), } @@ -171,18 +173,18 @@ pub async fn handle_head( Ok(object_headers(object_version, version_meta) .header(CONTENT_LENGTH, format!("{}", version_meta.size)) .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } } /// Handle GET request pub async fn handle_get( garage: Arc, - req: &Request, + req: &Request, bucket_id: Uuid, key: &str, part_number: Option, -) -> Result, Error> { +) -> Result, Error> { let object = garage .object_table .get(&bucket_id, &key.to_string()) @@ -240,8 +242,7 @@ pub async fn handle_get( match &last_v_data { ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_, bytes) => { - let body: Body = Body::from(bytes.to_vec()); - Ok(resp_builder.body(body)?) + Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { let (tx, rx) = mpsc::channel(2); @@ -293,10 +294,14 @@ pub async fn handle_get( } }); - let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten(); - - let body = hyper::body::Body::wrap_stream(body_stream); - Ok(resp_builder.body(body)?) + let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) + .flatten() + .map(|x| { + x.map(hyper::body::Frame::data) + .map_err(|e| Error::from(garage_util::error::Error::from(e))) + }); + let body = http_body_util::StreamBody::new(body_stream); + Ok(resp_builder.body(ResBody::new(body))?) } } } @@ -308,7 +313,7 @@ async fn handle_get_range( version_meta: &ObjectVersionMeta, begin: u64, end: u64, -) -> Result, Error> { +) -> Result, Error> { let resp_builder = object_headers(version, version_meta) .header(CONTENT_LENGTH, format!("{}", end - begin)) .header( @@ -321,7 +326,7 @@ async fn handle_get_range( ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_meta, bytes) => { if end as usize <= bytes.len() { - let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec()); + let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into()); Ok(resp_builder.body(body)?) } else { Err(Error::internal_error( @@ -348,7 +353,7 @@ async fn handle_get_part( version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, part_number: u64, -) -> Result, Error> { +) -> Result, Error> { let resp_builder = object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT); @@ -364,7 +369,7 @@ async fn handle_get_part( format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()), ) .header(X_AMZ_MP_PARTS_COUNT, "1") - .body(Body::from(bytes.to_vec()))?) + .body(bytes_body(bytes.to_vec().into()))?) 
} ObjectVersionData::FirstBlock(_, _) => { let version = garage @@ -392,7 +397,7 @@ async fn handle_get_part( } fn parse_range_header( - req: &Request, + req: &Request, total_size: u64, ) -> Result, Error> { let range = match req.headers().get(RANGE) { @@ -434,7 +439,7 @@ fn body_from_blocks_range( all_blocks: &[(VersionBlockKey, VersionBlock)], begin: u64, end: u64, -) -> Body { +) -> ResBody { // We will store here the list of blocks that have an intersection with the requested // range, as well as their "true offset", which is their actual offset in the complete // file (whereas block.offset designates the offset of the block WITHIN THE PART @@ -456,59 +461,74 @@ fn body_from_blocks_range( } let order_stream = OrderTag::stream(); - let body_stream = futures::stream::iter(blocks) - .enumerate() - .map(move |(i, (block, block_offset))| { - let garage = garage.clone(); - async move { - garage - .block_manager - .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) - .await - .unwrap_or_else(|e| error_stream(i, e)) - .scan(block_offset, move |chunk_offset, chunk| { - let r = match chunk { - Ok(chunk_bytes) => { - let chunk_len = chunk_bytes.len() as u64; - let r = if *chunk_offset >= end { - // The current chunk is after the part we want to read. - // Returning None here will stop the scan, the rest of the - // stream will be ignored - None - } else if *chunk_offset + chunk_len <= begin { - // The current chunk is before the part we want to read. - // We return a None that will be removed by the filter_map - // below. - Some(None) - } else { - // The chunk has an intersection with the requested range - let start_in_chunk = if *chunk_offset > begin { - 0 - } else { - begin - *chunk_offset - }; - let end_in_chunk = if *chunk_offset + chunk_len < end { - chunk_len + let mut body_stream = + futures::stream::iter(blocks) + .enumerate() + .map(move |(i, (block, block_offset))| { + let garage = garage.clone(); + async move { + garage + .block_manager + .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) + .await + .unwrap_or_else(|e| error_stream(i, e)) + .scan(block_offset, move |chunk_offset, chunk| { + let r = match chunk { + Ok(chunk_bytes) => { + let chunk_len = chunk_bytes.len() as u64; + let r = if *chunk_offset >= end { + // The current chunk is after the part we want to read. + // Returning None here will stop the scan, the rest of the + // stream will be ignored + None + } else if *chunk_offset + chunk_len <= begin { + // The current chunk is before the part we want to read. + // We return a None that will be removed by the filter_map + // below. 
+ Some(None) } else { - end - *chunk_offset + // The chunk has an intersection with the requested range + let start_in_chunk = if *chunk_offset > begin { + 0 + } else { + begin - *chunk_offset + }; + let end_in_chunk = if *chunk_offset + chunk_len < end { + chunk_len + } else { + end - *chunk_offset + }; + Some(Some(Ok(chunk_bytes.slice( + start_in_chunk as usize..end_in_chunk as usize, + )))) }; - Some(Some(Ok(chunk_bytes - .slice(start_in_chunk as usize..end_in_chunk as usize)))) - }; - *chunk_offset += chunk_bytes.len() as u64; - r - } - Err(e) => Some(Some(Err(e))), - }; - futures::future::ready(r) - }) - .filter_map(futures::future::ready) - } - }) - .buffered(2) - .flatten(); + *chunk_offset += chunk_bytes.len() as u64; + r + } + Err(e) => Some(Some(Err(e))), + }; + futures::future::ready(r) + }) + .filter_map(futures::future::ready) + } + }); - hyper::body::Body::wrap_stream(body_stream) + let (tx, rx) = mpsc::channel(2); + tokio::spawn(async move { + while let Some(item) = body_stream.next().await { + if tx.send(item.await).await.is_err() { + break; // connection closed by client + } + } + }); + + let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) + .flatten() + .map(|x| { + x.map(hyper::body::Frame::data) + .map_err(|e| Error::from(garage_util::error::Error::from(e))) + }); + ResBody::new(http_body_util::StreamBody::new(body_stream)) } fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream { diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index ae8fbc37..35757e8c 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -1,10 +1,13 @@ use quick_xml::de::from_reader; use std::sync::Arc; -use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; @@ -16,7 +19,7 @@ use garage_model::bucket_table::{ use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result, Error> { +pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result, Error> { let param = bucket .params() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -27,18 +30,18 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result, Err Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } else { Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } } pub async fn handle_delete_lifecycle( garage: Arc, mut bucket: Bucket, -) -> Result, Error> { +) -> Result, Error> { let param = bucket .params_mut() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -48,16 +51,16 @@ pub async fn handle_delete_lifecycle( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) 
} pub async fn handle_put_lifecycle( garage: Arc, mut bucket: Bucket, - req: Request, + req: Request, content_sha256: Option, -) -> Result, Error> { - let body = req.into_body().collect().await?.to_bytes(); +) -> Result, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -77,7 +80,7 @@ pub async fn handle_put_lifecycle( Ok(Response::builder() .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } // ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ---- diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 1b9e8cd5..b832a4f4 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -3,7 +3,7 @@ use std::iter::{Iterator, Peekable}; use std::sync::Arc; use base64::prelude::*; -use hyper::{Body, Response}; +use hyper::Response; use garage_util::data::*; use garage_util::error::Error as GarageError; @@ -16,7 +16,8 @@ use garage_model::s3::object_table::*; use garage_table::EnumerationOrder; use crate::encoding::*; -use crate::helpers::key_after_prefix; +use crate::helpers::*; +use crate::s3::api_server::ResBody; use crate::s3::error::*; use crate::s3::multipart as s3_multipart; use crate::s3::xml as s3_xml; @@ -63,7 +64,7 @@ pub struct ListPartsQuery { pub async fn handle_list( garage: Arc, query: &ListObjectsQuery, -) -> Result, Error> { +) -> Result, Error> { let io = |bucket, key, count| { let t = &garage.object_table; async move { @@ -162,13 +163,13 @@ pub async fn handle_list( let xml = s3_xml::to_xml_with_header(&result)?; Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } pub async fn handle_list_multipart_upload( garage: Arc, query: &ListMultipartUploadsQuery, -) -> Result, Error> { +) -> Result, Error> { let io = |bucket, key, count| { let t = &garage.object_table; async move { @@ -264,13 +265,13 @@ pub async fn handle_list_multipart_upload( Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } pub async fn handle_list_parts( garage: Arc, query: &ListPartsQuery, -) -> Result, Error> { +) -> Result, Error> { debug!("ListParts {:?}", query); let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?; @@ -319,7 +320,7 @@ pub async fn handle_list_parts( Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) 
} /* diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 96c4d044..4aa27eaf 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -2,8 +2,8 @@ use std::collections::HashMap; use std::sync::Arc; use futures::{prelude::*, TryStreamExt}; -use hyper::body::Body; -use hyper::{body::HttpBody, Request, Response}; +use http_body_util::BodyStream; +use hyper::{Request, Response}; use md5::{Digest as Md5Digest, Md5}; use garage_table::*; @@ -17,6 +17,8 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::put::*; use crate::s3::xml as s3_xml; @@ -26,11 +28,11 @@ use crate::signature::verify_signed_content; pub async fn handle_create_multipart_upload( garage: Arc, - req: &Request, + req: &Request, bucket_name: &str, bucket_id: Uuid, key: &String, -) -> Result, Error> { +) -> Result, Error> { let existing_object = garage.object_table.get(&bucket_id, &key).await?; let upload_id = gen_uuid(); @@ -65,18 +67,18 @@ pub async fn handle_create_multipart_upload( }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(Body::from(xml.into_bytes()))) + Ok(Response::new(string_body(xml))) } pub async fn handle_put_part( garage: Arc, - req: Request, + req: Request, bucket_id: Uuid, key: &str, part_number: u64, upload_id: &str, content_sha256: Option, -) -> Result, Error> { +) -> Result, Error> { let upload_id = decode_upload_id(upload_id)?; let content_md5 = match req.headers().get("content-md5") { @@ -87,8 +89,10 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let body = TryStreamExt::map_err(req.into_body(), Error::from); - let mut chunker = StreamChunker::new(body, garage.config.block_size); + let body_stream = BodyStream::new(req.into_body()) + .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap + .map_err(Error::from); + let mut chunker = StreamChunker::new(body_stream, garage.config.block_size); let ((_, _, mut mpu), first_block) = futures::try_join!( get_upload(&garage, &bucket_id, &key, &upload_id), @@ -172,7 +176,7 @@ pub async fn handle_put_part( let response = Response::builder() .header("ETag", format!("\"{}\"", data_md5sum_hex)) - .body(Body::empty()) + .body(empty_body()) .unwrap(); Ok(response) } @@ -210,14 +214,16 @@ impl Drop for InterruptedCleanup { pub async fn handle_complete_multipart_upload( garage: Arc, - req: Request, + req: Request, bucket_name: &str, bucket: &Bucket, key: &str, upload_id: &str, content_sha256: Option, -) -> Result, Error> { - let body = HttpBody::collect(req.into_body()).await?.to_bytes(); +) -> Result, Error> { + let body = http_body_util::BodyExt::collect(req.into_body()) + .await? 
+ .to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -365,7 +371,7 @@ pub async fn handle_complete_multipart_upload( }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(Body::from(xml.into_bytes()))) + Ok(Response::new(string_body(xml))) } pub async fn handle_abort_multipart_upload( @@ -373,7 +379,7 @@ pub async fn handle_abort_multipart_upload( bucket_id: Uuid, key: &str, upload_id: &str, -) -> Result, Error> { +) -> Result, Error> { let upload_id = decode_upload_id(upload_id)?; let (_, mut object_version, _) = @@ -383,7 +389,7 @@ pub async fn handle_abort_multipart_upload( let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]); garage.object_table.insert(&final_object).await?; - Ok(Response::new(Body::from(vec![]))) + Ok(Response::new(empty_body())) } // ======== helpers ============ diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index f9eccb7f..e9732dc4 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::convert::TryInto; +use std::convert::{Infallible, TryInto}; use std::ops::RangeInclusive; use std::sync::Arc; use std::task::{Context, Poll}; @@ -7,14 +7,17 @@ use std::task::{Context, Poll}; use base64::prelude::*; use bytes::Bytes; use chrono::{DateTime, Duration, Utc}; -use futures::{Stream, StreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; +use http_body_util::BodyStream; use hyper::header::{self, HeaderMap, HeaderName, HeaderValue}; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use multer::{Constraints, Multipart, SizeLimit}; use serde::Deserialize; use garage_model::garage::Garage; +use crate::helpers::*; +use crate::s3::api_server::ResBody; use crate::s3::cors::*; use crate::s3::error::*; use crate::s3::put::{get_headers, save_stream}; @@ -23,9 +26,9 @@ use crate::signature::payload::{parse_date, verify_v4}; pub async fn handle_post_object( garage: Arc, - req: Request, + req: Request, bucket_name: String, -) -> Result, Error> { +) -> Result, Error> { let boundary = req .headers() .get(header::CONTENT_TYPE) @@ -42,7 +45,10 @@ pub async fn handle_post_object( ); let (head, body) = req.into_parts(); - let mut multipart = Multipart::with_constraints(body, boundary, constraints); + let body_stream = BodyStream::new(body) + .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap + .map_err(Error::from); + let mut multipart = Multipart::with_constraints(body_stream, boundary, constraints); let mut params = HeaderMap::new(); let field = loop { @@ -259,7 +265,7 @@ pub async fn handle_post_object( .status(StatusCode::SEE_OTHER) .header(header::LOCATION, target.clone()) .header(header::ETAG, etag) - .body(target.into())? + .body(string_body(target))? } else { let path = head .uri @@ -290,7 +296,7 @@ pub async fn handle_post_object( .header(header::LOCATION, location.clone()) .header(header::ETAG, etag.clone()); match action { - "200" => builder.status(StatusCode::OK).body(Body::empty())?, + "200" => builder.status(StatusCode::OK).body(empty_body())?, "201" => { let xml = s3_xml::PostObject { xmlns: (), @@ -302,14 +308,16 @@ pub async fn handle_post_object( let body = s3_xml::to_xml_with_header(&xml)?; builder .status(StatusCode::CREATED) - .body(Body::from(body.into_bytes()))? + .body(string_body(body))? 
} - _ => builder.status(StatusCode::NO_CONTENT).body(Body::empty())?, + _ => builder.status(StatusCode::NO_CONTENT).body(empty_body())?, } }; - let matching_cors_rule = - find_matching_cors_rule(&bucket, &Request::from_parts(head, Body::empty()))?; + let matching_cors_rule = find_matching_cors_rule( + &bucket, + &Request::from_parts(head, empty_body::()), + )?; if let Some(rule) = matching_cors_rule { add_cors_headers(&mut resp, rule) .ok_or_internal_error("Invalid bucket CORS configuration")?; diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 606facc4..3d43eee8 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -4,7 +4,8 @@ use std::sync::Arc; use base64::prelude::*; use futures::prelude::*; use futures::try_join; -use hyper::body::{Body, Bytes}; +use http_body_util::BodyStream; +use hyper::body::Bytes; use hyper::header::{HeaderMap, HeaderValue}; use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; @@ -30,15 +31,17 @@ use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; pub async fn handle_put( garage: Arc, - req: Request, + req: Request, bucket: &Bucket, key: &String, content_sha256: Option, -) -> Result, Error> { +) -> Result, Error> { // Retrieve interesting headers from request let headers = get_headers(req.headers())?; debug!("Object headers: {:?}", headers); @@ -48,13 +51,14 @@ pub async fn handle_put( None => None, }; - let (_head, body) = req.into_parts(); - let body = body.map_err(Error::from); + let body_stream = BodyStream::new(req.into_body()) + .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap + .map_err(Error::from); save_stream( garage, headers, - body, + body_stream, bucket, key, content_md5, @@ -434,11 +438,11 @@ impl> + Unpin> StreamChunker { } } -pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response { +pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response { Response::builder() .header("x-amz-version-id", hex::encode(version_uuid)) .header("ETag", format!("\"{}\"", md5sum_hex)) - .body(Body::from(vec![])) + .body(empty_body()) .unwrap() } diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index f754ff1b..1c1dbf20 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -1,9 +1,12 @@ use quick_xml::de::from_reader; use std::sync::Arc; -use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; @@ -12,7 +15,7 @@ use garage_model::bucket_table::*; use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_website(bucket: &Bucket) -> Result, Error> { +pub async fn handle_get_website(bucket: &Bucket) -> Result, Error> { let param = bucket .params() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -33,18 +36,18 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result, Error Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) 
} else { Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } } pub async fn handle_delete_website( garage: Arc, mut bucket: Bucket, -) -> Result, Error> { +) -> Result, Error> { let param = bucket .params_mut() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -54,16 +57,16 @@ pub async fn handle_delete_website( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_put_website( garage: Arc, mut bucket: Bucket, - req: Request, + req: Request, content_sha256: Option, -) -> Result, Error> { - let body = req.into_body().collect().await?.to_bytes(); +) -> Result, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -83,7 +86,7 @@ pub async fn handle_put_website( Ok(Response::builder() .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs index b50fb3bb..652c606e 100644 --- a/src/api/signature/payload.rs +++ b/src/api/signature/payload.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use hmac::Mac; -use hyper::{Body, Method, Request}; +use hyper::{body::Incoming as IncomingBody, Method, Request}; use sha2::{Digest, Sha256}; use garage_table::*; @@ -20,7 +20,7 @@ use crate::signature::error::*; pub async fn check_payload_signature( garage: &Garage, service: &'static str, - request: &Request, + request: &Request, ) -> Result<(Option, Option), Error> { let mut headers = HashMap::new(); for (key, val) in request.headers() { diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index c8358c4f..8fb0c4ef 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -5,22 +5,26 @@ use futures::prelude::*; use futures::task; use garage_model::key_table::Key; use hmac::Mac; -use hyper::body::Bytes; -use hyper::{Body, Request}; +use http_body_util::{BodyStream, StreamBody}; +use hyper::body::{Bytes, Incoming as IncomingBody}; +use hyper::Request; use garage_util::data::Hash; use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME}; +use crate::helpers::*; use crate::signature::error::*; +pub type ReqBody = BoxBody; + pub fn parse_streaming_body( api_key: &Key, - req: Request, + req: Request, content_sha256: &mut Option, region: &str, service: &str, -) -> Result, Error> { +) -> Result, Error> { match req.headers().get("x-amz-content-sha256") { Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { let signature = content_sha256 @@ -47,19 +51,17 @@ pub fn parse_streaming_body( .ok_or_internal_error("Unable to build signing HMAC")?; Ok(req.map(move |body| { - Body::wrap_stream( - SignedPayloadStream::new( - body.map_err(Error::from), - signing_hmac, - date, - &scope, - signature, - ) - .map_err(Error::from), - ) + let body_stream = BodyStream::new(body) + .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap + .map_err(Error::from); + let signed_payload_stream = + SignedPayloadStream::new(body_stream, signing_hmac, date, &scope, signature) + .map(|x| x.map(hyper::body::Frame::data)) + .map_err(Error::from); + ReqBody::new(StreamBody::new(signed_payload_stream)) })) } - _ => Ok(req), + _ => Ok(req.map(|body| 
ReqBody::new(http_body_util::BodyExt::map_err(body, Error::from)))), } } -- cgit v1.2.3 From a22bd319202f05bce4ad13072238c7ba81d518fb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Feb 2024 19:27:12 +0100 Subject: [dep-upgrade-202402] migration to http/hyper 1.0 for k2v api --- src/api/admin/bucket.rs | 6 +++--- src/api/admin/cluster.rs | 8 ++++---- src/api/admin/error.rs | 12 ------------ src/api/admin/key.rs | 6 +++--- src/api/common_error.rs | 23 ++++++++++++++++++++++- src/api/helpers.rs | 24 +++++++++++++++++++----- src/api/k2v/api_server.rs | 16 ++++++++++------ src/api/k2v/batch.rs | 31 ++++++++++++++++--------------- src/api/k2v/error.rs | 32 ++++++-------------------------- src/api/k2v/index.rs | 7 ++++--- src/api/k2v/item.rs | 46 +++++++++++++++++++++++++--------------------- src/api/s3/api_server.rs | 3 ++- src/api/s3/cors.rs | 19 +++++++++++-------- src/api/s3/error.rs | 28 +++------------------------- src/api/signature/error.rs | 4 ---- 15 files changed, 128 insertions(+), 137 deletions(-) (limited to 'src/api') diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index 0bfb87c5..1b22dd03 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -271,7 +271,7 @@ pub async fn handle_create_bucket( garage: &Arc, req: Request, ) -> Result, Error> { - let req = parse_json_body::(req).await?; + let req = parse_json_body::(req).await?; if let Some(ga) = &req.global_alias { if !is_valid_bucket_name(ga) { @@ -412,7 +412,7 @@ pub async fn handle_update_bucket( id: String, req: Request, ) -> Result, Error> { - let req = parse_json_body::(req).await?; + let req = parse_json_body::(req).await?; let bucket_id = parse_bucket_id(&id)?; let mut bucket = garage @@ -474,7 +474,7 @@ pub async fn handle_bucket_change_key_perm( req: Request, new_perm_flag: bool, ) -> Result, Error> { - let req = parse_json_body::(req).await?; + let req = parse_json_body::(req).await?; let bucket_id = parse_bucket_id(&req.bucket_id)?; diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 1ec8d6de..3876c608 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -64,7 +64,7 @@ pub async fn handle_connect_cluster_nodes( garage: &Arc, req: Request, ) -> Result, Error> { - let req = parse_json_body::>(req).await?; + let req = parse_json_body::, _, Error>(req).await?; let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node))) .await @@ -206,7 +206,7 @@ pub async fn handle_update_cluster_layout( garage: &Arc, req: Request, ) -> Result, Error> { - let updates = parse_json_body::(req).await?; + let updates = parse_json_body::(req).await?; let mut layout = garage.system.get_cluster_layout(); @@ -246,7 +246,7 @@ pub async fn handle_apply_cluster_layout( garage: &Arc, req: Request, ) -> Result, Error> { - let param = parse_json_body::(req).await?; + let param = parse_json_body::(req).await?; let layout = garage.system.get_cluster_layout(); let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; @@ -264,7 +264,7 @@ pub async fn handle_revert_cluster_layout( garage: &Arc, req: Request, ) -> Result, Error> { - let param = parse_json_body::(req).await?; + let param = parse_json_body::(req).await?; let layout = garage.system.get_cluster_layout(); let layout = layout.revert_staged_changes(Some(param.version))?; diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs index 98cc7a9e..011c903f 100644 --- a/src/api/admin/error.rs +++ b/src/api/admin/error.rs @@ -40,18 +40,6 @@ where impl CommonErrorDerivative for Error {} 
-impl From for Error { - fn from(err: HelperError) -> Self { - match err { - HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)), - HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)), - HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)), - HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)), - HelperError::NoSuchAccessKey(n) => Self::NoSuchAccessKey(n), - } - } -} - impl Error { fn code(&self) -> &'static str { match self { diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs index 3e5d2cab..1efaca16 100644 --- a/src/api/admin/key.rs +++ b/src/api/admin/key.rs @@ -65,7 +65,7 @@ pub async fn handle_create_key( garage: &Arc, req: Request, ) -> Result, Error> { - let req = parse_json_body::(req).await?; + let req = parse_json_body::(req).await?; let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key")); garage.key_table.insert(&key).await?; @@ -83,7 +83,7 @@ pub async fn handle_import_key( garage: &Arc, req: Request, ) -> Result, Error> { - let req = parse_json_body::(req).await?; + let req = parse_json_body::(req).await?; let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?; if prev_key.is_some() { @@ -114,7 +114,7 @@ pub async fn handle_update_key( id: String, req: Request, ) -> Result, Error> { - let req = parse_json_body::(req).await?; + let req = parse_json_body::(req).await?; let mut key = garage.key_helper().get_existing_key(&id).await?; diff --git a/src/api/common_error.rs b/src/api/common_error.rs index 20f9f266..4381f227 100644 --- a/src/api/common_error.rs +++ b/src/api/common_error.rs @@ -3,6 +3,8 @@ use hyper::StatusCode; use garage_util::error::Error as GarageError; +use garage_model::helper::error::Error as HelperError; + /// Errors of this crate #[derive(Debug, Error)] pub enum CommonError { @@ -28,6 +30,10 @@ pub enum CommonError { #[error(display = "Bad request: {}", _0)] BadRequest(String), + /// The client sent a header with invalid value + #[error(display = "Invalid header value: {}", _0)] + InvalidHeader(#[error(source)] hyper::header::ToStrError), + // ---- SPECIFIC ERROR CONDITIONS ---- // These have to be error codes referenced in the S3 spec here: // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList @@ -64,7 +70,9 @@ impl CommonError { CommonError::Forbidden(_) => StatusCode::FORBIDDEN, CommonError::NoSuchBucket(_) => StatusCode::NOT_FOUND, CommonError::BucketNotEmpty | CommonError::BucketAlreadyExists => StatusCode::CONFLICT, - CommonError::InvalidBucketName(_) => StatusCode::BAD_REQUEST, + CommonError::InvalidBucketName(_) | CommonError::InvalidHeader(_) => { + StatusCode::BAD_REQUEST + } } } @@ -84,6 +92,7 @@ impl CommonError { CommonError::BucketAlreadyExists => "BucketAlreadyExists", CommonError::BucketNotEmpty => "BucketNotEmpty", CommonError::InvalidBucketName(_) => "InvalidBucketName", + CommonError::InvalidHeader(_) => "InvalidHeaderValue", } } @@ -92,6 +101,18 @@ impl CommonError { } } +impl From for CommonError { + fn from(err: HelperError) -> Self { + match err { + HelperError::Internal(i) => Self::InternalError(i), + HelperError::BadRequest(b) => Self::BadRequest(b), + HelperError::InvalidBucketName(n) => Self::InvalidBucketName(n), + HelperError::NoSuchBucket(n) => Self::NoSuchBucket(n), + e => Self::bad_request(format!("{}", e)), + } + } +} + pub trait CommonErrorDerivative: From { fn internal_error(msg: M) -> Self { Self::from(CommonError::InternalError(GarageError::Message( diff --git 
a/src/api/helpers.rs b/src/api/helpers.rs index 541b2def..57aa1ea1 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,8 +1,10 @@ use http_body_util::{BodyExt, Full as FullBody}; -use hyper::{body::Incoming as IncomingBody, Request, Response}; +use hyper::{body::Body, Request, Response}; use idna::domain_to_unicode; use serde::{Deserialize, Serialize}; +use garage_util::error::Error as GarageError; + use crate::common_error::{CommonError as Error, *}; /// What kind of authorization is required to perform a given action @@ -141,6 +143,7 @@ pub fn key_after_prefix(pfx: &str) -> Option { // =============== body helpers ================= +pub type EmptyBody = http_body_util::Empty; pub type BytesBody = FullBody; pub type BoxBody = http_body_util::combinators::BoxBody; @@ -153,22 +156,33 @@ pub fn bytes_body(b: bytes::Bytes) -> BoxBody { pub fn empty_body() -> BoxBody { BoxBody::new(http_body_util::Empty::new().map_err(|_| unreachable!())) } +pub fn string_bytes_body(s: String) -> BytesBody { + BytesBody::from(bytes::Bytes::from(s.into_bytes())) +} -pub async fn parse_json_body(req: Request) -> Result +pub async fn parse_json_body(req: Request) -> Result where T: for<'de> Deserialize<'de>, + B: Body, + E: From<::Error> + From, { let body = req.into_body().collect().await?.to_bytes(); let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; Ok(resp) } -pub fn json_ok_response(res: &T) -> Result>, Error> { - let resp_json = serde_json::to_string_pretty(res).map_err(garage_util::error::Error::from)?; +pub fn json_ok_response(res: &T) -> Result>, E> +where + E: From, +{ + let resp_json = serde_json::to_string_pretty(res) + .map_err(GarageError::from) + .map_err(Error::from)?; Ok(Response::builder() .status(hyper::StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/json") - .body(string_body(resp_json))?) 
+ .body(string_body(resp_json)) + .unwrap()) } pub fn is_default(v: &T) -> bool { diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 3a032aba..128742c4 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use async_trait::async_trait; use futures::future::Future; -use hyper::{Body, Method, Request, Response}; +use hyper::{body::Incoming as IncomingBody, Method, Request, Response}; use opentelemetry::{trace::SpanRef, KeyValue}; @@ -25,6 +25,9 @@ use crate::k2v::item::*; use crate::k2v::router::Endpoint; use crate::s3::cors::*; +pub use crate::signature::streaming::ReqBody; +pub type ResBody = BoxBody; + pub struct K2VApiServer { garage: Arc, } @@ -55,7 +58,7 @@ impl ApiHandler for K2VApiServer { type Endpoint = K2VApiEndpoint; type Error = Error; - fn parse_endpoint(&self, req: &Request) -> Result { + fn parse_endpoint(&self, req: &Request) -> Result { let (endpoint, bucket_name) = Endpoint::from_request(req)?; Ok(K2VApiEndpoint { @@ -66,9 +69,9 @@ impl ApiHandler for K2VApiServer { async fn handle( &self, - req: Request, + req: Request, endpoint: K2VApiEndpoint, - ) -> Result, Error> { + ) -> Result, Error> { let K2VApiEndpoint { bucket_name, endpoint, @@ -77,9 +80,10 @@ impl ApiHandler for K2VApiServer { // The OPTIONS method is procesed early, before we even check for an API key if let Endpoint::Options = endpoint { - return Ok(handle_options_s3api(garage, &req, Some(bucket_name)) + let options_res = handle_options_api(garage, &req, Some(bucket_name)) .await - .ok_or_bad_request("Error handling OPTIONS")?); + .ok_or_bad_request("Error handling OPTIONS")?; + return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?; diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 294380ea..ae2778b1 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use base64::prelude::*; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_util::data::*; @@ -13,15 +13,16 @@ use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; use crate::helpers::*; +use crate::k2v::api_server::{ReqBody, ResBody}; use crate::k2v::error::*; use crate::k2v::range::read_range; pub async fn handle_insert_batch( garage: Arc, bucket_id: Uuid, - req: Request, -) -> Result, Error> { - let items = parse_json_body::>(req).await?; + req: Request, +) -> Result, Error> { + let items = parse_json_body::, _, Error>(req).await?; let mut items2 = vec![]; for it in items { @@ -41,15 +42,15 @@ pub async fn handle_insert_batch( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) 
} pub async fn handle_read_batch( garage: Arc, bucket_id: Uuid, - req: Request, -) -> Result, Error> { - let queries = parse_json_body::>(req).await?; + req: Request, +) -> Result, Error> { + let queries = parse_json_body::, _, Error>(req).await?; let resp_results = futures::future::join_all( queries @@ -139,9 +140,9 @@ async fn handle_read_batch_query( pub async fn handle_delete_batch( garage: Arc, bucket_id: Uuid, - req: Request, -) -> Result, Error> { - let queries = parse_json_body::>(req).await?; + req: Request, +) -> Result, Error> { + let queries = parse_json_body::, _, Error>(req).await?; let resp_results = futures::future::join_all( queries @@ -253,11 +254,11 @@ pub(crate) async fn handle_poll_range( garage: Arc, bucket_id: Uuid, partition_key: &str, - req: Request, -) -> Result, Error> { + req: Request, +) -> Result, Error> { use garage_model::k2v::sub::PollRange; - let query = parse_json_body::(req).await?; + let query = parse_json_body::(req).await?; let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000; @@ -292,7 +293,7 @@ pub(crate) async fn handle_poll_range( } else { Ok(Response::builder() .status(StatusCode::NOT_MODIFIED) - .body(Body::empty())?) + .body(empty_body())?) } } diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs index 4eb017ab..72e712bf 100644 --- a/src/api/k2v/error.rs +++ b/src/api/k2v/error.rs @@ -1,13 +1,11 @@ use err_derive::Error; use hyper::header::HeaderValue; -use hyper::{Body, HeaderMap, StatusCode}; - -use garage_model::helper::error::Error as HelperError; +use hyper::{HeaderMap, StatusCode}; use crate::common_error::CommonError; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; use crate::generic_server::ApiError; -use crate::helpers::CustomApiErrorBody; +use crate::helpers::*; use crate::signature::error::Error as SignatureError; /// Errors of this crate @@ -30,10 +28,6 @@ pub enum Error { #[error(display = "Invalid base64: {}", _0)] InvalidBase64(#[error(source)] base64::DecodeError), - /// The client sent a header with invalid value - #[error(display = "Invalid header value: {}", _0)] - InvalidHeader(#[error(source)] hyper::header::ToStrError), - /// The client asked for an invalid return format (invalid Accept header) #[error(display = "Not acceptable: {}", _0)] NotAcceptable(String), @@ -54,18 +48,6 @@ where impl CommonErrorDerivative for Error {} -impl From for Error { - fn from(err: HelperError) -> Self { - match err { - HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)), - HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)), - HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)), - HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)), - e => Self::Common(CommonError::BadRequest(format!("{}", e))), - } - } -} - impl From for Error { fn from(err: SignatureError) -> Self { match err { @@ -74,7 +56,6 @@ impl From for Error { Self::AuthorizationHeaderMalformed(c) } SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), - SignatureError::InvalidHeader(h) => Self::InvalidHeader(h), } } } @@ -90,7 +71,6 @@ impl Error { Error::NotAcceptable(_) => "NotAcceptable", Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", Error::InvalidBase64(_) => "InvalidBase64", - Error::InvalidHeader(_) => "InvalidHeaderValue", Error::InvalidUtf8Str(_) => "InvalidUtf8String", } } @@ -105,7 +85,6 @@ impl ApiError for Error { Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE, 
Error::AuthorizationHeaderMalformed(_) | Error::InvalidBase64(_) - | Error::InvalidHeader(_) | Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST, } } @@ -115,14 +94,14 @@ impl ApiError for Error { header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap()); } - fn http_body(&self, garage_region: &str, path: &str) -> Body { + fn http_body(&self, garage_region: &str, path: &str) -> BytesBody { let error = CustomApiErrorBody { code: self.code().to_string(), message: format!("{}", self), path: path.to_string(), region: garage_region.to_string(), }; - Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| { + let error_str = serde_json::to_string_pretty(&error).unwrap_or_else(|_| { r#" { "code": "InternalError", @@ -130,6 +109,7 @@ impl ApiError for Error { } "# .into() - })) + }); + string_bytes_body(error_str) } } diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index 6c1d4a91..1baec1db 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use hyper::{Body, Response}; +use hyper::Response; use serde::Serialize; use garage_util::data::*; @@ -12,6 +12,7 @@ use garage_model::garage::Garage; use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; use crate::helpers::*; +use crate::k2v::api_server::ResBody; use crate::k2v::error::*; use crate::k2v::range::read_range; @@ -23,7 +24,7 @@ pub async fn handle_read_index( end: Option, limit: Option, reverse: Option, -) -> Result, Error> { +) -> Result, Error> { let reverse = reverse.unwrap_or(false); let ring: Arc = garage.system.ring.borrow().clone(); @@ -68,7 +69,7 @@ pub async fn handle_read_index( next_start, }; - Ok(json_ok_response(&resp)?) + json_ok_response::(&resp) } #[derive(Serialize)] diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index 33f4da53..0c5931a1 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use base64::prelude::*; use http::header; -use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; +use hyper::{Request, Response, StatusCode}; use garage_util::data::*; @@ -11,6 +11,8 @@ use garage_model::garage::Garage; use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; +use crate::helpers::*; +use crate::k2v::api_server::{ReqBody, ResBody}; use crate::k2v::error::*; pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; @@ -22,7 +24,7 @@ pub enum ReturnFormat { } impl ReturnFormat { - pub fn from(req: &Request) -> Result { + pub fn from(req: &Request) -> Result { let accept = match req.headers().get(header::ACCEPT) { Some(a) => a.to_str()?, None => return Ok(Self::Json), @@ -40,7 +42,7 @@ impl ReturnFormat { } } - pub fn make_response(&self, item: &K2VItem) -> Result, Error> { + pub fn make_response(&self, item: &K2VItem) -> Result, Error> { let vals = item.values(); if vals.is_empty() { @@ -52,7 +54,7 @@ impl ReturnFormat { Self::Binary if vals.len() > 1 => Ok(Response::builder() .header(X_GARAGE_CAUSALITY_TOKEN, ct) .status(StatusCode::CONFLICT) - .body(Body::empty())?), + .body(empty_body())?), Self::Binary => { assert!(vals.len() == 1); Self::make_binary_response(ct, vals[0]) @@ -62,22 +64,22 @@ impl ReturnFormat { } } - fn make_binary_response(ct: String, v: &DvvsValue) -> Result, Error> { + fn make_binary_response(ct: String, v: &DvvsValue) -> Result, Error> { match v { DvvsValue::Deleted => Ok(Response::builder() .header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(header::CONTENT_TYPE, "application/octet-stream") 
.status(StatusCode::NO_CONTENT) - .body(Body::empty())?), + .body(empty_body())?), DvvsValue::Value(v) => Ok(Response::builder() .header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(header::CONTENT_TYPE, "application/octet-stream") .status(StatusCode::OK) - .body(Body::from(v.to_vec()))?), + .body(bytes_body(v.to_vec().into()))?), } } - fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result, Error> { + fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result, Error> { let items = v .iter() .map(|v| match v { @@ -91,7 +93,7 @@ impl ReturnFormat { .header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(header::CONTENT_TYPE, "application/json") .status(StatusCode::OK) - .body(Body::from(json_body))?) + .body(string_body(json_body))?) } } @@ -99,11 +101,11 @@ impl ReturnFormat { #[allow(clippy::ptr_arg)] pub async fn handle_read_item( garage: Arc, - req: &Request, + req: &Request, bucket_id: Uuid, partition_key: &str, sort_key: &String, -) -> Result, Error> { +) -> Result, Error> { let format = ReturnFormat::from(req)?; let item = garage @@ -124,11 +126,11 @@ pub async fn handle_read_item( pub async fn handle_insert_item( garage: Arc, - req: Request, + req: Request, bucket_id: Uuid, partition_key: &str, sort_key: &str, -) -> Result, Error> { +) -> Result, Error> { let causal_context = req .headers() .get(X_GARAGE_CAUSALITY_TOKEN) @@ -137,7 +139,9 @@ pub async fn handle_insert_item( .map(CausalContext::parse_helper) .transpose()?; - let body = req.into_body().collect().await?.to_bytes(); + let body = http_body_util::BodyExt::collect(req.into_body()) + .await? + .to_bytes(); let value = DvvsValue::Value(body.to_vec()); @@ -155,16 +159,16 @@ pub async fn handle_insert_item( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_delete_item( garage: Arc, - req: Request, + req: Request, bucket_id: Uuid, partition_key: &str, sort_key: &str, -) -> Result, Error> { +) -> Result, Error> { let causal_context = req .headers() .get(X_GARAGE_CAUSALITY_TOKEN) @@ -189,20 +193,20 @@ pub async fn handle_delete_item( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } /// Handle ReadItem request #[allow(clippy::ptr_arg)] pub async fn handle_poll_item( garage: Arc, - req: &Request, + req: &Request, bucket_id: Uuid, partition_key: String, sort_key: String, causality_token: String, timeout_secs: Option, -) -> Result, Error> { +) -> Result, Error> { let format = ReturnFormat::from(req)?; let causal_context = @@ -227,6 +231,6 @@ pub async fn handle_poll_item( } else { Ok(Response::builder() .status(StatusCode::NOT_MODIFIED) - .body(Body::empty())?) + .body(empty_body())?) 
} } diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 7717fd49..495c5832 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -121,7 +121,8 @@ impl ApiHandler for S3ApiServer { return handle_post_object(garage, req, bucket_name.unwrap()).await; } if let Endpoint::Options = endpoint { - return handle_options_s3api(garage, &req, bucket_name).await; + let options_res = handle_options_api(garage, &req, bucket_name).await?; + return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?; diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index 4b8754a9..e069cae4 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -14,6 +14,7 @@ use http_body_util::BodyExt; use serde::{Deserialize, Serialize}; +use crate::common_error::CommonError; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; @@ -94,11 +95,11 @@ pub async fn handle_put_cors( .body(empty_body())?) } -pub async fn handle_options_s3api( +pub async fn handle_options_api( garage: Arc, req: &Request, bucket_name: Option, -) -> Result, Error> { +) -> Result, CommonError> { // FIXME: CORS rules of buckets with local aliases are // not taken into account. @@ -128,7 +129,7 @@ pub async fn handle_options_s3api( .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") .header(ACCESS_CONTROL_ALLOW_METHODS, "*") .status(StatusCode::OK) - .body(empty_body())?) + .body(EmptyBody::new())?) } } else { // If there is no bucket name in the request, @@ -138,14 +139,14 @@ pub async fn handle_options_s3api( .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") .header(ACCESS_CONTROL_ALLOW_METHODS, "GET") .status(StatusCode::OK) - .body(empty_body())?) + .body(EmptyBody::new())?) 
} } pub fn handle_options_for_bucket( req: &Request, bucket: &Bucket, -) -> Result, Error> { +) -> Result, CommonError> { let origin = req .headers() .get("Origin") @@ -168,13 +169,15 @@ pub fn handle_options_for_bucket( if let Some(rule) = matching_rule { let mut resp = Response::builder() .status(StatusCode::OK) - .body(empty_body())?; + .body(EmptyBody::new())?; add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?; return Ok(resp); } } - Err(Error::forbidden("This CORS request is not allowed.")) + Err(CommonError::Forbidden( + "This CORS request is not allowed.".into(), + )) } pub fn find_matching_cors_rule<'a>( @@ -216,7 +219,7 @@ where } pub fn add_cors_headers( - resp: &mut Response, + resp: &mut Response, rule: &GarageCorsRule, ) -> Result<(), http::header::InvalidHeaderValue> { let h = resp.headers_mut(); diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index 8afe4954..a4d222de 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -4,8 +4,6 @@ use err_derive::Error; use hyper::header::HeaderValue; use hyper::{HeaderMap, StatusCode}; -use garage_model::helper::error::Error as HelperError; - use crate::common_error::CommonError; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; use crate::generic_server::ApiError; @@ -63,10 +61,6 @@ pub enum Error { #[error(display = "Invalid XML: {}", _0)] InvalidXml(String), - /// The client sent a header with invalid value - #[error(display = "Invalid header value: {}", _0)] - InvalidHeader(#[error(source)] hyper::header::ToStrError), - /// The client sent a range header with invalid value #[error(display = "Invalid HTTP range: {:?}", _0)] InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)), @@ -87,18 +81,6 @@ where impl CommonErrorDerivative for Error {} -impl From for Error { - fn from(err: HelperError) -> Self { - match err { - HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)), - HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)), - HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)), - HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)), - e => Self::bad_request(format!("{}", e)), - } - } -} - impl From for Error { fn from(err: roxmltree::Error) -> Self { Self::InvalidXml(format!("{}", err)) @@ -119,7 +101,6 @@ impl From for Error { Self::AuthorizationHeaderMalformed(c) } SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), - SignatureError::InvalidHeader(h) => Self::InvalidHeader(h), } } } @@ -144,9 +125,7 @@ impl Error { Error::NotImplemented(_) => "NotImplemented", Error::InvalidXml(_) => "MalformedXML", Error::InvalidRange(_) => "InvalidRange", - Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) | Error::InvalidHeader(_) => { - "InvalidRequest" - } + Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest", } } } @@ -166,8 +145,7 @@ impl ApiError for Error { | Error::EntityTooSmall | Error::InvalidXml(_) | Error::InvalidUtf8Str(_) - | Error::InvalidUtf8String(_) - | Error::InvalidHeader(_) => StatusCode::BAD_REQUEST, + | Error::InvalidUtf8String(_) => StatusCode::BAD_REQUEST, } } @@ -207,6 +185,6 @@ impl ApiError for Error { "# .into() }); - BytesBody::from(bytes::Bytes::from(error_str.into_bytes())) + string_bytes_body(error_str) } } diff --git a/src/api/signature/error.rs b/src/api/signature/error.rs index f0d7c816..2d92a072 100644 --- a/src/api/signature/error.rs +++ b/src/api/signature/error.rs @@ -18,10 
+18,6 @@ pub enum Error { /// The request contained an invalid UTF-8 sequence in its path or in other parameters #[error(display = "Invalid UTF-8: {}", _0)] InvalidUtf8Str(#[error(source)] std::str::Utf8Error), - - /// The client sent a header with invalid value - #[error(display = "Invalid header value: {}", _0)] - InvalidHeader(#[error(source)] hyper::header::ToStrError), } impl From for Error -- cgit v1.2.3 From 22332e6c3536159656e773a85d656352882ffc32 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Feb 2024 20:26:33 +0100 Subject: [dep-upgrade-202402] simplify/refactor GetObject --- src/api/s3/get.rs | 152 ++++++++++++++++++++++++++---------------------------- 1 file changed, 74 insertions(+), 78 deletions(-) (limited to 'src/api') diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 71f0b158..f70dad7d 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -11,7 +11,8 @@ use http::header::{ use hyper::{body::Body, Request, Response, StatusCode}; use tokio::sync::mpsc; -use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag}; +use garage_block::manager::BlockStream; +use garage_rpc::rpc_helper::OrderTag; use garage_table::EmptyKey; use garage_util::data::*; use garage_util::error::OkOrMessage; @@ -245,7 +246,7 @@ pub async fn handle_get( Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { - let (tx, rx) = mpsc::channel(2); + let (tx, rx) = mpsc::channel::(2); let order_stream = OrderTag::stream(); let first_block_hash = *first_block_hash; @@ -283,25 +284,13 @@ pub async fn handle_get( { Ok(()) => (), Err(e) => { - let err = std::io::Error::new( - std::io::ErrorKind::Other, - format!("Error while getting object data: {}", e), - ); - let _ = tx - .send(Box::pin(stream::once(future::ready(Err(err))))) - .await; + let _ = tx.send(error_stream_item(e)).await; } } }); - let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) - .flatten() - .map(|x| { - x.map(hyper::body::Frame::data) - .map_err(|e| Error::from(garage_util::error::Error::from(e))) - }); - let body = http_body_util::StreamBody::new(body_stream); - Ok(resp_builder.body(ResBody::new(body))?) + let body = response_body_from_block_stream(rx); + Ok(resp_builder.body(body)?) } } } @@ -461,67 +450,75 @@ fn body_from_blocks_range( } let order_stream = OrderTag::stream(); - let mut body_stream = - futures::stream::iter(blocks) - .enumerate() - .map(move |(i, (block, block_offset))| { - let garage = garage.clone(); - async move { - garage - .block_manager - .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) - .await - .unwrap_or_else(|e| error_stream(i, e)) - .scan(block_offset, move |chunk_offset, chunk| { - let r = match chunk { - Ok(chunk_bytes) => { - let chunk_len = chunk_bytes.len() as u64; - let r = if *chunk_offset >= end { - // The current chunk is after the part we want to read. - // Returning None here will stop the scan, the rest of the - // stream will be ignored - None - } else if *chunk_offset + chunk_len <= begin { - // The current chunk is before the part we want to read. - // We return a None that will be removed by the filter_map - // below. - Some(None) + let (tx, rx) = mpsc::channel::(2); + + tokio::spawn(async move { + match async { + let garage = garage.clone(); + for (i, (block, block_offset)) in blocks.iter().enumerate() { + let block_stream = garage + .block_manager + .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) + .await? 
+ .scan(*block_offset, move |chunk_offset, chunk| { + let r = match chunk { + Ok(chunk_bytes) => { + let chunk_len = chunk_bytes.len() as u64; + let r = if *chunk_offset >= end { + // The current chunk is after the part we want to read. + // Returning None here will stop the scan, the rest of the + // stream will be ignored + None + } else if *chunk_offset + chunk_len <= begin { + // The current chunk is before the part we want to read. + // We return a None that will be removed by the filter_map + // below. + Some(None) + } else { + // The chunk has an intersection with the requested range + let start_in_chunk = if *chunk_offset > begin { + 0 } else { - // The chunk has an intersection with the requested range - let start_in_chunk = if *chunk_offset > begin { - 0 - } else { - begin - *chunk_offset - }; - let end_in_chunk = if *chunk_offset + chunk_len < end { - chunk_len - } else { - end - *chunk_offset - }; - Some(Some(Ok(chunk_bytes.slice( - start_in_chunk as usize..end_in_chunk as usize, - )))) + begin - *chunk_offset }; - *chunk_offset += chunk_bytes.len() as u64; - r - } - Err(e) => Some(Some(Err(e))), - }; - futures::future::ready(r) - }) - .filter_map(futures::future::ready) - } - }); + let end_in_chunk = if *chunk_offset + chunk_len < end { + chunk_len + } else { + end - *chunk_offset + }; + Some(Some(Ok(chunk_bytes + .slice(start_in_chunk as usize..end_in_chunk as usize)))) + }; + *chunk_offset += chunk_bytes.len() as u64; + r + } + Err(e) => Some(Some(Err(e))), + }; + futures::future::ready(r) + }) + .filter_map(futures::future::ready); + + let block_stream: BlockStream = Box::pin(block_stream); + tx.send(Box::pin(block_stream)) + .await + .ok_or_message("channel closed")?; + } - let (tx, rx) = mpsc::channel(2); - tokio::spawn(async move { - while let Some(item) = body_stream.next().await { - if tx.send(item.await).await.is_err() { - break; // connection closed by client + Ok::<(), Error>(()) + } + .await + { + Ok(()) => (), + Err(e) => { + let _ = tx.send(error_stream_item(e)).await; } } }); + response_body_from_block_stream(rx) +} + +fn response_body_from_block_stream(rx: mpsc::Receiver) -> ResBody { let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) .flatten() .map(|x| { @@ -531,11 +528,10 @@ fn body_from_blocks_range( ResBody::new(http_body_util::StreamBody::new(body_stream)) } -fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream { - Box::pin(futures::stream::once(async move { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("Could not get block {}: {}", i, e), - )) - })) +fn error_stream_item(e: E) -> BlockStream { + let err = std::io::Error::new( + std::io::ErrorKind::Other, + format!("Error while getting object data: {}", e), + ); + Box::pin(stream::once(future::ready(Err(err)))) } -- cgit v1.2.3 From fe48d60d2b63a9914297558bd5dbccae8d96c947 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 7 Feb 2024 14:33:07 +0100 Subject: [dep-upgrade-202402] refactor http listener code --- src/api/generic_server.rs | 155 +++++++++++++++++++++++++++++++++------------- 1 file changed, 111 insertions(+), 44 deletions(-) (limited to 'src/api') diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index 832f2da3..e3005f8a 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use async_trait::async_trait; use futures::future::Future; +use futures::stream::{futures_unordered::FuturesUnordered, StreamExt}; use http_body_util::BodyExt; use hyper::header::HeaderValue; @@ 
-15,7 +16,7 @@ use hyper::{HeaderMap, StatusCode}; use hyper_util::rt::TokioIo; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::net::{TcpListener, UnixListener}; +use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream}; use opentelemetry::{ global, @@ -110,20 +111,12 @@ impl ApiServer { bind_addr ); - tokio::pin!(shutdown_signal); - match bind_addr { UnixOrTCPSocketAddress::TCPSocket(addr) => { let listener = TcpListener::bind(addr).await?; - loop { - let (stream, client_addr) = tokio::select! { - acc = listener.accept() => acc?, - _ = &mut shutdown_signal => break, - }; - - self.launch_handler(stream, client_addr.to_string()); - } + let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); + server_loop(listener, handler, shutdown_signal).await } UnixOrTCPSocketAddress::UnixSocket(ref path) => { if path.exists() { @@ -131,52 +124,24 @@ impl ApiServer { } let listener = UnixListener::bind(path)?; + let listener = UnixListenerOn(listener, path.display().to_string()); fs::set_permissions( path, Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)), )?; - loop { - let (stream, _) = tokio::select! { - acc = listener.accept() => acc?, - _ = &mut shutdown_signal => break, - }; - - self.launch_handler(stream, path.display().to_string()); - } - } - }; - - Ok(()) - } - - fn launch_handler(self: &Arc, stream: S, client_addr: String) - where - S: AsyncRead + AsyncWrite + Send + Sync + 'static, - { - let this = self.clone(); - let io = TokioIo::new(stream); - - let serve = - move |req: Request| this.clone().handler(req, client_addr.to_string()); - - tokio::task::spawn(async move { - let io = Box::pin(io); - if let Err(e) = http1::Builder::new() - .serve_connection(io, service_fn(serve)) - .await - { - debug!("Error handling HTTP connection: {}", e); + let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); + server_loop(listener, handler, shutdown_signal).await } - }); + } } async fn handler( self: Arc, req: Request, addr: String, - ) -> Result>, GarageError> { + ) -> Result>, http::Error> { let uri = req.uri().clone(); if let Ok(forwarded_for_ip_addr) = @@ -278,3 +243,105 @@ impl ApiServer { res } } + +// ==== helper functions ==== + +#[async_trait] +pub trait Accept: Send + Sync + 'static { + type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static; + async fn accept(&self) -> std::io::Result<(Self::Stream, String)>; +} + +#[async_trait] +impl Accept for TcpListener { + type Stream = TcpStream; + async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { + self.accept() + .await + .map(|(stream, addr)| (stream, addr.to_string())) + } +} + +pub struct UnixListenerOn(pub UnixListener, pub String); + +#[async_trait] +impl Accept for UnixListenerOn { + type Stream = UnixStream; + async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { + self.0 + .accept() + .await + .map(|(stream, _addr)| (stream, self.1.clone())) + } +} + +pub async fn server_loop( + listener: A, + handler: H, + shutdown_signal: impl Future, +) -> Result<(), GarageError> +where + A: Accept, + H: Fn(Request, String) -> F + Send + Sync + Clone + 'static, + F: Future>, http::Error>> + Send + 'static, + E: Send + Sync + std::error::Error + 'static, +{ + tokio::pin!(shutdown_signal); + + let (conn_in, mut conn_out) = tokio::sync::mpsc::unbounded_channel(); + let connection_collector = tokio::spawn(async move { + let mut collection = FuturesUnordered::new(); + loop { + let collect_next = async { + if collection.is_empty() { + 
futures::future::pending().await + } else { + collection.next().await + } + }; + tokio::select! { + result = collect_next => { + trace!("HTTP connection finished: {:?}", result); + } + new_fut = conn_out.recv() => { + match new_fut { + Some(f) => collection.push(f), + None => break, + } + } + } + } + debug!("Collecting last open HTTP connections."); + while let Some(conn_res) = collection.next().await { + trace!("HTTP connection finished: {:?}", conn_res); + } + debug!("No more HTTP connections to collect"); + }); + + loop { + let (stream, client_addr) = tokio::select! { + acc = listener.accept() => acc?, + _ = &mut shutdown_signal => break, + }; + + let io = TokioIo::new(stream); + + let handler = handler.clone(); + let serve = move |req: Request| handler(req, client_addr.clone()); + + let fut = tokio::task::spawn(async move { + let io = Box::pin(io); + if let Err(e) = http1::Builder::new() + .serve_connection(io, service_fn(serve)) + .await + { + debug!("Error handling HTTP connection: {}", e); + } + }); + conn_in.send(fut)?; + } + + connection_collector.await?; + + Ok(()) +} -- cgit v1.2.3 From e524e7a30d4d81c84f1c110017ad972dc5617bf6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 7 Feb 2024 14:45:52 +0100 Subject: [dep-upgrade-202402] rename BytesBody into ErrorBody for clarity --- src/api/admin/error.rs | 6 +++--- src/api/generic_server.rs | 4 ++-- src/api/helpers.rs | 6 +++--- src/api/k2v/error.rs | 4 ++-- src/api/s3/error.rs | 4 ++-- 5 files changed, 12 insertions(+), 12 deletions(-) (limited to 'src/api') diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs index 011c903f..2668b42d 100644 --- a/src/api/admin/error.rs +++ b/src/api/admin/error.rs @@ -7,7 +7,7 @@ pub use garage_model::helper::error::Error as HelperError; use crate::common_error::CommonError; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; use crate::generic_server::ApiError; -use crate::helpers::{BytesBody, CustomApiErrorBody}; +use crate::helpers::*; /// Errors of this crate #[derive(Debug, Error)] @@ -65,7 +65,7 @@ impl ApiError for Error { header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap()); } - fn http_body(&self, garage_region: &str, path: &str) -> BytesBody { + fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody { let error = CustomApiErrorBody { code: self.code().to_string(), message: format!("{}", self), @@ -81,6 +81,6 @@ impl ApiError for Error { "# .into() }); - BytesBody::from(bytes::Bytes::from(error_str.into_bytes())) + error_body(error_str) } } diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index e3005f8a..20d70833 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -30,7 +30,7 @@ use garage_util::forwarded_headers; use garage_util::metrics::{gen_trace_id, RecordDuration}; use garage_util::socket_address::UnixOrTCPSocketAddress; -use crate::helpers::{BoxBody, BytesBody}; +use crate::helpers::{BoxBody, ErrorBody}; pub(crate) trait ApiEndpoint: Send + Sync + 'static { fn name(&self) -> &'static str; @@ -40,7 +40,7 @@ pub(crate) trait ApiEndpoint: Send + Sync + 'static { pub trait ApiError: std::error::Error + Send + Sync + 'static { fn http_status_code(&self) -> StatusCode; fn add_http_headers(&self, header_map: &mut HeaderMap); - fn http_body(&self, garage_region: &str, path: &str) -> BytesBody; + fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody; } #[async_trait] diff --git a/src/api/helpers.rs b/src/api/helpers.rs index 57aa1ea1..66f4c465 100644 --- 
a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -144,7 +144,7 @@ pub fn key_after_prefix(pfx: &str) -> Option { // =============== body helpers ================= pub type EmptyBody = http_body_util::Empty; -pub type BytesBody = FullBody; +pub type ErrorBody = FullBody; pub type BoxBody = http_body_util::combinators::BoxBody; pub fn string_body(s: String) -> BoxBody { @@ -156,8 +156,8 @@ pub fn bytes_body(b: bytes::Bytes) -> BoxBody { pub fn empty_body() -> BoxBody { BoxBody::new(http_body_util::Empty::new().map_err(|_| unreachable!())) } -pub fn string_bytes_body(s: String) -> BytesBody { - BytesBody::from(bytes::Bytes::from(s.into_bytes())) +pub fn error_body(s: String) -> ErrorBody { + ErrorBody::from(bytes::Bytes::from(s.into_bytes())) } pub async fn parse_json_body(req: Request) -> Result diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs index 72e712bf..16479227 100644 --- a/src/api/k2v/error.rs +++ b/src/api/k2v/error.rs @@ -94,7 +94,7 @@ impl ApiError for Error { header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap()); } - fn http_body(&self, garage_region: &str, path: &str) -> BytesBody { + fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody { let error = CustomApiErrorBody { code: self.code().to_string(), message: format!("{}", self), @@ -110,6 +110,6 @@ impl ApiError for Error { "# .into() }); - string_bytes_body(error_str) + error_body(error_str) } } diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index a4d222de..f86c19a6 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -168,7 +168,7 @@ impl ApiError for Error { } } - fn http_body(&self, garage_region: &str, path: &str) -> BytesBody { + fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody { let error = s3_xml::Error { code: s3_xml::Value(self.aws_code().to_string()), message: s3_xml::Value(format!("{}", self)), @@ -185,6 +185,6 @@ impl ApiError for Error { "# .into() }); - string_bytes_body(error_str) + error_body(error_str) } } -- cgit v1.2.3 From a31d1bd4969654e6041070b6e80ae025925808c2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 7 Feb 2024 14:48:27 +0100 Subject: [dep-upgrade-202402] fix obsolete DateTime::from_utc calls --- src/api/signature/payload.rs | 4 ++-- src/api/signature/streaming.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'src/api') diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs index 652c606e..423aad93 100644 --- a/src/api/signature/payload.rs +++ b/src/api/signature/payload.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use chrono::{DateTime, Duration, NaiveDateTime, Utc}; +use chrono::{DateTime, Duration, NaiveDateTime, TimeZone, Utc}; use hmac::Mac; use hyper::{body::Incoming as IncomingBody, Method, Request}; use sha2::{Digest, Sha256}; @@ -316,7 +316,7 @@ fn canonical_query_string(uri: &hyper::Uri) -> String { pub fn parse_date(date: &str) -> Result, Error> { let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?; - Ok(DateTime::from_utc(date, Utc)) + Ok(Utc.from_utc_datetime(&date)) } pub async fn verify_v4( diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index 8fb0c4ef..ea5a64e2 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -1,6 +1,6 @@ use std::pin::Pin; -use chrono::{DateTime, NaiveDateTime, Utc}; +use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use futures::prelude::*; use futures::task; use garage_model::key_table::Key; @@ -44,7 
+44,7 @@ pub fn parse_streaming_body( .to_str()?; let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) .ok_or_bad_request("Invalid date")?; - let date: DateTime = DateTime::from_utc(date, Utc); + let date: DateTime = Utc.from_utc_datetime(&date); let scope = compute_scope(&date, region, service); let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service) -- cgit v1.2.3 From 53746b59e525ff5f518ed59d7831b05e2732785d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 7 Feb 2024 14:53:13 +0100 Subject: [dep-upgrade-202402] slightly more explicit error management --- src/api/generic_server.rs | 4 +++- src/api/helpers.rs | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) (limited to 'src/api') diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index 20d70833..7b37417e 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -1,3 +1,4 @@ +use std::convert::Infallible; use std::fs::{self, Permissions}; use std::os::unix::fs::PermissionsExt; use std::sync::Arc; @@ -194,7 +195,8 @@ impl ApiServer { } else { info!("Response: error {}, {}", e.http_status_code(), e); } - Ok(http_error.map(|body| BoxBody::new(body.map_err(|_| unreachable!())))) + Ok(http_error + .map(|body| BoxBody::new(body.map_err(|_: Infallible| unreachable!())))) } } } diff --git a/src/api/helpers.rs b/src/api/helpers.rs index 66f4c465..ba7b1599 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,3 +1,5 @@ +use std::convert::Infallible; + use http_body_util::{BodyExt, Full as FullBody}; use hyper::{body::Body, Request, Response}; use idna::domain_to_unicode; @@ -151,10 +153,10 @@ pub fn string_body(s: String) -> BoxBody { bytes_body(bytes::Bytes::from(s.into_bytes())) } pub fn bytes_body(b: bytes::Bytes) -> BoxBody { - BoxBody::new(FullBody::new(b).map_err(|_| unreachable!())) + BoxBody::new(FullBody::new(b).map_err(|_: Infallible| unreachable!())) } pub fn empty_body() -> BoxBody { - BoxBody::new(http_body_util::Empty::new().map_err(|_| unreachable!())) + BoxBody::new(http_body_util::Empty::new().map_err(|_: Infallible| unreachable!())) } pub fn error_body(s: String) -> ErrorBody { ErrorBody::from(bytes::Bytes::from(s.into_bytes())) -- cgit v1.2.3 From e011941964b1c1e0b90f85014d166d64a83ae8e2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 7 Feb 2024 15:25:49 +0100 Subject: [dep-upgrade-202402] refactor use of BodyStream --- src/api/helpers.rs | 23 ++++++++++++++++++++++- src/api/s3/multipart.rs | 9 +++------ src/api/s3/post_object.rs | 9 +++------ src/api/s3/put.rs | 14 ++++++-------- src/api/signature/streaming.rs | 8 +++----- 5 files changed, 37 insertions(+), 26 deletions(-) (limited to 'src/api') diff --git a/src/api/helpers.rs b/src/api/helpers.rs index ba7b1599..5f488912 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,7 +1,12 @@ use std::convert::Infallible; +use futures::{Stream, StreamExt, TryStreamExt}; + use http_body_util::{BodyExt, Full as FullBody}; -use hyper::{body::Body, Request, Response}; +use hyper::{ + body::{Body, Bytes}, + Request, Response, +}; use idna::domain_to_unicode; use serde::{Deserialize, Serialize}; @@ -187,6 +192,22 @@ where .unwrap()) } +pub fn body_stream(body: B) -> impl Stream> +where + B: Body, + ::Error: Into, + E: From, +{ + let stream = http_body_util::BodyStream::new(body); + let stream = TryStreamExt::map_err(stream, Into::into); + stream.map(|x| { + x.and_then(|f| { + f.into_data() + .map_err(|_| E::from(Error::bad_request("non-data frame"))) + }) + }) +} + 
pub fn is_default(v: &T) -> bool { *v == T::default() } diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 4aa27eaf..b9d15b21 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -1,8 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use futures::{prelude::*, TryStreamExt}; -use http_body_util::BodyStream; +use futures::prelude::*; use hyper::{Request, Response}; use md5::{Digest as Md5Digest, Md5}; @@ -89,10 +88,8 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let body_stream = BodyStream::new(req.into_body()) - .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap - .map_err(Error::from); - let mut chunker = StreamChunker::new(body_stream, garage.config.block_size); + let stream = body_stream(req.into_body()); + let mut chunker = StreamChunker::new(stream, garage.config.block_size); let ((_, _, mut mpu), first_block) = futures::try_join!( get_upload(&garage, &bucket_id, &key, &upload_id), diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index e9732dc4..bca8d6c6 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -7,8 +7,7 @@ use std::task::{Context, Poll}; use base64::prelude::*; use bytes::Bytes; use chrono::{DateTime, Duration, Utc}; -use futures::{Stream, StreamExt, TryStreamExt}; -use http_body_util::BodyStream; +use futures::{Stream, StreamExt}; use hyper::header::{self, HeaderMap, HeaderName, HeaderValue}; use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use multer::{Constraints, Multipart, SizeLimit}; @@ -45,10 +44,8 @@ pub async fn handle_post_object( ); let (head, body) = req.into_parts(); - let body_stream = BodyStream::new(body) - .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap - .map_err(Error::from); - let mut multipart = Multipart::with_constraints(body_stream, boundary, constraints); + let stream = body_stream::<_, Error>(body); + let mut multipart = Multipart::with_constraints(stream, boundary, constraints); let mut params = HeaderMap::new(); let field = loop { diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 3d43eee8..17424862 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -4,13 +4,13 @@ use std::sync::Arc; use base64::prelude::*; use futures::prelude::*; use futures::try_join; -use http_body_util::BodyStream; -use hyper::body::Bytes; -use hyper::header::{HeaderMap, HeaderValue}; -use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; +use hyper::body::{Body, Bytes}; +use hyper::header::{HeaderMap, HeaderValue}; +use hyper::{Request, Response}; + use opentelemetry::{ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, Context, @@ -51,14 +51,12 @@ pub async fn handle_put( None => None, }; - let body_stream = BodyStream::new(req.into_body()) - .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap - .map_err(Error::from); + let stream = body_stream(req.into_body()); save_stream( garage, headers, - body_stream, + stream, bucket, key, content_md5, diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index ea5a64e2..39147ca0 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -5,7 +5,7 @@ use futures::prelude::*; use futures::task; use garage_model::key_table::Key; use hmac::Mac; -use http_body_util::{BodyStream, StreamBody}; +use http_body_util::StreamBody; use hyper::body::{Bytes, Incoming as 
IncomingBody}; use hyper::Request; @@ -51,11 +51,9 @@ pub fn parse_streaming_body( .ok_or_internal_error("Unable to build signing HMAC")?; Ok(req.map(move |body| { - let body_stream = BodyStream::new(body) - .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap - .map_err(Error::from); + let stream = body_stream::<_, Error>(body); let signed_payload_stream = - SignedPayloadStream::new(body_stream, signing_hmac, date, &scope, signature) + SignedPayloadStream::new(stream, signing_hmac, date, &scope, signature) .map(|x| x.map(hyper::body::Frame::data)) .map_err(Error::from); ReqBody::new(StreamBody::new(signed_payload_stream)) -- cgit v1.2.3 From ad5ce968d212d951f7459c36bcbdd78ce39be585 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 8 Feb 2024 18:57:18 +0100 Subject: [dep-upgrade-202402] remove useless mut --- src/api/s3/put.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/api') diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 17424862..5ac5fb6b 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -7,7 +7,7 @@ use futures::try_join; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; -use hyper::body::{Body, Bytes}; +use hyper::body::Bytes; use hyper::header::{HeaderMap, HeaderValue}; use hyper::{Request, Response}; -- cgit v1.2.3 From 5c63193d1de909cdecc501b482f5dc269a84874d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 8 Feb 2024 23:43:59 +0100 Subject: [dep-upgrade-202402] fix shutdown issue introduced when upgrading hyper --- src/api/admin/api_server.rs | 6 ++-- src/api/generic_server.rs | 88 ++++++++++++++++++++++++++------------------- src/api/k2v/api_server.rs | 6 ++-- src/api/s3/api_server.rs | 6 ++-- 4 files changed, 61 insertions(+), 45 deletions(-) (limited to 'src/api') diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index d5e1c777..50813d11 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -3,9 +3,9 @@ use std::sync::Arc; use async_trait::async_trait; -use futures::future::Future; use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW}; use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; +use tokio::sync::watch; use opentelemetry::trace::SpanRef; @@ -65,11 +65,11 @@ impl AdminApiServer { pub async fn run( self, bind_addr: UnixOrTCPSocketAddress, - shutdown_signal: impl Future, + must_exit: watch::Receiver, ) -> Result<(), GarageError> { let region = self.garage.config.s3_api.s3_region.clone(); ApiServer::new(region, self) - .run_server(bind_addr, Some(0o220), shutdown_signal) + .run_server(bind_addr, Some(0o220), must_exit) .await } diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index 7b37417e..9c49fdf3 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -18,6 +18,7 @@ use hyper_util::rt::TokioIo; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream}; +use tokio::sync::watch; use opentelemetry::{ global, @@ -104,20 +105,17 @@ impl ApiServer { self: Arc, bind_addr: UnixOrTCPSocketAddress, unix_bind_addr_mode: Option, - shutdown_signal: impl Future, + must_exit: watch::Receiver, ) -> Result<(), GarageError> { - info!( - "{} API server listening on {}", - A::API_NAME_DISPLAY, - bind_addr - ); + let server_name = format!("{} API", A::API_NAME_DISPLAY); + info!("{} server listening on {}", server_name, bind_addr); match bind_addr { UnixOrTCPSocketAddress::TCPSocket(addr) => { let listener = 
TcpListener::bind(addr).await?; let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); - server_loop(listener, handler, shutdown_signal).await + server_loop(server_name, listener, handler, must_exit).await } UnixOrTCPSocketAddress::UnixSocket(ref path) => { if path.exists() { @@ -133,7 +131,7 @@ impl ApiServer { )?; let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); - server_loop(listener, handler, shutdown_signal).await + server_loop(server_name, listener, handler, must_exit).await } } } @@ -278,9 +276,10 @@ impl Accept for UnixListenerOn { } pub async fn server_loop( + server_name: String, listener: A, handler: H, - shutdown_signal: impl Future, + mut must_exit: watch::Receiver, ) -> Result<(), GarageError> where A: Accept, @@ -288,42 +287,57 @@ where F: Future>, http::Error>> + Send + 'static, E: Send + Sync + std::error::Error + 'static, { - tokio::pin!(shutdown_signal); - let (conn_in, mut conn_out) = tokio::sync::mpsc::unbounded_channel(); - let connection_collector = tokio::spawn(async move { - let mut collection = FuturesUnordered::new(); - loop { - let collect_next = async { - if collection.is_empty() { - futures::future::pending().await - } else { - collection.next().await - } - }; - tokio::select! { - result = collect_next => { - trace!("HTTP connection finished: {:?}", result); - } - new_fut = conn_out.recv() => { - match new_fut { - Some(f) => collection.push(f), - None => break, + let connection_collector = tokio::spawn({ + let server_name = server_name.clone(); + async move { + let mut connections = FuturesUnordered::new(); + loop { + let collect_next = async { + if connections.is_empty() { + futures::future::pending().await + } else { + connections.next().await + } + }; + tokio::select! { + result = collect_next => { + trace!("{} server: HTTP connection finished: {:?}", server_name, result); + } + new_fut = conn_out.recv() => { + match new_fut { + Some(f) => connections.push(f), + None => break, + } } } } + if !connections.is_empty() { + info!( + "{} server: {} connections still open", + server_name, + connections.len() + ); + while let Some(conn_res) = connections.next().await { + trace!( + "{} server: HTTP connection finished: {:?}", + server_name, + conn_res + ); + info!( + "{} server: {} connections still open", + server_name, + connections.len() + ); + } + } } - debug!("Collecting last open HTTP connections."); - while let Some(conn_res) = collection.next().await { - trace!("HTTP connection finished: {:?}", conn_res); - } - debug!("No more HTTP connections to collect"); }); - loop { + while !*must_exit.borrow() { let (stream, client_addr) = tokio::select! 
{ acc = listener.accept() => acc?, - _ = &mut shutdown_signal => break, + _ = must_exit.changed() => continue, }; let io = TokioIo::new(stream); @@ -343,6 +357,8 @@ where conn_in.send(fut)?; } + info!("{} server exiting", server_name); + drop(conn_in); connection_collector.await?; Ok(()) diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 128742c4..e97da2af 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use async_trait::async_trait; -use futures::future::Future; use hyper::{body::Incoming as IncomingBody, Method, Request, Response}; +use tokio::sync::watch; use opentelemetry::{trace::SpanRef, KeyValue}; @@ -42,10 +42,10 @@ impl K2VApiServer { garage: Arc, bind_addr: UnixOrTCPSocketAddress, s3_region: String, - shutdown_signal: impl Future, + must_exit: watch::Receiver, ) -> Result<(), GarageError> { ApiServer::new(s3_region, K2VApiServer { garage }) - .run_server(bind_addr, None, shutdown_signal) + .run_server(bind_addr, None, must_exit) .await } } diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 495c5832..4b815f79 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -2,9 +2,9 @@ use std::sync::Arc; use async_trait::async_trait; -use futures::future::Future; use hyper::header; use hyper::{body::Incoming as IncomingBody, Request, Response}; +use tokio::sync::watch; use opentelemetry::{trace::SpanRef, KeyValue}; @@ -51,10 +51,10 @@ impl S3ApiServer { garage: Arc, addr: UnixOrTCPSocketAddress, s3_region: String, - shutdown_signal: impl Future, + must_exit: watch::Receiver, ) -> Result<(), GarageError> { ApiServer::new(s3_region, S3ApiServer { garage }) - .run_server(addr, None, shutdown_signal) + .run_server(addr, None, must_exit) .await } -- cgit v1.2.3 From 10bc2ead6015bf446451015ee73e902c3fd5e9a9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 9 Feb 2024 14:15:29 +0100 Subject: [multi-char-delimiter-692] allow multi-character delimiters in List* (fix #692) --- src/api/s3/api_server.rs | 6 +++--- src/api/s3/router.rs | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) (limited to 'src/api') diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 4b815f79..0065ca59 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -260,7 +260,7 @@ impl ApiHandler for S3ApiServer { common: ListQueryCommon { bucket_name, bucket_id, - delimiter: delimiter.map(|d| d.to_string()), + delimiter, page_size: max_keys.unwrap_or(1000).clamp(1, 1000), prefix: prefix.unwrap_or_default(), urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), @@ -290,7 +290,7 @@ impl ApiHandler for S3ApiServer { common: ListQueryCommon { bucket_name, bucket_id, - delimiter: delimiter.map(|d| d.to_string()), + delimiter, page_size: max_keys.unwrap_or(1000).clamp(1, 1000), urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), prefix: prefix.unwrap_or_default(), @@ -323,7 +323,7 @@ impl ApiHandler for S3ApiServer { common: ListQueryCommon { bucket_name, bucket_id, - delimiter: delimiter.map(|d| d.to_string()), + delimiter, page_size: max_uploads.unwrap_or(1000).clamp(1, 1000), prefix: prefix.unwrap_or_default(), urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), diff --git a/src/api/s3/router.rs b/src/api/s3/router.rs index 821b0e07..05882885 100644 --- a/src/api/s3/router.rs +++ b/src/api/s3/router.rs @@ -170,7 +170,7 @@ pub enum Endpoint { }, ListBuckets, ListMultipartUploads { - delimiter: Option, + delimiter: Option, 
encoding_type: Option, key_marker: Option, max_uploads: Option, @@ -178,7 +178,7 @@ pub enum Endpoint { upload_id_marker: Option, }, ListObjects { - delimiter: Option, + delimiter: Option, encoding_type: Option, marker: Option, max_keys: Option, @@ -188,7 +188,7 @@ pub enum Endpoint { // This value should always be 2. It is not checked when constructing the struct list_type: String, continuation_token: Option, - delimiter: Option, + delimiter: Option, encoding_type: Option, fetch_owner: Option, max_keys: Option, @@ -196,7 +196,7 @@ pub enum Endpoint { start_after: Option, }, ListObjectVersions { - delimiter: Option, + delimiter: Option, encoding_type: Option, key_marker: Option, max_keys: Option, -- cgit v1.2.3 From 02e98e2d100a6af96369a72bc6979580424fe7df Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 9 Feb 2024 15:34:42 +0100 Subject: [header-override-650] implement header overriding in GetObject (fix #650) --- src/api/s3/api_server.rs | 22 +++++++++++++++++++-- src/api/s3/get.rs | 50 +++++++++++++++++++++++++++++++++++++++++++++--- src/api/s3/router.rs | 21 +++++++++++++++++++- 3 files changed, 87 insertions(+), 6 deletions(-) (limited to 'src/api') diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 0065ca59..7fac6261 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -178,8 +178,26 @@ impl ApiHandler for S3ApiServer { key, part_number, .. } => handle_head(garage, &req, bucket_id, &key, part_number).await, Endpoint::GetObject { - key, part_number, .. - } => handle_get(garage, &req, bucket_id, &key, part_number).await, + key, + part_number, + response_cache_control, + response_content_disposition, + response_content_encoding, + response_content_language, + response_content_type, + response_expires, + .. + } => { + let overrides = GetObjectOverrides { + response_cache_control, + response_content_disposition, + response_content_encoding, + response_content_language, + response_content_type, + response_expires, + }; + handle_get(garage, &req, bucket_id, &key, part_number, overrides).await + } Endpoint::UploadPart { key, part_number, diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index f70dad7d..53f0a345 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -1,12 +1,14 @@ //! 
Function related to GET and HEAD requests +use std::convert::TryInto; use std::sync::Arc; use std::time::{Duration, UNIX_EPOCH}; use futures::future; use futures::stream::{self, StreamExt}; use http::header::{ - ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, - IF_NONE_MATCH, LAST_MODIFIED, RANGE, + ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, + CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH, + LAST_MODIFIED, RANGE, }; use hyper::{body::Body, Request, Response, StatusCode}; use tokio::sync::mpsc; @@ -27,6 +29,16 @@ use crate::s3::error::*; const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; +#[derive(Default)] +pub struct GetObjectOverrides { + pub(crate) response_cache_control: Option, + pub(crate) response_content_disposition: Option, + pub(crate) response_content_encoding: Option, + pub(crate) response_content_language: Option, + pub(crate) response_content_type: Option, + pub(crate) response_expires: Option, +} + fn object_headers( version: &ObjectVersion, version_meta: &ObjectVersionMeta, @@ -52,6 +64,32 @@ fn object_headers( resp } +/// Override headers according to specific query parameters, see +/// section "Overriding response header values through the request" in +/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html +fn getobject_override_headers( + overrides: GetObjectOverrides, + resp: &mut http::response::Builder, +) -> Result<(), Error> { + // TODO: this only applies for signed requests, so when we support + // anonymous access in the future we will have to do a permission check here + let overrides = [ + (CACHE_CONTROL, overrides.response_cache_control), + (CONTENT_DISPOSITION, overrides.response_content_disposition), + (CONTENT_ENCODING, overrides.response_content_encoding), + (CONTENT_LANGUAGE, overrides.response_content_language), + (CONTENT_TYPE, overrides.response_content_type), + (EXPIRES, overrides.response_expires), + ]; + for (hdr, val_opt) in overrides { + if let Some(val) = val_opt { + let val = val.try_into().ok_or_bad_request("invalid header value")?; + resp.headers_mut().unwrap().insert(hdr, val); + } + } + Ok(()) +} + fn try_answer_cached( version: &ObjectVersion, version_meta: &ObjectVersionMeta, @@ -185,6 +223,7 @@ pub async fn handle_get( bucket_id: Uuid, key: &str, part_number: Option, + overrides: GetObjectOverrides, ) -> Result, Error> { let object = garage .object_table @@ -236,9 +275,10 @@ pub async fn handle_get( (None, None) => (), } - let resp_builder = object_headers(last_v, last_v_meta) + let mut resp_builder = object_headers(last_v, last_v_meta) .header(CONTENT_LENGTH, format!("{}", last_v_meta.size)) .status(StatusCode::OK); + getobject_override_headers(overrides, &mut resp_builder)?; match &last_v_data { ObjectVersionData::DeleteMarker => unreachable!(), @@ -303,6 +343,9 @@ async fn handle_get_range( begin: u64, end: u64, ) -> Result, Error> { + // Here we do not use getobject_override_headers because we don't + // want to add any overridden headers (those should not be added + // when returning PARTIAL_CONTENT) let resp_builder = object_headers(version, version_meta) .header(CONTENT_LENGTH, format!("{}", end - begin)) .header( @@ -343,6 +386,7 @@ async fn handle_get_part( version_meta: &ObjectVersionMeta, part_number: u64, ) -> Result, Error> { + // Same as for get_range, no getobject_override_headers let resp_builder = object_headers(object_version, 
version_meta).status(StatusCode::PARTIAL_CONTENT); diff --git a/src/api/s3/router.rs b/src/api/s3/router.rs index 05882885..e7ac1d77 100644 --- a/src/api/s3/router.rs +++ b/src/api/s3/router.rs @@ -125,6 +125,12 @@ pub enum Endpoint { key: String, part_number: Option, version_id: Option, + response_cache_control: Option, + response_content_disposition: Option, + response_content_encoding: Option, + response_content_language: Option, + response_content_type: Option, + response_expires: Option, }, GetObjectAcl { key: String, @@ -358,7 +364,14 @@ impl Endpoint { (query.keyword.take().unwrap_or_default(), key, query, None), key: [ EMPTY if upload_id => ListParts (query::upload_id, opt_parse::max_parts, opt_parse::part_number_marker), - EMPTY => GetObject (query_opt::version_id, opt_parse::part_number), + EMPTY => GetObject (query_opt::version_id, + opt_parse::part_number, + query_opt::response_cache_control, + query_opt::response_content_disposition, + query_opt::response_content_encoding, + query_opt::response_content_language, + query_opt::response_content_type, + query_opt::response_expires), ACL => GetObjectAcl (query_opt::version_id), LEGAL_HOLD => GetObjectLegalHold (query_opt::version_id), RETENTION => GetObjectRetention (query_opt::version_id), @@ -671,6 +684,12 @@ generateQueryParameters! { "partNumber" => part_number, "part-number-marker" => part_number_marker, "prefix" => prefix, + "response-cache-control" => response_cache_control, + "response-content-disposition" => response_content_disposition, + "response-content-encoding" => response_content_encoding, + "response-content-language" => response_content_language, + "response-content-type" => response_content_type, + "response-expires" => response_expires, "select-type" => select_type, "start-after" => start_after, "uploadId" => upload_id, -- cgit v1.2.3
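
The header-override-650 patch above exposes the standard S3 "response-*" query parameters on GetObject, and its router.rs hunk lists the exact parameter names Garage now recognizes (response-cache-control, response-content-disposition, response-content-encoding, response-content-language, response-content-type, response-expires). As a minimal client-side sketch, and not part of the patch set, the following hypothetical helper shows how such a request URL could be assembled; the request must still be signed, since the handler's own comment notes that the overrides only apply to signed requests. It uses the percent-encoding crate already listed in the workspace dependencies, and assumes the object key itself needs no further escaping.

    use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};

    // Hypothetical helper: build a GetObject URL that asks the server to
    // override Content-Disposition and Content-Type in its response.
    fn get_object_url_with_overrides(endpoint: &str, bucket: &str, key: &str) -> String {
        // Override values are percent-encoded because they typically contain
        // spaces, quotes and slashes (e.g. `attachment; filename="report.csv"`).
        let disposition =
            utf8_percent_encode("attachment; filename=\"report.csv\"", NON_ALPHANUMERIC);
        let content_type = utf8_percent_encode("text/csv", NON_ALPHANUMERIC);
        format!(
            "{}/{}/{}?response-content-disposition={}&response-content-type={}",
            endpoint, bucket, key, disposition, content_type
        )
    }

    // Example (values are illustrative only):
    // get_object_url_with_overrides("https://s3.example.com", "my-bucket", "report.csv")
    // yields a URL whose signed GET returns the object with
    // Content-Disposition: attachment; filename="report.csv" and Content-Type: text/csv,
    // as implemented by getobject_override_headers() in the patch above.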