aboutsummaryrefslogtreecommitdiff
path: root/src/db
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-09-21 15:32:25 +0200
committerAlex Auvolat <alex@adnab.me>2023-09-21 15:35:31 +0200
commitf97168f80567f43e15cf236092703e6ae5d8dc2e (patch)
treeca9b06fa1d9bf3af9b33c5db0a8c09a53b403fa9 /src/db
parentfd7d8fec59c617b40e480ff855894cf35fdcfb40 (diff)
downloadgarage-f97168f80567f43e15cf236092703e6ae5d8dc2e.tar.gz
garage-f97168f80567f43e15cf236092703e6ae5d8dc2e.zip
garage_db: refactor transactions and add on_commit mechanism
Diffstat (limited to 'src/db')
-rw-r--r--src/db/counted_tree_hack.rs6
-rw-r--r--src/db/lib.rs75
-rw-r--r--src/db/lmdb_adapter.rs10
-rw-r--r--src/db/sled_adapter.rs12
-rw-r--r--src/db/sqlite_adapter.rs10
-rw-r--r--src/db/test.rs8
6 files changed, 63 insertions, 58 deletions
diff --git a/src/db/counted_tree_hack.rs b/src/db/counted_tree_hack.rs
index bbe943a2..a4ce12e0 100644
--- a/src/db/counted_tree_hack.rs
+++ b/src/db/counted_tree_hack.rs
@@ -85,7 +85,7 @@ impl CountedTree {
let old_some = expected_old.is_some();
let new_some = new.is_some();
- let tx_res = self.0.tree.db().transaction(|mut tx| {
+ let tx_res = self.0.tree.db().transaction(|tx| {
let old_val = tx.get(&self.0.tree, &key)?;
let is_same = match (&old_val, &expected_old) {
(None, None) => true,
@@ -101,9 +101,9 @@ impl CountedTree {
tx.remove(&self.0.tree, &key)?;
}
}
- tx.commit(())
+ Ok(())
} else {
- tx.abort(())
+ Err(TxError::Abort(()))
}
});
diff --git a/src/db/lib.rs b/src/db/lib.rs
index 22bd9364..fe44b01e 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -22,10 +22,15 @@ use std::sync::Arc;
use err_derive::Error;
+pub(crate) type OnCommit = Vec<Box<dyn FnOnce()>>;
+
#[derive(Clone)]
pub struct Db(pub(crate) Arc<dyn IDb>);
-pub struct Transaction<'a>(&'a mut dyn ITx);
+pub struct Transaction<'a> {
+ tx: &'a mut dyn ITx,
+ on_commit: OnCommit,
+}
#[derive(Clone)]
pub struct Tree(Arc<dyn IDb>, usize);
@@ -85,7 +90,7 @@ impl Db {
pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
where
- F: Fn(Transaction<'_>) -> TxResult<R, E>,
+ F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{
let f = TxFn {
function: fun,
@@ -98,14 +103,17 @@ impl Db {
.expect("Transaction did not store result");
match tx_res {
- Ok(()) => {
- assert!(matches!(ret, Ok(_)));
- ret
- }
- Err(TxError::Abort(())) => {
- assert!(matches!(ret, Err(TxError::Abort(_))));
- ret
- }
+ Ok(on_commit) => match ret {
+ Ok(value) => {
+ on_commit.into_iter().for_each(|f| f());
+ Ok(value)
+ }
+ _ => unreachable!(),
+ },
+ Err(TxError::Abort(())) => match ret {
+ Err(TxError::Abort(e)) => Err(TxError::Abort(e)),
+ _ => unreachable!(),
+ },
Err(TxError::Db(e2)) => match ret {
// Ok was stored -> the error occured when finalizing
// transaction
@@ -139,7 +147,7 @@ impl Db {
let ex_tree = other.open_tree(&name)?;
- let tx_res = self.transaction(|mut tx| {
+ let tx_res = self.transaction(|tx| {
let mut i = 0;
for item in ex_tree.iter().map_err(TxError::Abort)? {
let (k, v) = item.map_err(TxError::Abort)?;
@@ -149,7 +157,7 @@ impl Db {
println!("{}: imported {}", name, i);
}
}
- tx.commit(i)
+ Ok(i)
});
let total = match tx_res {
Err(TxError::Db(e)) => return Err(e),
@@ -249,11 +257,11 @@ impl Tree {
impl<'a> Transaction<'a> {
#[inline]
pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
- self.0.get(tree.1, key.as_ref())
+ self.tx.get(tree.1, key.as_ref())
}
#[inline]
pub fn len(&self, tree: &Tree) -> TxOpResult<usize> {
- self.0.len(tree.1)
+ self.tx.len(tree.1)
}
/// Returns the old value if there was one
@@ -264,21 +272,21 @@ impl<'a> Transaction<'a> {
key: T,
value: U,
) -> TxOpResult<Option<Value>> {
- self.0.insert(tree.1, key.as_ref(), value.as_ref())
+ self.tx.insert(tree.1, key.as_ref(), value.as_ref())
}
/// Returns the old value if there was one
#[inline]
pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
- self.0.remove(tree.1, key.as_ref())
+ self.tx.remove(tree.1, key.as_ref())
}
#[inline]
pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
- self.0.iter(tree.1)
+ self.tx.iter(tree.1)
}
#[inline]
pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
- self.0.iter_rev(tree.1)
+ self.tx.iter_rev(tree.1)
}
#[inline]
@@ -289,7 +297,7 @@ impl<'a> Transaction<'a> {
{
let sb = range.start_bound();
let eb = range.end_bound();
- self.0.range(tree.1, get_bound(sb), get_bound(eb))
+ self.tx.range(tree.1, get_bound(sb), get_bound(eb))
}
#[inline]
pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
@@ -299,19 +307,12 @@ impl<'a> Transaction<'a> {
{
let sb = range.start_bound();
let eb = range.end_bound();
- self.0.range_rev(tree.1, get_bound(sb), get_bound(eb))
+ self.tx.range_rev(tree.1, get_bound(sb), get_bound(eb))
}
- // ----
-
#[inline]
- pub fn abort<R, E>(self, e: E) -> TxResult<R, E> {
- Err(TxError::Abort(e))
- }
-
- #[inline]
- pub fn commit<R, E>(self, r: R) -> TxResult<R, E> {
- Ok(r)
+ pub fn on_commit<F: FnOnce() + 'static>(&mut self, f: F) {
+ self.on_commit.push(Box::new(f));
}
}
@@ -348,7 +349,7 @@ pub(crate) trait IDb: Send + Sync {
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>>;
- fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>;
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()>;
}
pub(crate) trait ITx {
@@ -380,14 +381,14 @@ pub(crate) trait ITxFn {
}
pub(crate) enum TxFnResult {
- Ok,
+ Ok(OnCommit),
Abort,
DbErr,
}
struct TxFn<F, R, E>
where
- F: Fn(Transaction<'_>) -> TxResult<R, E>,
+ F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{
function: F,
result: Cell<Option<TxResult<R, E>>>,
@@ -395,12 +396,16 @@ where
impl<F, R, E> ITxFn for TxFn<F, R, E>
where
- F: Fn(Transaction<'_>) -> TxResult<R, E>,
+ F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{
fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
- let res = (self.function)(Transaction(tx));
+ let mut tx = Transaction {
+ tx,
+ on_commit: vec![],
+ };
+ let res = (self.function)(&mut tx);
let res2 = match &res {
- Ok(_) => TxFnResult::Ok,
+ Ok(_) => TxFnResult::Ok(tx.on_commit),
Err(TxError::Abort(_)) => TxFnResult::Abort,
Err(TxError::Db(_)) => TxFnResult::DbErr,
};
diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs
index ecbc3b81..59fa132d 100644
--- a/src/db/lmdb_adapter.rs
+++ b/src/db/lmdb_adapter.rs
@@ -9,8 +9,8 @@ use heed::types::ByteSlice;
use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database};
use crate::{
- Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
- TxValueIter, Value, ValueIter,
+ Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
+ TxResult, TxValueIter, Value, ValueIter,
};
pub use heed;
@@ -186,7 +186,7 @@ impl IDb for LmdbDb {
// ----
- fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
let trees = self.trees.read().unwrap();
let mut tx = LmdbTx {
trees: &trees.0[..],
@@ -199,9 +199,9 @@ impl IDb for LmdbDb {
let res = f.try_on(&mut tx);
match res {
- TxFnResult::Ok => {
+ TxFnResult::Ok(on_commit) => {
tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
- Ok(())
+ Ok(on_commit)
}
TxFnResult::Abort => {
tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?;
diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs
index 52393a95..84f2001b 100644
--- a/src/db/sled_adapter.rs
+++ b/src/db/sled_adapter.rs
@@ -10,8 +10,8 @@ use sled::transaction::{
};
use crate::{
- Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
- TxValueIter, Value, ValueIter,
+ Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
+ TxResult, TxValueIter, Value, ValueIter,
};
pub use sled;
@@ -166,7 +166,7 @@ impl IDb for SledDb {
// ----
- fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
let trees = self.trees.read().unwrap();
let res = trees.0.transaction(|txtrees| {
let mut tx = SledTx {
@@ -174,9 +174,9 @@ impl IDb for SledDb {
err: Cell::new(None),
};
match f.try_on(&mut tx) {
- TxFnResult::Ok => {
+ TxFnResult::Ok(on_commit) => {
assert!(tx.err.into_inner().is_none());
- Ok(())
+ Ok(on_commit)
}
TxFnResult::Abort => {
assert!(tx.err.into_inner().is_none());
@@ -189,7 +189,7 @@ impl IDb for SledDb {
}
});
match res {
- Ok(()) => Ok(()),
+ Ok(on_commit) => Ok(on_commit),
Err(TransactionError::Abort(())) => Err(TxError::Abort(())),
Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())),
}
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 63b4506e..9f967c66 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -9,8 +9,8 @@ use std::sync::{Arc, Mutex, MutexGuard};
use rusqlite::{params, Connection, Rows, Statement, Transaction};
use crate::{
- Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
- TxValueIter, Value, ValueIter,
+ Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
+ TxResult, TxValueIter, Value, ValueIter,
};
pub use rusqlite;
@@ -261,7 +261,7 @@ impl IDb for SqliteDb {
// ----
- fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
trace!("transaction: lock db");
let mut this = self.0.lock().unwrap();
trace!("transaction: lock acquired");
@@ -277,9 +277,9 @@ impl IDb for SqliteDb {
trees: &this_mut_ref.trees,
};
let res = match f.try_on(&mut tx) {
- TxFnResult::Ok => {
+ TxFnResult::Ok(on_commit) => {
tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
- Ok(())
+ Ok(on_commit)
}
TxFnResult::Abort => {
tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?;
diff --git a/src/db/test.rs b/src/db/test.rs
index 40e6c41e..cd99eafa 100644
--- a/src/db/test.rs
+++ b/src/db/test.rs
@@ -13,26 +13,26 @@ fn test_suite(db: Db) {
assert!(tree.insert(ka, va).unwrap().is_none());
assert_eq!(tree.get(ka).unwrap().unwrap(), va);
- let res = db.transaction::<_, (), _>(|mut tx| {
+ let res = db.transaction::<_, (), _>(|tx| {
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va);
assert_eq!(tx.insert(&tree, ka, vb).unwrap().unwrap(), va);
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
- tx.commit(12)
+ Ok(12)
});
assert!(matches!(res, Ok(12)));
assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
- let res = db.transaction::<(), _, _>(|mut tx| {
+ let res = db.transaction::<(), _, _>(|tx| {
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
assert_eq!(tx.insert(&tree, ka, vc).unwrap().unwrap(), vb);
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vc);
- tx.abort(42)
+ Err(TxError::Abort(42))
});
assert!(matches!(res, Err(TxError::Abort(42))));
assert_eq!(tree.get(ka).unwrap().unwrap(), vb);