aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3')
-rw-r--r--src/api/s3/encryption.rs98
-rw-r--r--src/api/s3/get.rs83
2 files changed, 140 insertions, 41 deletions
diff --git a/src/api/s3/encryption.rs b/src/api/s3/encryption.rs
index 4055a67f..2d403ff3 100644
--- a/src/api/s3/encryption.rs
+++ b/src/api/s3/encryption.rs
@@ -4,10 +4,15 @@ use aes_gcm::{
aead::{Aead, AeadCore, KeyInit, OsRng},
Aes256Gcm, Key, Nonce,
};
+use base64::prelude::*;
use http::header::{HeaderName, HeaderValue};
use hyper::{body::Body, Request};
+use garage_net::stream::{ByteStream, ByteStreamReader};
+use garage_rpc::rpc_helper::OrderTag;
+use garage_util::data::Hash;
+
use garage_model::garage::Garage;
use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionHeaders};
@@ -29,6 +34,9 @@ const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: HeaderName =
const CUSTOMER_ALGORITHM_AES256: HeaderValue = HeaderValue::from_static("AES256");
+const STREAM_ENC_CHUNK_SIZE: usize = 0x1000; // 4096 bytes
+
+#[derive(Clone, Copy)]
pub enum EncryptionParams {
Plaintext,
SseC {
@@ -98,16 +106,6 @@ impl EncryptionParams {
compressed,
},
) => {
- let cipher = Aes256Gcm::new(&client_key);
- let nonce: Nonce<Aes256Gcm::NonceSize> = headers
- .get(..12)
- .ok_or_internal_error("invalid encrypted data")?
- .try_into()
- .unwrap();
- let plaintext = cipher.decrypt(&nonce, &headers[12..]).ok_or_bad_request(
- "Invalid encryption key, could not decrypt object metadata.",
- )?;
- let headers = ObjectVersionHeaders::decode(&plaintext)?;
let enc = Self::SseC {
client_key,
compression_level: if compressed {
@@ -116,6 +114,8 @@ impl EncryptionParams {
None
},
};
+ let plaintext = enc.decrypt_blob(&headers)?;
+ let headers = ObjectVersionHeaders::decode(&plaintext)?;
Ok((enc, headers.into()))
}
(None, ObjectVersionEncryption::Plaintext { headers }) => {
@@ -140,21 +140,79 @@ impl EncryptionParams {
client_key,
compression_level,
} => {
- let cipher = Aes256Gcm::new(&client_key);
- let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
let plaintext = h.encode()?;
- let ciphertext = cipher
- .encrypt(&nonce, &plaintext)
- .ok_or_internal_error("Encryption failed")?;
- let headers_enc = [nonce.to_vec(), ciphertext].concat();
+ let ciphertext = self.encrypt_blob(plaintext)?;
Ok(ObjectVersionEncryption::SseC {
- headers: headers_enc,
+ headers: ciphertext,
compressed: compression_level.is_some(),
})
}
Self::Plaintext => Ok(ObjectVersionEncryption::Plaintext { headers: h }),
}
}
+
+ // ---- generic function for encrypting / decrypting blobs ----
+ // prepends a randomly-generated nonce to the encrypted value
+
+ pub fn encrypt_blob<'a>(&self, blob: &'a [u8]) -> Result<Cow<'a, [u8]>, Error> {
+ match self {
+ Self::SseC { client_key, .. } => {
+ let cipher = Aes256Gcm::new(&client_key);
+ let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
+ let ciphertext = cipher
+ .encrypt(&nonce, &blob)
+ .ok_or_internal_error("Encryption failed")?;
+ Ok([nonce.to_vec(), ciphertext].concat().into())
+ }
+ Self::Plaintext => Ok(blob.into()),
+ }
+ }
+
+ pub fn decrypt_blob<'a>(&self, blob: &'a [u8]) -> Result<Cow<'a, [u8]>, Error> {
+ match self {
+ Self::SseC { client_key, .. } => {
+ let cipher = Aes256Gcm::new(&client_key);
+ let nonce_size = Aes256Gcm::NonceSize::to_usize();
+ let nonce: Nonce<Aes256Gcm::NonceSize> = blob
+ .get(..nonce_size)
+ .ok_or_internal_error("invalid encrypted data")?
+ .try_into()
+ .unwrap();
+ let plaintext = cipher
+ .decrypt(&nonce, &blob[nonce_size..])
+ .ok_or_bad_request(
+ "Invalid encryption key, could not decrypt object metadata.",
+ )?;
+ Ok(plaintext.into())
+ }
+ Self::Plaintext => Ok(blob.into()),
+ }
+ }
+
+ // ---- function for encrypting / decrypting byte streams ----
+
+ /// Get a data block from the storage node, and decrypt+decompress it
+ /// if necessary. If object is plaintext, just get it without any processing.
+ pub async fn get_and_decrypt_block(
+ &self,
+ garage: &Garage,
+ hash: &Hash,
+ order: Option<OrderTag>,
+ ) -> Result<ByteStream, Error> {
+ let raw_block = garage
+ .block_manager
+ .rpc_get_block_streaming(hash, order)
+ .await?;
+ match self {
+ Self::Plaintext => Ok(raw_block),
+ Self::SseC {
+ client_key,
+ compression_level,
+ } => {
+ todo!()
+ }
+ }
+ }
}
fn parse_request_headers(
@@ -171,7 +229,8 @@ fn parse_request_headers(
.headers()
.get(key_header)
.ok_or_bad_request(format!("Missing {} header", key_header))?;
- let key_bytes: [u8; 32] = base64::decode(&key_b64)
+ let key_bytes: [u8; 32] = BASE64_STANDARD
+ .decode(&key_b64)
.ok_or_bad_request(format!("Invalid {} header", key_header))?
.try_into()
.ok_or_bad_request(format!("Invalid {} header", key_header))?;
@@ -180,7 +239,8 @@ fn parse_request_headers(
.headers()
.get(md5_header)
.ok_or_bad_request(format!("Missing {} header", md5_header))?;
- let md5_bytes = base64::decode(&md5_b64)
+ let md5_bytes = BASE64_STANDARD
+ .decode(&md5_b64)
.ok_or_bad_request(format!("Invalid {} header", md5_header))?;
let mut hasher = Md5::new();
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 0d18e775..3f85eecb 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -25,6 +25,7 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::ResBody;
+use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
@@ -42,6 +43,7 @@ pub struct GetObjectOverrides {
fn object_headers(
version: &ObjectVersion,
version_meta: &ObjectVersionMeta,
+ headers: &ObjectVersionHeaders,
) -> http::response::Builder {
debug!("Version meta: {:?}", version_meta);
@@ -49,7 +51,7 @@ fn object_headers(
let date_str = httpdate::fmt_http_date(date);
let mut resp = Response::builder()
- .header(CONTENT_TYPE, version_meta.headers.content_type.to_string())
+ .header(CONTENT_TYPE, headers.content_type.to_string())
.header(LAST_MODIFIED, date_str)
.header(ACCEPT_RANGES, "bytes".to_string());
@@ -57,7 +59,7 @@ fn object_headers(
resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag));
}
- for (k, v) in version_meta.headers.other.iter() {
+ for (k, v) in headers.other.iter() {
resp = resp.header(k, v.to_string());
}
@@ -165,13 +167,16 @@ pub async fn handle_head(
return Ok(cached);
}
+ let (_enc, headers) =
+ EncryptionParams::check_decrypt_for_get(&garage, req, &version_meta.encryption)?;
+
if let Some(pn) = part_number {
match version_data {
ObjectVersionData::Inline(_, bytes) => {
if pn != 1 {
return Err(Error::InvalidPart);
}
- Ok(object_headers(object_version, version_meta)
+ Ok(object_headers(object_version, version_meta, &headers)
.header(CONTENT_LENGTH, format!("{}", bytes.len()))
.header(
CONTENT_RANGE,
@@ -191,7 +196,7 @@ pub async fn handle_head(
let (part_offset, part_end) =
calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?;
- Ok(object_headers(object_version, version_meta)
+ Ok(object_headers(object_version, version_meta, &headers)
.header(CONTENT_LENGTH, format!("{}", part_end - part_offset))
.header(
CONTENT_RANGE,
@@ -209,7 +214,7 @@ pub async fn handle_head(
_ => unreachable!(),
}
} else {
- Ok(object_headers(object_version, version_meta)
+ Ok(object_headers(object_version, version_meta, &headers)
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
.status(StatusCode::OK)
.body(empty_body())?)
@@ -252,23 +257,41 @@ pub async fn handle_get(
return Ok(cached);
}
+ let (enc, headers) =
+ EncryptionParams::check_decrypt_for_get(&garage, req, &last_v_meta.encryption)?;
+
match (part_number, parse_range_header(req, last_v_meta.size)?) {
(Some(_), Some(_)) => Err(Error::bad_request(
"Cannot specify both partNumber and Range header",
)),
- (Some(pn), None) => handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await,
+ (Some(pn), None) => {
+ handle_get_part(garage, last_v, last_v_data, last_v_meta, enc, &headers, pn).await
+ }
(None, Some(range)) => {
handle_get_range(
garage,
last_v,
last_v_data,
last_v_meta,
+ enc,
+ &headers,
range.start,
range.start + range.length,
)
.await
}
- (None, None) => handle_get_full(garage, last_v, last_v_data, last_v_meta, overrides).await,
+ (None, None) => {
+ handle_get_full(
+ garage,
+ last_v,
+ last_v_data,
+ last_v_meta,
+ enc,
+ &headers,
+ overrides,
+ )
+ .await
+ }
}
}
@@ -277,9 +300,11 @@ async fn handle_get_full(
version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
+ encryption: EncryptionParams,
+ headers: &ObjectVersionHeaders,
overrides: GetObjectOverrides,
) -> Result<Response<ResBody>, Error> {
- let mut resp_builder = object_headers(version, version_meta)
+ let mut resp_builder = object_headers(version, version_meta, &headers)
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
.status(StatusCode::OK);
getobject_override_headers(overrides, &mut resp_builder)?;
@@ -303,19 +328,26 @@ async fn handle_get_full(
garage2.version_table.get(&version_uuid, &EmptyKey).await
});
- let stream_block_0 = garage
- .block_manager
- .rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0)))
+ let stream_block_0 = encryption
+ .get_and_decrypt_block(
+ &garage,
+ &first_block_hash,
+ Some(order_stream.order(0)),
+ )
.await?;
+
tx.send(stream_block_0)
.await
.ok_or_message("channel closed")?;
let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?;
for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) {
- let stream_block_i = garage
- .block_manager
- .rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64)))
+ let stream_block_i = encryption
+ .get_and_decrypt_block(
+ &garage,
+ &vb.hash,
+ Some(order_stream.order(i as u64)),
+ )
.await?;
tx.send(stream_block_i)
.await
@@ -344,13 +376,15 @@ async fn handle_get_range(
version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
+ encryption: EncryptionParams,
+ headers: &ObjectVersionHeaders,
begin: u64,
end: u64,
) -> Result<Response<ResBody>, Error> {
// Here we do not use getobject_override_headers because we don't
// want to add any overridden headers (those should not be added
// when returning PARTIAL_CONTENT)
- let resp_builder = object_headers(version, version_meta)
+ let resp_builder = object_headers(version, version_meta, headers)
.header(CONTENT_LENGTH, format!("{}", end - begin))
.header(
CONTENT_RANGE,
@@ -377,7 +411,8 @@ async fn handle_get_range(
.await?
.ok_or(Error::NoSuchKey)?;
- let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
+ let body =
+ body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end);
Ok(resp_builder.body(body)?)
}
}
@@ -388,11 +423,13 @@ async fn handle_get_part(
object_version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
+ encryption: EncryptionParams,
+ headers: &ObjectVersionHeaders,
part_number: u64,
) -> Result<Response<ResBody>, Error> {
// Same as for get_range, no getobject_override_headers
let resp_builder =
- object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT);
+ object_headers(object_version, version_meta, headers).status(StatusCode::PARTIAL_CONTENT);
match version_data {
ObjectVersionData::Inline(_, bytes) => {
@@ -418,7 +455,8 @@ async fn handle_get_part(
let (begin, end) =
calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?;
- let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
+ let body =
+ body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end);
Ok(resp_builder
.header(CONTENT_LENGTH, format!("{}", end - begin))
@@ -473,6 +511,7 @@ fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> {
fn body_from_blocks_range(
garage: Arc<Garage>,
+ encryption: EncryptionParams,
all_blocks: &[(VersionBlockKey, VersionBlock)],
begin: u64,
end: u64,
@@ -504,10 +543,10 @@ fn body_from_blocks_range(
match async {
let garage = garage.clone();
for (i, (block, block_offset)) in blocks.iter().enumerate() {
- let block_stream = garage
- .block_manager
- .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
- .await?
+ let block_stream = encryption
+ .get_and_decrypt_block(&garage, &block.hash, Some(order_stream.order(i as u64)))
+ .await?;
+ let block_stream = block_stream
.scan(*block_offset, move |chunk_offset, chunk| {
let r = match chunk {
Ok(chunk_bytes) => {