aboutsummaryrefslogtreecommitdiff
path: root/src/table/data.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/data.rs')
-rw-r--r--src/table/data.rs78
1 files changed, 59 insertions, 19 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index 93da2110..40856b02 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -31,6 +31,10 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) merkle_tree: db::Tree,
pub(crate) merkle_todo: db::Tree,
pub(crate) merkle_todo_notify: Notify,
+
+ pub(crate) insert_queue: db::Tree,
+ pub(crate) insert_queue_notify: Notify,
+
pub(crate) gc_todo: CountedTree,
pub(crate) metrics: TableMetrics,
@@ -53,9 +57,13 @@ where
.open_tree(&format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open DB Merkle TODO tree");
+ let insert_queue = db
+ .open_tree(&format!("{}:insert_queue", F::TABLE_NAME))
+ .expect("Unable to open insert queue DB tree");
+
let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
- .expect("Unable to open DB tree");
+ .expect("Unable to open GC DB tree");
let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
let metrics = TableMetrics::new(
@@ -74,6 +82,8 @@ where
merkle_tree,
merkle_todo,
merkle_todo_notify: Notify::new(),
+ insert_queue,
+ insert_queue_notify: Notify::new(),
gc_todo,
metrics,
})
@@ -173,9 +183,8 @@ where
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
let update = self.decode_entry(update_bytes)?;
- let tree_key = self.tree_key(update.partition_key(), update.sort_key());
- self.update_entry_with(&tree_key[..], |ent| match ent {
+ self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent {
Some(mut ent) => {
ent.merge(&update);
ent
@@ -187,11 +196,14 @@ where
pub fn update_entry_with(
&self,
- tree_key: &[u8],
+ partition_key: &F::P,
+ sort_key: &F::S,
f: impl Fn(Option<F::E>) -> F::E,
) -> Result<Option<F::E>, Error> {
+ let tree_key = self.tree_key(partition_key, sort_key);
+
let changed = self.store.db().transaction(|mut tx| {
- let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? {
+ let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
Some(old_bytes) => {
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
let new_entry = f(Some(old_entry.clone()));
@@ -200,23 +212,23 @@ where
None => (None, None, f(None)),
};
- // Scenario 1: the value changed, so of course there is a change
- let value_changed = Some(&new_entry) != old_entry.as_ref();
-
+ // Changed can be true in two scenarios
+ // Scenario 1: the actual represented value changed,
+ // so of course the messagepack encoding changed as well
// Scenario 2: the value didn't change but due to a migration in the
- // data format, the messagepack encoding changed. In this case
- // we have to write the migrated value in the table and update
- // the associated Merkle tree entry.
+ // data format, the messagepack encoding changed. In this case,
+ // we also have to write the migrated value in the table and update
+ // the associated Merkle tree entry.
let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?;
- let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]);
+ let changed = Some(&new_bytes[..]) != old_bytes.as_deref();
drop(old_bytes);
- if value_changed || encoding_changed {
- let new_bytes_hash = blake2sum(&new_bytes[..]);
- tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?;
- tx.insert(&self.store, tree_key, new_bytes)?;
+ if changed {
+ let new_bytes_hash = blake2sum(&new_bytes);
+ tx.insert(&self.merkle_todo, &tree_key, new_bytes_hash.as_slice())?;
+ tx.insert(&self.store, &tree_key, new_bytes)?;
self.instance
.updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
@@ -242,7 +254,7 @@ where
let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
let nodes = self.replication.write_nodes(&pk_hash);
if nodes.first() == Some(&self.system.id) {
- GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?;
+ GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
}
}
@@ -258,10 +270,11 @@ where
.db()
.transaction(|mut tx| match tx.get(&self.store, k)? {
Some(cur_v) if cur_v == v => {
+ let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
+
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
- let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
self.instance.updated(&mut tx, Some(&old_entry), None)?;
Ok(true)
}
@@ -285,10 +298,11 @@ where
.db()
.transaction(|mut tx| match tx.get(&self.store, k)? {
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
+ let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
+
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
- let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
self.instance.updated(&mut tx, Some(&old_entry), None)?;
Ok(true)
}
@@ -302,6 +316,32 @@ where
Ok(removed)
}
+ // ---- Insert queue functions ----
+
+ pub(crate) fn queue_insert(
+ &self,
+ tx: &mut db::Transaction,
+ ins: &F::E,
+ ) -> db::TxResult<(), Error> {
+ let tree_key = self.tree_key(ins.partition_key(), ins.sort_key());
+
+ let new_entry = match tx.get(&self.insert_queue, &tree_key)? {
+ Some(old_v) => {
+ let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?;
+ entry.merge(ins);
+ rmp_to_vec_all_named(&entry)
+ }
+ None => rmp_to_vec_all_named(ins),
+ };
+ let new_entry = new_entry
+ .map_err(Error::RmpEncode)
+ .map_err(db::TxError::Abort)?;
+ tx.insert(&self.insert_queue, &tree_key, new_entry)?;
+ self.insert_queue_notify.notify_one();
+
+ Ok(())
+ }
+
// ---- Utility functions ----
pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {