aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2023-09-21 14:03:35 +0000
committerAlex <alex@adnab.me>2023-09-21 14:03:35 +0000
commit1d986bd889a5f5fe1bdc75e7d4b34acc2cfbe09f (patch)
tree5b5f8d66637c4a10866b00e07a45081c93cf75cf /src
parentfd7d8fec59c617b40e480ff855894cf35fdcfb40 (diff)
parent0635250b2bdcce4156704128de154f9052d34e9e (diff)
downloadgarage-1d986bd889a5f5fe1bdc75e7d4b34acc2cfbe09f.tar.gz
garage-1d986bd889a5f5fe1bdc75e7d4b34acc2cfbe09f.zip
Merge pull request 'Refactor db transactions and add on_commit for table.queue_insert' (#637) from k2v-indices-lmdb into next
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/637
Diffstat (limited to 'src')
-rw-r--r--src/block/rc.rs4
-rw-r--r--src/db/counted_tree_hack.rs6
-rw-r--r--src/db/lib.rs75
-rw-r--r--src/db/lmdb_adapter.rs10
-rw-r--r--src/db/sled_adapter.rs12
-rw-r--r--src/db/sqlite_adapter.rs10
-rw-r--r--src/db/test.rs8
-rw-r--r--src/garage/tests/common/garage.rs2
-rw-r--r--src/garage/tests/k2v/item.rs9
-rw-r--r--src/model/index_counter.rs4
-rw-r--r--src/model/s3/lifecycle_worker.rs8
-rw-r--r--src/table/data.rs24
-rw-r--r--src/table/merkle.rs4
-rw-r--r--src/table/queue.rs2
14 files changed, 91 insertions, 87 deletions
diff --git a/src/block/rc.rs b/src/block/rc.rs
index 94cb5eea..b6afb277 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -56,7 +56,7 @@ impl BlockRc {
/// deletion time has passed
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec();
- self.rc.db().transaction(|mut tx| {
+ self.rc.db().transaction(|tx| {
let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
match rcval {
RcEntry::Deletable { at_time } if now > at_time => {
@@ -64,7 +64,7 @@ impl BlockRc {
}
_ => (),
};
- tx.commit(())
+ Ok(())
})?;
Ok(())
}
diff --git a/src/db/counted_tree_hack.rs b/src/db/counted_tree_hack.rs
index bbe943a2..a4ce12e0 100644
--- a/src/db/counted_tree_hack.rs
+++ b/src/db/counted_tree_hack.rs
@@ -85,7 +85,7 @@ impl CountedTree {
let old_some = expected_old.is_some();
let new_some = new.is_some();
- let tx_res = self.0.tree.db().transaction(|mut tx| {
+ let tx_res = self.0.tree.db().transaction(|tx| {
let old_val = tx.get(&self.0.tree, &key)?;
let is_same = match (&old_val, &expected_old) {
(None, None) => true,
@@ -101,9 +101,9 @@ impl CountedTree {
tx.remove(&self.0.tree, &key)?;
}
}
- tx.commit(())
+ Ok(())
} else {
- tx.abort(())
+ Err(TxError::Abort(()))
}
});
diff --git a/src/db/lib.rs b/src/db/lib.rs
index 22bd9364..fe44b01e 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -22,10 +22,15 @@ use std::sync::Arc;
use err_derive::Error;
+pub(crate) type OnCommit = Vec<Box<dyn FnOnce()>>;
+
#[derive(Clone)]
pub struct Db(pub(crate) Arc<dyn IDb>);
-pub struct Transaction<'a>(&'a mut dyn ITx);
+pub struct Transaction<'a> {
+ tx: &'a mut dyn ITx,
+ on_commit: OnCommit,
+}
#[derive(Clone)]
pub struct Tree(Arc<dyn IDb>, usize);
@@ -85,7 +90,7 @@ impl Db {
pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
where
- F: Fn(Transaction<'_>) -> TxResult<R, E>,
+ F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{
let f = TxFn {
function: fun,
@@ -98,14 +103,17 @@ impl Db {
.expect("Transaction did not store result");
match tx_res {
- Ok(()) => {
- assert!(matches!(ret, Ok(_)));
- ret
- }
- Err(TxError::Abort(())) => {
- assert!(matches!(ret, Err(TxError::Abort(_))));
- ret
- }
+ Ok(on_commit) => match ret {
+ Ok(value) => {
+ on_commit.into_iter().for_each(|f| f());
+ Ok(value)
+ }
+ _ => unreachable!(),
+ },
+ Err(TxError::Abort(())) => match ret {
+ Err(TxError::Abort(e)) => Err(TxError::Abort(e)),
+ _ => unreachable!(),
+ },
Err(TxError::Db(e2)) => match ret {
// Ok was stored -> the error occured when finalizing
// transaction
@@ -139,7 +147,7 @@ impl Db {
let ex_tree = other.open_tree(&name)?;
- let tx_res = self.transaction(|mut tx| {
+ let tx_res = self.transaction(|tx| {
let mut i = 0;
for item in ex_tree.iter().map_err(TxError::Abort)? {
let (k, v) = item.map_err(TxError::Abort)?;
@@ -149,7 +157,7 @@ impl Db {
println!("{}: imported {}", name, i);
}
}
- tx.commit(i)
+ Ok(i)
});
let total = match tx_res {
Err(TxError::Db(e)) => return Err(e),
@@ -249,11 +257,11 @@ impl Tree {
impl<'a> Transaction<'a> {
#[inline]
pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
- self.0.get(tree.1, key.as_ref())
+ self.tx.get(tree.1, key.as_ref())
}
#[inline]
pub fn len(&self, tree: &Tree) -> TxOpResult<usize> {
- self.0.len(tree.1)
+ self.tx.len(tree.1)
}
/// Returns the old value if there was one
@@ -264,21 +272,21 @@ impl<'a> Transaction<'a> {
key: T,
value: U,
) -> TxOpResult<Option<Value>> {
- self.0.insert(tree.1, key.as_ref(), value.as_ref())
+ self.tx.insert(tree.1, key.as_ref(), value.as_ref())
}
/// Returns the old value if there was one
#[inline]
pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
- self.0.remove(tree.1, key.as_ref())
+ self.tx.remove(tree.1, key.as_ref())
}
#[inline]
pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
- self.0.iter(tree.1)
+ self.tx.iter(tree.1)
}
#[inline]
pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
- self.0.iter_rev(tree.1)
+ self.tx.iter_rev(tree.1)
}
#[inline]
@@ -289,7 +297,7 @@ impl<'a> Transaction<'a> {
{
let sb = range.start_bound();
let eb = range.end_bound();
- self.0.range(tree.1, get_bound(sb), get_bound(eb))
+ self.tx.range(tree.1, get_bound(sb), get_bound(eb))
}
#[inline]
pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
@@ -299,19 +307,12 @@ impl<'a> Transaction<'a> {
{
let sb = range.start_bound();
let eb = range.end_bound();
- self.0.range_rev(tree.1, get_bound(sb), get_bound(eb))
+ self.tx.range_rev(tree.1, get_bound(sb), get_bound(eb))
}
- // ----
-
#[inline]
- pub fn abort<R, E>(self, e: E) -> TxResult<R, E> {
- Err(TxError::Abort(e))
- }
-
- #[inline]
- pub fn commit<R, E>(self, r: R) -> TxResult<R, E> {
- Ok(r)
+ pub fn on_commit<F: FnOnce() + 'static>(&mut self, f: F) {
+ self.on_commit.push(Box::new(f));
}
}
@@ -348,7 +349,7 @@ pub(crate) trait IDb: Send + Sync {
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>>;
- fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>;
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()>;
}
pub(crate) trait ITx {
@@ -380,14 +381,14 @@ pub(crate) trait ITxFn {
}
pub(crate) enum TxFnResult {
- Ok,
+ Ok(OnCommit),
Abort,
DbErr,
}
struct TxFn<F, R, E>
where
- F: Fn(Transaction<'_>) -> TxResult<R, E>,
+ F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{
function: F,
result: Cell<Option<TxResult<R, E>>>,
@@ -395,12 +396,16 @@ where
impl<F, R, E> ITxFn for TxFn<F, R, E>
where
- F: Fn(Transaction<'_>) -> TxResult<R, E>,
+ F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{
fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
- let res = (self.function)(Transaction(tx));
+ let mut tx = Transaction {
+ tx,
+ on_commit: vec![],
+ };
+ let res = (self.function)(&mut tx);
let res2 = match &res {
- Ok(_) => TxFnResult::Ok,
+ Ok(_) => TxFnResult::Ok(tx.on_commit),
Err(TxError::Abort(_)) => TxFnResult::Abort,
Err(TxError::Db(_)) => TxFnResult::DbErr,
};
diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs
index ecbc3b81..59fa132d 100644
--- a/src/db/lmdb_adapter.rs
+++ b/src/db/lmdb_adapter.rs
@@ -9,8 +9,8 @@ use heed::types::ByteSlice;
use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database};
use crate::{
- Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
- TxValueIter, Value, ValueIter,
+ Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
+ TxResult, TxValueIter, Value, ValueIter,
};
pub use heed;
@@ -186,7 +186,7 @@ impl IDb for LmdbDb {
// ----
- fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
let trees = self.trees.read().unwrap();
let mut tx = LmdbTx {
trees: &trees.0[..],
@@ -199,9 +199,9 @@ impl IDb for LmdbDb {
let res = f.try_on(&mut tx);
match res {
- TxFnResult::Ok => {
+ TxFnResult::Ok(on_commit) => {
tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
- Ok(())
+ Ok(on_commit)
}
TxFnResult::Abort => {
tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?;
diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs
index 52393a95..84f2001b 100644
--- a/src/db/sled_adapter.rs
+++ b/src/db/sled_adapter.rs
@@ -10,8 +10,8 @@ use sled::transaction::{
};
use crate::{
- Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
- TxValueIter, Value, ValueIter,
+ Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
+ TxResult, TxValueIter, Value, ValueIter,
};
pub use sled;
@@ -166,7 +166,7 @@ impl IDb for SledDb {
// ----
- fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
let trees = self.trees.read().unwrap();
let res = trees.0.transaction(|txtrees| {
let mut tx = SledTx {
@@ -174,9 +174,9 @@ impl IDb for SledDb {
err: Cell::new(None),
};
match f.try_on(&mut tx) {
- TxFnResult::Ok => {
+ TxFnResult::Ok(on_commit) => {
assert!(tx.err.into_inner().is_none());
- Ok(())
+ Ok(on_commit)
}
TxFnResult::Abort => {
assert!(tx.err.into_inner().is_none());
@@ -189,7 +189,7 @@ impl IDb for SledDb {
}
});
match res {
- Ok(()) => Ok(()),
+ Ok(on_commit) => Ok(on_commit),
Err(TransactionError::Abort(())) => Err(TxError::Abort(())),
Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())),
}
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 63b4506e..9f967c66 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -9,8 +9,8 @@ use std::sync::{Arc, Mutex, MutexGuard};
use rusqlite::{params, Connection, Rows, Statement, Transaction};
use crate::{
- Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
- TxValueIter, Value, ValueIter,
+ Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
+ TxResult, TxValueIter, Value, ValueIter,
};
pub use rusqlite;
@@ -261,7 +261,7 @@ impl IDb for SqliteDb {
// ----
- fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
trace!("transaction: lock db");
let mut this = self.0.lock().unwrap();
trace!("transaction: lock acquired");
@@ -277,9 +277,9 @@ impl IDb for SqliteDb {
trees: &this_mut_ref.trees,
};
let res = match f.try_on(&mut tx) {
- TxFnResult::Ok => {
+ TxFnResult::Ok(on_commit) => {
tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
- Ok(())
+ Ok(on_commit)
}
TxFnResult::Abort => {
tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?;
diff --git a/src/db/test.rs b/src/db/test.rs
index 40e6c41e..cd99eafa 100644
--- a/src/db/test.rs
+++ b/src/db/test.rs
@@ -13,26 +13,26 @@ fn test_suite(db: Db) {
assert!(tree.insert(ka, va).unwrap().is_none());
assert_eq!(tree.get(ka).unwrap().unwrap(), va);
- let res = db.transaction::<_, (), _>(|mut tx| {
+ let res = db.transaction::<_, (), _>(|tx| {
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va);
assert_eq!(tx.insert(&tree, ka, vb).unwrap().unwrap(), va);
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
- tx.commit(12)
+ Ok(12)
});
assert!(matches!(res, Ok(12)));
assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
- let res = db.transaction::<(), _, _>(|mut tx| {
+ let res = db.transaction::<(), _, _>(|tx| {
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
assert_eq!(tx.insert(&tree, ka, vc).unwrap().unwrap(), vb);
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vc);
- tx.abort(42)
+ Err(TxError::Abort(42))
});
assert!(matches!(res, Err(TxError::Abort(42))));
assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index cb1b7ea5..d1f0867a 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -52,7 +52,7 @@ impl Instance {
r#"
metadata_dir = "{path}/meta"
data_dir = "{path}/data"
-db_engine = "sled"
+db_engine = "lmdb"
replication_mode = "1"
diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs
index 25d9cce4..20add889 100644
--- a/src/garage/tests/k2v/item.rs
+++ b/src/garage/tests/k2v/item.rs
@@ -44,6 +44,7 @@ async fn test_items_and_indices() {
let content = format!("{}: hello world", sk).into_bytes();
let content2 = format!("{}: hello universe", sk).into_bytes();
let content3 = format!("{}: concurrent value", sk).into_bytes();
+ eprintln!("test iteration {}: {}", i, sk);
// Put initially, no causality token
let res = ctx
@@ -89,7 +90,7 @@ async fn test_items_and_indices() {
assert_eq!(res_body, content);
// ReadIndex -- now there should be some stuff
- tokio::time::sleep(Duration::from_secs(1)).await;
+ tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx
.k2v
.request
@@ -158,7 +159,7 @@ async fn test_items_and_indices() {
assert_eq!(res_body, content2);
// ReadIndex -- now there should be some stuff
- tokio::time::sleep(Duration::from_secs(1)).await;
+ tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx
.k2v
.request
@@ -230,7 +231,7 @@ async fn test_items_and_indices() {
);
// ReadIndex -- now there should be some stuff
- tokio::time::sleep(Duration::from_secs(1)).await;
+ tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx
.k2v
.request
@@ -299,7 +300,7 @@ async fn test_items_and_indices() {
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// ReadIndex -- now there should be some stuff
- tokio::time::sleep(Duration::from_secs(1)).await;
+ tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx
.k2v
.request
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 35d6596d..a46c165f 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -294,7 +294,7 @@ impl<T: CountedItem> IndexCounter<T> {
let counter_entry = local_counter.into_counter_entry(self.this_node);
self.local_counter
.db()
- .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
+ .transaction(|tx| self.table.queue_insert(tx, &counter_entry))?;
next_start = Some(local_counter_k);
}
@@ -360,7 +360,7 @@ impl<T: CountedItem> IndexCounter<T> {
let counter_entry = local_counter.into_counter_entry(self.this_node);
self.local_counter
.db()
- .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
+ .transaction(|tx| self.table.queue_insert(tx, &counter_entry))?;
next_start = Some(counted_entry_k);
}
diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index 42e661eb..50d4283f 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -330,9 +330,7 @@ async fn process_object(
"Lifecycle: expiring 1 object in bucket {:?}",
object.bucket_id
);
- db.transaction(|mut tx| {
- garage.object_table.queue_insert(&mut tx, &deleted_object)
- })?;
+ db.transaction(|tx| garage.object_table.queue_insert(tx, &deleted_object))?;
*objects_expired += 1;
}
}
@@ -365,9 +363,7 @@ async fn process_object(
);
let aborted_object =
Object::new(object.bucket_id, object.key.clone(), aborted_versions);
- db.transaction(|mut tx| {
- garage.object_table.queue_insert(&mut tx, &aborted_object)
- })?;
+ db.transaction(|tx| garage.object_table.queue_insert(tx, &aborted_object))?;
*mpu_aborted += n_aborted;
}
}
diff --git a/src/table/data.rs b/src/table/data.rs
index 26101da4..bbfdf58b 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -34,7 +34,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) merkle_todo_notify: Notify,
pub(crate) insert_queue: db::Tree,
- pub(crate) insert_queue_notify: Notify,
+ pub(crate) insert_queue_notify: Arc<Notify>,
pub(crate) gc_todo: CountedTree,
@@ -80,7 +80,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
merkle_todo,
merkle_todo_notify: Notify::new(),
insert_queue,
- insert_queue_notify: Notify::new(),
+ insert_queue_notify: Arc::new(Notify::new()),
gc_todo,
metrics,
})
@@ -203,14 +203,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
) -> Result<Option<F::E>, Error> {
let tree_key = self.tree_key(partition_key, sort_key);
- let changed = self.store.db().transaction(|mut tx| {
+ let changed = self.store.db().transaction(|tx| {
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
Some(old_bytes) => {
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
- let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?;
+ let new_entry = update_fn(tx, Some(old_entry.clone()))?;
(Some(old_entry), Some(old_bytes), new_entry)
}
- None => (None, None, update_fn(&mut tx, None)?),
+ None => (None, None, update_fn(tx, None)?),
};
// Changed can be true in two scenarios
@@ -233,7 +233,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
tx.insert(&self.store, &tree_key, new_bytes)?;
self.instance
- .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
+ .updated(tx, old_entry.as_ref(), Some(&new_entry))?;
Ok(Some((new_entry, new_bytes_hash)))
} else {
@@ -270,14 +270,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let removed = self
.store
.db()
- .transaction(|mut tx| match tx.get(&self.store, k)? {
+ .transaction(|tx| match tx.get(&self.store, k)? {
Some(cur_v) if cur_v == v => {
let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
- self.instance.updated(&mut tx, Some(&old_entry), None)?;
+ self.instance.updated(tx, Some(&old_entry), None)?;
Ok(true)
}
_ => Ok(false),
@@ -298,14 +298,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let removed = self
.store
.db()
- .transaction(|mut tx| match tx.get(&self.store, k)? {
+ .transaction(|tx| match tx.get(&self.store, k)? {
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
- self.instance.updated(&mut tx, Some(&old_entry), None)?;
+ self.instance.updated(tx, Some(&old_entry), None)?;
Ok(true)
}
_ => Ok(false),
@@ -339,7 +339,9 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
.map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?;
tx.insert(&self.insert_queue, &tree_key, new_entry)?;
- self.insert_queue_notify.notify_one();
+
+ let notif = self.insert_queue_notify.clone();
+ tx.on_commit(move || notif.notify_one());
Ok(())
}
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index e86d0251..4577f872 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -108,9 +108,9 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
self.data
.merkle_tree
.db()
- .transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?;
+ .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
- let deleted = self.data.merkle_todo.db().transaction(|mut tx| {
+ let deleted = self.data.merkle_todo.db().transaction(|tx| {
let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by);
if remove {
tx.remove(&self.data.merkle_todo, k)?;
diff --git a/src/table/queue.rs b/src/table/queue.rs
index 096ac8b4..ffe0a4a7 100644
--- a/src/table/queue.rs
+++ b/src/table/queue.rs
@@ -53,7 +53,7 @@ impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> {
self.0.insert_many(values).await?;
- self.0.data.insert_queue.db().transaction(|mut tx| {
+ self.0.data.insert_queue.db().transaction(|tx| {
for (k, v) in kv_pairs.iter() {
if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? {
if &v2 == v {