diff options
author | Alex Auvolat <lx@deuxfleurs.fr> | 2025-02-17 18:47:06 +0100 |
---|---|---|
committer | Alex Auvolat <lx@deuxfleurs.fr> | 2025-02-17 18:47:06 +0100 |
commit | c5df820e2c2b4bff5e239b8e99f07178b98b3f5a (patch) | |
tree | 26fa3dd297ee1c8bb55f5f7573a5c3396b030507 /src/api/common | |
parent | a04d6cd5b8a3acffb8daeee00aed744fb1a78ea3 (diff) | |
download | garage-c5df820e2c2b4bff5e239b8e99f07178b98b3f5a.tar.gz garage-c5df820e2c2b4bff5e239b8e99f07178b98b3f5a.zip |
api: start refactor of signature to calculate checksums earlier
Diffstat (limited to 'src/api/common')
-rw-r--r-- | src/api/common/cors.rs | 8 | ||||
-rw-r--r-- | src/api/common/signature/body.rs | 69 | ||||
-rw-r--r-- | src/api/common/signature/checksum.rs | 135 | ||||
-rw-r--r-- | src/api/common/signature/mod.rs | 12 | ||||
-rw-r--r-- | src/api/common/signature/payload.rs | 2 | ||||
-rw-r--r-- | src/api/common/signature/streaming.rs | 52 |
6 files changed, 247 insertions, 31 deletions
diff --git a/src/api/common/cors.rs b/src/api/common/cors.rs index 14369b56..09b55c13 100644 --- a/src/api/common/cors.rs +++ b/src/api/common/cors.rs @@ -14,9 +14,9 @@ use crate::common_error::{ }; use crate::helpers::*; -pub fn find_matching_cors_rule<'a>( +pub fn find_matching_cors_rule<'a, B>( bucket_params: &'a BucketParams, - req: &Request<impl Body>, + req: &Request<B>, ) -> Result<Option<&'a GarageCorsRule>, CommonError> { if let Some(cors_config) = bucket_params.cors_config.get() { if let Some(origin) = req.headers().get("Origin") { @@ -132,8 +132,8 @@ pub async fn handle_options_api( } } -pub fn handle_options_for_bucket( - req: &Request<IncomingBody>, +pub fn handle_options_for_bucket<B>( + req: &Request<B>, bucket_params: &BucketParams, ) -> Result<Response<EmptyBody>, CommonError> { let origin = req diff --git a/src/api/common/signature/body.rs b/src/api/common/signature/body.rs new file mode 100644 index 00000000..877d8d85 --- /dev/null +++ b/src/api/common/signature/body.rs @@ -0,0 +1,69 @@ +use std::sync::Mutex; + +use futures::prelude::*; +use futures::stream::BoxStream; +use http_body_util::{BodyExt, StreamBody}; +use hyper::body::{Bytes, Frame}; +use serde::Deserialize; +use tokio::sync::{mpsc, oneshot}; + +use super::*; + +use crate::signature::checksum::*; + +pub struct ReqBody { + // why need mutex to be sync?? + pub stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>, + pub checksummer: Checksummer, + pub expected_checksums: ExpectedChecksums, +} + +pub type StreamingChecksumReceiver = oneshot::Receiver<Result<Checksums, Error>>; + +impl ReqBody { + pub async fn json<T: for<'a> Deserialize<'a>>(self) -> Result<T, Error> { + let body = self.collect().await?; + let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + Ok(resp) + } + + pub async fn collect(self) -> Result<Bytes, Error> { + self.collect_with_checksums().await.map(|(b, _)| b) + } + + pub async fn collect_with_checksums(mut self) -> Result<(Bytes, Checksums), Error> { + let stream: BoxStream<_> = self.stream.into_inner().unwrap(); + let bytes = BodyExt::collect(StreamBody::new(stream)).await?.to_bytes(); + + self.checksummer.update(&bytes); + let checksums = self.checksummer.finalize(); + checksums.verify(&self.expected_checksums)?; + + Ok((bytes, checksums)) + } + + pub fn streaming(self) -> impl Stream<Item = Result<Bytes, Error>> { + self.streaming_with_checksums(false).0 + } + + pub fn streaming_with_checksums( + self, + add_md5: bool, + ) -> ( + impl Stream<Item = Result<Bytes, Error>>, + StreamingChecksumReceiver, + ) { + let (tx, rx) = oneshot::channel(); + // TODO: actually calculate checksums!! + let stream: BoxStream<_> = self.stream.into_inner().unwrap(); + ( + stream.map(|x| { + x.and_then(|f| { + f.into_data() + .map_err(|_| Error::bad_request("non-data frame")) + }) + }), + rx, + ) + } +} diff --git a/src/api/common/signature/checksum.rs b/src/api/common/signature/checksum.rs index 432ed44d..b184fc65 100644 --- a/src/api/common/signature/checksum.rs +++ b/src/api/common/signature/checksum.rs @@ -8,13 +8,15 @@ use md5::{Digest, Md5}; use sha1::Sha1; use sha2::Sha256; -use http::HeaderName; +use http::{HeaderMap, HeaderName, HeaderValue}; use garage_util::data::*; use garage_model::s3::object_table::{ChecksumAlgorithm, ChecksumValue}; -use super::error::*; +use super::*; + +pub const CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5"); pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-checksum-algorithm"); @@ -58,14 +60,18 @@ pub struct Checksums { } impl Checksummer { - pub fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self { - let mut ret = Self { + pub fn new() -> Self { + Self { crc32: None, crc32c: None, md5: None, sha1: None, sha256: None, - }; + } + } + + pub fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self { + let mut ret = Self::new(); if expected.md5.is_some() || require_md5 { ret.md5 = Some(Md5::new()); @@ -179,3 +185,122 @@ impl Checksums { } } } + +// ---- + +/// Extract the value of the x-amz-checksum-algorithm header +pub fn request_checksum_algorithm( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumAlgorithm>, Error> { + match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { + None => Ok(None), + Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)), + Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)), + Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)), + Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)), + _ => Err(Error::bad_request("invalid checksum algorithm")), + } +} + +pub fn request_trailer_checksum_algorithm( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumAlgorithm>, Error> { + match headers.get(X_AMZ_TRAILER).map(|x| x.to_str()).transpose()? { + None => Ok(None), + Some(x) if x == X_AMZ_CHECKSUM_CRC32 => Ok(Some(ChecksumAlgorithm::Crc32)), + Some(x) if x == X_AMZ_CHECKSUM_CRC32C => Ok(Some(ChecksumAlgorithm::Crc32c)), + Some(x) if x == X_AMZ_CHECKSUM_SHA1 => Ok(Some(ChecksumAlgorithm::Sha1)), + Some(x) if x == X_AMZ_CHECKSUM_SHA256 => Ok(Some(ChecksumAlgorithm::Sha256)), + _ => Err(Error::bad_request("invalid checksum algorithm")), + } +} + +/// Extract the value of any of the x-amz-checksum-* headers +pub fn request_checksum_value( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumValue>, Error> { + let mut ret = vec![]; + + if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) { + let crc32 = BASE64_STANDARD + .decode(&crc32_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; + ret.push(ChecksumValue::Crc32(crc32)) + } + if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) { + let crc32c = BASE64_STANDARD + .decode(&crc32c_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; + ret.push(ChecksumValue::Crc32c(crc32c)) + } + if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) { + let sha1 = BASE64_STANDARD + .decode(&sha1_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; + ret.push(ChecksumValue::Sha1(sha1)) + } + if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) { + let sha256 = BASE64_STANDARD + .decode(&sha256_str) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; + ret.push(ChecksumValue::Sha256(sha256)) + } + + if ret.len() > 1 { + return Err(Error::bad_request( + "multiple x-amz-checksum-* headers given", + )); + } + Ok(ret.pop()) +} + +/// Checks for the presence of x-amz-checksum-algorithm +/// if so extract the corresponding x-amz-checksum-* value +pub fn request_checksum_algorithm_value( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumValue>, Error> { + match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { + Some(x) if x == "CRC32" => { + let crc32 = headers + .get(X_AMZ_CHECKSUM_CRC32) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; + Ok(Some(ChecksumValue::Crc32(crc32))) + } + Some(x) if x == "CRC32C" => { + let crc32c = headers + .get(X_AMZ_CHECKSUM_CRC32C) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; + Ok(Some(ChecksumValue::Crc32c(crc32c))) + } + Some(x) if x == "SHA1" => { + let sha1 = headers + .get(X_AMZ_CHECKSUM_SHA1) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; + Ok(Some(ChecksumValue::Sha1(sha1))) + } + Some(x) if x == "SHA256" => { + let sha256 = headers + .get(X_AMZ_CHECKSUM_SHA256) + .and_then(|x| BASE64_STANDARD.decode(&x).ok()) + .and_then(|x| x.try_into().ok()) + .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; + Ok(Some(ChecksumValue::Sha256(sha256))) + } + Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")), + None => Ok(None), + } +} diff --git a/src/api/common/signature/mod.rs b/src/api/common/signature/mod.rs index 2421d696..e93ca85a 100644 --- a/src/api/common/signature/mod.rs +++ b/src/api/common/signature/mod.rs @@ -11,6 +11,7 @@ use garage_util::data::{sha256sum, Hash}; use error::*; +pub mod body; pub mod checksum; pub mod error; pub mod payload; @@ -51,7 +52,7 @@ pub const AWS4_HMAC_SHA256_PAYLOAD: &str = "AWS4-HMAC-SHA256-PAYLOAD"; #[derive(Debug)] pub enum ContentSha256Header { UnsignedPayload, - Sha256Hash(Hash), + Sha256Checksum(Hash), StreamingPayload { trailer: bool, signed: bool }, } @@ -90,15 +91,6 @@ pub async fn verify_request( }) } -pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> { - if expected_sha256 != sha256sum(body) { - return Err(Error::bad_request( - "Request content hash does not match signed hash".to_string(), - )); - } - Ok(()) -} - pub fn signing_hmac( datetime: &DateTime<Utc>, secret_key: &str, diff --git a/src/api/common/signature/payload.rs b/src/api/common/signature/payload.rs index ccc55c90..4ca0153f 100644 --- a/src/api/common/signature/payload.rs +++ b/src/api/common/signature/payload.rs @@ -94,7 +94,7 @@ fn parse_x_amz_content_sha256(header: Option<&str>) -> Result<ContentSha256Heade .ok() .and_then(|bytes| Hash::try_from(&bytes)) .ok_or_bad_request("Invalid content sha256 hash")?; - Ok(ContentSha256Header::Sha256Hash(sha256)) + Ok(ContentSha256Header::Sha256Checksum(sha256)) } } diff --git a/src/api/common/signature/streaming.rs b/src/api/common/signature/streaming.rs index b8a5e66d..e8f9b3d7 100644 --- a/src/api/common/signature/streaming.rs +++ b/src/api/common/signature/streaming.rs @@ -1,21 +1,22 @@ use std::pin::Pin; +use std::sync::Mutex; use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use futures::prelude::*; use futures::task; use hmac::Mac; -use http_body_util::StreamBody; -use hyper::body::{Bytes, Incoming as IncomingBody}; +use hyper::body::{Bytes, Frame, Incoming as IncomingBody}; use hyper::Request; use garage_util::data::Hash; use super::*; -use crate::helpers::*; +use crate::helpers::body_stream; +use crate::signature::checksum::*; use crate::signature::payload::CheckedSignature; -pub type ReqBody = BoxBody<Error>; +pub use crate::signature::body::ReqBody; pub fn parse_streaming_body( req: Request<IncomingBody>, @@ -23,6 +24,20 @@ pub fn parse_streaming_body( region: &str, service: &str, ) -> Result<Request<ReqBody>, Error> { + let expected_checksums = ExpectedChecksums { + md5: match req.headers().get("content-md5") { + Some(x) => Some(x.to_str()?.to_string()), + None => None, + }, + sha256: match &checked_signature.content_sha256_header { + ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256), + _ => None, + }, + extra: None, + }; + + let mut checksummer = Checksummer::init(&expected_checksums, false); + match checked_signature.content_sha256_header { ContentSha256Header::StreamingPayload { signed, trailer } => { if !signed && !trailer { @@ -31,6 +46,11 @@ pub fn parse_streaming_body( )); } + if trailer { + let algo = request_trailer_checksum_algorithm(req.headers())?; + checksummer = checksummer.add(algo); + } + let sign_params = if signed { let signature = checked_signature .signature_header @@ -77,14 +97,24 @@ pub fn parse_streaming_body( Ok(req.map(move |body| { let stream = body_stream::<_, Error>(body); + let signed_payload_stream = - StreamingPayloadStream::new(stream, sign_params, trailer) - .map(|x| x.map(hyper::body::Frame::data)) - .map_err(Error::from); - ReqBody::new(StreamBody::new(signed_payload_stream)) + StreamingPayloadStream::new(stream, sign_params, trailer).map_err(Error::from); + ReqBody { + stream: Mutex::new(signed_payload_stream.boxed()), + checksummer, + expected_checksums, + } })) } - _ => Ok(req.map(|body| ReqBody::new(http_body_util::BodyExt::map_err(body, Error::from)))), + _ => Ok(req.map(|body| { + let stream = http_body_util::BodyStream::new(body).map_err(Error::from); + ReqBody { + stream: Mutex::new(stream.boxed()), + checksummer, + expected_checksums, + } + })), } } @@ -386,7 +416,7 @@ impl<S> Stream for StreamingPayloadStream<S> where S: Stream<Item = Result<Bytes, Error>> + Unpin, { - type Item = Result<Bytes, StreamingPayloadError>; + type Item = Result<Frame<Bytes>, StreamingPayloadError>; fn poll_next( self: Pin<&mut Self>, @@ -450,7 +480,7 @@ where return Poll::Ready(None); } - return Poll::Ready(Some(Ok(data))); + return Poll::Ready(Some(Ok(Frame::data(data)))); } StreamingPayloadChunk::Trailer(trailer) => { if let Some(signing) = this.signing.as_mut() { |