diff options
Diffstat (limited to 'src/storage/garage.rs')
-rw-r--r-- | src/storage/garage.rs | 83 |
1 files changed, 78 insertions, 5 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs index 97494f6..ec26e80 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -117,20 +117,93 @@ impl IStore for GarageStore { } }; + tracing::debug!("Fetched {}/{}", self.s3_bucket, blob_ref.0); Ok(BlobVal::new(blob_ref.clone(), buffer)) } - async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError> { - unimplemented!(); + async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { + let streamable_value = s3::primitives::ByteStream::from(blob_val.value); + + let maybe_send = self.s3 + .put_object() + .bucket(self.s3_bucket.to_string()) + .key(blob_val.blob_ref.0.to_string()) + .body(streamable_value) + .send() + .await; + + match maybe_send { + Err(e) => { + tracing::error!("unable to send object: {}", e); + Err(StorageError::Internal) + } + Ok(_) => { + tracing::debug!("Inserted {}/{}", self.s3_bucket, blob_val.blob_ref.0); + Ok(()) + } + } } async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { - unimplemented!(); + let maybe_copy = self.s3 + .copy_object() + .bucket(self.s3_bucket.to_string()) + .key(dst.0.clone()) + .copy_source(format!("/{}/{}", self.s3_bucket.to_string(), src.0.clone())) + .send() + .await; + + match maybe_copy { + Err(e) => { + tracing::error!("unable to copy object {} to {} (bucket: {}), error: {}", src.0, dst.0, self.s3_bucket, e); + Err(StorageError::Internal) + }, + Ok(_) => { + tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.s3_bucket); + Ok(()) + } + } } async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { - unimplemented!(); + let maybe_list = self.s3 + .list_objects_v2() + .bucket(self.s3_bucket.to_string()) + .prefix(prefix) + .into_paginator() + .send() + .try_collect() + .await; + + match maybe_list { + Err(e) => { + tracing::error!("listing prefix {} on bucket {} failed: {}", prefix, self.s3_bucket, e); + Err(StorageError::Internal) + } + Ok(pagin_list_out) => Ok(pagin_list_out + .into_iter() + .map(|list_out| list_out.contents.unwrap_or(vec![])) + .flatten() + .map(|obj| BlobRef(obj.key.unwrap_or(String::new()))) + .collect::<Vec<_>>()), + } } async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { - unimplemented!(); + let maybe_delete = self.s3 + .delete_object() + .bucket(self.s3_bucket.to_string()) + .key(blob_ref.0.clone()) + .send() + .await; + + match maybe_delete { + Err(e) => { + tracing::error!("unable to delete {} (bucket: {}), error {}", blob_ref.0, self.s3_bucket, e); + Err(StorageError::Internal) + }, + Ok(_) => { + tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.s3_bucket); + Ok(()) + } + } } } |