aboutsummaryrefslogtreecommitdiff
path: root/src/storage/garage.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/storage/garage.rs')
-rw-r--r--src/storage/garage.rs64
1 files changed, 59 insertions, 5 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
index fa6fbc1..f9ba756 100644
--- a/src/storage/garage.rs
+++ b/src/storage/garage.rs
@@ -168,7 +168,65 @@ impl IStore for GarageStore {
Ok(row_vals)
}
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
- unimplemented!();
+ let del_op = match select {
+ Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchDeleteOp {
+ partition_key: shard,
+ prefix: None,
+ start: Some(sort_begin),
+ end: Some(sort_end),
+ single_item: false,
+ }],
+ Selector::List(row_ref_list) => {
+ // Insert null values with causality token = delete
+ let batch_op = row_ref_list.iter().map(|v| k2v_client::BatchInsertOp {
+ partition_key: &v.uid.shard,
+ sort_key: &v.uid.sort,
+ causality: v.causality.clone().map(|ct| ct.into()),
+ value: k2v_client::K2vValue::Tombstone,
+ }).collect::<Vec<_>>();
+
+ return match self.k2v.insert_batch(&batch_op).await {
+ Err(e) => {
+ tracing::error!("Unable to delete the list of values: {}", e);
+ Err(StorageError::Internal)
+ },
+ Ok(_) => Ok(()),
+ };
+ },
+ Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp {
+ partition_key: shard,
+ prefix: Some(sort_prefix),
+ start: None,
+ end: None,
+ single_item: false,
+ }],
+ Selector::Single(row_ref) => {
+ // Insert null values with causality token = delete
+ let batch_op = vec![k2v_client::BatchInsertOp {
+ partition_key: &row_ref.uid.shard,
+ sort_key: &row_ref.uid.sort,
+ causality: row_ref.causality.clone().map(|ct| ct.into()),
+ value: k2v_client::K2vValue::Tombstone,
+ }];
+
+ return match self.k2v.insert_batch(&batch_op).await {
+ Err(e) => {
+ tracing::error!("Unable to delete the list of values: {}", e);
+ Err(StorageError::Internal)
+ },
+ Ok(_) => Ok(()),
+ };
+ },
+ };
+
+ // Finally here we only have prefix & range
+ match self.k2v.delete_batch(&del_op).await {
+ Err(e) => {
+ tracing::error!("delete batch error: {}", e);
+ Err(StorageError::Internal)
+ },
+ Ok(_) => Ok(()),
+ }
}
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
@@ -223,10 +281,6 @@ impl IStore for GarageStore {
}
}
- async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {
- unimplemented!();
- }
-
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
let maybe_out = self.s3
.get_object()