From 0f7764d9f05b3fccfa30ddebb52997200af13bf2 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 22 Dec 2023 19:32:07 +0100 Subject: s3 is now implemented --- src/bayou.rs | 2 +- src/mail/incoming.rs | 2 +- src/mail/mailbox.rs | 2 +- src/storage/garage.rs | 83 +++++++++++++++++++++++++++++++++++++++++++++--- src/storage/in_memory.rs | 2 +- src/storage/mod.rs | 2 +- 6 files changed, 83 insertions(+), 10 deletions(-) (limited to 'src') diff --git a/src/bayou.rs b/src/bayou.rs index 3c525b3..1361e49 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -352,7 +352,7 @@ impl Bayou { storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())), cryptoblob.into(), ); - self.storage.blob_insert(&blob_val).await?; + self.storage.blob_insert(blob_val).await?; // Drop old checkpoints (but keep at least CHECKPOINTS_TO_KEEP of them) let ecp_len = existing_checkpoints.len(); diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 3eafac7..b17959a 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -429,7 +429,7 @@ impl EncryptedMessage { storage::BlobRef(format!("incoming/{}", gen_ident())), self.encrypted_body.clone().into(), ).with_meta(MESSAGE_KEY.to_string(), key_header); - storage.blob_insert(&blob_val).await?; + storage.blob_insert(blob_val).await?; // Update watch key to signal new mail let watch_val = storage::RowVal::new( diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 60a91dd..c925f39 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -266,7 +266,7 @@ impl MailboxInternal { async { // Encrypt and save mail body let message_blob = cryptoblob::seal(mail.raw, &message_key)?; - self.storage.blob_insert(&BlobVal::new( + self.storage.blob_insert(BlobVal::new( BlobRef(format!("{}/{}", self.mail_path, ident)), message_blob, )).await?; diff --git a/src/storage/garage.rs b/src/storage/garage.rs index 97494f6..ec26e80 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -117,20 +117,93 @@ impl IStore for GarageStore { } }; + tracing::debug!("Fetched {}/{}", self.s3_bucket, blob_ref.0); Ok(BlobVal::new(blob_ref.clone(), buffer)) } - async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError> { - unimplemented!(); + async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { + let streamable_value = s3::primitives::ByteStream::from(blob_val.value); + + let maybe_send = self.s3 + .put_object() + .bucket(self.s3_bucket.to_string()) + .key(blob_val.blob_ref.0.to_string()) + .body(streamable_value) + .send() + .await; + + match maybe_send { + Err(e) => { + tracing::error!("unable to send object: {}", e); + Err(StorageError::Internal) + } + Ok(_) => { + tracing::debug!("Inserted {}/{}", self.s3_bucket, blob_val.blob_ref.0); + Ok(()) + } + } } async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { - unimplemented!(); + let maybe_copy = self.s3 + .copy_object() + .bucket(self.s3_bucket.to_string()) + .key(dst.0.clone()) + .copy_source(format!("/{}/{}", self.s3_bucket.to_string(), src.0.clone())) + .send() + .await; + + match maybe_copy { + Err(e) => { + tracing::error!("unable to copy object {} to {} (bucket: {}), error: {}", src.0, dst.0, self.s3_bucket, e); + Err(StorageError::Internal) + }, + Ok(_) => { + tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.s3_bucket); + Ok(()) + } + } } async fn blob_list(&self, prefix: &str) -> Result, StorageError> { - unimplemented!(); + let maybe_list = self.s3 + .list_objects_v2() + .bucket(self.s3_bucket.to_string()) + .prefix(prefix) + .into_paginator() + .send() + .try_collect() + .await; + + match maybe_list { + Err(e) => { + tracing::error!("listing prefix {} on bucket {} failed: {}", prefix, self.s3_bucket, e); + Err(StorageError::Internal) + } + Ok(pagin_list_out) => Ok(pagin_list_out + .into_iter() + .map(|list_out| list_out.contents.unwrap_or(vec![])) + .flatten() + .map(|obj| BlobRef(obj.key.unwrap_or(String::new()))) + .collect::>()), + } } async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { - unimplemented!(); + let maybe_delete = self.s3 + .delete_object() + .bucket(self.s3_bucket.to_string()) + .key(blob_ref.0.clone()) + .send() + .await; + + match maybe_delete { + Err(e) => { + tracing::error!("unable to delete {} (bucket: {}), error {}", blob_ref.0, self.s3_bucket, e); + Err(StorageError::Internal) + }, + Ok(_) => { + tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.s3_bucket); + Ok(()) + } + } } } diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index 723bca0..d764da1 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -278,7 +278,7 @@ impl IStore for MemStore { let store = self.blob.read().or(Err(StorageError::Internal))?; store.get(&blob_ref.0).ok_or(StorageError::NotFound).map(|v| v.to_blob_val(blob_ref)) } - async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError> { + async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert"); let mut store = self.blob.write().or(Err(StorageError::Internal))?; let entry = store.entry(blob_val.blob_ref.0.clone()).or_default(); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 10149e9..1b1faad 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -148,7 +148,7 @@ pub trait IStore { async fn row_poll(&self, value: &RowRef) -> Result; async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result; - async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError>; + async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError>; async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError>; async fn blob_list(&self, prefix: &str) -> Result, StorageError>; async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>; -- cgit v1.2.3