diff options
author | Alex Auvolat <alex@adnab.me> | 2022-12-14 11:58:06 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-12-14 11:58:06 +0100 |
commit | 83c8467e23c1f531ae233766d5dc7244afe57f08 (patch) | |
tree | d2f959bccc6c779917ce64bee2b2f1cc236164db /src/table/data.rs | |
parent | f8e528c15de0c9d31c16e5cd8e58f99f4132f103 (diff) | |
download | garage-83c8467e23c1f531ae233766d5dc7244afe57f08.tar.gz garage-83c8467e23c1f531ae233766d5dc7244afe57f08.zip |
Proper queueing for delayed inserts, now backed to disk
Diffstat (limited to 'src/table/data.rs')
-rw-r--r-- | src/table/data.rs | 38 |
1 files changed, 37 insertions, 1 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index cae18999..e3b8c93f 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -31,6 +31,10 @@ pub struct TableData<F: TableSchema, R: TableReplication> { pub(crate) merkle_tree: db::Tree, pub(crate) merkle_todo: db::Tree, pub(crate) merkle_todo_notify: Notify, + + pub(crate) insert_queue: db::Tree, + pub(crate) insert_queue_notify: Notify, + pub(crate) gc_todo: CountedTree, pub(crate) metrics: TableMetrics, @@ -53,9 +57,13 @@ where .open_tree(&format!("{}:merkle_todo", F::TABLE_NAME)) .expect("Unable to open DB Merkle TODO tree"); + let insert_queue = db + .open_tree(&format!("{}:insert_queue", F::TABLE_NAME)) + .expect("Unable to open insert queue DB tree"); + let gc_todo = db .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) - .expect("Unable to open DB tree"); + .expect("Unable to open GC DB tree"); let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2"); let metrics = TableMetrics::new( @@ -74,6 +82,8 @@ where merkle_tree, merkle_todo, merkle_todo_notify: Notify::new(), + insert_queue, + insert_queue_notify: Notify::new(), gc_todo, metrics, }) @@ -306,6 +316,32 @@ where Ok(removed) } + // ---- Insert queue functions ---- + + pub(crate) fn queue_insert( + &self, + tx: &mut db::Transaction, + ins: &F::E, + ) -> db::TxResult<(), Error> { + let tree_key = self.tree_key(ins.partition_key(), ins.sort_key()); + + let new_entry = match tx.get(&self.insert_queue, &tree_key)? { + Some(old_v) => { + let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?; + entry.merge(ins); + rmp_to_vec_all_named(&entry) + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)? + } + None => rmp_to_vec_all_named(ins) + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)?, + }; + tx.insert(&self.insert_queue, &tree_key, new_entry)?; + + Ok(()) + } + // ---- Utility functions ---- pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { |