diff options
Diffstat (limited to 'src/api/s3')
-rw-r--r-- | src/api/s3/api_server.rs | 33 | ||||
-rw-r--r-- | src/api/s3/bucket.rs | 10 | ||||
-rw-r--r-- | src/api/s3/checksum.rs | 406 | ||||
-rw-r--r-- | src/api/s3/copy.rs | 2 | ||||
-rw-r--r-- | src/api/s3/cors.rs | 11 | ||||
-rw-r--r-- | src/api/s3/delete.rs | 9 | ||||
-rw-r--r-- | src/api/s3/encryption.rs | 2 | ||||
-rw-r--r-- | src/api/s3/error.rs | 3 | ||||
-rw-r--r-- | src/api/s3/get.rs | 32 | ||||
-rw-r--r-- | src/api/s3/lib.rs | 1 | ||||
-rw-r--r-- | src/api/s3/lifecycle.rs | 10 | ||||
-rw-r--r-- | src/api/s3/multipart.rs | 151 | ||||
-rw-r--r-- | src/api/s3/post_object.rs | 7 | ||||
-rw-r--r-- | src/api/s3/put.rs | 70 | ||||
-rw-r--r-- | src/api/s3/website.rs | 10 |
15 files changed, 237 insertions, 520 deletions
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 14fd03c3..e26c2b65 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -121,7 +121,9 @@ impl ApiHandler for S3ApiServer { return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } - let (req, api_key, content_sha256) = verify_request(&garage, req, "s3").await?; + let verified_request = verify_request(&garage, req, "s3").await?; + let req = verified_request.request; + let api_key = verified_request.access_key; let bucket_name = match bucket_name { None => { @@ -134,14 +136,7 @@ impl ApiHandler for S3ApiServer { // Special code path for CreateBucket API endpoint if let Endpoint::CreateBucket {} = endpoint { - return handle_create_bucket( - &garage, - req, - content_sha256, - &api_key.key_id, - bucket_name, - ) - .await; + return handle_create_bucket(&garage, req, &api_key.key_id, bucket_name).await; } let bucket_id = garage @@ -179,7 +174,7 @@ impl ApiHandler for S3ApiServer { let resp = match endpoint { Endpoint::HeadObject { key, part_number, .. - } => handle_head(ctx, &req, &key, part_number).await, + } => handle_head(ctx, &req.map(|_| ()), &key, part_number).await, Endpoint::GetObject { key, part_number, @@ -199,20 +194,20 @@ impl ApiHandler for S3ApiServer { response_content_type, response_expires, }; - handle_get(ctx, &req, &key, part_number, overrides).await + handle_get(ctx, &req.map(|_| ()), &key, part_number, overrides).await } Endpoint::UploadPart { key, part_number, upload_id, - } => handle_put_part(ctx, req, &key, part_number, &upload_id, content_sha256).await, + } => handle_put_part(ctx, req, &key, part_number, &upload_id).await, Endpoint::CopyObject { key } => handle_copy(ctx, &req, &key).await, Endpoint::UploadPartCopy { key, part_number, upload_id, } => handle_upload_part_copy(ctx, &req, &key, part_number, &upload_id).await, - Endpoint::PutObject { key } => handle_put(ctx, req, &key, content_sha256).await, + Endpoint::PutObject { key } => handle_put(ctx, req, &key).await, Endpoint::AbortMultipartUpload { key, upload_id } => { handle_abort_multipart_upload(ctx, &key, &upload_id).await } @@ -221,7 +216,7 @@ impl ApiHandler for S3ApiServer { handle_create_multipart_upload(ctx, &req, &key).await } Endpoint::CompleteMultipartUpload { key, upload_id } => { - handle_complete_multipart_upload(ctx, req, &key, &upload_id, content_sha256).await + handle_complete_multipart_upload(ctx, req, &key, &upload_id).await } Endpoint::CreateBucket {} => unreachable!(), Endpoint::HeadBucket {} => { @@ -324,17 +319,15 @@ impl ApiHandler for S3ApiServer { }; handle_list_parts(ctx, req, &query).await } - Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await, + Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req).await, Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await, - Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req, content_sha256).await, + Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req).await, Endpoint::DeleteBucketWebsite {} => handle_delete_website(ctx).await, Endpoint::GetBucketCors {} => handle_get_cors(ctx).await, - Endpoint::PutBucketCors {} => handle_put_cors(ctx, req, content_sha256).await, + Endpoint::PutBucketCors {} => handle_put_cors(ctx, req).await, Endpoint::DeleteBucketCors {} => handle_delete_cors(ctx).await, Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(ctx).await, - Endpoint::PutBucketLifecycleConfiguration {} => { - handle_put_lifecycle(ctx, req, content_sha256).await - } + Endpoint::PutBucketLifecycleConfiguration {} => handle_put_lifecycle(ctx, req).await, Endpoint::DeleteBucketLifecycle {} => handle_delete_lifecycle(ctx).await, endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), }; diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 0a192ba6..3a09e769 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use garage_model::bucket_alias_table::*; @@ -10,12 +9,10 @@ use garage_model::key_table::Key; use garage_model::permission::BucketKeyPerm; use garage_table::util::*; use garage_util::crdt::*; -use garage_util::data::*; use garage_util::time::*; use garage_api_common::common_error::CommonError; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::error::*; @@ -122,15 +119,10 @@ pub async fn handle_list_buckets( pub async fn handle_create_bucket( garage: &Garage, req: Request<ReqBody>, - content_sha256: Option<Hash>, api_key_id: &String, bucket_name: String, ) -> Result<Response<ResBody>, Error> { - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let cmd = parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?; diff --git a/src/api/s3/checksum.rs b/src/api/s3/checksum.rs deleted file mode 100644 index 02fb55ec..00000000 --- a/src/api/s3/checksum.rs +++ /dev/null @@ -1,406 +0,0 @@ -use std::convert::{TryFrom, TryInto}; -use std::hash::Hasher; - -use base64::prelude::*; -use crc32c::Crc32cHasher as Crc32c; -use crc32fast::Hasher as Crc32; -use md5::{Digest, Md5}; -use sha1::Sha1; -use sha2::Sha256; - -use http::{HeaderMap, HeaderName, HeaderValue}; - -use garage_util::data::*; -use garage_util::error::OkOrMessage; - -use garage_model::s3::object_table::*; - -use crate::error::*; - -pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName = - HeaderName::from_static("x-amz-checksum-algorithm"); -pub const X_AMZ_CHECKSUM_MODE: HeaderName = HeaderName::from_static("x-amz-checksum-mode"); -pub const X_AMZ_CHECKSUM_CRC32: HeaderName = HeaderName::from_static("x-amz-checksum-crc32"); -pub const X_AMZ_CHECKSUM_CRC32C: HeaderName = HeaderName::from_static("x-amz-checksum-crc32c"); -pub const X_AMZ_CHECKSUM_SHA1: HeaderName = HeaderName::from_static("x-amz-checksum-sha1"); -pub const X_AMZ_CHECKSUM_SHA256: HeaderName = HeaderName::from_static("x-amz-checksum-sha256"); - -pub type Crc32Checksum = [u8; 4]; -pub type Crc32cChecksum = [u8; 4]; -pub type Md5Checksum = [u8; 16]; -pub type Sha1Checksum = [u8; 20]; -pub type Sha256Checksum = [u8; 32]; - -#[derive(Debug, Default)] -pub(crate) struct ExpectedChecksums { - // base64-encoded md5 (content-md5 header) - pub md5: Option<String>, - // content_sha256 (as a Hash / FixedBytes32) - pub sha256: Option<Hash>, - // extra x-amz-checksum-* header - pub extra: Option<ChecksumValue>, -} - -pub(crate) struct Checksummer { - pub crc32: Option<Crc32>, - pub crc32c: Option<Crc32c>, - pub md5: Option<Md5>, - pub sha1: Option<Sha1>, - pub sha256: Option<Sha256>, -} - -#[derive(Default)] -pub(crate) struct Checksums { - pub crc32: Option<Crc32Checksum>, - pub crc32c: Option<Crc32cChecksum>, - pub md5: Option<Md5Checksum>, - pub sha1: Option<Sha1Checksum>, - pub sha256: Option<Sha256Checksum>, -} - -impl Checksummer { - pub(crate) fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self { - let mut ret = Self { - crc32: None, - crc32c: None, - md5: None, - sha1: None, - sha256: None, - }; - - if expected.md5.is_some() || require_md5 { - ret.md5 = Some(Md5::new()); - } - if expected.sha256.is_some() || matches!(&expected.extra, Some(ChecksumValue::Sha256(_))) { - ret.sha256 = Some(Sha256::new()); - } - if matches!(&expected.extra, Some(ChecksumValue::Crc32(_))) { - ret.crc32 = Some(Crc32::new()); - } - if matches!(&expected.extra, Some(ChecksumValue::Crc32c(_))) { - ret.crc32c = Some(Crc32c::default()); - } - if matches!(&expected.extra, Some(ChecksumValue::Sha1(_))) { - ret.sha1 = Some(Sha1::new()); - } - ret - } - - pub(crate) fn add(mut self, algo: Option<ChecksumAlgorithm>) -> Self { - match algo { - Some(ChecksumAlgorithm::Crc32) => { - self.crc32 = Some(Crc32::new()); - } - Some(ChecksumAlgorithm::Crc32c) => { - self.crc32c = Some(Crc32c::default()); - } - Some(ChecksumAlgorithm::Sha1) => { - self.sha1 = Some(Sha1::new()); - } - Some(ChecksumAlgorithm::Sha256) => { - self.sha256 = Some(Sha256::new()); - } - None => (), - } - self - } - - pub(crate) fn update(&mut self, bytes: &[u8]) { - if let Some(crc32) = &mut self.crc32 { - crc32.update(bytes); - } - if let Some(crc32c) = &mut self.crc32c { - crc32c.write(bytes); - } - if let Some(md5) = &mut self.md5 { - md5.update(bytes); - } - if let Some(sha1) = &mut self.sha1 { - sha1.update(bytes); - } - if let Some(sha256) = &mut self.sha256 { - sha256.update(bytes); - } - } - - pub(crate) fn finalize(self) -> Checksums { - Checksums { - crc32: self.crc32.map(|x| u32::to_be_bytes(x.finalize())), - crc32c: self - .crc32c - .map(|x| u32::to_be_bytes(u32::try_from(x.finish()).unwrap())), - md5: self.md5.map(|x| x.finalize()[..].try_into().unwrap()), - sha1: self.sha1.map(|x| x.finalize()[..].try_into().unwrap()), - sha256: self.sha256.map(|x| x.finalize()[..].try_into().unwrap()), - } - } -} - -impl Checksums { - pub fn verify(&self, expected: &ExpectedChecksums) -> Result<(), Error> { - if let Some(expected_md5) = &expected.md5 { - match self.md5 { - Some(md5) if BASE64_STANDARD.encode(&md5) == expected_md5.trim_matches('"') => (), - _ => { - return Err(Error::InvalidDigest( - "MD5 checksum verification failed (from content-md5)".into(), - )) - } - } - } - if let Some(expected_sha256) = &expected.sha256 { - match self.sha256 { - Some(sha256) if &sha256[..] == expected_sha256.as_slice() => (), - _ => { - return Err(Error::InvalidDigest( - "SHA256 checksum verification failed (from x-amz-content-sha256)".into(), - )) - } - } - } - if let Some(extra) = expected.extra { - let algo = extra.algorithm(); - if self.extract(Some(algo)) != Some(extra) { - return Err(Error::InvalidDigest(format!( - "Failed to validate checksum for algorithm {:?}", - algo - ))); - } - } - Ok(()) - } - - pub fn extract(&self, algo: Option<ChecksumAlgorithm>) -> Option<ChecksumValue> { - match algo { - None => None, - Some(ChecksumAlgorithm::Crc32) => Some(ChecksumValue::Crc32(self.crc32.unwrap())), - Some(ChecksumAlgorithm::Crc32c) => Some(ChecksumValue::Crc32c(self.crc32c.unwrap())), - Some(ChecksumAlgorithm::Sha1) => Some(ChecksumValue::Sha1(self.sha1.unwrap())), - Some(ChecksumAlgorithm::Sha256) => Some(ChecksumValue::Sha256(self.sha256.unwrap())), - } - } -} - -// ---- - -#[derive(Default)] -pub(crate) struct MultipartChecksummer { - pub md5: Md5, - pub extra: Option<MultipartExtraChecksummer>, -} - -pub(crate) enum MultipartExtraChecksummer { - Crc32(Crc32), - Crc32c(Crc32c), - Sha1(Sha1), - Sha256(Sha256), -} - -impl MultipartChecksummer { - pub(crate) fn init(algo: Option<ChecksumAlgorithm>) -> Self { - Self { - md5: Md5::new(), - extra: match algo { - None => None, - Some(ChecksumAlgorithm::Crc32) => { - Some(MultipartExtraChecksummer::Crc32(Crc32::new())) - } - Some(ChecksumAlgorithm::Crc32c) => { - Some(MultipartExtraChecksummer::Crc32c(Crc32c::default())) - } - Some(ChecksumAlgorithm::Sha1) => Some(MultipartExtraChecksummer::Sha1(Sha1::new())), - Some(ChecksumAlgorithm::Sha256) => { - Some(MultipartExtraChecksummer::Sha256(Sha256::new())) - } - }, - } - } - - pub(crate) fn update( - &mut self, - etag: &str, - checksum: Option<ChecksumValue>, - ) -> Result<(), Error> { - self.md5 - .update(&hex::decode(&etag).ok_or_message("invalid etag hex")?); - match (&mut self.extra, checksum) { - (None, _) => (), - ( - Some(MultipartExtraChecksummer::Crc32(ref mut crc32)), - Some(ChecksumValue::Crc32(x)), - ) => { - crc32.update(&x); - } - ( - Some(MultipartExtraChecksummer::Crc32c(ref mut crc32c)), - Some(ChecksumValue::Crc32c(x)), - ) => { - crc32c.write(&x); - } - (Some(MultipartExtraChecksummer::Sha1(ref mut sha1)), Some(ChecksumValue::Sha1(x))) => { - sha1.update(&x); - } - ( - Some(MultipartExtraChecksummer::Sha256(ref mut sha256)), - Some(ChecksumValue::Sha256(x)), - ) => { - sha256.update(&x); - } - (Some(_), b) => { - return Err(Error::internal_error(format!( - "part checksum was not computed correctly, got: {:?}", - b - ))) - } - } - Ok(()) - } - - pub(crate) fn finalize(self) -> (Md5Checksum, Option<ChecksumValue>) { - let md5 = self.md5.finalize()[..].try_into().unwrap(); - let extra = match self.extra { - None => None, - Some(MultipartExtraChecksummer::Crc32(crc32)) => { - Some(ChecksumValue::Crc32(u32::to_be_bytes(crc32.finalize()))) - } - Some(MultipartExtraChecksummer::Crc32c(crc32c)) => Some(ChecksumValue::Crc32c( - u32::to_be_bytes(u32::try_from(crc32c.finish()).unwrap()), - )), - Some(MultipartExtraChecksummer::Sha1(sha1)) => { - Some(ChecksumValue::Sha1(sha1.finalize()[..].try_into().unwrap())) - } - Some(MultipartExtraChecksummer::Sha256(sha256)) => Some(ChecksumValue::Sha256( - sha256.finalize()[..].try_into().unwrap(), - )), - }; - (md5, extra) - } -} - -// ---- - -/// Extract the value of the x-amz-checksum-algorithm header -pub(crate) fn request_checksum_algorithm( - headers: &HeaderMap<HeaderValue>, -) -> Result<Option<ChecksumAlgorithm>, Error> { - match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { - None => Ok(None), - Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)), - Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)), - Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)), - Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)), - _ => Err(Error::bad_request("invalid checksum algorithm")), - } -} - -/// Extract the value of any of the x-amz-checksum-* headers -pub(crate) fn request_checksum_value( - headers: &HeaderMap<HeaderValue>, -) -> Result<Option<ChecksumValue>, Error> { - let mut ret = vec![]; - - if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) { - let crc32 = BASE64_STANDARD - .decode(&crc32_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; - ret.push(ChecksumValue::Crc32(crc32)) - } - if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) { - let crc32c = BASE64_STANDARD - .decode(&crc32c_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; - ret.push(ChecksumValue::Crc32c(crc32c)) - } - if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) { - let sha1 = BASE64_STANDARD - .decode(&sha1_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; - ret.push(ChecksumValue::Sha1(sha1)) - } - if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) { - let sha256 = BASE64_STANDARD - .decode(&sha256_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; - ret.push(ChecksumValue::Sha256(sha256)) - } - - if ret.len() > 1 { - return Err(Error::bad_request( - "multiple x-amz-checksum-* headers given", - )); - } - Ok(ret.pop()) -} - -/// Checks for the presence of x-amz-checksum-algorithm -/// if so extract the corresponding x-amz-checksum-* value -pub(crate) fn request_checksum_algorithm_value( - headers: &HeaderMap<HeaderValue>, -) -> Result<Option<ChecksumValue>, Error> { - match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { - Some(x) if x == "CRC32" => { - let crc32 = headers - .get(X_AMZ_CHECKSUM_CRC32) - .and_then(|x| BASE64_STANDARD.decode(&x).ok()) - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; - Ok(Some(ChecksumValue::Crc32(crc32))) - } - Some(x) if x == "CRC32C" => { - let crc32c = headers - .get(X_AMZ_CHECKSUM_CRC32C) - .and_then(|x| BASE64_STANDARD.decode(&x).ok()) - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; - Ok(Some(ChecksumValue::Crc32c(crc32c))) - } - Some(x) if x == "SHA1" => { - let sha1 = headers - .get(X_AMZ_CHECKSUM_SHA1) - .and_then(|x| BASE64_STANDARD.decode(&x).ok()) - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; - Ok(Some(ChecksumValue::Sha1(sha1))) - } - Some(x) if x == "SHA256" => { - let sha256 = headers - .get(X_AMZ_CHECKSUM_SHA256) - .and_then(|x| BASE64_STANDARD.decode(&x).ok()) - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; - Ok(Some(ChecksumValue::Sha256(sha256))) - } - Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")), - None => Ok(None), - } -} - -pub(crate) fn add_checksum_response_headers( - checksum: &Option<ChecksumValue>, - mut resp: http::response::Builder, -) -> http::response::Builder { - match checksum { - Some(ChecksumValue::Crc32(crc32)) => { - resp = resp.header(X_AMZ_CHECKSUM_CRC32, BASE64_STANDARD.encode(&crc32)); - } - Some(ChecksumValue::Crc32c(crc32c)) => { - resp = resp.header(X_AMZ_CHECKSUM_CRC32C, BASE64_STANDARD.encode(&crc32c)); - } - Some(ChecksumValue::Sha1(sha1)) => { - resp = resp.header(X_AMZ_CHECKSUM_SHA1, BASE64_STANDARD.encode(&sha1)); - } - Some(ChecksumValue::Sha256(sha256)) => { - resp = resp.header(X_AMZ_CHECKSUM_SHA256, BASE64_STANDARD.encode(&sha256)); - } - None => (), - } - resp -} diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 07d50ea5..9ae48807 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -21,9 +21,9 @@ use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_api_common::helpers::*; +use garage_api_common::signature::checksum::*; use crate::api_server::{ReqBody, ResBody}; -use crate::checksum::*; use crate::encryption::EncryptionParams; use crate::error::*; use crate::get::full_object_byte_stream; diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index 625b84db..fcfdb934 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -2,15 +2,11 @@ use quick_xml::de::from_reader; use hyper::{header::HeaderName, Method, Request, Response, StatusCode}; -use http_body_util::BodyExt; - use serde::{Deserialize, Serialize}; use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule}; -use garage_util::data::*; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::error::*; @@ -59,7 +55,6 @@ pub async fn handle_delete_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> pub async fn handle_put_cors( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -68,11 +63,7 @@ pub async fn handle_put_cors( .. } = ctx; - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let conf: CorsConfiguration = from_reader(&body as &[u8])?; conf.validate()?; diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index b799e67a..d785b9d8 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -1,4 +1,3 @@ -use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use garage_util::data::*; @@ -6,7 +5,6 @@ use garage_util::data::*; use garage_model::s3::object_table::*; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::error::*; @@ -68,13 +66,8 @@ pub async fn handle_delete(ctx: ReqCtx, key: &str) -> Result<Response<ResBody>, pub async fn handle_delete_objects( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?; diff --git a/src/api/s3/encryption.rs b/src/api/s3/encryption.rs index b38d7792..fa7285ca 100644 --- a/src/api/s3/encryption.rs +++ b/src/api/s3/encryption.rs @@ -29,8 +29,8 @@ use garage_model::garage::Garage; use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionMetaInner}; use garage_api_common::common_error::*; +use garage_api_common::signature::checksum::Md5Checksum; -use crate::checksum::Md5Checksum; use crate::error::Error; const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName = diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index 1bb8909c..6d4b7a11 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -80,7 +80,7 @@ pub enum Error { #[error(display = "Invalid encryption algorithm: {:?}, should be AES256", _0)] InvalidEncryptionAlgorithm(String), - /// The client sent invalid XML data + /// The provided digest (checksum) value was invalid #[error(display = "Invalid digest: {}", _0)] InvalidDigest(String), @@ -119,6 +119,7 @@ impl From<SignatureError> for Error { Self::AuthorizationHeaderMalformed(c) } SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), + SignatureError::InvalidDigest(d) => Self::InvalidDigest(d), } } } diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index c2393a51..bcb72cc3 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -12,7 +12,7 @@ use http::header::{ CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, RANGE, }; -use hyper::{body::Body, Request, Response, StatusCode}; +use hyper::{Request, Response, StatusCode}; use tokio::sync::mpsc; use garage_net::stream::ByteStream; @@ -26,9 +26,9 @@ use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_api_common::helpers::*; +use garage_api_common::signature::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE}; use crate::api_server::ResBody; -use crate::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE}; use crate::encryption::EncryptionParams; use crate::error::*; @@ -118,7 +118,7 @@ fn getobject_override_headers( fn try_answer_cached( version: &ObjectVersion, version_meta: &ObjectVersionMeta, - req: &Request<impl Body>, + req: &Request<()>, ) -> Option<Response<ResBody>> { // <trinity> It is possible, and is even usually the case, [that both If-None-Match and // If-Modified-Since] are present in a request. In this situation If-None-Match takes @@ -157,7 +157,7 @@ fn try_answer_cached( /// Handle HEAD request pub async fn handle_head( ctx: ReqCtx, - req: &Request<impl Body>, + req: &Request<()>, key: &str, part_number: Option<u64>, ) -> Result<Response<ResBody>, Error> { @@ -167,7 +167,7 @@ pub async fn handle_head( /// Handle HEAD request for website pub async fn handle_head_without_ctx( garage: Arc<Garage>, - req: &Request<impl Body>, + req: &Request<()>, bucket_id: Uuid, key: &str, part_number: Option<u64>, @@ -278,7 +278,7 @@ pub async fn handle_head_without_ctx( /// Handle GET request pub async fn handle_get( ctx: ReqCtx, - req: &Request<impl Body>, + req: &Request<()>, key: &str, part_number: Option<u64>, overrides: GetObjectOverrides, @@ -289,7 +289,7 @@ pub async fn handle_get( /// Handle GET request pub async fn handle_get_without_ctx( garage: Arc<Garage>, - req: &Request<impl Body>, + req: &Request<()>, bucket_id: Uuid, key: &str, part_number: Option<u64>, @@ -340,7 +340,12 @@ pub async fn handle_get_without_ctx( enc, &headers, pn, - checksum_mode, + ChecksumMode { + // TODO: for multipart uploads, checksums of each part should be stored + // so that we can return the corresponding checksum here + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + enabled: false, + }, ) .await } @@ -354,7 +359,12 @@ pub async fn handle_get_without_ctx( &headers, range.start, range.start + range.length, - checksum_mode, + ChecksumMode { + // TODO: for range queries that align with part boundaries, + // we should return the saved checksum of the part + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + enabled: false, + }, ) .await } @@ -577,7 +587,7 @@ async fn handle_get_part( } fn parse_range_header( - req: &Request<impl Body>, + req: &Request<()>, total_size: u64, ) -> Result<Option<http_range::HttpRange>, Error> { let range = match req.headers().get(RANGE) { @@ -618,7 +628,7 @@ struct ChecksumMode { enabled: bool, } -fn checksum_mode(req: &Request<impl Body>) -> ChecksumMode { +fn checksum_mode(req: &Request<()>) -> ChecksumMode { ChecksumMode { enabled: req .headers() diff --git a/src/api/s3/lib.rs b/src/api/s3/lib.rs index fd99b443..4d1d3ef5 100644 --- a/src/api/s3/lib.rs +++ b/src/api/s3/lib.rs @@ -16,7 +16,6 @@ mod post_object; mod put; mod website; -mod checksum; mod encryption; mod router; pub mod xml; diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index c35047ed..c140494e 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -1,12 +1,10 @@ use quick_xml::de::from_reader; -use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::error::*; @@ -16,7 +14,6 @@ use garage_model::bucket_table::{ parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration, LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule, }; -use garage_util::data::*; pub async fn handle_get_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> { let ReqCtx { bucket_params, .. } = ctx; @@ -56,7 +53,6 @@ pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, E pub async fn handle_put_lifecycle( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -65,11 +61,7 @@ pub async fn handle_put_lifecycle( .. } = ctx; - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let conf: LifecycleConfiguration = from_reader(&body as &[u8])?; let config = conf diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index fa053df2..1ee04bc1 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -1,13 +1,20 @@ use std::collections::HashMap; -use std::convert::TryInto; +use std::convert::{TryFrom, TryInto}; +use std::hash::Hasher; use std::sync::Arc; use base64::prelude::*; +use crc32c::Crc32cHasher as Crc32c; +use crc32fast::Hasher as Crc32; use futures::prelude::*; use hyper::{Request, Response}; +use md5::{Digest, Md5}; +use sha1::Sha1; +use sha2::Sha256; use garage_table::*; use garage_util::data::*; +use garage_util::error::OkOrMessage; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; @@ -16,10 +23,9 @@ use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; +use garage_api_common::signature::checksum::*; use crate::api_server::{ReqBody, ResBody}; -use crate::checksum::*; use crate::encryption::EncryptionParams; use crate::error::*; use crate::put::*; @@ -94,7 +100,6 @@ pub async fn handle_put_part( key: &str, part_number: u64, upload_id: &str, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, .. } = &ctx; @@ -105,17 +110,30 @@ pub async fn handle_put_part( Some(x) => Some(x.to_str()?.to_string()), None => None, }, - sha256: content_sha256, + sha256: None, extra: request_checksum_value(req.headers())?, }; - // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let (req_head, req_body) = req.into_parts(); - let stream = body_stream(req_body); + let (req_head, mut req_body) = req.into_parts(); + + // Before we stream the body, configure the needed checksums. + req_body.add_expected_checksums(expected_checksums.clone()); + // TODO: avoid parsing encryption headers twice... + if !EncryptionParams::new_from_headers(&garage, &req_head.headers)?.is_encrypted() { + // For non-encrypted objects, we need to compute the md5sum in all cases + // (even if content-md5 is not set), because it is used as an etag of the + // part, which is in turn used in the etag computation of the whole object + req_body.add_md5(); + } + + let (stream, stream_checksums) = req_body.streaming_with_checksums(); + let stream = stream.map_err(Error::from); + let mut chunker = StreamChunker::new(stream, garage.config.block_size); + // Read first chuck, and at the same time try to get object to see if it exists let ((_, object_version, mut mpu), first_block) = futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?; @@ -172,21 +190,21 @@ pub async fn handle_put_part( garage.version_table.insert(&version).await?; // Copy data to version - let checksummer = - Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm); - let (total_size, checksums, _) = read_and_put_blocks( + let (total_size, _, _) = read_and_put_blocks( &ctx, &version, encryption, part_number, first_block, - &mut chunker, - checksummer, + chunker, + Checksummer::new(), ) .await?; - // Verify that checksums map - checksums.verify(&expected_checksums)?; + // Verify that checksums match + let checksums = stream_checksums + .await + .ok_or_internal_error("checksum calculation")??; // Store part etag in version let etag = encryption.etag_from_md5(&checksums.md5); @@ -248,7 +266,6 @@ pub async fn handle_complete_multipart_upload( req: Request<ReqBody>, key: &str, upload_id: &str, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -260,11 +277,7 @@ pub async fn handle_complete_multipart_upload( let expected_checksum = request_checksum_value(&req_head.headers)?; - let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req_body.collect().await?; let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let body_list_of_parts = parse_complete_multipart_upload_body(&body_xml) @@ -602,3 +615,99 @@ fn parse_complete_multipart_upload_body( Some(parts) } + +// ====== checksummer ==== + +#[derive(Default)] +pub(crate) struct MultipartChecksummer { + pub md5: Md5, + pub extra: Option<MultipartExtraChecksummer>, +} + +pub(crate) enum MultipartExtraChecksummer { + Crc32(Crc32), + Crc32c(Crc32c), + Sha1(Sha1), + Sha256(Sha256), +} + +impl MultipartChecksummer { + pub(crate) fn init(algo: Option<ChecksumAlgorithm>) -> Self { + Self { + md5: Md5::new(), + extra: match algo { + None => None, + Some(ChecksumAlgorithm::Crc32) => { + Some(MultipartExtraChecksummer::Crc32(Crc32::new())) + } + Some(ChecksumAlgorithm::Crc32c) => { + Some(MultipartExtraChecksummer::Crc32c(Crc32c::default())) + } + Some(ChecksumAlgorithm::Sha1) => Some(MultipartExtraChecksummer::Sha1(Sha1::new())), + Some(ChecksumAlgorithm::Sha256) => { + Some(MultipartExtraChecksummer::Sha256(Sha256::new())) + } + }, + } + } + + pub(crate) fn update( + &mut self, + etag: &str, + checksum: Option<ChecksumValue>, + ) -> Result<(), Error> { + self.md5 + .update(&hex::decode(&etag).ok_or_message("invalid etag hex")?); + match (&mut self.extra, checksum) { + (None, _) => (), + ( + Some(MultipartExtraChecksummer::Crc32(ref mut crc32)), + Some(ChecksumValue::Crc32(x)), + ) => { + crc32.update(&x); + } + ( + Some(MultipartExtraChecksummer::Crc32c(ref mut crc32c)), + Some(ChecksumValue::Crc32c(x)), + ) => { + crc32c.write(&x); + } + (Some(MultipartExtraChecksummer::Sha1(ref mut sha1)), Some(ChecksumValue::Sha1(x))) => { + sha1.update(&x); + } + ( + Some(MultipartExtraChecksummer::Sha256(ref mut sha256)), + Some(ChecksumValue::Sha256(x)), + ) => { + sha256.update(&x); + } + (Some(_), b) => { + return Err(Error::internal_error(format!( + "part checksum was not computed correctly, got: {:?}", + b + ))) + } + } + Ok(()) + } + + pub(crate) fn finalize(self) -> (Md5Checksum, Option<ChecksumValue>) { + let md5 = self.md5.finalize()[..].try_into().unwrap(); + let extra = match self.extra { + None => None, + Some(MultipartExtraChecksummer::Crc32(crc32)) => { + Some(ChecksumValue::Crc32(u32::to_be_bytes(crc32.finalize()))) + } + Some(MultipartExtraChecksummer::Crc32c(crc32c)) => Some(ChecksumValue::Crc32c( + u32::to_be_bytes(u32::try_from(crc32c.finish()).unwrap()), + )), + Some(MultipartExtraChecksummer::Sha1(sha1)) => { + Some(ChecksumValue::Sha1(sha1.finalize()[..].try_into().unwrap())) + } + Some(MultipartExtraChecksummer::Sha256(sha256)) => Some(ChecksumValue::Sha256( + sha256.finalize()[..].try_into().unwrap(), + )), + }; + (md5, extra) + } +} diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index 6c0e73d4..350684da 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -18,10 +18,10 @@ use garage_model::s3::object_table::*; use garage_api_common::cors::*; use garage_api_common::helpers::*; +use garage_api_common::signature::checksum::*; use garage_api_common::signature::payload::{verify_v4, Authorization}; use crate::api_server::ResBody; -use crate::checksum::*; use crate::encryption::EncryptionParams; use crate::error::*; use crate::put::{get_headers, save_stream, ChecksumMode}; @@ -218,6 +218,7 @@ pub async fn handle_post_object( // around here to make sure the rest of the machinery takes our acl into account. let headers = get_headers(¶ms)?; + let checksum_algorithm = request_checksum_algorithm(¶ms)?; let expected_checksums = ExpectedChecksums { md5: params .get("content-md5") @@ -225,7 +226,9 @@ pub async fn handle_post_object( .transpose()? .map(str::to_string), sha256: None, - extra: request_checksum_algorithm_value(¶ms)?, + extra: checksum_algorithm + .map(|algo| extract_checksum_value(¶ms, algo)) + .transpose()?, }; let meta = ObjectVersionMetaInner { diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 530b4e7b..4d866a06 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -31,9 +31,10 @@ use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_api_common::helpers::*; +use garage_api_common::signature::body::StreamingChecksumReceiver; +use garage_api_common::signature::checksum::*; use crate::api_server::{ReqBody, ResBody}; -use crate::checksum::*; use crate::encryption::EncryptionParams; use crate::error::*; @@ -48,6 +49,10 @@ pub(crate) struct SaveStreamResult { pub(crate) enum ChecksumMode<'a> { Verify(&'a ExpectedChecksums), + VerifyFrom { + checksummer: StreamingChecksumReceiver, + trailer_algo: Option<ChecksumAlgorithm>, + }, Calculate(Option<ChecksumAlgorithm>), } @@ -55,7 +60,6 @@ pub async fn handle_put( ctx: ReqCtx, req: Request<ReqBody>, key: &String, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { // Retrieve interesting headers from request let headers = get_headers(req.headers())?; @@ -66,9 +70,10 @@ pub async fn handle_put( Some(x) => Some(x.to_str()?.to_string()), None => None, }, - sha256: content_sha256, + sha256: None, extra: request_checksum_value(req.headers())?, }; + let trailer_checksum_algorithm = request_trailer_checksum_algorithm(req.headers())?; let meta = ObjectVersionMetaInner { headers, @@ -78,7 +83,19 @@ pub async fn handle_put( // Determine whether object should be encrypted, and if so the key let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; - let stream = body_stream(req.into_body()); + // The request body is a special ReqBody object (see garage_api_common::signature::body) + // which supports calculating checksums while streaming the data. + // Before we start streaming, we configure it to calculate all the checksums we need. + let mut req_body = req.into_body(); + req_body.add_expected_checksums(expected_checksums.clone()); + if !encryption.is_encrypted() { + // For non-encrypted objects, we need to compute the md5sum in all cases + // (even if content-md5 is not set), because it is used as the object etag + req_body.add_md5(); + } + + let (stream, checksummer) = req_body.streaming_with_checksums(); + let stream = stream.map_err(Error::from); let res = save_stream( &ctx, @@ -86,7 +103,10 @@ pub async fn handle_put( encryption, stream, key, - ChecksumMode::Verify(&expected_checksums), + ChecksumMode::VerifyFrom { + checksummer, + trailer_algo: trailer_checksum_algorithm, + }, ) .await?; @@ -122,10 +142,15 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let version_uuid = gen_uuid(); let version_timestamp = next_timestamp(existing_object.as_ref()); - let mut checksummer = match checksum_mode { + let mut checksummer = match &checksum_mode { ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()), ChecksumMode::Calculate(algo) => { - Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(algo) + Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(*algo) + } + ChecksumMode::VerifyFrom { .. } => { + // Checksums are calculated by the garage_api_common::signature module + // so here we can just have an empty checksummer that does nothing + Checksummer::new() } }; @@ -133,7 +158,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // as "inline data". We can then return immediately. if first_block.len() < INLINE_THRESHOLD { checksummer.update(&first_block); - let checksums = checksummer.finalize(); + let mut checksums = checksummer.finalize(); match checksum_mode { ChecksumMode::Verify(expected) => { @@ -142,6 +167,18 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ChecksumMode::Calculate(algo) => { meta.checksum = checksums.extract(algo); } + ChecksumMode::VerifyFrom { + checksummer, + trailer_algo, + } => { + drop(chunker); + checksums = checksummer + .await + .ok_or_internal_error("checksum calculation")??; + if let Some(algo) = trailer_algo { + meta.checksum = checksums.extract(Some(algo)); + } + } }; let size = first_block.len() as u64; @@ -213,13 +250,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data - let (total_size, checksums, first_block_hash) = read_and_put_blocks( + let (total_size, mut checksums, first_block_hash) = read_and_put_blocks( ctx, &version, encryption, 1, first_block, - &mut chunker, + chunker, checksummer, ) .await?; @@ -232,6 +269,17 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ChecksumMode::Calculate(algo) => { meta.checksum = checksums.extract(algo); } + ChecksumMode::VerifyFrom { + checksummer, + trailer_algo, + } => { + checksums = checksummer + .await + .ok_or_internal_error("checksum calculation")??; + if let Some(algo) = trailer_algo { + meta.checksum = checksums.extract(Some(algo)); + } + } }; // Verify quotas are respsected @@ -332,7 +380,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + encryption: EncryptionParams, part_number: u64, first_block: Bytes, - chunker: &mut StreamChunker<S>, + mut chunker: StreamChunker<S>, checksummer: Checksummer, ) -> Result<(u64, Checksums, Hash), Error> { let tracer = opentelemetry::global::tracer("garage"); diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index b55bb345..7553bef7 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -1,14 +1,11 @@ use quick_xml::de::from_reader; -use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_model::bucket_table::*; -use garage_util::data::*; use garage_api_common::helpers::*; -use garage_api_common::signature::verify_signed_content; use crate::api_server::{ReqBody, ResBody}; use crate::error::*; @@ -61,7 +58,6 @@ pub async fn handle_delete_website(ctx: ReqCtx) -> Result<Response<ResBody>, Err pub async fn handle_put_website( ctx: ReqCtx, req: Request<ReqBody>, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -70,11 +66,7 @@ pub async fn handle_put_website( .. } = ctx; - let body = BodyExt::collect(req.into_body()).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req.into_body().collect().await?; let conf: WebsiteConfiguration = from_reader(&body as &[u8])?; conf.validate()?; |