diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/db/Cargo.toml | 23 | ||||
-rw-r--r-- | src/db/lib.rs | 174 | ||||
-rw-r--r-- | src/db/sled_adapter.rs | 132 | ||||
-rw-r--r-- | src/db/test.rs | 49 | ||||
-rw-r--r-- | src/table/Cargo.toml | 1 |
5 files changed, 379 insertions, 0 deletions
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml new file mode 100644 index 00000000..025016d5 --- /dev/null +++ b/src/db/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "garage_db" +version = "0.8.0" +authors = ["Alex Auvolat <alex@adnab.me>"] +edition = "2018" +license = "AGPL-3.0" +description = "Abstraction over multiple key/value storage engines that supports transactions" +repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +err-derive = "0.3" +arc-swap = "1.0" + +sled = "0.34" + +[dev-dependencies] +mktemp = "0.4" diff --git a/src/db/lib.rs b/src/db/lib.rs new file mode 100644 index 00000000..0f23a9b4 --- /dev/null +++ b/src/db/lib.rs @@ -0,0 +1,174 @@ +pub mod sled_adapter; + +#[cfg(test)] +pub mod test; + +use std::borrow::Cow; +use std::sync::Arc; + +use arc_swap::ArcSwapOption; + +#[derive(Clone)] +pub struct Db(pub(crate) Arc<dyn IDb>); + +#[derive(Clone)] +pub struct Transaction<'a>(pub(crate) &'a dyn ITx<'a>); + +#[derive(Clone)] +pub struct Tree(pub(crate) Arc<dyn IDb>, pub(crate) usize); + +pub type Value<'a> = Cow<'a, [u8]>; + +// ---- + +#[derive(Debug)] +pub struct Error(Cow<'static, str>); + +pub type Result<T> = std::result::Result<T, Error>; + +#[derive(Debug)] +pub enum TxError<E> { + Abort(E), + Db(Error), +} +pub type TxResult<R, E> = std::result::Result<R, TxError<E>>; + +impl<E> From<Error> for TxError<E> { + fn from(e: Error) -> TxError<E> { + TxError::Db(e) + } +} + +// ---- + +impl Db { + pub fn tree<S: AsRef<str>>(&self, name: S) -> Result<Tree> { + let tree_id = self.0.tree(name.as_ref())?; + Ok(Tree(self.0.clone(), tree_id)) + } + + pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E> + where + F: Fn(Transaction<'_>) -> TxResult<R, E> + Send + Sync, + R: Send + Sync, + E: Send + Sync, + { + let f = TxFn { + function: fun, + result: ArcSwapOption::new(None), + }; + match self.0.transaction(&f) { + Err(TxError::Db(e)) => Err(TxError::Db(e)), + Err(TxError::Abort(())) => { + let r_arc = f + .result + .into_inner() + .expect("Transaction did not store result"); + let r = Arc::try_unwrap(r_arc).ok().expect("Many refs"); + assert!(matches!(r, Err(TxError::Abort(_)))); + r + } + Ok(()) => { + let r_arc = f + .result + .into_inner() + .expect("Transaction did not store result"); + let r = Arc::try_unwrap(r_arc).ok().expect("Many refs"); + assert!(matches!(r, Ok(_))); + r + } + } + } +} + +impl Tree { + pub fn get<'a, T: AsRef<[u8]>>(&'a self, key: T) -> Result<Option<Value<'a>>> { + self.0.get(self.1, key.as_ref()) + } + + pub fn put<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> { + self.0.put(self.1, key.as_ref(), value.as_ref()) + } +} + +impl<'a> Transaction<'a> { + pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> Result<Option<Value<'a>>> { + self.0.get(tree.1, key.as_ref()) + } + + pub fn put<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, tree: &Tree, key: T, value: U) -> Result<()> { + self.0.put(tree.1, key.as_ref(), value.as_ref()) + } + + #[must_use] + pub fn abort<R, E>(self, e: E) -> TxResult<R, E> + where + R: Send + Sync, + E: Send + Sync, + { + Err(TxError::Abort(e)) + } + + #[must_use] + pub fn commit<R, E>(self, r: R) -> TxResult<R, E> + where + R: Send + Sync, + E: Send + Sync, + { + Ok(r) + } +} + +// ---- Internal interfaces + +pub(crate) trait IDb: Send + Sync { + fn tree(&self, name: &str) -> Result<usize>; + + fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>>; + fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; + + fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>; +} + +pub(crate) trait ITx<'a> { + fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>>; + fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; +} + +pub(crate) trait ITxFn: Send + Sync { + fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult; +} + +enum TxFnResult { + Abort, + Ok, + Err, +} + +struct TxFn<F, R, E> +where + F: Fn(Transaction<'_>) -> TxResult<R, E> + Send + Sync, + R: Send + Sync, + E: Send + Sync, +{ + function: F, + result: ArcSwapOption<TxResult<R, E>>, +} + +impl<F, R, E> ITxFn for TxFn<F, R, E> +where + F: Fn(Transaction<'_>) -> TxResult<R, E> + Send + Sync, + R: Send + Sync, + E: Send + Sync, +{ + fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult { + let res = (self.function)(Transaction(tx)); + let retval = match &res { + Ok(_) => TxFnResult::Ok, + Err(TxError::Abort(_)) => TxFnResult::Abort, + Err(TxError::Db(_)) => TxFnResult::Err, + }; + self.result.store(Some(Arc::new(res))); + retval + } +} diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs new file mode 100644 index 00000000..617b4844 --- /dev/null +++ b/src/db/sled_adapter.rs @@ -0,0 +1,132 @@ +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use arc_swap::ArcSwapOption; + +use sled::transaction::{ + ConflictableTransactionError, TransactionError, Transactional, TransactionalTree, UnabortableTransactionError +}; + +use crate::{Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, Db}; + +impl From<sled::Error> for Error { + fn from(e: sled::Error) -> Error { + Error(format!("{}", e).into()) + } +} + +pub struct SledDb { + db: sled::Db, + trees: RwLock<(Vec<sled::Tree>, HashMap<String, usize>)>, +} + +impl SledDb { + pub fn new(db: sled::Db) -> Db { + let s = Self { + db, + trees: RwLock::new((Vec::new(), HashMap::new())), + }; + Db(Arc::new(s)) + } + + fn get_tree(&self, i: usize) -> Result<sled::Tree> { + self.trees + .read() + .unwrap() + .0 + .get(i) + .cloned() + .ok_or(Error("invalid tree id".into())) + } +} + +impl IDb for SledDb { + fn tree(&self, name: &str) -> Result<usize> { + let mut trees = self.trees.write().unwrap(); + if let Some(i) = trees.1.get(name) { + Ok(*i) + } else { + let tree = self.db.open_tree(name)?; + let i = trees.0.len(); + trees.0.push(tree); + trees.1.insert(name.to_string(), i); + Ok(i) + } + } + + fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>> { + let tree = self.get_tree(tree)?; + Ok(tree.get(key)?.map(|v| v.to_vec().into())) + } + fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + let tree = self.get_tree(tree)?; + tree.insert(key, value)?; + Ok(()) + } + + fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { + let trees = self.trees.read().unwrap(); + let res = trees.0.transaction(|txtrees| { + let tx = SledTx { + trees: txtrees, + err: ArcSwapOption::new(None), + }; + match f.try_on(&tx) { + TxFnResult::Ok => { + assert!(tx.err.into_inner().is_none()); + Ok(()) + } + TxFnResult::Abort => Err(ConflictableTransactionError::Abort(())), + TxFnResult::Err => { + let err_arc = tx + .err + .into_inner() + .expect("Transaction did not store error"); + let err = Arc::try_unwrap(err_arc).ok().expect("Many refs"); + Err(err.into()) + } + } + }); + match res { + Ok(()) => Ok(()), + Err(TransactionError::Abort(())) => Err(TxError::Abort(())), + Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())), + } + } +} + +// ---- + +struct SledTx<'a> { + trees: &'a [TransactionalTree], + err: ArcSwapOption<UnabortableTransactionError>, +} + +impl<'a> SledTx<'a> { + fn save_error<R>(&self, v: std::result::Result<R, UnabortableTransactionError>) -> Result<R> { + match v { + Ok(x) => Ok(x), + Err(e) => { + let txt = format!("{}", e); + self.err.store(Some(Arc::new(e))); + Err(Error(txt.into())) + } + } + } +} + +impl<'a> ITx<'a> for SledTx<'a> { + fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>> { + let tree = self.trees.get(tree) + .ok_or(Error("invalid tree id".into()))?; + let tmp = self.save_error(tree.get(key))?; + Ok(tmp.map(|v| v.to_vec().into())) + } + + fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + let tree = self.trees.get(tree) + .ok_or(Error("invalid tree id".into()))?; + self.save_error(tree.insert(key, value))?; + Ok(()) + } +} diff --git a/src/db/test.rs b/src/db/test.rs new file mode 100644 index 00000000..f0e6c5de --- /dev/null +++ b/src/db/test.rs @@ -0,0 +1,49 @@ +use crate::*; + +use crate::sled_adapter::SledDb; + +fn test_suite(db: Db) -> Result<()> { + let tree = db.tree("tree")?; + + let va: &[u8] = &b"plop"[..]; + let vb: &[u8] = &b"plip"[..]; + let vc: &[u8] = &b"plup"[..]; + + tree.put(b"test", va)?; + assert_eq!(tree.get(b"test")?, Some(va.into())); + + let res = db.transaction::<_, (), _>(|tx| { + assert_eq!(tx.get(&tree, b"test")?, Some(va.into())); + + tx.put(&tree, b"test", vb)?; + + assert_eq!(tx.get(&tree, b"test")?, Some(vb.into())); + + tx.commit(12) + }); + assert!(matches!(res, Ok(12))); + assert_eq!(tree.get(b"test")?, Some(vb.into())); + + let res = db.transaction::<(), _, _>(|tx| { + assert_eq!(tx.get(&tree, b"test")?, Some(vb.into())); + + tx.put(&tree, b"test", vc)?; + + assert_eq!(tx.get(&tree, b"test")?, Some(vc.into())); + + tx.abort(42) + }); + assert!(matches!(res, Err(TxError::Abort(42)))); + assert_eq!(tree.get(b"test")?, Some(vb.into())); + + Ok(()) +} + +#[test] +fn test_sled_db() -> Result<()> { + let path = mktemp::Temp::new_dir().unwrap(); + let db = SledDb::new(sled::open(path.to_path_buf()).unwrap()); + test_suite(db)?; + drop(path); + Ok(()) +} diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index ed1a213f..6ae50366 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -14,6 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +garage_db = { version = "0.8.0", path = "../db" } garage_rpc = { version = "0.7.0", path = "../rpc" } garage_util = { version = "0.7.0", path = "../util" } |