diff options
author | Alex <alex@adnab.me> | 2023-09-21 14:03:35 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2023-09-21 14:03:35 +0000 |
commit | 1d986bd889a5f5fe1bdc75e7d4b34acc2cfbe09f (patch) | |
tree | 5b5f8d66637c4a10866b00e07a45081c93cf75cf /src/db/lib.rs | |
parent | fd7d8fec59c617b40e480ff855894cf35fdcfb40 (diff) | |
parent | 0635250b2bdcce4156704128de154f9052d34e9e (diff) | |
download | garage-1d986bd889a5f5fe1bdc75e7d4b34acc2cfbe09f.tar.gz garage-1d986bd889a5f5fe1bdc75e7d4b34acc2cfbe09f.zip |
Merge pull request 'Refactor db transactions and add on_commit for table.queue_insert' (#637) from k2v-indices-lmdb into next
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/637
Diffstat (limited to 'src/db/lib.rs')
-rw-r--r-- | src/db/lib.rs | 75 |
1 files changed, 40 insertions, 35 deletions
diff --git a/src/db/lib.rs b/src/db/lib.rs index 22bd9364..fe44b01e 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -22,10 +22,15 @@ use std::sync::Arc; use err_derive::Error; +pub(crate) type OnCommit = Vec<Box<dyn FnOnce()>>; + #[derive(Clone)] pub struct Db(pub(crate) Arc<dyn IDb>); -pub struct Transaction<'a>(&'a mut dyn ITx); +pub struct Transaction<'a> { + tx: &'a mut dyn ITx, + on_commit: OnCommit, +} #[derive(Clone)] pub struct Tree(Arc<dyn IDb>, usize); @@ -85,7 +90,7 @@ impl Db { pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E> where - F: Fn(Transaction<'_>) -> TxResult<R, E>, + F: Fn(&mut Transaction<'_>) -> TxResult<R, E>, { let f = TxFn { function: fun, @@ -98,14 +103,17 @@ impl Db { .expect("Transaction did not store result"); match tx_res { - Ok(()) => { - assert!(matches!(ret, Ok(_))); - ret - } - Err(TxError::Abort(())) => { - assert!(matches!(ret, Err(TxError::Abort(_)))); - ret - } + Ok(on_commit) => match ret { + Ok(value) => { + on_commit.into_iter().for_each(|f| f()); + Ok(value) + } + _ => unreachable!(), + }, + Err(TxError::Abort(())) => match ret { + Err(TxError::Abort(e)) => Err(TxError::Abort(e)), + _ => unreachable!(), + }, Err(TxError::Db(e2)) => match ret { // Ok was stored -> the error occured when finalizing // transaction @@ -139,7 +147,7 @@ impl Db { let ex_tree = other.open_tree(&name)?; - let tx_res = self.transaction(|mut tx| { + let tx_res = self.transaction(|tx| { let mut i = 0; for item in ex_tree.iter().map_err(TxError::Abort)? { let (k, v) = item.map_err(TxError::Abort)?; @@ -149,7 +157,7 @@ impl Db { println!("{}: imported {}", name, i); } } - tx.commit(i) + Ok(i) }); let total = match tx_res { Err(TxError::Db(e)) => return Err(e), @@ -249,11 +257,11 @@ impl Tree { impl<'a> Transaction<'a> { #[inline] pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> { - self.0.get(tree.1, key.as_ref()) + self.tx.get(tree.1, key.as_ref()) } #[inline] pub fn len(&self, tree: &Tree) -> TxOpResult<usize> { - self.0.len(tree.1) + self.tx.len(tree.1) } /// Returns the old value if there was one @@ -264,21 +272,21 @@ impl<'a> Transaction<'a> { key: T, value: U, ) -> TxOpResult<Option<Value>> { - self.0.insert(tree.1, key.as_ref(), value.as_ref()) + self.tx.insert(tree.1, key.as_ref(), value.as_ref()) } /// Returns the old value if there was one #[inline] pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> { - self.0.remove(tree.1, key.as_ref()) + self.tx.remove(tree.1, key.as_ref()) } #[inline] pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> { - self.0.iter(tree.1) + self.tx.iter(tree.1) } #[inline] pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> { - self.0.iter_rev(tree.1) + self.tx.iter_rev(tree.1) } #[inline] @@ -289,7 +297,7 @@ impl<'a> Transaction<'a> { { let sb = range.start_bound(); let eb = range.end_bound(); - self.0.range(tree.1, get_bound(sb), get_bound(eb)) + self.tx.range(tree.1, get_bound(sb), get_bound(eb)) } #[inline] pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>> @@ -299,19 +307,12 @@ impl<'a> Transaction<'a> { { let sb = range.start_bound(); let eb = range.end_bound(); - self.0.range_rev(tree.1, get_bound(sb), get_bound(eb)) + self.tx.range_rev(tree.1, get_bound(sb), get_bound(eb)) } - // ---- - #[inline] - pub fn abort<R, E>(self, e: E) -> TxResult<R, E> { - Err(TxError::Abort(e)) - } - - #[inline] - pub fn commit<R, E>(self, r: R) -> TxResult<R, E> { - Ok(r) + pub fn on_commit<F: FnOnce() + 'static>(&mut self, f: F) { + self.on_commit.push(Box::new(f)); } } @@ -348,7 +349,7 @@ pub(crate) trait IDb: Send + Sync { high: Bound<&'r [u8]>, ) -> Result<ValueIter<'_>>; - fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>; + fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()>; } pub(crate) trait ITx { @@ -380,14 +381,14 @@ pub(crate) trait ITxFn { } pub(crate) enum TxFnResult { - Ok, + Ok(OnCommit), Abort, DbErr, } struct TxFn<F, R, E> where - F: Fn(Transaction<'_>) -> TxResult<R, E>, + F: Fn(&mut Transaction<'_>) -> TxResult<R, E>, { function: F, result: Cell<Option<TxResult<R, E>>>, @@ -395,12 +396,16 @@ where impl<F, R, E> ITxFn for TxFn<F, R, E> where - F: Fn(Transaction<'_>) -> TxResult<R, E>, + F: Fn(&mut Transaction<'_>) -> TxResult<R, E>, { fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult { - let res = (self.function)(Transaction(tx)); + let mut tx = Transaction { + tx, + on_commit: vec![], + }; + let res = (self.function)(&mut tx); let res2 = match &res { - Ok(_) => TxFnResult::Ok, + Ok(_) => TxFnResult::Ok(tx.on_commit), Err(TxError::Abort(_)) => TxFnResult::Abort, Err(TxError::Db(_)) => TxFnResult::DbErr, }; |