diff options
Diffstat (limited to 'src/api/s3/put.rs')
-rw-r--r-- | src/api/s3/put.rs | 221 |
1 files changed, 106 insertions, 115 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 941e4122..1e3b1b44 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -1,12 +1,9 @@ use std::collections::HashMap; use std::sync::Arc; -use base64::prelude::*; use futures::prelude::*; use futures::stream::FuturesOrdered; use futures::try_join; -use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; -use sha2::Sha256; use tokio::sync::mpsc; @@ -22,7 +19,6 @@ use opentelemetry::{ use garage_net::bytes_buf::BytesBuf; use garage_rpc::rpc_helper::OrderTag; use garage_table::*; -use garage_util::async_hash::*; use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_util::time::*; @@ -36,16 +32,22 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::checksum::*; use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; const PUT_BLOCKS_MAX_PARALLEL: usize = 3; -pub struct SaveStreamResult { - pub version_uuid: Uuid, - pub version_timestamp: u64, +pub(crate) struct SaveStreamResult { + pub(crate) version_uuid: Uuid, + pub(crate) version_timestamp: u64, /// Etag WITHOUT THE QUOTES (just the hex value) - pub etag: String, + pub(crate) etag: String, +} + +pub(crate) enum ChecksumMode<'a> { + Verify(&'a ExpectedChecksums), + Calculate(Option<ChecksumAlgorithm>), } pub async fn handle_put( @@ -58,24 +60,32 @@ pub async fn handle_put( let headers = get_headers(req.headers())?; debug!("Object headers: {:?}", headers); - // Determine whether object should be encrypted, and if so the key - let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; + let expected_checksums = ExpectedChecksums { + md5: match req.headers().get("content-md5") { + Some(x) => Some(x.to_str()?.to_string()), + None => None, + }, + sha256: content_sha256, + extra: request_checksum_value(req.headers())?, + }; - let content_md5 = match req.headers().get("content-md5") { - Some(x) => Some(x.to_str()?.to_string()), - None => None, + let meta = ObjectVersionMetaInner { + headers, + checksum: expected_checksums.extra, }; + // Determine whether object should be encrypted, and if so the key + let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; + let stream = body_stream(req.into_body()); let res = save_stream( &ctx, - headers, + meta, encryption, stream, key, - content_md5, - content_sha256, + ChecksumMode::Verify(&expected_checksums), ) .await?; @@ -83,17 +93,17 @@ pub async fn handle_put( .header("x-amz-version-id", hex::encode(res.version_uuid)) .header("ETag", format!("\"{}\"", res.etag)); encryption.add_response_headers(&mut resp); + let resp = add_checksum_response_headers(&expected_checksums.extra, resp); Ok(resp.body(empty_body())?) } pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ctx: &ReqCtx, - headers: ObjectVersionHeaders, + mut meta: ObjectVersionMetaInner, encryption: EncryptionParams, body: S, key: &String, - content_md5: Option<String>, - content_sha256: Option<FixedBytes32>, + checksum_mode: ChecksumMode<'_>, ) -> Result<SaveStreamResult, Error> { let ReqCtx { garage, bucket_id, .. @@ -107,32 +117,36 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let first_block = first_block_opt.unwrap_or_default(); - let object_encryption = encryption.encrypt_headers(headers)?; - // Generate identity of new version let version_uuid = gen_uuid(); let version_timestamp = next_timestamp(existing_object.as_ref()); + let mut checksummer = match checksum_mode { + ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()), + ChecksumMode::Calculate(algo) => { + Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(algo) + } + }; + // If body is small enough, store it directly in the object table // as "inline data". We can then return immediately. if first_block.len() < INLINE_THRESHOLD { - let mut md5sum = Md5::new(); - md5sum.update(&first_block[..]); - let data_md5sum = md5sum.finalize(); - - let data_sha256sum = sha256sum(&first_block[..]); + checksummer.update(&first_block); + let checksums = checksummer.finalize(); - ensure_checksum_matches( - &data_md5sum, - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; + match checksum_mode { + ChecksumMode::Verify(expected) => { + checksums.verify(&expected)?; + } + ChecksumMode::Calculate(algo) => { + meta.checksum = checksums.extract(algo); + } + }; let size = first_block.len() as u64; check_quotas(ctx, size, existing_object.as_ref()).await?; - let etag = encryption.etag_from_md5(&data_md5sum); + let etag = encryption.etag_from_md5(&checksums.md5); let inline_data = encryption.encrypt_blob(&first_block)?.to_vec(); let object_version = ObjectVersion { @@ -140,7 +154,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( timestamp: version_timestamp, state: ObjectVersionState::Complete(ObjectVersionData::Inline( ObjectVersionMeta { - encryption: object_encryption, + encryption: encryption.encrypt_meta(meta)?, size, etag: etag.clone(), }, @@ -175,7 +189,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( uuid: version_uuid, timestamp: version_timestamp, state: ObjectVersionState::Uploading { - encryption: object_encryption.clone(), + encryption: encryption.encrypt_meta(meta.clone())?, + checksum_algorithm: None, // don't care; overwritten later multipart: false, }, }; @@ -196,25 +211,37 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ); garage.version_table.insert(&version).await?; - // Transfer data and verify checksum - let (total_size, data_md5sum, data_sha256sum, first_block_hash) = - read_and_put_blocks(ctx, &version, encryption, 1, first_block, &mut chunker).await?; + // Transfer data + let (total_size, checksums, first_block_hash) = read_and_put_blocks( + ctx, + &version, + encryption, + 1, + first_block, + &mut chunker, + checksummer, + ) + .await?; - ensure_checksum_matches( - &data_md5sum, - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; + // Verify checksums are ok / add calculated checksum to metadata + match checksum_mode { + ChecksumMode::Verify(expected) => { + checksums.verify(&expected)?; + } + ChecksumMode::Calculate(algo) => { + meta.checksum = checksums.extract(algo); + } + }; + // Verify quotas are respsected check_quotas(ctx, total_size, existing_object.as_ref()).await?; // Save final object state, marked as Complete - let etag = encryption.etag_from_md5(&data_md5sum); + let etag = encryption.etag_from_md5(&checksums.md5); object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { - encryption: object_encryption, + encryption: encryption.encrypt_meta(meta)?, size: total_size, etag: etag.clone(), }, @@ -234,33 +261,6 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( }) } -/// Validate MD5 sum against content-md5 header -/// and sha256sum against signed content-sha256 -pub(crate) fn ensure_checksum_matches( - data_md5sum: &[u8], - data_sha256sum: garage_util::data::FixedBytes32, - content_md5: Option<&str>, - content_sha256: Option<garage_util::data::FixedBytes32>, -) -> Result<(), Error> { - if let Some(expected_sha256) = content_sha256 { - if expected_sha256 != data_sha256sum { - return Err(Error::bad_request( - "Unable to validate x-amz-content-sha256", - )); - } else { - trace!("Successfully validated x-amz-content-sha256"); - } - } - if let Some(expected_md5) = content_md5 { - if expected_md5.trim_matches('"') != BASE64_STANDARD.encode(data_md5sum) { - return Err(Error::bad_request("Unable to validate content-md5")); - } else { - trace!("Successfully validated content-md5"); - } - } - Ok(()) -} - /// Check that inserting this object with this size doesn't exceed bucket quotas pub(crate) async fn check_quotas( ctx: &ReqCtx, @@ -332,7 +332,8 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + part_number: u64, first_block: Bytes, chunker: &mut StreamChunker<S>, -) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash, Hash), Error> { + checksummer: Checksummer, +) -> Result<(u64, Checksums, Hash), Error> { let tracer = opentelemetry::global::tracer("garage"); let (block_tx, mut block_rx) = mpsc::channel::<Result<Bytes, Error>>(2); @@ -360,20 +361,20 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + let (block_tx2, mut block_rx2) = mpsc::channel::<Result<Bytes, Error>>(1); let hash_stream = async { - let md5hasher = AsyncHasher::<Md5>::new(); - let sha256hasher = AsyncHasher::<Sha256>::new(); + let mut checksummer = checksummer; while let Some(next) = block_rx.recv().await { match next { Ok(block) => { block_tx2.send(Ok(block.clone())).await?; - futures::future::join( - md5hasher.update(block.clone()), - sha256hasher.update(block.clone()), - ) + checksummer = tokio::task::spawn_blocking(move || { + checksummer.update(&block); + checksummer + }) .with_context(Context::current_with_span( tracer.start("Hash block (md5, sha256)"), )) - .await; + .await + .unwrap() } Err(e) => { block_tx2.send(Err(e)).await?; @@ -382,10 +383,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + } } drop(block_tx2); - Ok::<_, mpsc::error::SendError<_>>(futures::join!( - md5hasher.finalize(), - sha256hasher.finalize() - )) + Ok::<_, mpsc::error::SendError<_>>(checksummer) }; let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, u64, Hash), Error>>(1); @@ -395,33 +393,28 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + match next { Ok(block) => { let unencrypted_len = block.len() as u64; - let block = if encryption.is_encrypted() { - let res = - tokio::task::spawn_blocking(move || encryption.encrypt_block(block)) - .with_context(Context::current_with_span( - tracer.start("Encrypt block"), - )) - .await - .unwrap(); - match res { - Ok(b) => b, - Err(e) => { - block_tx3.send(Err(e)).await?; - break; + let res = tokio::task::spawn_blocking(move || { + let block = encryption.encrypt_block(block)?; + let hash = blake2sum(&block); + Ok((block, hash)) + }) + .with_context(Context::current_with_span( + tracer.start("Encrypt and hash (blake2) block"), + )) + .await + .unwrap(); + match res { + Ok((block, hash)) => { + if first_block_hash.is_none() { + first_block_hash = Some(hash); } + block_tx3.send(Ok((block, unencrypted_len, hash))).await?; + } + Err(e) => { + block_tx3.send(Err(e)).await?; + break; } - } else { - block - }; - let hash = async_blake2sum(block.clone()) - .with_context(Context::current_with_span( - tracer.start("Hash block (blake2)"), - )) - .await; - if first_block_hash.is_none() { - first_block_hash = Some(hash); } - block_tx3.send(Ok((block, unencrypted_len, hash))).await?; } Err(e) => { block_tx3.send(Err(e)).await?; @@ -493,12 +486,10 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + let total_size = final_result?; // unwrap here is ok, because if hasher failed, it is because something failed // later in the pipeline which already caused a return at the ? on previous line - let (data_md5sum, data_sha256sum) = stream_hash_result.unwrap(); let first_block_hash = block_hash_result.unwrap(); + let checksums = stream_hash_result.unwrap().finalize(); - let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap(); - - Ok((total_size, data_md5sum, data_sha256sum, first_block_hash)) + Ok((total_size, checksums, first_block_hash)) } async fn put_block_and_meta( @@ -609,7 +600,7 @@ impl Drop for InterruptedCleanup { // ============ helpers ============ -pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVersionHeaders, Error> { +pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<HeaderList, Error> { let mut ret = Vec::new(); // Preserve standard headers @@ -637,7 +628,7 @@ pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVers } } - Ok(ObjectVersionHeaders(ret)) + Ok(ret) } pub(crate) fn next_timestamp(existing_object: Option<&Object>) -> u64 { |