diff options
author | Alex Auvolat <alex@adnab.me> | 2024-02-23 18:10:38 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-02-27 10:15:17 +0100 |
commit | c7d3c9887f433fefece38c0a7f7774a4193ba869 (patch) | |
tree | 5808f21fdf1edacb70089f7951acad5352074588 /src/api/s3 | |
parent | a12153ad282b6d939b205191b2ee19671894428c (diff) | |
download | garage-c7d3c9887f433fefece38c0a7f7774a4193ba869.tar.gz garage-c7d3c9887f433fefece38c0a7f7774a4193ba869.zip |
[sse-c] hook sse-c decryption into GetObject
Diffstat (limited to 'src/api/s3')
-rw-r--r-- | src/api/s3/encryption.rs | 98 | ||||
-rw-r--r-- | src/api/s3/get.rs | 83 |
2 files changed, 140 insertions, 41 deletions
diff --git a/src/api/s3/encryption.rs b/src/api/s3/encryption.rs index 4055a67f..2d403ff3 100644 --- a/src/api/s3/encryption.rs +++ b/src/api/s3/encryption.rs @@ -4,10 +4,15 @@ use aes_gcm::{ aead::{Aead, AeadCore, KeyInit, OsRng}, Aes256Gcm, Key, Nonce, }; +use base64::prelude::*; use http::header::{HeaderName, HeaderValue}; use hyper::{body::Body, Request}; +use garage_net::stream::{ByteStream, ByteStreamReader}; +use garage_rpc::rpc_helper::OrderTag; +use garage_util::data::Hash; + use garage_model::garage::Garage; use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionHeaders}; @@ -29,6 +34,9 @@ const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: HeaderName = const CUSTOMER_ALGORITHM_AES256: HeaderValue = HeaderValue::from_static("AES256"); +const STREAM_ENC_CHUNK_SIZE: usize = 0x1000; // 4096 bytes + +#[derive(Clone, Copy)] pub enum EncryptionParams { Plaintext, SseC { @@ -98,16 +106,6 @@ impl EncryptionParams { compressed, }, ) => { - let cipher = Aes256Gcm::new(&client_key); - let nonce: Nonce<Aes256Gcm::NonceSize> = headers - .get(..12) - .ok_or_internal_error("invalid encrypted data")? - .try_into() - .unwrap(); - let plaintext = cipher.decrypt(&nonce, &headers[12..]).ok_or_bad_request( - "Invalid encryption key, could not decrypt object metadata.", - )?; - let headers = ObjectVersionHeaders::decode(&plaintext)?; let enc = Self::SseC { client_key, compression_level: if compressed { @@ -116,6 +114,8 @@ impl EncryptionParams { None }, }; + let plaintext = enc.decrypt_blob(&headers)?; + let headers = ObjectVersionHeaders::decode(&plaintext)?; Ok((enc, headers.into())) } (None, ObjectVersionEncryption::Plaintext { headers }) => { @@ -140,21 +140,79 @@ impl EncryptionParams { client_key, compression_level, } => { - let cipher = Aes256Gcm::new(&client_key); - let nonce = Aes256Gcm::generate_nonce(&mut OsRng); let plaintext = h.encode()?; - let ciphertext = cipher - .encrypt(&nonce, &plaintext) - .ok_or_internal_error("Encryption failed")?; - let headers_enc = [nonce.to_vec(), ciphertext].concat(); + let ciphertext = self.encrypt_blob(plaintext)?; Ok(ObjectVersionEncryption::SseC { - headers: headers_enc, + headers: ciphertext, compressed: compression_level.is_some(), }) } Self::Plaintext => Ok(ObjectVersionEncryption::Plaintext { headers: h }), } } + + // ---- generic function for encrypting / decrypting blobs ---- + // prepends a randomly-generated nonce to the encrypted value + + pub fn encrypt_blob<'a>(&self, blob: &'a [u8]) -> Result<Cow<'a, [u8]>, Error> { + match self { + Self::SseC { client_key, .. } => { + let cipher = Aes256Gcm::new(&client_key); + let nonce = Aes256Gcm::generate_nonce(&mut OsRng); + let ciphertext = cipher + .encrypt(&nonce, &blob) + .ok_or_internal_error("Encryption failed")?; + Ok([nonce.to_vec(), ciphertext].concat().into()) + } + Self::Plaintext => Ok(blob.into()), + } + } + + pub fn decrypt_blob<'a>(&self, blob: &'a [u8]) -> Result<Cow<'a, [u8]>, Error> { + match self { + Self::SseC { client_key, .. } => { + let cipher = Aes256Gcm::new(&client_key); + let nonce_size = Aes256Gcm::NonceSize::to_usize(); + let nonce: Nonce<Aes256Gcm::NonceSize> = blob + .get(..nonce_size) + .ok_or_internal_error("invalid encrypted data")? + .try_into() + .unwrap(); + let plaintext = cipher + .decrypt(&nonce, &blob[nonce_size..]) + .ok_or_bad_request( + "Invalid encryption key, could not decrypt object metadata.", + )?; + Ok(plaintext.into()) + } + Self::Plaintext => Ok(blob.into()), + } + } + + // ---- function for encrypting / decrypting byte streams ---- + + /// Get a data block from the storage node, and decrypt+decompress it + /// if necessary. If object is plaintext, just get it without any processing. + pub async fn get_and_decrypt_block( + &self, + garage: &Garage, + hash: &Hash, + order: Option<OrderTag>, + ) -> Result<ByteStream, Error> { + let raw_block = garage + .block_manager + .rpc_get_block_streaming(hash, order) + .await?; + match self { + Self::Plaintext => Ok(raw_block), + Self::SseC { + client_key, + compression_level, + } => { + todo!() + } + } + } } fn parse_request_headers( @@ -171,7 +229,8 @@ fn parse_request_headers( .headers() .get(key_header) .ok_or_bad_request(format!("Missing {} header", key_header))?; - let key_bytes: [u8; 32] = base64::decode(&key_b64) + let key_bytes: [u8; 32] = BASE64_STANDARD + .decode(&key_b64) .ok_or_bad_request(format!("Invalid {} header", key_header))? .try_into() .ok_or_bad_request(format!("Invalid {} header", key_header))?; @@ -180,7 +239,8 @@ fn parse_request_headers( .headers() .get(md5_header) .ok_or_bad_request(format!("Missing {} header", md5_header))?; - let md5_bytes = base64::decode(&md5_b64) + let md5_bytes = BASE64_STANDARD + .decode(&md5_b64) .ok_or_bad_request(format!("Invalid {} header", md5_header))?; let mut hasher = Md5::new(); diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 0d18e775..3f85eecb 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -25,6 +25,7 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::ResBody; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; @@ -42,6 +43,7 @@ pub struct GetObjectOverrides { fn object_headers( version: &ObjectVersion, version_meta: &ObjectVersionMeta, + headers: &ObjectVersionHeaders, ) -> http::response::Builder { debug!("Version meta: {:?}", version_meta); @@ -49,7 +51,7 @@ fn object_headers( let date_str = httpdate::fmt_http_date(date); let mut resp = Response::builder() - .header(CONTENT_TYPE, version_meta.headers.content_type.to_string()) + .header(CONTENT_TYPE, headers.content_type.to_string()) .header(LAST_MODIFIED, date_str) .header(ACCEPT_RANGES, "bytes".to_string()); @@ -57,7 +59,7 @@ fn object_headers( resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag)); } - for (k, v) in version_meta.headers.other.iter() { + for (k, v) in headers.other.iter() { resp = resp.header(k, v.to_string()); } @@ -165,13 +167,16 @@ pub async fn handle_head( return Ok(cached); } + let (_enc, headers) = + EncryptionParams::check_decrypt_for_get(&garage, req, &version_meta.encryption)?; + if let Some(pn) = part_number { match version_data { ObjectVersionData::Inline(_, bytes) => { if pn != 1 { return Err(Error::InvalidPart); } - Ok(object_headers(object_version, version_meta) + Ok(object_headers(object_version, version_meta, &headers) .header(CONTENT_LENGTH, format!("{}", bytes.len())) .header( CONTENT_RANGE, @@ -191,7 +196,7 @@ pub async fn handle_head( let (part_offset, part_end) = calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?; - Ok(object_headers(object_version, version_meta) + Ok(object_headers(object_version, version_meta, &headers) .header(CONTENT_LENGTH, format!("{}", part_end - part_offset)) .header( CONTENT_RANGE, @@ -209,7 +214,7 @@ pub async fn handle_head( _ => unreachable!(), } } else { - Ok(object_headers(object_version, version_meta) + Ok(object_headers(object_version, version_meta, &headers) .header(CONTENT_LENGTH, format!("{}", version_meta.size)) .status(StatusCode::OK) .body(empty_body())?) @@ -252,23 +257,41 @@ pub async fn handle_get( return Ok(cached); } + let (enc, headers) = + EncryptionParams::check_decrypt_for_get(&garage, req, &last_v_meta.encryption)?; + match (part_number, parse_range_header(req, last_v_meta.size)?) { (Some(_), Some(_)) => Err(Error::bad_request( "Cannot specify both partNumber and Range header", )), - (Some(pn), None) => handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await, + (Some(pn), None) => { + handle_get_part(garage, last_v, last_v_data, last_v_meta, enc, &headers, pn).await + } (None, Some(range)) => { handle_get_range( garage, last_v, last_v_data, last_v_meta, + enc, + &headers, range.start, range.start + range.length, ) .await } - (None, None) => handle_get_full(garage, last_v, last_v_data, last_v_meta, overrides).await, + (None, None) => { + handle_get_full( + garage, + last_v, + last_v_data, + last_v_meta, + enc, + &headers, + overrides, + ) + .await + } } } @@ -277,9 +300,11 @@ async fn handle_get_full( version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, + encryption: EncryptionParams, + headers: &ObjectVersionHeaders, overrides: GetObjectOverrides, ) -> Result<Response<ResBody>, Error> { - let mut resp_builder = object_headers(version, version_meta) + let mut resp_builder = object_headers(version, version_meta, &headers) .header(CONTENT_LENGTH, format!("{}", version_meta.size)) .status(StatusCode::OK); getobject_override_headers(overrides, &mut resp_builder)?; @@ -303,19 +328,26 @@ async fn handle_get_full( garage2.version_table.get(&version_uuid, &EmptyKey).await }); - let stream_block_0 = garage - .block_manager - .rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0))) + let stream_block_0 = encryption + .get_and_decrypt_block( + &garage, + &first_block_hash, + Some(order_stream.order(0)), + ) .await?; + tx.send(stream_block_0) .await .ok_or_message("channel closed")?; let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?; for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) { - let stream_block_i = garage - .block_manager - .rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64))) + let stream_block_i = encryption + .get_and_decrypt_block( + &garage, + &vb.hash, + Some(order_stream.order(i as u64)), + ) .await?; tx.send(stream_block_i) .await @@ -344,13 +376,15 @@ async fn handle_get_range( version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, + encryption: EncryptionParams, + headers: &ObjectVersionHeaders, begin: u64, end: u64, ) -> Result<Response<ResBody>, Error> { // Here we do not use getobject_override_headers because we don't // want to add any overridden headers (those should not be added // when returning PARTIAL_CONTENT) - let resp_builder = object_headers(version, version_meta) + let resp_builder = object_headers(version, version_meta, headers) .header(CONTENT_LENGTH, format!("{}", end - begin)) .header( CONTENT_RANGE, @@ -377,7 +411,8 @@ async fn handle_get_range( .await? .ok_or(Error::NoSuchKey)?; - let body = body_from_blocks_range(garage, version.blocks.items(), begin, end); + let body = + body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end); Ok(resp_builder.body(body)?) } } @@ -388,11 +423,13 @@ async fn handle_get_part( object_version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, + encryption: EncryptionParams, + headers: &ObjectVersionHeaders, part_number: u64, ) -> Result<Response<ResBody>, Error> { // Same as for get_range, no getobject_override_headers let resp_builder = - object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT); + object_headers(object_version, version_meta, headers).status(StatusCode::PARTIAL_CONTENT); match version_data { ObjectVersionData::Inline(_, bytes) => { @@ -418,7 +455,8 @@ async fn handle_get_part( let (begin, end) = calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?; - let body = body_from_blocks_range(garage, version.blocks.items(), begin, end); + let body = + body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end); Ok(resp_builder .header(CONTENT_LENGTH, format!("{}", end - begin)) @@ -473,6 +511,7 @@ fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> { fn body_from_blocks_range( garage: Arc<Garage>, + encryption: EncryptionParams, all_blocks: &[(VersionBlockKey, VersionBlock)], begin: u64, end: u64, @@ -504,10 +543,10 @@ fn body_from_blocks_range( match async { let garage = garage.clone(); for (i, (block, block_offset)) in blocks.iter().enumerate() { - let block_stream = garage - .block_manager - .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) - .await? + let block_stream = encryption + .get_and_decrypt_block(&garage, &block.hash, Some(order_stream.order(i as u64))) + .await?; + let block_stream = block_stream .scan(*block_offset, move |chunk_offset, chunk| { let r = match chunk { Ok(chunk_bytes) => { |