From b44d3fc796484a50cd6854f20c9b46e5fddedc9d Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 8 Jun 2022 10:01:44 +0200 Subject: Abstract database behind generic interface and implement alternative drivers (#322) - [x] Design interface - [x] Implement Sled backend - [x] Re-implement the SledCountedTree hack ~~on Sled backend~~ on all backends (i.e. over the abstraction) - [x] Convert Garage code to use generic interface - [x] Proof-read converted Garage code - [ ] Test everything well - [x] Implement sqlite backend - [x] Implement LMDB backend - [ ] (Implement Persy backend?) - [ ] (Implement other backends? (like RocksDB, ...)) - [x] Implement backend choice in config file and garage server module - [x] Add CLI for converting between DB formats - Exploit the new interface to put more things in transactions - [x] `.updated()` trigger on Garage tables Fix #284 **Bugs** - [x] When exporting sqlite, trees iterate empty?? - [x] LMDB doesn't work **Known issues for various back-ends** - Sled: - Eats all my RAM and also all my disk space - `.len()` has to traverse the whole table - Is actually quite slow on some operations - And is actually pretty bad code... - Sqlite: - Requires a lock to be taken on all operations. The lock is also taken when iterating on a table with `.iter()`, and the lock isn't released until the iterator is dropped. This means that we must be VERY careful not to do anything else inside a `.iter()` loop or else we will have a deadlock! Most such cases have been eliminated from the Garage codebase, but there might still be some that remain. If your Garage-over-Sqlite seems to hang/freeze, this is the reason. - (adapter uses a bunch of unsafe code) - Heed (LMDB): - Not suited for 32-bit machines as it has to map the whole DB in memory. - (adapter uses a tiny bit of unsafe code) **My recommendation:** avoid 32-bit machines and use LMDB as much as possible. **Converting databases** is actually quite easy. 
For example from Sled to LMDB: ```bash cd src/db cargo run --features cli --bin convert -- -i path/to/garage/meta/db -a sled -o path/to/garage/meta/db.lmdb -b lmdb ``` Then, just add this to your `config.toml`: ```toml db_engine = "lmdb" ``` Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/322 Co-authored-by: Alex Co-committed-by: Alex --- src/db/Cargo.toml | 36 ++++ src/db/bin/convert.rs | 76 +++++++ src/db/counted_tree_hack.rs | 127 +++++++++++ src/db/lib.rs | 400 +++++++++++++++++++++++++++++++++++ src/db/lmdb_adapter.rs | 329 +++++++++++++++++++++++++++++ src/db/sled_adapter.rs | 260 +++++++++++++++++++++++ src/db/sqlite_adapter.rs | 500 ++++++++++++++++++++++++++++++++++++++++++++ src/db/test.rs | 106 ++++++++++ 8 files changed, 1834 insertions(+) create mode 100644 src/db/Cargo.toml create mode 100644 src/db/bin/convert.rs create mode 100644 src/db/counted_tree_hack.rs create mode 100644 src/db/lib.rs create mode 100644 src/db/lmdb_adapter.rs create mode 100644 src/db/sled_adapter.rs create mode 100644 src/db/sqlite_adapter.rs create mode 100644 src/db/test.rs (limited to 'src/db') diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml new file mode 100644 index 00000000..6d8f64be --- /dev/null +++ b/src/db/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "garage_db" +version = "0.8.0" +authors = ["Alex Auvolat "] +edition = "2018" +license = "AGPL-3.0" +description = "Abstraction over multiple key/value storage engines that supports transactions" +repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" + +[lib] +path = "lib.rs" + +[[bin]] +name = "convert" +path = "bin/convert.rs" +required-features = ["cli"] + +[dependencies] +err-derive = "0.3" +hexdump = "0.1" +log = "0.4" + +heed = "0.11" +rusqlite = { version = "0.27", features = ["bundled"] } +sled = "0.34" + +# cli deps +clap = { version = "3.1.18", optional = true, features = ["derive", "env"] } +pretty_env_logger = { version = 
"0.4", optional = true } + +[dev-dependencies] +mktemp = "0.4" + +[features] +cli = ["clap", "pretty_env_logger"] diff --git a/src/db/bin/convert.rs b/src/db/bin/convert.rs new file mode 100644 index 00000000..9e45e61f --- /dev/null +++ b/src/db/bin/convert.rs @@ -0,0 +1,76 @@ +use std::path::PathBuf; + +use garage_db::*; + +use clap::Parser; + +/// K2V command line interface +#[derive(Parser, Debug)] +#[clap(author, version, about, long_about = None)] +struct Args { + /// Input DB path + #[clap(short = 'i')] + input_path: PathBuf, + /// Input DB engine + #[clap(short = 'a')] + input_engine: String, + + /// Output DB path + #[clap(short = 'o')] + output_path: PathBuf, + /// Output DB engine + #[clap(short = 'b')] + output_engine: String, +} + +fn main() { + let args = Args::parse(); + pretty_env_logger::init(); + + match do_conversion(args) { + Ok(()) => println!("Success!"), + Err(e) => eprintln!("Error: {}", e), + } +} + +fn do_conversion(args: Args) -> Result<()> { + let input = open_db(args.input_path, args.input_engine)?; + let output = open_db(args.output_path, args.output_engine)?; + output.import(&input)?; + Ok(()) +} + +fn open_db(path: PathBuf, engine: String) -> Result { + match engine.as_str() { + "sled" => { + let db = sled_adapter::sled::Config::default().path(&path).open()?; + Ok(sled_adapter::SledDb::init(db)) + } + "sqlite" | "sqlite3" | "rusqlite" => { + let db = sqlite_adapter::rusqlite::Connection::open(&path)?; + Ok(sqlite_adapter::SqliteDb::init(db)) + } + "lmdb" | "heed" => { + std::fs::create_dir_all(&path).map_err(|e| { + Error(format!("Unable to create LMDB data directory: {}", e).into()) + })?; + + let map_size = if u32::MAX as usize == usize::MAX { + eprintln!( + "LMDB is not recommended on 32-bit systems, database size will be limited" + ); + 1usize << 30 // 1GB for 32-bit systems + } else { + 1usize << 40 // 1TB for 64-bit systems + }; + + let db = lmdb_adapter::heed::EnvOpenOptions::new() + .max_dbs(100) + .map_size(map_size) + 
.open(&path) + .unwrap(); + Ok(lmdb_adapter::LmdbDb::init(db)) + } + e => Err(Error(format!("Invalid DB engine: {}", e).into())), + } +} diff --git a/src/db/counted_tree_hack.rs b/src/db/counted_tree_hack.rs new file mode 100644 index 00000000..bbe943a2 --- /dev/null +++ b/src/db/counted_tree_hack.rs @@ -0,0 +1,127 @@ +//! This hack allows a db tree to keep in RAM a counter of the number of entries +//! it contains, which is used to call .len() on it. This is usefull only for +//! the sled backend where .len() otherwise would have to traverse the whole +//! tree to count items. For sqlite and lmdb, this is mostly useless (but +//! hopefully not harmfull!). Note that a CountedTree cannot be part of a +//! transaction. + +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use crate::{Result, Tree, TxError, Value, ValueIter}; + +#[derive(Clone)] +pub struct CountedTree(Arc); + +struct CountedTreeInternal { + tree: Tree, + len: AtomicUsize, +} + +impl CountedTree { + pub fn new(tree: Tree) -> Result { + let len = tree.len()?; + Ok(Self(Arc::new(CountedTreeInternal { + tree, + len: AtomicUsize::new(len), + }))) + } + + pub fn len(&self) -> usize { + self.0.len.load(Ordering::SeqCst) + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn get>(&self, key: K) -> Result> { + self.0.tree.get(key) + } + + pub fn first(&self) -> Result> { + self.0.tree.first() + } + + pub fn iter(&self) -> Result> { + self.0.tree.iter() + } + + // ---- writing functions ---- + + pub fn insert(&self, key: K, value: V) -> Result> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + let old_val = self.0.tree.insert(key, value)?; + if old_val.is_none() { + self.0.len.fetch_add(1, Ordering::SeqCst); + } + Ok(old_val) + } + + pub fn remove>(&self, key: K) -> Result> { + let old_val = self.0.tree.remove(key)?; + if old_val.is_some() { + self.0.len.fetch_sub(1, Ordering::SeqCst); + } + Ok(old_val) + } + + pub fn compare_and_swap( + &self, + key: K, + expected_old: 
Option, + new: Option, + ) -> Result + where + K: AsRef<[u8]>, + OV: AsRef<[u8]>, + NV: AsRef<[u8]>, + { + let old_some = expected_old.is_some(); + let new_some = new.is_some(); + + let tx_res = self.0.tree.db().transaction(|mut tx| { + let old_val = tx.get(&self.0.tree, &key)?; + let is_same = match (&old_val, &expected_old) { + (None, None) => true, + (Some(x), Some(y)) if x == y.as_ref() => true, + _ => false, + }; + if is_same { + match &new { + Some(v) => { + tx.insert(&self.0.tree, &key, v)?; + } + None => { + tx.remove(&self.0.tree, &key)?; + } + } + tx.commit(()) + } else { + tx.abort(()) + } + }); + + match tx_res { + Ok(()) => { + match (old_some, new_some) { + (false, true) => { + self.0.len.fetch_add(1, Ordering::SeqCst); + } + (true, false) => { + self.0.len.fetch_sub(1, Ordering::SeqCst); + } + _ => (), + } + Ok(true) + } + Err(TxError::Abort(())) => Ok(false), + Err(TxError::Db(e)) => Err(e), + } + } +} diff --git a/src/db/lib.rs b/src/db/lib.rs new file mode 100644 index 00000000..e9d3ea18 --- /dev/null +++ b/src/db/lib.rs @@ -0,0 +1,400 @@ +pub mod lmdb_adapter; +pub mod sled_adapter; +pub mod sqlite_adapter; + +pub mod counted_tree_hack; + +#[cfg(test)] +pub mod test; + +use core::ops::{Bound, RangeBounds}; + +use std::borrow::Cow; +use std::cell::Cell; +use std::sync::Arc; + +use err_derive::Error; + +#[derive(Clone)] +pub struct Db(pub(crate) Arc); + +pub struct Transaction<'a>(&'a mut dyn ITx); + +#[derive(Clone)] +pub struct Tree(Arc, usize); + +pub type Value = Vec; +pub type ValueIter<'a> = Box> + 'a>; +pub type TxValueIter<'a> = Box> + 'a>; + +// ---- + +#[derive(Debug, Error)] +#[error(display = "{}", _0)] +pub struct Error(pub Cow<'static, str>); + +pub type Result = std::result::Result; + +#[derive(Debug, Error)] +#[error(display = "{}", _0)] +pub struct TxOpError(pub(crate) Error); +pub type TxOpResult = std::result::Result; + +pub enum TxError { + Abort(E), + Db(Error), +} +pub type TxResult = std::result::Result>; + +impl From for 
TxError { + fn from(e: TxOpError) -> TxError { + TxError::Db(e.0) + } +} + +pub fn unabort(res: TxResult) -> TxOpResult> { + match res { + Ok(v) => Ok(Ok(v)), + Err(TxError::Abort(e)) => Ok(Err(e)), + Err(TxError::Db(e)) => Err(TxOpError(e)), + } +} + +// ---- + +impl Db { + pub fn engine(&self) -> String { + self.0.engine() + } + + pub fn open_tree>(&self, name: S) -> Result { + let tree_id = self.0.open_tree(name.as_ref())?; + Ok(Tree(self.0.clone(), tree_id)) + } + + pub fn list_trees(&self) -> Result> { + self.0.list_trees() + } + + pub fn transaction(&self, fun: F) -> TxResult + where + F: Fn(Transaction<'_>) -> TxResult, + { + let f = TxFn { + function: fun, + result: Cell::new(None), + }; + let tx_res = self.0.transaction(&f); + let ret = f + .result + .into_inner() + .expect("Transaction did not store result"); + + match tx_res { + Ok(()) => { + assert!(matches!(ret, Ok(_))); + ret + } + Err(TxError::Abort(())) => { + assert!(matches!(ret, Err(TxError::Abort(_)))); + ret + } + Err(TxError::Db(e2)) => match ret { + // Ok was stored -> the error occured when finalizing + // transaction + Ok(_) => Err(TxError::Db(e2)), + // An error was already stored: that's the one we want to + // return + Err(TxError::Db(e)) => Err(TxError::Db(e)), + _ => unreachable!(), + }, + } + } + + pub fn import(&self, other: &Db) -> Result<()> { + let existing_trees = self.list_trees()?; + if !existing_trees.is_empty() { + return Err(Error( + format!( + "destination database already contains data: {:?}", + existing_trees + ) + .into(), + )); + } + + let tree_names = other.list_trees()?; + for name in tree_names { + let tree = self.open_tree(&name)?; + if tree.len()? > 0 { + return Err(Error(format!("tree {} already contains data", name).into())); + } + + let ex_tree = other.open_tree(&name)?; + + let tx_res = self.transaction(|mut tx| { + let mut i = 0; + for item in ex_tree.iter().map_err(TxError::Abort)? 
{ + let (k, v) = item.map_err(TxError::Abort)?; + tx.insert(&tree, k, v)?; + i += 1; + if i % 1000 == 0 { + println!("{}: imported {}", name, i); + } + } + tx.commit(i) + }); + let total = match tx_res { + Err(TxError::Db(e)) => return Err(e), + Err(TxError::Abort(e)) => return Err(e), + Ok(x) => x, + }; + + println!("{}: finished importing, {} items", name, total); + } + Ok(()) + } +} + +#[allow(clippy::len_without_is_empty)] +impl Tree { + #[inline] + pub fn db(&self) -> Db { + Db(self.0.clone()) + } + + #[inline] + pub fn get>(&self, key: T) -> Result> { + self.0.get(self.1, key.as_ref()) + } + #[inline] + pub fn len(&self) -> Result { + self.0.len(self.1) + } + + #[inline] + pub fn first(&self) -> Result> { + self.iter()?.next().transpose() + } + #[inline] + pub fn get_gt>(&self, from: T) -> Result> { + self.range((Bound::Excluded(from), Bound::Unbounded))? + .next() + .transpose() + } + + /// Returns the old value if there was one + #[inline] + pub fn insert, U: AsRef<[u8]>>( + &self, + key: T, + value: U, + ) -> Result> { + self.0.insert(self.1, key.as_ref(), value.as_ref()) + } + /// Returns the old value if there was one + #[inline] + pub fn remove>(&self, key: T) -> Result> { + self.0.remove(self.1, key.as_ref()) + } + + #[inline] + pub fn iter(&self) -> Result> { + self.0.iter(self.1) + } + #[inline] + pub fn iter_rev(&self) -> Result> { + self.0.iter_rev(self.1) + } + + #[inline] + pub fn range(&self, range: R) -> Result> + where + K: AsRef<[u8]>, + R: RangeBounds, + { + let sb = range.start_bound(); + let eb = range.end_bound(); + self.0.range(self.1, get_bound(sb), get_bound(eb)) + } + #[inline] + pub fn range_rev(&self, range: R) -> Result> + where + K: AsRef<[u8]>, + R: RangeBounds, + { + let sb = range.start_bound(); + let eb = range.end_bound(); + self.0.range_rev(self.1, get_bound(sb), get_bound(eb)) + } +} + +#[allow(clippy::len_without_is_empty)] +impl<'a> Transaction<'a> { + #[inline] + pub fn get>(&self, tree: &Tree, key: T) -> TxOpResult> { + 
self.0.get(tree.1, key.as_ref()) + } + #[inline] + pub fn len(&self, tree: &Tree) -> TxOpResult { + self.0.len(tree.1) + } + + /// Returns the old value if there was one + #[inline] + pub fn insert, U: AsRef<[u8]>>( + &mut self, + tree: &Tree, + key: T, + value: U, + ) -> TxOpResult> { + self.0.insert(tree.1, key.as_ref(), value.as_ref()) + } + /// Returns the old value if there was one + #[inline] + pub fn remove>(&mut self, tree: &Tree, key: T) -> TxOpResult> { + self.0.remove(tree.1, key.as_ref()) + } + + #[inline] + pub fn iter(&self, tree: &Tree) -> TxOpResult> { + self.0.iter(tree.1) + } + #[inline] + pub fn iter_rev(&self, tree: &Tree) -> TxOpResult> { + self.0.iter_rev(tree.1) + } + + #[inline] + pub fn range(&self, tree: &Tree, range: R) -> TxOpResult> + where + K: AsRef<[u8]>, + R: RangeBounds, + { + let sb = range.start_bound(); + let eb = range.end_bound(); + self.0.range(tree.1, get_bound(sb), get_bound(eb)) + } + #[inline] + pub fn range_rev(&self, tree: &Tree, range: R) -> TxOpResult> + where + K: AsRef<[u8]>, + R: RangeBounds, + { + let sb = range.start_bound(); + let eb = range.end_bound(); + self.0.range_rev(tree.1, get_bound(sb), get_bound(eb)) + } + + // ---- + + #[inline] + pub fn abort(self, e: E) -> TxResult { + Err(TxError::Abort(e)) + } + + #[inline] + pub fn commit(self, r: R) -> TxResult { + Ok(r) + } +} + +// ---- Internal interfaces + +pub(crate) trait IDb: Send + Sync { + fn engine(&self) -> String; + fn open_tree(&self, name: &str) -> Result; + fn list_trees(&self) -> Result>; + + fn get(&self, tree: usize, key: &[u8]) -> Result>; + fn len(&self, tree: usize) -> Result; + + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result>; + fn remove(&self, tree: usize, key: &[u8]) -> Result>; + + fn iter(&self, tree: usize) -> Result>; + fn iter_rev(&self, tree: usize) -> Result>; + + fn range<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result>; + fn range_rev<'r>( + &self, + tree: usize, + 
low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result>; + + fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>; +} + +pub(crate) trait ITx { + fn get(&self, tree: usize, key: &[u8]) -> TxOpResult>; + fn len(&self, tree: usize) -> TxOpResult; + + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult>; + fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult>; + + fn iter(&self, tree: usize) -> TxOpResult>; + fn iter_rev(&self, tree: usize) -> TxOpResult>; + + fn range<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> TxOpResult>; + fn range_rev<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> TxOpResult>; +} + +pub(crate) trait ITxFn { + fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult; +} + +pub(crate) enum TxFnResult { + Ok, + Abort, + DbErr, +} + +struct TxFn +where + F: Fn(Transaction<'_>) -> TxResult, +{ + function: F, + result: Cell>>, +} + +impl ITxFn for TxFn +where + F: Fn(Transaction<'_>) -> TxResult, +{ + fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult { + let res = (self.function)(Transaction(tx)); + let res2 = match &res { + Ok(_) => TxFnResult::Ok, + Err(TxError::Abort(_)) => TxFnResult::Abort, + Err(TxError::Db(_)) => TxFnResult::DbErr, + }; + self.result.set(Some(res)); + res2 + } +} + +// ---- + +fn get_bound>(b: Bound<&K>) -> Bound<&[u8]> { + match b { + Bound::Included(v) => Bound::Included(v.as_ref()), + Bound::Excluded(v) => Bound::Excluded(v.as_ref()), + Bound::Unbounded => Bound::Unbounded, + } +} diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs new file mode 100644 index 00000000..74622919 --- /dev/null +++ b/src/db/lmdb_adapter.rs @@ -0,0 +1,329 @@ +use core::ops::Bound; +use core::ptr::NonNull; + +use std::collections::HashMap; +use std::convert::TryInto; +use std::sync::{Arc, RwLock}; + +use heed::types::ByteSlice; +use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database}; + +use crate::{ + Db, Error, 
IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, + TxValueIter, Value, ValueIter, +}; + +pub use heed; + +// -- err + +impl From for Error { + fn from(e: heed::Error) -> Error { + Error(format!("LMDB: {}", e).into()) + } +} + +impl From for TxOpError { + fn from(e: heed::Error) -> TxOpError { + TxOpError(e.into()) + } +} + +// -- db + +pub struct LmdbDb { + db: heed::Env, + trees: RwLock<(Vec, HashMap)>, +} + +impl LmdbDb { + pub fn init(db: Env) -> Db { + let s = Self { + db, + trees: RwLock::new((Vec::new(), HashMap::new())), + }; + Db(Arc::new(s)) + } + + fn get_tree(&self, i: usize) -> Result { + self.trees + .read() + .unwrap() + .0 + .get(i) + .cloned() + .ok_or_else(|| Error("invalid tree id".into())) + } +} + +impl IDb for LmdbDb { + fn engine(&self) -> String { + "LMDB (using Heed crate)".into() + } + + fn open_tree(&self, name: &str) -> Result { + let mut trees = self.trees.write().unwrap(); + if let Some(i) = trees.1.get(name) { + Ok(*i) + } else { + let tree = self.db.create_database(Some(name))?; + let i = trees.0.len(); + trees.0.push(tree); + trees.1.insert(name.to_string(), i); + Ok(i) + } + } + + fn list_trees(&self) -> Result> { + let tree0 = match self.db.open_database::(None)? { + Some(x) => x, + None => return Ok(vec![]), + }; + + let mut ret = vec![]; + let tx = self.db.read_txn()?; + for item in tree0.iter(&tx)? { + let (tree_name, _) = item?; + ret.push(tree_name.to_string()); + } + drop(tx); + + let mut ret2 = vec![]; + for tree_name in ret { + if self + .db + .open_database::(Some(&tree_name))? 
+ .is_some() + { + ret2.push(tree_name); + } + } + + Ok(ret2) + } + + // ---- + + fn get(&self, tree: usize, key: &[u8]) -> Result> { + let tree = self.get_tree(tree)?; + + let tx = self.db.read_txn()?; + let val = tree.get(&tx, key)?; + match val { + None => Ok(None), + Some(v) => Ok(Some(v.to_vec())), + } + } + + fn len(&self, tree: usize) -> Result { + let tree = self.get_tree(tree)?; + let tx = self.db.read_txn()?; + Ok(tree.len(&tx)?.try_into().unwrap()) + } + + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result> { + let tree = self.get_tree(tree)?; + let mut tx = self.db.write_txn()?; + let old_val = tree.get(&tx, key)?.map(Vec::from); + tree.put(&mut tx, key, value)?; + tx.commit()?; + Ok(old_val) + } + + fn remove(&self, tree: usize, key: &[u8]) -> Result> { + let tree = self.get_tree(tree)?; + let mut tx = self.db.write_txn()?; + let old_val = tree.get(&tx, key)?.map(Vec::from); + tree.delete(&mut tx, key)?; + tx.commit()?; + Ok(old_val) + } + + fn iter(&self, tree: usize) -> Result> { + let tree = self.get_tree(tree)?; + let tx = self.db.read_txn()?; + TxAndIterator::make(tx, |tx| Ok(tree.iter(tx)?)) + } + + fn iter_rev(&self, tree: usize) -> Result> { + let tree = self.get_tree(tree)?; + let tx = self.db.read_txn()?; + TxAndIterator::make(tx, |tx| Ok(tree.rev_iter(tx)?)) + } + + fn range<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result> { + let tree = self.get_tree(tree)?; + let tx = self.db.read_txn()?; + TxAndIterator::make(tx, |tx| Ok(tree.range(tx, &(low, high))?)) + } + fn range_rev<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result> { + let tree = self.get_tree(tree)?; + let tx = self.db.read_txn()?; + TxAndIterator::make(tx, |tx| Ok(tree.rev_range(tx, &(low, high))?)) + } + + // ---- + + fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { + let trees = self.trees.read().unwrap(); + let mut tx = LmdbTx { + trees: &trees.0[..], + tx: self + .db 
+ .write_txn() + .map_err(Error::from) + .map_err(TxError::Db)?, + }; + + let res = f.try_on(&mut tx); + match res { + TxFnResult::Ok => { + tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?; + Ok(()) + } + TxFnResult::Abort => { + tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?; + Err(TxError::Abort(())) + } + TxFnResult::DbErr => { + tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?; + Err(TxError::Db(Error( + "(this message will be discarded)".into(), + ))) + } + } + } +} + +// ---- + +struct LmdbTx<'a> { + trees: &'a [Database], + tx: RwTxn<'a, 'a>, +} + +impl<'a> LmdbTx<'a> { + fn get_tree(&self, i: usize) -> TxOpResult<&Database> { + self.trees.get(i).ok_or_else(|| { + TxOpError(Error( + "invalid tree id (it might have been openned after the transaction started)".into(), + )) + }) + } +} + +impl<'a> ITx for LmdbTx<'a> { + fn get(&self, tree: usize, key: &[u8]) -> TxOpResult> { + let tree = self.get_tree(tree)?; + match tree.get(&self.tx, key)? { + Some(v) => Ok(Some(v.to_vec())), + None => Ok(None), + } + } + fn len(&self, _tree: usize) -> TxOpResult { + unimplemented!(".len() in transaction not supported with LMDB backend") + } + + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult> { + let tree = *self.get_tree(tree)?; + let old_val = tree.get(&self.tx, key)?.map(Vec::from); + tree.put(&mut self.tx, key, value)?; + Ok(old_val) + } + fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult> { + let tree = *self.get_tree(tree)?; + let old_val = tree.get(&self.tx, key)?.map(Vec::from); + tree.delete(&mut self.tx, key)?; + Ok(old_val) + } + + fn iter(&self, _tree: usize) -> TxOpResult> { + unimplemented!("Iterators in transactions not supported with LMDB backend"); + } + fn iter_rev(&self, _tree: usize) -> TxOpResult> { + unimplemented!("Iterators in transactions not supported with LMDB backend"); + } + + fn range<'r>( + &self, + _tree: usize, + _low: Bound<&'r [u8]>, + _high: Bound<&'r [u8]>, + ) -> TxOpResult> 
{ + unimplemented!("Iterators in transactions not supported with LMDB backend"); + } + fn range_rev<'r>( + &self, + _tree: usize, + _low: Bound<&'r [u8]>, + _high: Bound<&'r [u8]>, + ) -> TxOpResult> { + unimplemented!("Iterators in transactions not supported with LMDB backend"); + } +} + +// ---- + +type IteratorItem<'a> = heed::Result<( + >::DItem, + >::DItem, +)>; + +struct TxAndIterator<'a, I> +where + I: Iterator> + 'a, +{ + tx: RoTxn<'a>, + iter: Option, +} + +impl<'a, I> TxAndIterator<'a, I> +where + I: Iterator> + 'a, +{ + fn make(tx: RoTxn<'a>, iterfun: F) -> Result> + where + F: FnOnce(&'a RoTxn<'a>) -> Result, + { + let mut res = TxAndIterator { tx, iter: None }; + + let tx = unsafe { NonNull::from(&res.tx).as_ref() }; + res.iter = Some(iterfun(tx)?); + + Ok(Box::new(res)) + } +} + +impl<'a, I> Drop for TxAndIterator<'a, I> +where + I: Iterator> + 'a, +{ + fn drop(&mut self) { + drop(self.iter.take()); + } +} + +impl<'a, I> Iterator for TxAndIterator<'a, I> +where + I: Iterator> + 'a, +{ + type Item = Result<(Value, Value)>; + + fn next(&mut self) -> Option { + match self.iter.as_mut().unwrap().next() { + None => None, + Some(Err(e)) => Some(Err(e.into())), + Some(Ok((k, v))) => Some(Ok((k.to_vec(), v.to_vec()))), + } + } +} diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs new file mode 100644 index 00000000..982f8d82 --- /dev/null +++ b/src/db/sled_adapter.rs @@ -0,0 +1,260 @@ +use core::ops::Bound; + +use std::cell::Cell; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use sled::transaction::{ + ConflictableTransactionError, TransactionError, Transactional, TransactionalTree, + UnabortableTransactionError, +}; + +use crate::{ + Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, + TxValueIter, Value, ValueIter, +}; + +pub use sled; + +// -- err + +impl From for Error { + fn from(e: sled::Error) -> Error { + Error(format!("Sled: {}", e).into()) + } +} + +impl From for TxOpError { + fn 
from(e: sled::Error) -> TxOpError { + TxOpError(e.into()) + } +} + +// -- db + +pub struct SledDb { + db: sled::Db, + trees: RwLock<(Vec, HashMap)>, +} + +impl SledDb { + pub fn init(db: sled::Db) -> Db { + let s = Self { + db, + trees: RwLock::new((Vec::new(), HashMap::new())), + }; + Db(Arc::new(s)) + } + + fn get_tree(&self, i: usize) -> Result { + self.trees + .read() + .unwrap() + .0 + .get(i) + .cloned() + .ok_or_else(|| Error("invalid tree id".into())) + } +} + +impl IDb for SledDb { + fn engine(&self) -> String { + "Sled".into() + } + + fn open_tree(&self, name: &str) -> Result { + let mut trees = self.trees.write().unwrap(); + if let Some(i) = trees.1.get(name) { + Ok(*i) + } else { + let tree = self.db.open_tree(name)?; + let i = trees.0.len(); + trees.0.push(tree); + trees.1.insert(name.to_string(), i); + Ok(i) + } + } + + fn list_trees(&self) -> Result> { + let mut trees = vec![]; + for name in self.db.tree_names() { + let name = std::str::from_utf8(&name) + .map_err(|e| Error(format!("{}", e).into()))? 
+ .to_string(); + if name != "__sled__default" { + trees.push(name); + } + } + Ok(trees) + } + + // ---- + + fn get(&self, tree: usize, key: &[u8]) -> Result> { + let tree = self.get_tree(tree)?; + let val = tree.get(key)?; + Ok(val.map(|x| x.to_vec())) + } + + fn len(&self, tree: usize) -> Result { + let tree = self.get_tree(tree)?; + Ok(tree.len()) + } + + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result> { + let tree = self.get_tree(tree)?; + let old_val = tree.insert(key, value)?; + Ok(old_val.map(|x| x.to_vec())) + } + + fn remove(&self, tree: usize, key: &[u8]) -> Result> { + let tree = self.get_tree(tree)?; + let old_val = tree.remove(key)?; + Ok(old_val.map(|x| x.to_vec())) + } + + fn iter(&self, tree: usize) -> Result> { + let tree = self.get_tree(tree)?; + Ok(Box::new(tree.iter().map(|v| { + v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into) + }))) + } + + fn iter_rev(&self, tree: usize) -> Result> { + let tree = self.get_tree(tree)?; + Ok(Box::new(tree.iter().rev().map(|v| { + v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into) + }))) + } + + fn range<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result> { + let tree = self.get_tree(tree)?; + Ok(Box::new(tree.range::<&'r [u8], _>((low, high)).map(|v| { + v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into) + }))) + } + fn range_rev<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result> { + let tree = self.get_tree(tree)?; + Ok(Box::new(tree.range::<&'r [u8], _>((low, high)).rev().map( + |v| v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into), + ))) + } + + // ---- + + fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { + let trees = self.trees.read().unwrap(); + let res = trees.0.transaction(|txtrees| { + let mut tx = SledTx { + trees: txtrees, + err: Cell::new(None), + }; + match f.try_on(&mut tx) { + TxFnResult::Ok => { + assert!(tx.err.into_inner().is_none()); + 
Ok(()) + } + TxFnResult::Abort => { + assert!(tx.err.into_inner().is_none()); + Err(ConflictableTransactionError::Abort(())) + } + TxFnResult::DbErr => { + let e = tx.err.into_inner().expect("No DB error"); + Err(e.into()) + } + } + }); + match res { + Ok(()) => Ok(()), + Err(TransactionError::Abort(())) => Err(TxError::Abort(())), + Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())), + } + } +} + +// ---- + +struct SledTx<'a> { + trees: &'a [TransactionalTree], + err: Cell>, +} + +impl<'a> SledTx<'a> { + fn get_tree(&self, i: usize) -> TxOpResult<&TransactionalTree> { + self.trees.get(i).ok_or_else(|| { + TxOpError(Error( + "invalid tree id (it might have been openned after the transaction started)".into(), + )) + }) + } + + fn save_error( + &self, + v: std::result::Result, + ) -> TxOpResult { + match v { + Ok(x) => Ok(x), + Err(e) => { + let txt = format!("{}", e); + self.err.set(Some(e)); + Err(TxOpError(Error(txt.into()))) + } + } + } +} + +impl<'a> ITx for SledTx<'a> { + fn get(&self, tree: usize, key: &[u8]) -> TxOpResult> { + let tree = self.get_tree(tree)?; + let tmp = self.save_error(tree.get(key))?; + Ok(tmp.map(|x| x.to_vec())) + } + fn len(&self, _tree: usize) -> TxOpResult { + unimplemented!(".len() in transaction not supported with Sled backend") + } + + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult> { + let tree = self.get_tree(tree)?; + let old_val = self.save_error(tree.insert(key, value))?; + Ok(old_val.map(|x| x.to_vec())) + } + fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult> { + let tree = self.get_tree(tree)?; + let old_val = self.save_error(tree.remove(key))?; + Ok(old_val.map(|x| x.to_vec())) + } + + fn iter(&self, _tree: usize) -> TxOpResult> { + unimplemented!("Iterators in transactions not supported with Sled backend"); + } + fn iter_rev(&self, _tree: usize) -> TxOpResult> { + unimplemented!("Iterators in transactions not supported with Sled backend"); + } + + fn range<'r>( + &self, + 
_tree: usize, + _low: Bound<&'r [u8]>, + _high: Bound<&'r [u8]>, + ) -> TxOpResult> { + unimplemented!("Iterators in transactions not supported with Sled backend"); + } + fn range_rev<'r>( + &self, + _tree: usize, + _low: Bound<&'r [u8]>, + _high: Bound<&'r [u8]>, + ) -> TxOpResult> { + unimplemented!("Iterators in transactions not supported with Sled backend"); + } +} diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs new file mode 100644 index 00000000..14bf35ff --- /dev/null +++ b/src/db/sqlite_adapter.rs @@ -0,0 +1,500 @@ +use core::ops::Bound; + +use std::borrow::BorrowMut; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::ptr::NonNull; +use std::sync::{Arc, Mutex, MutexGuard}; + +use log::trace; + +use rusqlite::{params, Connection, Rows, Statement, Transaction}; + +use crate::{ + Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, + TxValueIter, Value, ValueIter, +}; + +pub use rusqlite; + +// --- err + +impl From for Error { + fn from(e: rusqlite::Error) -> Error { + Error(format!("Sqlite: {}", e).into()) + } +} + +impl From for TxOpError { + fn from(e: rusqlite::Error) -> TxOpError { + TxOpError(e.into()) + } +} + +// -- db + +pub struct SqliteDb(Mutex); + +struct SqliteDbInner { + db: Connection, + trees: Vec, +} + +impl SqliteDb { + pub fn init(db: rusqlite::Connection) -> Db { + let s = Self(Mutex::new(SqliteDbInner { + db, + trees: Vec::new(), + })); + Db(Arc::new(s)) + } +} + +impl SqliteDbInner { + fn get_tree(&self, i: usize) -> Result<&'_ str> { + self.trees + .get(i) + .map(String::as_str) + .ok_or_else(|| Error("invalid tree id".into())) + } + + fn internal_get(&self, tree: &str, key: &[u8]) -> Result> { + let mut stmt = self + .db + .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; + let mut res_iter = stmt.query([key])?; + match res_iter.next()? 
{ + None => Ok(None), + Some(v) => Ok(Some(v.get::<_, Vec>(0)?)), + } + } +} + +impl IDb for SqliteDb { + fn engine(&self) -> String { + format!("sqlite3 v{} (using rusqlite crate)", rusqlite::version()) + } + + fn open_tree(&self, name: &str) -> Result { + let name = format!("tree_{}", name.replace(':', "_COLON_")); + let mut this = self.0.lock().unwrap(); + + if let Some(i) = this.trees.iter().position(|x| x == &name) { + Ok(i) + } else { + trace!("create table {}", name); + this.db.execute( + &format!( + "CREATE TABLE IF NOT EXISTS {} ( + k BLOB PRIMARY KEY, + v BLOB + )", + name + ), + [], + )?; + trace!("table created: {}, unlocking", name); + + let i = this.trees.len(); + this.trees.push(name.to_string()); + Ok(i) + } + } + + fn list_trees(&self) -> Result> { + let mut trees = vec![]; + + trace!("list_trees: lock db"); + let this = self.0.lock().unwrap(); + trace!("list_trees: lock acquired"); + + let mut stmt = this.db.prepare( + "SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'", + )?; + let mut rows = stmt.query([])?; + while let Some(row) = rows.next()? { + let name = row.get::<_, String>(0)?; + let name = name.replace("_COLON_", ":"); + let name = name.strip_prefix("tree_").unwrap().to_string(); + trees.push(name); + } + Ok(trees) + } + + // ---- + + fn get(&self, tree: usize, key: &[u8]) -> Result> { + trace!("get {}: lock db", tree); + let this = self.0.lock().unwrap(); + trace!("get {}: lock acquired", tree); + + let tree = this.get_tree(tree)?; + this.internal_get(tree, key) + } + + fn len(&self, tree: usize) -> Result { + trace!("len {}: lock db", tree); + let this = self.0.lock().unwrap(); + trace!("len {}: lock acquired", tree); + + let tree = this.get_tree(tree)?; + let mut stmt = this.db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?; + let mut res_iter = stmt.query([])?; + match res_iter.next()? 
{ + None => Ok(0), + Some(v) => Ok(v.get::<_, usize>(0)?), + } + } + + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result> { + trace!("insert {}: lock db", tree); + let this = self.0.lock().unwrap(); + trace!("insert {}: lock acquired", tree); + + let tree = this.get_tree(tree)?; + let old_val = this.internal_get(tree, key)?; + + let sql = match &old_val { + Some(_) => format!("UPDATE {} SET v = ?2 WHERE k = ?1", tree), + None => format!("INSERT INTO {} (k, v) VALUES (?1, ?2)", tree), + }; + let n = this.db.execute(&sql, params![key, value])?; + assert_eq!(n, 1); + + Ok(old_val) + } + + fn remove(&self, tree: usize, key: &[u8]) -> Result> { + trace!("remove {}: lock db", tree); + let this = self.0.lock().unwrap(); + trace!("remove {}: lock acquired", tree); + + let tree = this.get_tree(tree)?; + let old_val = this.internal_get(tree, key)?; + + if old_val.is_some() { + let n = this + .db + .execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?; + assert_eq!(n, 1); + } + + Ok(old_val) + } + + fn iter(&self, tree: usize) -> Result> { + trace!("iter {}: lock db", tree); + let this = self.0.lock().unwrap(); + trace!("iter {}: lock acquired", tree); + + let tree = this.get_tree(tree)?; + let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree); + DbValueIterator::make(this, &sql, []) + } + + fn iter_rev(&self, tree: usize) -> Result> { + trace!("iter_rev {}: lock db", tree); + let this = self.0.lock().unwrap(); + trace!("iter_rev {}: lock acquired", tree); + + let tree = this.get_tree(tree)?; + let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree); + DbValueIterator::make(this, &sql, []) + } + + fn range<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result> { + trace!("range {}: lock db", tree); + let this = self.0.lock().unwrap(); + trace!("range {}: lock acquired", tree); + + let tree = this.get_tree(tree)?; + + let (bounds_sql, params) = bounds_sql(low, high); + let sql = 
format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql); + + let params = params + .iter() + .map(|x| x as &dyn rusqlite::ToSql) + .collect::>(); + + DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref()) + } + fn range_rev<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result> { + trace!("range_rev {}: lock db", tree); + let this = self.0.lock().unwrap(); + trace!("range_rev {}: lock acquired", tree); + + let tree = this.get_tree(tree)?; + + let (bounds_sql, params) = bounds_sql(low, high); + let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql); + + let params = params + .iter() + .map(|x| x as &dyn rusqlite::ToSql) + .collect::>(); + + DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref()) + } + + // ---- + + fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { + trace!("transaction: lock db"); + let mut this = self.0.lock().unwrap(); + trace!("transaction: lock acquired"); + + let this_mut_ref: &mut SqliteDbInner = this.borrow_mut(); + + let mut tx = SqliteTx { + tx: this_mut_ref + .db + .transaction() + .map_err(Error::from) + .map_err(TxError::Db)?, + trees: &this_mut_ref.trees, + }; + let res = match f.try_on(&mut tx) { + TxFnResult::Ok => { + tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?; + Ok(()) + } + TxFnResult::Abort => { + tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?; + Err(TxError::Abort(())) + } + TxFnResult::DbErr => { + tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?; + Err(TxError::Db(Error( + "(this message will be discarded)".into(), + ))) + } + }; + + trace!("transaction done"); + res + } +} + +// ---- + +struct SqliteTx<'a> { + tx: Transaction<'a>, + trees: &'a [String], +} + +impl<'a> SqliteTx<'a> { + fn get_tree(&self, i: usize) -> TxOpResult<&'_ str> { + self.trees.get(i).map(String::as_ref).ok_or_else(|| { + TxOpError(Error( + "invalid tree id (it might have been openned after 
the transaction started)".into(), + )) + }) + } + + fn internal_get(&self, tree: &str, key: &[u8]) -> TxOpResult> { + let mut stmt = self + .tx + .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; + let mut res_iter = stmt.query([key])?; + match res_iter.next()? { + None => Ok(None), + Some(v) => Ok(Some(v.get::<_, Vec>(0)?)), + } + } +} + +impl<'a> ITx for SqliteTx<'a> { + fn get(&self, tree: usize, key: &[u8]) -> TxOpResult> { + let tree = self.get_tree(tree)?; + self.internal_get(tree, key) + } + fn len(&self, tree: usize) -> TxOpResult { + let tree = self.get_tree(tree)?; + let mut stmt = self.tx.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?; + let mut res_iter = stmt.query([])?; + match res_iter.next()? { + None => Ok(0), + Some(v) => Ok(v.get::<_, usize>(0)?), + } + } + + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult> { + let tree = self.get_tree(tree)?; + let old_val = self.internal_get(tree, key)?; + + let sql = match &old_val { + Some(_) => format!("UPDATE {} SET v = ?2 WHERE k = ?1", tree), + None => format!("INSERT INTO {} (k, v) VALUES (?1, ?2)", tree), + }; + let n = self.tx.execute(&sql, params![key, value])?; + assert_eq!(n, 1); + + Ok(old_val) + } + fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult> { + let tree = self.get_tree(tree)?; + let old_val = self.internal_get(tree, key)?; + + if old_val.is_some() { + let n = self + .tx + .execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?; + assert_eq!(n, 1); + } + + Ok(old_val) + } + + fn iter(&self, _tree: usize) -> TxOpResult> { + unimplemented!(); + } + fn iter_rev(&self, _tree: usize) -> TxOpResult> { + unimplemented!(); + } + + fn range<'r>( + &self, + _tree: usize, + _low: Bound<&'r [u8]>, + _high: Bound<&'r [u8]>, + ) -> TxOpResult> { + unimplemented!(); + } + fn range_rev<'r>( + &self, + _tree: usize, + _low: Bound<&'r [u8]>, + _high: Bound<&'r [u8]>, + ) -> TxOpResult> { + unimplemented!(); + } +} + +// ---- + +struct 
DbValueIterator<'a> { + db: MutexGuard<'a, SqliteDbInner>, + stmt: Option>, + iter: Option>, + _pin: PhantomPinned, +} + +impl<'a> DbValueIterator<'a> { + fn make( + db: MutexGuard<'a, SqliteDbInner>, + sql: &str, + args: P, + ) -> Result> { + let res = DbValueIterator { + db, + stmt: None, + iter: None, + _pin: PhantomPinned, + }; + let mut boxed = Box::pin(res); + trace!("make iterator with sql: {}", sql); + + unsafe { + let db = NonNull::from(&boxed.db); + let stmt = db.as_ref().db.prepare(sql)?; + + let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed); + Pin::get_unchecked_mut(mut_ref).stmt = Some(stmt); + + let mut stmt = NonNull::from(&boxed.stmt); + let iter = stmt.as_mut().as_mut().unwrap().query(args)?; + + let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed); + Pin::get_unchecked_mut(mut_ref).iter = Some(iter); + } + + Ok(Box::new(DbValueIteratorPin(boxed))) + } +} + +impl<'a> Drop for DbValueIterator<'a> { + fn drop(&mut self) { + trace!("drop iter"); + drop(self.iter.take()); + drop(self.stmt.take()); + } +} + +struct DbValueIteratorPin<'a>(Pin>>); + +impl<'a> Iterator for DbValueIteratorPin<'a> { + type Item = Result<(Value, Value)>; + + fn next(&mut self) -> Option { + let next = unsafe { + let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut self.0); + Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next() + }; + let row = match next { + Err(e) => return Some(Err(e.into())), + Ok(None) => return None, + Ok(Some(r)) => r, + }; + let k = match row.get::<_, Vec>(0) { + Err(e) => return Some(Err(e.into())), + Ok(x) => x, + }; + let v = match row.get::<_, Vec>(1) { + Err(e) => return Some(Err(e.into())), + Ok(y) => y, + }; + Some(Ok((k, v))) + } +} + +// ---- + +fn bounds_sql<'r>(low: Bound<&'r [u8]>, high: Bound<&'r [u8]>) -> (String, Vec>) { + let mut sql = String::new(); + let mut params: Vec> = vec![]; + + match low { + Bound::Included(b) => { + sql.push_str(" WHERE k >= ?1"); + params.push(b.to_vec()); + } + 
Bound::Excluded(b) => { + sql.push_str(" WHERE k > ?1"); + params.push(b.to_vec()); + } + Bound::Unbounded => (), + }; + + match high { + Bound::Included(b) => { + if !params.is_empty() { + sql.push_str(" AND k <= ?2"); + } else { + sql.push_str(" WHERE k <= ?1"); + } + params.push(b.to_vec()); + } + Bound::Excluded(b) => { + if !params.is_empty() { + sql.push_str(" AND k < ?2"); + } else { + sql.push_str(" WHERE k < ?1"); + } + params.push(b.to_vec()); + } + Bound::Unbounded => (), + } + + (sql, params) +} diff --git a/src/db/test.rs b/src/db/test.rs new file mode 100644 index 00000000..cfcee643 --- /dev/null +++ b/src/db/test.rs @@ -0,0 +1,106 @@ +use crate::*; + +use crate::lmdb_adapter::LmdbDb; +use crate::sled_adapter::SledDb; +use crate::sqlite_adapter::SqliteDb; + +fn test_suite(db: Db) { + let tree = db.open_tree("tree").unwrap(); + + let ka: &[u8] = &b"test"[..]; + let kb: &[u8] = &b"zwello"[..]; + let kint: &[u8] = &b"tz"[..]; + let va: &[u8] = &b"plop"[..]; + let vb: &[u8] = &b"plip"[..]; + let vc: &[u8] = &b"plup"[..]; + + assert!(tree.insert(ka, va).unwrap().is_none()); + assert_eq!(tree.get(ka).unwrap().unwrap(), va); + + let res = db.transaction::<_, (), _>(|mut tx| { + assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va); + + assert_eq!(tx.insert(&tree, ka, vb).unwrap().unwrap(), va); + + assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb); + + tx.commit(12) + }); + assert!(matches!(res, Ok(12))); + assert_eq!(tree.get(ka).unwrap().unwrap(), vb); + + let res = db.transaction::<(), _, _>(|mut tx| { + assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb); + + assert_eq!(tx.insert(&tree, ka, vc).unwrap().unwrap(), vb); + + assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vc); + + tx.abort(42) + }); + assert!(matches!(res, Err(TxError::Abort(42)))); + assert_eq!(tree.get(ka).unwrap().unwrap(), vb); + + let mut iter = tree.iter().unwrap(); + let next = iter.next().unwrap().unwrap(); + assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb)); + 
assert!(iter.next().is_none()); + drop(iter); + + assert!(tree.insert(kb, vc).unwrap().is_none()); + assert_eq!(tree.get(kb).unwrap().unwrap(), vc); + + let mut iter = tree.iter().unwrap(); + let next = iter.next().unwrap().unwrap(); + assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb)); + let next = iter.next().unwrap().unwrap(); + assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc)); + assert!(iter.next().is_none()); + drop(iter); + + let mut iter = tree.range(kint..).unwrap(); + let next = iter.next().unwrap().unwrap(); + assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc)); + assert!(iter.next().is_none()); + drop(iter); + + let mut iter = tree.range_rev(..kint).unwrap(); + let next = iter.next().unwrap().unwrap(); + assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb)); + assert!(iter.next().is_none()); + drop(iter); + + let mut iter = tree.iter_rev().unwrap(); + let next = iter.next().unwrap().unwrap(); + assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc)); + let next = iter.next().unwrap().unwrap(); + assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb)); + assert!(iter.next().is_none()); + drop(iter); +} + +#[test] +fn test_lmdb_db() { + let path = mktemp::Temp::new_dir().unwrap(); + let db = heed::EnvOpenOptions::new() + .max_dbs(100) + .open(&path) + .unwrap(); + let db = LmdbDb::init(db); + test_suite(db); + drop(path); +} + +#[test] +fn test_sled_db() { + let path = mktemp::Temp::new_dir().unwrap(); + let db = SledDb::init(sled::open(path.to_path_buf()).unwrap()); + test_suite(db); + drop(path); +} + +#[test] +fn test_sqlite_db() { + let db = SqliteDb::init(rusqlite::Connection::open_in_memory().unwrap()); + test_suite(db); +} -- cgit v1.2.3 From 138e13071be37d873344cd03e316c87ff8057ea0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Jun 2022 14:55:20 +0200 Subject: Fix garage_db build on 32-bit systems --- src/db/bin/convert.rs | 9 +-------- src/db/lmdb_adapter.rs | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 8 
deletions(-) (limited to 'src/db') diff --git a/src/db/bin/convert.rs b/src/db/bin/convert.rs index 9e45e61f..bbde2048 100644 --- a/src/db/bin/convert.rs +++ b/src/db/bin/convert.rs @@ -55,14 +55,7 @@ fn open_db(path: PathBuf, engine: String) -> Result { Error(format!("Unable to create LMDB data directory: {}", e).into()) })?; - let map_size = if u32::MAX as usize == usize::MAX { - eprintln!( - "LMDB is not recommended on 32-bit systems, database size will be limited" - ); - 1usize << 30 // 1GB for 32-bit systems - } else { - 1usize << 40 // 1TB for 64-bit systems - }; + let map_size = lmdb_adapter::recommended_map_size(); let db = lmdb_adapter::heed::EnvOpenOptions::new() .max_dbs(100) diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs index 74622919..62fcc3e6 100644 --- a/src/db/lmdb_adapter.rs +++ b/src/db/lmdb_adapter.rs @@ -327,3 +327,17 @@ where } } } + +// ---- + +#[cfg(target_pointer_width = "64")] +pub fn recommended_map_size() -> usize { + 1usize << 40 +} + +#[cfg(target_pointer_width = "32")] +pub fn recommended_map_size() -> usize { + use log::warn; + warn!("LMDB is not recommended on 32-bit systems, database size will be limited"); + 1usize << 30 +} -- cgit v1.2.3 From 77e3fd6db2c9cd3a10889bd071e95ef839cfbefc Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 15 Jun 2022 20:20:28 +0200 Subject: improve internal item counter mechanisms and implement bucket quotas (#326) - [x] Refactoring of internal counting API - [x] Repair procedure for counters (it's an offline procedure!!!) 
- [x] New counter for objects in buckets - [x] Add quotas to buckets struct - [x] Add CLI to manage bucket quotas - [x] Add admin API to manage bucket quotas - [x] Apply quotas by adding checks on put operations - [x] Proof-read Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/326 Co-authored-by: Alex Co-committed-by: Alex --- src/db/lib.rs | 6 ++++++ src/db/lmdb_adapter.rs | 8 ++++++++ src/db/sled_adapter.rs | 6 ++++++ src/db/sqlite_adapter.rs | 10 ++++++++++ 4 files changed, 30 insertions(+) (limited to 'src/db') diff --git a/src/db/lib.rs b/src/db/lib.rs index e9d3ea18..8188c715 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -197,6 +197,11 @@ impl Tree { pub fn remove>(&self, key: T) -> Result> { self.0.remove(self.1, key.as_ref()) } + /// Clears all values from the tree + #[inline] + pub fn clear(&self) -> Result<()> { + self.0.clear(self.1) + } #[inline] pub fn iter(&self) -> Result> { @@ -311,6 +316,7 @@ pub(crate) trait IDb: Send + Sync { fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result>; fn remove(&self, tree: usize, key: &[u8]) -> Result>; + fn clear(&self, tree: usize) -> Result<()>; fn iter(&self, tree: usize) -> Result>; fn iter_rev(&self, tree: usize) -> Result>; diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs index 62fcc3e6..fdb254c6 100644 --- a/src/db/lmdb_adapter.rs +++ b/src/db/lmdb_adapter.rs @@ -139,6 +139,14 @@ impl IDb for LmdbDb { Ok(old_val) } + fn clear(&self, tree: usize) -> Result<()> { + let tree = self.get_tree(tree)?; + let mut tx = self.db.write_txn()?; + tree.clear(&mut tx)?; + tx.commit()?; + Ok(()) + } + fn iter(&self, tree: usize) -> Result> { let tree = self.get_tree(tree)?; let tx = self.db.read_txn()?; diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs index 982f8d82..cf61867d 100644 --- a/src/db/sled_adapter.rs +++ b/src/db/sled_adapter.rs @@ -113,6 +113,12 @@ impl IDb for SledDb { Ok(old_val.map(|x| x.to_vec())) } + fn clear(&self, tree: 
usize) -> Result<()> { + let tree = self.get_tree(tree)?; + tree.clear()?; + Ok(()) + } + fn iter(&self, tree: usize) -> Result> { let tree = self.get_tree(tree)?; Ok(Box::new(tree.iter().map(|v| { diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 14bf35ff..68d96ca0 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -182,6 +182,16 @@ impl IDb for SqliteDb { Ok(old_val) } + fn clear(&self, tree: usize) -> Result<()> { + trace!("clear {}: lock db", tree); + let this = self.0.lock().unwrap(); + trace!("clear {}: lock acquired", tree); + + let tree = this.get_tree(tree)?; + this.db.execute(&format!("DELETE FROM {}", tree), [])?; + Ok(()) + } + fn iter(&self, tree: usize) -> Result> { trace!("iter {}: lock db", tree); let this = self.0.lock().unwrap(); -- cgit v1.2.3 From 4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7 Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 8 Jul 2022 13:30:26 +0200 Subject: Background task manager (#332) - [x] New background worker trait - [x] Adapt all current workers to use new API - [x] Command to list currently running workers, and whether they are active, idle, or dead - [x] Error reporting - Optimizations - [x] Merkle updater: several items per iteration - [ ] Use `tokio::task::spawn_blocking` where appropriate so that CPU-intensive tasks don't block other things going on - scrub: - [x] have only one worker with a channel to start/pause/cancel - [x] automatic scrub - [x] ability to view and change tranquility from CLI - [x] persistence of a few info - [ ] Testing Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/332 Co-authored-by: Alex Co-committed-by: Alex --- src/db/Cargo.toml | 2 +- src/db/sqlite_adapter.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'src/db') diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index 6d8f64be..f697054b 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -19,7 +19,7 @@ required-features = ["cli"] 
[dependencies] err-derive = "0.3" hexdump = "0.1" -log = "0.4" +tracing = "0.1.30" heed = "0.11" rusqlite = { version = "0.27", features = ["bundled"] } diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 68d96ca0..97a78b07 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -6,7 +6,7 @@ use std::pin::Pin; use std::ptr::NonNull; use std::sync::{Arc, Mutex, MutexGuard}; -use log::trace; +use tracing::trace; use rusqlite::{params, Connection, Rows, Statement, Transaction}; -- cgit v1.2.3 From ac03fa7937d9da29d2358343a499fe9d15ac5f7c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 15 Jul 2022 18:31:19 +0200 Subject: Uniformize tracing::* imports (hopefully fixes 32-bit build) --- src/db/lib.rs | 3 +++ src/db/lmdb_adapter.rs | 1 - src/db/sqlite_adapter.rs | 2 -- 3 files changed, 3 insertions(+), 3 deletions(-) (limited to 'src/db') diff --git a/src/db/lib.rs b/src/db/lib.rs index 8188c715..f185114e 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -1,3 +1,6 @@ +#[macro_use] +extern crate tracing; + pub mod lmdb_adapter; pub mod sled_adapter; pub mod sqlite_adapter; diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs index fdb254c6..c036c990 100644 --- a/src/db/lmdb_adapter.rs +++ b/src/db/lmdb_adapter.rs @@ -345,7 +345,6 @@ pub fn recommended_map_size() -> usize { #[cfg(target_pointer_width = "32")] pub fn recommended_map_size() -> usize { - use log::warn; warn!("LMDB is not recommended on 32-bit systems, database size will be limited"); 1usize << 30 } diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 97a78b07..886fda6e 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -6,8 +6,6 @@ use std::pin::Pin; use std::ptr::NonNull; use std::sync::{Arc, Mutex, MutexGuard}; -use tracing::trace; - use rusqlite::{params, Connection, Rows, Statement, Transaction}; use crate::{ -- cgit v1.2.3 From 7511ba5530d56a446fefe2372409d9c2ceea17c5 Mon Sep 17 00:00:00 2001 From: Jakub Jirutka Date: 
Sat, 3 Sep 2022 19:05:32 +0200 Subject: Allow linking against system-provided libsqlite Unfortunately, rusqlite uses the opposite logic for enabling/disabling bundled libraries compared to other crates (libsodium-sys, zstd-sys). Cargo features are very limited and don't allow enabling feature A in a dependency iff feature B is disabled. Note, lmdb-rkv-sys doesn't need any special treatment because it automatically links against system liblmdb if found via pkgconf. Linux distros should build garage with `--no-default-features --features system-libs` to disable bundled-libs and enable system-libs. --- src/db/Cargo.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/db') diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index f697054b..230fbaf9 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -22,7 +22,7 @@ hexdump = "0.1" tracing = "0.1.30" heed = "0.11" -rusqlite = { version = "0.27", features = ["bundled"] } +rusqlite = "0.27" sled = "0.34" # cli deps @@ -33,4 +33,5 @@ pretty_env_logger = { version = "0.4", optional = true } mktemp = "0.4" [features] +bundled-libs = [ "rusqlite/bundled" ] cli = ["clap", "pretty_env_logger"] -- cgit v1.2.3 From 729a910e14bc44925175ea8240d0c16fdfc18103 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Sep 2022 16:40:13 +0200 Subject: Remove Heed default features --- src/db/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/db') diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index 230fbaf9..44f0be56 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -21,7 +21,7 @@ err-derive = "0.3" hexdump = "0.1" tracing = "0.1.30" -heed = "0.11" +heed = { version = "0.11", default-features = false, features = ["lmdb"] } rusqlite = "0.27" sled = "0.34" -- cgit v1.2.3 From b886c75450e3ee6a7c2b0a8265d7ada20a4d9d75 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Sep 2022 17:09:43 +0200 Subject: Make all DB engines optional build features --- src/db/Cargo.toml | 8 +++++---
src/db/lib.rs | 4 ++++ 2 files changed, 9 insertions(+), 3 deletions(-) (limited to 'src/db') diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index 44f0be56..62dda2ca 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -21,9 +21,9 @@ err-derive = "0.3" hexdump = "0.1" tracing = "0.1.30" -heed = { version = "0.11", default-features = false, features = ["lmdb"] } -rusqlite = "0.27" -sled = "0.34" +heed = { version = "0.11", default-features = false, features = ["lmdb"], optional = true } +rusqlite = { version = "0.27", optional = true } +sled = { version = "0.34", optional = true } # cli deps clap = { version = "3.1.18", optional = true, features = ["derive", "env"] } @@ -35,3 +35,5 @@ mktemp = "0.4" [features] bundled-libs = [ "rusqlite/bundled" ] cli = ["clap", "pretty_env_logger"] +lmdb = [ "heed" ] +sqlite = [ "rusqlite" ] diff --git a/src/db/lib.rs b/src/db/lib.rs index f185114e..5304c195 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -1,8 +1,12 @@ #[macro_use] +#[cfg(feature = "sqlite")] extern crate tracing; +#[cfg(feature = "lmdb")] pub mod lmdb_adapter; +#[cfg(feature = "sled")] pub mod sled_adapter; +#[cfg(feature = "sqlite")] pub mod sqlite_adapter; pub mod counted_tree_hack; -- cgit v1.2.3 From 14492044394a875475b2159d51234ac1e35531bf Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 7 Sep 2022 18:02:13 +0200 Subject: Add warnings when features are not included in build --- src/db/lib.rs | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/db') diff --git a/src/db/lib.rs b/src/db/lib.rs index 5304c195..d96586be 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -2,6 +2,9 @@ #[cfg(feature = "sqlite")] extern crate tracing; +#[cfg(not(any(feature = "lmdb", feature = "sled", feature = "sqlite")))] +compile_error!("Must activate the Cargo feature for at least one DB engine: lmdb, sled or sqlite."); + #[cfg(feature = "lmdb")] pub mod lmdb_adapter; #[cfg(feature = "sled")] -- cgit v1.2.3