diff options
-rw-r--r-- | src/api/s3/copy.rs | 2 | ||||
-rw-r--r-- | src/api/s3/get.rs | 15 | ||||
-rw-r--r-- | src/api/s3/multipart.rs | 15 | ||||
-rw-r--r-- | src/api/s3/put.rs | 50 | ||||
-rw-r--r-- | src/block/manager.rs | 4 | ||||
-rw-r--r-- | src/model/s3/object_table.rs | 7 |
6 files changed, 63 insertions, 30 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index b370b709..90149ef6 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -405,7 +405,7 @@ pub async fn handle_upload_part_copy( if must_upload { garage2 .block_manager - .rpc_put_block(final_hash, data, None) + .rpc_put_block(final_hash, data, false, None) .await } else { Ok(()) diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 9fe4d63e..5e2a9830 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -172,15 +172,16 @@ pub async fn handle_head( if let Some(pn) = part_number { match version_data { - ObjectVersionData::Inline(_, bytes) => { + ObjectVersionData::Inline(_, _) => { if pn != 1 { return Err(Error::InvalidPart); } + let bytes_len = version_meta.size; Ok(object_headers(object_version, version_meta, &headers) - .header(CONTENT_LENGTH, format!("{}", bytes.len())) + .header(CONTENT_LENGTH, format!("{}", bytes_len)) .header( CONTENT_RANGE, - format!("bytes 0-{}/{}", bytes.len() - 1, bytes.len()), + format!("bytes 0-{}/{}", bytes_len - 1, bytes_len), ) .header(X_AMZ_MP_PARTS_COUNT, "1") .status(StatusCode::PARTIAL_CONTENT) @@ -312,7 +313,8 @@ async fn handle_get_full( match &version_data { ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_, bytes) => { - Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) + let bytes = encryption.decrypt_blob(&bytes)?; + Ok(resp_builder.body(bytes_body(bytes.into_owned().into()))?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { let (tx, rx) = mpsc::channel::<ByteStream>(2); @@ -387,6 +389,7 @@ async fn handle_get_range( match &version_data { ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_meta, bytes) => { + let bytes = encryption.decrypt_blob(&bytes)?; if end as usize <= bytes.len() { let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into()); Ok(resp_builder.body(body)?) @@ -428,6 +431,8 @@ async fn handle_get_part( if part_number != 1 { return Err(Error::InvalidPart); } + let bytes = encryption.decrypt_blob(&bytes)?; + assert_eq!(bytes.len() as u64, version_meta.size); Ok(resp_builder .header(CONTENT_LENGTH, format!("{}", bytes.len())) .header( @@ -435,7 +440,7 @@ async fn handle_get_part( format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()), ) .header(X_AMZ_MP_PARTS_COUNT, "1") - .body(bytes_body(bytes.to_vec().into()))?) + .body(bytes_body(bytes.into_owned().into()))?) } ObjectVersionData::FirstBlock(_, _) => { let version = garage diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index a7c549f6..3b542f01 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -115,10 +115,6 @@ pub async fn handle_put_part( &Request::from_parts(req_head, empty_body::<Error>()), &object_encryption, )?; - if encryption.is_encrypted() { - // TODO - unimplemented!("encryption for mpu"); - } // Check object is valid and part can be accepted let first_block = first_block.ok_or_bad_request("Empty body")?; @@ -159,8 +155,15 @@ pub async fn handle_put_part( garage.version_table.insert(&version).await?; // Copy data to version - let (total_size, data_md5sum, data_sha256sum, _) = - read_and_put_blocks(&garage, &version, part_number, first_block, &mut chunker).await?; + let (total_size, data_md5sum, data_sha256sum, _) = read_and_put_blocks( + &garage, + &version, + encryption, + part_number, + first_block, + &mut chunker, + ) + .await?; // Verify that checksums map ensure_checksum_matches( diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index fdcd5aca..866ce90a 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -136,7 +136,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( size, etag: data_md5sum_hex.clone(), }, - first_block.to_vec(), + encryption.encrypt_blob(&first_block)?.to_vec(), )), }; @@ -186,7 +186,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // Transfer data and verify checksum let (total_size, data_md5sum, data_sha256sum, first_block_hash) = - read_and_put_blocks(&garage, &version, 1, first_block, &mut chunker).await?; + read_and_put_blocks(&garage, &version, encryption, 1, first_block, &mut chunker).await?; ensure_checksum_matches( data_md5sum.as_slice(), @@ -305,6 +305,7 @@ pub(crate) async fn check_quotas( pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage: &Garage, version: &Version, + encryption: EncryptionParams, part_number: u64, first_block: Bytes, chunker: &mut StreamChunker<S>, @@ -364,12 +365,31 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + )) }; - let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, Hash), Error>>(1); - let hash_blocks = async { + let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, u64, Hash), Error>>(1); + let encrypt_hash_blocks = async { let mut first_block_hash = None; while let Some(next) = block_rx2.recv().await { match next { Ok(block) => { + let block_len = block.len() as u64; + let block = if encryption.is_encrypted() { + let res = + tokio::task::spawn_blocking(move || encryption.encrypt_block(block)) + .with_context(Context::current_with_span( + tracer.start("Encrypt block"), + )) + .await + .unwrap(); + match res { + Ok(b) => b, + Err(e) => { + block_tx3.send(Err(e)).await?; + break; + } + } + } else { + block + }; let hash = async_blake2sum(block.clone()) .with_context(Context::current_with_span( tracer.start("Hash block (blake2)"), @@ -378,7 +398,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + if first_block_hash.is_none() { first_block_hash = Some(hash); } - block_tx3.send(Ok((block, hash))).await?; + block_tx3.send(Ok((block, block_len, hash))).await?; } Err(e) => { block_tx3.send(Err(e)).await?; @@ -413,7 +433,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + block_rx3.recv().await } }; - let (block, hash) = tokio::select! { + let (block, unencrypted_len, hash) = tokio::select! { result = write_futs_next => { result?; continue; @@ -425,17 +445,18 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + }; // For next block to be written: count its size and spawn future to write it - let offset = written_bytes; - written_bytes += block.len() as u64; write_futs.push_back(put_block_and_meta( garage, version, part_number, - offset, + written_bytes, hash, block, + unencrypted_len, + encryption.is_encrypted(), order_stream.order(written_bytes), )); + written_bytes += unencrypted_len; } while let Some(res) = write_futs.next().await { res?; @@ -444,7 +465,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + }; let (_, stream_hash_result, block_hash_result, final_result) = - futures::join!(read_blocks, hash_stream, hash_blocks, put_blocks); + futures::join!(read_blocks, hash_stream, encrypt_hash_blocks, put_blocks); let total_size = final_result?; // unwrap here is ok, because if hasher failed, it is because something failed @@ -464,6 +485,8 @@ async fn put_block_and_meta( offset: u64, hash: Hash, block: Bytes, + size: u64, + is_encrypted: bool, order_tag: OrderTag, ) -> Result<(), GarageError> { let mut version = version.clone(); @@ -472,10 +495,7 @@ async fn put_block_and_meta( part_number, offset, }, - VersionBlock { - hash, - size: block.len() as u64, - }, + VersionBlock { hash, size }, ); let block_ref = BlockRef { @@ -487,7 +507,7 @@ async fn put_block_and_meta( futures::try_join!( garage .block_manager - .rpc_put_block(hash, block, Some(order_tag)), + .rpc_put_block(hash, block, is_encrypted, Some(order_tag)), garage.version_table.insert(&version), garage.block_ref_table.insert(&block_ref), )?; diff --git a/src/block/manager.rs b/src/block/manager.rs index f4d8ee56..4962300a 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -352,11 +352,13 @@ impl BlockManager { &self, hash: Hash, data: Bytes, + prevent_compression: bool, order_tag: Option<OrderTag>, ) -> Result<(), Error> { let who = self.replication.write_sets(&hash); - let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) + let compression_level = self.compression_level.filter(|_| !prevent_compression); + let (header, bytes) = DataBlock::from_buffer(data, compression_level) .await .into_parts(); let put_block_rpc = diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 20824e8d..7fa4b9e0 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -263,7 +263,9 @@ mod v010 { pub enum ObjectVersionData { /// The object was deleted, this Version is a tombstone to mark it as such DeleteMarker, - /// The object is short, it's stored inlined + /// The object is short, it's stored inlined. + /// It is never compressed. For encrypted objects, it is encrypted using + /// AES256-GCM, like the encrypted headers. Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), /// The object is not short, Hash of first block is stored here, next segments hashes are /// stored in the version table @@ -273,7 +275,8 @@ mod v010 { /// Metadata about the object version #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionMeta { - /// Size of the object + /// Size of the object. If object is encrypted/compressed, + /// this is always the size of the unencrypted/uncompressed data pub size: u64, /// etag of the object pub etag: String, |