diff options
author | Alex Auvolat <lx@deuxfleurs.fr> | 2025-02-17 18:47:06 +0100 |
---|---|---|
committer | Alex Auvolat <lx@deuxfleurs.fr> | 2025-02-17 18:47:06 +0100 |
commit | c5df820e2c2b4bff5e239b8e99f07178b98b3f5a (patch) | |
tree | 26fa3dd297ee1c8bb55f5f7573a5c3396b030507 | |
parent | a04d6cd5b8a3acffb8daeee00aed744fb1a78ea3 (diff) | |
download | garage-c5df820e2c2b4bff5e239b8e99f07178b98b3f5a.tar.gz garage-c5df820e2c2b4bff5e239b8e99f07178b98b3f5a.zip |
api: start refactor of signature to calculate checksums earlier
-rw-r--r-- | src/api/common/cors.rs | 8 | ||||
-rw-r--r-- | src/api/common/signature/body.rs | 69 | ||||
-rw-r--r-- | src/api/common/signature/checksum.rs | 135 | ||||
-rw-r--r-- | src/api/common/signature/mod.rs | 12 | ||||
-rw-r--r-- | src/api/common/signature/payload.rs | 2 | ||||
-rw-r--r-- | src/api/common/signature/streaming.rs | 52 | ||||
-rw-r--r-- | src/api/k2v/batch.rs | 8 | ||||
-rw-r--r-- | src/api/k2v/item.rs | 4 | ||||
-rw-r--r-- | src/api/s3/api_server.rs | 27 | ||||
-rw-r--r-- | src/api/s3/bucket.rs | 10 | ||||
-rw-r--r-- | src/api/s3/checksum.rs | 108 | ||||
-rw-r--r-- | src/api/s3/copy.rs | 1 | ||||
-rw-r--r-- | src/api/s3/cors.rs | 11 | ||||
-rw-r--r-- | src/api/s3/delete.rs | 9 | ||||
-rw-r--r-- | src/api/s3/get.rs | 16 | ||||
-rw-r--r-- | src/api/s3/lifecycle.rs | 10 | ||||
-rw-r--r-- | src/api/s3/multipart.rs | 14 | ||||
-rw-r--r-- | src/api/s3/post_object.rs | 1 | ||||
-rw-r--r-- | src/api/s3/put.rs | 4 | ||||
-rw-r--r-- | src/api/s3/website.rs | 10 | ||||
-rw-r--r-- | src/web/web_server.rs | 8 |
21 files changed, 288 insertions, 231 deletions
diff --git a/src/api/common/cors.rs b/src/api/common/cors.rs index 14369b56..09b55c13 100644 --- a/src/api/common/cors.rs +++ b/src/api/common/cors.rs @@ -14,9 +14,9 @@ use crate::common_error::{ }; use crate::helpers::*; -pub fn find_matching_cors_rule<'a>( +pub fn find_matching_cors_rule<'a, B>( bucket_params: &'a BucketParams, - req: &Request<impl Body>, + req: &Request<B>, ) -> Result<Option<&'a GarageCorsRule>, CommonError> { if let Some(cors_config) = bucket_params.cors_config.get() { if let Some(origin) = req.headers().get("Origin") { @@ -132,8 +132,8 @@ pub async fn handle_options_api( } } -pub fn handle_options_for_bucket( - req: &Request<IncomingBody>, +pub fn handle_options_for_bucket<B>( + req: &Request<B>, bucket_params: &BucketParams, ) -> Result<Response<EmptyBody>, CommonError> { let origin = req diff --git a/src/api/common/signature/body.rs b/src/api/common/signature/body.rs new file mode 100644 index 00000000..877d8d85 --- /dev/null +++ b/src/api/common/signature/body.rs @@ -0,0 +1,69 @@ +use std::sync::Mutex; + +use futures::prelude::*; +use futures::stream::BoxStream; +use http_body_util::{BodyExt, StreamBody}; +use hyper::body::{Bytes, Frame}; +use serde::Deserialize; +use tokio::sync::{mpsc, oneshot}; + +use super::*; + +use crate::signature::checksum::*; + +pub struct ReqBody { + // why need mutex to be sync?? + pub stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>, + pub checksummer: Checksummer, + pub expected_checksums: ExpectedChecksums, +} + +pub type StreamingChecksumReceiver = oneshot::Receiver<Result<Checksums, Error>>; + +impl ReqBody { + pub async fn json<T: for<'a> Deserialize<'a>>(self) -> Result<T, Error> { + let body = self.collect().await?; + let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + Ok(resp) + } + + pub async fn collect(self) -> Result<Bytes, Error> { + self.collect_with_checksums().await.map(|(b, _)| b) + } + + pub async fn collect_with_checksums(mut self) -> Result<(Bytes, Checksums), Error> { + let stream: BoxStream<_> = self.stream.into_inner().unwrap(); + let bytes = BodyExt::collect(StreamBody::new(stream)).await?.to_bytes(); + + self.checksummer.update(&bytes); + let checksums = self.checksummer.finalize(); + checksums.verify(&self.expected_checksums)?; + + Ok((bytes, checksums)) + } + + pub fn streaming(self) -> impl Stream<Item = Result<Bytes, Error>> { + self.streaming_with_checksums(false).0 + } + + pub fn streaming_with_checksums( + self, + add_md5: bool, + ) -> ( + impl Stream<Item = Result<Bytes, Error>>, + StreamingChecksumReceiver, + ) { + let (tx, rx) = oneshot::channel(); + // TODO: actually calculate checksums!! + let stream: BoxStream<_> = self.stream.into_inner().unwrap(); + ( + stream.map(|x| { + x.and_then(|f| { + f.into_data() + .map_err(|_| Error::bad_request("non-data frame")) + }) + }), + rx, + ) + } +} diff --git a/src/api/common/signature/checksum.rs b/src/api/common/signature/checksum.rs index 432ed44d..b184fc65 100644 --- a/src/api/common/signature/checksum.rs +++ b/src/api/common/signature/checksum.rs @@ -8,13 +8,15 @@ use md5::{Digest, Md5}; use sha1::Sha1; use sha2::Sha256; -use http::HeaderName; +use http::{HeaderMap, HeaderName, HeaderValue}; use garage_util::data::*; use garage_model::s3::object_table::{ChecksumAlgorithm, ChecksumValue}; -use super::error::*; +use super::*; + +pub const CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5"); pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-checksum-algorithm"); @@ -58,14 +60,18 @@ pub struct Checksums { } impl Checksummer { - pub fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self { - let mut ret = Self { + pub fn new() -> Self { + Self { crc32: None, crc32c: None, md5: None, sha1: None, sha256: None, - }; + } + } + + pub fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self { + let mut ret = Self::new(); if expected.md5.is_some() || require_md5 { ret.md5 = Some(Md5::new()); @@ -179,3 +185,122 @@ impl Checksums { } } } + +// ---- + +/// Extract the value of the x-amz-checksum-algorithm header +pub fn request_checksum_algorithm( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumAlgorithm>, Error> { + match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { + None => Ok(None), + Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)), + Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)), + Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)), + Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)), + _ => Err(Error::bad_request("invalid checksum algorithm")), + } +} + +pub fn request_trailer_checksum_algorithm( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumAlgorithm>, Error> { + match headers.get(X_AMZ_TRAILER).map(|x| x.to_str()).transpose()? { + None => Ok(None), + Some(x) if x == X_AMZ_CHECKSUM_CRC32 => Ok(Some(ChecksumAlgorithm::Crc32)), + Some(x) if x == X_AMZ_CHECKSUM_CRC32C => Ok(Some(ChecksumAlgorithm::Crc32c)), + Some(x) if x == X_AMZ_CHECKSUM_SHA1 => Ok(Some(ChecksumAlgorithm::Sha1)), + Some(x) if x == X_AMZ_CHECKSUM_SHA256 => Ok(Some(ChecksumAlgorithm::Sha256)), + _ => Err(Error::bad_request("invalid checksum algorithm")), + } +} + +/// Extract the value of any of the x-amz-checksum-* headers +pub fn request_checksum_value( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumValue>, Error> { + let mut ret = vec![]; + + if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) { + let crc32 = BASE64_STANDARD + .decode(&crc32_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; + ret.push(ChecksumValue::Crc32(crc32)) + } + if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) { + let crc32c = BASE64_STANDARD + .decode(&crc32c_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; + ret.push(ChecksumValue::Crc32c(crc32c)) + } + if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) { + let sha1 = BASE64_STANDARD + .decode(&sha1_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; + ret.push(ChecksumValue::Sha1(sha1)) + } + if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) { + let sha256 = BASE64_STANDARD + .decode(&sha256_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; + ret.push(ChecksumValue::Sha256(sha256)) + } + + if ret.len() > 1 { + return Err(Error::bad_request( + "multiple x-amz-checksum-* headers given", + )); + } + Ok(ret.pop()) +} + +/// Checks for the presence of x-amz-checksum-algorithm +/// if so extract the corresponding x-amz-checksum-* value +pub fn request_checksum_algorithm_value( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumValue>, Error> { + match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { + Some(x) if x == "CRC32" => { + let crc32 = headers + .get(X_AMZ_CHECKSUM_CRC32) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; + Ok(Some(ChecksumValue::Crc32(crc32))) + } + Some(x) if x == "CRC32C" => { + let crc32c = headers + .get(X_AMZ_CHECKSUM_CRC32C) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; + Ok(Some(ChecksumValue::Crc32c(crc32c))) + } + Some(x) if x == "SHA1" => { + let sha1 = headers + .get(X_AMZ_CHECKSUM_SHA1) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; + Ok(Some(ChecksumValue::Sha1(sha1))) + } + Some(x) if x == "SHA256" => { + let sha256 = headers + .get(X_AMZ_CHECKSUM_SHA256) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; + Ok(Some(ChecksumValue::Sha256(sha256))) + } + Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")), + None => Ok(None), + } +} diff --git a/src/api/common/signature/mod.rs b/src/api/common/signature/mod.rs index 2421d696..e93ca85a 100644 --- a/src/api/common/signature/mod.rs +++ b/src/api/common/signature/mod.rs @@ -11,6 +11,7 @@ use garage_util::data::{sha256sum, Hash}; use error::*; +pub mod body; pub mod checksum; pub mod error; pub mod payload; @@ -51,7 +52,7 @@ pub const AWS4_HMAC_SHA256_PAYLOAD: &str = "AWS4-HMAC-SHA256-PAYLOAD"; #[derive(Debug)] pub enum ContentSha256Header { UnsignedPayload, - Sha256Hash(Hash), + Sha256Checksum(Hash), StreamingPayload { trailer: bool, signed: bool }, } @@ -90,15 +91,6 @@ pub async fn verify_request( }) } -pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> { - if expected_sha256 != sha256sum(body) { - return Err(Error::bad_request( - "Request content hash does not match signed hash".to_string(), - )); - } - Ok(()) -} - pub fn signing_hmac( datetime: &DateTime<Utc>, secret_key: &str, diff --git a/src/api/common/signature/payload.rs b/src/api/common/signature/payload.rs index ccc55c90..4ca0153f 100644 --- a/src/api/common/signature/payload.rs +++ b/src/api/common/signature/payload.rs @@ -94,7 +94,7 @@ fn parse_x_amz_content_sha256(header: Option<&str>) -> Result<ContentSha256Heade .ok() .and_then(|bytes| Hash::try_from(&bytes)) .ok_or_bad_request("Invalid content sha256 hash")?; - Ok(ContentSha256Header::Sha256Hash(sha256)) + Ok(ContentSha256Header::Sha256Checksum(sha256)) } } diff --git a/src/api/common/signature/streaming.rs b/src/api/common/signature/streaming.rs index b8a5e66d..e8f9b3d7 100644 --- a/src/api/common/signature/streaming.rs +++ b/src/api/common/signature/streaming.rs @@ -1,21 +1,22 @@ use std::pin::Pin; +use std::sync::Mutex; use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use futures::prelude::*; use futures::task; use hmac::Mac; -use http_body_util::StreamBody; -use hyper::body::{Bytes, Incoming as IncomingBody}; +use hyper::body::{Bytes, Frame, Incoming as IncomingBody}; use hyper::Request; use garage_util::data::Hash; use super::*; -use crate::helpers::*; +use crate::helpers::body_stream; +use crate::signature::checksum::*; use crate::signature::payload::CheckedSignature; -pub type ReqBody = BoxBody<Error>; +pub use crate::signature::body::ReqBody; pub fn parse_streaming_body( req: Request<IncomingBody>, @@ -23,6 +24,20 @@ pub fn parse_streaming_body( region: &str, service: &str, ) -> Result<Request<ReqBody>, Error> { + let expected_checksums = ExpectedChecksums { + md5: match req.headers().get("content-md5") { + Some(x) => Some(x.to_str()?.to_string()), + None => None, + }, + sha256: match &checked_signature.content_sha256_header { + ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256), + _ => None, + }, + extra: None, + }; + + let mut checksummer = Checksummer::init(&expected_checksums, false); + match checked_signature.content_sha256_header { ContentSha256Header::StreamingPayload { signed, trailer } => { if !signed && !trailer { @@ -31,6 +46,11 @@ pub fn parse_streaming_body( )); } + if trailer { + let algo = request_trailer_checksum_algorithm(req.headers())?; + checksummer = checksummer.add(algo); + } + let sign_params = if signed { let signature = checked_signature .signature_header @@ -77,14 +97,24 @@ pub fn parse_streaming_body( Ok(req.map(move |body| { let stream = body_stream::<_, Error>(body); + let signed_payload_stream = - StreamingPayloadStream::new(stream, sign_params, trailer) - .map(|x| x.map(hyper::body::Frame::data)) - .map_err(Error::from); - ReqBody::new(StreamBody::new(signed_payload_stream)) + StreamingPayloadStream::new(stream, sign_params, trailer).map_err(Error::from); + ReqBody { + stream: Mutex::new(signed_payload_stream.boxed()), + checksummer, + expected_checksums, + } })) } - _ => Ok(req.map(|body| ReqBody::new(http_body_util::BodyExt::map_err(body, Error::from)))), + _ => Ok(req.map(|body| { + let stream = http_body_util::BodyStream::new(body).map_err(Error::from); + ReqBody { + stream: Mutex::new(stream.boxed()), + checksummer, + expected_checksums, + } + })), } } @@ -386,7 +416,7 @@ impl<S> Stream for StreamingPayloadStream<S> where S: Stream<Item = Result<Bytes, Error>> + Unpin, { - type Item = Result<Bytes, StreamingPayloadError>; + type Item = Result<Frame<Bytes>, StreamingPayloadError>; fn poll_next( self: Pin<&mut Self>, @@ -450,7 +480,7 @@ where return Poll::Ready(None); } - return Poll::Ready(Some(Ok(data))); + return Poll::Ready(Some(Ok(Frame::data(data)))); } StreamingPayloadChunk::Trailer(trailer) => { if let Some(signing) = this.signing.as_mut() { diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index c284dbd4..7a03d836 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -20,7 +20,7 @@ pub async fn handle_insert_batch( let ReqCtx { garage, bucket_id, .. } = &ctx; - let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?; + let items = req.into_body().json::<Vec<InsertBatchItem>>().await?; let mut items2 = vec![]; for it in items { @@ -47,7 +47,7 @@ pub async fn handle_read_batch( ctx: ReqCtx, req: Request<ReqBody>, ) -> Result<Response<ResBody>, Error> { - let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?; + let queries = req.into_body().json::<Vec<ReadBatchQuery>>().await?; let resp_results = futures::future::join_all( queries @@ -141,7 +141,7 @@ pub async fn handle_delete_batch( ctx: ReqCtx, req: Request<ReqBody>, ) -> Result<Response<ResBody>, Error> { - let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?; + let queries = req.into_body().json::<Vec<DeleteBatchQuery>>().await?; let resp_results = futures::future::join_all( queries @@ -262,7 +262,7 @@ pub(crate) async fn handle_poll_range( } = ctx; use garage_model::k2v::sub::PollRange; - let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?; + let query = req.into_body().json::<PollRangeQuery>().await?; let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000; diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index 4e28b499..0fb945d2 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -144,9 +144,7 @@ pub async fn handle_insert_item( .map(parse_causality_token) .transpose()?; - let body = http_body_util::BodyExt::collect(req.into_body()) - .await? - .to_bytes(); + let body = req.into_body().collect().await?; let value = DvvsValue::Value(body.to_vec()); diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 0fdaab70..fe6545cc 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -125,7 +125,7 @@ impl ApiHandler for S3ApiServer { let req = verified_request.request; let api_key = verified_request.access_key; let content_sha256 = match verified_request.content_sha256_header { - ContentSha256Header::Sha256Hash(h) => Some(h), + ContentSha256Header::Sha256Checksum(h) => Some(h), // TODO take into account streaming/trailer checksums, etc. _ => None, }; @@ -141,14 +141,7 @@ impl ApiHandler for S3ApiServer { // Special code path for CreateBucket API endpoint if let Endpoint::CreateBucket {} = endpoint { - return handle_create_bucket( - &garage, - req, - content_sha256, - &api_key.key_id, - bucket_name, - ) - .await; + return handle_create_bucket(&garage, req, &api_key.key_id, bucket_name).await; } let bucket_id = garage @@ -186,7 +179,7 @@ impl ApiHandler for S3ApiServer { let resp = match endpoint { Endpoint::HeadObject { key, part_number, .. - } => handle_head(ctx, &req, &key, part_number).await, + } => handle_head(ctx, &req.map(|_| ()), &key, part_number).await, Endpoint::GetObject { key, part_number, @@ -206,7 +199,7 @@ impl ApiHandler for S3ApiServer { response_content_type, response_expires, }; - handle_get(ctx, &req, &key, part_number, overrides).await + handle_get(ctx, &req.map(|_| ()), &key, part_number, overrides).await } Endpoint::UploadPart { key, @@ -228,7 +221,7 @@ impl ApiHandler for S3ApiServer { handle_create_multipart_upload(ctx, &req, &key).await } Endpoint::CompleteMultipartUpload { key, upload_id } => { - handle_complete_multipart_upload(ctx, req, &key, &upload_id, content_sha256).await + handle_complete_multipart_upload(ctx, req, &key, &upload_id).await } Endpoint::CreateBucket {} => unreachable!(), Endpoint::HeadBucket {} => { @@ -331,17 +324,15 @@ impl ApiHandler for S3ApiServer { }; handle_list_parts(ctx, req, &query).await } - Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await, + Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req).await, Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await, - Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req, content_sha256).await, + Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req).await, Endpoint::DeleteBucketWebsite {} => handle_delete_website(ctx).await, Endpoint::GetBucketCors {} => handle_get_cors(ctx).await, - Endpoint::PutBucketCors {} => handle_put_cors(ctx, req, content_sha256).await, + Endpoint::PutBucketCors {} => handle_put_cors(ctx, req).await, Endpoint::DeleteBucketCors {} => handle_delete_cors(ctx).await, Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(ctx).await, - Endpoint::PutBucketLifecycleConfiguration {} => { - handle_put_lifecycle(ctx, req, content_sha256).await - } + Endpoint::PutBucketLifecycleConfiguration {} => handle_put_lifecycle(ctx, req).await, Endpoint::DeleteBucketLifecycle {} => handle_delete_lifecycle(ctx).await, endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), }; diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 0a192ba6..3a09e769 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use garage_model::bucket_alias_table::*; @@ -10,12 +9,10 @@ use garage_model::key_table::Key; use garage_model::permission::BucketKeyPerm; use garage_table::util::*; use garage_util::crdt::*; -use garage_util::data::*; use garage_util::time::*; use garage_api_common::common_error::CommonError; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::error::*; @@ -122,15 +119,10 @@ pub async fn handle_list_buckets( pub async fn handle_create_bucket( garage: &Garage, req: Request<ReqBody>, - content_sha256: Option<Hash>, api_key_id: &String, bucket_name: String, ) -> Result<Response<ResBody>, Error> { - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let cmd = parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?; diff --git a/src/api/s3/checksum.rs b/src/api/s3/checksum.rs index a720a82f..8e6096b6 100644 --- a/src/api/s3/checksum.rs +++ b/src/api/s3/checksum.rs @@ -8,8 +8,6 @@ use md5::{Digest, Md5}; use sha1::Sha1; use sha2::Sha256; -use http::{HeaderMap, HeaderValue}; - use garage_util::error::OkOrMessage; use garage_model::s3::object_table::*; @@ -112,112 +110,6 @@ impl MultipartChecksummer { } } -// ---- - -/// Extract the value of the x-amz-checksum-algorithm header -pub(crate) fn request_checksum_algorithm( - headers: &HeaderMap<HeaderValue>, -) -> Result<Option<ChecksumAlgorithm>, Error> { - match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { - None => Ok(None), - Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)), - Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)), - Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)), - Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)), - _ => Err(Error::bad_request("invalid checksum algorithm")), - } -} - -/// Extract the value of any of the x-amz-checksum-* headers -pub(crate) fn request_checksum_value( - headers: &HeaderMap<HeaderValue>, -) -> Result<Option<ChecksumValue>, Error> { - let mut ret = vec![]; - - if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) { - let crc32 = BASE64_STANDARD - .decode(&crc32_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; - ret.push(ChecksumValue::Crc32(crc32)) - } - if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) { - let crc32c = BASE64_STANDARD - .decode(&crc32c_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; - ret.push(ChecksumValue::Crc32c(crc32c)) - } - if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) { - let sha1 = BASE64_STANDARD - .decode(&sha1_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; - ret.push(ChecksumValue::Sha1(sha1)) - } - if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) { - let sha256 = BASE64_STANDARD - .decode(&sha256_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; - ret.push(ChecksumValue::Sha256(sha256)) - } - - if ret.len() > 1 { - return Err(Error::bad_request( - "multiple x-amz-checksum-* headers given", - )); - } - Ok(ret.pop()) -} - -/// Checks for the presence of x-amz-checksum-algorithm -/// if so extract the corresponding x-amz-checksum-* value -pub(crate) fn request_checksum_algorithm_value( - headers: &HeaderMap<HeaderValue>, -) -> Result<Option<ChecksumValue>, Error> { - match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { - Some(x) if x == "CRC32" => { - let crc32 = headers - .get(X_AMZ_CHECKSUM_CRC32) - .and_then(|x| BASE64_STANDARD.decode(&x).ok()) - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; - Ok(Some(ChecksumValue::Crc32(crc32))) - } - Some(x) if x == "CRC32C" => { - let crc32c = headers - .get(X_AMZ_CHECKSUM_CRC32C) - .and_then(|x| BASE64_STANDARD.decode(&x).ok()) - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; - Ok(Some(ChecksumValue::Crc32c(crc32c))) - } - Some(x) if x == "SHA1" => { - let sha1 = headers - .get(X_AMZ_CHECKSUM_SHA1) - .and_then(|x| BASE64_STANDARD.decode(&x).ok()) - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; - Ok(Some(ChecksumValue::Sha1(sha1))) - } - Some(x) if x == "SHA256" => { - let sha256 = headers - .get(X_AMZ_CHECKSUM_SHA256) - .and_then(|x| BASE64_STANDARD.decode(&x).ok()) - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; - Ok(Some(ChecksumValue::Sha256(sha256))) - } - Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")), - None => Ok(None), - } -} - pub(crate) fn add_checksum_response_headers( checksum: &Option<ChecksumValue>, mut resp: http::response::Builder, diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 4bf68406..9ae48807 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -24,7 +24,6 @@ use garage_api_common::helpers::*; use garage_api_common::signature::checksum::*; use crate::api_server::{ReqBody, ResBody}; -use crate::checksum::*; use crate::encryption::EncryptionParams; use crate::error::*; use crate::get::full_object_byte_stream; diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index 625b84db..fcfdb934 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -2,15 +2,11 @@ use quick_xml::de::from_reader; use hyper::{header::HeaderName, Method, Request, Response, StatusCode}; -use http_body_util::BodyExt; - use serde::{Deserialize, Serialize}; use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule}; -use garage_util::data::*; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::error::*; @@ -59,7 +55,6 @@ pub async fn handle_delete_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> pub async fn handle_put_cors( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -68,11 +63,7 @@ pub async fn handle_put_cors( .. } = ctx; - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let conf: CorsConfiguration = from_reader(&body as &[u8])?; conf.validate()?; diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index b799e67a..d785b9d8 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -1,4 +1,3 @@ -use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use garage_util::data::*; @@ -6,7 +5,6 @@ use garage_util::data::*; use garage_model::s3::object_table::*; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::error::*; @@ -68,13 +66,8 @@ pub async fn handle_delete(ctx: ReqCtx, key: &str) -> Result<Response<ResBody>, pub async fn handle_delete_objects( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?; diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 6627cf4a..16e2b935 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -12,7 +12,7 @@ use http::header::{ CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, RANGE, }; -use hyper::{body::Body, Request, Response, StatusCode}; +use hyper::{Request, Response, StatusCode}; use tokio::sync::mpsc; use garage_net::stream::ByteStream; @@ -119,7 +119,7 @@ fn getobject_override_headers( fn try_answer_cached( version: &ObjectVersion, version_meta: &ObjectVersionMeta, - req: &Request<impl Body>, + req: &Request<()>, ) -> Option<Response<ResBody>> { // <trinity> It is possible, and is even usually the case, [that both If-None-Match and // If-Modified-Since] are present in a request. In this situation If-None-Match takes @@ -158,7 +158,7 @@ fn try_answer_cached( /// Handle HEAD request pub async fn handle_head( ctx: ReqCtx, - req: &Request<impl Body>, + req: &Request<()>, key: &str, part_number: Option<u64>, ) -> Result<Response<ResBody>, Error> { @@ -168,7 +168,7 @@ pub async fn handle_head( /// Handle HEAD request for website pub async fn handle_head_without_ctx( garage: Arc<Garage>, - req: &Request<impl Body>, + req: &Request<()>, bucket_id: Uuid, key: &str, part_number: Option<u64>, @@ -279,7 +279,7 @@ pub async fn handle_head_without_ctx( /// Handle GET request pub async fn handle_get( ctx: ReqCtx, - req: &Request<impl Body>, + req: &Request<()>, key: &str, part_number: Option<u64>, overrides: GetObjectOverrides, @@ -290,7 +290,7 @@ pub async fn handle_get( /// Handle GET request pub async fn handle_get_without_ctx( garage: Arc<Garage>, - req: &Request<impl Body>, + req: &Request<()>, bucket_id: Uuid, key: &str, part_number: Option<u64>, @@ -578,7 +578,7 @@ async fn handle_get_part( } fn parse_range_header( - req: &Request<impl Body>, + req: &Request<()>, total_size: u64, ) -> Result<Option<http_range::HttpRange>, Error> { let range = match req.headers().get(RANGE) { @@ -619,7 +619,7 @@ struct ChecksumMode { enabled: bool, } -fn checksum_mode(req: &Request<impl Body>) -> ChecksumMode { +fn checksum_mode(req: &Request<()>) -> ChecksumMode { ChecksumMode { enabled: req .headers() diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index c35047ed..c140494e 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -1,12 +1,10 @@ use quick_xml::de::from_reader; -use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::error::*; @@ -16,7 +14,6 @@ use garage_model::bucket_table::{ parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration, LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule, }; -use garage_util::data::*; pub async fn handle_get_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> { let ReqCtx { bucket_params, .. } = ctx; @@ -56,7 +53,6 @@ pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, E pub async fn handle_put_lifecycle( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -65,11 +61,7 @@ pub async fn handle_put_lifecycle( .. } = ctx; - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let conf: LifecycleConfiguration = from_reader(&body as &[u8])?; let config = conf diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 7f8d6440..f381d670 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -17,7 +17,6 @@ use garage_model::s3::version_table::*; use garage_api_common::helpers::*; use garage_api_common::signature::checksum::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::checksum::*; @@ -114,7 +113,11 @@ pub async fn handle_put_part( let key = key.to_string(); let (req_head, req_body) = req.into_parts(); - let stream = body_stream(req_body); + + let (stream, checksums) = req_body.streaming_with_checksums(true); + let stream = stream.map_err(Error::from); + // TODO checksums + let mut chunker = StreamChunker::new(stream, garage.config.block_size); let ((_, object_version, mut mpu), first_block) = @@ -249,7 +252,6 @@ pub async fn handle_complete_multipart_upload( req: Request<ReqBody>, key: &str, upload_id: &str, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -261,11 +263,7 @@ pub async fn handle_complete_multipart_upload( let expected_checksum = request_checksum_value(&req_head.headers)?; - let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req_body.collect().await?; let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let body_list_of_parts = parse_complete_multipart_upload_body(&body_xml) diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index 908ee9f3..6c1b7453 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -22,7 +22,6 @@ use garage_api_common::signature::checksum::*; use garage_api_common::signature::payload::{verify_v4, Authorization}; use crate::api_server::ResBody; -use crate::checksum::*; use crate::encryption::EncryptionParams; use crate::error::*; use crate::put::{get_headers, save_stream, ChecksumMode}; diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 834be6f1..551c3b76 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -79,7 +79,9 @@ pub async fn handle_put( // Determine whether object should be encrypted, and if so the key let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; - let stream = body_stream(req.into_body()); + let (stream, checksums) = req.into_body().streaming_with_checksums(true); + let stream = stream.map_err(Error::from); + // TODO checksums let res = save_stream( &ctx, diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index b55bb345..7553bef7 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -1,14 +1,11 @@ use quick_xml::de::from_reader; -use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_model::bucket_table::*; -use garage_util::data::*; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::error::*; @@ -61,7 +58,6 @@ pub async fn handle_delete_website(ctx: ReqCtx) -> Result<Response<ResBody>, Err pub async fn handle_put_website( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -70,11 +66,7 @@ pub async fn handle_put_website( .. } = ctx; - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let conf: WebsiteConfiguration = from_reader(&body as &[u8])?; conf.validate()?; diff --git a/src/web/web_server.rs b/src/web/web_server.rs index e73dab48..34ba834c 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -1,6 +1,6 @@ use std::fs::{self, Permissions}; use std::os::unix::prelude::PermissionsExt; -use std::{convert::Infallible, sync::Arc}; +use std::sync::Arc; use tokio::net::{TcpListener, UnixListener}; use tokio::sync::watch; @@ -163,6 +163,8 @@ impl WebServer { metrics_tags.push(KeyValue::new("host", host_header.clone())); } + let req = req.map(|_| ()); + // The actual handler let res = self .serve_file(&req) @@ -218,7 +220,7 @@ impl WebServer { async fn serve_file( self: &Arc<Self>, - req: &Request<IncomingBody>, + req: &Request<()>, ) -> Result<Response<BoxBody<ApiError>>, Error> { // Get http authority string (eg. [::1]:3902 or garage.tld:80) let authority = req @@ -322,7 +324,7 @@ impl WebServer { // Create a fake HTTP request with path = the error document let req2 = Request::builder() .uri(format!("http://{}/{}", host, &error_document)) - .body(empty_body::<Infallible>()) + .body(()) .unwrap(); match handle_get_without_ctx( |