aboutsummaryrefslogtreecommitdiff
path: root/src/table/data.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/data.rs')
-rw-r--r--src/table/data.rs67
1 files changed, 55 insertions, 12 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index 5e7314d2..91607f7a 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -1,3 +1,4 @@
+use core::borrow::Borrow;
use std::sync::Arc;
use log::warn;
@@ -17,6 +18,7 @@ pub struct TableData<F: TableSchema> {
pub instance: F,
pub store: sled::Tree,
+ pub gc_todo: sled::Tree,
pub merkle_updater: Arc<MerkleUpdater>,
}
@@ -41,6 +43,10 @@ where
.open_tree(&format!("{}:merkle_tree", name))
.expect("Unable to open DB Merkle tree tree");
+ let gc_todo = db
+ .open_tree(&format!("{}:gc_todo", name))
+ .expect("Unable to open DB tree");
+
let merkle_updater = MerkleUpdater::launch(
name.clone(),
background,
@@ -52,6 +58,7 @@ where
name,
instance,
store,
+ gc_todo,
merkle_updater,
})
}
@@ -103,10 +110,17 @@ where
}
// Mutation functions
-
- pub(crate) fn update_many(&self, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
+ // When changing this code, take care of propagating modifications correctly:
+ // - When an entry is modified or deleted, call the updated() function
+ // on the table instance
+ // - When an entry is modified or deleted, add it to the merkle updater's todo list.
+ // This has to be done atomically with the modification for the merkle updater
+ // to maintain consistency. The merkle updater must then be notified with todo_notify.
+ // - When an entry is updated to be a tombstone, add it to the gc_todo tree
+
+ pub(crate) fn update_many<T: Borrow<ByteBuf>>(&self, entries: &[T]) -> Result<(), Error> {
for update_bytes in entries.iter() {
- self.update_entry(update_bytes.as_slice())?;
+ self.update_entry(update_bytes.borrow().as_slice())?;
}
Ok(())
}
@@ -115,8 +129,8 @@ where
let update = self.decode_entry(update_bytes)?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
- let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| {
- let (old_entry, new_entry) = match db.get(&tree_key)? {
+ let changed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
+ let (old_entry, new_entry) = match store.get(&tree_key)? {
Some(prev_bytes) => {
let old_entry = self
.decode_entry(&prev_bytes)
@@ -132,27 +146,32 @@ where
let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RMPEncode)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?;
- db.insert(tree_key.clone(), new_bytes)?;
- Ok(Some((old_entry, new_entry)))
+ let new_bytes_hash = blake2sum(&new_bytes[..]);
+ mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
+ store.insert(tree_key.clone(), new_bytes)?;
+ Ok(Some((old_entry, new_entry, new_bytes_hash)))
} else {
Ok(None)
}
})?;
- if let Some((old_entry, new_entry)) = changed {
+ if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
+ let is_tombstone = new_entry.is_tombstone();
self.instance.updated(old_entry, Some(new_entry));
self.merkle_updater.todo_notify.notify();
+ if is_tombstone {
+ self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?;
+ }
}
Ok(())
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| {
- if let Some(cur_v) = txn.get(k)? {
+ let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
+ if let Some(cur_v) = store.get(k)? {
if cur_v == v {
- txn.remove(k)?;
+ store.remove(k)?;
mkl_todo.insert(k, vec![])?;
return Ok(true);
}
@@ -168,6 +187,30 @@ where
Ok(removed)
}
+ pub(crate) fn delete_if_equal_hash(self: &Arc<Self>, k: &[u8], vhash: Hash) -> Result<bool, Error> {
+ let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
+ if let Some(cur_v) = store.get(k)? {
+ if blake2sum(&cur_v[..]) == vhash {
+ store.remove(k)?;
+ mkl_todo.insert(k, vec![])?;
+ return Ok(Some(cur_v));
+ }
+ }
+ Ok(None)
+ })?;
+
+ if let Some(old_v) = removed {
+ let old_entry = self.decode_entry(&old_v[..])?;
+ self.instance.updated(Some(old_entry), None);
+ self.merkle_updater.todo_notify.notify();
+ Ok(true)
+ } else {
+ Ok(false)
+ }
+ }
+
+ // ---- Utility functions ----
+
pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
let mut ret = p.hash().to_vec();
ret.extend(s.sort_key());