aboutsummaryrefslogtreecommitdiff
path: root/src/storage
diff options
context:
space:
mode:
Diffstat (limited to 'src/storage')
-rw-r--r--src/storage/garage.rs83
-rw-r--r--src/storage/in_memory.rs2
-rw-r--r--src/storage/mod.rs2
3 files changed, 80 insertions, 7 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
index 97494f6..ec26e80 100644
--- a/src/storage/garage.rs
+++ b/src/storage/garage.rs
@@ -117,20 +117,93 @@ impl IStore for GarageStore {
}
};
+ tracing::debug!("Fetched {}/{}", self.s3_bucket, blob_ref.0);
Ok(BlobVal::new(blob_ref.clone(), buffer))
}
- async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError> {
- unimplemented!();
+ async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
+ let streamable_value = s3::primitives::ByteStream::from(blob_val.value);
+
+ let maybe_send = self.s3
+ .put_object()
+ .bucket(self.s3_bucket.to_string())
+ .key(blob_val.blob_ref.0.to_string())
+ .body(streamable_value)
+ .send()
+ .await;
+
+ match maybe_send {
+ Err(e) => {
+ tracing::error!("unable to send object: {}", e);
+ Err(StorageError::Internal)
+ }
+ Ok(_) => {
+ tracing::debug!("Inserted {}/{}", self.s3_bucket, blob_val.blob_ref.0);
+ Ok(())
+ }
+ }
}
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
- unimplemented!();
+ let maybe_copy = self.s3
+ .copy_object()
+ .bucket(self.s3_bucket.to_string())
+ .key(dst.0.clone())
+ .copy_source(format!("/{}/{}", self.s3_bucket.to_string(), src.0.clone()))
+ .send()
+ .await;
+
+ match maybe_copy {
+ Err(e) => {
+ tracing::error!("unable to copy object {} to {} (bucket: {}), error: {}", src.0, dst.0, self.s3_bucket, e);
+ Err(StorageError::Internal)
+ },
+ Ok(_) => {
+ tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.s3_bucket);
+ Ok(())
+ }
+ }
}
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
- unimplemented!();
+ let maybe_list = self.s3
+ .list_objects_v2()
+ .bucket(self.s3_bucket.to_string())
+ .prefix(prefix)
+ .into_paginator()
+ .send()
+ .try_collect()
+ .await;
+
+ match maybe_list {
+ Err(e) => {
+ tracing::error!("listing prefix {} on bucket {} failed: {}", prefix, self.s3_bucket, e);
+ Err(StorageError::Internal)
+ }
+ Ok(pagin_list_out) => Ok(pagin_list_out
+ .into_iter()
+ .map(|list_out| list_out.contents.unwrap_or(vec![]))
+ .flatten()
+ .map(|obj| BlobRef(obj.key.unwrap_or(String::new())))
+ .collect::<Vec<_>>()),
+ }
}
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
- unimplemented!();
+ let maybe_delete = self.s3
+ .delete_object()
+ .bucket(self.s3_bucket.to_string())
+ .key(blob_ref.0.clone())
+ .send()
+ .await;
+
+ match maybe_delete {
+ Err(e) => {
+ tracing::error!("unable to delete {} (bucket: {}), error {}", blob_ref.0, self.s3_bucket, e);
+ Err(StorageError::Internal)
+ },
+ Ok(_) => {
+ tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.s3_bucket);
+ Ok(())
+ }
+ }
}
}
diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs
index 723bca0..d764da1 100644
--- a/src/storage/in_memory.rs
+++ b/src/storage/in_memory.rs
@@ -278,7 +278,7 @@ impl IStore for MemStore {
let store = self.blob.read().or(Err(StorageError::Internal))?;
store.get(&blob_ref.0).ok_or(StorageError::NotFound).map(|v| v.to_blob_val(blob_ref))
}
- async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError> {
+ async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
let mut store = self.blob.write().or(Err(StorageError::Internal))?;
let entry = store.entry(blob_val.blob_ref.0.clone()).or_default();
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 10149e9..1b1faad 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -148,7 +148,7 @@ pub trait IStore {
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError>;
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>;
- async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError>;
+ async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError>;
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError>;
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>;
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>;