diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-12-27 14:58:09 +0100 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-12-27 14:58:09 +0100 |
commit | 54c9736a247bb3534a285caa637c9afb052bc2dd (patch) | |
tree | 91efb131a2f2bb65ec07a202031927d476f78e09 /src/storage/garage.rs | |
parent | 477a784e45d07d414fea77cf5b49ee241dc01f65 (diff) | |
download | aerogramme-54c9736a247bb3534a285caa637c9afb052bc2dd.tar.gz aerogramme-54c9736a247bb3534a285caa637c9afb052bc2dd.zip |
implemente garage storage
Diffstat (limited to 'src/storage/garage.rs')
-rw-r--r-- | src/storage/garage.rs | 64 |
1 files changed, 59 insertions, 5 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs index fa6fbc1..f9ba756 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -168,7 +168,65 @@ impl IStore for GarageStore { Ok(row_vals) } async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { - unimplemented!(); + let del_op = match select { + Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchDeleteOp { + partition_key: shard, + prefix: None, + start: Some(sort_begin), + end: Some(sort_end), + single_item: false, + }], + Selector::List(row_ref_list) => { + // Insert null values with causality token = delete + let batch_op = row_ref_list.iter().map(|v| k2v_client::BatchInsertOp { + partition_key: &v.uid.shard, + sort_key: &v.uid.sort, + causality: v.causality.clone().map(|ct| ct.into()), + value: k2v_client::K2vValue::Tombstone, + }).collect::<Vec<_>>(); + + return match self.k2v.insert_batch(&batch_op).await { + Err(e) => { + tracing::error!("Unable to delete the list of values: {}", e); + Err(StorageError::Internal) + }, + Ok(_) => Ok(()), + }; + }, + Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp { + partition_key: shard, + prefix: Some(sort_prefix), + start: None, + end: None, + single_item: false, + }], + Selector::Single(row_ref) => { + // Insert null values with causality token = delete + let batch_op = vec![k2v_client::BatchInsertOp { + partition_key: &row_ref.uid.shard, + sort_key: &row_ref.uid.sort, + causality: row_ref.causality.clone().map(|ct| ct.into()), + value: k2v_client::K2vValue::Tombstone, + }]; + + return match self.k2v.insert_batch(&batch_op).await { + Err(e) => { + tracing::error!("Unable to delete the list of values: {}", e); + Err(StorageError::Internal) + }, + Ok(_) => Ok(()), + }; + }, + }; + + // Finally here we only have prefix & range + match self.k2v.delete_batch(&del_op).await { + Err(e) => { + tracing::error!("delete batch error: {}", e); + Err(StorageError::Internal) + }, + Ok(_) => Ok(()), + } } async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> { @@ -223,10 +281,6 @@ impl IStore for GarageStore { } } - async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> { - unimplemented!(); - } - async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { let maybe_out = self.s3 .get_object() |