aboutsummaryrefslogtreecommitdiff
path: root/src/db
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-06-08 10:01:44 +0200
committerAlex <alex@adnab.me>2022-06-08 10:01:44 +0200
commitb44d3fc796484a50cd6854f20c9b46e5fddedc9d (patch)
tree29f6da0e8dc68485edf713aaa7331536f4ff4fde /src/db
parent7eed3ceda9cf964e3435f22fc1852e27f4f5a8ae (diff)
downloadgarage-b44d3fc796484a50cd6854f20c9b46e5fddedc9d.tar.gz
garage-b44d3fc796484a50cd6854f20c9b46e5fddedc9d.zip
Abstract database behind generic interface and implement alternative drivers (#322)
- [x] Design interface - [x] Implement Sled backend - [x] Re-implement the SledCountedTree hack ~~on Sled backend~~ on all backends (i.e. over the abstraction) - [x] Convert Garage code to use generic interface - [x] Proof-read converted Garage code - [ ] Test everything well - [x] Implement sqlite backend - [x] Implement LMDB backend - [ ] (Implement Persy backend?) - [ ] (Implement other backends? (like RocksDB, ...)) - [x] Implement backend choice in config file and garage server module - [x] Add CLI for converting between DB formats - Exploit the new interface to put more things in transactions - [x] `.updated()` trigger on Garage tables Fix #284 **Bugs** - [x] When exporting sqlite, trees iterate empty?? - [x] LMDB doesn't work **Known issues for various back-ends** - Sled: - Eats all my RAM and also all my disk space - `.len()` has to traverse the whole table - Is actually quite slow on some operations - And is actually pretty bad code... - Sqlite: - Requires a lock to be taken on all operations. The lock is also taken when iterating on a table with `.iter()`, and the lock isn't released until the iterator is dropped. This means that we must be VERY carefull to not do anything else inside a `.iter()` loop or else we will have a deadlock! Most such cases have been eliminated from the Garage codebase, but there might still be some that remain. If your Garage-over-Sqlite seems to hang/freeze, this is the reason. - (adapter uses a bunch of unsafe code) - Heed (LMDB): - Not suited for 32-bit machines as it has to map the whole DB in memory. - (adpater uses a tiny bit of unsafe code) **My recommendation:** avoid 32-bit machines and use LMDB as much as possible. **Converting databases** is actually quite easy. For example from Sled to LMDB: ```bash cd src/db cargo run --features cli --bin convert -- -i path/to/garage/meta/db -a sled -o path/to/garage/meta/db.lmdb -b lmdb ``` Then, just add this to your `config.toml`: ```toml db_engine = "lmdb" ``` Co-authored-by: Alex Auvolat <alex@adnab.me> Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/322 Co-authored-by: Alex <alex@adnab.me> Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/db')
-rw-r--r--src/db/Cargo.toml36
-rw-r--r--src/db/bin/convert.rs76
-rw-r--r--src/db/counted_tree_hack.rs127
-rw-r--r--src/db/lib.rs400
-rw-r--r--src/db/lmdb_adapter.rs329
-rw-r--r--src/db/sled_adapter.rs260
-rw-r--r--src/db/sqlite_adapter.rs500
-rw-r--r--src/db/test.rs106
8 files changed, 1834 insertions, 0 deletions
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
new file mode 100644
index 00000000..6d8f64be
--- /dev/null
+++ b/src/db/Cargo.toml
@@ -0,0 +1,36 @@
+[package]
+name = "garage_db"
+version = "0.8.0"
+authors = ["Alex Auvolat <alex@adnab.me>"]
+edition = "2018"
+license = "AGPL-3.0"
+description = "Abstraction over multiple key/value storage engines that supports transactions"
+repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
+
+[lib]
+path = "lib.rs"
+
+[[bin]]
+name = "convert"
+path = "bin/convert.rs"
+required-features = ["cli"]
+
+[dependencies]
+err-derive = "0.3"
+hexdump = "0.1"
+log = "0.4"
+
+heed = "0.11"
+rusqlite = { version = "0.27", features = ["bundled"] }
+sled = "0.34"
+
+# cli deps
+clap = { version = "3.1.18", optional = true, features = ["derive", "env"] }
+pretty_env_logger = { version = "0.4", optional = true }
+
+[dev-dependencies]
+mktemp = "0.4"
+
+[features]
+cli = ["clap", "pretty_env_logger"]
diff --git a/src/db/bin/convert.rs b/src/db/bin/convert.rs
new file mode 100644
index 00000000..9e45e61f
--- /dev/null
+++ b/src/db/bin/convert.rs
@@ -0,0 +1,76 @@
+use std::path::PathBuf;
+
+use garage_db::*;
+
+use clap::Parser;
+
+/// K2V command line interface
+#[derive(Parser, Debug)]
+#[clap(author, version, about, long_about = None)]
+struct Args {
+ /// Input DB path
+ #[clap(short = 'i')]
+ input_path: PathBuf,
+ /// Input DB engine
+ #[clap(short = 'a')]
+ input_engine: String,
+
+ /// Output DB path
+ #[clap(short = 'o')]
+ output_path: PathBuf,
+ /// Output DB engine
+ #[clap(short = 'b')]
+ output_engine: String,
+}
+
+fn main() {
+ let args = Args::parse();
+ pretty_env_logger::init();
+
+ match do_conversion(args) {
+ Ok(()) => println!("Success!"),
+ Err(e) => eprintln!("Error: {}", e),
+ }
+}
+
+fn do_conversion(args: Args) -> Result<()> {
+ let input = open_db(args.input_path, args.input_engine)?;
+ let output = open_db(args.output_path, args.output_engine)?;
+ output.import(&input)?;
+ Ok(())
+}
+
+fn open_db(path: PathBuf, engine: String) -> Result<Db> {
+ match engine.as_str() {
+ "sled" => {
+ let db = sled_adapter::sled::Config::default().path(&path).open()?;
+ Ok(sled_adapter::SledDb::init(db))
+ }
+ "sqlite" | "sqlite3" | "rusqlite" => {
+ let db = sqlite_adapter::rusqlite::Connection::open(&path)?;
+ Ok(sqlite_adapter::SqliteDb::init(db))
+ }
+ "lmdb" | "heed" => {
+ std::fs::create_dir_all(&path).map_err(|e| {
+ Error(format!("Unable to create LMDB data directory: {}", e).into())
+ })?;
+
+ let map_size = if u32::MAX as usize == usize::MAX {
+ eprintln!(
+ "LMDB is not recommended on 32-bit systems, database size will be limited"
+ );
+ 1usize << 30 // 1GB for 32-bit systems
+ } else {
+ 1usize << 40 // 1TB for 64-bit systems
+ };
+
+ let db = lmdb_adapter::heed::EnvOpenOptions::new()
+ .max_dbs(100)
+ .map_size(map_size)
+ .open(&path)
+ .unwrap();
+ Ok(lmdb_adapter::LmdbDb::init(db))
+ }
+ e => Err(Error(format!("Invalid DB engine: {}", e).into())),
+ }
+}
diff --git a/src/db/counted_tree_hack.rs b/src/db/counted_tree_hack.rs
new file mode 100644
index 00000000..bbe943a2
--- /dev/null
+++ b/src/db/counted_tree_hack.rs
@@ -0,0 +1,127 @@
+//! This hack allows a db tree to keep in RAM a counter of the number of entries
+//! it contains, which is used to call .len() on it. This is usefull only for
+//! the sled backend where .len() otherwise would have to traverse the whole
+//! tree to count items. For sqlite and lmdb, this is mostly useless (but
+//! hopefully not harmfull!). Note that a CountedTree cannot be part of a
+//! transaction.
+
+use std::sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+};
+
+use crate::{Result, Tree, TxError, Value, ValueIter};
+
+#[derive(Clone)]
+pub struct CountedTree(Arc<CountedTreeInternal>);
+
+struct CountedTreeInternal {
+ tree: Tree,
+ len: AtomicUsize,
+}
+
+impl CountedTree {
+ pub fn new(tree: Tree) -> Result<Self> {
+ let len = tree.len()?;
+ Ok(Self(Arc::new(CountedTreeInternal {
+ tree,
+ len: AtomicUsize::new(len),
+ })))
+ }
+
+ pub fn len(&self) -> usize {
+ self.0.len.load(Ordering::SeqCst)
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Value>> {
+ self.0.tree.get(key)
+ }
+
+ pub fn first(&self) -> Result<Option<(Value, Value)>> {
+ self.0.tree.first()
+ }
+
+ pub fn iter(&self) -> Result<ValueIter<'_>> {
+ self.0.tree.iter()
+ }
+
+ // ---- writing functions ----
+
+ pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<Value>>
+ where
+ K: AsRef<[u8]>,
+ V: AsRef<[u8]>,
+ {
+ let old_val = self.0.tree.insert(key, value)?;
+ if old_val.is_none() {
+ self.0.len.fetch_add(1, Ordering::SeqCst);
+ }
+ Ok(old_val)
+ }
+
+ pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Value>> {
+ let old_val = self.0.tree.remove(key)?;
+ if old_val.is_some() {
+ self.0.len.fetch_sub(1, Ordering::SeqCst);
+ }
+ Ok(old_val)
+ }
+
+ pub fn compare_and_swap<K, OV, NV>(
+ &self,
+ key: K,
+ expected_old: Option<OV>,
+ new: Option<NV>,
+ ) -> Result<bool>
+ where
+ K: AsRef<[u8]>,
+ OV: AsRef<[u8]>,
+ NV: AsRef<[u8]>,
+ {
+ let old_some = expected_old.is_some();
+ let new_some = new.is_some();
+
+ let tx_res = self.0.tree.db().transaction(|mut tx| {
+ let old_val = tx.get(&self.0.tree, &key)?;
+ let is_same = match (&old_val, &expected_old) {
+ (None, None) => true,
+ (Some(x), Some(y)) if x == y.as_ref() => true,
+ _ => false,
+ };
+ if is_same {
+ match &new {
+ Some(v) => {
+ tx.insert(&self.0.tree, &key, v)?;
+ }
+ None => {
+ tx.remove(&self.0.tree, &key)?;
+ }
+ }
+ tx.commit(())
+ } else {
+ tx.abort(())
+ }
+ });
+
+ match tx_res {
+ Ok(()) => {
+ match (old_some, new_some) {
+ (false, true) => {
+ self.0.len.fetch_add(1, Ordering::SeqCst);
+ }
+ (true, false) => {
+ self.0.len.fetch_sub(1, Ordering::SeqCst);
+ }
+ _ => (),
+ }
+ Ok(true)
+ }
+ Err(TxError::Abort(())) => Ok(false),
+ Err(TxError::Db(e)) => Err(e),
+ }
+ }
+}
diff --git a/src/db/lib.rs b/src/db/lib.rs
new file mode 100644
index 00000000..e9d3ea18
--- /dev/null
+++ b/src/db/lib.rs
@@ -0,0 +1,400 @@
+pub mod lmdb_adapter;
+pub mod sled_adapter;
+pub mod sqlite_adapter;
+
+pub mod counted_tree_hack;
+
+#[cfg(test)]
+pub mod test;
+
+use core::ops::{Bound, RangeBounds};
+
+use std::borrow::Cow;
+use std::cell::Cell;
+use std::sync::Arc;
+
+use err_derive::Error;
+
+#[derive(Clone)]
+pub struct Db(pub(crate) Arc<dyn IDb>);
+
+pub struct Transaction<'a>(&'a mut dyn ITx);
+
+#[derive(Clone)]
+pub struct Tree(Arc<dyn IDb>, usize);
+
+pub type Value = Vec<u8>;
+pub type ValueIter<'a> = Box<dyn std::iter::Iterator<Item = Result<(Value, Value)>> + 'a>;
+pub type TxValueIter<'a> = Box<dyn std::iter::Iterator<Item = TxOpResult<(Value, Value)>> + 'a>;
+
+// ----
+
+#[derive(Debug, Error)]
+#[error(display = "{}", _0)]
+pub struct Error(pub Cow<'static, str>);
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+#[derive(Debug, Error)]
+#[error(display = "{}", _0)]
+pub struct TxOpError(pub(crate) Error);
+pub type TxOpResult<T> = std::result::Result<T, TxOpError>;
+
+pub enum TxError<E> {
+ Abort(E),
+ Db(Error),
+}
+pub type TxResult<R, E> = std::result::Result<R, TxError<E>>;
+
+impl<E> From<TxOpError> for TxError<E> {
+ fn from(e: TxOpError) -> TxError<E> {
+ TxError::Db(e.0)
+ }
+}
+
+pub fn unabort<R, E>(res: TxResult<R, E>) -> TxOpResult<std::result::Result<R, E>> {
+ match res {
+ Ok(v) => Ok(Ok(v)),
+ Err(TxError::Abort(e)) => Ok(Err(e)),
+ Err(TxError::Db(e)) => Err(TxOpError(e)),
+ }
+}
+
+// ----
+
+impl Db {
+ pub fn engine(&self) -> String {
+ self.0.engine()
+ }
+
+ pub fn open_tree<S: AsRef<str>>(&self, name: S) -> Result<Tree> {
+ let tree_id = self.0.open_tree(name.as_ref())?;
+ Ok(Tree(self.0.clone(), tree_id))
+ }
+
+ pub fn list_trees(&self) -> Result<Vec<String>> {
+ self.0.list_trees()
+ }
+
+ pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
+ where
+ F: Fn(Transaction<'_>) -> TxResult<R, E>,
+ {
+ let f = TxFn {
+ function: fun,
+ result: Cell::new(None),
+ };
+ let tx_res = self.0.transaction(&f);
+ let ret = f
+ .result
+ .into_inner()
+ .expect("Transaction did not store result");
+
+ match tx_res {
+ Ok(()) => {
+ assert!(matches!(ret, Ok(_)));
+ ret
+ }
+ Err(TxError::Abort(())) => {
+ assert!(matches!(ret, Err(TxError::Abort(_))));
+ ret
+ }
+ Err(TxError::Db(e2)) => match ret {
+ // Ok was stored -> the error occured when finalizing
+ // transaction
+ Ok(_) => Err(TxError::Db(e2)),
+ // An error was already stored: that's the one we want to
+ // return
+ Err(TxError::Db(e)) => Err(TxError::Db(e)),
+ _ => unreachable!(),
+ },
+ }
+ }
+
+ pub fn import(&self, other: &Db) -> Result<()> {
+ let existing_trees = self.list_trees()?;
+ if !existing_trees.is_empty() {
+ return Err(Error(
+ format!(
+ "destination database already contains data: {:?}",
+ existing_trees
+ )
+ .into(),
+ ));
+ }
+
+ let tree_names = other.list_trees()?;
+ for name in tree_names {
+ let tree = self.open_tree(&name)?;
+ if tree.len()? > 0 {
+ return Err(Error(format!("tree {} already contains data", name).into()));
+ }
+
+ let ex_tree = other.open_tree(&name)?;
+
+ let tx_res = self.transaction(|mut tx| {
+ let mut i = 0;
+ for item in ex_tree.iter().map_err(TxError::Abort)? {
+ let (k, v) = item.map_err(TxError::Abort)?;
+ tx.insert(&tree, k, v)?;
+ i += 1;
+ if i % 1000 == 0 {
+ println!("{}: imported {}", name, i);
+ }
+ }
+ tx.commit(i)
+ });
+ let total = match tx_res {
+ Err(TxError::Db(e)) => return Err(e),
+ Err(TxError::Abort(e)) => return Err(e),
+ Ok(x) => x,
+ };
+
+ println!("{}: finished importing, {} items", name, total);
+ }
+ Ok(())
+ }
+}
+
+#[allow(clippy::len_without_is_empty)]
+impl Tree {
+ #[inline]
+ pub fn db(&self) -> Db {
+ Db(self.0.clone())
+ }
+
+ #[inline]
+ pub fn get<T: AsRef<[u8]>>(&self, key: T) -> Result<Option<Value>> {
+ self.0.get(self.1, key.as_ref())
+ }
+ #[inline]
+ pub fn len(&self) -> Result<usize> {
+ self.0.len(self.1)
+ }
+
+ #[inline]
+ pub fn first(&self) -> Result<Option<(Value, Value)>> {
+ self.iter()?.next().transpose()
+ }
+ #[inline]
+ pub fn get_gt<T: AsRef<[u8]>>(&self, from: T) -> Result<Option<(Value, Value)>> {
+ self.range((Bound::Excluded(from), Bound::Unbounded))?
+ .next()
+ .transpose()
+ }
+
+ /// Returns the old value if there was one
+ #[inline]
+ pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(
+ &self,
+ key: T,
+ value: U,
+ ) -> Result<Option<Value>> {
+ self.0.insert(self.1, key.as_ref(), value.as_ref())
+ }
+ /// Returns the old value if there was one
+ #[inline]
+ pub fn remove<T: AsRef<[u8]>>(&self, key: T) -> Result<Option<Value>> {
+ self.0.remove(self.1, key.as_ref())
+ }
+
+ #[inline]
+ pub fn iter(&self) -> Result<ValueIter<'_>> {
+ self.0.iter(self.1)
+ }
+ #[inline]
+ pub fn iter_rev(&self) -> Result<ValueIter<'_>> {
+ self.0.iter_rev(self.1)
+ }
+
+ #[inline]
+ pub fn range<K, R>(&self, range: R) -> Result<ValueIter<'_>>
+ where
+ K: AsRef<[u8]>,
+ R: RangeBounds<K>,
+ {
+ let sb = range.start_bound();
+ let eb = range.end_bound();
+ self.0.range(self.1, get_bound(sb), get_bound(eb))
+ }
+ #[inline]
+ pub fn range_rev<K, R>(&self, range: R) -> Result<ValueIter<'_>>
+ where
+ K: AsRef<[u8]>,
+ R: RangeBounds<K>,
+ {
+ let sb = range.start_bound();
+ let eb = range.end_bound();
+ self.0.range_rev(self.1, get_bound(sb), get_bound(eb))
+ }
+}
+
+#[allow(clippy::len_without_is_empty)]
+impl<'a> Transaction<'a> {
+ #[inline]
+ pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
+ self.0.get(tree.1, key.as_ref())
+ }
+ #[inline]
+ pub fn len(&self, tree: &Tree) -> TxOpResult<usize> {
+ self.0.len(tree.1)
+ }
+
+ /// Returns the old value if there was one
+ #[inline]
+ pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(
+ &mut self,
+ tree: &Tree,
+ key: T,
+ value: U,
+ ) -> TxOpResult<Option<Value>> {
+ self.0.insert(tree.1, key.as_ref(), value.as_ref())
+ }
+ /// Returns the old value if there was one
+ #[inline]
+ pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
+ self.0.remove(tree.1, key.as_ref())
+ }
+
+ #[inline]
+ pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
+ self.0.iter(tree.1)
+ }
+ #[inline]
+ pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
+ self.0.iter_rev(tree.1)
+ }
+
+ #[inline]
+ pub fn range<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
+ where
+ K: AsRef<[u8]>,
+ R: RangeBounds<K>,
+ {
+ let sb = range.start_bound();
+ let eb = range.end_bound();
+ self.0.range(tree.1, get_bound(sb), get_bound(eb))
+ }
+ #[inline]
+ pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
+ where
+ K: AsRef<[u8]>,
+ R: RangeBounds<K>,
+ {
+ let sb = range.start_bound();
+ let eb = range.end_bound();
+ self.0.range_rev(tree.1, get_bound(sb), get_bound(eb))
+ }
+
+ // ----
+
+ #[inline]
+ pub fn abort<R, E>(self, e: E) -> TxResult<R, E> {
+ Err(TxError::Abort(e))
+ }
+
+ #[inline]
+ pub fn commit<R, E>(self, r: R) -> TxResult<R, E> {
+ Ok(r)
+ }
+}
+
+// ---- Internal interfaces
+
+pub(crate) trait IDb: Send + Sync {
+ fn engine(&self) -> String;
+ fn open_tree(&self, name: &str) -> Result<usize>;
+ fn list_trees(&self) -> Result<Vec<String>>;
+
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
+ fn len(&self, tree: usize) -> Result<usize>;
+
+ fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>>;
+ fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
+
+ fn iter(&self, tree: usize) -> Result<ValueIter<'_>>;
+ fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>>;
+
+ fn range<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>>;
+ fn range_rev<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>>;
+
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>;
+}
+
+pub(crate) trait ITx {
+ fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>>;
+ fn len(&self, tree: usize) -> TxOpResult<usize>;
+
+ fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>>;
+ fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>>;
+
+ fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>>;
+ fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>>;
+
+ fn range<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>>;
+ fn range_rev<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>>;
+}
+
+pub(crate) trait ITxFn {
+ fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult;
+}
+
+pub(crate) enum TxFnResult {
+ Ok,
+ Abort,
+ DbErr,
+}
+
+struct TxFn<F, R, E>
+where
+ F: Fn(Transaction<'_>) -> TxResult<R, E>,
+{
+ function: F,
+ result: Cell<Option<TxResult<R, E>>>,
+}
+
+impl<F, R, E> ITxFn for TxFn<F, R, E>
+where
+ F: Fn(Transaction<'_>) -> TxResult<R, E>,
+{
+ fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
+ let res = (self.function)(Transaction(tx));
+ let res2 = match &res {
+ Ok(_) => TxFnResult::Ok,
+ Err(TxError::Abort(_)) => TxFnResult::Abort,
+ Err(TxError::Db(_)) => TxFnResult::DbErr,
+ };
+ self.result.set(Some(res));
+ res2
+ }
+}
+
+// ----
+
+fn get_bound<K: AsRef<[u8]>>(b: Bound<&K>) -> Bound<&[u8]> {
+ match b {
+ Bound::Included(v) => Bound::Included(v.as_ref()),
+ Bound::Excluded(v) => Bound::Excluded(v.as_ref()),
+ Bound::Unbounded => Bound::Unbounded,
+ }
+}
diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs
new file mode 100644
index 00000000..74622919
--- /dev/null
+++ b/src/db/lmdb_adapter.rs
@@ -0,0 +1,329 @@
+use core::ops::Bound;
+use core::ptr::NonNull;
+
+use std::collections::HashMap;
+use std::convert::TryInto;
+use std::sync::{Arc, RwLock};
+
+use heed::types::ByteSlice;
+use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database};
+
+use crate::{
+ Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
+ TxValueIter, Value, ValueIter,
+};
+
+pub use heed;
+
+// -- err
+
+impl From<heed::Error> for Error {
+ fn from(e: heed::Error) -> Error {
+ Error(format!("LMDB: {}", e).into())
+ }
+}
+
+impl From<heed::Error> for TxOpError {
+ fn from(e: heed::Error) -> TxOpError {
+ TxOpError(e.into())
+ }
+}
+
+// -- db
+
+pub struct LmdbDb {
+ db: heed::Env,
+ trees: RwLock<(Vec<Database>, HashMap<String, usize>)>,
+}
+
+impl LmdbDb {
+ pub fn init(db: Env) -> Db {
+ let s = Self {
+ db,
+ trees: RwLock::new((Vec::new(), HashMap::new())),
+ };
+ Db(Arc::new(s))
+ }
+
+ fn get_tree(&self, i: usize) -> Result<Database> {
+ self.trees
+ .read()
+ .unwrap()
+ .0
+ .get(i)
+ .cloned()
+ .ok_or_else(|| Error("invalid tree id".into()))
+ }
+}
+
+impl IDb for LmdbDb {
+ fn engine(&self) -> String {
+ "LMDB (using Heed crate)".into()
+ }
+
+ fn open_tree(&self, name: &str) -> Result<usize> {
+ let mut trees = self.trees.write().unwrap();
+ if let Some(i) = trees.1.get(name) {
+ Ok(*i)
+ } else {
+ let tree = self.db.create_database(Some(name))?;
+ let i = trees.0.len();
+ trees.0.push(tree);
+ trees.1.insert(name.to_string(), i);
+ Ok(i)
+ }
+ }
+
+ fn list_trees(&self) -> Result<Vec<String>> {
+ let tree0 = match self.db.open_database::<heed::types::Str, ByteSlice>(None)? {
+ Some(x) => x,
+ None => return Ok(vec![]),
+ };
+
+ let mut ret = vec![];
+ let tx = self.db.read_txn()?;
+ for item in tree0.iter(&tx)? {
+ let (tree_name, _) = item?;
+ ret.push(tree_name.to_string());
+ }
+ drop(tx);
+
+ let mut ret2 = vec![];
+ for tree_name in ret {
+ if self
+ .db
+ .open_database::<ByteSlice, ByteSlice>(Some(&tree_name))?
+ .is_some()
+ {
+ ret2.push(tree_name);
+ }
+ }
+
+ Ok(ret2)
+ }
+
+ // ----
+
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+
+ let tx = self.db.read_txn()?;
+ let val = tree.get(&tx, key)?;
+ match val {
+ None => Ok(None),
+ Some(v) => Ok(Some(v.to_vec())),
+ }
+ }
+
+ fn len(&self, tree: usize) -> Result<usize> {
+ let tree = self.get_tree(tree)?;
+ let tx = self.db.read_txn()?;
+ Ok(tree.len(&tx)?.try_into().unwrap())
+ }
+
+ fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let mut tx = self.db.write_txn()?;
+ let old_val = tree.get(&tx, key)?.map(Vec::from);
+ tree.put(&mut tx, key, value)?;
+ tx.commit()?;
+ Ok(old_val)
+ }
+
+ fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let mut tx = self.db.write_txn()?;
+ let old_val = tree.get(&tx, key)?.map(Vec::from);
+ tree.delete(&mut tx, key)?;
+ tx.commit()?;
+ Ok(old_val)
+ }
+
+ fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ let tx = self.db.read_txn()?;
+ TxAndIterator::make(tx, |tx| Ok(tree.iter(tx)?))
+ }
+
+ fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ let tx = self.db.read_txn()?;
+ TxAndIterator::make(tx, |tx| Ok(tree.rev_iter(tx)?))
+ }
+
+ fn range<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ let tx = self.db.read_txn()?;
+ TxAndIterator::make(tx, |tx| Ok(tree.range(tx, &(low, high))?))
+ }
+ fn range_rev<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ let tx = self.db.read_txn()?;
+ TxAndIterator::make(tx, |tx| Ok(tree.rev_range(tx, &(low, high))?))
+ }
+
+ // ----
+
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
+ let trees = self.trees.read().unwrap();
+ let mut tx = LmdbTx {
+ trees: &trees.0[..],
+ tx: self
+ .db
+ .write_txn()
+ .map_err(Error::from)
+ .map_err(TxError::Db)?,
+ };
+
+ let res = f.try_on(&mut tx);
+ match res {
+ TxFnResult::Ok => {
+ tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
+ Ok(())
+ }
+ TxFnResult::Abort => {
+ tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?;
+ Err(TxError::Abort(()))
+ }
+ TxFnResult::DbErr => {
+ tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?;
+ Err(TxError::Db(Error(
+ "(this message will be discarded)".into(),
+ )))
+ }
+ }
+ }
+}
+
+// ----
+
+struct LmdbTx<'a> {
+ trees: &'a [Database],
+ tx: RwTxn<'a, 'a>,
+}
+
+impl<'a> LmdbTx<'a> {
+ fn get_tree(&self, i: usize) -> TxOpResult<&Database> {
+ self.trees.get(i).ok_or_else(|| {
+ TxOpError(Error(
+ "invalid tree id (it might have been openned after the transaction started)".into(),
+ ))
+ })
+ }
+}
+
+impl<'a> ITx for LmdbTx<'a> {
+ fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ match tree.get(&self.tx, key)? {
+ Some(v) => Ok(Some(v.to_vec())),
+ None => Ok(None),
+ }
+ }
+ fn len(&self, _tree: usize) -> TxOpResult<usize> {
+ unimplemented!(".len() in transaction not supported with LMDB backend")
+ }
+
+ fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = *self.get_tree(tree)?;
+ let old_val = tree.get(&self.tx, key)?.map(Vec::from);
+ tree.put(&mut self.tx, key, value)?;
+ Ok(old_val)
+ }
+ fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = *self.get_tree(tree)?;
+ let old_val = tree.get(&self.tx, key)?.map(Vec::from);
+ tree.delete(&mut self.tx, key)?;
+ Ok(old_val)
+ }
+
+ fn iter(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with LMDB backend");
+ }
+ fn iter_rev(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with LMDB backend");
+ }
+
+ fn range<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with LMDB backend");
+ }
+ fn range_rev<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with LMDB backend");
+ }
+}
+
+// ----
+
+type IteratorItem<'a> = heed::Result<(
+ <ByteSlice as BytesDecode<'a>>::DItem,
+ <ByteSlice as BytesDecode<'a>>::DItem,
+)>;
+
+struct TxAndIterator<'a, I>
+where
+ I: Iterator<Item = IteratorItem<'a>> + 'a,
+{
+ tx: RoTxn<'a>,
+ iter: Option<I>,
+}
+
+impl<'a, I> TxAndIterator<'a, I>
+where
+ I: Iterator<Item = IteratorItem<'a>> + 'a,
+{
+ fn make<F>(tx: RoTxn<'a>, iterfun: F) -> Result<ValueIter<'a>>
+ where
+ F: FnOnce(&'a RoTxn<'a>) -> Result<I>,
+ {
+ let mut res = TxAndIterator { tx, iter: None };
+
+ let tx = unsafe { NonNull::from(&res.tx).as_ref() };
+ res.iter = Some(iterfun(tx)?);
+
+ Ok(Box::new(res))
+ }
+}
+
+impl<'a, I> Drop for TxAndIterator<'a, I>
+where
+ I: Iterator<Item = IteratorItem<'a>> + 'a,
+{
+ fn drop(&mut self) {
+ drop(self.iter.take());
+ }
+}
+
+impl<'a, I> Iterator for TxAndIterator<'a, I>
+where
+ I: Iterator<Item = IteratorItem<'a>> + 'a,
+{
+ type Item = Result<(Value, Value)>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ match self.iter.as_mut().unwrap().next() {
+ None => None,
+ Some(Err(e)) => Some(Err(e.into())),
+ Some(Ok((k, v))) => Some(Ok((k.to_vec(), v.to_vec()))),
+ }
+ }
+}
diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs
new file mode 100644
index 00000000..982f8d82
--- /dev/null
+++ b/src/db/sled_adapter.rs
@@ -0,0 +1,260 @@
+use core::ops::Bound;
+
+use std::cell::Cell;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+use sled::transaction::{
+ ConflictableTransactionError, TransactionError, Transactional, TransactionalTree,
+ UnabortableTransactionError,
+};
+
+use crate::{
+ Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
+ TxValueIter, Value, ValueIter,
+};
+
+pub use sled;
+
+// -- err
+
+impl From<sled::Error> for Error {
+ fn from(e: sled::Error) -> Error {
+ Error(format!("Sled: {}", e).into())
+ }
+}
+
+impl From<sled::Error> for TxOpError {
+ fn from(e: sled::Error) -> TxOpError {
+ TxOpError(e.into())
+ }
+}
+
+// -- db
+
+pub struct SledDb {
+ db: sled::Db,
+ trees: RwLock<(Vec<sled::Tree>, HashMap<String, usize>)>,
+}
+
+impl SledDb {
+ pub fn init(db: sled::Db) -> Db {
+ let s = Self {
+ db,
+ trees: RwLock::new((Vec::new(), HashMap::new())),
+ };
+ Db(Arc::new(s))
+ }
+
+ fn get_tree(&self, i: usize) -> Result<sled::Tree> {
+ self.trees
+ .read()
+ .unwrap()
+ .0
+ .get(i)
+ .cloned()
+ .ok_or_else(|| Error("invalid tree id".into()))
+ }
+}
+
+impl IDb for SledDb {
+ fn engine(&self) -> String {
+ "Sled".into()
+ }
+
+ fn open_tree(&self, name: &str) -> Result<usize> {
+ let mut trees = self.trees.write().unwrap();
+ if let Some(i) = trees.1.get(name) {
+ Ok(*i)
+ } else {
+ let tree = self.db.open_tree(name)?;
+ let i = trees.0.len();
+ trees.0.push(tree);
+ trees.1.insert(name.to_string(), i);
+ Ok(i)
+ }
+ }
+
+ fn list_trees(&self) -> Result<Vec<String>> {
+ let mut trees = vec![];
+ for name in self.db.tree_names() {
+ let name = std::str::from_utf8(&name)
+ .map_err(|e| Error(format!("{}", e).into()))?
+ .to_string();
+ if name != "__sled__default" {
+ trees.push(name);
+ }
+ }
+ Ok(trees)
+ }
+
+ // ----
+
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let val = tree.get(key)?;
+ Ok(val.map(|x| x.to_vec()))
+ }
+
+ fn len(&self, tree: usize) -> Result<usize> {
+ let tree = self.get_tree(tree)?;
+ Ok(tree.len())
+ }
+
+ fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let old_val = tree.insert(key, value)?;
+ Ok(old_val.map(|x| x.to_vec()))
+ }
+
+ fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let old_val = tree.remove(key)?;
+ Ok(old_val.map(|x| x.to_vec()))
+ }
+
+ fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ Ok(Box::new(tree.iter().map(|v| {
+ v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into)
+ })))
+ }
+
+ fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ Ok(Box::new(tree.iter().rev().map(|v| {
+ v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into)
+ })))
+ }
+
+ fn range<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ Ok(Box::new(tree.range::<&'r [u8], _>((low, high)).map(|v| {
+ v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into)
+ })))
+ }
+ fn range_rev<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ Ok(Box::new(tree.range::<&'r [u8], _>((low, high)).rev().map(
+ |v| v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into),
+ )))
+ }
+
+ // ----
+
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
+ let trees = self.trees.read().unwrap();
+ let res = trees.0.transaction(|txtrees| {
+ let mut tx = SledTx {
+ trees: txtrees,
+ err: Cell::new(None),
+ };
+ match f.try_on(&mut tx) {
+ TxFnResult::Ok => {
+ assert!(tx.err.into_inner().is_none());
+ Ok(())
+ }
+ TxFnResult::Abort => {
+ assert!(tx.err.into_inner().is_none());
+ Err(ConflictableTransactionError::Abort(()))
+ }
+ TxFnResult::DbErr => {
+ let e = tx.err.into_inner().expect("No DB error");
+ Err(e.into())
+ }
+ }
+ });
+ match res {
+ Ok(()) => Ok(()),
+ Err(TransactionError::Abort(())) => Err(TxError::Abort(())),
+ Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())),
+ }
+ }
+}
+
+// ----
+
+struct SledTx<'a> {
+ trees: &'a [TransactionalTree],
+ err: Cell<Option<UnabortableTransactionError>>,
+}
+
+impl<'a> SledTx<'a> {
+ fn get_tree(&self, i: usize) -> TxOpResult<&TransactionalTree> {
+ self.trees.get(i).ok_or_else(|| {
+ TxOpError(Error(
+ "invalid tree id (it might have been openned after the transaction started)".into(),
+ ))
+ })
+ }
+
+ fn save_error<R>(
+ &self,
+ v: std::result::Result<R, UnabortableTransactionError>,
+ ) -> TxOpResult<R> {
+ match v {
+ Ok(x) => Ok(x),
+ Err(e) => {
+ let txt = format!("{}", e);
+ self.err.set(Some(e));
+ Err(TxOpError(Error(txt.into())))
+ }
+ }
+ }
+}
+
+impl<'a> ITx for SledTx<'a> {
+ fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let tmp = self.save_error(tree.get(key))?;
+ Ok(tmp.map(|x| x.to_vec()))
+ }
+ fn len(&self, _tree: usize) -> TxOpResult<usize> {
+ unimplemented!(".len() in transaction not supported with Sled backend")
+ }
+
+ fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let old_val = self.save_error(tree.insert(key, value))?;
+ Ok(old_val.map(|x| x.to_vec()))
+ }
+ fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let old_val = self.save_error(tree.remove(key))?;
+ Ok(old_val.map(|x| x.to_vec()))
+ }
+
+ fn iter(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with Sled backend");
+ }
+ fn iter_rev(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with Sled backend");
+ }
+
+ fn range<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with Sled backend");
+ }
+ fn range_rev<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!("Iterators in transactions not supported with Sled backend");
+ }
+}
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
new file mode 100644
index 00000000..14bf35ff
--- /dev/null
+++ b/src/db/sqlite_adapter.rs
@@ -0,0 +1,500 @@
+use core::ops::Bound;
+
+use std::borrow::BorrowMut;
+use std::marker::PhantomPinned;
+use std::pin::Pin;
+use std::ptr::NonNull;
+use std::sync::{Arc, Mutex, MutexGuard};
+
+use log::trace;
+
+use rusqlite::{params, Connection, Rows, Statement, Transaction};
+
+use crate::{
+ Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
+ TxValueIter, Value, ValueIter,
+};
+
+pub use rusqlite;
+
+// --- err
+
+impl From<rusqlite::Error> for Error {
+ fn from(e: rusqlite::Error) -> Error {
+ Error(format!("Sqlite: {}", e).into())
+ }
+}
+
+impl From<rusqlite::Error> for TxOpError {
+ fn from(e: rusqlite::Error) -> TxOpError {
+ TxOpError(e.into())
+ }
+}
+
+// -- db
+
+pub struct SqliteDb(Mutex<SqliteDbInner>);
+
+struct SqliteDbInner {
+ db: Connection,
+ trees: Vec<String>,
+}
+
+impl SqliteDb {
+ pub fn init(db: rusqlite::Connection) -> Db {
+ let s = Self(Mutex::new(SqliteDbInner {
+ db,
+ trees: Vec::new(),
+ }));
+ Db(Arc::new(s))
+ }
+}
+
+impl SqliteDbInner {
+ fn get_tree(&self, i: usize) -> Result<&'_ str> {
+ self.trees
+ .get(i)
+ .map(String::as_str)
+ .ok_or_else(|| Error("invalid tree id".into()))
+ }
+
+ fn internal_get(&self, tree: &str, key: &[u8]) -> Result<Option<Value>> {
+ let mut stmt = self
+ .db
+ .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
+ let mut res_iter = stmt.query([key])?;
+ match res_iter.next()? {
+ None => Ok(None),
+ Some(v) => Ok(Some(v.get::<_, Vec<u8>>(0)?)),
+ }
+ }
+}
+
+impl IDb for SqliteDb {
+ fn engine(&self) -> String {
+ format!("sqlite3 v{} (using rusqlite crate)", rusqlite::version())
+ }
+
+ fn open_tree(&self, name: &str) -> Result<usize> {
+ let name = format!("tree_{}", name.replace(':', "_COLON_"));
+ let mut this = self.0.lock().unwrap();
+
+ if let Some(i) = this.trees.iter().position(|x| x == &name) {
+ Ok(i)
+ } else {
+ trace!("create table {}", name);
+ this.db.execute(
+ &format!(
+ "CREATE TABLE IF NOT EXISTS {} (
+ k BLOB PRIMARY KEY,
+ v BLOB
+ )",
+ name
+ ),
+ [],
+ )?;
+ trace!("table created: {}, unlocking", name);
+
+ let i = this.trees.len();
+ this.trees.push(name.to_string());
+ Ok(i)
+ }
+ }
+
+ fn list_trees(&self) -> Result<Vec<String>> {
+ let mut trees = vec![];
+
+ trace!("list_trees: lock db");
+ let this = self.0.lock().unwrap();
+ trace!("list_trees: lock acquired");
+
+ let mut stmt = this.db.prepare(
+ "SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'",
+ )?;
+ let mut rows = stmt.query([])?;
+ while let Some(row) = rows.next()? {
+ let name = row.get::<_, String>(0)?;
+ let name = name.replace("_COLON_", ":");
+ let name = name.strip_prefix("tree_").unwrap().to_string();
+ trees.push(name);
+ }
+ Ok(trees)
+ }
+
+ // ----
+
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
+ trace!("get {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("get {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+ this.internal_get(tree, key)
+ }
+
+ fn len(&self, tree: usize) -> Result<usize> {
+ trace!("len {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("len {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+ let mut stmt = this.db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
+ let mut res_iter = stmt.query([])?;
+ match res_iter.next()? {
+ None => Ok(0),
+ Some(v) => Ok(v.get::<_, usize>(0)?),
+ }
+ }
+
+ fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> {
+ trace!("insert {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("insert {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+ let old_val = this.internal_get(tree, key)?;
+
+ let sql = match &old_val {
+ Some(_) => format!("UPDATE {} SET v = ?2 WHERE k = ?1", tree),
+ None => format!("INSERT INTO {} (k, v) VALUES (?1, ?2)", tree),
+ };
+ let n = this.db.execute(&sql, params![key, value])?;
+ assert_eq!(n, 1);
+
+ Ok(old_val)
+ }
+
+ fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
+ trace!("remove {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("remove {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+ let old_val = this.internal_get(tree, key)?;
+
+ if old_val.is_some() {
+ let n = this
+ .db
+ .execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
+ assert_eq!(n, 1);
+ }
+
+ Ok(old_val)
+ }
+
+ fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
+ trace!("iter {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("iter {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+ let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
+ DbValueIterator::make(this, &sql, [])
+ }
+
+ fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
+ trace!("iter_rev {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("iter_rev {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+ let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
+ DbValueIterator::make(this, &sql, [])
+ }
+
+ fn range<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ trace!("range {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("range {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+
+ let (bounds_sql, params) = bounds_sql(low, high);
+ let sql = format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql);
+
+ let params = params
+ .iter()
+ .map(|x| x as &dyn rusqlite::ToSql)
+ .collect::<Vec<_>>();
+
+ DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
+ }
+ fn range_rev<'r>(
+ &self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'_>> {
+ trace!("range_rev {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("range_rev {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+
+ let (bounds_sql, params) = bounds_sql(low, high);
+ let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql);
+
+ let params = params
+ .iter()
+ .map(|x| x as &dyn rusqlite::ToSql)
+ .collect::<Vec<_>>();
+
+ DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
+ }
+
+ // ----
+
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
+ trace!("transaction: lock db");
+ let mut this = self.0.lock().unwrap();
+ trace!("transaction: lock acquired");
+
+ let this_mut_ref: &mut SqliteDbInner = this.borrow_mut();
+
+ let mut tx = SqliteTx {
+ tx: this_mut_ref
+ .db
+ .transaction()
+ .map_err(Error::from)
+ .map_err(TxError::Db)?,
+ trees: &this_mut_ref.trees,
+ };
+ let res = match f.try_on(&mut tx) {
+ TxFnResult::Ok => {
+ tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
+ Ok(())
+ }
+ TxFnResult::Abort => {
+ tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?;
+ Err(TxError::Abort(()))
+ }
+ TxFnResult::DbErr => {
+ tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?;
+ Err(TxError::Db(Error(
+ "(this message will be discarded)".into(),
+ )))
+ }
+ };
+
+ trace!("transaction done");
+ res
+ }
+}
+
+// ----
+
+struct SqliteTx<'a> {
+ tx: Transaction<'a>,
+ trees: &'a [String],
+}
+
+impl<'a> SqliteTx<'a> {
+ fn get_tree(&self, i: usize) -> TxOpResult<&'_ str> {
+ self.trees.get(i).map(String::as_ref).ok_or_else(|| {
+ TxOpError(Error(
+ "invalid tree id (it might have been openned after the transaction started)".into(),
+ ))
+ })
+ }
+
+ fn internal_get(&self, tree: &str, key: &[u8]) -> TxOpResult<Option<Value>> {
+ let mut stmt = self
+ .tx
+ .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
+ let mut res_iter = stmt.query([key])?;
+ match res_iter.next()? {
+ None => Ok(None),
+ Some(v) => Ok(Some(v.get::<_, Vec<u8>>(0)?)),
+ }
+ }
+}
+
+impl<'a> ITx for SqliteTx<'a> {
+ fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ self.internal_get(tree, key)
+ }
+ fn len(&self, tree: usize) -> TxOpResult<usize> {
+ let tree = self.get_tree(tree)?;
+ let mut stmt = self.tx.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
+ let mut res_iter = stmt.query([])?;
+ match res_iter.next()? {
+ None => Ok(0),
+ Some(v) => Ok(v.get::<_, usize>(0)?),
+ }
+ }
+
+ fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let old_val = self.internal_get(tree, key)?;
+
+ let sql = match &old_val {
+ Some(_) => format!("UPDATE {} SET v = ?2 WHERE k = ?1", tree),
+ None => format!("INSERT INTO {} (k, v) VALUES (?1, ?2)", tree),
+ };
+ let n = self.tx.execute(&sql, params![key, value])?;
+ assert_eq!(n, 1);
+
+ Ok(old_val)
+ }
+ fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
+ let tree = self.get_tree(tree)?;
+ let old_val = self.internal_get(tree, key)?;
+
+ if old_val.is_some() {
+ let n = self
+ .tx
+ .execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
+ assert_eq!(n, 1);
+ }
+
+ Ok(old_val)
+ }
+
+ fn iter(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!();
+ }
+ fn iter_rev(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!();
+ }
+
+ fn range<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!();
+ }
+ fn range_rev<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> TxOpResult<TxValueIter<'_>> {
+ unimplemented!();
+ }
+}
+
+// ----
+
+struct DbValueIterator<'a> {
+ db: MutexGuard<'a, SqliteDbInner>,
+ stmt: Option<Statement<'a>>,
+ iter: Option<Rows<'a>>,
+ _pin: PhantomPinned,
+}
+
+impl<'a> DbValueIterator<'a> {
+ fn make<P: rusqlite::Params>(
+ db: MutexGuard<'a, SqliteDbInner>,
+ sql: &str,
+ args: P,
+ ) -> Result<ValueIter<'a>> {
+ let res = DbValueIterator {
+ db,
+ stmt: None,
+ iter: None,
+ _pin: PhantomPinned,
+ };
+ let mut boxed = Box::pin(res);
+ trace!("make iterator with sql: {}", sql);
+
+ unsafe {
+ let db = NonNull::from(&boxed.db);
+ let stmt = db.as_ref().db.prepare(sql)?;
+
+ let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed);
+ Pin::get_unchecked_mut(mut_ref).stmt = Some(stmt);
+
+ let mut stmt = NonNull::from(&boxed.stmt);
+ let iter = stmt.as_mut().as_mut().unwrap().query(args)?;
+
+ let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed);
+ Pin::get_unchecked_mut(mut_ref).iter = Some(iter);
+ }
+
+ Ok(Box::new(DbValueIteratorPin(boxed)))
+ }
+}
+
+impl<'a> Drop for DbValueIterator<'a> {
+ fn drop(&mut self) {
+ trace!("drop iter");
+ drop(self.iter.take());
+ drop(self.stmt.take());
+ }
+}
+
+struct DbValueIteratorPin<'a>(Pin<Box<DbValueIterator<'a>>>);
+
+impl<'a> Iterator for DbValueIteratorPin<'a> {
+ type Item = Result<(Value, Value)>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ let next = unsafe {
+ let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut self.0);
+ Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next()
+ };
+ let row = match next {
+ Err(e) => return Some(Err(e.into())),
+ Ok(None) => return None,
+ Ok(Some(r)) => r,
+ };
+ let k = match row.get::<_, Vec<u8>>(0) {
+ Err(e) => return Some(Err(e.into())),
+ Ok(x) => x,
+ };
+ let v = match row.get::<_, Vec<u8>>(1) {
+ Err(e) => return Some(Err(e.into())),
+ Ok(y) => y,
+ };
+ Some(Ok((k, v)))
+ }
+}
+
+// ----
+
+fn bounds_sql<'r>(low: Bound<&'r [u8]>, high: Bound<&'r [u8]>) -> (String, Vec<Vec<u8>>) {
+ let mut sql = String::new();
+ let mut params: Vec<Vec<u8>> = vec![];
+
+ match low {
+ Bound::Included(b) => {
+ sql.push_str(" WHERE k >= ?1");
+ params.push(b.to_vec());
+ }
+ Bound::Excluded(b) => {
+ sql.push_str(" WHERE k > ?1");
+ params.push(b.to_vec());
+ }
+ Bound::Unbounded => (),
+ };
+
+ match high {
+ Bound::Included(b) => {
+ if !params.is_empty() {
+ sql.push_str(" AND k <= ?2");
+ } else {
+ sql.push_str(" WHERE k <= ?1");
+ }
+ params.push(b.to_vec());
+ }
+ Bound::Excluded(b) => {
+ if !params.is_empty() {
+ sql.push_str(" AND k < ?2");
+ } else {
+ sql.push_str(" WHERE k < ?1");
+ }
+ params.push(b.to_vec());
+ }
+ Bound::Unbounded => (),
+ }
+
+ (sql, params)
+}
diff --git a/src/db/test.rs b/src/db/test.rs
new file mode 100644
index 00000000..cfcee643
--- /dev/null
+++ b/src/db/test.rs
@@ -0,0 +1,106 @@
+use crate::*;
+
+use crate::lmdb_adapter::LmdbDb;
+use crate::sled_adapter::SledDb;
+use crate::sqlite_adapter::SqliteDb;
+
+fn test_suite(db: Db) {
+ let tree = db.open_tree("tree").unwrap();
+
+ let ka: &[u8] = &b"test"[..];
+ let kb: &[u8] = &b"zwello"[..];
+ let kint: &[u8] = &b"tz"[..];
+ let va: &[u8] = &b"plop"[..];
+ let vb: &[u8] = &b"plip"[..];
+ let vc: &[u8] = &b"plup"[..];
+
+ assert!(tree.insert(ka, va).unwrap().is_none());
+ assert_eq!(tree.get(ka).unwrap().unwrap(), va);
+
+ let res = db.transaction::<_, (), _>(|mut tx| {
+ assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va);
+
+ assert_eq!(tx.insert(&tree, ka, vb).unwrap().unwrap(), va);
+
+ assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
+
+ tx.commit(12)
+ });
+ assert!(matches!(res, Ok(12)));
+ assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
+
+ let res = db.transaction::<(), _, _>(|mut tx| {
+ assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
+
+ assert_eq!(tx.insert(&tree, ka, vc).unwrap().unwrap(), vb);
+
+ assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vc);
+
+ tx.abort(42)
+ });
+ assert!(matches!(res, Err(TxError::Abort(42))));
+ assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
+
+ let mut iter = tree.iter().unwrap();
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
+ assert!(iter.next().is_none());
+ drop(iter);
+
+ assert!(tree.insert(kb, vc).unwrap().is_none());
+ assert_eq!(tree.get(kb).unwrap().unwrap(), vc);
+
+ let mut iter = tree.iter().unwrap();
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc));
+ assert!(iter.next().is_none());
+ drop(iter);
+
+ let mut iter = tree.range(kint..).unwrap();
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc));
+ assert!(iter.next().is_none());
+ drop(iter);
+
+ let mut iter = tree.range_rev(..kint).unwrap();
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
+ assert!(iter.next().is_none());
+ drop(iter);
+
+ let mut iter = tree.iter_rev().unwrap();
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc));
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
+ assert!(iter.next().is_none());
+ drop(iter);
+}
+
+#[test]
+fn test_lmdb_db() {
+ let path = mktemp::Temp::new_dir().unwrap();
+ let db = heed::EnvOpenOptions::new()
+ .max_dbs(100)
+ .open(&path)
+ .unwrap();
+ let db = LmdbDb::init(db);
+ test_suite(db);
+ drop(path);
+}
+
+#[test]
+fn test_sled_db() {
+ let path = mktemp::Temp::new_dir().unwrap();
+ let db = SledDb::init(sled::open(path.to_path_buf()).unwrap());
+ test_suite(db);
+ drop(path);
+}
+
+#[test]
+fn test_sqlite_db() {
+ let db = SqliteDb::init(rusqlite::Connection::open_in_memory().unwrap());
+ test_suite(db);
+}