aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock2
-rw-r--r--src/api/Cargo.toml2
-rw-r--r--src/api/s3/encryption.rs117
-rw-r--r--src/api/s3/get.rs14
4 files changed, 118 insertions, 17 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 0ed568c3..afd360aa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1377,6 +1377,7 @@ name = "garage_api"
version = "0.10.0"
dependencies = [
"aes-gcm",
+ "async-compression",
"async-trait",
"base64 0.21.7",
"bytes",
@@ -1417,6 +1418,7 @@ dependencies = [
"sha2",
"tokio",
"tokio-stream",
+ "tokio-util 0.7.10",
"tracing",
"url",
]
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index c759d5a4..8099f765 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -22,6 +22,7 @@ garage_util.workspace = true
garage_rpc.workspace = true
aes-gcm.workspace = true
+async-compression.workspace = true
async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
@@ -41,6 +42,7 @@ futures.workspace = true
futures-util.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
+tokio-util.workspace = true
form_urlencoded.workspace = true
http.workspace = true
diff --git a/src/api/s3/encryption.rs b/src/api/s3/encryption.rs
index 2d403ff3..1515d903 100644
--- a/src/api/s3/encryption.rs
+++ b/src/api/s3/encryption.rs
@@ -1,17 +1,25 @@
use std::borrow::Cow;
+use std::pin::Pin;
use aes_gcm::{
- aead::{Aead, AeadCore, KeyInit, OsRng},
+ aead::{stream, Aead, AeadCore, KeyInit, OsRng},
Aes256Gcm, Key, Nonce,
};
use base64::prelude::*;
+use bytes::Bytes;
+
+use futures::stream::Stream;
+use futures::task;
+use tokio::io::BufReader;
use http::header::{HeaderName, HeaderValue};
use hyper::{body::Body, Request};
-use garage_net::stream::{ByteStream, ByteStreamReader};
+use garage_net::bytes_buf::BytesBuf;
+use garage_net::stream::{stream_asyncread, ByteStream};
use garage_rpc::rpc_helper::OrderTag;
use garage_util::data::Hash;
+use garage_util::migrate::Migrate;
use garage_model::garage::Garage;
use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionHeaders};
@@ -108,7 +116,7 @@ impl EncryptionParams {
) => {
let enc = Self::SseC {
client_key,
- compression_level: if compressed {
+ compression_level: if *compressed {
Some(garage.config.compression_level.unwrap_or(1))
} else {
None
@@ -173,7 +181,7 @@ impl EncryptionParams {
Self::SseC { client_key, .. } => {
let cipher = Aes256Gcm::new(&client_key);
let nonce_size = Aes256Gcm::NonceSize::to_usize();
- let nonce: Nonce<Aes256Gcm::NonceSize> = blob
+ let nonce: Nonce<Aes256Gcm> = blob
.get(..nonce_size)
.ok_or_internal_error("invalid encrypted data")?
.try_into()
@@ -193,7 +201,7 @@ impl EncryptionParams {
/// Get a data block from the storage node, and decrypt+decompress it
/// if necessary. If object is plaintext, just get it without any processing.
- pub async fn get_and_decrypt_block(
+ pub async fn get_block(
&self,
garage: &Garage,
hash: &Hash,
@@ -209,7 +217,15 @@ impl EncryptionParams {
client_key,
compression_level,
} => {
- todo!()
+ let plaintext = DecryptStream::new(raw_block, *client_key);
+ if compression_level.is_some() {
+ let reader = stream_asyncread(Box::pin(plaintext));
+ let reader = BufReader::new(reader);
+ let reader = async_compression::tokio::bufread::ZstdDecoder::new(reader);
+ Ok(Box::pin(tokio_util::io::ReaderStream::new(reader)))
+ } else {
+ Ok(Box::pin(plaintext))
+ }
}
}
}
@@ -257,3 +273,92 @@ fn parse_request_headers(
None => Ok(None),
}
}
+
+// ---- encrypt & decrypt streams ----
+
+#[pin_project::pin_project]
+struct DecryptStream {
+ #[pin]
+ stream: ByteStream,
+ buf: BytesBuf,
+ key: Key<Aes256Gcm>,
+ cipher: Option<stream::DecryptorLE31<Aes256Gcm>>,
+}
+
+impl DecryptStream {
+ fn new(stream: ByteStream, key: Key<Aes256Gcm>) -> Self {
+ Self {
+ stream,
+ buf: BytesBuf::new(),
+ key,
+ cipher: None,
+ }
+ }
+}
+
+impl Stream for DecryptStream {
+ type Item = Result<Bytes, std::io::Error>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> task::Poll<Option<Self::Item>> {
+ use std::task::Poll;
+
+ let mut this = self.project();
+
+ while this.cipher.is_none() {
+ if this.buf.len() >= Aes256Gcm::NonceSize::as_usize() {
+ let nonce = this
+ .buf
+ .take_exact(Aes256Gcm::NonceSize::as_usize())
+ .unwrap();
+ let nonce: Aes256Gcm::Nonce = nonce.try_into().unwrap();
+ *this.cipher = Some(stream::DecryptorLE31::new(&self.key, &nonce));
+ break;
+ }
+
+ match futures::ready!(this.stream.as_mut().poll_next(cx)) {
+ Some(Ok(bytes)) => {
+ this.buf.extend(bytes);
+ }
+ Some(Err(e)) => {
+ return Poll::Ready(Some(Err(e)));
+ }
+ None => {
+ return Poll::Ready(Some(Err(std::io::Error::new(
+ std::io::ErrorKind::UnexpectedEof,
+ "Decrypt: unexpected EOF, could not read nonce",
+ ))));
+ }
+ }
+ }
+
+ let chunk_size = STREAM_ENC_CHUNK_SIZE + Aes256Gcm::TagSize::as_usize();
+ while this.buf.len() < chunk_size {
+ match futures::ready!(this.stream.as_mut().poll_next(cx)) {
+ Some(Ok(bytes)) => {
+ this.buf.extend(bytes);
+ }
+ Some(Err(e)) => {
+ return Poll::Ready(Some(Err(e)));
+ }
+ None => break,
+ }
+ }
+
+ if this.buf.is_empty() {
+ return Poll::Ready(None);
+ }
+
+ let chunk = this.buf.take_max(chunk_size);
+ let res = this.cipher.as_ref().unwrap().decrypt_next(&chunk);
+ match res {
+ Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
+ Err(_) => Poll::Ready(SomeErr(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "Decryption failed",
+ ))),
+ }
+ }
+}
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 3f85eecb..9fe4d63e 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -329,11 +329,7 @@ async fn handle_get_full(
});
let stream_block_0 = encryption
- .get_and_decrypt_block(
- &garage,
- &first_block_hash,
- Some(order_stream.order(0)),
- )
+ .get_block(&garage, &first_block_hash, Some(order_stream.order(0)))
.await?;
tx.send(stream_block_0)
@@ -343,11 +339,7 @@ async fn handle_get_full(
let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?;
for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) {
let stream_block_i = encryption
- .get_and_decrypt_block(
- &garage,
- &vb.hash,
- Some(order_stream.order(i as u64)),
- )
+ .get_block(&garage, &vb.hash, Some(order_stream.order(i as u64)))
.await?;
tx.send(stream_block_i)
.await
@@ -544,7 +536,7 @@ fn body_from_blocks_range(
let garage = garage.clone();
for (i, (block, block_offset)) in blocks.iter().enumerate() {
let block_stream = encryption
- .get_and_decrypt_block(&garage, &block.hash, Some(order_stream.order(i as u64)))
+ .get_block(&garage, &block.hash, Some(order_stream.order(i as u64)))
.await?;
let block_stream = block_stream
.scan(*block_offset, move |chunk_offset, chunk| {