diff options
author | Alex Auvolat <alex@adnab.me> | 2024-02-23 19:31:22 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-02-27 10:15:17 +0100 |
commit | 439e7cb39f76852494ff9eb8ead499e29685af3f (patch) | |
tree | 8a5610874454381b630552b25cef54c1529475af | |
parent | 80ffbc7d9a53e00317281e8f62d902cb464bfc81 (diff) | |
download | garage-439e7cb39f76852494ff9eb8ead499e29685af3f.tar.gz garage-439e7cb39f76852494ff9eb8ead499e29685af3f.zip |
[sse-c] WIP: DecryptStream
-rw-r--r-- | Cargo.lock | 2 | ||||
-rw-r--r-- | src/api/Cargo.toml | 2 | ||||
-rw-r--r-- | src/api/s3/encryption.rs | 117 | ||||
-rw-r--r-- | src/api/s3/get.rs | 14 |
4 files changed, 118 insertions, 17 deletions
@@ -1377,6 +1377,7 @@ name = "garage_api" version = "0.10.0" dependencies = [ "aes-gcm", + "async-compression", "async-trait", "base64 0.21.7", "bytes", @@ -1417,6 +1418,7 @@ dependencies = [ "sha2", "tokio", "tokio-stream", + "tokio-util 0.7.10", "tracing", "url", ] diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index c759d5a4..8099f765 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -22,6 +22,7 @@ garage_util.workspace = true garage_rpc.workspace = true aes-gcm.workspace = true +async-compression.workspace = true async-trait.workspace = true base64.workspace = true bytes.workspace = true @@ -41,6 +42,7 @@ futures.workspace = true futures-util.workspace = true tokio.workspace = true tokio-stream.workspace = true +tokio-util.workspace = true form_urlencoded.workspace = true http.workspace = true diff --git a/src/api/s3/encryption.rs b/src/api/s3/encryption.rs index 2d403ff3..1515d903 100644 --- a/src/api/s3/encryption.rs +++ b/src/api/s3/encryption.rs @@ -1,17 +1,25 @@ use std::borrow::Cow; +use std::pin::Pin; use aes_gcm::{ - aead::{Aead, AeadCore, KeyInit, OsRng}, + aead::{stream, Aead, AeadCore, KeyInit, OsRng}, Aes256Gcm, Key, Nonce, }; use base64::prelude::*; +use bytes::Bytes; + +use futures::stream::Stream; +use futures::task; +use tokio::io::BufReader; use http::header::{HeaderName, HeaderValue}; use hyper::{body::Body, Request}; -use garage_net::stream::{ByteStream, ByteStreamReader}; +use garage_net::bytes_buf::BytesBuf; +use garage_net::stream::{stream_asyncread, ByteStream}; use garage_rpc::rpc_helper::OrderTag; use garage_util::data::Hash; +use garage_util::migrate::Migrate; use garage_model::garage::Garage; use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionHeaders}; @@ -108,7 +116,7 @@ impl EncryptionParams { ) => { let enc = Self::SseC { client_key, - compression_level: if compressed { + compression_level: if *compressed { Some(garage.config.compression_level.unwrap_or(1)) } else { None @@ -173,7 +181,7 @@ impl EncryptionParams { Self::SseC { client_key, .. } => { let cipher = Aes256Gcm::new(&client_key); let nonce_size = Aes256Gcm::NonceSize::to_usize(); - let nonce: Nonce<Aes256Gcm::NonceSize> = blob + let nonce: Nonce<Aes256Gcm> = blob .get(..nonce_size) .ok_or_internal_error("invalid encrypted data")? .try_into() @@ -193,7 +201,7 @@ impl EncryptionParams { /// Get a data block from the storage node, and decrypt+decompress it /// if necessary. If object is plaintext, just get it without any processing. - pub async fn get_and_decrypt_block( + pub async fn get_block( &self, garage: &Garage, hash: &Hash, @@ -209,7 +217,15 @@ impl EncryptionParams { client_key, compression_level, } => { - todo!() + let plaintext = DecryptStream::new(raw_block, *client_key); + if compression_level.is_some() { + let reader = stream_asyncread(Box::pin(plaintext)); + let reader = BufReader::new(reader); + let reader = async_compression::tokio::bufread::ZstdDecoder::new(reader); + Ok(Box::pin(tokio_util::io::ReaderStream::new(reader))) + } else { + Ok(Box::pin(plaintext)) + } } } } @@ -257,3 +273,92 @@ fn parse_request_headers( None => Ok(None), } } + +// ---- encrypt & decrypt streams ---- + +#[pin_project::pin_project] +struct DecryptStream { + #[pin] + stream: ByteStream, + buf: BytesBuf, + key: Key<Aes256Gcm>, + cipher: Option<stream::DecryptorLE31<Aes256Gcm>>, +} + +impl DecryptStream { + fn new(stream: ByteStream, key: Key<Aes256Gcm>) -> Self { + Self { + stream, + buf: BytesBuf::new(), + key, + cipher: None, + } + } +} + +impl Stream for DecryptStream { + type Item = Result<Bytes, std::io::Error>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll<Option<Self::Item>> { + use std::task::Poll; + + let mut this = self.project(); + + while this.cipher.is_none() { + if this.buf.len() >= Aes256Gcm::NonceSize::as_usize() { + let nonce = this + .buf + .take_exact(Aes256Gcm::NonceSize::as_usize()) + .unwrap(); + let nonce: Aes256Gcm::Nonce = nonce.try_into().unwrap(); + *this.cipher = Some(stream::DecryptorLE31::new(&self.key, &nonce)); + break; + } + + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(e))); + } + None => { + return Poll::Ready(Some(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "Decrypt: unexpected EOF, could not read nonce", + )))); + } + } + } + + let chunk_size = STREAM_ENC_CHUNK_SIZE + Aes256Gcm::TagSize::as_usize(); + while this.buf.len() < chunk_size { + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(e))); + } + None => break, + } + } + + if this.buf.is_empty() { + return Poll::Ready(None); + } + + let chunk = this.buf.take_max(chunk_size); + let res = this.cipher.as_ref().unwrap().decrypt_next(&chunk); + match res { + Ok(bytes) => Poll::Ready(Some(Ok(bytes))), + Err(_) => Poll::Ready(SomeErr(std::io::Error::new( + std::io::ErrorKind::Other, + "Decryption failed", + ))), + } + } +} diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 3f85eecb..9fe4d63e 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -329,11 +329,7 @@ async fn handle_get_full( }); let stream_block_0 = encryption - .get_and_decrypt_block( - &garage, - &first_block_hash, - Some(order_stream.order(0)), - ) + .get_block(&garage, &first_block_hash, Some(order_stream.order(0))) .await?; tx.send(stream_block_0) @@ -343,11 +339,7 @@ async fn handle_get_full( let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?; for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) { let stream_block_i = encryption - .get_and_decrypt_block( - &garage, - &vb.hash, - Some(order_stream.order(i as u64)), - ) + .get_block(&garage, &vb.hash, Some(order_stream.order(i as u64))) .await?; tx.send(stream_block_i) .await @@ -544,7 +536,7 @@ fn body_from_blocks_range( let garage = garage.clone(); for (i, (block, block_offset)) in blocks.iter().enumerate() { let block_stream = encryption - .get_and_decrypt_block(&garage, &block.hash, Some(order_stream.order(i as u64))) + .get_block(&garage, &block.hash, Some(order_stream.order(i as u64))) .await?; let block_stream = block_stream .scan(*block_offset, move |chunk_offset, chunk| { |