aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2023-12-27 14:58:09 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2023-12-27 14:58:09 +0100
commit54c9736a247bb3534a285caa637c9afb052bc2dd (patch)
tree91efb131a2f2bb65ec07a202031927d476f78e09
parent477a784e45d07d414fea77cf5b49ee241dc01f65 (diff)
downloadaerogramme-54c9736a247bb3534a285caa637c9afb052bc2dd.tar.gz
aerogramme-54c9736a247bb3534a285caa637c9afb052bc2dd.zip
implemente garage storage
-rw-r--r--src/mail/incoming.rs2
-rw-r--r--src/mail/mailbox.rs2
-rw-r--r--src/storage/garage.rs64
-rw-r--r--src/storage/in_memory.rs60
-rw-r--r--src/storage/mod.rs7
5 files changed, 95 insertions, 40 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
index b17959a..7e33a9a 100644
--- a/src/mail/incoming.rs
+++ b/src/mail/incoming.rs
@@ -386,7 +386,7 @@ async fn k2v_lock_loop_internal(
_ => None,
};
if let Some(ct) = release {
- match storage.row_rm_single(&ct).await {
+ match storage.row_rm(&storage::Selector::Single(&ct)).await {
Err(e) => warn!("Unable to release lock {:?}: {}", ct, e),
Ok(_) => (),
};
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs
index c925f39..6fb7dea 100644
--- a/src/mail/mailbox.rs
+++ b/src/mail/mailbox.rs
@@ -365,7 +365,7 @@ impl MailboxInternal {
.row_fetch(&storage::Selector::Single(&RowRef::new(&self.mail_path, &sk)))
.await?;
if let Some(row_val) = res.into_iter().next() {
- self.storage.row_rm_single(&row_val.row_ref).await?;
+ self.storage.row_rm(&storage::Selector::Single(&row_val.row_ref)).await?;
}
Ok::<_, anyhow::Error>(())
}
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
index fa6fbc1..f9ba756 100644
--- a/src/storage/garage.rs
+++ b/src/storage/garage.rs
@@ -168,7 +168,65 @@ impl IStore for GarageStore {
Ok(row_vals)
}
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
- unimplemented!();
+ let del_op = match select {
+ Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchDeleteOp {
+ partition_key: shard,
+ prefix: None,
+ start: Some(sort_begin),
+ end: Some(sort_end),
+ single_item: false,
+ }],
+ Selector::List(row_ref_list) => {
+ // Insert null values with causality token = delete
+ let batch_op = row_ref_list.iter().map(|v| k2v_client::BatchInsertOp {
+ partition_key: &v.uid.shard,
+ sort_key: &v.uid.sort,
+ causality: v.causality.clone().map(|ct| ct.into()),
+ value: k2v_client::K2vValue::Tombstone,
+ }).collect::<Vec<_>>();
+
+ return match self.k2v.insert_batch(&batch_op).await {
+ Err(e) => {
+ tracing::error!("Unable to delete the list of values: {}", e);
+ Err(StorageError::Internal)
+ },
+ Ok(_) => Ok(()),
+ };
+ },
+ Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp {
+ partition_key: shard,
+ prefix: Some(sort_prefix),
+ start: None,
+ end: None,
+ single_item: false,
+ }],
+ Selector::Single(row_ref) => {
+ // Insert null values with causality token = delete
+ let batch_op = vec![k2v_client::BatchInsertOp {
+ partition_key: &row_ref.uid.shard,
+ sort_key: &row_ref.uid.sort,
+ causality: row_ref.causality.clone().map(|ct| ct.into()),
+ value: k2v_client::K2vValue::Tombstone,
+ }];
+
+ return match self.k2v.insert_batch(&batch_op).await {
+ Err(e) => {
+ tracing::error!("Unable to delete the list of values: {}", e);
+ Err(StorageError::Internal)
+ },
+ Ok(_) => Ok(()),
+ };
+ },
+ };
+
+ // Finally here we only have prefix & range
+ match self.k2v.delete_batch(&del_op).await {
+ Err(e) => {
+ tracing::error!("delete batch error: {}", e);
+ Err(StorageError::Internal)
+ },
+ Ok(_) => Ok(()),
+ }
}
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
@@ -223,10 +281,6 @@ impl IStore for GarageStore {
}
}
- async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {
- unimplemented!();
- }
-
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
let maybe_out = self.s3
.get_object()
diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs
index d764da1..ee7c9a6 100644
--- a/src/storage/in_memory.rs
+++ b/src/storage/in_memory.rs
@@ -137,6 +137,32 @@ fn prefix_last_bound(prefix: &str) -> Bound<String> {
}
}
+impl MemStore {
+ fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {
+ tracing::trace!(entry=%entry, command="row_rm_single");
+ let mut store = self.row.write().or(Err(StorageError::Internal))?;
+ let shard = &entry.uid.shard;
+ let sort = &entry.uid.sort;
+
+ let cauz = match entry.causality.as_ref().map(|v| v.parse::<u64>()) {
+ Some(Ok(v)) => v,
+ _ => 0,
+ };
+
+ let bt = store.entry(shard.to_string()).or_default();
+ let intval = bt.entry(sort.to_string()).or_default();
+
+ if cauz == intval.version {
+ intval.data.clear();
+ }
+ intval.data.push(InternalData::Tombstone);
+ intval.version += 1;
+ intval.change.notify_waiters();
+
+ Ok(())
+ }
+}
+
#[async_trait]
impl IStore for MemStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
@@ -183,37 +209,17 @@ impl IStore for MemStore {
}
}
- async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {
- tracing::trace!(entry=%entry, command="row_rm_single");
- let mut store = self.row.write().or(Err(StorageError::Internal))?;
- let shard = &entry.uid.shard;
- let sort = &entry.uid.sort;
-
- let cauz = match entry.causality.as_ref().map(|v| v.parse::<u64>()) {
- Some(Ok(v)) => v,
- _ => 0,
- };
-
- let bt = store.entry(shard.to_string()).or_default();
- let intval = bt.entry(sort.to_string()).or_default();
-
- if cauz == intval.version {
- intval.data.clear();
- }
- intval.data.push(InternalData::Tombstone);
- intval.version += 1;
- intval.change.notify_waiters();
-
- Ok(())
- }
-
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
tracing::trace!(select=%select, command="row_rm");
- //@FIXME not efficient at all...
- let values = self.row_fetch(select).await?;
+
+ let values = match select {
+ Selector::Range { .. } | Selector::Prefix { .. } => self.row_fetch(select).await?.into_iter().map(|rv| rv.row_ref).collect::<Vec<_>>(),
+ Selector::List(rlist) => rlist.clone(),
+ Selector::Single(row_ref) => vec![(*row_ref).clone()],
+ };
for v in values.into_iter() {
- self.row_rm_single(&v.row_ref).await?;
+ self.row_rm_single(&v)?;
}
Ok(())
}
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 1b1faad..c81ffe4 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -90,11 +90,6 @@ impl RowVal {
#[derive(Debug, Clone)]
pub struct BlobRef(pub String);
-impl BlobRef {
- pub fn new(key: &str) -> Self {
- Self(key.to_string())
- }
-}
impl std::fmt::Display for BlobRef {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "BlobRef({})", self.0)
@@ -125,6 +120,7 @@ impl BlobVal {
pub enum Selector<'a> {
Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str },
List (Vec<RowRef>), // list of (shard_key, sort_key)
+ #[allow(dead_code)]
Prefix { shard: &'a str, sort_prefix: &'a str },
Single(&'a RowRef),
}
@@ -143,7 +139,6 @@ impl<'a> std::fmt::Display for Selector<'a> {
pub trait IStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError>;
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>;
- async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError>;
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError>;
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError>;