aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <lx@deuxfleurs.fr>2025-02-17 18:47:06 +0100
committerAlex Auvolat <lx@deuxfleurs.fr>2025-02-17 18:47:06 +0100
commitc5df820e2c2b4bff5e239b8e99f07178b98b3f5a (patch)
tree26fa3dd297ee1c8bb55f5f7573a5c3396b030507
parenta04d6cd5b8a3acffb8daeee00aed744fb1a78ea3 (diff)
downloadgarage-c5df820e2c2b4bff5e239b8e99f07178b98b3f5a.tar.gz
garage-c5df820e2c2b4bff5e239b8e99f07178b98b3f5a.zip
api: start refactor of signature to calculate checksums earlier
-rw-r--r--src/api/common/cors.rs8
-rw-r--r--src/api/common/signature/body.rs69
-rw-r--r--src/api/common/signature/checksum.rs135
-rw-r--r--src/api/common/signature/mod.rs12
-rw-r--r--src/api/common/signature/payload.rs2
-rw-r--r--src/api/common/signature/streaming.rs52
-rw-r--r--src/api/k2v/batch.rs8
-rw-r--r--src/api/k2v/item.rs4
-rw-r--r--src/api/s3/api_server.rs27
-rw-r--r--src/api/s3/bucket.rs10
-rw-r--r--src/api/s3/checksum.rs108
-rw-r--r--src/api/s3/copy.rs1
-rw-r--r--src/api/s3/cors.rs11
-rw-r--r--src/api/s3/delete.rs9
-rw-r--r--src/api/s3/get.rs16
-rw-r--r--src/api/s3/lifecycle.rs10
-rw-r--r--src/api/s3/multipart.rs14
-rw-r--r--src/api/s3/post_object.rs1
-rw-r--r--src/api/s3/put.rs4
-rw-r--r--src/api/s3/website.rs10
-rw-r--r--src/web/web_server.rs8
21 files changed, 288 insertions, 231 deletions
diff --git a/src/api/common/cors.rs b/src/api/common/cors.rs
index 14369b56..09b55c13 100644
--- a/src/api/common/cors.rs
+++ b/src/api/common/cors.rs
@@ -14,9 +14,9 @@ use crate::common_error::{
};
use crate::helpers::*;
-pub fn find_matching_cors_rule<'a>(
+pub fn find_matching_cors_rule<'a, B>(
bucket_params: &'a BucketParams,
- req: &Request<impl Body>,
+ req: &Request<B>,
) -> Result<Option<&'a GarageCorsRule>, CommonError> {
if let Some(cors_config) = bucket_params.cors_config.get() {
if let Some(origin) = req.headers().get("Origin") {
@@ -132,8 +132,8 @@ pub async fn handle_options_api(
}
}
-pub fn handle_options_for_bucket(
- req: &Request<IncomingBody>,
+pub fn handle_options_for_bucket<B>(
+ req: &Request<B>,
bucket_params: &BucketParams,
) -> Result<Response<EmptyBody>, CommonError> {
let origin = req
diff --git a/src/api/common/signature/body.rs b/src/api/common/signature/body.rs
new file mode 100644
index 00000000..877d8d85
--- /dev/null
+++ b/src/api/common/signature/body.rs
@@ -0,0 +1,69 @@
+use std::sync::Mutex;
+
+use futures::prelude::*;
+use futures::stream::BoxStream;
+use http_body_util::{BodyExt, StreamBody};
+use hyper::body::{Bytes, Frame};
+use serde::Deserialize;
+use tokio::sync::{mpsc, oneshot};
+
+use super::*;
+
+use crate::signature::checksum::*;
+
+pub struct ReqBody {
+ // why need mutex to be sync??
+ pub stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>,
+ pub checksummer: Checksummer,
+ pub expected_checksums: ExpectedChecksums,
+}
+
+pub type StreamingChecksumReceiver = oneshot::Receiver<Result<Checksums, Error>>;
+
+impl ReqBody {
+ pub async fn json<T: for<'a> Deserialize<'a>>(self) -> Result<T, Error> {
+ let body = self.collect().await?;
+ let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ Ok(resp)
+ }
+
+ pub async fn collect(self) -> Result<Bytes, Error> {
+ self.collect_with_checksums().await.map(|(b, _)| b)
+ }
+
+ pub async fn collect_with_checksums(mut self) -> Result<(Bytes, Checksums), Error> {
+ let stream: BoxStream<_> = self.stream.into_inner().unwrap();
+ let bytes = BodyExt::collect(StreamBody::new(stream)).await?.to_bytes();
+
+ self.checksummer.update(&bytes);
+ let checksums = self.checksummer.finalize();
+ checksums.verify(&self.expected_checksums)?;
+
+ Ok((bytes, checksums))
+ }
+
+ pub fn streaming(self) -> impl Stream<Item = Result<Bytes, Error>> {
+ self.streaming_with_checksums(false).0
+ }
+
+ pub fn streaming_with_checksums(
+ self,
+ add_md5: bool,
+ ) -> (
+ impl Stream<Item = Result<Bytes, Error>>,
+ StreamingChecksumReceiver,
+ ) {
+ let (tx, rx) = oneshot::channel();
+ // TODO: actually calculate checksums!!
+ let stream: BoxStream<_> = self.stream.into_inner().unwrap();
+ (
+ stream.map(|x| {
+ x.and_then(|f| {
+ f.into_data()
+ .map_err(|_| Error::bad_request("non-data frame"))
+ })
+ }),
+ rx,
+ )
+ }
+}
diff --git a/src/api/common/signature/checksum.rs b/src/api/common/signature/checksum.rs
index 432ed44d..b184fc65 100644
--- a/src/api/common/signature/checksum.rs
+++ b/src/api/common/signature/checksum.rs
@@ -8,13 +8,15 @@ use md5::{Digest, Md5};
use sha1::Sha1;
use sha2::Sha256;
-use http::HeaderName;
+use http::{HeaderMap, HeaderName, HeaderValue};
use garage_util::data::*;
use garage_model::s3::object_table::{ChecksumAlgorithm, ChecksumValue};
-use super::error::*;
+use super::*;
+
+pub const CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName =
HeaderName::from_static("x-amz-checksum-algorithm");
@@ -58,14 +60,18 @@ pub struct Checksums {
}
impl Checksummer {
- pub fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self {
- let mut ret = Self {
+ pub fn new() -> Self {
+ Self {
crc32: None,
crc32c: None,
md5: None,
sha1: None,
sha256: None,
- };
+ }
+ }
+
+ pub fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self {
+ let mut ret = Self::new();
if expected.md5.is_some() || require_md5 {
ret.md5 = Some(Md5::new());
@@ -179,3 +185,122 @@ impl Checksums {
}
}
}
+
+// ----
+
+/// Extract the value of the x-amz-checksum-algorithm header
+pub fn request_checksum_algorithm(
+ headers: &HeaderMap<HeaderValue>,
+) -> Result<Option<ChecksumAlgorithm>, Error> {
+ match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
+ None => Ok(None),
+ Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)),
+ Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)),
+ Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)),
+ Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)),
+ _ => Err(Error::bad_request("invalid checksum algorithm")),
+ }
+}
+
+pub fn request_trailer_checksum_algorithm(
+ headers: &HeaderMap<HeaderValue>,
+) -> Result<Option<ChecksumAlgorithm>, Error> {
+ match headers.get(X_AMZ_TRAILER).map(|x| x.to_str()).transpose()? {
+ None => Ok(None),
+ Some(x) if x == X_AMZ_CHECKSUM_CRC32 => Ok(Some(ChecksumAlgorithm::Crc32)),
+ Some(x) if x == X_AMZ_CHECKSUM_CRC32C => Ok(Some(ChecksumAlgorithm::Crc32c)),
+ Some(x) if x == X_AMZ_CHECKSUM_SHA1 => Ok(Some(ChecksumAlgorithm::Sha1)),
+ Some(x) if x == X_AMZ_CHECKSUM_SHA256 => Ok(Some(ChecksumAlgorithm::Sha256)),
+ _ => Err(Error::bad_request("invalid checksum algorithm")),
+ }
+}
+
+/// Extract the value of any of the x-amz-checksum-* headers
+pub fn request_checksum_value(
+ headers: &HeaderMap<HeaderValue>,
+) -> Result<Option<ChecksumValue>, Error> {
+ let mut ret = vec![];
+
+ if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) {
+ let crc32 = BASE64_STANDARD
+ .decode(&crc32_str)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
+ ret.push(ChecksumValue::Crc32(crc32))
+ }
+ if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) {
+ let crc32c = BASE64_STANDARD
+ .decode(&crc32c_str)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
+ ret.push(ChecksumValue::Crc32c(crc32c))
+ }
+ if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) {
+ let sha1 = BASE64_STANDARD
+ .decode(&sha1_str)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
+ ret.push(ChecksumValue::Sha1(sha1))
+ }
+ if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) {
+ let sha256 = BASE64_STANDARD
+ .decode(&sha256_str)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
+ ret.push(ChecksumValue::Sha256(sha256))
+ }
+
+ if ret.len() > 1 {
+ return Err(Error::bad_request(
+ "multiple x-amz-checksum-* headers given",
+ ));
+ }
+ Ok(ret.pop())
+}
+
+/// Checks for the presence of x-amz-checksum-algorithm
+/// if so extract the corresponding x-amz-checksum-* value
+pub fn request_checksum_algorithm_value(
+ headers: &HeaderMap<HeaderValue>,
+) -> Result<Option<ChecksumValue>, Error> {
+ match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
+ Some(x) if x == "CRC32" => {
+ let crc32 = headers
+ .get(X_AMZ_CHECKSUM_CRC32)
+ .and_then(|x| BASE64_STANDARD.decode(&x).ok())
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
+ Ok(Some(ChecksumValue::Crc32(crc32)))
+ }
+ Some(x) if x == "CRC32C" => {
+ let crc32c = headers
+ .get(X_AMZ_CHECKSUM_CRC32C)
+ .and_then(|x| BASE64_STANDARD.decode(&x).ok())
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
+ Ok(Some(ChecksumValue::Crc32c(crc32c)))
+ }
+ Some(x) if x == "SHA1" => {
+ let sha1 = headers
+ .get(X_AMZ_CHECKSUM_SHA1)
+ .and_then(|x| BASE64_STANDARD.decode(&x).ok())
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
+ Ok(Some(ChecksumValue::Sha1(sha1)))
+ }
+ Some(x) if x == "SHA256" => {
+ let sha256 = headers
+ .get(X_AMZ_CHECKSUM_SHA256)
+ .and_then(|x| BASE64_STANDARD.decode(&x).ok())
+ .and_then(|x| x.try_into().ok())
+ .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
+ Ok(Some(ChecksumValue::Sha256(sha256)))
+ }
+ Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")),
+ None => Ok(None),
+ }
+}
diff --git a/src/api/common/signature/mod.rs b/src/api/common/signature/mod.rs
index 2421d696..e93ca85a 100644
--- a/src/api/common/signature/mod.rs
+++ b/src/api/common/signature/mod.rs
@@ -11,6 +11,7 @@ use garage_util::data::{sha256sum, Hash};
use error::*;
+pub mod body;
pub mod checksum;
pub mod error;
pub mod payload;
@@ -51,7 +52,7 @@ pub const AWS4_HMAC_SHA256_PAYLOAD: &str = "AWS4-HMAC-SHA256-PAYLOAD";
#[derive(Debug)]
pub enum ContentSha256Header {
UnsignedPayload,
- Sha256Hash(Hash),
+ Sha256Checksum(Hash),
StreamingPayload { trailer: bool, signed: bool },
}
@@ -90,15 +91,6 @@ pub async fn verify_request(
})
}
-pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> {
- if expected_sha256 != sha256sum(body) {
- return Err(Error::bad_request(
- "Request content hash does not match signed hash".to_string(),
- ));
- }
- Ok(())
-}
-
pub fn signing_hmac(
datetime: &DateTime<Utc>,
secret_key: &str,
diff --git a/src/api/common/signature/payload.rs b/src/api/common/signature/payload.rs
index ccc55c90..4ca0153f 100644
--- a/src/api/common/signature/payload.rs
+++ b/src/api/common/signature/payload.rs
@@ -94,7 +94,7 @@ fn parse_x_amz_content_sha256(header: Option<&str>) -> Result<ContentSha256Heade
.ok()
.and_then(|bytes| Hash::try_from(&bytes))
.ok_or_bad_request("Invalid content sha256 hash")?;
- Ok(ContentSha256Header::Sha256Hash(sha256))
+ Ok(ContentSha256Header::Sha256Checksum(sha256))
}
}
diff --git a/src/api/common/signature/streaming.rs b/src/api/common/signature/streaming.rs
index b8a5e66d..e8f9b3d7 100644
--- a/src/api/common/signature/streaming.rs
+++ b/src/api/common/signature/streaming.rs
@@ -1,21 +1,22 @@
use std::pin::Pin;
+use std::sync::Mutex;
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use futures::prelude::*;
use futures::task;
use hmac::Mac;
-use http_body_util::StreamBody;
-use hyper::body::{Bytes, Incoming as IncomingBody};
+use hyper::body::{Bytes, Frame, Incoming as IncomingBody};
use hyper::Request;
use garage_util::data::Hash;
use super::*;
-use crate::helpers::*;
+use crate::helpers::body_stream;
+use crate::signature::checksum::*;
use crate::signature::payload::CheckedSignature;
-pub type ReqBody = BoxBody<Error>;
+pub use crate::signature::body::ReqBody;
pub fn parse_streaming_body(
req: Request<IncomingBody>,
@@ -23,6 +24,20 @@ pub fn parse_streaming_body(
region: &str,
service: &str,
) -> Result<Request<ReqBody>, Error> {
+ let expected_checksums = ExpectedChecksums {
+ md5: match req.headers().get("content-md5") {
+ Some(x) => Some(x.to_str()?.to_string()),
+ None => None,
+ },
+ sha256: match &checked_signature.content_sha256_header {
+ ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256),
+ _ => None,
+ },
+ extra: None,
+ };
+
+ let mut checksummer = Checksummer::init(&expected_checksums, false);
+
match checked_signature.content_sha256_header {
ContentSha256Header::StreamingPayload { signed, trailer } => {
if !signed && !trailer {
@@ -31,6 +46,11 @@ pub fn parse_streaming_body(
));
}
+ if trailer {
+ let algo = request_trailer_checksum_algorithm(req.headers())?;
+ checksummer = checksummer.add(algo);
+ }
+
let sign_params = if signed {
let signature = checked_signature
.signature_header
@@ -77,14 +97,24 @@ pub fn parse_streaming_body(
Ok(req.map(move |body| {
let stream = body_stream::<_, Error>(body);
+
let signed_payload_stream =
- StreamingPayloadStream::new(stream, sign_params, trailer)
- .map(|x| x.map(hyper::body::Frame::data))
- .map_err(Error::from);
- ReqBody::new(StreamBody::new(signed_payload_stream))
+ StreamingPayloadStream::new(stream, sign_params, trailer).map_err(Error::from);
+ ReqBody {
+ stream: Mutex::new(signed_payload_stream.boxed()),
+ checksummer,
+ expected_checksums,
+ }
}))
}
- _ => Ok(req.map(|body| ReqBody::new(http_body_util::BodyExt::map_err(body, Error::from)))),
+ _ => Ok(req.map(|body| {
+ let stream = http_body_util::BodyStream::new(body).map_err(Error::from);
+ ReqBody {
+ stream: Mutex::new(stream.boxed()),
+ checksummer,
+ expected_checksums,
+ }
+ })),
}
}
@@ -386,7 +416,7 @@ impl<S> Stream for StreamingPayloadStream<S>
where
S: Stream<Item = Result<Bytes, Error>> + Unpin,
{
- type Item = Result<Bytes, StreamingPayloadError>;
+ type Item = Result<Frame<Bytes>, StreamingPayloadError>;
fn poll_next(
self: Pin<&mut Self>,
@@ -450,7 +480,7 @@ where
return Poll::Ready(None);
}
- return Poll::Ready(Some(Ok(data)));
+ return Poll::Ready(Some(Ok(Frame::data(data))));
}
StreamingPayloadChunk::Trailer(trailer) => {
if let Some(signing) = this.signing.as_mut() {
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index c284dbd4..7a03d836 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -20,7 +20,7 @@ pub async fn handle_insert_batch(
let ReqCtx {
garage, bucket_id, ..
} = &ctx;
- let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
+ let items = req.into_body().json::<Vec<InsertBatchItem>>().await?;
let mut items2 = vec![];
for it in items {
@@ -47,7 +47,7 @@ pub async fn handle_read_batch(
ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
- let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?;
+ let queries = req.into_body().json::<Vec<ReadBatchQuery>>().await?;
let resp_results = futures::future::join_all(
queries
@@ -141,7 +141,7 @@ pub async fn handle_delete_batch(
ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
- let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?;
+ let queries = req.into_body().json::<Vec<DeleteBatchQuery>>().await?;
let resp_results = futures::future::join_all(
queries
@@ -262,7 +262,7 @@ pub(crate) async fn handle_poll_range(
} = ctx;
use garage_model::k2v::sub::PollRange;
- let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?;
+ let query = req.into_body().json::<PollRangeQuery>().await?;
let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000;
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index 4e28b499..0fb945d2 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -144,9 +144,7 @@ pub async fn handle_insert_item(
.map(parse_causality_token)
.transpose()?;
- let body = http_body_util::BodyExt::collect(req.into_body())
- .await?
- .to_bytes();
+ let body = req.into_body().collect().await?;
let value = DvvsValue::Value(body.to_vec());
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index 0fdaab70..fe6545cc 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -125,7 +125,7 @@ impl ApiHandler for S3ApiServer {
let req = verified_request.request;
let api_key = verified_request.access_key;
let content_sha256 = match verified_request.content_sha256_header {
- ContentSha256Header::Sha256Hash(h) => Some(h),
+ ContentSha256Header::Sha256Checksum(h) => Some(h),
// TODO take into account streaming/trailer checksums, etc.
_ => None,
};
@@ -141,14 +141,7 @@ impl ApiHandler for S3ApiServer {
// Special code path for CreateBucket API endpoint
if let Endpoint::CreateBucket {} = endpoint {
- return handle_create_bucket(
- &garage,
- req,
- content_sha256,
- &api_key.key_id,
- bucket_name,
- )
- .await;
+ return handle_create_bucket(&garage, req, &api_key.key_id, bucket_name).await;
}
let bucket_id = garage
@@ -186,7 +179,7 @@ impl ApiHandler for S3ApiServer {
let resp = match endpoint {
Endpoint::HeadObject {
key, part_number, ..
- } => handle_head(ctx, &req, &key, part_number).await,
+ } => handle_head(ctx, &req.map(|_| ()), &key, part_number).await,
Endpoint::GetObject {
key,
part_number,
@@ -206,7 +199,7 @@ impl ApiHandler for S3ApiServer {
response_content_type,
response_expires,
};
- handle_get(ctx, &req, &key, part_number, overrides).await
+ handle_get(ctx, &req.map(|_| ()), &key, part_number, overrides).await
}
Endpoint::UploadPart {
key,
@@ -228,7 +221,7 @@ impl ApiHandler for S3ApiServer {
handle_create_multipart_upload(ctx, &req, &key).await
}
Endpoint::CompleteMultipartUpload { key, upload_id } => {
- handle_complete_multipart_upload(ctx, req, &key, &upload_id, content_sha256).await
+ handle_complete_multipart_upload(ctx, req, &key, &upload_id).await
}
Endpoint::CreateBucket {} => unreachable!(),
Endpoint::HeadBucket {} => {
@@ -331,17 +324,15 @@ impl ApiHandler for S3ApiServer {
};
handle_list_parts(ctx, req, &query).await
}
- Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await,
+ Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req).await,
Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await,
- Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req, content_sha256).await,
+ Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req).await,
Endpoint::DeleteBucketWebsite {} => handle_delete_website(ctx).await,
Endpoint::GetBucketCors {} => handle_get_cors(ctx).await,
- Endpoint::PutBucketCors {} => handle_put_cors(ctx, req, content_sha256).await,
+ Endpoint::PutBucketCors {} => handle_put_cors(ctx, req).await,
Endpoint::DeleteBucketCors {} => handle_delete_cors(ctx).await,
Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(ctx).await,
- Endpoint::PutBucketLifecycleConfiguration {} => {
- handle_put_lifecycle(ctx, req, content_sha256).await
- }
+ Endpoint::PutBucketLifecycleConfiguration {} => handle_put_lifecycle(ctx, req).await,
Endpoint::DeleteBucketLifecycle {} => handle_delete_lifecycle(ctx).await,
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
};
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
index 0a192ba6..3a09e769 100644
--- a/src/api/s3/bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -1,6 +1,5 @@
use std::collections::HashMap;
-use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use garage_model::bucket_alias_table::*;
@@ -10,12 +9,10 @@ use garage_model::key_table::Key;
use garage_model::permission::BucketKeyPerm;
use garage_table::util::*;
use garage_util::crdt::*;
-use garage_util::data::*;
use garage_util::time::*;
use garage_api_common::common_error::CommonError;
use garage_api_common::helpers::*;
-use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::error::*;
@@ -122,15 +119,10 @@ pub async fn handle_list_buckets(
pub async fn handle_create_bucket(
garage: &Garage,
req: Request<ReqBody>,
- content_sha256: Option<Hash>,
api_key_id: &String,
bucket_name: String,
) -> Result<Response<ResBody>, Error> {
- let body = BodyExt::collect(req.into_body()).await?.to_bytes();
-
- if let Some(content_sha256) = content_sha256 {
- verify_signed_content(content_sha256, &body[..])?;
- }
+ let body = req.into_body().collect().await?;
let cmd =
parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?;
diff --git a/src/api/s3/checksum.rs b/src/api/s3/checksum.rs
index a720a82f..8e6096b6 100644
--- a/src/api/s3/checksum.rs
+++ b/src/api/s3/checksum.rs
@@ -8,8 +8,6 @@ use md5::{Digest, Md5};
use sha1::Sha1;
use sha2::Sha256;
-use http::{HeaderMap, HeaderValue};
-
use garage_util::error::OkOrMessage;
use garage_model::s3::object_table::*;
@@ -112,112 +110,6 @@ impl MultipartChecksummer {
}
}
-// ----
-
-/// Extract the value of the x-amz-checksum-algorithm header
-pub(crate) fn request_checksum_algorithm(
- headers: &HeaderMap<HeaderValue>,
-) -> Result<Option<ChecksumAlgorithm>, Error> {
- match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
- None => Ok(None),
- Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)),
- Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)),
- Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)),
- Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)),
- _ => Err(Error::bad_request("invalid checksum algorithm")),
- }
-}
-
-/// Extract the value of any of the x-amz-checksum-* headers
-pub(crate) fn request_checksum_value(
- headers: &HeaderMap<HeaderValue>,
-) -> Result<Option<ChecksumValue>, Error> {
- let mut ret = vec![];
-
- if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) {
- let crc32 = BASE64_STANDARD
- .decode(&crc32_str)
- .ok()
- .and_then(|x| x.try_into().ok())
- .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
- ret.push(ChecksumValue::Crc32(crc32))
- }
- if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) {
- let crc32c = BASE64_STANDARD
- .decode(&crc32c_str)
- .ok()
- .and_then(|x| x.try_into().ok())
- .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
- ret.push(ChecksumValue::Crc32c(crc32c))
- }
- if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) {
- let sha1 = BASE64_STANDARD
- .decode(&sha1_str)
- .ok()
- .and_then(|x| x.try_into().ok())
- .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
- ret.push(ChecksumValue::Sha1(sha1))
- }
- if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) {
- let sha256 = BASE64_STANDARD
- .decode(&sha256_str)
- .ok()
- .and_then(|x| x.try_into().ok())
- .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
- ret.push(ChecksumValue::Sha256(sha256))
- }
-
- if ret.len() > 1 {
- return Err(Error::bad_request(
- "multiple x-amz-checksum-* headers given",
- ));
- }
- Ok(ret.pop())
-}
-
-/// Checks for the presence of x-amz-checksum-algorithm
-/// if so extract the corresponding x-amz-checksum-* value
-pub(crate) fn request_checksum_algorithm_value(
- headers: &HeaderMap<HeaderValue>,
-) -> Result<Option<ChecksumValue>, Error> {
- match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
- Some(x) if x == "CRC32" => {
- let crc32 = headers
- .get(X_AMZ_CHECKSUM_CRC32)
- .and_then(|x| BASE64_STANDARD.decode(&x).ok())
- .and_then(|x| x.try_into().ok())
- .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
- Ok(Some(ChecksumValue::Crc32(crc32)))
- }
- Some(x) if x == "CRC32C" => {
- let crc32c = headers
- .get(X_AMZ_CHECKSUM_CRC32C)
- .and_then(|x| BASE64_STANDARD.decode(&x).ok())
- .and_then(|x| x.try_into().ok())
- .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
- Ok(Some(ChecksumValue::Crc32c(crc32c)))
- }
- Some(x) if x == "SHA1" => {
- let sha1 = headers
- .get(X_AMZ_CHECKSUM_SHA1)
- .and_then(|x| BASE64_STANDARD.decode(&x).ok())
- .and_then(|x| x.try_into().ok())
- .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
- Ok(Some(ChecksumValue::Sha1(sha1)))
- }
- Some(x) if x == "SHA256" => {
- let sha256 = headers
- .get(X_AMZ_CHECKSUM_SHA256)
- .and_then(|x| BASE64_STANDARD.decode(&x).ok())
- .and_then(|x| x.try_into().ok())
- .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
- Ok(Some(ChecksumValue::Sha256(sha256)))
- }
- Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")),
- None => Ok(None),
- }
-}
-
pub(crate) fn add_checksum_response_headers(
checksum: &Option<ChecksumValue>,
mut resp: http::response::Builder,
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 4bf68406..9ae48807 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -24,7 +24,6 @@ use garage_api_common::helpers::*;
use garage_api_common::signature::checksum::*;
use crate::api_server::{ReqBody, ResBody};
-use crate::checksum::*;
use crate::encryption::EncryptionParams;
use crate::error::*;
use crate::get::full_object_byte_stream;
diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs
index 625b84db..fcfdb934 100644
--- a/src/api/s3/cors.rs
+++ b/src/api/s3/cors.rs
@@ -2,15 +2,11 @@ use quick_xml::de::from_reader;
use hyper::{header::HeaderName, Method, Request, Response, StatusCode};
-use http_body_util::BodyExt;
-
use serde::{Deserialize, Serialize};
use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
-use garage_util::data::*;
use garage_api_common::helpers::*;
-use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::error::*;
@@ -59,7 +55,6 @@ pub async fn handle_delete_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error>
pub async fn handle_put_cors(
ctx: ReqCtx,
req: Request<ReqBody>,
- content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
@@ -68,11 +63,7 @@ pub async fn handle_put_cors(
..
} = ctx;
- let body = BodyExt::collect(req.into_body()).await?.to_bytes();
-
- if let Some(content_sha256) = content_sha256 {
- verify_signed_content(content_sha256, &body[..])?;
- }
+ let body = req.into_body().collect().await?;
let conf: CorsConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;
diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs
index b799e67a..d785b9d8 100644
--- a/src/api/s3/delete.rs
+++ b/src/api/s3/delete.rs
@@ -1,4 +1,3 @@
-use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use garage_util::data::*;
@@ -6,7 +5,6 @@ use garage_util::data::*;
use garage_model::s3::object_table::*;
use garage_api_common::helpers::*;
-use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::error::*;
@@ -68,13 +66,8 @@ pub async fn handle_delete(ctx: ReqCtx, key: &str) -> Result<Response<ResBody>,
pub async fn handle_delete_objects(
ctx: ReqCtx,
req: Request<ReqBody>,
- content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
- let body = BodyExt::collect(req.into_body()).await?.to_bytes();
-
- if let Some(content_sha256) = content_sha256 {
- verify_signed_content(content_sha256, &body[..])?;
- }
+ let body = req.into_body().collect().await?;
let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?;
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 6627cf4a..16e2b935 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -12,7 +12,7 @@ use http::header::{
CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH,
LAST_MODIFIED, RANGE,
};
-use hyper::{body::Body, Request, Response, StatusCode};
+use hyper::{Request, Response, StatusCode};
use tokio::sync::mpsc;
use garage_net::stream::ByteStream;
@@ -119,7 +119,7 @@ fn getobject_override_headers(
fn try_answer_cached(
version: &ObjectVersion,
version_meta: &ObjectVersionMeta,
- req: &Request<impl Body>,
+ req: &Request<()>,
) -> Option<Response<ResBody>> {
// <trinity> It is possible, and is even usually the case, [that both If-None-Match and
// If-Modified-Since] are present in a request. In this situation If-None-Match takes
@@ -158,7 +158,7 @@ fn try_answer_cached(
/// Handle HEAD request
pub async fn handle_head(
ctx: ReqCtx,
- req: &Request<impl Body>,
+ req: &Request<()>,
key: &str,
part_number: Option<u64>,
) -> Result<Response<ResBody>, Error> {
@@ -168,7 +168,7 @@ pub async fn handle_head(
/// Handle HEAD request for website
pub async fn handle_head_without_ctx(
garage: Arc<Garage>,
- req: &Request<impl Body>,
+ req: &Request<()>,
bucket_id: Uuid,
key: &str,
part_number: Option<u64>,
@@ -279,7 +279,7 @@ pub async fn handle_head_without_ctx(
/// Handle GET request
pub async fn handle_get(
ctx: ReqCtx,
- req: &Request<impl Body>,
+ req: &Request<()>,
key: &str,
part_number: Option<u64>,
overrides: GetObjectOverrides,
@@ -290,7 +290,7 @@ pub async fn handle_get(
/// Handle GET request
pub async fn handle_get_without_ctx(
garage: Arc<Garage>,
- req: &Request<impl Body>,
+ req: &Request<()>,
bucket_id: Uuid,
key: &str,
part_number: Option<u64>,
@@ -578,7 +578,7 @@ async fn handle_get_part(
}
fn parse_range_header(
- req: &Request<impl Body>,
+ req: &Request<()>,
total_size: u64,
) -> Result<Option<http_range::HttpRange>, Error> {
let range = match req.headers().get(RANGE) {
@@ -619,7 +619,7 @@ struct ChecksumMode {
enabled: bool,
}
-fn checksum_mode(req: &Request<impl Body>) -> ChecksumMode {
+fn checksum_mode(req: &Request<()>) -> ChecksumMode {
ChecksumMode {
enabled: req
.headers()
diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs
index c35047ed..c140494e 100644
--- a/src/api/s3/lifecycle.rs
+++ b/src/api/s3/lifecycle.rs
@@ -1,12 +1,10 @@
use quick_xml::de::from_reader;
-use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_api_common::helpers::*;
-use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::error::*;
@@ -16,7 +14,6 @@ use garage_model::bucket_table::{
parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration,
LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule,
};
-use garage_util::data::*;
pub async fn handle_get_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx { bucket_params, .. } = ctx;
@@ -56,7 +53,6 @@ pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, E
pub async fn handle_put_lifecycle(
ctx: ReqCtx,
req: Request<ReqBody>,
- content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
@@ -65,11 +61,7 @@ pub async fn handle_put_lifecycle(
..
} = ctx;
- let body = BodyExt::collect(req.into_body()).await?.to_bytes();
-
- if let Some(content_sha256) = content_sha256 {
- verify_signed_content(content_sha256, &body[..])?;
- }
+ let body = req.into_body().collect().await?;
let conf: LifecycleConfiguration = from_reader(&body as &[u8])?;
let config = conf
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 7f8d6440..f381d670 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -17,7 +17,6 @@ use garage_model::s3::version_table::*;
use garage_api_common::helpers::*;
use garage_api_common::signature::checksum::*;
-use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::checksum::*;
@@ -114,7 +113,11 @@ pub async fn handle_put_part(
let key = key.to_string();
let (req_head, req_body) = req.into_parts();
- let stream = body_stream(req_body);
+
+ let (stream, checksums) = req_body.streaming_with_checksums(true);
+ let stream = stream.map_err(Error::from);
+ // TODO checksums
+
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
let ((_, object_version, mut mpu), first_block) =
@@ -249,7 +252,6 @@ pub async fn handle_complete_multipart_upload(
req: Request<ReqBody>,
key: &str,
upload_id: &str,
- content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
@@ -261,11 +263,7 @@ pub async fn handle_complete_multipart_upload(
let expected_checksum = request_checksum_value(&req_head.headers)?;
- let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes();
-
- if let Some(content_sha256) = content_sha256 {
- verify_signed_content(content_sha256, &body[..])?;
- }
+ let body = req_body.collect().await?;
let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
let body_list_of_parts = parse_complete_multipart_upload_body(&body_xml)
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index 908ee9f3..6c1b7453 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -22,7 +22,6 @@ use garage_api_common::signature::checksum::*;
use garage_api_common::signature::payload::{verify_v4, Authorization};
use crate::api_server::ResBody;
-use crate::checksum::*;
use crate::encryption::EncryptionParams;
use crate::error::*;
use crate::put::{get_headers, save_stream, ChecksumMode};
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 834be6f1..551c3b76 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -79,7 +79,9 @@ pub async fn handle_put(
// Determine whether object should be encrypted, and if so the key
let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
- let stream = body_stream(req.into_body());
+ let (stream, checksums) = req.into_body().streaming_with_checksums(true);
+ let stream = stream.map_err(Error::from);
+ // TODO checksums
let res = save_stream(
&ctx,
diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs
index b55bb345..7553bef7 100644
--- a/src/api/s3/website.rs
+++ b/src/api/s3/website.rs
@@ -1,14 +1,11 @@
use quick_xml::de::from_reader;
-use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_model::bucket_table::*;
-use garage_util::data::*;
use garage_api_common::helpers::*;
-use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::error::*;
@@ -61,7 +58,6 @@ pub async fn handle_delete_website(ctx: ReqCtx) -> Result<Response<ResBody>, Err
pub async fn handle_put_website(
ctx: ReqCtx,
req: Request<ReqBody>,
- content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
@@ -70,11 +66,7 @@ pub async fn handle_put_website(
..
} = ctx;
- let body = BodyExt::collect(req.into_body()).await?.to_bytes();
-
- if let Some(content_sha256) = content_sha256 {
- verify_signed_content(content_sha256, &body[..])?;
- }
+ let body = req.into_body().collect().await?;
let conf: WebsiteConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index e73dab48..34ba834c 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -1,6 +1,6 @@
use std::fs::{self, Permissions};
use std::os::unix::prelude::PermissionsExt;
-use std::{convert::Infallible, sync::Arc};
+use std::sync::Arc;
use tokio::net::{TcpListener, UnixListener};
use tokio::sync::watch;
@@ -163,6 +163,8 @@ impl WebServer {
metrics_tags.push(KeyValue::new("host", host_header.clone()));
}
+ let req = req.map(|_| ());
+
// The actual handler
let res = self
.serve_file(&req)
@@ -218,7 +220,7 @@ impl WebServer {
async fn serve_file(
self: &Arc<Self>,
- req: &Request<IncomingBody>,
+ req: &Request<()>,
) -> Result<Response<BoxBody<ApiError>>, Error> {
// Get http authority string (eg. [::1]:3902 or garage.tld:80)
let authority = req
@@ -322,7 +324,7 @@ impl WebServer {
// Create a fake HTTP request with path = the error document
let req2 = Request::builder()
.uri(format!("http://{}/{}", host, &error_document))
- .body(empty_body::<Infallible>())
+ .body(())
.unwrap();
match handle_get_without_ctx(