aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-03 15:31:07 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-03 15:31:07 +0200
commit4e72c713f157ae9d5103a461c4c213b2aa6a84b9 (patch)
tree50fca54ff6540490846d0aa65be5a3c7bfde0e17
parent16e0a655d0d01e3871aee81a0a9660102d6df74e (diff)
downloadgarage-4e72c713f157ae9d5103a461c4c213b2aa6a84b9.tar.gz
garage-4e72c713f157ae9d5103a461c4c213b2aa6a84b9.zip
Start LMDB adapter, with fixed semantics
-rw-r--r--Cargo.lock23
-rw-r--r--src/block/rc.rs6
-rw-r--r--src/db/Cargo.toml3
-rw-r--r--src/db/lib.rs38
-rw-r--r--src/db/lmdb_adapter.rs270
-rw-r--r--src/db/sled_adapter.rs22
-rw-r--r--src/db/sqlite_adapter.rs22
-rw-r--r--src/db/test.rs17
-rw-r--r--src/model/index_counter.rs2
-rw-r--r--src/table/data.rs37
-rw-r--r--src/table/gc.rs14
-rw-r--r--src/table/merkle.rs26
12 files changed, 395 insertions, 85 deletions
diff --git a/Cargo.lock b/Cargo.lock
index a953779b..b051e72e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1017,6 +1017,7 @@ dependencies = [
"clap 3.1.18",
"err-derive 0.3.1",
"hexdump",
+ "lmdb",
"log",
"mktemp",
"rusqlite",
@@ -1823,6 +1824,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
[[package]]
+name = "lmdb"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b0908efb5d6496aa977d96f91413da2635a902e5e31dbef0bfb88986c248539"
+dependencies = [
+ "bitflags",
+ "libc",
+ "lmdb-sys",
+]
+
+[[package]]
+name = "lmdb-sys"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d5b392838cfe8858e86fac37cf97a0e8c55cc60ba0a18365cadc33092f128ce9"
+dependencies = [
+ "cc",
+ "libc",
+ "pkg-config",
+]
+
+[[package]]
name = "lock_api"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/src/block/rc.rs b/src/block/rc.rs
index 7d85f67e..e0b952fd 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -20,7 +20,7 @@ impl BlockRc {
/// Increment the reference counter associated to a hash.
/// Returns true if the RC goes from zero to nonzero.
pub(crate) fn block_incref(&self, hash: &Hash) -> Result<bool, Error> {
- let old_rc = self.rc.db().transaction(|tx| {
+ let old_rc = self.rc.db().transaction(|mut tx| {
let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
match old_rc.increment().serialize() {
Some(x) => {
@@ -36,7 +36,7 @@ impl BlockRc {
/// Decrement the reference counter associated to a hash.
/// Returns true if the RC is now zero.
pub(crate) fn block_decref(&self, hash: &Hash) -> Result<bool, Error> {
- let new_rc = self.rc.db().transaction(|tx| {
+ let new_rc = self.rc.db().transaction(|mut tx| {
let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement();
match new_rc.serialize() {
Some(x) => {
@@ -60,7 +60,7 @@ impl BlockRc {
/// deletion time has passed
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec();
- self.rc.db().transaction(|tx| {
+ self.rc.db().transaction(|mut tx| {
let rcval = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
match rcval {
RcEntry::Deletable { at_time } if now > at_time => {
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index 36b96229..b4601ff7 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -21,8 +21,9 @@ err-derive = "0.3"
hexdump = "0.1"
log = "0.4"
-sled = "0.34"
+lmdb = "0.8"
rusqlite = "0.27"
+sled = "0.34"
# cli deps
clap = { version = "3.1.18", optional = true, features = ["derive", "env"] }
diff --git a/src/db/lib.rs b/src/db/lib.rs
index 045c16c5..86042eaf 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -1,3 +1,4 @@
+pub mod lmdb_adapter;
pub mod sled_adapter;
pub mod sqlite_adapter;
@@ -15,8 +16,7 @@ use err_derive::Error;
#[derive(Clone)]
pub struct Db(pub(crate) Arc<dyn IDb>);
-#[derive(Clone, Copy)]
-pub struct Transaction<'a>(pub(crate) &'a dyn ITx<'a>);
+pub struct Transaction<'a>(pub(crate) &'a mut dyn ITx);
#[derive(Clone)]
pub struct Tree(pub(crate) Arc<dyn IDb>, pub(crate) usize);
@@ -271,7 +271,7 @@ impl Tree {
#[allow(clippy::len_without_is_empty)]
impl<'a> Transaction<'a> {
- pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> Result<Option<Value<'a>>> {
+ pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> Result<Option<Value<'_>>> {
self.0.get(tree.1, key.as_ref())
}
pub fn len(&self, tree: &Tree) -> Result<usize> {
@@ -279,25 +279,25 @@ impl<'a> Transaction<'a> {
}
pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(
- &self,
+ &mut self,
tree: &Tree,
key: T,
value: U,
) -> Result<()> {
self.0.insert(tree.1, key.as_ref(), value.as_ref())
}
- pub fn remove<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> Result<bool> {
+ pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> Result<bool> {
self.0.remove(tree.1, key.as_ref())
}
- pub fn iter(&self, tree: &Tree) -> Result<ValueIter<'a>> {
+ pub fn iter(&self, tree: &Tree) -> Result<ValueIter<'_>> {
self.0.iter(tree.1)
}
- pub fn iter_rev(&self, tree: &Tree) -> Result<ValueIter<'a>> {
+ pub fn iter_rev(&self, tree: &Tree) -> Result<ValueIter<'_>> {
self.0.iter_rev(tree.1)
}
- pub fn range<K, R>(&self, tree: &Tree, range: R) -> Result<ValueIter<'a>>
+ pub fn range<K, R>(&self, tree: &Tree, range: R) -> Result<ValueIter<'_>>
where
K: AsRef<[u8]>,
R: RangeBounds<K>,
@@ -306,7 +306,7 @@ impl<'a> Transaction<'a> {
let eb = range.end_bound();
self.0.range(tree.1, get_bound(sb), get_bound(eb))
}
- pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> Result<ValueIter<'a>>
+ pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> Result<ValueIter<'_>>
where
K: AsRef<[u8]>,
R: RangeBounds<K>,
@@ -358,32 +358,32 @@ pub(crate) trait IDb: Send + Sync {
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>;
}
-pub(crate) trait ITx<'a> {
- fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>>;
+pub(crate) trait ITx {
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>>;
fn len(&self, tree: usize) -> Result<usize>;
- fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>;
- fn remove(&self, tree: usize, key: &[u8]) -> Result<bool>;
+ fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>;
+ fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool>;
- fn iter(&self, tree: usize) -> Result<ValueIter<'a>>;
- fn iter_rev(&self, tree: usize) -> Result<ValueIter<'a>>;
+ fn iter(&self, tree: usize) -> Result<ValueIter<'_>>;
+ fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>>;
fn range<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
- ) -> Result<ValueIter<'a>>;
+ ) -> Result<ValueIter<'_>>;
fn range_rev<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
- ) -> Result<ValueIter<'a>>;
+ ) -> Result<ValueIter<'_>>;
}
pub(crate) trait ITxFn {
- fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult;
+ fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult;
}
pub(crate) enum TxFnResult {
@@ -404,7 +404,7 @@ impl<F, R, E> ITxFn for TxFn<F, R, E>
where
F: Fn(Transaction<'_>) -> TxResult<R, E>,
{
- fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult {
+ fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
let res = (self.function)(Transaction(tx));
let res2 = match &res {
Ok(_) => TxFnResult::Ok,
diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs
new file mode 100644
index 00000000..caf21517
--- /dev/null
+++ b/src/db/lmdb_adapter.rs
@@ -0,0 +1,270 @@
+use core::marker::PhantomPinned;
+use core::ops::Bound;
+use core::pin::Pin;
+use core::ptr::NonNull;
+
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+use lmdb::{
+ Database, DatabaseFlags, Environment, RoTransaction, RwTransaction, Transaction, WriteFlags,
+};
+
+use crate::{
+ Db, Error, IDb, ITx, ITxFn, IValue, Result, TxError, TxFnResult, TxResult, Value, ValueIter,
+};
+
+pub use lmdb;
+
+// -- err
+
+impl From<lmdb::Error> for Error {
+ fn from(e: lmdb::Error) -> Error {
+ Error(format!("LMDB: {}", e).into())
+ }
+}
+
+impl<T> From<lmdb::Error> for TxError<T> {
+ fn from(e: lmdb::Error) -> TxError<T> {
+ TxError::Db(e.into())
+ }
+}
+
+// -- db
+
+pub struct LmdbDb {
+ db: lmdb::Environment,
+ trees: RwLock<(Vec<lmdb::Database>, HashMap<String, usize>)>,
+}
+
+impl LmdbDb {
+ pub fn init(db: lmdb::Environment) -> Db {
+ let s = Self {
+ db,
+ trees: RwLock::new((Vec::new(), HashMap::new())),
+ };
+ Db(Arc::new(s))
+ }
+
+ fn get_tree(&self, i: usize) -> Result<lmdb::Database> {
+ self.trees
+ .read()
+ .unwrap()
+ .0
+ .get(i)
+ .cloned()
+ .ok_or_else(|| Error("invalid tree id".into()))
+ }
+}
+
+impl IDb for LmdbDb {
+ fn open_tree(&self, name: &str) -> Result<usize> {
+ let mut trees = self.trees.write().unwrap();
+ if let Some(i) = trees.1.get(name) {
+ Ok(*i)
+ } else {
+ let tree = self.db.create_db(Some(name), DatabaseFlags::empty())?;
+ let i = trees.0.len();
+ trees.0.push(tree);
+ trees.1.insert(name.to_string(), i);
+ Ok(i)
+ }
+ }
+
+ fn list_trees(&self) -> Result<Vec<String>> {
+ unimplemented!()
+ }
+
+ // ----
+
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
+ let tree = self.get_tree(tree)?;
+
+ let res = TxAndValue {
+ tx: self.db.begin_ro_txn()?,
+ value: NonNull::dangling(),
+ _pin: PhantomPinned,
+ };
+ let mut boxed = Box::pin(res);
+
+ unsafe {
+ let tx = NonNull::from(&boxed.tx);
+ let val = match tx.as_ref().get(tree, &key) {
+ Err(lmdb::Error::NotFound) => return Ok(None),
+ v => v?,
+ };
+
+ let mut_ref: Pin<&mut TxAndValue<'_>> = Pin::as_mut(&mut boxed);
+ Pin::get_unchecked_mut(mut_ref).value = NonNull::from(&val);
+ }
+
+ Ok(Some(Value(Box::new(TxAndValuePin(boxed)))))
+ }
+
+ fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
+ unimplemented!()
+ }
+
+ fn len(&self, tree: usize) -> Result<usize> {
+ unimplemented!()
+ }
+
+ fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
+ let tree = self.get_tree(tree)?;
+ let mut tx = self.db.begin_rw_txn()?;
+ tx.put(tree, &key, &value, WriteFlags::empty())?;
+ tx.commit()?;
+ Ok(())
+ }
+
+ fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
+ unimplemented!()
+ }
+
+ fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
+ unimplemented!()
+ }
+
+ fn range<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ unimplemented!()
+ }
+ fn range_rev<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ unimplemented!()
+ }
+
+ // ----
+
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
+ let trees = self.trees.read().unwrap();
+ let mut tx = LmdbTx {
+ trees: &trees.0[..],
+ tx: self.db.begin_rw_txn()?,
+ };
+
+ let res = f.try_on(&mut tx);
+ match res {
+ TxFnResult::Ok => {
+ tx.tx.commit()?;
+ Ok(())
+ }
+ TxFnResult::Abort => {
+ tx.tx.abort();
+ Err(TxError::Abort(()))
+ }
+ TxFnResult::DbErr => {
+ tx.tx.abort();
+ Err(TxError::Db(Error(
+ "(this message will be discarded)".into(),
+ )))
+ }
+ }
+ }
+}
+
+// ----
+
+struct LmdbTx<'a, 'db> {
+ trees: &'db [Database],
+ tx: RwTransaction<'a>,
+}
+
+impl<'a, 'db> LmdbTx<'a, 'db> {
+ fn get_tree(&self, i: usize) -> Result<&Database> {
+ self.trees.get(i).ok_or_else(|| {
+ Error(
+ "invalid tree id (it might have been openned after the transaction started)".into(),
+ )
+ })
+ }
+}
+
+impl<'a, 'db> ITx for LmdbTx<'a, 'db> {
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
+ let tree = self.get_tree(tree)?;
+ match self.tx.get::<'a, _>(*tree, &key) {
+ Err(lmdb::Error::NotFound) => Ok(None),
+ Err(e) => Err(e.into()),
+ Ok(v) => Ok(Some(Value(Box::new(v)))),
+ }
+ }
+ fn len(&self, _tree: usize) -> Result<usize> {
+ unimplemented!(".len() in transaction not supported with LMDB backend")
+ }
+
+ fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
+ let tree = self.get_tree(tree)?;
+ self.tx.put(*tree, &key, &value, WriteFlags::empty())?;
+ Ok(())
+ }
+ fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool> {
+ let tree = self.get_tree(tree)?;
+ match self.tx.del::<'a, _>(*tree, &key, None) {
+ Ok(()) => Ok(true),
+ Err(lmdb::Error::NotFound) => Ok(false),
+ Err(e) => Err(e.into()),
+ }
+ }
+
+ fn iter(&self, _tree: usize) -> Result<ValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with LMDB backend");
+ }
+ fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with LMDB backend");
+ }
+
+ fn range<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with LMDB backend");
+ }
+ fn range_rev<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with LMDB backend");
+ }
+}
+
+// ----
+
+struct TxAndValue<'a> {
+ tx: RoTransaction<'a>,
+ value: NonNull<&'a [u8]>,
+ _pin: PhantomPinned,
+}
+
+struct TxAndValuePin<'a>(Pin<Box<TxAndValue<'a>>>);
+
+impl<'a> IValue<'a> for TxAndValuePin<'a> {
+ fn take_maybe(&mut self) -> Vec<u8> {
+ self.as_ref().to_vec()
+ }
+}
+
+impl<'a> AsRef<[u8]> for TxAndValuePin<'a> {
+ fn as_ref(&self) -> &[u8] {
+ unsafe { self.0.value.as_ref() }
+ }
+}
+
+impl<'a> std::borrow::Borrow<[u8]> for TxAndValuePin<'a> {
+ fn borrow(&self) -> &[u8] {
+ self.as_ref()
+ }
+}
diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs
index 3388b0ca..2953785e 100644
--- a/src/db/sled_adapter.rs
+++ b/src/db/sled_adapter.rs
@@ -19,7 +19,7 @@ pub use sled;
impl From<sled::Error> for Error {
fn from(e: sled::Error) -> Error {
- Error(format!("{}", e).into())
+ Error(format!("Sled: {}", e).into())
}
}
@@ -162,11 +162,11 @@ impl IDb for SledDb {
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
let trees = self.trees.read().unwrap();
let res = trees.0.transaction(|txtrees| {
- let tx = SledTx {
+ let mut tx = SledTx {
trees: txtrees,
err: Cell::new(None),
};
- match f.try_on(&tx) {
+ match f.try_on(&mut tx) {
TxFnResult::Ok => {
assert!(tx.err.into_inner().is_none());
Ok(())
@@ -217,8 +217,8 @@ impl<'a> SledTx<'a> {
}
}
-impl<'a> ITx<'a> for SledTx<'a> {
- fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>> {
+impl<'a> ITx for SledTx<'a> {
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
let tree = self.get_tree(tree)?;
let tmp = self.save_error(tree.get(key))?;
Ok(tmp.map(From::from))
@@ -227,20 +227,20 @@ impl<'a> ITx<'a> for SledTx<'a> {
unimplemented!(".len() in transaction not supported with Sled backend")
}
- fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
+ fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let tree = self.get_tree(tree)?;
self.save_error(tree.insert(key, value))?;
Ok(())
}
- fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
+ fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?;
Ok(self.save_error(tree.remove(key))?.is_some())
}
- fn iter(&self, _tree: usize) -> Result<ValueIter<'a>> {
+ fn iter(&self, _tree: usize) -> Result<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with Sled backend");
}
- fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'a>> {
+ fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with Sled backend");
}
@@ -249,7 +249,7 @@ impl<'a> ITx<'a> for SledTx<'a> {
_tree: usize,
_low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>,
- ) -> Result<ValueIter<'a>> {
+ ) -> Result<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with Sled backend");
}
fn range_rev<'r>(
@@ -257,7 +257,7 @@ impl<'a> ITx<'a> for SledTx<'a> {
_tree: usize,
_low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>,
- ) -> Result<ValueIter<'a>> {
+ ) -> Result<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with Sled backend");
}
}
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 4f79b34b..9f2bf919 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -17,7 +17,7 @@ pub use rusqlite;
impl From<rusqlite::Error> for Error {
fn from(e: rusqlite::Error) -> Error {
- Error(format!("{}", e).into())
+ Error(format!("Sqlite: {}", e).into())
}
}
@@ -235,11 +235,11 @@ impl IDb for SqliteDb {
let mut db = self.db.lock().unwrap();
trace!("transaction: lock acquired");
- let tx = SqliteTx {
+ let mut tx = SqliteTx {
tx: db.transaction()?,
trees: trees.as_ref(),
};
- let res = match f.try_on(&tx) {
+ let res = match f.try_on(&mut tx) {
TxFnResult::Ok => {
tx.tx.commit()?;
Ok(())
@@ -278,8 +278,8 @@ impl<'a> SqliteTx<'a> {
}
}
-impl<'a> ITx<'a> for SqliteTx<'a> {
- fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>> {
+impl<'a> ITx for SqliteTx<'a> {
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
let tree = self.get_tree(tree)?;
let mut stmt = self
.tx
@@ -300,7 +300,7 @@ impl<'a> ITx<'a> for SqliteTx<'a> {
}
}
- fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
+ fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let tree = self.get_tree(tree)?;
self.tx.execute(
&format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree),
@@ -308,7 +308,7 @@ impl<'a> ITx<'a> for SqliteTx<'a> {
)?;
Ok(())
}
- fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
+ fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?;
let res = self
.tx
@@ -316,10 +316,10 @@ impl<'a> ITx<'a> for SqliteTx<'a> {
Ok(res > 0)
}
- fn iter(&self, _tree: usize) -> Result<ValueIter<'a>> {
+ fn iter(&self, _tree: usize) -> Result<ValueIter<'_>> {
unimplemented!();
}
- fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'a>> {
+ fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'_>> {
unimplemented!();
}
@@ -328,7 +328,7 @@ impl<'a> ITx<'a> for SqliteTx<'a> {
_tree: usize,
_low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>,
- ) -> Result<ValueIter<'a>> {
+ ) -> Result<ValueIter<'_>> {
unimplemented!();
}
fn range_rev<'r>(
@@ -336,7 +336,7 @@ impl<'a> ITx<'a> for SqliteTx<'a> {
_tree: usize,
_low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>,
- ) -> Result<ValueIter<'a>> {
+ ) -> Result<ValueIter<'_>> {
unimplemented!();
}
}
diff --git a/src/db/test.rs b/src/db/test.rs
index 75200cf1..e5b83ab5 100644
--- a/src/db/test.rs
+++ b/src/db/test.rs
@@ -1,5 +1,6 @@
use crate::*;
+use crate::lmdb_adapter::LmdbDb;
use crate::sled_adapter::SledDb;
use crate::sqlite_adapter::SqliteDb;
@@ -16,7 +17,7 @@ fn test_suite(db: Db) {
tree.insert(ka, va).unwrap();
assert_eq!(tree.get(ka).unwrap().unwrap(), va);
- let res = db.transaction::<_, (), _>(|tx| {
+ let res = db.transaction::<_, (), _>(|mut tx| {
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va);
tx.insert(&tree, ka, vb).unwrap();
@@ -28,7 +29,7 @@ fn test_suite(db: Db) {
assert!(matches!(res, Ok(12)));
assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
- let res = db.transaction::<(), _, _>(|tx| {
+ let res = db.transaction::<(), _, _>(|mut tx| {
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
tx.insert(&tree, ka, vc).unwrap();
@@ -79,6 +80,18 @@ fn test_suite(db: Db) {
}
#[test]
+fn test_lmdb_db() {
+ let path = mktemp::Temp::new_dir().unwrap();
+ let db = lmdb::Environment::new()
+ .set_max_dbs(100)
+ .open(&path)
+ .unwrap();
+ let db = LmdbDb::init(db);
+ test_suite(db);
+ drop(path);
+}
+
+#[test]
fn test_sled_db() {
let path = mktemp::Temp::new_dir().unwrap();
let db = SledDb::init(sled::open(path.to_path_buf()).unwrap());
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 9e343e5f..d8c1229a 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -179,7 +179,7 @@ impl<T: CounterSchema> IndexCounter<T> {
pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> {
let tree_key = self.table.data.tree_key(pk, sk);
- let new_entry = self.local_counter.db().transaction(|tx| {
+ let new_entry = self.local_counter.db().transaction(|mut tx| {
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
Some(old_bytes) => rmp_serde::decode::from_slice::<LocalCounterEntry>(&old_bytes)
.map_err(Error::RmpDecode)
diff --git a/src/table/data.rs b/src/table/data.rs
index 17402bb6..cca96f68 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -182,7 +182,7 @@ where
tree_key: &[u8],
f: impl Fn(Option<F::E>) -> F::E,
) -> Result<Option<F::E>, Error> {
- let changed = self.store.db().transaction(|tx| {
+ let changed = self.store.db().transaction(|mut tx| {
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? {
Some(old_bytes) => {
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
@@ -203,6 +203,7 @@ where
.map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?;
let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]);
+ drop(old_bytes);
if value_changed || encoding_changed {
let new_bytes_hash = blake2sum(&new_bytes[..]);
@@ -241,15 +242,16 @@ where
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = self.store.db().transaction(|tx| {
- if let Some(cur_v) = tx.get(&self.store, k)? {
- if cur_v == v {
- tx.remove(&self.store, k)?;
- tx.insert(&self.merkle_todo, k, vec![])?;
- return Ok(true);
- }
+ let removed = self.store.db().transaction(|mut tx| {
+ let remove = match tx.get(&self.store, k)? {
+ Some(cur_v) if cur_v == v => true,
+ _ => false,
+ };
+ if remove {
+ tx.remove(&self.store, k)?;
+ tx.insert(&self.merkle_todo, k, vec![])?;
}
- Ok(false)
+ Ok(remove)
})?;
if removed {
@@ -267,15 +269,16 @@ where
k: &[u8],
vhash: Hash,
) -> Result<bool, Error> {
- let removed = self.store.db().transaction(|tx| {
- if let Some(cur_v) = tx.get(&self.store, k)? {
- if blake2sum(&cur_v[..]) == vhash {
- tx.remove(&self.store, k)?;
- tx.insert(&self.merkle_todo, k, vec![])?;
- return Ok(Some(cur_v.into_vec()));
- }
+ let removed = self.store.db().transaction(|mut tx| {
+ let remove_v = match tx.get(&self.store, k)? {
+ Some(cur_v) if blake2sum(&cur_v[..]) == vhash => Some(cur_v.into_vec()),
+ _ => None,
+ };
+ if remove_v.is_some() {
+ tx.remove(&self.store, k)?;
+ tx.insert(&self.merkle_todo, k, vec![])?;
}
- Ok(None)
+ Ok(remove_v)
})?;
if let Some(old_v) = removed {
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 260fecfa..e2611389 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -376,13 +376,13 @@ impl GcTodoEntry {
/// what we have to do is still the same
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
let key = self.todo_table_key();
- gc_todo_tree.db().transaction(|tx| {
- let old_val = tx.get(gc_todo_tree, &key)?;
- match old_val {
- Some(ov) if ov == self.value_hash.as_slice() => {
- tx.remove(gc_todo_tree, &key)?;
- }
- _ => (),
+ gc_todo_tree.db().transaction(|mut tx| {
+ let remove = match tx.get(gc_todo_tree, &key)? {
+ Some(ov) if ov == self.value_hash.as_slice() => true,
+ _ => false,
+ };
+ if remove {
+ tx.remove(gc_todo_tree, &key)?;
}
tx.commit(())
})?;
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 8c574d09..92e1445b 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -137,17 +137,17 @@ where
self.data
.merkle_tree
.db()
- .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
-
- let deleted = self.data.merkle_todo.db().transaction(|tx| {
- let old_val = tx.get(&self.data.merkle_todo, k)?;
- match old_val {
- Some(ov) if ov == vhash_by => {
- tx.remove(&self.data.merkle_todo, k)?;
- tx.commit(true)
- }
- _ => tx.commit(false),
+ .transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?;
+
+ let deleted = self.data.merkle_todo.db().transaction(|mut tx| {
+ let remove = match tx.get(&self.data.merkle_todo, k)? {
+ Some(ov) if ov == vhash_by => true,
+ _ => false,
+ };
+ if remove {
+ tx.remove(&self.data.merkle_todo, k)?;
}
+ Ok(remove)
})?;
if !deleted {
@@ -162,7 +162,7 @@ where
fn update_item_rec(
&self,
- tx: db::Transaction<'_>,
+ tx: &mut db::Transaction<'_>,
k: &[u8],
khash: &Hash,
key: &MerkleNodeKey,
@@ -288,7 +288,7 @@ where
fn read_node_txn(
&self,
- tx: db::Transaction<'_>,
+ tx: &mut db::Transaction<'_>,
k: &MerkleNodeKey,
) -> db::TxResult<MerkleNode, Error> {
let ent = tx.get(&self.data.merkle_tree, k.encode())?;
@@ -297,7 +297,7 @@ where
fn put_node_txn(
&self,
- tx: db::Transaction<'_>,
+ tx: &mut db::Transaction<'_>,
k: &MerkleNodeKey,
v: &MerkleNode,
) -> db::TxResult<Hash, Error> {