aboutsummaryrefslogtreecommitdiff
path: root/src/db/lib.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2023-09-21 14:03:35 +0000
committerAlex <alex@adnab.me>2023-09-21 14:03:35 +0000
commit1d986bd889a5f5fe1bdc75e7d4b34acc2cfbe09f (patch)
tree5b5f8d66637c4a10866b00e07a45081c93cf75cf /src/db/lib.rs
parentfd7d8fec59c617b40e480ff855894cf35fdcfb40 (diff)
parent0635250b2bdcce4156704128de154f9052d34e9e (diff)
downloadgarage-1d986bd889a5f5fe1bdc75e7d4b34acc2cfbe09f.tar.gz
garage-1d986bd889a5f5fe1bdc75e7d4b34acc2cfbe09f.zip
Merge pull request 'Refactor db transactions and add on_commit for table.queue_insert' (#637) from k2v-indices-lmdb into next
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/637
Diffstat (limited to 'src/db/lib.rs')
-rw-r--r--src/db/lib.rs75
1 files changed, 40 insertions, 35 deletions
diff --git a/src/db/lib.rs b/src/db/lib.rs
index 22bd9364..fe44b01e 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -22,10 +22,15 @@ use std::sync::Arc;
use err_derive::Error;
+pub(crate) type OnCommit = Vec<Box<dyn FnOnce()>>;
+
#[derive(Clone)]
pub struct Db(pub(crate) Arc<dyn IDb>);
-pub struct Transaction<'a>(&'a mut dyn ITx);
+pub struct Transaction<'a> {
+ tx: &'a mut dyn ITx,
+ on_commit: OnCommit,
+}
#[derive(Clone)]
pub struct Tree(Arc<dyn IDb>, usize);
@@ -85,7 +90,7 @@ impl Db {
pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
where
- F: Fn(Transaction<'_>) -> TxResult<R, E>,
+ F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{
let f = TxFn {
function: fun,
@@ -98,14 +103,17 @@ impl Db {
.expect("Transaction did not store result");
match tx_res {
- Ok(()) => {
- assert!(matches!(ret, Ok(_)));
- ret
- }
- Err(TxError::Abort(())) => {
- assert!(matches!(ret, Err(TxError::Abort(_))));
- ret
- }
+ Ok(on_commit) => match ret {
+ Ok(value) => {
+ on_commit.into_iter().for_each(|f| f());
+ Ok(value)
+ }
+ _ => unreachable!(),
+ },
+ Err(TxError::Abort(())) => match ret {
+ Err(TxError::Abort(e)) => Err(TxError::Abort(e)),
+ _ => unreachable!(),
+ },
Err(TxError::Db(e2)) => match ret {
// Ok was stored -> the error occured when finalizing
// transaction
@@ -139,7 +147,7 @@ impl Db {
let ex_tree = other.open_tree(&name)?;
- let tx_res = self.transaction(|mut tx| {
+ let tx_res = self.transaction(|tx| {
let mut i = 0;
for item in ex_tree.iter().map_err(TxError::Abort)? {
let (k, v) = item.map_err(TxError::Abort)?;
@@ -149,7 +157,7 @@ impl Db {
println!("{}: imported {}", name, i);
}
}
- tx.commit(i)
+ Ok(i)
});
let total = match tx_res {
Err(TxError::Db(e)) => return Err(e),
@@ -249,11 +257,11 @@ impl Tree {
impl<'a> Transaction<'a> {
#[inline]
pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
- self.0.get(tree.1, key.as_ref())
+ self.tx.get(tree.1, key.as_ref())
}
#[inline]
pub fn len(&self, tree: &Tree) -> TxOpResult<usize> {
- self.0.len(tree.1)
+ self.tx.len(tree.1)
}
/// Returns the old value if there was one
@@ -264,21 +272,21 @@ impl<'a> Transaction<'a> {
key: T,
value: U,
) -> TxOpResult<Option<Value>> {
- self.0.insert(tree.1, key.as_ref(), value.as_ref())
+ self.tx.insert(tree.1, key.as_ref(), value.as_ref())
}
/// Returns the old value if there was one
#[inline]
pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
- self.0.remove(tree.1, key.as_ref())
+ self.tx.remove(tree.1, key.as_ref())
}
#[inline]
pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
- self.0.iter(tree.1)
+ self.tx.iter(tree.1)
}
#[inline]
pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
- self.0.iter_rev(tree.1)
+ self.tx.iter_rev(tree.1)
}
#[inline]
@@ -289,7 +297,7 @@ impl<'a> Transaction<'a> {
{
let sb = range.start_bound();
let eb = range.end_bound();
- self.0.range(tree.1, get_bound(sb), get_bound(eb))
+ self.tx.range(tree.1, get_bound(sb), get_bound(eb))
}
#[inline]
pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
@@ -299,19 +307,12 @@ impl<'a> Transaction<'a> {
{
let sb = range.start_bound();
let eb = range.end_bound();
- self.0.range_rev(tree.1, get_bound(sb), get_bound(eb))
+ self.tx.range_rev(tree.1, get_bound(sb), get_bound(eb))
}
- // ----
-
#[inline]
- pub fn abort<R, E>(self, e: E) -> TxResult<R, E> {
- Err(TxError::Abort(e))
- }
-
- #[inline]
- pub fn commit<R, E>(self, r: R) -> TxResult<R, E> {
- Ok(r)
+ pub fn on_commit<F: FnOnce() + 'static>(&mut self, f: F) {
+ self.on_commit.push(Box::new(f));
}
}
@@ -348,7 +349,7 @@ pub(crate) trait IDb: Send + Sync {
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>>;
- fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>;
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()>;
}
pub(crate) trait ITx {
@@ -380,14 +381,14 @@ pub(crate) trait ITxFn {
}
pub(crate) enum TxFnResult {
- Ok,
+ Ok(OnCommit),
Abort,
DbErr,
}
struct TxFn<F, R, E>
where
- F: Fn(Transaction<'_>) -> TxResult<R, E>,
+ F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{
function: F,
result: Cell<Option<TxResult<R, E>>>,
@@ -395,12 +396,16 @@ where
impl<F, R, E> ITxFn for TxFn<F, R, E>
where
- F: Fn(Transaction<'_>) -> TxResult<R, E>,
+ F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{
fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
- let res = (self.function)(Transaction(tx));
+ let mut tx = Transaction {
+ tx,
+ on_commit: vec![],
+ };
+ let res = (self.function)(&mut tx);
let res2 = match &res {
- Ok(_) => TxFnResult::Ok,
+ Ok(_) => TxFnResult::Ok(tx.on_commit),
Err(TxError::Abort(_)) => TxFnResult::Abort,
Err(TxError::Db(_)) => TxFnResult::DbErr,
};