diff options
Diffstat (limited to 'src/api/s3/multipart.rs')
-rw-r--r-- | src/api/s3/multipart.rs | 177 |
1 files changed, 148 insertions, 29 deletions
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 3db3e8aa..d6eb26cb 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -1,13 +1,20 @@ use std::collections::HashMap; -use std::convert::TryInto; +use std::convert::{TryFrom, TryInto}; +use std::hash::Hasher; use std::sync::Arc; use base64::prelude::*; +use crc32c::Crc32cHasher as Crc32c; +use crc32fast::Hasher as Crc32; use futures::prelude::*; use hyper::{Request, Response}; +use md5::{Digest, Md5}; +use sha1::Sha1; +use sha2::Sha256; use garage_table::*; use garage_util::data::*; +use garage_util::error::OkOrMessage; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; @@ -15,14 +22,14 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::*; -use crate::s3::api_server::{ReqBody, ResBody}; -use crate::s3::checksum::*; -use crate::s3::encryption::EncryptionParams; -use crate::s3::error::*; -use crate::s3::put::*; -use crate::s3::xml as s3_xml; -use crate::signature::verify_signed_content; +use garage_api_common::helpers::*; +use garage_api_common::signature::checksum::*; + +use crate::api_server::{ReqBody, ResBody}; +use crate::encryption::EncryptionParams; +use crate::error::*; +use crate::put::*; +use crate::xml as s3_xml; // ---- @@ -42,7 +49,7 @@ pub async fn handle_create_multipart_upload( let upload_id = gen_uuid(); let timestamp = next_timestamp(existing_object.as_ref()); - let headers = get_headers(req.headers())?; + let headers = extract_metadata_headers(req.headers())?; let meta = ObjectVersionMetaInner { headers, checksum: None, @@ -93,7 +100,6 @@ pub async fn handle_put_part( key: &str, part_number: u64, upload_id: &str, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, .. } = &ctx; @@ -104,17 +110,30 @@ pub async fn handle_put_part( Some(x) => Some(x.to_str()?.to_string()), None => None, }, - sha256: content_sha256, + sha256: None, extra: request_checksum_value(req.headers())?, }; - // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let (req_head, req_body) = req.into_parts(); - let stream = body_stream(req_body); + let (req_head, mut req_body) = req.into_parts(); + + // Before we stream the body, configure the needed checksums. + req_body.add_expected_checksums(expected_checksums.clone()); + // TODO: avoid parsing encryption headers twice... + if !EncryptionParams::new_from_headers(&garage, &req_head.headers)?.is_encrypted() { + // For non-encrypted objects, we need to compute the md5sum in all cases + // (even if content-md5 is not set), because it is used as an etag of the + // part, which is in turn used in the etag computation of the whole object + req_body.add_md5(); + } + + let (stream, stream_checksums) = req_body.streaming_with_checksums(); + let stream = stream.map_err(Error::from); + let mut chunker = StreamChunker::new(stream, garage.config.block_size); + // Read first chuck, and at the same time try to get object to see if it exists let ((_, object_version, mut mpu), first_block) = futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?; @@ -171,21 +190,21 @@ pub async fn handle_put_part( garage.version_table.insert(&version).await?; // Copy data to version - let checksummer = - Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm); - let (total_size, checksums, _) = read_and_put_blocks( + let (total_size, _, _) = read_and_put_blocks( &ctx, &version, encryption, part_number, first_block, - &mut chunker, - checksummer, + chunker, + Checksummer::new(), ) .await?; - // Verify that checksums map - checksums.verify(&expected_checksums)?; + // Verify that checksums match + let checksums = stream_checksums + .await + .ok_or_internal_error("checksum calculation")??; // Store part etag in version let etag = encryption.etag_from_md5(&checksums.md5); @@ -247,7 +266,6 @@ pub async fn handle_complete_multipart_upload( req: Request<ReqBody>, key: &str, upload_id: &str, - content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { let ReqCtx { garage, @@ -259,11 +277,7 @@ pub async fn handle_complete_multipart_upload( let expected_checksum = request_checksum_value(&req_head.headers)?; - let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes(); - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } + let body = req_body.collect().await?; let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let body_list_of_parts = parse_complete_multipart_upload_body(&body_xml) @@ -429,7 +443,16 @@ pub async fn handle_complete_multipart_upload( // Send response saying ok we're done let result = s3_xml::CompleteMultipartUploadResult { xmlns: (), - location: None, + // FIXME: the location returned is not always correct: + // - we always return https, but maybe some people do http + // - if root_domain is not specified, a full URL is not returned + location: garage + .config + .s3_api + .root_domain + .as_ref() + .map(|rd| s3_xml::Value(format!("https://{}.{}/{}", bucket_name, rd, key))) + .or(Some(s3_xml::Value(format!("/{}/{}", bucket_name, key)))), bucket: s3_xml::Value(bucket_name.to_string()), key: s3_xml::Value(key), etag: s3_xml::Value(format!("\"{}\"", etag)), @@ -592,3 +615,99 @@ fn parse_complete_multipart_upload_body( Some(parts) } + +// ====== checksummer ==== + +#[derive(Default)] +pub(crate) struct MultipartChecksummer { + pub md5: Md5, + pub extra: Option<MultipartExtraChecksummer>, +} + +pub(crate) enum MultipartExtraChecksummer { + Crc32(Crc32), + Crc32c(Crc32c), + Sha1(Sha1), + Sha256(Sha256), +} + +impl MultipartChecksummer { + pub(crate) fn init(algo: Option<ChecksumAlgorithm>) -> Self { + Self { + md5: Md5::new(), + extra: match algo { + None => None, + Some(ChecksumAlgorithm::Crc32) => { + Some(MultipartExtraChecksummer::Crc32(Crc32::new())) + } + Some(ChecksumAlgorithm::Crc32c) => { + Some(MultipartExtraChecksummer::Crc32c(Crc32c::default())) + } + Some(ChecksumAlgorithm::Sha1) => Some(MultipartExtraChecksummer::Sha1(Sha1::new())), + Some(ChecksumAlgorithm::Sha256) => { + Some(MultipartExtraChecksummer::Sha256(Sha256::new())) + } + }, + } + } + + pub(crate) fn update( + &mut self, + etag: &str, + checksum: Option<ChecksumValue>, + ) -> Result<(), Error> { + self.md5 + .update(&hex::decode(&etag).ok_or_message("invalid etag hex")?); + match (&mut self.extra, checksum) { + (None, _) => (), + ( + Some(MultipartExtraChecksummer::Crc32(ref mut crc32)), + Some(ChecksumValue::Crc32(x)), + ) => { + crc32.update(&x); + } + ( + Some(MultipartExtraChecksummer::Crc32c(ref mut crc32c)), + Some(ChecksumValue::Crc32c(x)), + ) => { + crc32c.write(&x); + } + (Some(MultipartExtraChecksummer::Sha1(ref mut sha1)), Some(ChecksumValue::Sha1(x))) => { + sha1.update(&x); + } + ( + Some(MultipartExtraChecksummer::Sha256(ref mut sha256)), + Some(ChecksumValue::Sha256(x)), + ) => { + sha256.update(&x); + } + (Some(_), b) => { + return Err(Error::internal_error(format!( + "part checksum was not computed correctly, got: {:?}", + b + ))) + } + } + Ok(()) + } + + pub(crate) fn finalize(self) -> (Md5Checksum, Option<ChecksumValue>) { + let md5 = self.md5.finalize()[..].try_into().unwrap(); + let extra = match self.extra { + None => None, + Some(MultipartExtraChecksummer::Crc32(crc32)) => { + Some(ChecksumValue::Crc32(u32::to_be_bytes(crc32.finalize()))) + } + Some(MultipartExtraChecksummer::Crc32c(crc32c)) => Some(ChecksumValue::Crc32c( + u32::to_be_bytes(u32::try_from(crc32c.finish()).unwrap()), + )), + Some(MultipartExtraChecksummer::Sha1(sha1)) => { + Some(ChecksumValue::Sha1(sha1.finalize()[..].try_into().unwrap())) + } + Some(MultipartExtraChecksummer::Sha256(sha256)) => Some(ChecksumValue::Sha256( + sha256.finalize()[..].try_into().unwrap(), + )), + }; + (md5, extra) + } +} |