diff options
Diffstat (limited to 'src/db')
-rw-r--r-- | src/db/Cargo.toml | 5 | ||||
-rw-r--r-- | src/db/counted_tree_hack.rs | 127 | ||||
-rw-r--r-- | src/db/lib.rs | 18 | ||||
-rw-r--r-- | src/db/lmdb_adapter.rs | 90 | ||||
-rw-r--r-- | src/db/open.rs | 27 | ||||
-rw-r--r-- | src/db/sled_adapter.rs | 282 | ||||
-rw-r--r-- | src/db/sqlite_adapter.rs | 184 | ||||
-rw-r--r-- | src/db/test.rs | 60 |
8 files changed, 266 insertions, 527 deletions
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index baa94bae..b88298ee 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_db" -version = "0.9.3" +version = "0.10.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -20,13 +20,12 @@ heed = { workspace = true, optional = true } rusqlite = { workspace = true, optional = true, features = ["backup"] } r2d2 = { workspace = true, optional = true } r2d2_sqlite = { workspace = true, optional = true } -sled = { workspace = true, optional = true } [dev-dependencies] mktemp.workspace = true [features] -default = [ "sled", "lmdb", "sqlite" ] +default = [ "lmdb", "sqlite" ] bundled-libs = [ "rusqlite?/bundled" ] lmdb = [ "heed" ] sqlite = [ "rusqlite", "r2d2", "r2d2_sqlite" ] diff --git a/src/db/counted_tree_hack.rs b/src/db/counted_tree_hack.rs deleted file mode 100644 index a4ce12e0..00000000 --- a/src/db/counted_tree_hack.rs +++ /dev/null @@ -1,127 +0,0 @@ -//! This hack allows a db tree to keep in RAM a counter of the number of entries -//! it contains, which is used to call .len() on it. This is usefull only for -//! the sled backend where .len() otherwise would have to traverse the whole -//! tree to count items. For sqlite and lmdb, this is mostly useless (but -//! hopefully not harmfull!). Note that a CountedTree cannot be part of a -//! transaction. - -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; - -use crate::{Result, Tree, TxError, Value, ValueIter}; - -#[derive(Clone)] -pub struct CountedTree(Arc<CountedTreeInternal>); - -struct CountedTreeInternal { - tree: Tree, - len: AtomicUsize, -} - -impl CountedTree { - pub fn new(tree: Tree) -> Result<Self> { - let len = tree.len()?; - Ok(Self(Arc::new(CountedTreeInternal { - tree, - len: AtomicUsize::new(len), - }))) - } - - pub fn len(&self) -> usize { - self.0.len.load(Ordering::SeqCst) - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Value>> { - self.0.tree.get(key) - } - - pub fn first(&self) -> Result<Option<(Value, Value)>> { - self.0.tree.first() - } - - pub fn iter(&self) -> Result<ValueIter<'_>> { - self.0.tree.iter() - } - - // ---- writing functions ---- - - pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<Value>> - where - K: AsRef<[u8]>, - V: AsRef<[u8]>, - { - let old_val = self.0.tree.insert(key, value)?; - if old_val.is_none() { - self.0.len.fetch_add(1, Ordering::SeqCst); - } - Ok(old_val) - } - - pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Value>> { - let old_val = self.0.tree.remove(key)?; - if old_val.is_some() { - self.0.len.fetch_sub(1, Ordering::SeqCst); - } - Ok(old_val) - } - - pub fn compare_and_swap<K, OV, NV>( - &self, - key: K, - expected_old: Option<OV>, - new: Option<NV>, - ) -> Result<bool> - where - K: AsRef<[u8]>, - OV: AsRef<[u8]>, - NV: AsRef<[u8]>, - { - let old_some = expected_old.is_some(); - let new_some = new.is_some(); - - let tx_res = self.0.tree.db().transaction(|tx| { - let old_val = tx.get(&self.0.tree, &key)?; - let is_same = match (&old_val, &expected_old) { - (None, None) => true, - (Some(x), Some(y)) if x == y.as_ref() => true, - _ => false, - }; - if is_same { - match &new { - Some(v) => { - tx.insert(&self.0.tree, &key, v)?; - } - None => { - tx.remove(&self.0.tree, &key)?; - } - } - Ok(()) - } else { - Err(TxError::Abort(())) - } - }); - - match tx_res { - Ok(()) => { - match (old_some, new_some) { - (false, true) => { - self.0.len.fetch_add(1, Ordering::SeqCst); - } - (true, false) => { - self.0.len.fetch_sub(1, Ordering::SeqCst); - } - _ => (), - } - Ok(true) - } - Err(TxError::Abort(())) => Ok(false), - Err(TxError::Db(e)) => Err(e), - } - } -} diff --git a/src/db/lib.rs b/src/db/lib.rs index 7f19172f..c8f9e13f 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -3,13 +3,9 @@ extern crate tracing; #[cfg(feature = "lmdb")] pub mod lmdb_adapter; -#[cfg(feature = "sled")] -pub mod sled_adapter; #[cfg(feature = "sqlite")] pub mod sqlite_adapter; -pub mod counted_tree_hack; - pub mod open; #[cfg(test)] @@ -62,6 +58,7 @@ pub type Result<T> = std::result::Result<T, Error>; pub struct TxOpError(pub(crate) Error); pub type TxOpResult<T> = std::result::Result<T, TxOpError>; +#[derive(Debug)] pub enum TxError<E> { Abort(E), Db(Error), @@ -200,10 +197,6 @@ impl Tree { pub fn len(&self) -> Result<usize> { self.0.len(self.1) } - #[inline] - pub fn fast_len(&self) -> Result<Option<usize>> { - self.0.fast_len(self.1) - } #[inline] pub fn first(&self) -> Result<Option<(Value, Value)>> { @@ -293,6 +286,11 @@ impl<'a> Transaction<'a> { pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> { self.tx.remove(tree.1, key.as_ref()) } + /// Clears all values in a tree + #[inline] + pub fn clear(&mut self, tree: &Tree) -> TxOpResult<()> { + self.tx.clear(tree.1) + } #[inline] pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> { @@ -340,9 +338,6 @@ pub(crate) trait IDb: Send + Sync { fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>; fn len(&self, tree: usize) -> Result<usize>; - fn fast_len(&self, _tree: usize) -> Result<Option<usize>> { - Ok(None) - } fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>>; fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>; @@ -373,6 +368,7 @@ pub(crate) trait ITx { fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>>; fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>>; + fn clear(&mut self, tree: usize) -> TxOpResult<()>; fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>>; fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>>; diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs index 4b131aff..d5066664 100644 --- a/src/db/lmdb_adapter.rs +++ b/src/db/lmdb_adapter.rs @@ -4,6 +4,7 @@ use core::ptr::NonNull; use std::collections::HashMap; use std::convert::TryInto; use std::path::PathBuf; +use std::pin::Pin; use std::sync::{Arc, RwLock}; use heed::types::ByteSlice; @@ -131,10 +132,6 @@ impl IDb for LmdbDb { Ok(tree.len(&tx)?.try_into().unwrap()) } - fn fast_len(&self, tree: usize) -> Result<Option<usize>> { - Ok(Some(self.len(tree)?)) - } - fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> { let tree = self.get_tree(tree)?; let mut tx = self.db.write_txn()?; @@ -252,8 +249,9 @@ impl<'a> ITx for LmdbTx<'a> { None => Ok(None), } } - fn len(&self, _tree: usize) -> TxOpResult<usize> { - unimplemented!(".len() in transaction not supported with LMDB backend") + fn len(&self, tree: usize) -> TxOpResult<usize> { + let tree = self.get_tree(tree)?; + Ok(tree.len(&self.tx)? as usize) } fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> { @@ -268,33 +266,48 @@ impl<'a> ITx for LmdbTx<'a> { tree.delete(&mut self.tx, key)?; Ok(old_val) } + fn clear(&mut self, tree: usize) -> TxOpResult<()> { + let tree = *self.get_tree(tree)?; + tree.clear(&mut self.tx)?; + Ok(()) + } - fn iter(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> { - unimplemented!("Iterators in transactions not supported with LMDB backend"); + fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> { + let tree = *self.get_tree(tree)?; + Ok(Box::new(tree.iter(&self.tx)?.map(tx_iter_item))) } - fn iter_rev(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> { - unimplemented!("Iterators in transactions not supported with LMDB backend"); + fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> { + let tree = *self.get_tree(tree)?; + Ok(Box::new(tree.rev_iter(&self.tx)?.map(tx_iter_item))) } fn range<'r>( &self, - _tree: usize, - _low: Bound<&'r [u8]>, - _high: Bound<&'r [u8]>, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, ) -> TxOpResult<TxValueIter<'_>> { - unimplemented!("Iterators in transactions not supported with LMDB backend"); + let tree = *self.get_tree(tree)?; + Ok(Box::new( + tree.range(&self.tx, &(low, high))?.map(tx_iter_item), + )) } fn range_rev<'r>( &self, - _tree: usize, - _low: Bound<&'r [u8]>, - _high: Bound<&'r [u8]>, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, ) -> TxOpResult<TxValueIter<'_>> { - unimplemented!("Iterators in transactions not supported with LMDB backend"); + let tree = *self.get_tree(tree)?; + Ok(Box::new( + tree.rev_range(&self.tx, &(low, high))?.map(tx_iter_item), + )) } } -// ---- +// ---- iterators outside transactions ---- +// complicated, they must hold the transaction object +// therefore a bit of unsafe code (it is a self-referential struct) type IteratorItem<'a> = heed::Result<( <ByteSlice as BytesDecode<'a>>::DItem, @@ -317,12 +330,20 @@ where where F: FnOnce(&'a RoTxn<'a>) -> Result<I>, { - let mut res = TxAndIterator { tx, iter: None }; + let res = TxAndIterator { tx, iter: None }; + let mut boxed = Box::pin(res); + + // This unsafe allows us to bypass lifetime checks + let tx = unsafe { NonNull::from(&boxed.tx).as_ref() }; + let iter = iterfun(tx)?; - let tx = unsafe { NonNull::from(&res.tx).as_ref() }; - res.iter = Some(iterfun(tx)?); + let mut_ref = Pin::as_mut(&mut boxed); + // This unsafe allows us to write in a field of the pinned struct + unsafe { + Pin::get_unchecked_mut(mut_ref).iter = Some(iter); + } - Ok(Box::new(res)) + Ok(Box::new(TxAndIteratorPin(boxed))) } } @@ -331,18 +352,26 @@ where I: Iterator<Item = IteratorItem<'a>> + 'a, { fn drop(&mut self) { + // ensure the iterator is dropped before the RoTxn it references drop(self.iter.take()); } } -impl<'a, I> Iterator for TxAndIterator<'a, I> +struct TxAndIteratorPin<'a, I>(Pin<Box<TxAndIterator<'a, I>>>) +where + I: Iterator<Item = IteratorItem<'a>> + 'a; + +impl<'a, I> Iterator for TxAndIteratorPin<'a, I> where I: Iterator<Item = IteratorItem<'a>> + 'a, { type Item = Result<(Value, Value)>; fn next(&mut self) -> Option<Self::Item> { - match self.iter.as_mut().unwrap().next() { + let mut_ref = Pin::as_mut(&mut self.0); + // This unsafe allows us to mutably access the iterator field + let next = unsafe { Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next() }; + match next { None => None, Some(Err(e)) => Some(Err(e.into())), Some(Ok((k, v))) => Some(Ok((k.to_vec(), v.to_vec()))), @@ -350,7 +379,16 @@ where } } -// ---- +// ---- iterators within transactions ---- + +fn tx_iter_item<'a>( + item: std::result::Result<(&'a [u8], &'a [u8]), heed::Error>, +) -> TxOpResult<(Vec<u8>, Vec<u8>)> { + item.map(|(k, v)| (k.to_vec(), v.to_vec())) + .map_err(|e| TxOpError(Error::from(e))) +} + +// ---- utility ---- #[cfg(target_pointer_width = "64")] pub fn recommended_map_size() -> usize { diff --git a/src/db/open.rs b/src/db/open.rs index 59d06f2e..19bc96cc 100644 --- a/src/db/open.rs +++ b/src/db/open.rs @@ -11,7 +11,6 @@ use crate::{Db, Error, Result}; pub enum Engine { Lmdb, Sqlite, - Sled, } impl Engine { @@ -20,7 +19,6 @@ impl Engine { match self { Self::Lmdb => "lmdb", Self::Sqlite => "sqlite", - Self::Sled => "sled", } } } @@ -38,10 +36,10 @@ impl std::str::FromStr for Engine { match text { "lmdb" | "heed" => Ok(Self::Lmdb), "sqlite" | "sqlite3" | "rusqlite" => Ok(Self::Sqlite), - "sled" => Ok(Self::Sled), + "sled" => Err(Error("Sled is no longer supported as a database engine. Converting your old metadata db can be done using an older Garage binary (e.g. v0.9.3).".into())), kind => Err(Error( format!( - "Invalid DB engine: {} (options are: lmdb, sled, sqlite)", + "Invalid DB engine: {} (options are: lmdb, sqlite)", kind ) .into(), @@ -53,8 +51,6 @@ impl std::str::FromStr for Engine { pub struct OpenOpt { pub fsync: bool, pub lmdb_map_size: Option<usize>, - pub sled_cache_capacity: usize, - pub sled_flush_every_ms: u64, } impl Default for OpenOpt { @@ -62,31 +58,12 @@ impl Default for OpenOpt { Self { fsync: false, lmdb_map_size: None, - sled_cache_capacity: 1024 * 1024 * 1024, - sled_flush_every_ms: 2000, } } } pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> { match engine { - // ---- Sled DB ---- - #[cfg(feature = "sled")] - Engine::Sled => { - if opt.fsync { - return Err(Error( - "`metadata_fsync = true` is not supported with the Sled database engine".into(), - )); - } - info!("Opening Sled database at: {}", path.display()); - let db = crate::sled_adapter::sled::Config::default() - .path(&path) - .cache_capacity(opt.sled_cache_capacity as u64) - .flush_every_ms(Some(opt.sled_flush_every_ms)) - .open()?; - Ok(crate::sled_adapter::SledDb::init(db)) - } - // ---- Sqlite DB ---- #[cfg(feature = "sqlite")] Engine::Sqlite => { diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs deleted file mode 100644 index c34b4d81..00000000 --- a/src/db/sled_adapter.rs +++ /dev/null @@ -1,282 +0,0 @@ -use core::ops::Bound; - -use std::cell::Cell; -use std::collections::HashMap; -use std::path::PathBuf; -use std::sync::{Arc, RwLock}; - -use sled::transaction::{ - ConflictableTransactionError, TransactionError, Transactional, TransactionalTree, - UnabortableTransactionError, -}; - -use crate::{ - Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult, - TxResult, TxValueIter, Value, ValueIter, -}; - -pub use sled; - -// -- err - -impl From<sled::Error> for Error { - fn from(e: sled::Error) -> Error { - Error(format!("Sled: {}", e).into()) - } -} - -impl From<sled::Error> for TxOpError { - fn from(e: sled::Error) -> TxOpError { - TxOpError(e.into()) - } -} - -// -- db - -pub struct SledDb { - db: sled::Db, - trees: RwLock<(Vec<sled::Tree>, HashMap<String, usize>)>, -} - -impl SledDb { - #[deprecated( - since = "0.9.0", - note = "The Sled database is now deprecated and will be removed in Garage v1.0. Please migrate to LMDB or Sqlite as soon as possible." - )] - pub fn init(db: sled::Db) -> Db { - tracing::warn!("-------------------- IMPORTANT WARNING !!! ----------------------"); - tracing::warn!("The Sled database is now deprecated and will be removed in Garage v1.0."); - tracing::warn!("Please migrate to LMDB or Sqlite as soon as possible."); - tracing::warn!("-----------------------------------------------------------------------"); - let s = Self { - db, - trees: RwLock::new((Vec::new(), HashMap::new())), - }; - Db(Arc::new(s)) - } - - fn get_tree(&self, i: usize) -> Result<sled::Tree> { - self.trees - .read() - .unwrap() - .0 - .get(i) - .cloned() - .ok_or_else(|| Error("invalid tree id".into())) - } -} - -impl IDb for SledDb { - fn engine(&self) -> String { - "Sled".into() - } - - fn open_tree(&self, name: &str) -> Result<usize> { - let mut trees = self.trees.write().unwrap(); - if let Some(i) = trees.1.get(name) { - Ok(*i) - } else { - let tree = self.db.open_tree(name)?; - let i = trees.0.len(); - trees.0.push(tree); - trees.1.insert(name.to_string(), i); - Ok(i) - } - } - - fn list_trees(&self) -> Result<Vec<String>> { - let mut trees = vec![]; - for name in self.db.tree_names() { - let name = std::str::from_utf8(&name) - .map_err(|e| Error(format!("{}", e).into()))? - .to_string(); - if name != "__sled__default" { - trees.push(name); - } - } - Ok(trees) - } - - fn snapshot(&self, to: &PathBuf) -> Result<()> { - let to_db = sled::open(to)?; - let export = self.db.export(); - to_db.import(export); - Ok(()) - } - - // ---- - - fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> { - let tree = self.get_tree(tree)?; - let val = tree.get(key)?; - Ok(val.map(|x| x.to_vec())) - } - - fn len(&self, tree: usize) -> Result<usize> { - let tree = self.get_tree(tree)?; - Ok(tree.len()) - } - - fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> { - let tree = self.get_tree(tree)?; - let old_val = tree.insert(key, value)?; - Ok(old_val.map(|x| x.to_vec())) - } - - fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> { - let tree = self.get_tree(tree)?; - let old_val = tree.remove(key)?; - Ok(old_val.map(|x| x.to_vec())) - } - - fn clear(&self, tree: usize) -> Result<()> { - let tree = self.get_tree(tree)?; - tree.clear()?; - Ok(()) - } - - fn iter(&self, tree: usize) -> Result<ValueIter<'_>> { - let tree = self.get_tree(tree)?; - Ok(Box::new(tree.iter().map(|v| { - v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into) - }))) - } - - fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> { - let tree = self.get_tree(tree)?; - Ok(Box::new(tree.iter().rev().map(|v| { - v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into) - }))) - } - - fn range<'r>( - &self, - tree: usize, - low: Bound<&'r [u8]>, - high: Bound<&'r [u8]>, - ) -> Result<ValueIter<'_>> { - let tree = self.get_tree(tree)?; - Ok(Box::new(tree.range::<&'r [u8], _>((low, high)).map(|v| { - v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into) - }))) - } - fn range_rev<'r>( - &self, - tree: usize, - low: Bound<&'r [u8]>, - high: Bound<&'r [u8]>, - ) -> Result<ValueIter<'_>> { - let tree = self.get_tree(tree)?; - Ok(Box::new(tree.range::<&'r [u8], _>((low, high)).rev().map( - |v| v.map(|(x, y)| (x.to_vec(), y.to_vec())).map_err(Into::into), - ))) - } - - // ---- - - fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> { - let trees = self.trees.read().unwrap(); - let res = trees.0.transaction(|txtrees| { - let mut tx = SledTx { - trees: txtrees, - err: Cell::new(None), - }; - match f.try_on(&mut tx) { - TxFnResult::Ok(on_commit) => { - assert!(tx.err.into_inner().is_none()); - Ok(on_commit) - } - TxFnResult::Abort => { - assert!(tx.err.into_inner().is_none()); - Err(ConflictableTransactionError::Abort(())) - } - TxFnResult::DbErr => { - let e = tx.err.into_inner().expect("No DB error"); - Err(e.into()) - } - } - }); - match res { - Ok(on_commit) => Ok(on_commit), - Err(TransactionError::Abort(())) => Err(TxError::Abort(())), - Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())), - } - } -} - -// ---- - -struct SledTx<'a> { - trees: &'a [TransactionalTree], - err: Cell<Option<UnabortableTransactionError>>, -} - -impl<'a> SledTx<'a> { - fn get_tree(&self, i: usize) -> TxOpResult<&TransactionalTree> { - self.trees.get(i).ok_or_else(|| { - TxOpError(Error( - "invalid tree id (it might have been openned after the transaction started)".into(), - )) - }) - } - - fn save_error<R>( - &self, - v: std::result::Result<R, UnabortableTransactionError>, - ) -> TxOpResult<R> { - match v { - Ok(x) => Ok(x), - Err(e) => { - let txt = format!("{}", e); - self.err.set(Some(e)); - Err(TxOpError(Error(txt.into()))) - } - } - } -} - -impl<'a> ITx for SledTx<'a> { - fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> { - let tree = self.get_tree(tree)?; - let tmp = self.save_error(tree.get(key))?; - Ok(tmp.map(|x| x.to_vec())) - } - fn len(&self, _tree: usize) -> TxOpResult<usize> { - unimplemented!(".len() in transaction not supported with Sled backend") - } - - fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> { - let tree = self.get_tree(tree)?; - let old_val = self.save_error(tree.insert(key, value))?; - Ok(old_val.map(|x| x.to_vec())) - } - fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> { - let tree = self.get_tree(tree)?; - let old_val = self.save_error(tree.remove(key))?; - Ok(old_val.map(|x| x.to_vec())) - } - - fn iter(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> { - unimplemented!("Iterators in transactions not supported with Sled backend"); - } - fn iter_rev(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> { - unimplemented!("Iterators in transactions not supported with Sled backend"); - } - - fn range<'r>( - &self, - _tree: usize, - _low: Bound<&'r [u8]>, - _high: Bound<&'r [u8]>, - ) -> TxOpResult<TxValueIter<'_>> { - unimplemented!("Iterators in transactions not supported with Sled backend"); - } - fn range_rev<'r>( - &self, - _tree: usize, - _low: Bound<&'r [u8]>, - _high: Bound<&'r [u8]>, - ) -> TxOpResult<TxValueIter<'_>> { - unimplemented!("Iterators in transactions not supported with Sled backend"); - } -} diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 3eccfdde..a91b9011 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -169,10 +169,6 @@ impl IDb for SqliteDb { } } - fn fast_len(&self, tree: usize) -> Result<Option<usize>> { - Ok(Some(self.len(tree)?)) - } - fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> { let tree = self.get_tree(tree)?; let db = self.db.get()?; @@ -371,33 +367,64 @@ impl<'a> ITx for SqliteTx<'a> { Ok(old_val) } + fn clear(&mut self, tree: usize) -> TxOpResult<()> { + let tree = self.get_tree(tree)?; + self.tx.execute(&format!("DELETE FROM {}", tree), [])?; + Ok(()) + } - fn iter(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> { - unimplemented!(); + fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> { + let tree = self.get_tree(tree)?; + let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree); + TxValueIterator::make(self, &sql, []) } - fn iter_rev(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> { - unimplemented!(); + fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> { + let tree = self.get_tree(tree)?; + let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree); + TxValueIterator::make(self, &sql, []) } fn range<'r>( &self, - _tree: usize, - _low: Bound<&'r [u8]>, - _high: Bound<&'r [u8]>, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, ) -> TxOpResult<TxValueIter<'_>> { - unimplemented!(); + let tree = self.get_tree(tree)?; + + let (bounds_sql, params) = bounds_sql(low, high); + let sql = format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql); + + let params = params + .iter() + .map(|x| x as &dyn rusqlite::ToSql) + .collect::<Vec<_>>(); + + TxValueIterator::make::<&[&dyn rusqlite::ToSql]>(self, &sql, params.as_ref()) } fn range_rev<'r>( &self, - _tree: usize, - _low: Bound<&'r [u8]>, - _high: Bound<&'r [u8]>, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, ) -> TxOpResult<TxValueIter<'_>> { - unimplemented!(); + let tree = self.get_tree(tree)?; + + let (bounds_sql, params) = bounds_sql(low, high); + let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql); + + let params = params + .iter() + .map(|x| x as &dyn rusqlite::ToSql) + .collect::<Vec<_>>(); + + TxValueIterator::make::<&[&dyn rusqlite::ToSql]>(self, &sql, params.as_ref()) } } -// ---- +// ---- iterators outside transactions ---- +// complicated, they must hold the Statement and Row objects +// therefore quite some unsafe code (it is a self-referential struct) struct DbValueIterator<'a> { db: Connection, @@ -417,17 +444,23 @@ impl<'a> DbValueIterator<'a> { let mut boxed = Box::pin(res); trace!("make iterator with sql: {}", sql); - unsafe { - let db = NonNull::from(&boxed.db); - let stmt = db.as_ref().prepare(sql)?; + // This unsafe allows us to bypass lifetime checks + let db = unsafe { NonNull::from(&boxed.db).as_ref() }; + let stmt = db.prepare(sql)?; - let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed); + let mut_ref = Pin::as_mut(&mut boxed); + // This unsafe allows us to write in a field of the pinned struct + unsafe { Pin::get_unchecked_mut(mut_ref).stmt = Some(stmt); + } - let mut stmt = NonNull::from(&boxed.stmt); - let iter = stmt.as_mut().as_mut().unwrap().query(args)?; + // This unsafe allows us to bypass lifetime checks + let stmt = unsafe { NonNull::from(&boxed.stmt).as_mut() }; + let iter = stmt.as_mut().unwrap().query(args)?; - let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed); + let mut_ref = Pin::as_mut(&mut boxed); + // This unsafe allows us to write in a field of the pinned struct + unsafe { Pin::get_unchecked_mut(mut_ref).iter = Some(iter); } @@ -449,28 +482,73 @@ impl<'a> Iterator for DbValueIteratorPin<'a> { type Item = Result<(Value, Value)>; fn next(&mut self) -> Option<Self::Item> { - let next = unsafe { - let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut self.0); - Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next() - }; - let row = match next { - Err(e) => return Some(Err(e.into())), - Ok(None) => return None, - Ok(Some(r)) => r, - }; - let k = match row.get::<_, Vec<u8>>(0) { - Err(e) => return Some(Err(e.into())), - Ok(x) => x, - }; - let v = match row.get::<_, Vec<u8>>(1) { - Err(e) => return Some(Err(e.into())), - Ok(y) => y, + let mut_ref = Pin::as_mut(&mut self.0); + // This unsafe allows us to mutably access the iterator field + let next = unsafe { Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next() }; + iter_next_row(next) + } +} + +// ---- iterators within transactions ---- +// it's the same except we don't hold a mutex guard, +// only a Statement and a Rows object + +struct TxValueIterator<'a> { + stmt: Statement<'a>, + iter: Option<Rows<'a>>, + _pin: PhantomPinned, +} + +impl<'a> TxValueIterator<'a> { + fn make<P: rusqlite::Params>( + tx: &'a SqliteTx<'a>, + sql: &str, + args: P, + ) -> TxOpResult<TxValueIter<'a>> { + let stmt = tx.tx.prepare(sql)?; + let res = TxValueIterator { + stmt, + iter: None, + _pin: PhantomPinned, }; - Some(Ok((k, v))) + let mut boxed = Box::pin(res); + trace!("make iterator with sql: {}", sql); + + // This unsafe allows us to bypass lifetime checks + let stmt = unsafe { NonNull::from(&boxed.stmt).as_mut() }; + let iter = stmt.query(args)?; + + let mut_ref = Pin::as_mut(&mut boxed); + // This unsafe allows us to write in a field of the pinned struct + unsafe { + Pin::get_unchecked_mut(mut_ref).iter = Some(iter); + } + + Ok(Box::new(TxValueIteratorPin(boxed))) } } -// ---- +impl<'a> Drop for TxValueIterator<'a> { + fn drop(&mut self) { + trace!("drop iter"); + drop(self.iter.take()); + } +} + +struct TxValueIteratorPin<'a>(Pin<Box<TxValueIterator<'a>>>); + +impl<'a> Iterator for TxValueIteratorPin<'a> { + type Item = TxOpResult<(Value, Value)>; + + fn next(&mut self) -> Option<Self::Item> { + let mut_ref = Pin::as_mut(&mut self.0); + // This unsafe allows us to mutably access the iterator field + let next = unsafe { Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next() }; + iter_next_row(next) + } +} + +// ---- utility ---- fn bounds_sql<'r>(low: Bound<&'r [u8]>, high: Bound<&'r [u8]>) -> (String, Vec<Vec<u8>>) { let mut sql = String::new(); @@ -510,3 +588,25 @@ fn bounds_sql<'r>(low: Bound<&'r [u8]>, high: Bound<&'r [u8]>) -> (String, Vec<V (sql, params) } + +fn iter_next_row<E>( + next_row: rusqlite::Result<Option<&rusqlite::Row>>, +) -> Option<std::result::Result<(Value, Value), E>> +where + E: From<rusqlite::Error>, +{ + let row = match next_row { + Err(e) => return Some(Err(e.into())), + Ok(None) => return None, + Ok(Some(r)) => r, + }; + let k = match row.get::<_, Vec<u8>>(0) { + Err(e) => return Some(Err(e.into())), + Ok(x) => x, + }; + let v = match row.get::<_, Vec<u8>>(1) { + Err(e) => return Some(Err(e.into())), + Ok(y) => y, + }; + Some(Ok((k, v))) +} diff --git a/src/db/test.rs b/src/db/test.rs index cad25f4d..adb429e7 100644 --- a/src/db/test.rs +++ b/src/db/test.rs @@ -10,8 +10,13 @@ fn test_suite(db: Db) { let vb: &[u8] = &b"plip"[..]; let vc: &[u8] = &b"plup"[..]; + // ---- test simple insert/delete ---- + assert!(tree.insert(ka, va).unwrap().is_none()); assert_eq!(tree.get(ka).unwrap().unwrap(), va); + assert_eq!(tree.len().unwrap(), 1); + + // ---- test transaction logic ---- let res = db.transaction::<_, (), _>(|tx| { assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va); @@ -37,6 +42,8 @@ fn test_suite(db: Db) { assert!(matches!(res, Err(TxError::Abort(42)))); assert_eq!(tree.get(ka).unwrap().unwrap(), vb); + // ---- test iteration outside of transactions ---- + let mut iter = tree.iter().unwrap(); let next = iter.next().unwrap().unwrap(); assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb)); @@ -73,6 +80,48 @@ fn test_suite(db: Db) { assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb)); assert!(iter.next().is_none()); drop(iter); + + // ---- test iteration within transactions ---- + + db.transaction::<_, (), _>(|tx| { + let mut iter = tx.iter(&tree).unwrap(); + let next = iter.next().unwrap().unwrap(); + assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb)); + let next = iter.next().unwrap().unwrap(); + assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc)); + assert!(iter.next().is_none()); + Ok(()) + }) + .unwrap(); + + db.transaction::<_, (), _>(|tx| { + let mut iter = tx.range(&tree, kint..).unwrap(); + let next = iter.next().unwrap().unwrap(); + assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc)); + assert!(iter.next().is_none()); + Ok(()) + }) + .unwrap(); + + db.transaction::<_, (), _>(|tx| { + let mut iter = tx.range_rev(&tree, ..kint).unwrap(); + let next = iter.next().unwrap().unwrap(); + assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb)); + assert!(iter.next().is_none()); + Ok(()) + }) + .unwrap(); + + db.transaction::<_, (), _>(|tx| { + let mut iter = tx.iter_rev(&tree).unwrap(); + let next = iter.next().unwrap().unwrap(); + assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc)); + let next = iter.next().unwrap().unwrap(); + assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb)); + assert!(iter.next().is_none()); + Ok(()) + }) + .unwrap(); } #[test] @@ -91,17 +140,6 @@ fn test_lmdb_db() { } #[test] -#[cfg(feature = "sled")] -fn test_sled_db() { - use crate::sled_adapter::SledDb; - - let path = mktemp::Temp::new_dir().unwrap(); - let db = SledDb::init(sled::open(path.to_path_buf()).unwrap()); - test_suite(db); - drop(path); -} - -#[test] #[cfg(feature = "sqlite")] fn test_sqlite_db() { use crate::sqlite_adapter::SqliteDb; |