diff options
Diffstat (limited to 'src/api/s3/put.rs')
-rw-r--r-- | src/api/s3/put.rs | 176 |
1 files changed, 102 insertions, 74 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 2ced0580..941e4122 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::sync::Arc; use base64::prelude::*; @@ -36,10 +36,18 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; const PUT_BLOCKS_MAX_PARALLEL: usize = 3; +pub struct SaveStreamResult { + pub version_uuid: Uuid, + pub version_timestamp: u64, + /// Etag WITHOUT THE QUOTES (just the hex value) + pub etag: String, +} + pub async fn handle_put( ctx: ReqCtx, req: Request<ReqBody>, @@ -50,6 +58,9 @@ pub async fn handle_put( let headers = get_headers(req.headers())?; debug!("Object headers: {:?}", headers); + // Determine whether object should be encrypted, and if so the key + let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; + let content_md5 = match req.headers().get("content-md5") { Some(x) => Some(x.to_str()?.to_string()), None => None, @@ -57,19 +68,33 @@ pub async fn handle_put( let stream = body_stream(req.into_body()); - save_stream(&ctx, headers, stream, key, content_md5, content_sha256) - .await - .map(|(uuid, md5)| put_response(uuid, md5)) + let res = save_stream( + &ctx, + headers, + encryption, + stream, + key, + content_md5, + content_sha256, + ) + .await?; + + let mut resp = Response::builder() + .header("x-amz-version-id", hex::encode(res.version_uuid)) + .header("ETag", format!("\"{}\"", res.etag)); + encryption.add_response_headers(&mut resp); + Ok(resp.body(empty_body())?) } pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ctx: &ReqCtx, headers: ObjectVersionHeaders, + encryption: EncryptionParams, body: S, key: &String, content_md5: Option<String>, content_sha256: Option<FixedBytes32>, -) -> Result<(Uuid, String), Error> { +) -> Result<SaveStreamResult, Error> { let ReqCtx { garage, bucket_id, .. } = ctx; @@ -82,6 +107,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let first_block = first_block_opt.unwrap_or_default(); + let object_encryption = encryption.encrypt_headers(headers)?; + // Generate identity of new version let version_uuid = gen_uuid(); let version_timestamp = next_timestamp(existing_object.as_ref()); @@ -92,37 +119,43 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let mut md5sum = Md5::new(); md5sum.update(&first_block[..]); let data_md5sum = md5sum.finalize(); - let data_md5sum_hex = hex::encode(data_md5sum); let data_sha256sum = sha256sum(&first_block[..]); - let size = first_block.len() as u64; ensure_checksum_matches( - data_md5sum.as_slice(), + &data_md5sum, data_sha256sum, content_md5.as_deref(), content_sha256, )?; + let size = first_block.len() as u64; check_quotas(ctx, size, existing_object.as_ref()).await?; + let etag = encryption.etag_from_md5(&data_md5sum); + let inline_data = encryption.encrypt_blob(&first_block)?.to_vec(); + let object_version = ObjectVersion { uuid: version_uuid, timestamp: version_timestamp, state: ObjectVersionState::Complete(ObjectVersionData::Inline( ObjectVersionMeta { - headers, + encryption: object_encryption, size, - etag: data_md5sum_hex.clone(), + etag: etag.clone(), }, - first_block.to_vec(), + inline_data, )), }; let object = Object::new(*bucket_id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; - return Ok((version_uuid, data_md5sum_hex)); + return Ok(SaveStreamResult { + version_uuid, + version_timestamp, + etag, + }); } // The following consists in many steps that can each fail. @@ -142,7 +175,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( uuid: version_uuid, timestamp: version_timestamp, state: ObjectVersionState::Uploading { - headers: headers.clone(), + encryption: object_encryption.clone(), multipart: false, }, }; @@ -165,10 +198,10 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // Transfer data and verify checksum let (total_size, data_md5sum, data_sha256sum, first_block_hash) = - read_and_put_blocks(ctx, &version, 1, first_block, &mut chunker).await?; + read_and_put_blocks(ctx, &version, encryption, 1, first_block, &mut chunker).await?; ensure_checksum_matches( - data_md5sum.as_slice(), + &data_md5sum, data_sha256sum, content_md5.as_deref(), content_sha256, @@ -177,12 +210,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( check_quotas(ctx, total_size, existing_object.as_ref()).await?; // Save final object state, marked as Complete - let md5sum_hex = hex::encode(data_md5sum); + let etag = encryption.etag_from_md5(&data_md5sum); + object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { - headers, + encryption: object_encryption, size: total_size, - etag: md5sum_hex.clone(), + etag: etag.clone(), }, first_block_hash, )); @@ -193,7 +227,11 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // We won't have to clean up on drop. interrupted_cleanup.cancel(); - Ok((version_uuid, md5sum_hex)) + Ok(SaveStreamResult { + version_uuid, + version_timestamp, + etag, + }) } /// Validate MD5 sum against content-md5 header @@ -290,6 +328,7 @@ pub(crate) async fn check_quotas( pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( ctx: &ReqCtx, version: &Version, + encryption: EncryptionParams, part_number: u64, first_block: Bytes, chunker: &mut StreamChunker<S>, @@ -349,12 +388,31 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + )) }; - let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, Hash), Error>>(1); - let hash_blocks = async { + let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, u64, Hash), Error>>(1); + let encrypt_hash_blocks = async { let mut first_block_hash = None; while let Some(next) = block_rx2.recv().await { match next { Ok(block) => { + let unencrypted_len = block.len() as u64; + let block = if encryption.is_encrypted() { + let res = + tokio::task::spawn_blocking(move || encryption.encrypt_block(block)) + .with_context(Context::current_with_span( + tracer.start("Encrypt block"), + )) + .await + .unwrap(); + match res { + Ok(b) => b, + Err(e) => { + block_tx3.send(Err(e)).await?; + break; + } + } + } else { + block + }; let hash = async_blake2sum(block.clone()) .with_context(Context::current_with_span( tracer.start("Hash block (blake2)"), @@ -363,7 +421,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + if first_block_hash.is_none() { first_block_hash = Some(hash); } - block_tx3.send(Ok((block, hash))).await?; + block_tx3.send(Ok((block, unencrypted_len, hash))).await?; } Err(e) => { block_tx3.send(Err(e)).await?; @@ -398,7 +456,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + block_rx3.recv().await } }; - let (block, hash) = tokio::select! { + let (block, unencrypted_len, hash) = tokio::select! { result = write_futs_next => { result?; continue; @@ -410,17 +468,18 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + }; // For next block to be written: count its size and spawn future to write it - let offset = written_bytes; - written_bytes += block.len() as u64; write_futs.push_back(put_block_and_meta( ctx, version, part_number, - offset, + written_bytes, hash, block, + unencrypted_len, + encryption.is_encrypted(), order_stream.order(written_bytes), )); + written_bytes += unencrypted_len; } while let Some(res) = write_futs.next().await { res?; @@ -429,7 +488,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + }; let (_, stream_hash_result, block_hash_result, final_result) = - futures::join!(read_blocks, hash_stream, hash_blocks, put_blocks); + futures::join!(read_blocks, hash_stream, encrypt_hash_blocks, put_blocks); let total_size = final_result?; // unwrap here is ok, because if hasher failed, it is because something failed @@ -449,6 +508,8 @@ async fn put_block_and_meta( offset: u64, hash: Hash, block: Bytes, + size: u64, + is_encrypted: bool, order_tag: OrderTag, ) -> Result<(), GarageError> { let ReqCtx { garage, .. } = ctx; @@ -459,10 +520,7 @@ async fn put_block_and_meta( part_number, offset, }, - VersionBlock { - hash, - size: block.len() as u64, - }, + VersionBlock { hash, size }, ); let block_ref = BlockRef { @@ -474,7 +532,7 @@ async fn put_block_and_meta( futures::try_join!( garage .block_manager - .rpc_put_block(hash, block, Some(order_tag)), + .rpc_put_block(hash, block, is_encrypted, Some(order_tag)), garage.version_table.insert(&version), garage.block_ref_table.insert(&block_ref), )?; @@ -517,14 +575,6 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { } } -pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<ResBody> { - Response::builder() - .header("x-amz-version-id", hex::encode(version_uuid)) - .header("ETag", format!("\"{}\"", md5sum_hex)) - .body(empty_body()) - .unwrap() -} - struct InterruptedCleanup(Option<InterruptedCleanupInner>); struct InterruptedCleanupInner { garage: Arc<Garage>, @@ -559,57 +609,35 @@ impl Drop for InterruptedCleanup { // ============ helpers ============ -pub(crate) fn get_mime_type(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> { - Ok(headers - .get(hyper::header::CONTENT_TYPE) - .map(|x| x.to_str()) - .unwrap_or(Ok("blob"))? - .to_string()) -} - pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVersionHeaders, Error> { - let content_type = get_mime_type(headers)?; - let mut other = BTreeMap::new(); + let mut ret = Vec::new(); // Preserve standard headers let standard_header = vec![ + hyper::header::CONTENT_TYPE, hyper::header::CACHE_CONTROL, hyper::header::CONTENT_DISPOSITION, hyper::header::CONTENT_ENCODING, hyper::header::CONTENT_LANGUAGE, hyper::header::EXPIRES, ]; - for h in standard_header.iter() { - if let Some(v) = headers.get(h) { - match v.to_str() { - Ok(v_str) => { - other.insert(h.to_string(), v_str.to_string()); - } - Err(e) => { - warn!("Discarding header {}, error in .to_str(): {}", h, e); - } - } + for name in standard_header.iter() { + if let Some(value) = headers.get(name) { + ret.push((name.to_string(), value.to_str()?.to_string())); } } // Preserve x-amz-meta- headers - for (k, v) in headers.iter() { - if k.as_str().starts_with("x-amz-meta-") { - match std::str::from_utf8(v.as_bytes()) { - Ok(v_str) => { - other.insert(k.to_string(), v_str.to_string()); - } - Err(e) => { - warn!("Discarding header {}, error in .to_str(): {}", k, e); - } - } + for (name, value) in headers.iter() { + if name.as_str().starts_with("x-amz-meta-") { + ret.push(( + name.to_string(), + std::str::from_utf8(value.as_bytes())?.to_string(), + )); } } - Ok(ObjectVersionHeaders { - content_type, - other, - }) + Ok(ObjectVersionHeaders(ret)) } pub(crate) fn next_timestamp(existing_object: Option<&Object>) -> u64 { |