diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api/s3/copy.rs | 20 | ||||
-rw-r--r-- | src/api/s3/encryption.rs | 22 | ||||
-rw-r--r-- | src/api/s3/multipart.rs | 37 | ||||
-rw-r--r-- | src/api/s3/post_object.rs | 3 | ||||
-rw-r--r-- | src/api/s3/put.rs | 17 |
5 files changed, 78 insertions, 21 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 880ce5f4..b370b709 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -24,6 +24,7 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; use crate::s3::multipart; use crate::s3::put::get_headers; @@ -43,6 +44,13 @@ pub async fn handle_copy( let (source_version, source_version_data, source_version_meta) = extract_source_info(&source_object)?; + let (source_encryption, _source_object_headers) = + EncryptionParams::check_decrypt_for_copy_source( + &garage, + req, + &source_version_meta.encryption, + )?; + // Check precondition, e.g. x-amz-copy-source-if-match copy_precondition.check(source_version, &source_version_meta.etag)?; @@ -50,10 +58,18 @@ pub async fn handle_copy( let new_uuid = gen_uuid(); let new_timestamp = now_msec(); + let new_encryption = EncryptionParams::new_from_req(&garage, &req)?; + if source_encryption.is_encrypted() || new_encryption.is_encrypted() { + // TODO + unimplemented!("encryption for copyobject"); + } + // Implement x-amz-metadata-directive: REPLACE let new_meta = match req.headers().get("x-amz-metadata-directive") { Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta { - headers: get_headers(req.headers())?, + encryption: ObjectVersionEncryption::Plaintext { + headers: get_headers(req.headers())?, + }, size: source_version_meta.size, etag: source_version_meta.etag.clone(), }, @@ -96,7 +112,7 @@ pub async fn handle_copy( uuid: new_uuid, timestamp: new_timestamp, state: ObjectVersionState::Uploading { - headers: new_meta.headers.clone(), + encryption: new_meta.encryption.clone(), multipart: false, }, }; diff --git a/src/api/s3/encryption.rs b/src/api/s3/encryption.rs index 29b26a37..f13333b2 100644 --- a/src/api/s3/encryption.rs +++ b/src/api/s3/encryption.rs @@ -37,14 +37,14 @@ const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName = const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: HeaderName = HeaderName::from_static("x-amz-server-side-encryption-customer-key"); const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: HeaderName = - HeaderName::from_static("x-amz-server-side-encryption-customer-key-MD5"); + HeaderName::from_static("x-amz-server-side-encryption-customer-key-md5"); const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-algorithm"); const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: HeaderName = HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-key"); const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: HeaderName = - HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-key-MD5"); + HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-key-md5"); const CUSTOMER_ALGORITHM_AES256: HeaderValue = HeaderValue::from_static("AES256"); @@ -52,8 +52,7 @@ type StreamNonce = aes_gcm::aead::stream::Nonce<Aes256Gcm, StreamLE31<Aes256Gcm> type StreamNonceSize = aes_gcm::aead::stream::NonceSize<Aes256Gcm, StreamLE31<Aes256Gcm>>; const STREAM_ENC_PLAIN_CHUNK_SIZE: usize = 0x1000; // 4096 bytes -const STREAM_ENC_CYPER_CHUNK_SIZE: usize = - STREAM_ENC_CYPER_CHUNK_SIZE + <Aes256Gcm as AeadCore>::TagSize::to_usize(); +const STREAM_ENC_CYPER_CHUNK_SIZE: usize = STREAM_ENC_PLAIN_CHUNK_SIZE + 16; #[derive(Clone, Copy)] pub enum EncryptionParams { @@ -65,6 +64,10 @@ pub enum EncryptionParams { } impl EncryptionParams { + pub fn is_encrypted(&self) -> bool { + matches!(self, Self::SseC { .. }) + } + pub fn new_from_req( garage: &Garage, req: &Request<impl Body>, @@ -117,7 +120,7 @@ impl EncryptionParams { key: Option<Key<Aes256Gcm>>, obj_enc: &'a ObjectVersionEncryption, ) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> { - match (key, obj_enc) { + match (key, &obj_enc) { ( Some(client_key), ObjectVersionEncryption::SseC { @@ -136,7 +139,7 @@ impl EncryptionParams { let plaintext = enc.decrypt_blob(&headers)?; let headers = ObjectVersionHeaders::decode(&plaintext) .ok_or_internal_error("Could not decode encrypted headers")?; - Ok((enc, Cow::Borrowed(&headers))) + Ok((enc, Cow::Owned(headers))) } (None, ObjectVersionEncryption::Plaintext { headers }) => { Ok((Self::Plaintext, Cow::Borrowed(headers))) @@ -157,8 +160,7 @@ impl EncryptionParams { ) -> Result<ObjectVersionEncryption, Error> { match self { Self::SseC { - client_key, - compression_level, + compression_level, .. } => { let plaintext = h.encode().map_err(GarageError::from)?; let ciphertext = self.encrypt_blob(&plaintext)?; @@ -367,7 +369,7 @@ impl Stream for DecryptStream { if this.buf.len() >= nonce_size { let nonce = this.buf.take_exact(nonce_size).unwrap(); let nonce = Nonce::from_slice(nonce.as_ref()); - *this.cipher = Some(DecryptorLE31::new(&self.key, nonce)); + *this.cipher = Some(DecryptorLE31::new(&this.key, nonce)); break; } @@ -405,7 +407,7 @@ impl Stream for DecryptStream { let chunk = this.buf.take_max(STREAM_ENC_CYPER_CHUNK_SIZE); // TODO: use decrypt_last for last chunk - let res = this.cipher.as_ref().unwrap().decrypt_next(chunk.as_ref()); + let res = this.cipher.as_mut().unwrap().decrypt_next(chunk.as_ref()); match res { Ok(bytes) => Poll::Ready(Some(Ok(bytes.into()))), Err(_) => Poll::Ready(Some(Err(std::io::Error::new( diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 5959bcd6..a7c549f6 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -17,6 +17,7 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; use crate::s3::put::*; use crate::s3::xml as s3_xml; @@ -38,13 +39,21 @@ pub async fn handle_create_multipart_upload( let headers = get_headers(req.headers())?; + // Determine whether object should be encrypted, and if so the key + let encryption = EncryptionParams::new_from_req(&garage, &req)?; + if encryption.is_encrypted() { + // TODO + unimplemented!("multipart upload encryption"); + } + let object_encryption = encryption.encrypt_headers(headers)?; + // Create object in object table let object_version = ObjectVersion { uuid: upload_id, timestamp, state: ObjectVersionState::Uploading { multipart: true, - headers, + encryption: object_encryption, }, }; let object = Object::new(bucket_id, key.to_string(), vec![object_version]); @@ -87,14 +96,30 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let stream = body_stream(req.into_body()); + let (req_head, req_body) = req.into_parts(); + let stream = body_stream(req_body); let mut chunker = StreamChunker::new(stream, garage.config.block_size); - let ((_, _, mut mpu), first_block) = futures::try_join!( + let ((_, object_version, mut mpu), first_block) = futures::try_join!( get_upload(&garage, &bucket_id, &key, &upload_id), chunker.next(), )?; + // Check encryption params + let object_encryption = match object_version.state { + ObjectVersionState::Uploading { encryption, .. } => encryption, + _ => unreachable!(), + }; + let (encryption, _) = EncryptionParams::check_decrypt_for_get( + &garage, + &Request::from_parts(req_head, empty_body::<Error>()), + &object_encryption, + )?; + if encryption.is_encrypted() { + // TODO + unimplemented!("encryption for mpu"); + } + // Check object is valid and part can be accepted let first_block = first_block.ok_or_bad_request("Empty body")?; @@ -235,8 +260,8 @@ pub async fn handle_complete_multipart_upload( return Err(Error::bad_request("No data was uploaded")); } - let headers = match object_version.state { - ObjectVersionState::Uploading { headers, .. } => headers, + let object_encryption = match object_version.state { + ObjectVersionState::Uploading { encryption, .. } => encryption, _ => unreachable!(), }; @@ -338,7 +363,7 @@ pub async fn handle_complete_multipart_upload( // Write final object version object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { - headers, + encryption: object_encryption, size: total_size, etag: etag.clone(), }, diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index bca8d6c6..0b4ce6c6 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -18,6 +18,7 @@ use garage_model::garage::Garage; use crate::helpers::*; use crate::s3::api_server::ResBody; use crate::s3::cors::*; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; use crate::s3::put::{get_headers, save_stream}; use crate::s3::xml as s3_xml; @@ -232,10 +233,12 @@ pub async fn handle_post_object( let headers = get_headers(¶ms)?; + // TODO: encryption let stream = field.map(|r| r.map_err(Into::into)); let (_, md5) = save_stream( garage, headers, + EncryptionParams::Plaintext, // TODO StreamLimiter::new(stream, conditions.content_length), &bucket, &key, diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index c1af513c..fdcd5aca 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -37,6 +37,7 @@ use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; +use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; const PUT_BLOCKS_MAX_PARALLEL: usize = 3; @@ -52,6 +53,9 @@ pub async fn handle_put( let headers = get_headers(req.headers())?; debug!("Object headers: {:?}", headers); + // Determine whether object should be encrypted, and if so the key + let encryption = EncryptionParams::new_from_req(&garage, &req)?; + let content_md5 = match req.headers().get("content-md5") { Some(x) => Some(x.to_str()?.to_string()), None => None, @@ -62,6 +66,7 @@ pub async fn handle_put( save_stream( garage, headers, + encryption, stream, bucket, key, @@ -75,6 +80,7 @@ pub async fn handle_put( pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage: Arc<Garage>, headers: ObjectVersionHeaders, + encryption: EncryptionParams, body: S, bucket: &Bucket, key: &String, @@ -92,6 +98,11 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let first_block = first_block_opt.unwrap_or_default(); + let object_encryption = encryption.encrypt_headers(headers)?; + if encryption.is_encrypted() { + unimplemented!("encryption in putobject"); + } + // Generate identity of new version let version_uuid = gen_uuid(); let version_timestamp = next_timestamp(existing_object.as_ref()); @@ -121,7 +132,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( timestamp: version_timestamp, state: ObjectVersionState::Complete(ObjectVersionData::Inline( ObjectVersionMeta { - headers, + encryption: object_encryption, size, etag: data_md5sum_hex.clone(), }, @@ -152,7 +163,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( uuid: version_uuid, timestamp: version_timestamp, state: ObjectVersionState::Uploading { - headers: headers.clone(), + encryption: object_encryption.clone(), multipart: false, }, }; @@ -190,7 +201,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let md5sum_hex = hex::encode(data_md5sum); object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { - headers, + encryption: object_encryption, size: total_size, etag: md5sum_hex.clone(), }, |