From f8e528c15de0c9d31c16e5cd8e58f99f4132f103 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 10:48:49 +0100 Subject: Small refactor of tables internals --- src/table/data.rs | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) (limited to 'src/table/data.rs') diff --git a/src/table/data.rs b/src/table/data.rs index 93da2110..cae18999 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -173,9 +173,8 @@ where pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> { let update = self.decode_entry(update_bytes)?; - let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - self.update_entry_with(&tree_key[..], |ent| match ent { + self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent { Some(mut ent) => { ent.merge(&update); ent @@ -187,11 +186,14 @@ where pub fn update_entry_with( &self, - tree_key: &[u8], + partition_key: &F::P, + sort_key: &F::S, f: impl Fn(Option) -> F::E, ) -> Result, Error> { + let tree_key = self.tree_key(partition_key, sort_key); + let changed = self.store.db().transaction(|mut tx| { - let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? { + let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? { Some(old_bytes) => { let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; let new_entry = f(Some(old_entry.clone())); @@ -200,23 +202,23 @@ where None => (None, None, f(None)), }; - // Scenario 1: the value changed, so of course there is a change - let value_changed = Some(&new_entry) != old_entry.as_ref(); - + // Changed can be true in two scenarios + // Scenario 1: the actual represented value changed, + // so of course the messagepack encoding changed as well // Scenario 2: the value didn't change but due to a migration in the - // data format, the messagepack encoding changed. In this case - // we have to write the migrated value in the table and update - // the associated Merkle tree entry. + // data format, the messagepack encoding changed. In this case, + // we also have to write the migrated value in the table and update + // the associated Merkle tree entry. let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; - let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]); + let changed = Some(&new_bytes[..]) != old_bytes.as_deref(); drop(old_bytes); - if value_changed || encoding_changed { - let new_bytes_hash = blake2sum(&new_bytes[..]); - tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?; - tx.insert(&self.store, tree_key, new_bytes)?; + if changed { + let new_bytes_hash = blake2sum(&new_bytes); + tx.insert(&self.merkle_todo, &tree_key, new_bytes_hash.as_slice())?; + tx.insert(&self.store, &tree_key, new_bytes)?; self.instance .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?; @@ -242,7 +244,7 @@ where let pk_hash = Hash::try_from(&tree_key[..32]).unwrap(); let nodes = self.replication.write_nodes(&pk_hash); if nodes.first() == Some(&self.system.id) { - GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?; + GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?; } } @@ -258,10 +260,11 @@ where .db() .transaction(|mut tx| match tx.get(&self.store, k)? { Some(cur_v) if cur_v == v => { + let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?; + tx.remove(&self.store, k)?; tx.insert(&self.merkle_todo, k, vec![])?; - let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?; self.instance.updated(&mut tx, Some(&old_entry), None)?; Ok(true) } @@ -285,10 +288,11 @@ where .db() .transaction(|mut tx| match tx.get(&self.store, k)? { Some(cur_v) if blake2sum(&cur_v[..]) == vhash => { + let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?; + tx.remove(&self.store, k)?; tx.insert(&self.merkle_todo, k, vec![])?; - let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?; self.instance.updated(&mut tx, Some(&old_entry), None)?; Ok(true) } -- cgit v1.2.3 From 83c8467e23c1f531ae233766d5dc7244afe57f08 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 11:58:06 +0100 Subject: Proper queueing for delayed inserts, now backed to disk --- src/table/data.rs | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) (limited to 'src/table/data.rs') diff --git a/src/table/data.rs b/src/table/data.rs index cae18999..e3b8c93f 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -31,6 +31,10 @@ pub struct TableData { pub(crate) merkle_tree: db::Tree, pub(crate) merkle_todo: db::Tree, pub(crate) merkle_todo_notify: Notify, + + pub(crate) insert_queue: db::Tree, + pub(crate) insert_queue_notify: Notify, + pub(crate) gc_todo: CountedTree, pub(crate) metrics: TableMetrics, @@ -53,9 +57,13 @@ where .open_tree(&format!("{}:merkle_todo", F::TABLE_NAME)) .expect("Unable to open DB Merkle TODO tree"); + let insert_queue = db + .open_tree(&format!("{}:insert_queue", F::TABLE_NAME)) + .expect("Unable to open insert queue DB tree"); + let gc_todo = db .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) - .expect("Unable to open DB tree"); + .expect("Unable to open GC DB tree"); let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2"); let metrics = TableMetrics::new( @@ -74,6 +82,8 @@ where merkle_tree, merkle_todo, merkle_todo_notify: Notify::new(), + insert_queue, + insert_queue_notify: Notify::new(), gc_todo, metrics, }) @@ -306,6 +316,32 @@ where Ok(removed) } + // ---- Insert queue functions ---- + + pub(crate) fn queue_insert( + &self, + tx: &mut db::Transaction, + ins: &F::E, + ) -> db::TxResult<(), Error> { + let tree_key = self.tree_key(ins.partition_key(), ins.sort_key()); + + let new_entry = match tx.get(&self.insert_queue, &tree_key)? { + Some(old_v) => { + let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?; + entry.merge(ins); + rmp_to_vec_all_named(&entry) + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)? + } + None => rmp_to_vec_all_named(ins) + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)?, + }; + tx.insert(&self.insert_queue, &tree_key, new_entry)?; + + Ok(()) + } + // ---- Utility functions ---- pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { -- cgit v1.2.3 From d4af27f920ce48a60f2073e98b17bdf963241686 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 13:54:21 +0100 Subject: Add missing notify --- src/table/data.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/table/data.rs') diff --git a/src/table/data.rs b/src/table/data.rs index e3b8c93f..bf2bf88b 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -338,6 +338,7 @@ where .map_err(db::TxError::Abort)?, }; tx.insert(&self.insert_queue, &tree_key, new_entry)?; + self.insert_queue_notify.notify_one(); Ok(()) } -- cgit v1.2.3 From 13c86621267272af5bc89ec037d097739dae9aaf Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 16:16:55 +0100 Subject: factorize --- src/table/data.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'src/table/data.rs') diff --git a/src/table/data.rs b/src/table/data.rs index bf2bf88b..40856b02 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -330,13 +330,12 @@ where let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?; entry.merge(ins); rmp_to_vec_all_named(&entry) - .map_err(Error::RmpEncode) - .map_err(db::TxError::Abort)? } - None => rmp_to_vec_all_named(ins) - .map_err(Error::RmpEncode) - .map_err(db::TxError::Abort)?, + None => rmp_to_vec_all_named(ins), }; + let new_entry = new_entry + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)?; tx.insert(&self.insert_queue, &tree_key, new_entry)?; self.insert_queue_notify.notify_one(); -- cgit v1.2.3