use std::borrow::Cow; use std::convert::TryInto; use std::pin::Pin; use aes_gcm::{ aead::stream::{DecryptorLE31, EncryptorLE31, StreamLE31}, aead::{Aead, AeadCore, KeyInit, OsRng}, aes::cipher::crypto_common::rand_core::RngCore, aes::cipher::typenum::Unsigned, Aes256Gcm, Key, Nonce, }; use base64::prelude::*; use bytes::Bytes; use futures::stream::Stream; use futures::task; use tokio::io::BufReader; use http::header::{HeaderName, HeaderValue}; use hyper::{body::Body, Request}; use garage_net::bytes_buf::BytesBuf; use garage_net::stream::{stream_asyncread, ByteStream}; use garage_rpc::rpc_helper::OrderTag; use garage_util::data::Hash; use garage_util::error::Error as GarageError; use garage_util::migrate::Migrate; use garage_model::garage::Garage; use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionHeaders}; use crate::common_error::*; use crate::s3::error::Error; const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-server-side-encryption-customer-algorithm"); const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: HeaderName = HeaderName::from_static("x-amz-server-side-encryption-customer-key"); const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: HeaderName = HeaderName::from_static("x-amz-server-side-encryption-customer-key-md5"); const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-algorithm"); const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: HeaderName = HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-key"); const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: HeaderName = HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-key-md5"); const CUSTOMER_ALGORITHM_AES256: HeaderValue = HeaderValue::from_static("AES256"); type StreamNonce = aes_gcm::aead::stream::Nonce>; type StreamNonceSize = aes_gcm::aead::stream::NonceSize>; const STREAM_ENC_PLAIN_CHUNK_SIZE: usize = 0x1000; // 4096 bytes const STREAM_ENC_CYPER_CHUNK_SIZE: usize = STREAM_ENC_PLAIN_CHUNK_SIZE + 16; #[derive(Clone, Copy)] pub enum EncryptionParams { Plaintext, SseC { client_key: Key, compression_level: Option, }, } impl EncryptionParams { pub fn is_encrypted(&self) -> bool { matches!(self, Self::SseC { .. }) } pub fn new_from_req( garage: &Garage, req: &Request, ) -> Result { let key = parse_request_headers( req, &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM, &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY, &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5, )?; match key { Some(client_key) => Ok(EncryptionParams::SseC { client_key, compression_level: garage.config.compression_level, }), None => Ok(EncryptionParams::Plaintext), } } pub fn check_decrypt_for_get<'a>( garage: &Garage, req: &Request, obj_enc: &'a ObjectVersionEncryption, ) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> { let key = parse_request_headers( req, &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM, &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY, &X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5, )?; Self::check_decrypt(garage, key, obj_enc) } pub fn check_decrypt_for_copy_source<'a>( garage: &Garage, req: &Request, obj_enc: &'a ObjectVersionEncryption, ) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> { let key = parse_request_headers( req, &X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM, &X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY, &X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5, )?; Self::check_decrypt(garage, key, obj_enc) } fn check_decrypt<'a>( garage: &Garage, key: Option>, obj_enc: &'a ObjectVersionEncryption, ) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> { match (key, &obj_enc) { ( Some(client_key), ObjectVersionEncryption::SseC { headers, compressed, }, ) => { let enc = Self::SseC { client_key, compression_level: if *compressed { Some(garage.config.compression_level.unwrap_or(1)) } else { None }, }; let plaintext = enc.decrypt_blob(&headers)?; let headers = ObjectVersionHeaders::decode(&plaintext) .ok_or_internal_error("Could not decode encrypted headers")?; Ok((enc, Cow::Owned(headers))) } (None, ObjectVersionEncryption::Plaintext { headers }) => { Ok((Self::Plaintext, Cow::Borrowed(headers))) } (_, ObjectVersionEncryption::SseC { .. }) => { Err(Error::bad_request("Object is encrypted")) } (Some(_), _) => { // TODO: should this be an OK scenario? Err(Error::bad_request("Trying to decrypt a plaintext object")) } } } pub fn encrypt_headers( &self, h: ObjectVersionHeaders, ) -> Result { match self { Self::SseC { compression_level, .. } => { let plaintext = h.encode().map_err(GarageError::from)?; let ciphertext = self.encrypt_blob(&plaintext)?; Ok(ObjectVersionEncryption::SseC { headers: ciphertext.into_owned(), compressed: compression_level.is_some(), }) } Self::Plaintext => Ok(ObjectVersionEncryption::Plaintext { headers: h }), } } // ---- generic function for encrypting / decrypting blobs ---- // prepends a randomly-generated nonce to the encrypted value pub fn encrypt_blob<'a>(&self, blob: &'a [u8]) -> Result, Error> { match self { Self::SseC { client_key, .. } => { let cipher = Aes256Gcm::new(&client_key); let nonce = Aes256Gcm::generate_nonce(&mut OsRng); let ciphertext = cipher .encrypt(&nonce, blob) .ok_or_internal_error("Encryption failed")?; Ok([nonce.to_vec(), ciphertext].concat().into()) } Self::Plaintext => Ok(blob.into()), } } pub fn decrypt_blob<'a>(&self, blob: &'a [u8]) -> Result, Error> { match self { Self::SseC { client_key, .. } => { let cipher = Aes256Gcm::new(&client_key); let nonce_size = ::NonceSize::to_usize(); let nonce = Nonce::from_slice( blob.get(..nonce_size) .ok_or_internal_error("invalid encrypted data")?, ); let plaintext = cipher .decrypt(nonce, &blob[nonce_size..]) .ok_or_bad_request( "Invalid encryption key, could not decrypt object metadata.", )?; Ok(plaintext.into()) } Self::Plaintext => Ok(blob.into()), } } // ---- function for encrypting / decrypting byte streams ---- /// Get a data block from the storage node, and decrypt+decompress it /// if necessary. If object is plaintext, just get it without any processing. pub async fn get_block( &self, garage: &Garage, hash: &Hash, order: Option, ) -> Result { let raw_block = garage .block_manager .rpc_get_block_streaming(hash, order) .await?; match self { Self::Plaintext => Ok(raw_block), Self::SseC { client_key, compression_level, } => { let plaintext = DecryptStream::new(raw_block, *client_key); if compression_level.is_some() { let reader = stream_asyncread(Box::pin(plaintext)); let reader = BufReader::new(reader); let reader = async_compression::tokio::bufread::ZstdDecoder::new(reader); Ok(Box::pin(tokio_util::io::ReaderStream::new(reader))) } else { Ok(Box::pin(plaintext)) } } } } /// Encrypt a data block if encryption is set, for use before /// putting the data blocks into storage pub fn encrpyt_block(&self, block: Bytes) -> Result { match self { Self::Plaintext => Ok(block), Self::SseC { client_key, compression_level, } => { let block = if let Some(level) = compression_level { Cow::Owned( garage_block::zstd_encode(block.as_ref(), *level) .ok_or_internal_error("failed to compress data block")?, ) } else { Cow::Borrowed(block.as_ref()) }; let mut ret = Vec::with_capacity(block.len() + 32 + block.len() / 64); let mut nonce: Nonce = Default::default(); OsRng.fill_bytes(&mut nonce); ret.extend_from_slice(nonce.as_slice()); let mut cipher = EncryptorLE31::::new(&client_key, &nonce); for chunk in block.chunks(STREAM_ENC_PLAIN_CHUNK_SIZE) { // TODO: use encrypt_last for last chunk let chunk_enc = cipher .encrypt_next(chunk) .ok_or_internal_error("failed to encrypt chunk")?; if chunk.len() == STREAM_ENC_PLAIN_CHUNK_SIZE { assert_eq!(chunk_enc.len(), STREAM_ENC_CYPER_CHUNK_SIZE); } ret.extend_from_slice(&chunk_enc); } Ok(ret.into()) } } } } fn parse_request_headers( req: &Request, alg_header: &HeaderName, key_header: &HeaderName, md5_header: &HeaderName, ) -> Result>, Error> { match req.headers().get(alg_header) { Some(alg) if *alg == CUSTOMER_ALGORITHM_AES256 => { use md5::{Digest, Md5}; let key_b64 = req .headers() .get(key_header) .ok_or_bad_request(format!("Missing {} header", key_header))?; let key_bytes: [u8; 32] = BASE64_STANDARD .decode(&key_b64) .ok_or_bad_request(format!("Invalid {} header", key_header))? .try_into() .ok() .ok_or_bad_request(format!("Invalid {} header", key_header))?; let md5_b64 = req .headers() .get(md5_header) .ok_or_bad_request(format!("Missing {} header", md5_header))?; let md5_bytes = BASE64_STANDARD .decode(&md5_b64) .ok_or_bad_request(format!("Invalid {} header", md5_header))?; let mut hasher = Md5::new(); hasher.update(&key_bytes[..]); if hasher.finalize().as_slice() != md5_bytes.as_slice() { return Err(Error::bad_request( "Encryption key MD5 checksum does not match", )); } Ok(Some(key_bytes.into())) } Some(alg) => Err(Error::InvalidEncryptionAlgorithm( alg.to_str().unwrap_or("??").to_string(), )), None => Ok(None), } } // ---- encrypt & decrypt streams ---- #[pin_project::pin_project] struct DecryptStream { #[pin] stream: ByteStream, buf: BytesBuf, key: Key, cipher: Option>, } impl DecryptStream { fn new(stream: ByteStream, key: Key) -> Self { Self { stream, buf: BytesBuf::new(), key, cipher: None, } } } impl Stream for DecryptStream { type Item = Result; fn poll_next( self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> task::Poll> { use std::task::Poll; let mut this = self.project(); while this.cipher.is_none() { let nonce_size = StreamNonceSize::to_usize(); if this.buf.len() >= nonce_size { let nonce = this.buf.take_exact(nonce_size).unwrap(); let nonce = Nonce::from_slice(nonce.as_ref()); *this.cipher = Some(DecryptorLE31::new(&this.key, nonce)); break; } match futures::ready!(this.stream.as_mut().poll_next(cx)) { Some(Ok(bytes)) => { this.buf.extend(bytes); } Some(Err(e)) => { return Poll::Ready(Some(Err(e))); } None => { return Poll::Ready(Some(Err(std::io::Error::new( std::io::ErrorKind::UnexpectedEof, "Decrypt: unexpected EOF, could not read nonce", )))); } } } while this.buf.len() < STREAM_ENC_CYPER_CHUNK_SIZE { match futures::ready!(this.stream.as_mut().poll_next(cx)) { Some(Ok(bytes)) => { this.buf.extend(bytes); } Some(Err(e)) => { return Poll::Ready(Some(Err(e))); } None => break, } } if this.buf.is_empty() { return Poll::Ready(None); } let chunk = this.buf.take_max(STREAM_ENC_CYPER_CHUNK_SIZE); // TODO: use decrypt_last for last chunk let res = this.cipher.as_mut().unwrap().decrypt_next(chunk.as_ref()); match res { Ok(bytes) => Poll::Ready(Some(Ok(bytes.into()))), Err(_) => Poll::Ready(Some(Err(std::io::Error::new( std::io::ErrorKind::Other, "Decryption failed", )))), } } }