aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock29
-rw-r--r--Cargo.toml1
-rw-r--r--src/db/Cargo.toml23
-rw-r--r--src/db/lib.rs174
-rw-r--r--src/db/sled_adapter.rs132
-rw-r--r--src/db/test.rs49
-rw-r--r--src/table/Cargo.toml1
7 files changed, 409 insertions, 0 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 630642ff..1f81ebec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -988,6 +988,16 @@ dependencies = [
]
[[package]]
+name = "garage_db"
+version = "0.8.0"
+dependencies = [
+ "arc-swap",
+ "err-derive 0.3.1",
+ "mktemp",
+ "sled",
+]
+
+[[package]]
name = "garage_model"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1130,6 +1140,7 @@ dependencies = [
"bytes 1.1.0",
"futures",
"futures-util",
+ "garage_db",
"garage_rpc 0.7.0",
"garage_util 0.7.0",
"hexdump",
@@ -1856,6 +1867,15 @@ dependencies = [
]
[[package]]
+name = "mktemp"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "975de676448231fcde04b9149d2543077e166b78fc29eae5aa219e7928410da2"
+dependencies = [
+ "uuid",
+]
+
+[[package]]
name = "multer"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3490,6 +3510,15 @@ dependencies = [
]
[[package]]
+name = "uuid"
+version = "0.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
+dependencies = [
+ "getrandom",
+]
+
+[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index edd0e3f9..122285db 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,5 +1,6 @@
[workspace]
members = [
+ "src/db",
"src/util",
"src/rpc",
"src/table",
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
new file mode 100644
index 00000000..025016d5
--- /dev/null
+++ b/src/db/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+name = "garage_db"
+version = "0.8.0"
+authors = ["Alex Auvolat <alex@adnab.me>"]
+edition = "2018"
+license = "AGPL-3.0"
+description = "Abstraction over multiple key/value storage engines that supports transactions"
+repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
+
+[lib]
+path = "lib.rs"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+err-derive = "0.3"
+arc-swap = "1.0"
+
+sled = "0.34"
+
+[dev-dependencies]
+mktemp = "0.4"
diff --git a/src/db/lib.rs b/src/db/lib.rs
new file mode 100644
index 00000000..0f23a9b4
--- /dev/null
+++ b/src/db/lib.rs
@@ -0,0 +1,174 @@
+pub mod sled_adapter;
+
+#[cfg(test)]
+pub mod test;
+
+use std::borrow::Cow;
+use std::sync::Arc;
+
+use arc_swap::ArcSwapOption;
+
+#[derive(Clone)]
+pub struct Db(pub(crate) Arc<dyn IDb>);
+
+#[derive(Clone)]
+pub struct Transaction<'a>(pub(crate) &'a dyn ITx<'a>);
+
+#[derive(Clone)]
+pub struct Tree(pub(crate) Arc<dyn IDb>, pub(crate) usize);
+
+pub type Value<'a> = Cow<'a, [u8]>;
+
+// ----
+
+#[derive(Debug)]
+pub struct Error(Cow<'static, str>);
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+#[derive(Debug)]
+pub enum TxError<E> {
+ Abort(E),
+ Db(Error),
+}
+pub type TxResult<R, E> = std::result::Result<R, TxError<E>>;
+
+impl<E> From<Error> for TxError<E> {
+ fn from(e: Error) -> TxError<E> {
+ TxError::Db(e)
+ }
+}
+
+// ----
+
+impl Db {
+ pub fn tree<S: AsRef<str>>(&self, name: S) -> Result<Tree> {
+ let tree_id = self.0.tree(name.as_ref())?;
+ Ok(Tree(self.0.clone(), tree_id))
+ }
+
+ pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
+ where
+ F: Fn(Transaction<'_>) -> TxResult<R, E> + Send + Sync,
+ R: Send + Sync,
+ E: Send + Sync,
+ {
+ let f = TxFn {
+ function: fun,
+ result: ArcSwapOption::new(None),
+ };
+ match self.0.transaction(&f) {
+ Err(TxError::Db(e)) => Err(TxError::Db(e)),
+ Err(TxError::Abort(())) => {
+ let r_arc = f
+ .result
+ .into_inner()
+ .expect("Transaction did not store result");
+ let r = Arc::try_unwrap(r_arc).ok().expect("Many refs");
+ assert!(matches!(r, Err(TxError::Abort(_))));
+ r
+ }
+ Ok(()) => {
+ let r_arc = f
+ .result
+ .into_inner()
+ .expect("Transaction did not store result");
+ let r = Arc::try_unwrap(r_arc).ok().expect("Many refs");
+ assert!(matches!(r, Ok(_)));
+ r
+ }
+ }
+ }
+}
+
+impl Tree {
+ pub fn get<'a, T: AsRef<[u8]>>(&'a self, key: T) -> Result<Option<Value<'a>>> {
+ self.0.get(self.1, key.as_ref())
+ }
+
+ pub fn put<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> {
+ self.0.put(self.1, key.as_ref(), value.as_ref())
+ }
+}
+
+impl<'a> Transaction<'a> {
+ pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> Result<Option<Value<'a>>> {
+ self.0.get(tree.1, key.as_ref())
+ }
+
+ pub fn put<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, tree: &Tree, key: T, value: U) -> Result<()> {
+ self.0.put(tree.1, key.as_ref(), value.as_ref())
+ }
+
+ #[must_use]
+ pub fn abort<R, E>(self, e: E) -> TxResult<R, E>
+ where
+ R: Send + Sync,
+ E: Send + Sync,
+ {
+ Err(TxError::Abort(e))
+ }
+
+ #[must_use]
+ pub fn commit<R, E>(self, r: R) -> TxResult<R, E>
+ where
+ R: Send + Sync,
+ E: Send + Sync,
+ {
+ Ok(r)
+ }
+}
+
+// ---- Internal interfaces
+
+pub(crate) trait IDb: Send + Sync {
+ fn tree(&self, name: &str) -> Result<usize>;
+
+ fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>>;
+ fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>;
+
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>;
+}
+
+pub(crate) trait ITx<'a> {
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>>;
+ fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>;
+}
+
+pub(crate) trait ITxFn: Send + Sync {
+ fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult;
+}
+
+enum TxFnResult {
+ Abort,
+ Ok,
+ Err,
+}
+
+struct TxFn<F, R, E>
+where
+ F: Fn(Transaction<'_>) -> TxResult<R, E> + Send + Sync,
+ R: Send + Sync,
+ E: Send + Sync,
+{
+ function: F,
+ result: ArcSwapOption<TxResult<R, E>>,
+}
+
+impl<F, R, E> ITxFn for TxFn<F, R, E>
+where
+ F: Fn(Transaction<'_>) -> TxResult<R, E> + Send + Sync,
+ R: Send + Sync,
+ E: Send + Sync,
+{
+ fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult {
+ let res = (self.function)(Transaction(tx));
+ let retval = match &res {
+ Ok(_) => TxFnResult::Ok,
+ Err(TxError::Abort(_)) => TxFnResult::Abort,
+ Err(TxError::Db(_)) => TxFnResult::Err,
+ };
+ self.result.store(Some(Arc::new(res)));
+ retval
+ }
+}
diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs
new file mode 100644
index 00000000..617b4844
--- /dev/null
+++ b/src/db/sled_adapter.rs
@@ -0,0 +1,132 @@
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+use arc_swap::ArcSwapOption;
+
+use sled::transaction::{
+ ConflictableTransactionError, TransactionError, Transactional, TransactionalTree, UnabortableTransactionError
+};
+
+use crate::{Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, Db};
+
+impl From<sled::Error> for Error {
+ fn from(e: sled::Error) -> Error {
+ Error(format!("{}", e).into())
+ }
+}
+
+pub struct SledDb {
+ db: sled::Db,
+ trees: RwLock<(Vec<sled::Tree>, HashMap<String, usize>)>,
+}
+
+impl SledDb {
+ pub fn new(db: sled::Db) -> Db {
+ let s = Self {
+ db,
+ trees: RwLock::new((Vec::new(), HashMap::new())),
+ };
+ Db(Arc::new(s))
+ }
+
+ fn get_tree(&self, i: usize) -> Result<sled::Tree> {
+ self.trees
+ .read()
+ .unwrap()
+ .0
+ .get(i)
+ .cloned()
+ .ok_or(Error("invalid tree id".into()))
+ }
+}
+
+impl IDb for SledDb {
+ fn tree(&self, name: &str) -> Result<usize> {
+ let mut trees = self.trees.write().unwrap();
+ if let Some(i) = trees.1.get(name) {
+ Ok(*i)
+ } else {
+ let tree = self.db.open_tree(name)?;
+ let i = trees.0.len();
+ trees.0.push(tree);
+ trees.1.insert(name.to_string(), i);
+ Ok(i)
+ }
+ }
+
+ fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>> {
+ let tree = self.get_tree(tree)?;
+ Ok(tree.get(key)?.map(|v| v.to_vec().into()))
+ }
+ fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
+ let tree = self.get_tree(tree)?;
+ tree.insert(key, value)?;
+ Ok(())
+ }
+
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
+ let trees = self.trees.read().unwrap();
+ let res = trees.0.transaction(|txtrees| {
+ let tx = SledTx {
+ trees: txtrees,
+ err: ArcSwapOption::new(None),
+ };
+ match f.try_on(&tx) {
+ TxFnResult::Ok => {
+ assert!(tx.err.into_inner().is_none());
+ Ok(())
+ }
+ TxFnResult::Abort => Err(ConflictableTransactionError::Abort(())),
+ TxFnResult::Err => {
+ let err_arc = tx
+ .err
+ .into_inner()
+ .expect("Transaction did not store error");
+ let err = Arc::try_unwrap(err_arc).ok().expect("Many refs");
+ Err(err.into())
+ }
+ }
+ });
+ match res {
+ Ok(()) => Ok(()),
+ Err(TransactionError::Abort(())) => Err(TxError::Abort(())),
+ Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())),
+ }
+ }
+}
+
+// ----
+
+struct SledTx<'a> {
+ trees: &'a [TransactionalTree],
+ err: ArcSwapOption<UnabortableTransactionError>,
+}
+
+impl<'a> SledTx<'a> {
+ fn save_error<R>(&self, v: std::result::Result<R, UnabortableTransactionError>) -> Result<R> {
+ match v {
+ Ok(x) => Ok(x),
+ Err(e) => {
+ let txt = format!("{}", e);
+ self.err.store(Some(Arc::new(e)));
+ Err(Error(txt.into()))
+ }
+ }
+ }
+}
+
+impl<'a> ITx<'a> for SledTx<'a> {
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>> {
+ let tree = self.trees.get(tree)
+ .ok_or(Error("invalid tree id".into()))?;
+ let tmp = self.save_error(tree.get(key))?;
+ Ok(tmp.map(|v| v.to_vec().into()))
+ }
+
+ fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
+ let tree = self.trees.get(tree)
+ .ok_or(Error("invalid tree id".into()))?;
+ self.save_error(tree.insert(key, value))?;
+ Ok(())
+ }
+}
diff --git a/src/db/test.rs b/src/db/test.rs
new file mode 100644
index 00000000..f0e6c5de
--- /dev/null
+++ b/src/db/test.rs
@@ -0,0 +1,49 @@
+use crate::*;
+
+use crate::sled_adapter::SledDb;
+
+fn test_suite(db: Db) -> Result<()> {
+ let tree = db.tree("tree")?;
+
+ let va: &[u8] = &b"plop"[..];
+ let vb: &[u8] = &b"plip"[..];
+ let vc: &[u8] = &b"plup"[..];
+
+ tree.put(b"test", va)?;
+ assert_eq!(tree.get(b"test")?, Some(va.into()));
+
+ let res = db.transaction::<_, (), _>(|tx| {
+ assert_eq!(tx.get(&tree, b"test")?, Some(va.into()));
+
+ tx.put(&tree, b"test", vb)?;
+
+ assert_eq!(tx.get(&tree, b"test")?, Some(vb.into()));
+
+ tx.commit(12)
+ });
+ assert!(matches!(res, Ok(12)));
+ assert_eq!(tree.get(b"test")?, Some(vb.into()));
+
+ let res = db.transaction::<(), _, _>(|tx| {
+ assert_eq!(tx.get(&tree, b"test")?, Some(vb.into()));
+
+ tx.put(&tree, b"test", vc)?;
+
+ assert_eq!(tx.get(&tree, b"test")?, Some(vc.into()));
+
+ tx.abort(42)
+ });
+ assert!(matches!(res, Err(TxError::Abort(42))));
+ assert_eq!(tree.get(b"test")?, Some(vb.into()));
+
+ Ok(())
+}
+
+#[test]
+fn test_sled_db() -> Result<()> {
+ let path = mktemp::Temp::new_dir().unwrap();
+ let db = SledDb::new(sled::open(path.to_path_buf()).unwrap());
+ test_suite(db)?;
+ drop(path);
+ Ok(())
+}
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index ed1a213f..6ae50366 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -14,6 +14,7 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+garage_db = { version = "0.8.0", path = "../db" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_util = { version = "0.7.0", path = "../util" }