diff options
Diffstat (limited to 'src/storage/garage.rs')
-rw-r--r-- | src/storage/garage.rs | 64 |
1 files changed, 59 insertions, 5 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs index fa6fbc1..f9ba756 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -168,7 +168,65 @@ impl IStore for GarageStore { Ok(row_vals) } async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { - unimplemented!(); + let del_op = match select { + Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchDeleteOp { + partition_key: shard, + prefix: None, + start: Some(sort_begin), + end: Some(sort_end), + single_item: false, + }], + Selector::List(row_ref_list) => { + // Insert null values with causality token = delete + let batch_op = row_ref_list.iter().map(|v| k2v_client::BatchInsertOp { + partition_key: &v.uid.shard, + sort_key: &v.uid.sort, + causality: v.causality.clone().map(|ct| ct.into()), + value: k2v_client::K2vValue::Tombstone, + }).collect::<Vec<_>>(); + + return match self.k2v.insert_batch(&batch_op).await { + Err(e) => { + tracing::error!("Unable to delete the list of values: {}", e); + Err(StorageError::Internal) + }, + Ok(_) => Ok(()), + }; + }, + Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp { + partition_key: shard, + prefix: Some(sort_prefix), + start: None, + end: None, + single_item: false, + }], + Selector::Single(row_ref) => { + // Insert null values with causality token = delete + let batch_op = vec![k2v_client::BatchInsertOp { + partition_key: &row_ref.uid.shard, + sort_key: &row_ref.uid.sort, + causality: row_ref.causality.clone().map(|ct| ct.into()), + value: k2v_client::K2vValue::Tombstone, + }]; + + return match self.k2v.insert_batch(&batch_op).await { + Err(e) => { + tracing::error!("Unable to delete the list of values: {}", e); + Err(StorageError::Internal) + }, + Ok(_) => Ok(()), + }; + }, + }; + + // Finally here we only have prefix & range + match self.k2v.delete_batch(&del_op).await { + Err(e) => { + tracing::error!("delete batch error: {}", e); + Err(StorageError::Internal) + }, + Ok(_) => Ok(()), + } } async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> { @@ -223,10 +281,6 @@ impl IStore for GarageStore { } } - async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> { - unimplemented!(); - } - async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { let maybe_out = self.s3 .get_object() |