diff options
author | Mendes <mendes.oulamara@pm.me> | 2022-10-04 18:14:49 +0200 |
---|---|---|
committer | Mendes <mendes.oulamara@pm.me> | 2022-10-04 18:14:49 +0200 |
commit | 829f815a897b04986559910bbcbf53625adcdf20 (patch) | |
tree | 6db3c27cff2aded754a641d1f2b05c83be701267 /src/db/lib.rs | |
parent | 99f96b9564c9c841dc6c56f1255a6e70ff884d46 (diff) | |
parent | a096ced35562bd0a8877a1ee2f755be1edafe343 (diff) | |
download | garage-829f815a897b04986559910bbcbf53625adcdf20.tar.gz garage-829f815a897b04986559910bbcbf53625adcdf20.zip |
Merge remote-tracking branch 'origin/main' into optimal-layout
Diffstat (limited to 'src/db/lib.rs')
-rw-r--r-- | src/db/lib.rs | 416 |
1 files changed, 416 insertions, 0 deletions
diff --git a/src/db/lib.rs b/src/db/lib.rs new file mode 100644 index 00000000..d96586be --- /dev/null +++ b/src/db/lib.rs @@ -0,0 +1,416 @@ +#[macro_use] +#[cfg(feature = "sqlite")] +extern crate tracing; + +#[cfg(not(any(feature = "lmdb", feature = "sled", feature = "sqlite")))] +compile_error!("Must activate the Cargo feature for at least one DB engine: lmdb, sled or sqlite."); + +#[cfg(feature = "lmdb")] +pub mod lmdb_adapter; +#[cfg(feature = "sled")] +pub mod sled_adapter; +#[cfg(feature = "sqlite")] +pub mod sqlite_adapter; + +pub mod counted_tree_hack; + +#[cfg(test)] +pub mod test; + +use core::ops::{Bound, RangeBounds}; + +use std::borrow::Cow; +use std::cell::Cell; +use std::sync::Arc; + +use err_derive::Error; + +#[derive(Clone)] +pub struct Db(pub(crate) Arc<dyn IDb>); + +pub struct Transaction<'a>(&'a mut dyn ITx); + +#[derive(Clone)] +pub struct Tree(Arc<dyn IDb>, usize); + +pub type Value = Vec<u8>; +pub type ValueIter<'a> = Box<dyn std::iter::Iterator<Item = Result<(Value, Value)>> + 'a>; +pub type TxValueIter<'a> = Box<dyn std::iter::Iterator<Item = TxOpResult<(Value, Value)>> + 'a>; + +// ---- + +#[derive(Debug, Error)] +#[error(display = "{}", _0)] +pub struct Error(pub Cow<'static, str>); + +pub type Result<T> = std::result::Result<T, Error>; + +#[derive(Debug, Error)] +#[error(display = "{}", _0)] +pub struct TxOpError(pub(crate) Error); +pub type TxOpResult<T> = std::result::Result<T, TxOpError>; + +pub enum TxError<E> { + Abort(E), + Db(Error), +} +pub type TxResult<R, E> = std::result::Result<R, TxError<E>>; + +impl<E> From<TxOpError> for TxError<E> { + fn from(e: TxOpError) -> TxError<E> { + TxError::Db(e.0) + } +} + +pub fn unabort<R, E>(res: TxResult<R, E>) -> TxOpResult<std::result::Result<R, E>> { + match res { + Ok(v) => Ok(Ok(v)), + Err(TxError::Abort(e)) => Ok(Err(e)), + Err(TxError::Db(e)) => Err(TxOpError(e)), + } +} + +// ---- + +impl Db { + pub fn engine(&self) -> String { + self.0.engine() + } + + pub fn open_tree<S: AsRef<str>>(&self, name: S) -> Result<Tree> { + let tree_id = self.0.open_tree(name.as_ref())?; + Ok(Tree(self.0.clone(), tree_id)) + } + + pub fn list_trees(&self) -> Result<Vec<String>> { + self.0.list_trees() + } + + pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E> + where + F: Fn(Transaction<'_>) -> TxResult<R, E>, + { + let f = TxFn { + function: fun, + result: Cell::new(None), + }; + let tx_res = self.0.transaction(&f); + let ret = f + .result + .into_inner() + .expect("Transaction did not store result"); + + match tx_res { + Ok(()) => { + assert!(matches!(ret, Ok(_))); + ret + } + Err(TxError::Abort(())) => { + assert!(matches!(ret, Err(TxError::Abort(_)))); + ret + } + Err(TxError::Db(e2)) => match ret { + // Ok was stored -> the error occured when finalizing + // transaction + Ok(_) => Err(TxError::Db(e2)), + // An error was already stored: that's the one we want to + // return + Err(TxError::Db(e)) => Err(TxError::Db(e)), + _ => unreachable!(), + }, + } + } + + pub fn import(&self, other: &Db) -> Result<()> { + let existing_trees = self.list_trees()?; + if !existing_trees.is_empty() { + return Err(Error( + format!( + "destination database already contains data: {:?}", + existing_trees + ) + .into(), + )); + } + + let tree_names = other.list_trees()?; + for name in tree_names { + let tree = self.open_tree(&name)?; + if tree.len()? > 0 { + return Err(Error(format!("tree {} already contains data", name).into())); + } + + let ex_tree = other.open_tree(&name)?; + + let tx_res = self.transaction(|mut tx| { + let mut i = 0; + for item in ex_tree.iter().map_err(TxError::Abort)? { + let (k, v) = item.map_err(TxError::Abort)?; + tx.insert(&tree, k, v)?; + i += 1; + if i % 1000 == 0 { + println!("{}: imported {}", name, i); + } + } + tx.commit(i) + }); + let total = match tx_res { + Err(TxError::Db(e)) => return Err(e), + Err(TxError::Abort(e)) => return Err(e), + Ok(x) => x, + }; + + println!("{}: finished importing, {} items", name, total); + } + Ok(()) + } +} + +#[allow(clippy::len_without_is_empty)] +impl Tree { + #[inline] + pub fn db(&self) -> Db { + Db(self.0.clone()) + } + + #[inline] + pub fn get<T: AsRef<[u8]>>(&self, key: T) -> Result<Option<Value>> { + self.0.get(self.1, key.as_ref()) + } + #[inline] + pub fn len(&self) -> Result<usize> { + self.0.len(self.1) + } + + #[inline] + pub fn first(&self) -> Result<Option<(Value, Value)>> { + self.iter()?.next().transpose() + } + #[inline] + pub fn get_gt<T: AsRef<[u8]>>(&self, from: T) -> Result<Option<(Value, Value)>> { + self.range((Bound::Excluded(from), Bound::Unbounded))? + .next() + .transpose() + } + + /// Returns the old value if there was one + #[inline] + pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>( + &self, + key: T, + value: U, + ) -> Result<Option<Value>> { + self.0.insert(self.1, key.as_ref(), value.as_ref()) + } + /// Returns the old value if there was one + #[inline] + pub fn remove<T: AsRef<[u8]>>(&self, key: T) -> Result<Option<Value>> { + self.0.remove(self.1, key.as_ref()) + } + /// Clears all values from the tree + #[inline] + pub fn clear(&self) -> Result<()> { + self.0.clear(self.1) + } + + #[inline] + pub fn iter(&self) -> Result<ValueIter<'_>> { + self.0.iter(self.1) + } + #[inline] + pub fn iter_rev(&self) -> Result<ValueIter<'_>> { + self.0.iter_rev(self.1) + } + + #[inline] + pub fn range<K, R>(&self, range: R) -> Result<ValueIter<'_>> + where + K: AsRef<[u8]>, + R: RangeBounds<K>, + { + let sb = range.start_bound(); + let eb = range.end_bound(); + self.0.range(self.1, get_bound(sb), get_bound(eb)) + } + #[inline] + pub fn range_rev<K, R>(&self, range: R) -> Result<ValueIter<'_>> + where + K: AsRef<[u8]>, + R: RangeBounds<K>, + { + let sb = range.start_bound(); + let eb = range.end_bound(); + self.0.range_rev(self.1, get_bound(sb), get_bound(eb)) + } +} + +#[allow(clippy::len_without_is_empty)] +impl<'a> Transaction<'a> { + #[inline] + pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> { + self.0.get(tree.1, key.as_ref()) + } + #[inline] + pub fn len(&self, tree: &Tree) -> TxOpResult<usize> { + self.0.len(tree.1) + } + + /// Returns the old value if there was one + #[inline] + pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>( + &mut self, + tree: &Tree, + key: T, + value: U, + ) -> TxOpResult<Option<Value>> { + self.0.insert(tree.1, key.as_ref(), value.as_ref()) + } + /// Returns the old value if there was one + #[inline] + pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> { + self.0.remove(tree.1, key.as_ref()) + } + + #[inline] + pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> { + self.0.iter(tree.1) + } + #[inline] + pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> { + self.0.iter_rev(tree.1) + } + + #[inline] + pub fn range<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>> + where + K: AsRef<[u8]>, + R: RangeBounds<K>, + { + let sb = range.start_bound(); + let eb = range.end_bound(); + self.0.range(tree.1, get_bound(sb), get_bound(eb)) + } + #[inline] + pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>> + where + K: AsRef<[u8]>, + R: RangeBounds<K>, + { + let sb = range.start_bound(); + let eb = range.end_bound(); + self.0.range_rev(tree.1, get_bound(sb), get_bound(eb)) + } + + // ---- + + #[inline] + pub fn abort<R, E>(self, e: E) -> TxResult<R, E> { + Err(TxError::Abort(e)) + } + + #[inline] + pub fn commit<R, E>(self, r: R) -> TxResult<R, E> { + Ok(r) + } +} + +// ---- Internal interfaces + +pub(crate) trait IDb: Send + Sync { + fn engine(&self) -> String; + fn open_tree(&self, name: &str) -> Result<usize>; + fn list_trees(&self) -> Result<Vec<String>>; + + fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>; + fn len(&self, tree: usize) -> Result<usize>; + + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>>; + fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>; + fn clear(&self, tree: usize) -> Result<()>; + + fn iter(&self, tree: usize) -> Result<ValueIter<'_>>; + fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>>; + + fn range<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result<ValueIter<'_>>; + fn range_rev<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result<ValueIter<'_>>; + + fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>; +} + +pub(crate) trait ITx { + fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>>; + fn len(&self, tree: usize) -> TxOpResult<usize>; + + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>>; + fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>>; + + fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>>; + fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>>; + + fn range<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> TxOpResult<TxValueIter<'_>>; + fn range_rev<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> TxOpResult<TxValueIter<'_>>; +} + +pub(crate) trait ITxFn { + fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult; +} + +pub(crate) enum TxFnResult { + Ok, + Abort, + DbErr, +} + +struct TxFn<F, R, E> +where + F: Fn(Transaction<'_>) -> TxResult<R, E>, +{ + function: F, + result: Cell<Option<TxResult<R, E>>>, +} + +impl<F, R, E> ITxFn for TxFn<F, R, E> +where + F: Fn(Transaction<'_>) -> TxResult<R, E>, +{ + fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult { + let res = (self.function)(Transaction(tx)); + let res2 = match &res { + Ok(_) => TxFnResult::Ok, + Err(TxError::Abort(_)) => TxFnResult::Abort, + Err(TxError::Db(_)) => TxFnResult::DbErr, + }; + self.result.set(Some(res)); + res2 + } +} + +// ---- + +fn get_bound<K: AsRef<[u8]>>(b: Bound<&K>) -> Bound<&[u8]> { + match b { + Bound::Included(v) => Bound::Included(v.as_ref()), + Bound::Excluded(v) => Bound::Excluded(v.as_ref()), + Bound::Unbounded => Bound::Unbounded, + } +} |