aboutsummaryrefslogtreecommitdiff
path: root/src/api/common
diff options
context:
space:
mode:
authorAlex Auvolat <lx@deuxfleurs.fr>2025-02-17 18:47:06 +0100
committerAlex Auvolat <lx@deuxfleurs.fr>2025-02-17 18:47:06 +0100
commitc5df820e2c2b4bff5e239b8e99f07178b98b3f5a (patch)
tree26fa3dd297ee1c8bb55f5f7573a5c3396b030507 /src/api/common
parenta04d6cd5b8a3acffb8daeee00aed744fb1a78ea3 (diff)
downloadgarage-c5df820e2c2b4bff5e239b8e99f07178b98b3f5a.tar.gz
garage-c5df820e2c2b4bff5e239b8e99f07178b98b3f5a.zip
api: start refactor of signature to calculate checksums earlier
Diffstat (limited to 'src/api/common')
-rw-r--r--src/api/common/cors.rs8
-rw-r--r--src/api/common/signature/body.rs69
-rw-r--r--src/api/common/signature/checksum.rs135
-rw-r--r--src/api/common/signature/mod.rs12
-rw-r--r--src/api/common/signature/payload.rs2
-rw-r--r--src/api/common/signature/streaming.rs52
6 files changed, 247 insertions, 31 deletions
diff --git a/src/api/common/cors.rs b/src/api/common/cors.rs
index 14369b56..09b55c13 100644
--- a/src/api/common/cors.rs
+++ b/src/api/common/cors.rs
@@ -14,9 +14,9 @@ use crate::common_error::{
};
use crate::helpers::*;
-pub fn find_matching_cors_rule<'a>(
+pub fn find_matching_cors_rule<'a, B>(
bucket_params: &'a BucketParams,
- req: &Request<impl Body>,
+ req: &Request<B>,
) -> Result<Option<&'a GarageCorsRule>, CommonError> {
if let Some(cors_config) = bucket_params.cors_config.get() {
if let Some(origin) = req.headers().get("Origin") {
@@ -132,8 +132,8 @@ pub async fn handle_options_api(
}
}
-pub fn handle_options_for_bucket(
- req: &Request<IncomingBody>,
+pub fn handle_options_for_bucket<B>(
+ req: &Request<B>,
bucket_params: &BucketParams,
) -> Result<Response<EmptyBody>, CommonError> {
let origin = req
diff --git a/src/api/common/signature/body.rs b/src/api/common/signature/body.rs
new file mode 100644
index 00000000..877d8d85
--- /dev/null
+++ b/src/api/common/signature/body.rs
@@ -0,0 +1,69 @@
+use std::sync::Mutex;
+
+use futures::prelude::*;
+use futures::stream::BoxStream;
+use http_body_util::{BodyExt, StreamBody};
+use hyper::body::{Bytes, Frame};
+use serde::Deserialize;
+use tokio::sync::{mpsc, oneshot};
+
+use super::*;
+
+use crate::signature::checksum::*;
+
+pub struct ReqBody {
+ // why need mutex to be sync??
+ pub stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>,
+ pub checksummer: Checksummer,
+ pub expected_checksums: ExpectedChecksums,
+}
+
+pub type StreamingChecksumReceiver = oneshot::Receiver<Result<Checksums, Error>>;
+
+impl ReqBody {
+ pub async fn json<T: for<'a> Deserialize<'a>>(self) -> Result<T, Error> {
+ let body = self.collect().await?;
+ let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ Ok(resp)
+ }
+
+ pub async fn collect(self) -> Result<Bytes, Error> {
+ self.collect_with_checksums().await.map(|(b, _)| b)
+ }
+
+ pub async fn collect_with_checksums(mut self) -> Result<(Bytes, Checksums), Error> {
+ let stream: BoxStream<_> = self.stream.into_inner().unwrap();
+ let bytes = BodyExt::collect(StreamBody::new(stream)).await?.to_bytes();
+
+ self.checksummer.update(&bytes);
+ let checksums = self.checksummer.finalize();
+ checksums.verify(&self.expected_checksums)?;
+
+ Ok((bytes, checksums))
+ }
+
+ pub fn streaming(self) -> impl Stream<Item = Result<Bytes, Error>> {
+ self.streaming_with_checksums(false).0
+ }
+
+ pub fn streaming_with_checksums(
+ self,
+ add_md5: bool,
+ ) -> (
+ impl Stream<Item = Result<Bytes, Error>>,
+ StreamingChecksumReceiver,
+ ) {
+ let (tx, rx) = oneshot::channel();
+ // TODO: actually calculate checksums!!
+ let stream: BoxStream<_> = self.stream.into_inner().unwrap();
+ (
+ stream.map(|x| {
+ x.and_then(|f| {
+ f.into_data()
+ .map_err(|_| Error::bad_request("non-data frame"))
+ })
+ }),
+ rx,
+ )
+ }
+}
diff --git a/src/api/common/signature/checksum.rs b/src/api/common/signature/checksum.rs
index 432ed44d..b184fc65 100644
--- a/src/api/common/signature/checksum.rs
+++ b/src/api/common/signature/checksum.rs
@@ -8,13 +8,15 @@ use md5::{Digest, Md5};
use sha1::Sha1;
use sha2::Sha256;
-use http::HeaderName;
+use http::{HeaderMap, HeaderName, HeaderValue};
use garage_util::data::*;
use garage_model::s3::object_table::{ChecksumAlgorithm, ChecksumValue};
-use super::error::*;
+use super::*;
+
+pub const CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName =
HeaderName::from_static("x-amz-checksum-algorithm");
@@ -58,14 +60,18 @@ pub struct Checksums {
}
impl Checksummer {
- pub fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self {
- let mut ret = Self {
+ pub fn new() -> Self {
+ Self {
crc32: None,
crc32c: None,
md5: None,
sha1: None,
sha256: None,
- };
+ }
+ }
+
+ pub fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self {
+ let mut ret = Self::new();
if expected.md5.is_some() || require_md5 {
ret.md5 = Some(Md5::new());
@@ -179,3 +185,122 @@ impl Checksums {
}
}
}
+
+// ----
+
+/// Extract the value of the x-amz-checksum-algorithm header
+pub fn request_checksum_algorithm(
+ headers: &HeaderMap<HeaderValue>,
+) -> Result<Option<ChecksumAlgorithm>, Error> {
+ match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
+ None => Ok(None),
+ Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)),
+ Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)),
+ Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)),
+ Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)),
+ _ => Err(Error::bad_request("invalid checksum algorithm")),
+ }
+}
+
+pub fn request_trailer_checksum_algorithm(
+ headers: &HeaderMap<HeaderValue>,
+) -> Result<Option<ChecksumAlgorithm>, Error> {
+ match headers.get(X_AMZ_TRAILER).map(|x| x.to_str()).transpose()? {
+ None => Ok(None),
+ Some(x) if x == X_AMZ_CHECKSUM_CRC32 => Ok(Some(ChecksumAlgorithm::Crc32)),
+ Some(x) if x == X_AMZ_CHECKSUM_CRC32C => Ok(Some(ChecksumAlgorithm::Crc32c)),
+ Some(x) if x == X_AMZ_CHECKSUM_SHA1 => Ok(Some(ChecksumAlgorithm::Sha1)),
+ Some(x) if x == X_AMZ_CHECKSUM_SHA256 => Ok(Some(ChecksumAlgorithm::Sha256)),
+ _ => Err(Error::bad_request("invalid checksum algorithm")),
+ }
+}
+
+/// Extract the value of any of the x-amz-checksum-* headers
+pub fn request_checksum_value(
+ headers: &HeaderMap<HeaderValue>,
+) -> Result<Option<ChecksumValue>, Error> {
+ let mut ret = vec![];
+
+ if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) {
+ let crc32 = BASE64_STANDARD
+ .decode(&crc32_str)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
+ ret.push(ChecksumValue::Crc32(crc32))
+ }
+ if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) {
+ let crc32c = BASE64_STANDARD
+ .decode(&crc32c_str)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
+ ret.push(ChecksumValue::Crc32c(crc32c))
+ }
+ if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) {
+ let sha1 = BASE64_STANDARD
+ .decode(&sha1_str)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
+ ret.push(ChecksumValue::Sha1(sha1))
+ }
+ if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) {
+ let sha256 = BASE64_STANDARD
+ .decode(&sha256_str)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
+ ret.push(ChecksumValue::Sha256(sha256))
+ }
+
+ if ret.len() > 1 {
+ return Err(Error::bad_request(
+ "multiple x-amz-checksum-* headers given",
+ ));
+ }
+ Ok(ret.pop())
+}
+
+/// Checks for the presence of x-amz-checksum-algorithm
+/// if so extract the corresponding x-amz-checksum-* value
+pub fn request_checksum_algorithm_value(
+ headers: &HeaderMap<HeaderValue>,
+) -> Result<Option<ChecksumValue>, Error> {
+ match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
+ Some(x) if x == "CRC32" => {
+ let crc32 = headers
+ .get(X_AMZ_CHECKSUM_CRC32)
+ .and_then(|x| BASE64_STANDARD.decode(&x).ok())
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
+ Ok(Some(ChecksumValue::Crc32(crc32)))
+ }
+ Some(x) if x == "CRC32C" => {
+ let crc32c = headers
+ .get(X_AMZ_CHECKSUM_CRC32C)
+ .and_then(|x| BASE64_STANDARD.decode(&x).ok())
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
+ Ok(Some(ChecksumValue::Crc32c(crc32c)))
+ }
+ Some(x) if x == "SHA1" => {
+ let sha1 = headers
+ .get(X_AMZ_CHECKSUM_SHA1)
+ .and_then(|x| BASE64_STANDARD.decode(&x).ok())
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
+ Ok(Some(ChecksumValue::Sha1(sha1)))
+ }
+ Some(x) if x == "SHA256" => {
+ let sha256 = headers
+ .get(X_AMZ_CHECKSUM_SHA256)
+ .and_then(|x| BASE64_STANDARD.decode(&x).ok())
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
+ Ok(Some(ChecksumValue::Sha256(sha256)))
+ }
+ Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")),
+ None => Ok(None),
+ }
+}
diff --git a/src/api/common/signature/mod.rs b/src/api/common/signature/mod.rs
index 2421d696..e93ca85a 100644
--- a/src/api/common/signature/mod.rs
+++ b/src/api/common/signature/mod.rs
@@ -11,6 +11,7 @@ use garage_util::data::{sha256sum, Hash};
use error::*;
+pub mod body;
pub mod checksum;
pub mod error;
pub mod payload;
@@ -51,7 +52,7 @@ pub const AWS4_HMAC_SHA256_PAYLOAD: &str = "AWS4-HMAC-SHA256-PAYLOAD";
#[derive(Debug)]
pub enum ContentSha256Header {
UnsignedPayload,
- Sha256Hash(Hash),
+ Sha256Checksum(Hash),
StreamingPayload { trailer: bool, signed: bool },
}
@@ -90,15 +91,6 @@ pub async fn verify_request(
})
}
-pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> {
- if expected_sha256 != sha256sum(body) {
- return Err(Error::bad_request(
- "Request content hash does not match signed hash".to_string(),
- ));
- }
- Ok(())
-}
-
pub fn signing_hmac(
datetime: &DateTime<Utc>,
secret_key: &str,
diff --git a/src/api/common/signature/payload.rs b/src/api/common/signature/payload.rs
index ccc55c90..4ca0153f 100644
--- a/src/api/common/signature/payload.rs
+++ b/src/api/common/signature/payload.rs
@@ -94,7 +94,7 @@ fn parse_x_amz_content_sha256(header: Option<&str>) -> Result<ContentSha256Heade
.ok()
.and_then(|bytes| Hash::try_from(&bytes))
.ok_or_bad_request("Invalid content sha256 hash")?;
- Ok(ContentSha256Header::Sha256Hash(sha256))
+ Ok(ContentSha256Header::Sha256Checksum(sha256))
}
}
diff --git a/src/api/common/signature/streaming.rs b/src/api/common/signature/streaming.rs
index b8a5e66d..e8f9b3d7 100644
--- a/src/api/common/signature/streaming.rs
+++ b/src/api/common/signature/streaming.rs
@@ -1,21 +1,22 @@
use std::pin::Pin;
+use std::sync::Mutex;
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use futures::prelude::*;
use futures::task;
use hmac::Mac;
-use http_body_util::StreamBody;
-use hyper::body::{Bytes, Incoming as IncomingBody};
+use hyper::body::{Bytes, Frame, Incoming as IncomingBody};
use hyper::Request;
use garage_util::data::Hash;
use super::*;
-use crate::helpers::*;
+use crate::helpers::body_stream;
+use crate::signature::checksum::*;
use crate::signature::payload::CheckedSignature;
-pub type ReqBody = BoxBody<Error>;
+pub use crate::signature::body::ReqBody;
pub fn parse_streaming_body(
req: Request<IncomingBody>,
@@ -23,6 +24,20 @@ pub fn parse_streaming_body(
region: &str,
service: &str,
) -> Result<Request<ReqBody>, Error> {
+ let expected_checksums = ExpectedChecksums {
+ md5: match req.headers().get("content-md5") {
+ Some(x) => Some(x.to_str()?.to_string()),
+ None => None,
+ },
+ sha256: match &checked_signature.content_sha256_header {
+ ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256),
+ _ => None,
+ },
+ extra: None,
+ };
+
+ let mut checksummer = Checksummer::init(&expected_checksums, false);
+
match checked_signature.content_sha256_header {
ContentSha256Header::StreamingPayload { signed, trailer } => {
if !signed && !trailer {
@@ -31,6 +46,11 @@ pub fn parse_streaming_body(
));
}
+ if trailer {
+ let algo = request_trailer_checksum_algorithm(req.headers())?;
+ checksummer = checksummer.add(algo);
+ }
+
let sign_params = if signed {
let signature = checked_signature
.signature_header
@@ -77,14 +97,24 @@ pub fn parse_streaming_body(
Ok(req.map(move |body| {
let stream = body_stream::<_, Error>(body);
+
let signed_payload_stream =
- StreamingPayloadStream::new(stream, sign_params, trailer)
- .map(|x| x.map(hyper::body::Frame::data))
- .map_err(Error::from);
- ReqBody::new(StreamBody::new(signed_payload_stream))
+ StreamingPayloadStream::new(stream, sign_params, trailer).map_err(Error::from);
+ ReqBody {
+ stream: Mutex::new(signed_payload_stream.boxed()),
+ checksummer,
+ expected_checksums,
+ }
}))
}
- _ => Ok(req.map(|body| ReqBody::new(http_body_util::BodyExt::map_err(body, Error::from)))),
+ _ => Ok(req.map(|body| {
+ let stream = http_body_util::BodyStream::new(body).map_err(Error::from);
+ ReqBody {
+ stream: Mutex::new(stream.boxed()),
+ checksummer,
+ expected_checksums,
+ }
+ })),
}
}
@@ -386,7 +416,7 @@ impl<S> Stream for StreamingPayloadStream<S>
where
S: Stream<Item = Result<Bytes, Error>> + Unpin,
{
- type Item = Result<Bytes, StreamingPayloadError>;
+ type Item = Result<Frame<Bytes>, StreamingPayloadError>;
fn poll_next(
self: Pin<&mut Self>,
@@ -450,7 +480,7 @@ where
return Poll::Ready(None);
}
- return Poll::Ready(Some(Ok(data)));
+ return Poll::Ready(Some(Ok(Frame::data(data))));
}
StreamingPayloadChunk::Trailer(trailer) => {
if let Some(signing) = this.signing.as_mut() {