diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/block/rc.rs | 4 | ||||
-rw-r--r-- | src/db/counted_tree_hack.rs | 6 | ||||
-rw-r--r-- | src/db/lib.rs | 75 | ||||
-rw-r--r-- | src/db/lmdb_adapter.rs | 10 | ||||
-rw-r--r-- | src/db/sled_adapter.rs | 12 | ||||
-rw-r--r-- | src/db/sqlite_adapter.rs | 10 | ||||
-rw-r--r-- | src/db/test.rs | 8 | ||||
-rw-r--r-- | src/garage/tests/common/garage.rs | 2 | ||||
-rw-r--r-- | src/garage/tests/k2v/item.rs | 9 | ||||
-rw-r--r-- | src/model/index_counter.rs | 4 | ||||
-rw-r--r-- | src/model/s3/lifecycle_worker.rs | 8 | ||||
-rw-r--r-- | src/table/data.rs | 24 | ||||
-rw-r--r-- | src/table/merkle.rs | 4 | ||||
-rw-r--r-- | src/table/queue.rs | 2 |
14 files changed, 91 insertions, 87 deletions
diff --git a/src/block/rc.rs b/src/block/rc.rs index 94cb5eea..b6afb277 100644 --- a/src/block/rc.rs +++ b/src/block/rc.rs @@ -56,7 +56,7 @@ impl BlockRc { /// deletion time has passed pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> { let now = now_msec(); - self.rc.db().transaction(|mut tx| { + self.rc.db().transaction(|tx| { let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?); match rcval { RcEntry::Deletable { at_time } if now > at_time => { @@ -64,7 +64,7 @@ impl BlockRc { } _ => (), }; - tx.commit(()) + Ok(()) })?; Ok(()) } diff --git a/src/db/counted_tree_hack.rs b/src/db/counted_tree_hack.rs index bbe943a2..a4ce12e0 100644 --- a/src/db/counted_tree_hack.rs +++ b/src/db/counted_tree_hack.rs @@ -85,7 +85,7 @@ impl CountedTree { let old_some = expected_old.is_some(); let new_some = new.is_some(); - let tx_res = self.0.tree.db().transaction(|mut tx| { + let tx_res = self.0.tree.db().transaction(|tx| { let old_val = tx.get(&self.0.tree, &key)?; let is_same = match (&old_val, &expected_old) { (None, None) => true, @@ -101,9 +101,9 @@ impl CountedTree { tx.remove(&self.0.tree, &key)?; } } - tx.commit(()) + Ok(()) } else { - tx.abort(()) + Err(TxError::Abort(())) } }); diff --git a/src/db/lib.rs b/src/db/lib.rs index 22bd9364..fe44b01e 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -22,10 +22,15 @@ use std::sync::Arc; use err_derive::Error; +pub(crate) type OnCommit = Vec<Box<dyn FnOnce()>>; + #[derive(Clone)] pub struct Db(pub(crate) Arc<dyn IDb>); -pub struct Transaction<'a>(&'a mut dyn ITx); +pub struct Transaction<'a> { + tx: &'a mut dyn ITx, + on_commit: OnCommit, +} #[derive(Clone)] pub struct Tree(Arc<dyn IDb>, usize); @@ -85,7 +90,7 @@ impl Db { pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E> where - F: Fn(Transaction<'_>) -> TxResult<R, E>, + F: Fn(&mut Transaction<'_>) -> TxResult<R, E>, { let f = TxFn { function: fun, @@ -98,14 +103,17 @@ impl Db { .expect("Transaction did not store result"); match tx_res { - Ok(()) => { - assert!(matches!(ret, Ok(_))); - ret - } - Err(TxError::Abort(())) => { - assert!(matches!(ret, Err(TxError::Abort(_)))); - ret - } + Ok(on_commit) => match ret { + Ok(value) => { + on_commit.into_iter().for_each(|f| f()); + Ok(value) + } + _ => unreachable!(), + }, + Err(TxError::Abort(())) => match ret { + Err(TxError::Abort(e)) => Err(TxError::Abort(e)), + _ => unreachable!(), + }, Err(TxError::Db(e2)) => match ret { // Ok was stored -> the error occured when finalizing // transaction @@ -139,7 +147,7 @@ impl Db { let ex_tree = other.open_tree(&name)?; - let tx_res = self.transaction(|mut tx| { + let tx_res = self.transaction(|tx| { let mut i = 0; for item in ex_tree.iter().map_err(TxError::Abort)? { let (k, v) = item.map_err(TxError::Abort)?; @@ -149,7 +157,7 @@ impl Db { println!("{}: imported {}", name, i); } } - tx.commit(i) + Ok(i) }); let total = match tx_res { Err(TxError::Db(e)) => return Err(e), @@ -249,11 +257,11 @@ impl Tree { impl<'a> Transaction<'a> { #[inline] pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> { - self.0.get(tree.1, key.as_ref()) + self.tx.get(tree.1, key.as_ref()) } #[inline] pub fn len(&self, tree: &Tree) -> TxOpResult<usize> { - self.0.len(tree.1) + self.tx.len(tree.1) } /// Returns the old value if there was one @@ -264,21 +272,21 @@ impl<'a> Transaction<'a> { key: T, value: U, ) -> TxOpResult<Option<Value>> { - self.0.insert(tree.1, key.as_ref(), value.as_ref()) + self.tx.insert(tree.1, key.as_ref(), value.as_ref()) } /// Returns the old value if there was one #[inline] pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> { - self.0.remove(tree.1, key.as_ref()) + self.tx.remove(tree.1, key.as_ref()) } #[inline] pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> { - self.0.iter(tree.1) + self.tx.iter(tree.1) } #[inline] pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> { - self.0.iter_rev(tree.1) + self.tx.iter_rev(tree.1) } #[inline] @@ -289,7 +297,7 @@ impl<'a> Transaction<'a> { { let sb = range.start_bound(); let eb = range.end_bound(); - self.0.range(tree.1, get_bound(sb), get_bound(eb)) + self.tx.range(tree.1, get_bound(sb), get_bound(eb)) } #[inline] pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>> @@ -299,19 +307,12 @@ impl<'a> Transaction<'a> { { let sb = range.start_bound(); let eb = range.end_bound(); - self.0.range_rev(tree.1, get_bound(sb), get_bound(eb)) + self.tx.range_rev(tree.1, get_bound(sb), get_bound(eb)) } - // ---- - #[inline] - pub fn abort<R, E>(self, e: E) -> TxResult<R, E> { - Err(TxError::Abort(e)) - } - - #[inline] - pub fn commit<R, E>(self, r: R) -> TxResult<R, E> { - Ok(r) + pub fn on_commit<F: FnOnce() + 'static>(&mut self, f: F) { + self.on_commit.push(Box::new(f)); } } @@ -348,7 +349,7 @@ pub(crate) trait IDb: Send + Sync { high: Bound<&'r [u8]>, ) -> Result<ValueIter<'_>>; - fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>; + fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()>; } pub(crate) trait ITx { @@ -380,14 +381,14 @@ pub(crate) trait ITxFn { } pub(crate) enum TxFnResult { - Ok, + Ok(OnCommit), Abort, DbErr, } struct TxFn<F, R, E> where - F: Fn(Transaction<'_>) -> TxResult<R, E>, + F: Fn(&mut Transaction<'_>) -> TxResult<R, E>, { function: F, result: Cell<Option<TxResult<R, E>>>, @@ -395,12 +396,16 @@ where impl<F, R, E> ITxFn for TxFn<F, R, E> where - F: Fn(Transaction<'_>) -> TxResult<R, E>, + F: Fn(&mut Transaction<'_>) -> TxResult<R, E>, { fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult { - let res = (self.function)(Transaction(tx)); + let mut tx = Transaction { + tx, + on_commit: vec![], + }; + let res = (self.function)(&mut tx); let res2 = match &res { - Ok(_) => TxFnResult::Ok, + Ok(_) => TxFnResult::Ok(tx.on_commit), Err(TxError::Abort(_)) => TxFnResult::Abort, Err(TxError::Db(_)) => TxFnResult::DbErr, }; diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs index ecbc3b81..59fa132d 100644 --- a/src/db/lmdb_adapter.rs +++ b/src/db/lmdb_adapter.rs @@ -9,8 +9,8 @@ use heed::types::ByteSlice; use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database}; use crate::{ - Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, - TxValueIter, Value, ValueIter, + Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult, + TxResult, TxValueIter, Value, ValueIter, }; pub use heed; @@ -186,7 +186,7 @@ impl IDb for LmdbDb { // ---- - fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { + fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> { let trees = self.trees.read().unwrap(); let mut tx = LmdbTx { trees: &trees.0[..], @@ -199,9 +199,9 @@ impl IDb for LmdbDb { let res = f.try_on(&mut tx); match res { - TxFnResult::Ok => { + TxFnResult::Ok(on_commit) => { tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?; - Ok(()) + Ok(on_commit) } TxFnResult::Abort => { tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?; diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs index 52393a95..84f2001b 100644 --- a/src/db/sled_adapter.rs +++ b/src/db/sled_adapter.rs @@ -10,8 +10,8 @@ use sled::transaction::{ }; use crate::{ - Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, - TxValueIter, Value, ValueIter, + Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult, + TxResult, TxValueIter, Value, ValueIter, }; pub use sled; @@ -166,7 +166,7 @@ impl IDb for SledDb { // ---- - fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { + fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> { let trees = self.trees.read().unwrap(); let res = trees.0.transaction(|txtrees| { let mut tx = SledTx { @@ -174,9 +174,9 @@ impl IDb for SledDb { err: Cell::new(None), }; match f.try_on(&mut tx) { - TxFnResult::Ok => { + TxFnResult::Ok(on_commit) => { assert!(tx.err.into_inner().is_none()); - Ok(()) + Ok(on_commit) } TxFnResult::Abort => { assert!(tx.err.into_inner().is_none()); @@ -189,7 +189,7 @@ impl IDb for SledDb { } }); match res { - Ok(()) => Ok(()), + Ok(on_commit) => Ok(on_commit), Err(TransactionError::Abort(())) => Err(TxError::Abort(())), Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())), } diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 63b4506e..9f967c66 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -9,8 +9,8 @@ use std::sync::{Arc, Mutex, MutexGuard}; use rusqlite::{params, Connection, Rows, Statement, Transaction}; use crate::{ - Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, - TxValueIter, Value, ValueIter, + Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult, + TxResult, TxValueIter, Value, ValueIter, }; pub use rusqlite; @@ -261,7 +261,7 @@ impl IDb for SqliteDb { // ---- - fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { + fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> { trace!("transaction: lock db"); let mut this = self.0.lock().unwrap(); trace!("transaction: lock acquired"); @@ -277,9 +277,9 @@ impl IDb for SqliteDb { trees: &this_mut_ref.trees, }; let res = match f.try_on(&mut tx) { - TxFnResult::Ok => { + TxFnResult::Ok(on_commit) => { tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?; - Ok(()) + Ok(on_commit) } TxFnResult::Abort => { tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?; diff --git a/src/db/test.rs b/src/db/test.rs index 40e6c41e..cd99eafa 100644 --- a/src/db/test.rs +++ b/src/db/test.rs @@ -13,26 +13,26 @@ fn test_suite(db: Db) { assert!(tree.insert(ka, va).unwrap().is_none()); assert_eq!(tree.get(ka).unwrap().unwrap(), va); - let res = db.transaction::<_, (), _>(|mut tx| { + let res = db.transaction::<_, (), _>(|tx| { assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va); assert_eq!(tx.insert(&tree, ka, vb).unwrap().unwrap(), va); assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb); - tx.commit(12) + Ok(12) }); assert!(matches!(res, Ok(12))); assert_eq!(tree.get(ka).unwrap().unwrap(), vb); - let res = db.transaction::<(), _, _>(|mut tx| { + let res = db.transaction::<(), _, _>(|tx| { assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb); assert_eq!(tx.insert(&tree, ka, vc).unwrap().unwrap(), vb); assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vc); - tx.abort(42) + Err(TxError::Abort(42)) }); assert!(matches!(res, Err(TxError::Abort(42)))); assert_eq!(tree.get(ka).unwrap().unwrap(), vb); diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs index cb1b7ea5..d1f0867a 100644 --- a/src/garage/tests/common/garage.rs +++ b/src/garage/tests/common/garage.rs @@ -52,7 +52,7 @@ impl Instance { r#" metadata_dir = "{path}/meta" data_dir = "{path}/data" -db_engine = "sled" +db_engine = "lmdb" replication_mode = "1" diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs index 25d9cce4..20add889 100644 --- a/src/garage/tests/k2v/item.rs +++ b/src/garage/tests/k2v/item.rs @@ -44,6 +44,7 @@ async fn test_items_and_indices() { let content = format!("{}: hello world", sk).into_bytes(); let content2 = format!("{}: hello universe", sk).into_bytes(); let content3 = format!("{}: concurrent value", sk).into_bytes(); + eprintln!("test iteration {}: {}", i, sk); // Put initially, no causality token let res = ctx @@ -89,7 +90,7 @@ async fn test_items_and_indices() { assert_eq!(res_body, content); // ReadIndex -- now there should be some stuff - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let res = ctx .k2v .request @@ -158,7 +159,7 @@ async fn test_items_and_indices() { assert_eq!(res_body, content2); // ReadIndex -- now there should be some stuff - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let res = ctx .k2v .request @@ -230,7 +231,7 @@ async fn test_items_and_indices() { ); // ReadIndex -- now there should be some stuff - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let res = ctx .k2v .request @@ -299,7 +300,7 @@ async fn test_items_and_indices() { assert_eq!(res.status(), StatusCode::NO_CONTENT); // ReadIndex -- now there should be some stuff - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let res = ctx .k2v .request diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 35d6596d..a46c165f 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -294,7 +294,7 @@ impl<T: CountedItem> IndexCounter<T> { let counter_entry = local_counter.into_counter_entry(self.this_node); self.local_counter .db() - .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?; + .transaction(|tx| self.table.queue_insert(tx, &counter_entry))?; next_start = Some(local_counter_k); } @@ -360,7 +360,7 @@ impl<T: CountedItem> IndexCounter<T> { let counter_entry = local_counter.into_counter_entry(self.this_node); self.local_counter .db() - .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?; + .transaction(|tx| self.table.queue_insert(tx, &counter_entry))?; next_start = Some(counted_entry_k); } diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 42e661eb..50d4283f 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -330,9 +330,7 @@ async fn process_object( "Lifecycle: expiring 1 object in bucket {:?}", object.bucket_id ); - db.transaction(|mut tx| { - garage.object_table.queue_insert(&mut tx, &deleted_object) - })?; + db.transaction(|tx| garage.object_table.queue_insert(tx, &deleted_object))?; *objects_expired += 1; } } @@ -365,9 +363,7 @@ async fn process_object( ); let aborted_object = Object::new(object.bucket_id, object.key.clone(), aborted_versions); - db.transaction(|mut tx| { - garage.object_table.queue_insert(&mut tx, &aborted_object) - })?; + db.transaction(|tx| garage.object_table.queue_insert(tx, &aborted_object))?; *mpu_aborted += n_aborted; } } diff --git a/src/table/data.rs b/src/table/data.rs index 26101da4..bbfdf58b 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -34,7 +34,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> { pub(crate) merkle_todo_notify: Notify, pub(crate) insert_queue: db::Tree, - pub(crate) insert_queue_notify: Notify, + pub(crate) insert_queue_notify: Arc<Notify>, pub(crate) gc_todo: CountedTree, @@ -80,7 +80,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { merkle_todo, merkle_todo_notify: Notify::new(), insert_queue, - insert_queue_notify: Notify::new(), + insert_queue_notify: Arc::new(Notify::new()), gc_todo, metrics, }) @@ -203,14 +203,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { ) -> Result<Option<F::E>, Error> { let tree_key = self.tree_key(partition_key, sort_key); - let changed = self.store.db().transaction(|mut tx| { + let changed = self.store.db().transaction(|tx| { let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? { Some(old_bytes) => { let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; - let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?; + let new_entry = update_fn(tx, Some(old_entry.clone()))?; (Some(old_entry), Some(old_bytes), new_entry) } - None => (None, None, update_fn(&mut tx, None)?), + None => (None, None, update_fn(tx, None)?), }; // Changed can be true in two scenarios @@ -233,7 +233,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { tx.insert(&self.store, &tree_key, new_bytes)?; self.instance - .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?; + .updated(tx, old_entry.as_ref(), Some(&new_entry))?; Ok(Some((new_entry, new_bytes_hash))) } else { @@ -270,14 +270,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { let removed = self .store .db() - .transaction(|mut tx| match tx.get(&self.store, k)? { + .transaction(|tx| match tx.get(&self.store, k)? { Some(cur_v) if cur_v == v => { let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?; tx.remove(&self.store, k)?; tx.insert(&self.merkle_todo, k, vec![])?; - self.instance.updated(&mut tx, Some(&old_entry), None)?; + self.instance.updated(tx, Some(&old_entry), None)?; Ok(true) } _ => Ok(false), @@ -298,14 +298,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { let removed = self .store .db() - .transaction(|mut tx| match tx.get(&self.store, k)? { + .transaction(|tx| match tx.get(&self.store, k)? { Some(cur_v) if blake2sum(&cur_v[..]) == vhash => { let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?; tx.remove(&self.store, k)?; tx.insert(&self.merkle_todo, k, vec![])?; - self.instance.updated(&mut tx, Some(&old_entry), None)?; + self.instance.updated(tx, Some(&old_entry), None)?; Ok(true) } _ => Ok(false), @@ -339,7 +339,9 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; tx.insert(&self.insert_queue, &tree_key, new_entry)?; - self.insert_queue_notify.notify_one(); + + let notif = self.insert_queue_notify.clone(); + tx.on_commit(move || notif.notify_one()); Ok(()) } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index e86d0251..4577f872 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -108,9 +108,9 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> { self.data .merkle_tree .db() - .transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?; + .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; - let deleted = self.data.merkle_todo.db().transaction(|mut tx| { + let deleted = self.data.merkle_todo.db().transaction(|tx| { let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by); if remove { tx.remove(&self.data.merkle_todo, k)?; diff --git a/src/table/queue.rs b/src/table/queue.rs index 096ac8b4..ffe0a4a7 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -53,7 +53,7 @@ impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> { self.0.insert_many(values).await?; - self.0.data.insert_queue.db().transaction(|mut tx| { + self.0.data.insert_queue.db().transaction(|tx| { for (k, v) in kv_pairs.iter() { if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? { if &v2 == v { |