From 57acc60082089e1a24fd47588f6ff3cb20ed4eef Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 23 Feb 2024 16:49:50 +0100 Subject: [sse-c] Implement SSE-C encryption --- src/api/s3/put.rs | 128 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 89 insertions(+), 39 deletions(-) (limited to 'src/api/s3/put.rs') diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 2ced0580..745c2219 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -36,10 +36,18 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; const PUT_BLOCKS_MAX_PARALLEL: usize = 3; +pub struct SaveStreamResult { + pub version_uuid: Uuid, + pub version_timestamp: u64, + /// Etag WITHOUT THE QUOTES (just the hex value) + pub etag: String, +} + pub async fn handle_put( ctx: ReqCtx, req: Request, @@ -50,6 +58,9 @@ pub async fn handle_put( let headers = get_headers(req.headers())?; debug!("Object headers: {:?}", headers); + // Determine whether object should be encrypted, and if so the key + let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; + let content_md5 = match req.headers().get("content-md5") { Some(x) => Some(x.to_str()?.to_string()), None => None, @@ -57,19 +68,33 @@ pub async fn handle_put( let stream = body_stream(req.into_body()); - save_stream(&ctx, headers, stream, key, content_md5, content_sha256) - .await - .map(|(uuid, md5)| put_response(uuid, md5)) + let res = save_stream( + &ctx, + headers, + encryption, + stream, + key, + content_md5, + content_sha256, + ) + .await?; + + let mut resp = Response::builder() + .header("x-amz-version-id", hex::encode(res.version_uuid)) + .header("ETag", format!("\"{}\"", res.etag)); + encryption.add_response_headers(&mut resp); + Ok(resp.body(empty_body())?) } pub(crate) async fn save_stream> + Unpin>( ctx: &ReqCtx, headers: ObjectVersionHeaders, + encryption: EncryptionParams, body: S, key: &String, content_md5: Option, content_sha256: Option, -) -> Result<(Uuid, String), Error> { +) -> Result { let ReqCtx { garage, bucket_id, .. } = ctx; @@ -82,6 +107,8 @@ pub(crate) async fn save_stream> + Unpin>( let first_block = first_block_opt.unwrap_or_default(); + let object_encryption = encryption.encrypt_headers(headers)?; + // Generate identity of new version let version_uuid = gen_uuid(); let version_timestamp = next_timestamp(existing_object.as_ref()); @@ -92,37 +119,43 @@ pub(crate) async fn save_stream> + Unpin>( let mut md5sum = Md5::new(); md5sum.update(&first_block[..]); let data_md5sum = md5sum.finalize(); - let data_md5sum_hex = hex::encode(data_md5sum); let data_sha256sum = sha256sum(&first_block[..]); - let size = first_block.len() as u64; ensure_checksum_matches( - data_md5sum.as_slice(), + &data_md5sum, data_sha256sum, content_md5.as_deref(), content_sha256, )?; + let size = first_block.len() as u64; check_quotas(ctx, size, existing_object.as_ref()).await?; + let etag = encryption.etag_from_md5(&data_md5sum); + let inline_data = encryption.encrypt_blob(&first_block)?.to_vec(); + let object_version = ObjectVersion { uuid: version_uuid, timestamp: version_timestamp, state: ObjectVersionState::Complete(ObjectVersionData::Inline( ObjectVersionMeta { - headers, + encryption: object_encryption, size, - etag: data_md5sum_hex.clone(), + etag: etag.clone(), }, - first_block.to_vec(), + inline_data, )), }; let object = Object::new(*bucket_id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; - return Ok((version_uuid, data_md5sum_hex)); + return Ok(SaveStreamResult { + version_uuid, + version_timestamp, + etag, + }); } // The following consists in many steps that can each fail. @@ -142,7 +175,7 @@ pub(crate) async fn save_stream> + Unpin>( uuid: version_uuid, timestamp: version_timestamp, state: ObjectVersionState::Uploading { - headers: headers.clone(), + encryption: object_encryption.clone(), multipart: false, }, }; @@ -165,10 +198,10 @@ pub(crate) async fn save_stream> + Unpin>( // Transfer data and verify checksum let (total_size, data_md5sum, data_sha256sum, first_block_hash) = - read_and_put_blocks(ctx, &version, 1, first_block, &mut chunker).await?; + read_and_put_blocks(ctx, &version, encryption, 1, first_block, &mut chunker).await?; ensure_checksum_matches( - data_md5sum.as_slice(), + &data_md5sum, data_sha256sum, content_md5.as_deref(), content_sha256, @@ -177,12 +210,13 @@ pub(crate) async fn save_stream> + Unpin>( check_quotas(ctx, total_size, existing_object.as_ref()).await?; // Save final object state, marked as Complete - let md5sum_hex = hex::encode(data_md5sum); + let etag = encryption.etag_from_md5(&data_md5sum); + object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { - headers, + encryption: object_encryption, size: total_size, - etag: md5sum_hex.clone(), + etag: etag.clone(), }, first_block_hash, )); @@ -193,7 +227,11 @@ pub(crate) async fn save_stream> + Unpin>( // We won't have to clean up on drop. interrupted_cleanup.cancel(); - Ok((version_uuid, md5sum_hex)) + Ok(SaveStreamResult { + version_uuid, + version_timestamp, + etag, + }) } /// Validate MD5 sum against content-md5 header @@ -290,6 +328,7 @@ pub(crate) async fn check_quotas( pub(crate) async fn read_and_put_blocks> + Unpin>( ctx: &ReqCtx, version: &Version, + encryption: EncryptionParams, part_number: u64, first_block: Bytes, chunker: &mut StreamChunker, @@ -349,12 +388,31 @@ pub(crate) async fn read_and_put_blocks> + )) }; - let (block_tx3, mut block_rx3) = mpsc::channel::>(1); - let hash_blocks = async { + let (block_tx3, mut block_rx3) = mpsc::channel::>(1); + let encrypt_hash_blocks = async { let mut first_block_hash = None; while let Some(next) = block_rx2.recv().await { match next { Ok(block) => { + let unencrypted_len = block.len() as u64; + let block = if encryption.is_encrypted() { + let res = + tokio::task::spawn_blocking(move || encryption.encrypt_block(block)) + .with_context(Context::current_with_span( + tracer.start("Encrypt block"), + )) + .await + .unwrap(); + match res { + Ok(b) => b, + Err(e) => { + block_tx3.send(Err(e)).await?; + break; + } + } + } else { + block + }; let hash = async_blake2sum(block.clone()) .with_context(Context::current_with_span( tracer.start("Hash block (blake2)"), @@ -363,7 +421,7 @@ pub(crate) async fn read_and_put_blocks> + if first_block_hash.is_none() { first_block_hash = Some(hash); } - block_tx3.send(Ok((block, hash))).await?; + block_tx3.send(Ok((block, unencrypted_len, hash))).await?; } Err(e) => { block_tx3.send(Err(e)).await?; @@ -398,7 +456,7 @@ pub(crate) async fn read_and_put_blocks> + block_rx3.recv().await } }; - let (block, hash) = tokio::select! { + let (block, unencrypted_len, hash) = tokio::select! { result = write_futs_next => { result?; continue; @@ -410,17 +468,18 @@ pub(crate) async fn read_and_put_blocks> + }; // For next block to be written: count its size and spawn future to write it - let offset = written_bytes; - written_bytes += block.len() as u64; write_futs.push_back(put_block_and_meta( ctx, version, part_number, - offset, + written_bytes, hash, block, + unencrypted_len, + encryption.is_encrypted(), order_stream.order(written_bytes), )); + written_bytes += unencrypted_len; } while let Some(res) = write_futs.next().await { res?; @@ -429,7 +488,7 @@ pub(crate) async fn read_and_put_blocks> + }; let (_, stream_hash_result, block_hash_result, final_result) = - futures::join!(read_blocks, hash_stream, hash_blocks, put_blocks); + futures::join!(read_blocks, hash_stream, encrypt_hash_blocks, put_blocks); let total_size = final_result?; // unwrap here is ok, because if hasher failed, it is because something failed @@ -449,6 +508,8 @@ async fn put_block_and_meta( offset: u64, hash: Hash, block: Bytes, + size: u64, + is_encrypted: bool, order_tag: OrderTag, ) -> Result<(), GarageError> { let ReqCtx { garage, .. } = ctx; @@ -459,10 +520,7 @@ async fn put_block_and_meta( part_number, offset, }, - VersionBlock { - hash, - size: block.len() as u64, - }, + VersionBlock { hash, size }, ); let block_ref = BlockRef { @@ -474,7 +532,7 @@ async fn put_block_and_meta( futures::try_join!( garage .block_manager - .rpc_put_block(hash, block, Some(order_tag)), + .rpc_put_block(hash, block, is_encrypted, Some(order_tag)), garage.version_table.insert(&version), garage.block_ref_table.insert(&block_ref), )?; @@ -517,14 +575,6 @@ impl> + Unpin> StreamChunker { } } -pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response { - Response::builder() - .header("x-amz-version-id", hex::encode(version_uuid)) - .header("ETag", format!("\"{}\"", md5sum_hex)) - .body(empty_body()) - .unwrap() -} - struct InterruptedCleanup(Option); struct InterruptedCleanupInner { garage: Arc, -- cgit v1.2.3