-rw-r--r--   src/api/s3/copy.rs              2
-rw-r--r--   src/api/s3/get.rs              15
-rw-r--r--   src/api/s3/multipart.rs        15
-rw-r--r--   src/api/s3/put.rs              50
-rw-r--r--   src/block/manager.rs            4
-rw-r--r--   src/model/s3/object_table.rs    7
6 files changed, 63 insertions, 30 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index b370b709..90149ef6 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -405,7 +405,7 @@ pub async fn handle_upload_part_copy(
if must_upload {
garage2
.block_manager
- .rpc_put_block(final_hash, data, None)
+ .rpc_put_block(final_hash, data, false, None)
.await
} else {
Ok(())
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 9fe4d63e..5e2a9830 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -172,15 +172,16 @@ pub async fn handle_head(
if let Some(pn) = part_number {
match version_data {
- ObjectVersionData::Inline(_, bytes) => {
+ ObjectVersionData::Inline(_, _) => {
if pn != 1 {
return Err(Error::InvalidPart);
}
+ let bytes_len = version_meta.size;
Ok(object_headers(object_version, version_meta, &headers)
- .header(CONTENT_LENGTH, format!("{}", bytes.len()))
+ .header(CONTENT_LENGTH, format!("{}", bytes_len))
.header(
CONTENT_RANGE,
- format!("bytes 0-{}/{}", bytes.len() - 1, bytes.len()),
+ format!("bytes 0-{}/{}", bytes_len - 1, bytes_len),
)
.header(X_AMZ_MP_PARTS_COUNT, "1")
.status(StatusCode::PARTIAL_CONTENT)
@@ -312,7 +313,8 @@ async fn handle_get_full(
match &version_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_, bytes) => {
- Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
+ let bytes = encryption.decrypt_blob(&bytes)?;
+ Ok(resp_builder.body(bytes_body(bytes.into_owned().into()))?)
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
let (tx, rx) = mpsc::channel::<ByteStream>(2);
@@ -387,6 +389,7 @@ async fn handle_get_range(
match &version_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, bytes) => {
+ let bytes = encryption.decrypt_blob(&bytes)?;
if end as usize <= bytes.len() {
let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into());
Ok(resp_builder.body(body)?)
@@ -428,6 +431,8 @@ async fn handle_get_part(
if part_number != 1 {
return Err(Error::InvalidPart);
}
+ let bytes = encryption.decrypt_blob(&bytes)?;
+ assert_eq!(bytes.len() as u64, version_meta.size);
Ok(resp_builder
.header(CONTENT_LENGTH, format!("{}", bytes.len()))
.header(
@@ -435,7 +440,7 @@ async fn handle_get_part(
format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()),
)
.header(X_AMZ_MP_PARTS_COUNT, "1")
- .body(bytes_body(bytes.to_vec().into()))?)
+ .body(bytes_body(bytes.into_owned().into()))?)
}
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
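
The handle_head and inline-GET changes above share one idea: the stored inline blob is decrypted first, and the response headers are built from the plaintext length, which is exactly the recorded version_meta.size. A minimal standalone sketch of that idea, with stand-in types rather than Garage's actual EncryptionParams:

use std::borrow::Cow;

// Stand-in for Garage's EncryptionParams; decrypt_blob here only models
// "return the plaintext as a Cow", the real code does AES256-GCM decryption.
struct Encryption { enabled: bool }

impl Encryption {
    fn decrypt_blob<'a>(&self, stored: &'a [u8]) -> Cow<'a, [u8]> {
        if self.enabled {
            Cow::Owned(stored.to_vec()) // placeholder for real decryption
        } else {
            Cow::Borrowed(stored)
        }
    }
}

fn main() {
    let stored = b"inline object body".to_vec();
    let meta_size = stored.len() as u64; // plaintext size recorded at PUT time
    let plain = Encryption { enabled: false }.decrypt_blob(&stored);
    assert_eq!(plain.len() as u64, meta_size);
    // CONTENT_LENGTH / CONTENT_RANGE are built from the plaintext length:
    println!("Content-Range: bytes 0-{}/{}", plain.len() - 1, plain.len());
}
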
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index a7c549f6..3b542f01 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -115,10 +115,6 @@ pub async fn handle_put_part(
&Request::from_parts(req_head, empty_body::<Error>()),
&object_encryption,
)?;
- if encryption.is_encrypted() {
- // TODO
- unimplemented!("encryption for mpu");
- }
// Check object is valid and part can be accepted
let first_block = first_block.ok_or_bad_request("Empty body")?;
@@ -159,8 +155,15 @@ pub async fn handle_put_part(
garage.version_table.insert(&version).await?;
// Copy data to version
- let (total_size, data_md5sum, data_sha256sum, _) =
- read_and_put_blocks(&garage, &version, part_number, first_block, &mut chunker).await?;
+ let (total_size, data_md5sum, data_sha256sum, _) = read_and_put_blocks(
+ &garage,
+ &version,
+ encryption,
+ part_number,
+ first_block,
+ &mut chunker,
+ )
+ .await?;
// Verify that checksums map
ensure_checksum_matches(
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index fdcd5aca..866ce90a 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -136,7 +136,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
size,
etag: data_md5sum_hex.clone(),
},
- first_block.to_vec(),
+ encryption.encrypt_blob(&first_block)?.to_vec(),
)),
};
@@ -186,7 +186,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// Transfer data and verify checksum
let (total_size, data_md5sum, data_sha256sum, first_block_hash) =
- read_and_put_blocks(&garage, &version, 1, first_block, &mut chunker).await?;
+ read_and_put_blocks(&garage, &version, encryption, 1, first_block, &mut chunker).await?;
ensure_checksum_matches(
data_md5sum.as_slice(),
@@ -305,6 +305,7 @@ pub(crate) async fn check_quotas(
pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: &Garage,
version: &Version,
+ encryption: EncryptionParams,
part_number: u64,
first_block: Bytes,
chunker: &mut StreamChunker<S>,
@@ -364,12 +365,31 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
))
};
- let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, Hash), Error>>(1);
- let hash_blocks = async {
+ let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, u64, Hash), Error>>(1);
+ let encrypt_hash_blocks = async {
let mut first_block_hash = None;
while let Some(next) = block_rx2.recv().await {
match next {
Ok(block) => {
+ let block_len = block.len() as u64;
+ let block = if encryption.is_encrypted() {
+ let res =
+ tokio::task::spawn_blocking(move || encryption.encrypt_block(block))
+ .with_context(Context::current_with_span(
+ tracer.start("Encrypt block"),
+ ))
+ .await
+ .unwrap();
+ match res {
+ Ok(b) => b,
+ Err(e) => {
+ block_tx3.send(Err(e)).await?;
+ break;
+ }
+ }
+ } else {
+ block
+ };
let hash = async_blake2sum(block.clone())
.with_context(Context::current_with_span(
tracer.start("Hash block (blake2)"),
@@ -378,7 +398,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
if first_block_hash.is_none() {
first_block_hash = Some(hash);
}
- block_tx3.send(Ok((block, hash))).await?;
+ block_tx3.send(Ok((block, block_len, hash))).await?;
}
Err(e) => {
block_tx3.send(Err(e)).await?;
@@ -413,7 +433,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
block_rx3.recv().await
}
};
- let (block, hash) = tokio::select! {
+ let (block, unencrypted_len, hash) = tokio::select! {
result = write_futs_next => {
result?;
continue;
@@ -425,17 +445,18 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
};
// For next block to be written: count its size and spawn future to write it
- let offset = written_bytes;
- written_bytes += block.len() as u64;
write_futs.push_back(put_block_and_meta(
garage,
version,
part_number,
- offset,
+ written_bytes,
hash,
block,
+ unencrypted_len,
+ encryption.is_encrypted(),
order_stream.order(written_bytes),
));
+ written_bytes += unencrypted_len;
}
while let Some(res) = write_futs.next().await {
res?;
@@ -444,7 +465,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
};
let (_, stream_hash_result, block_hash_result, final_result) =
- futures::join!(read_blocks, hash_stream, hash_blocks, put_blocks);
+ futures::join!(read_blocks, hash_stream, encrypt_hash_blocks, put_blocks);
let total_size = final_result?;
// unwrap here is ok, because if hasher failed, it is because something failed
@@ -464,6 +485,8 @@ async fn put_block_and_meta(
offset: u64,
hash: Hash,
block: Bytes,
+ size: u64,
+ is_encrypted: bool,
order_tag: OrderTag,
) -> Result<(), GarageError> {
let mut version = version.clone();
@@ -472,10 +495,7 @@ async fn put_block_and_meta(
part_number,
offset,
},
- VersionBlock {
- hash,
- size: block.len() as u64,
- },
+ VersionBlock { hash, size },
);
let block_ref = BlockRef {
@@ -487,7 +507,7 @@ async fn put_block_and_meta(
futures::try_join!(
garage
.block_manager
- .rpc_put_block(hash, block, Some(order_tag)),
+ .rpc_put_block(hash, block, is_encrypted, Some(order_tag)),
garage.version_table.insert(&version),
garage.block_ref_table.insert(&block_ref),
)?;
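
The read_and_put_blocks changes above combine two points: block encryption is CPU-bound, so it runs under tokio::task::spawn_blocking, and the offsets recorded in the version table keep counting plaintext bytes even though the stored block may grow once encrypted. A minimal runnable sketch of that pattern, assuming the tokio crate; fake_encrypt is only a stand-in for EncryptionParams::encrypt_block:

// Stand-in for AES256-GCM encryption: output is larger than the input
// (illustrative 28-byte overhead for nonce + tag).
fn fake_encrypt(block: Vec<u8>) -> Vec<u8> {
    let mut out = block;
    out.extend_from_slice(&[0u8; 28]);
    out
}

#[tokio::main]
async fn main() {
    let block = vec![42u8; 1024];
    let plain_len = block.len() as u64; // captured before encryption

    // CPU-bound work is moved off the async executor, as in the diff above.
    let stored = tokio::task::spawn_blocking(move || fake_encrypt(block))
        .await
        .unwrap();

    // Offsets advance by the plaintext length, so the logical object layout
    // is independent of any encryption overhead in the stored bytes.
    let mut written_bytes = 0u64;
    let offset_of_this_block = written_bytes;
    written_bytes += plain_len;

    assert_eq!(offset_of_this_block, 0);
    assert_eq!(written_bytes, 1024);
    assert!(stored.len() as u64 > plain_len);
}
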
diff --git a/src/block/manager.rs b/src/block/manager.rs
index f4d8ee56..4962300a 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -352,11 +352,13 @@ impl BlockManager {
&self,
hash: Hash,
data: Bytes,
+ prevent_compression: bool,
order_tag: Option<OrderTag>,
) -> Result<(), Error> {
let who = self.replication.write_sets(&hash);
- let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
+ let compression_level = self.compression_level.filter(|_| !prevent_compression);
+ let (header, bytes) = DataBlock::from_buffer(data, compression_level)
.await
.into_parts();
let put_block_rpc =
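
The new prevent_compression flag works through Option::filter: when the caller marks the data as already encrypted, and therefore effectively incompressible, the configured compression level is dropped for that block. A tiny standalone sketch of that behaviour, using an illustrative free function rather than the BlockManager method:

fn effective_compression(configured: Option<i32>, prevent_compression: bool) -> Option<i32> {
    // Keep the configured level only when compression is still allowed.
    configured.filter(|_| !prevent_compression)
}

fn main() {
    assert_eq!(effective_compression(Some(3), false), Some(3));
    assert_eq!(effective_compression(Some(3), true), None);
    assert_eq!(effective_compression(None, true), None);
}
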
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 20824e8d..7fa4b9e0 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -263,7 +263,9 @@ mod v010 {
pub enum ObjectVersionData {
/// The object was deleted, this Version is a tombstone to mark it as such
DeleteMarker,
- /// The object is short, it's stored inlined
+ /// The object is short, it's stored inlined.
+ /// It is never compressed. For encrypted objects, it is encrypted using
+ /// AES256-GCM, like the encrypted headers.
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
/// The object is not short, Hash of first block is stored here, next segments hashes are
/// stored in the version table
@@ -273,7 +275,8 @@ mod v010 {
/// Metadata about the object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
- /// Size of the object
+ /// Size of the object. If object is encrypted/compressed,
+ /// this is always the size of the unencrypted/uncompressed data
pub size: u64,
/// etag of the object
pub etag: String,
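
A small sketch of the invariant documented above, with a stand-in struct and an illustrative 28-byte AES-GCM overhead: ObjectVersionMeta::size records the plaintext length, while an encrypted Inline payload is somewhat larger, which is why handle_head in get.rs now reports version_meta.size rather than the stored byte length:

struct ObjectVersionMeta { size: u64 }

fn main() {
    let plaintext_len = 100u64;
    let stored_inline_len = plaintext_len + 28; // nonce + GCM tag, illustrative
    let meta = ObjectVersionMeta { size: plaintext_len };

    // The metadata size tracks the plaintext, not what is actually stored.
    assert_eq!(meta.size, plaintext_len);
    assert_ne!(meta.size, stored_inline_len);
}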