diff options
author | Alex <alex@adnab.me> | 2022-06-08 10:01:44 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-06-08 10:01:44 +0200 |
commit | b44d3fc796484a50cd6854f20c9b46e5fddedc9d (patch) | |
tree | 29f6da0e8dc68485edf713aaa7331536f4ff4fde /src/table/merkle.rs | |
parent | 7eed3ceda9cf964e3435f22fc1852e27f4f5a8ae (diff) | |
download | garage-b44d3fc796484a50cd6854f20c9b46e5fddedc9d.tar.gz garage-b44d3fc796484a50cd6854f20c9b46e5fddedc9d.zip |
Abstract database behind generic interface and implement alternative drivers (#322)
- [x] Design interface
- [x] Implement Sled backend
- [x] Re-implement the SledCountedTree hack ~~on Sled backend~~ on all backends (i.e. over the abstraction)
- [x] Convert Garage code to use generic interface
- [x] Proof-read converted Garage code
- [ ] Test everything well
- [x] Implement sqlite backend
- [x] Implement LMDB backend
- [ ] (Implement Persy backend?)
- [ ] (Implement other backends? (like RocksDB, ...))
- [x] Implement backend choice in config file and garage server module
- [x] Add CLI for converting between DB formats
- Exploit the new interface to put more things in transactions
- [x] `.updated()` trigger on Garage tables
Fix #284
**Bugs**
- [x] When exporting sqlite, trees iterate empty??
- [x] LMDB doesn't work
**Known issues for various back-ends**
- Sled:
- Eats all my RAM and also all my disk space
- `.len()` has to traverse the whole table
- Is actually quite slow on some operations
- And is actually pretty bad code...
- Sqlite:
- Requires a lock to be taken on all operations. The lock is also taken when iterating on a table with `.iter()`, and the lock isn't released until the iterator is dropped. This means that we must be VERY carefull to not do anything else inside a `.iter()` loop or else we will have a deadlock! Most such cases have been eliminated from the Garage codebase, but there might still be some that remain. If your Garage-over-Sqlite seems to hang/freeze, this is the reason.
- (adapter uses a bunch of unsafe code)
- Heed (LMDB):
- Not suited for 32-bit machines as it has to map the whole DB in memory.
- (adpater uses a tiny bit of unsafe code)
**My recommendation:** avoid 32-bit machines and use LMDB as much as possible.
**Converting databases** is actually quite easy. For example from Sled to LMDB:
```bash
cd src/db
cargo run --features cli --bin convert -- -i path/to/garage/meta/db -a sled -o path/to/garage/meta/db.lmdb -b lmdb
```
Then, just add this to your `config.toml`:
```toml
db_engine = "lmdb"
```
Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/322
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/table/merkle.rs')
-rw-r--r-- | src/table/merkle.rs | 101 |
1 files changed, 51 insertions, 50 deletions
diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 93bf7e47..7685b193 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -4,11 +4,10 @@ use std::time::Duration; use futures::select; use futures_util::future::*; use serde::{Deserialize, Serialize}; -use sled::transaction::{ - ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, -}; use tokio::sync::watch; +use garage_db as db; + use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; @@ -90,35 +89,35 @@ where async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { while !*must_exit.borrow() { - if let Some(x) = self.data.merkle_todo.iter().next() { - match x { - Ok((key, valhash)) => { - if let Err(e) = self.update_item(&key[..], &valhash[..]) { - warn!( - "({}) Error while updating Merkle tree item: {}", - F::TABLE_NAME, - e - ); - } - } - Err(e) => { - warn!( - "({}) Error while iterating on Merkle todo tree: {}", - F::TABLE_NAME, - e - ); - tokio::time::sleep(Duration::from_secs(10)).await; + match self.updater_loop_iter() { + Ok(true) => (), + Ok(false) => { + select! { + _ = self.data.merkle_todo_notify.notified().fuse() => {}, + _ = must_exit.changed().fuse() => {}, } } - } else { - select! { - _ = self.data.merkle_todo_notify.notified().fuse() => {}, - _ = must_exit.changed().fuse() => {}, + Err(e) => { + warn!( + "({}) Error while updating Merkle tree item: {}", + F::TABLE_NAME, + e + ); + tokio::time::sleep(Duration::from_secs(10)).await; } } } } + fn updater_loop_iter(&self) -> Result<bool, Error> { + if let Some((key, valhash)) = self.data.merkle_todo.first()? { + self.update_item(&key, &valhash)?; + Ok(true) + } else { + Ok(false) + } + } + fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> { let khash = blake2sum(k); @@ -137,13 +136,16 @@ where }; self.data .merkle_tree - .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; + .db() + .transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?; - let deleted = self - .data - .merkle_todo - .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)? - .is_ok(); + let deleted = self.data.merkle_todo.db().transaction(|mut tx| { + let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by); + if remove { + tx.remove(&self.data.merkle_todo, k)?; + } + Ok(remove) + })?; if !deleted { debug!( @@ -157,12 +159,12 @@ where fn update_item_rec( &self, - tx: &TransactionalTree, + tx: &mut db::Transaction<'_>, k: &[u8], khash: &Hash, key: &MerkleNodeKey, new_vhash: Option<Hash>, - ) -> ConflictableTransactionResult<Option<Hash>, Error> { + ) -> db::TxResult<Option<Hash>, Error> { let i = key.prefix.len(); // Read node at current position (defined by the prefix stored in key) @@ -203,7 +205,7 @@ where } MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)), x @ MerkleNode::Leaf(_, _) => { - tx.remove(key_sub.encode())?; + tx.remove(&self.data.merkle_tree, key_sub.encode())?; Some(x) } } @@ -283,28 +285,27 @@ where fn read_node_txn( &self, - tx: &TransactionalTree, + tx: &mut db::Transaction<'_>, k: &MerkleNodeKey, - ) -> ConflictableTransactionResult<MerkleNode, Error> { - let ent = tx.get(k.encode())?; - MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort) + ) -> db::TxResult<MerkleNode, Error> { + let ent = tx.get(&self.data.merkle_tree, k.encode())?; + MerkleNode::decode_opt(&ent).map_err(db::TxError::Abort) } fn put_node_txn( &self, - tx: &TransactionalTree, + tx: &mut db::Transaction<'_>, k: &MerkleNodeKey, v: &MerkleNode, - ) -> ConflictableTransactionResult<Hash, Error> { + ) -> db::TxResult<Hash, Error> { trace!("Put Merkle node: {:?} => {:?}", k, v); if *v == MerkleNode::Empty { - tx.remove(k.encode())?; + tx.remove(&self.data.merkle_tree, k.encode())?; Ok(self.empty_node_hash) } else { - let vby = rmp_to_vec_all_named(v) - .map_err(|e| ConflictableTransactionError::Abort(e.into()))?; + let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?; let rethash = blake2sum(&vby[..]); - tx.insert(k.encode(), vby)?; + tx.insert(&self.data.merkle_tree, k.encode(), vby)?; Ok(rethash) } } @@ -312,15 +313,15 @@ where // Access a node in the Merkle tree, used by the sync protocol pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> { let ent = self.data.merkle_tree.get(k.encode())?; - MerkleNode::decode_opt(ent) + MerkleNode::decode_opt(&ent) } - pub fn merkle_tree_len(&self) -> usize { - self.data.merkle_tree.len() + pub fn merkle_tree_len(&self) -> Result<usize, Error> { + Ok(self.data.merkle_tree.len()?) } - pub fn todo_len(&self) -> usize { - self.data.merkle_todo.len() + pub fn todo_len(&self) -> Result<usize, Error> { + Ok(self.data.merkle_todo.len()?) } } @@ -347,7 +348,7 @@ impl MerkleNodeKey { } impl MerkleNode { - fn decode_opt(ent: Option<sled::IVec>) -> Result<Self, Error> { + fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> { match ent { None => Ok(MerkleNode::Empty), Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), |