aboutsummaryrefslogtreecommitdiff
path: root/src/table/data.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-12-14 11:58:06 +0100
committerAlex Auvolat <alex@adnab.me>2022-12-14 11:58:06 +0100
commit83c8467e23c1f531ae233766d5dc7244afe57f08 (patch)
treed2f959bccc6c779917ce64bee2b2f1cc236164db /src/table/data.rs
parentf8e528c15de0c9d31c16e5cd8e58f99f4132f103 (diff)
downloadgarage-83c8467e23c1f531ae233766d5dc7244afe57f08.tar.gz
garage-83c8467e23c1f531ae233766d5dc7244afe57f08.zip
Proper queueing for delayed inserts, now backed to disk
Diffstat (limited to 'src/table/data.rs')
-rw-r--r--src/table/data.rs38
1 files changed, 37 insertions, 1 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index cae18999..e3b8c93f 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -31,6 +31,10 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) merkle_tree: db::Tree,
pub(crate) merkle_todo: db::Tree,
pub(crate) merkle_todo_notify: Notify,
+
+ pub(crate) insert_queue: db::Tree,
+ pub(crate) insert_queue_notify: Notify,
+
pub(crate) gc_todo: CountedTree,
pub(crate) metrics: TableMetrics,
@@ -53,9 +57,13 @@ where
.open_tree(&format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open DB Merkle TODO tree");
+ let insert_queue = db
+ .open_tree(&format!("{}:insert_queue", F::TABLE_NAME))
+ .expect("Unable to open insert queue DB tree");
+
let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
- .expect("Unable to open DB tree");
+ .expect("Unable to open GC DB tree");
let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
let metrics = TableMetrics::new(
@@ -74,6 +82,8 @@ where
merkle_tree,
merkle_todo,
merkle_todo_notify: Notify::new(),
+ insert_queue,
+ insert_queue_notify: Notify::new(),
gc_todo,
metrics,
})
@@ -306,6 +316,32 @@ where
Ok(removed)
}
+ // ---- Insert queue functions ----
+
+ pub(crate) fn queue_insert(
+ &self,
+ tx: &mut db::Transaction,
+ ins: &F::E,
+ ) -> db::TxResult<(), Error> {
+ let tree_key = self.tree_key(ins.partition_key(), ins.sort_key());
+
+ let new_entry = match tx.get(&self.insert_queue, &tree_key)? {
+ Some(old_v) => {
+ let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?;
+ entry.merge(ins);
+ rmp_to_vec_all_named(&entry)
+ .map_err(Error::RmpEncode)
+ .map_err(db::TxError::Abort)?
+ }
+ None => rmp_to_vec_all_named(ins)
+ .map_err(Error::RmpEncode)
+ .map_err(db::TxError::Abort)?,
+ };
+ tx.insert(&self.insert_queue, &tree_key, new_entry)?;
+
+ Ok(())
+ }
+
// ---- Utility functions ----
pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {