diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-12-22 19:32:07 +0100 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-12-22 19:32:07 +0100 |
commit | 0f7764d9f05b3fccfa30ddebb52997200af13bf2 (patch) | |
tree | 14c938e5bc2edeb4814290ffb0135b600d0bacf1 /src/storage | |
parent | 1057661da77c3d94e5a1ff51ab7fc58ecdb6a53a (diff) | |
download | aerogramme-0f7764d9f05b3fccfa30ddebb52997200af13bf2.tar.gz aerogramme-0f7764d9f05b3fccfa30ddebb52997200af13bf2.zip |
s3 is now implemented
Diffstat (limited to 'src/storage')
-rw-r--r-- | src/storage/garage.rs | 83 | ||||
-rw-r--r-- | src/storage/in_memory.rs | 2 | ||||
-rw-r--r-- | src/storage/mod.rs | 2 |
3 files changed, 80 insertions, 7 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs index 97494f6..ec26e80 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -117,20 +117,93 @@ impl IStore for GarageStore { } }; + tracing::debug!("Fetched {}/{}", self.s3_bucket, blob_ref.0); Ok(BlobVal::new(blob_ref.clone(), buffer)) } - async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError> { - unimplemented!(); + async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { + let streamable_value = s3::primitives::ByteStream::from(blob_val.value); + + let maybe_send = self.s3 + .put_object() + .bucket(self.s3_bucket.to_string()) + .key(blob_val.blob_ref.0.to_string()) + .body(streamable_value) + .send() + .await; + + match maybe_send { + Err(e) => { + tracing::error!("unable to send object: {}", e); + Err(StorageError::Internal) + } + Ok(_) => { + tracing::debug!("Inserted {}/{}", self.s3_bucket, blob_val.blob_ref.0); + Ok(()) + } + } } async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { - unimplemented!(); + let maybe_copy = self.s3 + .copy_object() + .bucket(self.s3_bucket.to_string()) + .key(dst.0.clone()) + .copy_source(format!("/{}/{}", self.s3_bucket.to_string(), src.0.clone())) + .send() + .await; + + match maybe_copy { + Err(e) => { + tracing::error!("unable to copy object {} to {} (bucket: {}), error: {}", src.0, dst.0, self.s3_bucket, e); + Err(StorageError::Internal) + }, + Ok(_) => { + tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.s3_bucket); + Ok(()) + } + } } async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { - unimplemented!(); + let maybe_list = self.s3 + .list_objects_v2() + .bucket(self.s3_bucket.to_string()) + .prefix(prefix) + .into_paginator() + .send() + .try_collect() + .await; + + match maybe_list { + Err(e) => { + tracing::error!("listing prefix {} on bucket {} failed: {}", prefix, self.s3_bucket, e); + Err(StorageError::Internal) + } + Ok(pagin_list_out) => Ok(pagin_list_out + .into_iter() + .map(|list_out| list_out.contents.unwrap_or(vec![])) + .flatten() + .map(|obj| BlobRef(obj.key.unwrap_or(String::new()))) + .collect::<Vec<_>>()), + } } async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { - unimplemented!(); + let maybe_delete = self.s3 + .delete_object() + .bucket(self.s3_bucket.to_string()) + .key(blob_ref.0.clone()) + .send() + .await; + + match maybe_delete { + Err(e) => { + tracing::error!("unable to delete {} (bucket: {}), error {}", blob_ref.0, self.s3_bucket, e); + Err(StorageError::Internal) + }, + Ok(_) => { + tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.s3_bucket); + Ok(()) + } + } } } diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index 723bca0..d764da1 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -278,7 +278,7 @@ impl IStore for MemStore { let store = self.blob.read().or(Err(StorageError::Internal))?; store.get(&blob_ref.0).ok_or(StorageError::NotFound).map(|v| v.to_blob_val(blob_ref)) } - async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError> { + async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert"); let mut store = self.blob.write().or(Err(StorageError::Internal))?; let entry = store.entry(blob_val.blob_ref.0.clone()).or_default(); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 10149e9..1b1faad 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -148,7 +148,7 @@ pub trait IStore { async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError>; async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>; - async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError>; + async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError>; async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError>; async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>; async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>; |