diff options
Diffstat (limited to 'src/table/data.rs')
-rw-r--r-- | src/table/data.rs | 67 |
1 files changed, 55 insertions, 12 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index 5e7314d2..91607f7a 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -1,3 +1,4 @@ +use core::borrow::Borrow; use std::sync::Arc; use log::warn; @@ -17,6 +18,7 @@ pub struct TableData<F: TableSchema> { pub instance: F, pub store: sled::Tree, + pub gc_todo: sled::Tree, pub merkle_updater: Arc<MerkleUpdater>, } @@ -41,6 +43,10 @@ where .open_tree(&format!("{}:merkle_tree", name)) .expect("Unable to open DB Merkle tree tree"); + let gc_todo = db + .open_tree(&format!("{}:gc_todo", name)) + .expect("Unable to open DB tree"); + let merkle_updater = MerkleUpdater::launch( name.clone(), background, @@ -52,6 +58,7 @@ where name, instance, store, + gc_todo, merkle_updater, }) } @@ -103,10 +110,17 @@ where } // Mutation functions - - pub(crate) fn update_many(&self, entries: &[Arc<ByteBuf>]) -> Result<(), Error> { + // When changing this code, take care of propagating modifications correctly: + // - When an entry is modified or deleted, call the updated() function + // on the table instance + // - When an entry is modified or deleted, add it to the merkle updater's todo list. + // This has to be done atomically with the modification for the merkle updater + // to maintain consistency. The merkle updater must then be notified with todo_notify. + // - When an entry is updated to be a tombstone, add it to the gc_todo tree + + pub(crate) fn update_many<T: Borrow<ByteBuf>>(&self, entries: &[T]) -> Result<(), Error> { for update_bytes in entries.iter() { - self.update_entry(update_bytes.as_slice())?; + self.update_entry(update_bytes.borrow().as_slice())?; } Ok(()) } @@ -115,8 +129,8 @@ where let update = self.decode_entry(update_bytes)?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| { - let (old_entry, new_entry) = match db.get(&tree_key)? { + let changed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { + let (old_entry, new_entry) = match store.get(&tree_key)? { Some(prev_bytes) => { let old_entry = self .decode_entry(&prev_bytes) @@ -132,27 +146,32 @@ where let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RMPEncode) .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?; - db.insert(tree_key.clone(), new_bytes)?; - Ok(Some((old_entry, new_entry))) + let new_bytes_hash = blake2sum(&new_bytes[..]); + mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; + store.insert(tree_key.clone(), new_bytes)?; + Ok(Some((old_entry, new_entry, new_bytes_hash))) } else { Ok(None) } })?; - if let Some((old_entry, new_entry)) = changed { + if let Some((old_entry, new_entry, new_bytes_hash)) = changed { + let is_tombstone = new_entry.is_tombstone(); self.instance.updated(old_entry, Some(new_entry)); self.merkle_updater.todo_notify.notify(); + if is_tombstone { + self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; + } } Ok(()) } pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| { - if let Some(cur_v) = txn.get(k)? { + let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { + if let Some(cur_v) = store.get(k)? { if cur_v == v { - txn.remove(k)?; + store.remove(k)?; mkl_todo.insert(k, vec![])?; return Ok(true); } @@ -168,6 +187,30 @@ where Ok(removed) } + pub(crate) fn delete_if_equal_hash(self: &Arc<Self>, k: &[u8], vhash: Hash) -> Result<bool, Error> { + let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { + if let Some(cur_v) = store.get(k)? { + if blake2sum(&cur_v[..]) == vhash { + store.remove(k)?; + mkl_todo.insert(k, vec![])?; + return Ok(Some(cur_v)); + } + } + Ok(None) + })?; + + if let Some(old_v) = removed { + let old_entry = self.decode_entry(&old_v[..])?; + self.instance.updated(Some(old_entry), None); + self.merkle_updater.todo_notify.notify(); + Ok(true) + } else { + Ok(false) + } + } + + // ---- Utility functions ---- + pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { let mut ret = p.hash().to_vec(); ret.extend(s.sort_key()); |