diff options
author | Alex <lx@deuxfleurs.fr> | 2025-02-19 09:59:32 +0000 |
---|---|---|
committer | Alex <lx@deuxfleurs.fr> | 2025-02-19 09:59:32 +0000 |
commit | f64ec6e542c73a4eaaf1962330c7bfe4d7c47461 (patch) | |
tree | c44ffa88d70f99c512283dbd3d377990e4a55893 /src/api | |
parent | 859b38b0d260a0833e5e604c873c7d259acff22e (diff) | |
parent | 6d38907dac2872a43e5bbaa108c14e8877dd818e (diff) | |
download | garage-f64ec6e542c73a4eaaf1962330c7bfe4d7c47461.tar.gz garage-f64ec6e542c73a4eaaf1962330c7bfe4d7c47461.zip |
Merge pull request 'implement STREAMING-*-PAYLOAD-TRAILER' (#960) from fix-824 into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/960
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/common/Cargo.toml | 5 | ||||
-rw-r--r-- | src/api/common/cors.rs | 8 | ||||
-rw-r--r-- | src/api/common/signature/body.rs | 135 | ||||
-rw-r--r-- | src/api/common/signature/checksum.rs (renamed from src/api/s3/checksum.rs) | 239 | ||||
-rw-r--r-- | src/api/common/signature/error.rs | 4 | ||||
-rw-r--r-- | src/api/common/signature/mod.rs | 76 | ||||
-rw-r--r-- | src/api/common/signature/payload.rs | 103 | ||||
-rw-r--r-- | src/api/common/signature/streaming.rs | 555 | ||||
-rw-r--r-- | src/api/k2v/api_server.rs | 4 | ||||
-rw-r--r-- | src/api/k2v/batch.rs | 8 | ||||
-rw-r--r-- | src/api/k2v/error.rs | 7 | ||||
-rw-r--r-- | src/api/k2v/item.rs | 4 | ||||
-rw-r--r-- | src/api/s3/api_server.rs | 33 | ||||
-rw-r--r-- | src/api/s3/bucket.rs | 10 | ||||
-rw-r--r-- | src/api/s3/copy.rs | 2 | ||||
-rw-r--r-- | src/api/s3/cors.rs | 11 | ||||
-rw-r--r-- | src/api/s3/delete.rs | 9 | ||||
-rw-r--r-- | src/api/s3/encryption.rs | 2 | ||||
-rw-r--r-- | src/api/s3/error.rs | 3 | ||||
-rw-r--r-- | src/api/s3/get.rs | 32 | ||||
-rw-r--r-- | src/api/s3/lib.rs | 1 | ||||
-rw-r--r-- | src/api/s3/lifecycle.rs | 10 | ||||
-rw-r--r-- | src/api/s3/multipart.rs | 151 | ||||
-rw-r--r-- | src/api/s3/post_object.rs | 7 | ||||
-rw-r--r-- | src/api/s3/put.rs | 70 | ||||
-rw-r--r-- | src/api/s3/website.rs | 10 |
26 files changed, 998 insertions, 501 deletions
diff --git a/src/api/common/Cargo.toml b/src/api/common/Cargo.toml index 5b9cf479..c33d585d 100644 --- a/src/api/common/Cargo.toml +++ b/src/api/common/Cargo.toml @@ -18,16 +18,21 @@ garage_model.workspace = true garage_table.workspace = true garage_util.workspace = true +base64.workspace = true bytes.workspace = true chrono.workspace = true +crc32fast.workspace = true +crc32c.workspace = true crypto-common.workspace = true err-derive.workspace = true hex.workspace = true hmac.workspace = true +md-5.workspace = true idna.workspace = true tracing.workspace = true nom.workspace = true pin-project.workspace = true +sha1.workspace = true sha2.workspace = true futures.workspace = true diff --git a/src/api/common/cors.rs b/src/api/common/cors.rs index 14369b56..09b55c13 100644 --- a/src/api/common/cors.rs +++ b/src/api/common/cors.rs @@ -14,9 +14,9 @@ use crate::common_error::{ }; use crate::helpers::*; -pub fn find_matching_cors_rule<'a>( +pub fn find_matching_cors_rule<'a, B>( bucket_params: &'a BucketParams, - req: &Request<impl Body>, + req: &Request<B>, ) -> Result<Option<&'a GarageCorsRule>, CommonError> { if let Some(cors_config) = bucket_params.cors_config.get() { if let Some(origin) = req.headers().get("Origin") { @@ -132,8 +132,8 @@ pub async fn handle_options_api( } } -pub fn handle_options_for_bucket( - req: &Request<IncomingBody>, +pub fn handle_options_for_bucket<B>( + req: &Request<B>, bucket_params: &BucketParams, ) -> Result<Response<EmptyBody>, CommonError> { let origin = req diff --git a/src/api/common/signature/body.rs b/src/api/common/signature/body.rs new file mode 100644 index 00000000..96be0d5b --- /dev/null +++ b/src/api/common/signature/body.rs @@ -0,0 +1,135 @@ +use std::sync::Mutex; + +use futures::prelude::*; +use futures::stream::BoxStream; +use http_body_util::{BodyExt, StreamBody}; +use hyper::body::{Bytes, Frame}; +use serde::Deserialize; +use tokio::sync::mpsc; +use tokio::task; + +use super::*; + +use crate::signature::checksum::*; + +pub struct ReqBody { + // why need mutex to be sync?? + pub(crate) stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>, + pub(crate) checksummer: Checksummer, + pub(crate) expected_checksums: ExpectedChecksums, + pub(crate) trailer_algorithm: Option<ChecksumAlgorithm>, +} + +pub type StreamingChecksumReceiver = task::JoinHandle<Result<Checksums, Error>>; + +impl ReqBody { + pub fn add_expected_checksums(&mut self, more: ExpectedChecksums) { + if more.md5.is_some() { + self.expected_checksums.md5 = more.md5; + } + if more.sha256.is_some() { + self.expected_checksums.sha256 = more.sha256; + } + if more.extra.is_some() { + self.expected_checksums.extra = more.extra; + } + self.checksummer.add_expected(&self.expected_checksums); + } + + pub fn add_md5(&mut self) { + self.checksummer.add_md5(); + } + + // ============ non-streaming ============= + + pub async fn json<T: for<'a> Deserialize<'a>>(self) -> Result<T, Error> { + let body = self.collect().await?; + let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + Ok(resp) + } + + pub async fn collect(self) -> Result<Bytes, Error> { + self.collect_with_checksums().await.map(|(b, _)| b) + } + + pub async fn collect_with_checksums(mut self) -> Result<(Bytes, Checksums), Error> { + let stream: BoxStream<_> = self.stream.into_inner().unwrap(); + let bytes = BodyExt::collect(StreamBody::new(stream)).await?.to_bytes(); + + self.checksummer.update(&bytes); + let checksums = self.checksummer.finalize(); + checksums.verify(&self.expected_checksums)?; + + Ok((bytes, checksums)) + } + + // ============ streaming ============= + + pub fn streaming_with_checksums( + self, + ) -> ( + BoxStream<'static, Result<Bytes, Error>>, + StreamingChecksumReceiver, + ) { + let Self { + stream, + mut checksummer, + mut expected_checksums, + trailer_algorithm, + } = self; + + let (frame_tx, mut frame_rx) = mpsc::channel::<Frame<Bytes>>(5); + + let join_checksums = tokio::spawn(async move { + while let Some(frame) = frame_rx.recv().await { + match frame.into_data() { + Ok(data) => { + checksummer = tokio::task::spawn_blocking(move || { + checksummer.update(&data); + checksummer + }) + .await + .unwrap() + } + Err(frame) => { + let trailers = frame.into_trailers().unwrap(); + let algo = trailer_algorithm.unwrap(); + expected_checksums.extra = Some(extract_checksum_value(&trailers, algo)?); + break; + } + } + } + + if trailer_algorithm.is_some() && expected_checksums.extra.is_none() { + return Err(Error::bad_request("trailing checksum was not sent")); + } + + let checksums = checksummer.finalize(); + checksums.verify(&expected_checksums)?; + + Ok(checksums) + }); + + let stream: BoxStream<_> = stream.into_inner().unwrap(); + let stream = stream.filter_map(move |x| { + let frame_tx = frame_tx.clone(); + async move { + match x { + Err(e) => Some(Err(e)), + Ok(frame) => { + if frame.is_data() { + let data = frame.data_ref().unwrap().clone(); + let _ = frame_tx.send(frame).await; + Some(Ok(data)) + } else { + let _ = frame_tx.send(frame).await; + None + } + } + } + } + }); + + (stream.boxed(), join_checksums) + } +} diff --git a/src/api/s3/checksum.rs b/src/api/common/signature/checksum.rs index 02fb55ec..3c5e7c53 100644 --- a/src/api/s3/checksum.rs +++ b/src/api/common/signature/checksum.rs @@ -11,11 +11,12 @@ use sha2::Sha256; use http::{HeaderMap, HeaderName, HeaderValue}; use garage_util::data::*; -use garage_util::error::OkOrMessage; -use garage_model::s3::object_table::*; +use super::*; -use crate::error::*; +pub use garage_model::s3::object_table::{ChecksumAlgorithm, ChecksumValue}; + +pub const CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5"); pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-checksum-algorithm"); @@ -31,8 +32,8 @@ pub type Md5Checksum = [u8; 16]; pub type Sha1Checksum = [u8; 20]; pub type Sha256Checksum = [u8; 32]; -#[derive(Debug, Default)] -pub(crate) struct ExpectedChecksums { +#[derive(Debug, Default, Clone)] +pub struct ExpectedChecksums { // base64-encoded md5 (content-md5 header) pub md5: Option<String>, // content_sha256 (as a Hash / FixedBytes32) @@ -41,7 +42,7 @@ pub(crate) struct ExpectedChecksums { pub extra: Option<ChecksumValue>, } -pub(crate) struct Checksummer { +pub struct Checksummer { pub crc32: Option<Crc32>, pub crc32c: Option<Crc32c>, pub md5: Option<Md5>, @@ -50,7 +51,7 @@ pub(crate) struct Checksummer { } #[derive(Default)] -pub(crate) struct Checksums { +pub struct Checksums { pub crc32: Option<Crc32Checksum>, pub crc32c: Option<Crc32cChecksum>, pub md5: Option<Md5Checksum>, @@ -59,34 +60,48 @@ pub(crate) struct Checksums { } impl Checksummer { - pub(crate) fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self { - let mut ret = Self { + pub fn new() -> Self { + Self { crc32: None, crc32c: None, md5: None, sha1: None, sha256: None, - }; + } + } + + pub fn init(expected: &ExpectedChecksums, add_md5: bool) -> Self { + let mut ret = Self::new(); + ret.add_expected(expected); + if add_md5 { + ret.add_md5(); + } + ret + } + + pub fn add_md5(&mut self) { + self.md5 = Some(Md5::new()); + } - if expected.md5.is_some() || require_md5 { - ret.md5 = Some(Md5::new()); + pub fn add_expected(&mut self, expected: &ExpectedChecksums) { + if expected.md5.is_some() { + self.md5 = Some(Md5::new()); } if expected.sha256.is_some() || matches!(&expected.extra, Some(ChecksumValue::Sha256(_))) { - ret.sha256 = Some(Sha256::new()); + self.sha256 = Some(Sha256::new()); } if matches!(&expected.extra, Some(ChecksumValue::Crc32(_))) { - ret.crc32 = Some(Crc32::new()); + self.crc32 = Some(Crc32::new()); } if matches!(&expected.extra, Some(ChecksumValue::Crc32c(_))) { - ret.crc32c = Some(Crc32c::default()); + self.crc32c = Some(Crc32c::default()); } if matches!(&expected.extra, Some(ChecksumValue::Sha1(_))) { - ret.sha1 = Some(Sha1::new()); + self.sha1 = Some(Sha1::new()); } - ret } - pub(crate) fn add(mut self, algo: Option<ChecksumAlgorithm>) -> Self { + pub fn add(mut self, algo: Option<ChecksumAlgorithm>) -> Self { match algo { Some(ChecksumAlgorithm::Crc32) => { self.crc32 = Some(Crc32::new()); @@ -105,7 +120,7 @@ impl Checksummer { self } - pub(crate) fn update(&mut self, bytes: &[u8]) { + pub fn update(&mut self, bytes: &[u8]) { if let Some(crc32) = &mut self.crc32 { crc32.update(bytes); } @@ -123,7 +138,7 @@ impl Checksummer { } } - pub(crate) fn finalize(self) -> Checksums { + pub fn finalize(self) -> Checksums { Checksums { crc32: self.crc32.map(|x| u32::to_be_bytes(x.finalize())), crc32c: self @@ -183,153 +198,56 @@ impl Checksums { // ---- -#[derive(Default)] -pub(crate) struct MultipartChecksummer { - pub md5: Md5, - pub extra: Option<MultipartExtraChecksummer>, -} - -pub(crate) enum MultipartExtraChecksummer { - Crc32(Crc32), - Crc32c(Crc32c), - Sha1(Sha1), - Sha256(Sha256), -} - -impl MultipartChecksummer { - pub(crate) fn init(algo: Option<ChecksumAlgorithm>) -> Self { - Self { - md5: Md5::new(), - extra: match algo { - None => None, - Some(ChecksumAlgorithm::Crc32) => { - Some(MultipartExtraChecksummer::Crc32(Crc32::new())) - } - Some(ChecksumAlgorithm::Crc32c) => { - Some(MultipartExtraChecksummer::Crc32c(Crc32c::default())) - } - Some(ChecksumAlgorithm::Sha1) => Some(MultipartExtraChecksummer::Sha1(Sha1::new())), - Some(ChecksumAlgorithm::Sha256) => { - Some(MultipartExtraChecksummer::Sha256(Sha256::new())) - } - }, - } - } - - pub(crate) fn update( - &mut self, - etag: &str, - checksum: Option<ChecksumValue>, - ) -> Result<(), Error> { - self.md5 - .update(&hex::decode(&etag).ok_or_message("invalid etag hex")?); - match (&mut self.extra, checksum) { - (None, _) => (), - ( - Some(MultipartExtraChecksummer::Crc32(ref mut crc32)), - Some(ChecksumValue::Crc32(x)), - ) => { - crc32.update(&x); - } - ( - Some(MultipartExtraChecksummer::Crc32c(ref mut crc32c)), - Some(ChecksumValue::Crc32c(x)), - ) => { - crc32c.write(&x); - } - (Some(MultipartExtraChecksummer::Sha1(ref mut sha1)), Some(ChecksumValue::Sha1(x))) => { - sha1.update(&x); - } - ( - Some(MultipartExtraChecksummer::Sha256(ref mut sha256)), - Some(ChecksumValue::Sha256(x)), - ) => { - sha256.update(&x); - } - (Some(_), b) => { - return Err(Error::internal_error(format!( - "part checksum was not computed correctly, got: {:?}", - b - ))) - } - } - Ok(()) - } - - pub(crate) fn finalize(self) -> (Md5Checksum, Option<ChecksumValue>) { - let md5 = self.md5.finalize()[..].try_into().unwrap(); - let extra = match self.extra { - None => None, - Some(MultipartExtraChecksummer::Crc32(crc32)) => { - Some(ChecksumValue::Crc32(u32::to_be_bytes(crc32.finalize()))) - } - Some(MultipartExtraChecksummer::Crc32c(crc32c)) => Some(ChecksumValue::Crc32c( - u32::to_be_bytes(u32::try_from(crc32c.finish()).unwrap()), - )), - Some(MultipartExtraChecksummer::Sha1(sha1)) => { - Some(ChecksumValue::Sha1(sha1.finalize()[..].try_into().unwrap())) - } - Some(MultipartExtraChecksummer::Sha256(sha256)) => Some(ChecksumValue::Sha256( - sha256.finalize()[..].try_into().unwrap(), - )), - }; - (md5, extra) +pub fn parse_checksum_algorithm(algo: &str) -> Result<ChecksumAlgorithm, Error> { + match algo { + "CRC32" => Ok(ChecksumAlgorithm::Crc32), + "CRC32C" => Ok(ChecksumAlgorithm::Crc32c), + "SHA1" => Ok(ChecksumAlgorithm::Sha1), + "SHA256" => Ok(ChecksumAlgorithm::Sha256), + _ => Err(Error::bad_request("invalid checksum algorithm")), } } -// ---- - /// Extract the value of the x-amz-checksum-algorithm header -pub(crate) fn request_checksum_algorithm( +pub fn request_checksum_algorithm( headers: &HeaderMap<HeaderValue>, ) -> Result<Option<ChecksumAlgorithm>, Error> { match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { None => Ok(None), - Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)), - Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)), - Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)), - Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)), + Some(x) => parse_checksum_algorithm(x.to_str()?).map(Some), + } +} + +pub fn request_trailer_checksum_algorithm( + headers: &HeaderMap<HeaderValue>, +) -> Result<Option<ChecksumAlgorithm>, Error> { + match headers.get(X_AMZ_TRAILER).map(|x| x.to_str()).transpose()? { + None => Ok(None), + Some(x) if x == X_AMZ_CHECKSUM_CRC32 => Ok(Some(ChecksumAlgorithm::Crc32)), + Some(x) if x == X_AMZ_CHECKSUM_CRC32C => Ok(Some(ChecksumAlgorithm::Crc32c)), + Some(x) if x == X_AMZ_CHECKSUM_SHA1 => Ok(Some(ChecksumAlgorithm::Sha1)), + Some(x) if x == X_AMZ_CHECKSUM_SHA256 => Ok(Some(ChecksumAlgorithm::Sha256)), _ => Err(Error::bad_request("invalid checksum algorithm")), } } /// Extract the value of any of the x-amz-checksum-* headers -pub(crate) fn request_checksum_value( +pub fn request_checksum_value( headers: &HeaderMap<HeaderValue>, ) -> Result<Option<ChecksumValue>, Error> { let mut ret = vec![]; - if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) { - let crc32 = BASE64_STANDARD - .decode(&crc32_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; - ret.push(ChecksumValue::Crc32(crc32)) + if headers.contains_key(X_AMZ_CHECKSUM_CRC32) { + ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Crc32)?); } - if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) { - let crc32c = BASE64_STANDARD - .decode(&crc32c_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; - ret.push(ChecksumValue::Crc32c(crc32c)) + if headers.contains_key(X_AMZ_CHECKSUM_CRC32C) { + ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Crc32c)?); } - if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) { - let sha1 = BASE64_STANDARD - .decode(&sha1_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; - ret.push(ChecksumValue::Sha1(sha1)) + if headers.contains_key(X_AMZ_CHECKSUM_SHA1) { + ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Sha1)?); } - if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) { - let sha256 = BASE64_STANDARD - .decode(&sha256_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; - ret.push(ChecksumValue::Sha256(sha256)) + if headers.contains_key(X_AMZ_CHECKSUM_SHA256) { + ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Sha256)?); } if ret.len() > 1 { @@ -342,48 +260,47 @@ pub(crate) fn request_checksum_value( /// Checks for the presence of x-amz-checksum-algorithm /// if so extract the corresponding x-amz-checksum-* value -pub(crate) fn request_checksum_algorithm_value( +pub fn extract_checksum_value( headers: &HeaderMap<HeaderValue>, -) -> Result<Option<ChecksumValue>, Error> { - match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { - Some(x) if x == "CRC32" => { + algo: ChecksumAlgorithm, +) -> Result<ChecksumValue, Error> { + match algo { + ChecksumAlgorithm::Crc32 => { let crc32 = headers .get(X_AMZ_CHECKSUM_CRC32) .and_then(|x| BASE64_STANDARD.decode(&x).ok()) .and_then(|x| x.try_into().ok()) .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; - Ok(Some(ChecksumValue::Crc32(crc32))) + Ok(ChecksumValue::Crc32(crc32)) } - Some(x) if x == "CRC32C" => { + ChecksumAlgorithm::Crc32c => { let crc32c = headers .get(X_AMZ_CHECKSUM_CRC32C) .and_then(|x| BASE64_STANDARD.decode(&x).ok()) .and_then(|x| x.try_into().ok()) .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; - Ok(Some(ChecksumValue::Crc32c(crc32c))) + Ok(ChecksumValue::Crc32c(crc32c)) } - Some(x) if x == "SHA1" => { + ChecksumAlgorithm::Sha1 => { let sha1 = headers .get(X_AMZ_CHECKSUM_SHA1) .and_then(|x| BASE64_STANDARD.decode(&x).ok()) .and_then(|x| x.try_into().ok()) .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; - Ok(Some(ChecksumValue::Sha1(sha1))) + Ok(ChecksumValue::Sha1(sha1)) } - Some(x) if x == "SHA256" => { + ChecksumAlgorithm::Sha256 => { let sha256 = headers .get(X_AMZ_CHECKSUM_SHA256) .and_then(|x| BASE64_STANDARD.decode(&x).ok()) .and_then(|x| x.try_into().ok()) .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; - Ok(Some(ChecksumValue::Sha256(sha256))) + Ok(ChecksumValue::Sha256(sha256)) } - Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")), - None => Ok(None), } } -pub(crate) fn add_checksum_response_headers( +pub fn add_checksum_response_headers( checksum: &Option<ChecksumValue>, mut resp: http::response::Builder, ) -> http::response::Builder { diff --git a/src/api/common/signature/error.rs b/src/api/common/signature/error.rs index 2d92a072..b2f396b5 100644 --- a/src/api/common/signature/error.rs +++ b/src/api/common/signature/error.rs @@ -18,6 +18,10 @@ pub enum Error { /// The request contained an invalid UTF-8 sequence in its path or in other parameters #[error(display = "Invalid UTF-8: {}", _0)] InvalidUtf8Str(#[error(source)] std::str::Utf8Error), + + /// The provided digest (checksum) value was invalid + #[error(display = "Invalid digest: {}", _0)] + InvalidDigest(String), } impl<T> From<T> for Error diff --git a/src/api/common/signature/mod.rs b/src/api/common/signature/mod.rs index 6514da43..50fbd304 100644 --- a/src/api/common/signature/mod.rs +++ b/src/api/common/signature/mod.rs @@ -2,6 +2,7 @@ use chrono::{DateTime, Utc}; use hmac::{Hmac, Mac}; use sha2::Sha256; +use hyper::header::HeaderName; use hyper::{body::Incoming as IncomingBody, Request}; use garage_model::garage::Garage; @@ -10,6 +11,8 @@ use garage_util::data::{sha256sum, Hash}; use error::*; +pub mod body; +pub mod checksum; pub mod error; pub mod payload; pub mod streaming; @@ -17,36 +20,73 @@ pub mod streaming; pub const SHORT_DATE: &str = "%Y%m%d"; pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; +// ---- Constants used in AWSv4 signatures ---- + +pub const X_AMZ_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-algorithm"); +pub const X_AMZ_CREDENTIAL: HeaderName = HeaderName::from_static("x-amz-credential"); +pub const X_AMZ_DATE: HeaderName = HeaderName::from_static("x-amz-date"); +pub const X_AMZ_EXPIRES: HeaderName = HeaderName::from_static("x-amz-expires"); +pub const X_AMZ_SIGNEDHEADERS: HeaderName = HeaderName::from_static("x-amz-signedheaders"); +pub const X_AMZ_SIGNATURE: HeaderName = HeaderName::from_static("x-amz-signature"); +pub const X_AMZ_CONTENT_SHA256: HeaderName = HeaderName::from_static("x-amz-content-sha256"); +pub const X_AMZ_TRAILER: HeaderName = HeaderName::from_static("x-amz-trailer"); + +/// Result of `sha256("")` +pub(crate) const EMPTY_STRING_HEX_DIGEST: &str = + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; + +// Signature calculation algorithm +pub const AWS4_HMAC_SHA256: &str = "AWS4-HMAC-SHA256"; type HmacSha256 = Hmac<Sha256>; +// Possible values for x-amz-content-sha256, in addition to the actual sha256 +pub const UNSIGNED_PAYLOAD: &str = "UNSIGNED-PAYLOAD"; +pub const STREAMING_UNSIGNED_PAYLOAD_TRAILER: &str = "STREAMING-UNSIGNED-PAYLOAD-TRAILER"; +pub const STREAMING_AWS4_HMAC_SHA256_PAYLOAD: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"; + +// Used in the computation of StringToSign +pub const AWS4_HMAC_SHA256_PAYLOAD: &str = "AWS4-HMAC-SHA256-PAYLOAD"; + +// ---- enums to describe stuff going on in signature calculation ---- + +#[derive(Debug)] +pub enum ContentSha256Header { + UnsignedPayload, + Sha256Checksum(Hash), + StreamingPayload { trailer: bool, signed: bool }, +} + +// ---- top-level functions ---- + +pub struct VerifiedRequest { + pub request: Request<streaming::ReqBody>, + pub access_key: Key, + pub content_sha256_header: ContentSha256Header, +} + pub async fn verify_request( garage: &Garage, mut req: Request<IncomingBody>, service: &'static str, -) -> Result<(Request<streaming::ReqBody>, Key, Option<Hash>), Error> { - let (api_key, mut content_sha256) = - payload::check_payload_signature(&garage, &mut req, service).await?; - let api_key = - api_key.ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?; - - let req = streaming::parse_streaming_body( - &api_key, +) -> Result<VerifiedRequest, Error> { + let checked_signature = payload::check_payload_signature(&garage, &mut req, service).await?; + + let request = streaming::parse_streaming_body( req, - &mut content_sha256, + &checked_signature, &garage.config.s3_api.s3_region, service, )?; - Ok((req, api_key, content_sha256)) -} + let access_key = checked_signature + .key + .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?; -pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> { - if expected_sha256 != sha256sum(body) { - return Err(Error::bad_request( - "Request content hash does not match signed hash".to_string(), - )); - } - Ok(()) + Ok(VerifiedRequest { + request, + access_key, + content_sha256_header: checked_signature.content_sha256_header, + }) } pub fn signing_hmac( diff --git a/src/api/common/signature/payload.rs b/src/api/common/signature/payload.rs index 81541e4a..2d5f8603 100644 --- a/src/api/common/signature/payload.rs +++ b/src/api/common/signature/payload.rs @@ -13,23 +13,9 @@ use garage_util::data::Hash; use garage_model::garage::Garage; use garage_model::key_table::*; -use super::LONG_DATETIME; -use super::{compute_scope, signing_hmac}; +use super::*; use crate::encoding::uri_encode; -use crate::signature::error::*; - -pub const X_AMZ_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-algorithm"); -pub const X_AMZ_CREDENTIAL: HeaderName = HeaderName::from_static("x-amz-credential"); -pub const X_AMZ_DATE: HeaderName = HeaderName::from_static("x-amz-date"); -pub const X_AMZ_EXPIRES: HeaderName = HeaderName::from_static("x-amz-expires"); -pub const X_AMZ_SIGNEDHEADERS: HeaderName = HeaderName::from_static("x-amz-signedheaders"); -pub const X_AMZ_SIGNATURE: HeaderName = HeaderName::from_static("x-amz-signature"); -pub const X_AMZ_CONTENT_SH256: HeaderName = HeaderName::from_static("x-amz-content-sha256"); - -pub const AWS4_HMAC_SHA256: &str = "AWS4-HMAC-SHA256"; -pub const UNSIGNED_PAYLOAD: &str = "UNSIGNED-PAYLOAD"; -pub const STREAMING_AWS4_HMAC_SHA256_PAYLOAD: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"; pub type QueryMap = HeaderMap<QueryValue>; pub struct QueryValue { @@ -39,11 +25,18 @@ pub struct QueryValue { value: String, } +#[derive(Debug)] +pub struct CheckedSignature { + pub key: Option<Key>, + pub content_sha256_header: ContentSha256Header, + pub signature_header: Option<String>, +} + pub async fn check_payload_signature( garage: &Garage, request: &mut Request<IncomingBody>, service: &'static str, -) -> Result<(Option<Key>, Option<Hash>), Error> { +) -> Result<CheckedSignature, Error> { let query = parse_query_map(request.uri())?; if query.contains_key(&X_AMZ_ALGORITHM) { @@ -57,17 +50,46 @@ pub async fn check_payload_signature( // Unsigned (anonymous) request let content_sha256 = request .headers() - .get("x-amz-content-sha256") - .filter(|c| c.as_bytes() != UNSIGNED_PAYLOAD.as_bytes()); - if let Some(content_sha256) = content_sha256 { - let sha256 = hex::decode(content_sha256) - .ok() - .and_then(|bytes| Hash::try_from(&bytes)) - .ok_or_bad_request("Invalid content sha256 hash")?; - Ok((None, Some(sha256))) + .get(X_AMZ_CONTENT_SHA256) + .map(|x| x.to_str()) + .transpose()?; + Ok(CheckedSignature { + key: None, + content_sha256_header: parse_x_amz_content_sha256(content_sha256)?, + signature_header: None, + }) + } +} + +fn parse_x_amz_content_sha256(header: Option<&str>) -> Result<ContentSha256Header, Error> { + let header = match header { + Some(x) => x, + None => return Ok(ContentSha256Header::UnsignedPayload), + }; + if header == UNSIGNED_PAYLOAD { + Ok(ContentSha256Header::UnsignedPayload) + } else if let Some(rest) = header.strip_prefix("STREAMING-") { + let (trailer, algo) = if let Some(rest2) = rest.strip_suffix("-TRAILER") { + (true, rest2) } else { - Ok((None, None)) - } + (false, rest) + }; + let signed = match algo { + AWS4_HMAC_SHA256_PAYLOAD => true, + UNSIGNED_PAYLOAD => false, + _ => { + return Err(Error::bad_request( + "invalid or unsupported x-amz-content-sha256", + )) + } + }; + Ok(ContentSha256Header::StreamingPayload { trailer, signed }) + } else { + let sha256 = hex::decode(header) + .ok() + .and_then(|bytes| Hash::try_from(&bytes)) + .ok_or_bad_request("Invalid content sha256 hash")?; + Ok(ContentSha256Header::Sha256Checksum(sha256)) } } @@ -76,7 +98,7 @@ async fn check_standard_signature( service: &'static str, request: &Request<IncomingBody>, query: QueryMap, -) -> Result<(Option<Key>, Option<Hash>), Error> { +) -> Result<CheckedSignature, Error> { let authorization = Authorization::parse_header(request.headers())?; // Verify that all necessary request headers are included in signed_headers @@ -108,18 +130,13 @@ async fn check_standard_signature( let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?; - let content_sha256 = if authorization.content_sha256 == UNSIGNED_PAYLOAD { - None - } else if authorization.content_sha256 == STREAMING_AWS4_HMAC_SHA256_PAYLOAD { - let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?; - Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?) - } else { - let bytes = hex::decode(authorization.content_sha256) - .ok_or_bad_request("Invalid content sha256 hash")?; - Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid content sha256 hash")?) - }; + let content_sha256_header = parse_x_amz_content_sha256(Some(&authorization.content_sha256))?; - Ok((Some(key), content_sha256)) + Ok(CheckedSignature { + key: Some(key), + content_sha256_header, + signature_header: Some(authorization.signature), + }) } async fn check_presigned_signature( @@ -127,7 +144,7 @@ async fn check_presigned_signature( service: &'static str, request: &mut Request<IncomingBody>, mut query: QueryMap, -) -> Result<(Option<Key>, Option<Hash>), Error> { +) -> Result<CheckedSignature, Error> { let algorithm = query.get(&X_AMZ_ALGORITHM).unwrap(); let authorization = Authorization::parse_presigned(&algorithm.value, &query)?; @@ -193,7 +210,11 @@ async fn check_presigned_signature( // Presigned URLs always use UNSIGNED-PAYLOAD, // so there is no sha256 hash to return. - Ok((Some(key), None)) + Ok(CheckedSignature { + key: Some(key), + content_sha256_header: ContentSha256Header::UnsignedPayload, + signature_header: Some(authorization.signature), + }) } pub fn parse_query_map(uri: &http::uri::Uri) -> Result<QueryMap, Error> { @@ -442,7 +463,7 @@ impl Authorization { .to_string(); let content_sha256 = headers - .get(X_AMZ_CONTENT_SH256) + .get(X_AMZ_CONTENT_SHA256) .ok_or_bad_request("Missing X-Amz-Content-Sha256 field")?; let date = headers diff --git a/src/api/common/signature/streaming.rs b/src/api/common/signature/streaming.rs index e223d1b1..64362727 100644 --- a/src/api/common/signature/streaming.rs +++ b/src/api/common/signature/streaming.rs @@ -1,84 +1,157 @@ use std::pin::Pin; +use std::sync::Mutex; use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use futures::prelude::*; use futures::task; -use garage_model::key_table::Key; use hmac::Mac; -use http_body_util::StreamBody; -use hyper::body::{Bytes, Incoming as IncomingBody}; +use http::header::{HeaderMap, HeaderValue, CONTENT_ENCODING}; +use hyper::body::{Bytes, Frame, Incoming as IncomingBody}; use hyper::Request; use garage_util::data::Hash; -use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME}; +use super::*; -use crate::helpers::*; -use crate::signature::error::*; -use crate::signature::payload::{ - STREAMING_AWS4_HMAC_SHA256_PAYLOAD, X_AMZ_CONTENT_SH256, X_AMZ_DATE, -}; +use crate::helpers::body_stream; +use crate::signature::checksum::*; +use crate::signature::payload::CheckedSignature; -pub const AWS4_HMAC_SHA256_PAYLOAD: &str = "AWS4-HMAC-SHA256-PAYLOAD"; - -pub type ReqBody = BoxBody<Error>; +pub use crate::signature::body::ReqBody; pub fn parse_streaming_body( - api_key: &Key, - req: Request<IncomingBody>, - content_sha256: &mut Option<Hash>, + mut req: Request<IncomingBody>, + checked_signature: &CheckedSignature, region: &str, service: &str, ) -> Result<Request<ReqBody>, Error> { - match req.headers().get(X_AMZ_CONTENT_SH256) { - Some(header) if header == STREAMING_AWS4_HMAC_SHA256_PAYLOAD => { - let signature = content_sha256 - .take() - .ok_or_bad_request("No signature provided")?; - - let secret_key = &api_key - .state - .as_option() - .ok_or_internal_error("Deleted key state")? - .secret_key; - - let date = req - .headers() - .get(X_AMZ_DATE) - .ok_or_bad_request("Missing X-Amz-Date field")? - .to_str()?; - let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) - .ok_or_bad_request("Invalid date")?; - let date: DateTime<Utc> = Utc.from_utc_datetime(&date); - - let scope = compute_scope(&date, region, service); - let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service) - .ok_or_internal_error("Unable to build signing HMAC")?; + debug!( + "Content signature mode: {:?}", + checked_signature.content_sha256_header + ); + + match checked_signature.content_sha256_header { + ContentSha256Header::StreamingPayload { signed, trailer } => { + // Sanity checks + if !signed && !trailer { + return Err(Error::bad_request( + "STREAMING-UNSIGNED-PAYLOAD without trailer is not a valid combination", + )); + } + + // Remove the aws-chunked component in the content-encoding: header + // Note: this header is not properly sent by minio client, so don't fail + // if it is absent from the request. + if let Some(content_encoding) = req.headers_mut().remove(CONTENT_ENCODING) { + if let Some(rest) = content_encoding.as_bytes().strip_prefix(b"aws-chunked,") { + req.headers_mut() + .insert(CONTENT_ENCODING, HeaderValue::from_bytes(rest).unwrap()); + } else if content_encoding != "aws-chunked" { + return Err(Error::bad_request( + "content-encoding does not contain aws-chunked for STREAMING-*-PAYLOAD", + )); + } + } + + // If trailer header is announced, add the calculation of the requested checksum + let mut checksummer = Checksummer::init(&Default::default(), false); + let trailer_algorithm = if trailer { + let algo = Some( + request_trailer_checksum_algorithm(req.headers())? + .ok_or_bad_request("Missing x-amz-trailer header")?, + ); + checksummer = checksummer.add(algo); + algo + } else { + None + }; + + // For signed variants, determine signing parameters + let sign_params = if signed { + let signature = checked_signature + .signature_header + .clone() + .ok_or_bad_request("No signature provided")?; + let signature = hex::decode(signature) + .ok() + .and_then(|bytes| Hash::try_from(&bytes)) + .ok_or_bad_request("Invalid signature")?; + + let secret_key = checked_signature + .key + .as_ref() + .ok_or_bad_request("Cannot sign streaming payload without signing key")? + .state + .as_option() + .ok_or_internal_error("Deleted key state")? + .secret_key + .to_string(); + + let date = req + .headers() + .get(X_AMZ_DATE) + .ok_or_bad_request("Missing X-Amz-Date field")? + .to_str()?; + let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) + .ok_or_bad_request("Invalid date")?; + let date: DateTime<Utc> = Utc.from_utc_datetime(&date); + + let scope = compute_scope(&date, region, service); + let signing_hmac = + crate::signature::signing_hmac(&date, &secret_key, region, service) + .ok_or_internal_error("Unable to build signing HMAC")?; + + Some(SignParams { + datetime: date, + scope, + signing_hmac, + previous_signature: signature, + }) + } else { + None + }; Ok(req.map(move |body| { let stream = body_stream::<_, Error>(body); + let signed_payload_stream = - SignedPayloadStream::new(stream, signing_hmac, date, &scope, signature) - .map(|x| x.map(hyper::body::Frame::data)) - .map_err(Error::from); - ReqBody::new(StreamBody::new(signed_payload_stream)) + StreamingPayloadStream::new(stream, sign_params, trailer).map_err(Error::from); + ReqBody { + stream: Mutex::new(signed_payload_stream.boxed()), + checksummer, + expected_checksums: Default::default(), + trailer_algorithm, + } })) } - _ => Ok(req.map(|body| ReqBody::new(http_body_util::BodyExt::map_err(body, Error::from)))), + _ => Ok(req.map(|body| { + let expected_checksums = ExpectedChecksums { + sha256: match &checked_signature.content_sha256_header { + ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256), + _ => None, + }, + ..Default::default() + }; + let checksummer = Checksummer::init(&expected_checksums, false); + + let stream = http_body_util::BodyStream::new(body).map_err(Error::from); + ReqBody { + stream: Mutex::new(stream.boxed()), + checksummer, + expected_checksums, + trailer_algorithm: None, + } + })), } } -/// Result of `sha256("")` -const EMPTY_STRING_HEX_DIGEST: &str = - "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; - fn compute_streaming_payload_signature( signing_hmac: &HmacSha256, date: DateTime<Utc>, scope: &str, previous_signature: Hash, content_sha256: Hash, -) -> Result<Hash, Error> { +) -> Result<Hash, StreamingPayloadError> { let string_to_sign = [ AWS4_HMAC_SHA256_PAYLOAD, &date.format(LONG_DATETIME).to_string(), @@ -92,12 +165,49 @@ fn compute_streaming_payload_signature( let mut hmac = signing_hmac.clone(); hmac.update(string_to_sign.as_bytes()); - Ok(Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")?) + Hash::try_from(&hmac.finalize().into_bytes()) + .ok_or_else(|| StreamingPayloadError::Message("Could not build signature".into())) +} + +fn compute_streaming_trailer_signature( + signing_hmac: &HmacSha256, + date: DateTime<Utc>, + scope: &str, + previous_signature: Hash, + trailer_sha256: Hash, +) -> Result<Hash, StreamingPayloadError> { + let string_to_sign = [ + AWS4_HMAC_SHA256_PAYLOAD, + &date.format(LONG_DATETIME).to_string(), + scope, + &hex::encode(previous_signature), + &hex::encode(trailer_sha256), + ] + .join("\n"); + + let mut hmac = signing_hmac.clone(); + hmac.update(string_to_sign.as_bytes()); + + Hash::try_from(&hmac.finalize().into_bytes()) + .ok_or_else(|| StreamingPayloadError::Message("Could not build signature".into())) } mod payload { + use http::{HeaderName, HeaderValue}; + use garage_util::data::Hash; + use nom::bytes::streaming::{tag, take_while}; + use nom::character::streaming::hex_digit1; + use nom::combinator::{map_res, opt}; + use nom::number::streaming::hex_u32; + + macro_rules! try_parse { + ($expr:expr) => { + $expr.map_err(|e| e.map(Error::Parser))? + }; + } + pub enum Error<I> { Parser(nom::error::Error<I>), BadSignature, @@ -113,24 +223,13 @@ mod payload { } #[derive(Debug, Clone)] - pub struct Header { + pub struct ChunkHeader { pub size: usize, - pub signature: Hash, + pub signature: Option<Hash>, } - impl Header { - pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { - use nom::bytes::streaming::tag; - use nom::character::streaming::hex_digit1; - use nom::combinator::map_res; - use nom::number::streaming::hex_u32; - - macro_rules! try_parse { - ($expr:expr) => { - $expr.map_err(|e| e.map(Error::Parser))? - }; - } - + impl ChunkHeader { + pub fn parse_signed(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { let (input, size) = try_parse!(hex_u32(input)); let (input, _) = try_parse!(tag(";")(input)); @@ -140,96 +239,172 @@ mod payload { let (input, _) = try_parse!(tag("\r\n")(input)); - let header = Header { + let header = ChunkHeader { + size: size as usize, + signature: Some(signature), + }; + + Ok((input, header)) + } + + pub fn parse_unsigned(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + let (input, size) = try_parse!(hex_u32(input)); + let (input, _) = try_parse!(tag("\r\n")(input)); + + let header = ChunkHeader { size: size as usize, - signature, + signature: None, }; Ok((input, header)) } } + + #[derive(Debug, Clone)] + pub struct TrailerChunk { + pub header_name: HeaderName, + pub header_value: HeaderValue, + pub signature: Option<Hash>, + } + + impl TrailerChunk { + fn parse_content(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + let (input, header_name) = try_parse!(map_res( + take_while(|c: u8| c.is_ascii_alphanumeric() || c == b'-'), + HeaderName::from_bytes + )(input)); + let (input, _) = try_parse!(tag(b":")(input)); + let (input, header_value) = try_parse!(map_res( + take_while(|c: u8| c.is_ascii_alphanumeric() || b"+/=".contains(&c)), + HeaderValue::from_bytes + )(input)); + + // Possible '\n' after the header value, depends on clients + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + let (input, _) = try_parse!(opt(tag(b"\n"))(input)); + + let (input, _) = try_parse!(tag(b"\r\n")(input)); + + Ok(( + input, + TrailerChunk { + header_name, + header_value, + signature: None, + }, + )) + } + pub fn parse_signed(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + let (input, trailer) = Self::parse_content(input)?; + + let (input, _) = try_parse!(tag(b"x-amz-trailer-signature:")(input)); + let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input)); + let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?; + let (input, _) = try_parse!(tag(b"\r\n")(input)); + + Ok(( + input, + TrailerChunk { + signature: Some(signature), + ..trailer + }, + )) + } + pub fn parse_unsigned(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + let (input, trailer) = Self::parse_content(input)?; + let (input, _) = try_parse!(tag(b"\r\n")(input)); + + Ok((input, trailer)) + } + } } #[derive(Debug)] -pub enum SignedPayloadStreamError { +pub enum StreamingPayloadError { Stream(Error), InvalidSignature, Message(String), } -impl SignedPayloadStreamError { +impl StreamingPayloadError { fn message(msg: &str) -> Self { - SignedPayloadStreamError::Message(msg.into()) + StreamingPayloadError::Message(msg.into()) } } -impl From<SignedPayloadStreamError> for Error { - fn from(err: SignedPayloadStreamError) -> Self { +impl From<StreamingPayloadError> for Error { + fn from(err: StreamingPayloadError) -> Self { match err { - SignedPayloadStreamError::Stream(e) => e, - SignedPayloadStreamError::InvalidSignature => { + StreamingPayloadError::Stream(e) => e, + StreamingPayloadError::InvalidSignature => { Error::bad_request("Invalid payload signature") } - SignedPayloadStreamError::Message(e) => { + StreamingPayloadError::Message(e) => { Error::bad_request(format!("Chunk format error: {}", e)) } } } } -impl<I> From<payload::Error<I>> for SignedPayloadStreamError { +impl<I> From<payload::Error<I>> for StreamingPayloadError { fn from(err: payload::Error<I>) -> Self { Self::message(err.description()) } } -impl<I> From<nom::error::Error<I>> for SignedPayloadStreamError { +impl<I> From<nom::error::Error<I>> for StreamingPayloadError { fn from(err: nom::error::Error<I>) -> Self { Self::message(err.code.description()) } } -struct SignedPayload { - header: payload::Header, - data: Bytes, +enum StreamingPayloadChunk { + Chunk { + header: payload::ChunkHeader, + data: Bytes, + }, + Trailer(payload::TrailerChunk), +} + +struct SignParams { + datetime: DateTime<Utc>, + scope: String, + signing_hmac: HmacSha256, + previous_signature: Hash, } #[pin_project::pin_project] -pub struct SignedPayloadStream<S> +pub struct StreamingPayloadStream<S> where S: Stream<Item = Result<Bytes, Error>>, { #[pin] stream: S, buf: bytes::BytesMut, - datetime: DateTime<Utc>, - scope: String, - signing_hmac: HmacSha256, - previous_signature: Hash, + signing: Option<SignParams>, + has_trailer: bool, + done: bool, } -impl<S> SignedPayloadStream<S> +impl<S> StreamingPayloadStream<S> where S: Stream<Item = Result<Bytes, Error>>, { - pub fn new( - stream: S, - signing_hmac: HmacSha256, - datetime: DateTime<Utc>, - scope: &str, - seed_signature: Hash, - ) -> Self { + fn new(stream: S, signing: Option<SignParams>, has_trailer: bool) -> Self { Self { stream, buf: bytes::BytesMut::new(), - datetime, - scope: scope.into(), - signing_hmac, - previous_signature: seed_signature, + signing, + has_trailer, + done: false, } } - fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> { + fn parse_next( + input: &[u8], + is_signed: bool, + has_trailer: bool, + ) -> nom::IResult<&[u8], StreamingPayloadChunk, StreamingPayloadError> { use nom::bytes::streaming::{tag, take}; macro_rules! try_parse { @@ -238,17 +413,30 @@ where }; } - let (input, header) = try_parse!(payload::Header::parse(input)); + let (input, header) = if is_signed { + try_parse!(payload::ChunkHeader::parse_signed(input)) + } else { + try_parse!(payload::ChunkHeader::parse_unsigned(input)) + }; // 0-sized chunk is the last if header.size == 0 { - return Ok(( - input, - SignedPayload { - header, - data: Bytes::new(), - }, - )); + if has_trailer { + let (input, trailer) = if is_signed { + try_parse!(payload::TrailerChunk::parse_signed(input)) + } else { + try_parse!(payload::TrailerChunk::parse_unsigned(input)) + }; + return Ok((input, StreamingPayloadChunk::Trailer(trailer))); + } else { + return Ok(( + input, + StreamingPayloadChunk::Chunk { + header, + data: Bytes::new(), + }, + )); + } } let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input)); @@ -256,15 +444,15 @@ where let data = Bytes::from(data.to_vec()); - Ok((input, SignedPayload { header, data })) + Ok((input, StreamingPayloadChunk::Chunk { header, data })) } } -impl<S> Stream for SignedPayloadStream<S> +impl<S> Stream for StreamingPayloadStream<S> where S: Stream<Item = Result<Bytes, Error>> + Unpin, { - type Item = Result<Bytes, SignedPayloadStreamError>; + type Item = Result<Frame<Bytes>, StreamingPayloadError>; fn poll_next( self: Pin<&mut Self>, @@ -274,56 +462,105 @@ where let mut this = self.project(); + if *this.done { + return Poll::Ready(None); + } + loop { - let (input, payload) = match Self::parse_next(this.buf) { - Ok(res) => res, - Err(nom::Err::Incomplete(_)) => { - match futures::ready!(this.stream.as_mut().poll_next(cx)) { - Some(Ok(bytes)) => { - this.buf.extend(bytes); - continue; - } - Some(Err(e)) => { - return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))) + let (input, payload) = + match Self::parse_next(this.buf, this.signing.is_some(), *this.has_trailer) { + Ok(res) => res, + Err(nom::Err::Incomplete(_)) => { + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + continue; + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(StreamingPayloadError::Stream(e)))) + } + None => { + return Poll::Ready(Some(Err(StreamingPayloadError::message( + "Unexpected EOF", + )))); + } } - None => { - return Poll::Ready(Some(Err(SignedPayloadStreamError::message( - "Unexpected EOF", - )))); + } + Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => { + return Poll::Ready(Some(Err(e))) + } + }; + + match payload { + StreamingPayloadChunk::Chunk { data, header } => { + if let Some(signing) = this.signing.as_mut() { + let data_sha256sum = sha256sum(&data); + + let expected_signature = compute_streaming_payload_signature( + &signing.signing_hmac, + signing.datetime, + &signing.scope, + signing.previous_signature, + data_sha256sum, + )?; + + if header.signature.unwrap() != expected_signature { + return Poll::Ready(Some(Err(StreamingPayloadError::InvalidSignature))); } + + signing.previous_signature = header.signature.unwrap(); } - } - Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => { - return Poll::Ready(Some(Err(e))) - } - }; - // 0-sized chunk is the last - if payload.data.is_empty() { - return Poll::Ready(None); - } + *this.buf = input.into(); - let data_sha256sum = sha256sum(&payload.data); - - let expected_signature = compute_streaming_payload_signature( - this.signing_hmac, - *this.datetime, - this.scope, - *this.previous_signature, - data_sha256sum, - ) - .map_err(|e| { - SignedPayloadStreamError::Message(format!("Could not build signature: {}", e)) - })?; - - if payload.header.signature != expected_signature { - return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature))); - } + // 0-sized chunk is the last + if data.is_empty() { + // if there was a trailer, it would have been returned by the parser + assert!(!*this.has_trailer); + *this.done = true; + return Poll::Ready(None); + } + + return Poll::Ready(Some(Ok(Frame::data(data)))); + } + StreamingPayloadChunk::Trailer(trailer) => { + trace!( + "In StreamingPayloadStream::poll_next: got trailer {:?}", + trailer + ); + + if let Some(signing) = this.signing.as_mut() { + let data = [ + trailer.header_name.as_ref(), + &b":"[..], + trailer.header_value.as_ref(), + &b"\n"[..], + ] + .concat(); + let trailer_sha256sum = sha256sum(&data); + + let expected_signature = compute_streaming_trailer_signature( + &signing.signing_hmac, + signing.datetime, + &signing.scope, + signing.previous_signature, + trailer_sha256sum, + )?; + + if trailer.signature.unwrap() != expected_signature { + return Poll::Ready(Some(Err(StreamingPayloadError::InvalidSignature))); + } + } - *this.buf = input.into(); - *this.previous_signature = payload.header.signature; + *this.buf = input.into(); + *this.done = true; - return Poll::Ready(Some(Ok(payload.data))); + let mut trailers_map = HeaderMap::new(); + trailers_map.insert(trailer.header_name, trailer.header_value); + + return Poll::Ready(Some(Ok(Frame::trailers(trailers_map)))); + } + } } } @@ -336,7 +573,7 @@ where mod tests { use futures::prelude::*; - use super::{SignedPayloadStream, SignedPayloadStreamError}; + use super::{SignParams, StreamingPayloadError, StreamingPayloadStream}; #[tokio::test] async fn test_interrupted_signed_payload_stream() { @@ -358,12 +595,20 @@ mod tests { let seed_signature = Hash::default(); - let mut stream = - SignedPayloadStream::new(body, signing_hmac, datetime, &scope, seed_signature); + let mut stream = StreamingPayloadStream::new( + body, + Some(SignParams { + signing_hmac, + datetime, + scope, + previous_signature: seed_signature, + }), + false, + ); assert!(stream.try_next().await.is_err()); match stream.try_next().await { - Err(SignedPayloadStreamError::Message(msg)) if msg == "Unexpected EOF" => {} + Err(StreamingPayloadError::Message(msg)) if msg == "Unexpected EOF" => {} item => panic!( "Unexpected result, expected early EOF error, got {:?}", item diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index eb276f5b..de5775da 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -81,7 +81,9 @@ impl ApiHandler for K2VApiServer { return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } - let (req, api_key, _content_sha256) = verify_request(&garage, req, "k2v").await?; + let verified_request = verify_request(&garage, req, "k2v").await?; + let req = verified_request.request; + let api_key = verified_request.access_key; let bucket_id = garage .bucket_helper() diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index c284dbd4..7a03d836 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -20,7 +20,7 @@ pub async fn handle_insert_batch( let ReqCtx { garage, bucket_id, .. } = &ctx; - let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?; + let items = req.into_body().json::<Vec<InsertBatchItem>>().await?; let mut items2 = vec![]; for it in items { @@ -47,7 +47,7 @@ pub async fn handle_read_batch( ctx: ReqCtx, req: Request<ReqBody>, ) -> Result<Response<ResBody>, Error> { - let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?; + let queries = req.into_body().json::<Vec<ReadBatchQuery>>().await?; let resp_results = futures::future::join_all( queries @@ -141,7 +141,7 @@ pub async fn handle_delete_batch( ctx: ReqCtx, req: Request<ReqBody>, ) -> Result<Response<ResBody>, Error> { - let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?; + let queries = req.into_body().json::<Vec<DeleteBatchQuery>>().await?; let resp_results = futures::future::join_all( queries @@ -262,7 +262,7 @@ pub(crate) async fn handle_poll_range( } = ctx; use garage_model::k2v::sub::PollRange; - let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?; + let query = req.into_body().json::<PollRangeQuery>().await?; let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000; diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs index 3cd0e6f7..257ff893 100644 --- a/src/api/k2v/error.rs +++ b/src/api/k2v/error.rs @@ -23,6 +23,10 @@ pub enum Error { #[error(display = "Authorization header malformed, unexpected scope: {}", _0)] AuthorizationHeaderMalformed(String), + /// The provided digest (checksum) value was invalid + #[error(display = "Invalid digest: {}", _0)] + InvalidDigest(String), + /// The object requested don't exists #[error(display = "Key not found")] NoSuchKey, @@ -54,6 +58,7 @@ impl From<SignatureError> for Error { Self::AuthorizationHeaderMalformed(c) } SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), + SignatureError::InvalidDigest(d) => Self::InvalidDigest(d), } } } @@ -71,6 +76,7 @@ impl Error { Error::InvalidBase64(_) => "InvalidBase64", Error::InvalidUtf8Str(_) => "InvalidUtf8String", Error::InvalidCausalityToken => "CausalityToken", + Error::InvalidDigest(_) => "InvalidDigest", } } } @@ -85,6 +91,7 @@ impl ApiError for Error { Error::AuthorizationHeaderMalformed(_) | Error::InvalidBase64(_) | Error::InvalidUtf8Str(_) + | Error::InvalidDigest(_) | Error::InvalidCausalityToken => StatusCode::BAD_REQUEST, } } diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index 4e28b499..0fb945d2 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -144,9 +144,7 @@ pub async fn handle_insert_item( .map(parse_causality_token) .transpose()?; - let body = http_body_util::BodyExt::collect(req.into_body()) - .await? - .to_bytes(); + let body = req.into_body().collect().await?; let value = DvvsValue::Value(body.to_vec()); diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 14fd03c3..e26c2b65 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -121,7 +121,9 @@ impl ApiHandler for S3ApiServer { return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } - let (req, api_key, content_sha256) = verify_request(&garage, req, "s3").await?; + let verified_request = verify_request(&garage, req, "s3").await?; + let req = verified_request.request; + let api_key = verified_request.access_key; let bucket_name = match bucket_name { None => { @@ -134,14 +136,7 @@ impl ApiHandler for S3ApiServer { // Special code path for CreateBucket API endpoint if let Endpoint::CreateBucket {} = endpoint { - return handle_create_bucket( - &garage, - req, - content_sha256, - &api_key.key_id, - bucket_name, - ) - .await; + return handle_create_bucket(&garage, req, &api_key.key_id, bucket_name).await; } let bucket_id = garage @@ -179,7 +174,7 @@ impl ApiHandler for S3ApiServer { let resp = match endpoint { Endpoint::HeadObject { key, part_number, .. - } => handle_head(ctx, &req, &key, part_number).await, + } => handle_head(ctx, &req.map(|_| ()), &key, part_number).await, Endpoint::GetObject { key, part_number, @@ -199,20 +194,20 @@ impl ApiHandler for S3ApiServer { response_content_type, response_expires, }; - handle_get(ctx, &req, &key, part_number, overrides).await + handle_get(ctx, &req.map(|_| ()), &key, part_number, overrides).await } Endpoint::UploadPart { key, part_number, upload_id, - } => handle_put_part(ctx, req, &key, part_number, &upload_id, content_sha256).await, + } => handle_put_part(ctx, req, &key, part_number, &upload_id).await, Endpoint::CopyObject { key } => handle_copy(ctx, &req, &key).await, Endpoint::UploadPartCopy { key, part_number, upload_id, } => handle_upload_part_copy(ctx, &req, &key, part_number, &upload_id).await, - Endpoint::PutObject { key } => handle_put(ctx, req, &key, content_sha256).await, + Endpoint::PutObject { key } => handle_put(ctx, req, &key).await, Endpoint::AbortMultipartUpload { key, upload_id } => { handle_abort_multipart_upload(ctx, &key, &upload_id).await } @@ -221,7 +216,7 @@ impl ApiHandler for S3ApiServer { handle_create_multipart_upload(ctx, &req, &key).await } Endpoint::CompleteMultipartUpload { key, upload_id } => { - handle_complete_multipart_upload(ctx, req, &key, &upload_id, content_sha256).await + handle_complete_multipart_upload(ctx, req, &key, &upload_id).await } Endpoint::CreateBucket {} => unreachable!(), Endpoint::HeadBucket {} => { @@ -324,17 +319,15 @@ impl ApiHandler for S3ApiServer { }; handle_list_parts(ctx, req, &query).await } - Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await, + Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req).await, Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await, - Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req, content_sha256).await, + Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req).await, Endpoint::DeleteBucketWebsite {} => handle_delete_website(ctx).await, Endpoint::GetBucketCors {} => handle_get_cors(ctx).await, - Endpoint::PutBucketCors {} => handle_put_cors(ctx, req, content_sha256).await, + Endpoint::PutBucketCors {} => handle_put_cors(ctx, req).await, Endpoint::DeleteBucketCors {} => handle_delete_cors(ctx).await, Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(ctx).await, - Endpoint::PutBucketLifecycleConfiguration {} => { - handle_put_lifecycle(ctx, req, content_sha256).await - } + Endpoint::PutBucketLifecycleConfiguration {} => handle_put_lifecycle(ctx, req).await, Endpoint::DeleteBucketLifecycle {} => handle_delete_lifecycle(ctx).await, endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), }; diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 0a192ba6..3a09e769 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use garage_model::bucket_alias_table::*; @@ -10,12 +9,10 @@ use garage_model::key_table::Key; use garage_model::permission::BucketKeyPerm; use garage_table::util::*; use garage_util::crdt::*; -use garage_util::data::*; use garage_util::time::*; use garage_api_common::common_error::CommonError; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::error::*; @@ -122,15 +119,10 @@ pub async fn handle_list_buckets( pub async fn handle_create_bucket( garage: &Garage, req: Request<ReqBody>, - content_sha256: Option<Hash>, api_key_id: &String, bucket_name: String, ) -> Result<Response<ResBody>, Error> { - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let cmd = parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?; diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 07d50ea5..9ae48807 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -21,9 +21,9 @@ use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_api_common::helpers::*; +use garage_api_common::signature::checksum::*; use crate::api_server::{ReqBody, ResBody}; -use crate::checksum::*; use crate::encryption::EncryptionParams; use crate::error::*; use crate::get::full_object_byte_stream; diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index 625b84db..fcfdb934 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -2,15 +2,11 @@ use quick_xml::de::from_reader; use hyper::{header::HeaderName, Method, Request, Response, StatusCode}; -use http_body_util::BodyExt; - use serde::{Deserialize, Serialize}; use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule}; -use garage_util::data::*; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::error::*; @@ -59,7 +55,6 @@ pub async fn handle_delete_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> pub async fn handle_put_cors( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -68,11 +63,7 @@ pub async fn handle_put_cors( .. } = ctx; - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let conf: CorsConfiguration = from_reader(&body as &[u8])?; conf.validate()?; diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index b799e67a..d785b9d8 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -1,4 +1,3 @@ -use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use garage_util::data::*; @@ -6,7 +5,6 @@ use garage_util::data::*; use garage_model::s3::object_table::*; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::error::*; @@ -68,13 +66,8 @@ pub async fn handle_delete(ctx: ReqCtx, key: &str) -> Result<Response<ResBody>, pub async fn handle_delete_objects( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?; diff --git a/src/api/s3/encryption.rs b/src/api/s3/encryption.rs index b38d7792..fa7285ca 100644 --- a/src/api/s3/encryption.rs +++ b/src/api/s3/encryption.rs @@ -29,8 +29,8 @@ use garage_model::garage::Garage; use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionMetaInner}; use garage_api_common::common_error::*; +use garage_api_common::signature::checksum::Md5Checksum; -use crate::checksum::Md5Checksum; use crate::error::Error; const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName = diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index 1bb8909c..6d4b7a11 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -80,7 +80,7 @@ pub enum Error { #[error(display = "Invalid encryption algorithm: {:?}, should be AES256", _0)] InvalidEncryptionAlgorithm(String), - /// The client sent invalid XML data + /// The provided digest (checksum) value was invalid #[error(display = "Invalid digest: {}", _0)] InvalidDigest(String), @@ -119,6 +119,7 @@ impl From<SignatureError> for Error { Self::AuthorizationHeaderMalformed(c) } SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), + SignatureError::InvalidDigest(d) => Self::InvalidDigest(d), } } } diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index c2393a51..bcb72cc3 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -12,7 +12,7 @@ use http::header::{ CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, RANGE, }; -use hyper::{body::Body, Request, Response, StatusCode}; +use hyper::{Request, Response, StatusCode}; use tokio::sync::mpsc; use garage_net::stream::ByteStream; @@ -26,9 +26,9 @@ use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_api_common::helpers::*; +use garage_api_common::signature::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE}; use crate::api_server::ResBody; -use crate::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE}; use crate::encryption::EncryptionParams; use crate::error::*; @@ -118,7 +118,7 @@ fn getobject_override_headers( fn try_answer_cached( version: &ObjectVersion, version_meta: &ObjectVersionMeta, - req: &Request<impl Body>, + req: &Request<()>, ) -> Option<Response<ResBody>> { // <trinity> It is possible, and is even usually the case, [that both If-None-Match and // If-Modified-Since] are present in a request. In this situation If-None-Match takes @@ -157,7 +157,7 @@ fn try_answer_cached( /// Handle HEAD request pub async fn handle_head( ctx: ReqCtx, - req: &Request<impl Body>, + req: &Request<()>, key: &str, part_number: Option<u64>, ) -> Result<Response<ResBody>, Error> { @@ -167,7 +167,7 @@ pub async fn handle_head( /// Handle HEAD request for website pub async fn handle_head_without_ctx( garage: Arc<Garage>, - req: &Request<impl Body>, + req: &Request<()>, bucket_id: Uuid, key: &str, part_number: Option<u64>, @@ -278,7 +278,7 @@ pub async fn handle_head_without_ctx( /// Handle GET request pub async fn handle_get( ctx: ReqCtx, - req: &Request<impl Body>, + req: &Request<()>, key: &str, part_number: Option<u64>, overrides: GetObjectOverrides, @@ -289,7 +289,7 @@ pub async fn handle_get( /// Handle GET request pub async fn handle_get_without_ctx( garage: Arc<Garage>, - req: &Request<impl Body>, + req: &Request<()>, bucket_id: Uuid, key: &str, part_number: Option<u64>, @@ -340,7 +340,12 @@ pub async fn handle_get_without_ctx( enc, &headers, pn, - checksum_mode, + ChecksumMode { + // TODO: for multipart uploads, checksums of each part should be stored + // so that we can return the corresponding checksum here + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + enabled: false, + }, ) .await } @@ -354,7 +359,12 @@ pub async fn handle_get_without_ctx( &headers, range.start, range.start + range.length, - checksum_mode, + ChecksumMode { + // TODO: for range queries that align with part boundaries, + // we should return the saved checksum of the part + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + enabled: false, + }, ) .await } @@ -577,7 +587,7 @@ async fn handle_get_part( } fn parse_range_header( - req: &Request<impl Body>, + req: &Request<()>, total_size: u64, ) -> Result<Option<http_range::HttpRange>, Error> { let range = match req.headers().get(RANGE) { @@ -618,7 +628,7 @@ struct ChecksumMode { enabled: bool, } -fn checksum_mode(req: &Request<impl Body>) -> ChecksumMode { +fn checksum_mode(req: &Request<()>) -> ChecksumMode { ChecksumMode { enabled: req .headers() diff --git a/src/api/s3/lib.rs b/src/api/s3/lib.rs index fd99b443..4d1d3ef5 100644 --- a/src/api/s3/lib.rs +++ b/src/api/s3/lib.rs @@ -16,7 +16,6 @@ mod post_object; mod put; mod website; -mod checksum; mod encryption; mod router; pub mod xml; diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index c35047ed..c140494e 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -1,12 +1,10 @@ use quick_xml::de::from_reader; -use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::error::*; @@ -16,7 +14,6 @@ use garage_model::bucket_table::{ parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration, LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule, }; -use garage_util::data::*; pub async fn handle_get_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> { let ReqCtx { bucket_params, .. } = ctx; @@ -56,7 +53,6 @@ pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, E pub async fn handle_put_lifecycle( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -65,11 +61,7 @@ pub async fn handle_put_lifecycle( .. } = ctx; - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let conf: LifecycleConfiguration = from_reader(&body as &[u8])?; let config = conf diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index fa053df2..1ee04bc1 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -1,13 +1,20 @@ use std::collections::HashMap; -use std::convert::TryInto; +use std::convert::{TryFrom, TryInto}; +use std::hash::Hasher; use std::sync::Arc; use base64::prelude::*; +use crc32c::Crc32cHasher as Crc32c; +use crc32fast::Hasher as Crc32; use futures::prelude::*; use hyper::{Request, Response}; +use md5::{Digest, Md5}; +use sha1::Sha1; +use sha2::Sha256; use garage_table::*; use garage_util::data::*; +use garage_util::error::OkOrMessage; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; @@ -16,10 +23,9 @@ use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; +use garage_api_common::signature::checksum::*; use crate::api_server::{ReqBody, ResBody}; -use crate::checksum::*; use crate::encryption::EncryptionParams; use crate::error::*; use crate::put::*; @@ -94,7 +100,6 @@ pub async fn handle_put_part( key: &str, part_number: u64, upload_id: &str, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, .. } = &ctx; @@ -105,17 +110,30 @@ pub async fn handle_put_part( Some(x) => Some(x.to_str()?.to_string()), None => None, }, - sha256: content_sha256, + sha256: None, extra: request_checksum_value(req.headers())?, }; - // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let (req_head, req_body) = req.into_parts(); - let stream = body_stream(req_body); + let (req_head, mut req_body) = req.into_parts(); + + // Before we stream the body, configure the needed checksums. + req_body.add_expected_checksums(expected_checksums.clone()); + // TODO: avoid parsing encryption headers twice... + if !EncryptionParams::new_from_headers(&garage, &req_head.headers)?.is_encrypted() { + // For non-encrypted objects, we need to compute the md5sum in all cases + // (even if content-md5 is not set), because it is used as an etag of the + // part, which is in turn used in the etag computation of the whole object + req_body.add_md5(); + } + + let (stream, stream_checksums) = req_body.streaming_with_checksums(); + let stream = stream.map_err(Error::from); + let mut chunker = StreamChunker::new(stream, garage.config.block_size); + // Read first chuck, and at the same time try to get object to see if it exists let ((_, object_version, mut mpu), first_block) = futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?; @@ -172,21 +190,21 @@ pub async fn handle_put_part( garage.version_table.insert(&version).await?; // Copy data to version - let checksummer = - Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm); - let (total_size, checksums, _) = read_and_put_blocks( + let (total_size, _, _) = read_and_put_blocks( &ctx, &version, encryption, part_number, first_block, - &mut chunker, - checksummer, + chunker, + Checksummer::new(), ) .await?; - // Verify that checksums map - checksums.verify(&expected_checksums)?; + // Verify that checksums match + let checksums = stream_checksums + .await + .ok_or_internal_error("checksum calculation")??; // Store part etag in version let etag = encryption.etag_from_md5(&checksums.md5); @@ -248,7 +266,6 @@ pub async fn handle_complete_multipart_upload( req: Request<ReqBody>, key: &str, upload_id: &str, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -260,11 +277,7 @@ pub async fn handle_complete_multipart_upload( let expected_checksum = request_checksum_value(&req_head.headers)?; - let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req_body.collect().await?; let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let body_list_of_parts = parse_complete_multipart_upload_body(&body_xml) @@ -602,3 +615,99 @@ fn parse_complete_multipart_upload_body( Some(parts) } + +// ====== checksummer ==== + +#[derive(Default)] +pub(crate) struct MultipartChecksummer { + pub md5: Md5, + pub extra: Option<MultipartExtraChecksummer>, +} + +pub(crate) enum MultipartExtraChecksummer { + Crc32(Crc32), + Crc32c(Crc32c), + Sha1(Sha1), + Sha256(Sha256), +} + +impl MultipartChecksummer { + pub(crate) fn init(algo: Option<ChecksumAlgorithm>) -> Self { + Self { + md5: Md5::new(), + extra: match algo { + None => None, + Some(ChecksumAlgorithm::Crc32) => { + Some(MultipartExtraChecksummer::Crc32(Crc32::new())) + } + Some(ChecksumAlgorithm::Crc32c) => { + Some(MultipartExtraChecksummer::Crc32c(Crc32c::default())) + } + Some(ChecksumAlgorithm::Sha1) => Some(MultipartExtraChecksummer::Sha1(Sha1::new())), + Some(ChecksumAlgorithm::Sha256) => { + Some(MultipartExtraChecksummer::Sha256(Sha256::new())) + } + }, + } + } + + pub(crate) fn update( + &mut self, + etag: &str, + checksum: Option<ChecksumValue>, + ) -> Result<(), Error> { + self.md5 + .update(&hex::decode(&etag).ok_or_message("invalid etag hex")?); + match (&mut self.extra, checksum) { + (None, _) => (), + ( + Some(MultipartExtraChecksummer::Crc32(ref mut crc32)), + Some(ChecksumValue::Crc32(x)), + ) => { + crc32.update(&x); + } + ( + Some(MultipartExtraChecksummer::Crc32c(ref mut crc32c)), + Some(ChecksumValue::Crc32c(x)), + ) => { + crc32c.write(&x); + } + (Some(MultipartExtraChecksummer::Sha1(ref mut sha1)), Some(ChecksumValue::Sha1(x))) => { + sha1.update(&x); + } + ( + Some(MultipartExtraChecksummer::Sha256(ref mut sha256)), + Some(ChecksumValue::Sha256(x)), + ) => { + sha256.update(&x); + } + (Some(_), b) => { + return Err(Error::internal_error(format!( + "part checksum was not computed correctly, got: {:?}", + b + ))) + } + } + Ok(()) + } + + pub(crate) fn finalize(self) -> (Md5Checksum, Option<ChecksumValue>) { + let md5 = self.md5.finalize()[..].try_into().unwrap(); + let extra = match self.extra { + None => None, + Some(MultipartExtraChecksummer::Crc32(crc32)) => { + Some(ChecksumValue::Crc32(u32::to_be_bytes(crc32.finalize()))) + } + Some(MultipartExtraChecksummer::Crc32c(crc32c)) => Some(ChecksumValue::Crc32c( + u32::to_be_bytes(u32::try_from(crc32c.finish()).unwrap()), + )), + Some(MultipartExtraChecksummer::Sha1(sha1)) => { + Some(ChecksumValue::Sha1(sha1.finalize()[..].try_into().unwrap())) + } + Some(MultipartExtraChecksummer::Sha256(sha256)) => Some(ChecksumValue::Sha256( + sha256.finalize()[..].try_into().unwrap(), + )), + }; + (md5, extra) + } +} diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index 6c0e73d4..350684da 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -18,10 +18,10 @@ use garage_model::s3::object_table::*; use garage_api_common::cors::*; use garage_api_common::helpers::*; +use garage_api_common::signature::checksum::*; use garage_api_common::signature::payload::{verify_v4, Authorization}; use crate::api_server::ResBody; -use crate::checksum::*; use crate::encryption::EncryptionParams; use crate::error::*; use crate::put::{get_headers, save_stream, ChecksumMode}; @@ -218,6 +218,7 @@ pub async fn handle_post_object( // around here to make sure the rest of the machinery takes our acl into account. let headers = get_headers(¶ms)?; + let checksum_algorithm = request_checksum_algorithm(¶ms)?; let expected_checksums = ExpectedChecksums { md5: params .get("content-md5") @@ -225,7 +226,9 @@ pub async fn handle_post_object( .transpose()? .map(str::to_string), sha256: None, - extra: request_checksum_algorithm_value(¶ms)?, + extra: checksum_algorithm + .map(|algo| extract_checksum_value(¶ms, algo)) + .transpose()?, }; let meta = ObjectVersionMetaInner { diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 530b4e7b..4d866a06 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -31,9 +31,10 @@ use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_api_common::helpers::*; +use garage_api_common::signature::body::StreamingChecksumReceiver; +use garage_api_common::signature::checksum::*; use crate::api_server::{ReqBody, ResBody}; -use crate::checksum::*; use crate::encryption::EncryptionParams; use crate::error::*; @@ -48,6 +49,10 @@ pub(crate) struct SaveStreamResult { pub(crate) enum ChecksumMode<'a> { Verify(&'a ExpectedChecksums), + VerifyFrom { + checksummer: StreamingChecksumReceiver, + trailer_algo: Option<ChecksumAlgorithm>, + }, Calculate(Option<ChecksumAlgorithm>), } @@ -55,7 +60,6 @@ pub async fn handle_put( ctx: ReqCtx, req: Request<ReqBody>, key: &String, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { // Retrieve interesting headers from request let headers = get_headers(req.headers())?; @@ -66,9 +70,10 @@ pub async fn handle_put( Some(x) => Some(x.to_str()?.to_string()), None => None, }, - sha256: content_sha256, + sha256: None, extra: request_checksum_value(req.headers())?, }; + let trailer_checksum_algorithm = request_trailer_checksum_algorithm(req.headers())?; let meta = ObjectVersionMetaInner { headers, @@ -78,7 +83,19 @@ pub async fn handle_put( // Determine whether object should be encrypted, and if so the key let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; - let stream = body_stream(req.into_body()); + // The request body is a special ReqBody object (see garage_api_common::signature::body) + // which supports calculating checksums while streaming the data. + // Before we start streaming, we configure it to calculate all the checksums we need. + let mut req_body = req.into_body(); + req_body.add_expected_checksums(expected_checksums.clone()); + if !encryption.is_encrypted() { + // For non-encrypted objects, we need to compute the md5sum in all cases + // (even if content-md5 is not set), because it is used as the object etag + req_body.add_md5(); + } + + let (stream, checksummer) = req_body.streaming_with_checksums(); + let stream = stream.map_err(Error::from); let res = save_stream( &ctx, @@ -86,7 +103,10 @@ pub async fn handle_put( encryption, stream, key, - ChecksumMode::Verify(&expected_checksums), + ChecksumMode::VerifyFrom { + checksummer, + trailer_algo: trailer_checksum_algorithm, + }, ) .await?; @@ -122,10 +142,15 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let version_uuid = gen_uuid(); let version_timestamp = next_timestamp(existing_object.as_ref()); - let mut checksummer = match checksum_mode { + let mut checksummer = match &checksum_mode { ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()), ChecksumMode::Calculate(algo) => { - Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(algo) + Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(*algo) + } + ChecksumMode::VerifyFrom { .. } => { + // Checksums are calculated by the garage_api_common::signature module + // so here we can just have an empty checksummer that does nothing + Checksummer::new() } }; @@ -133,7 +158,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // as "inline data". We can then return immediately. if first_block.len() < INLINE_THRESHOLD { checksummer.update(&first_block); - let checksums = checksummer.finalize(); + let mut checksums = checksummer.finalize(); match checksum_mode { ChecksumMode::Verify(expected) => { @@ -142,6 +167,18 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ChecksumMode::Calculate(algo) => { meta.checksum = checksums.extract(algo); } + ChecksumMode::VerifyFrom { + checksummer, + trailer_algo, + } => { + drop(chunker); + checksums = checksummer + .await + .ok_or_internal_error("checksum calculation")??; + if let Some(algo) = trailer_algo { + meta.checksum = checksums.extract(Some(algo)); + } + } }; let size = first_block.len() as u64; @@ -213,13 +250,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data - let (total_size, checksums, first_block_hash) = read_and_put_blocks( + let (total_size, mut checksums, first_block_hash) = read_and_put_blocks( ctx, &version, encryption, 1, first_block, - &mut chunker, + chunker, checksummer, ) .await?; @@ -232,6 +269,17 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ChecksumMode::Calculate(algo) => { meta.checksum = checksums.extract(algo); } + ChecksumMode::VerifyFrom { + checksummer, + trailer_algo, + } => { + checksums = checksummer + .await + .ok_or_internal_error("checksum calculation")??; + if let Some(algo) = trailer_algo { + meta.checksum = checksums.extract(Some(algo)); + } + } }; // Verify quotas are respsected @@ -332,7 +380,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + encryption: EncryptionParams, part_number: u64, first_block: Bytes, - chunker: &mut StreamChunker<S>, + mut chunker: StreamChunker<S>, checksummer: Checksummer, ) -> Result<(u64, Checksums, Hash), Error> { let tracer = opentelemetry::global::tracer("garage"); diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index b55bb345..7553bef7 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -1,14 +1,11 @@ use quick_xml::de::from_reader; -use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_model::bucket_table::*; -use garage_util::data::*; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::error::*; @@ -61,7 +58,6 @@ pub async fn handle_delete_website(ctx: ReqCtx) -> Result<Response<ResBody>, Err pub async fn handle_put_website( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -70,11 +66,7 @@ pub async fn handle_put_website( .. } = ctx; - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let conf: WebsiteConfiguration = from_reader(&body as &[u8])?; conf.validate()?; |