aboutsummaryrefslogtreecommitdiff
path: root/src/db
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-02 19:58:47 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-02 19:58:47 +0200
commit43704afb291e955c043686dde6b801d4d1339231 (patch)
tree3bcb41f3d373c28ff717d151039760b15857164a /src/db
parentf29b91232fbacc0c552fbbec52f5b2cf20cdcf8d (diff)
downloadgarage-43704afb291e955c043686dde6b801d4d1339231.tar.gz
garage-43704afb291e955c043686dde6b801d4d1339231.zip
Begin sqlite adapter
Diffstat (limited to 'src/db')
-rw-r--r--src/db/Cargo.toml2
-rw-r--r--src/db/lib.rs72
-rw-r--r--src/db/sled_adapter.rs105
-rw-r--r--src/db/sqlite_adapter.rs265
-rw-r--r--src/db/test.rs52
5 files changed, 394 insertions, 102 deletions
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index ae6cb45f..34d483ec 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -15,8 +15,10 @@ path = "lib.rs"
[dependencies]
err-derive = "0.3"
+ouroboros = "0.15"
sled = "0.34"
+rusqlite = "0.27"
[dev-dependencies]
mktemp = "0.4"
diff --git a/src/db/lib.rs b/src/db/lib.rs
index 2b9b6bd7..6de296af 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -1,4 +1,5 @@
pub mod sled_adapter;
+pub mod sqlite_adapter;
#[cfg(test)]
pub mod test;
@@ -22,10 +23,10 @@ pub struct Tree(pub(crate) Arc<dyn IDb>, pub(crate) usize);
pub type Value<'a> = Cow<'a, [u8]>;
pub type ValueIter<'a> =
- Box<dyn std::iter::Iterator<Item = Result<(Value<'a>, Value<'a>)>> + Send + Sync + 'a>;
+ Box<dyn std::iter::Iterator<Item = Result<(Value<'a>, Value<'a>)>> + 'a>;
pub type Exporter<'a> =
- Box<dyn std::iter::Iterator<Item = Result<(String, ValueIter<'a>)>> + Send + Sync + 'a>;
+ Box<dyn std::iter::Iterator<Item = Result<(String, ValueIter<'a>)>> + 'a>;
// ----
@@ -64,26 +65,40 @@ impl Db {
function: fun,
result: Cell::new(None),
};
- match self.0.transaction(&f) {
- Err(TxError::Db(e)) => Err(TxError::Db(e)),
- Err(TxError::Abort(())) => {
- let r = f
- .result
- .into_inner()
- .expect("Transaction did not store result");
- assert!(matches!(r, Err(TxError::Abort(_))));
- r
- }
+ let tx_res = self.0.transaction(&f);
+ let ret = f
+ .result
+ .into_inner()
+ .expect("Transaction did not store result");
+
+ match tx_res {
Ok(()) => {
- let r = f
- .result
- .into_inner()
- .expect("Transaction did not store result");
- assert!(matches!(r, Ok(_)));
- r
+ assert!(matches!(ret, Ok(_)));
+ ret
}
+ Err(TxError::Abort(())) => {
+ assert!(matches!(ret, Err(TxError::Abort(_))));
+ ret
+ }
+ Err(TxError::Db(e2)) => match ret {
+ // Ok was stored -> the error occured when finalizing
+ // transaction
+ Ok(_) => Err(TxError::Db(e2)),
+ // An error was already stored: that's the one we want to
+ // return
+ Err(TxError::Db(e)) => Err(TxError::Db(e)),
+ _ => unreachable!(),
+ },
}
}
+
+ pub fn export<'a>(&'a self) -> Result<Exporter<'a>> {
+ self.0.export()
+ }
+
+ pub fn import<'a>(&self, ex: Exporter<'a>) -> Result<()> {
+ self.0.import(ex)
+ }
}
impl Tree {
@@ -181,14 +196,12 @@ impl<'a> Transaction<'a> {
// ----
#[must_use]
- pub fn abort<R, E>(self, e: E) -> TxResult<R, E>
- {
+ pub fn abort<R, E>(self, e: E) -> TxResult<R, E> {
Err(TxError::Abort(e))
}
#[must_use]
- pub fn commit<R, E>(self, r: R) -> TxResult<R, E>
- {
+ pub fn commit<R, E>(self, r: R) -> TxResult<R, E> {
Ok(r)
}
}
@@ -221,6 +234,9 @@ pub(crate) trait IDb: Send + Sync {
) -> Result<ValueIter<'a>>;
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>;
+
+ fn export<'a>(&'a self) -> Result<Exporter<'a>>;
+ fn import<'a>(&self, ex: Exporter<'a>) -> Result<()>;
}
pub(crate) trait ITx<'a> {
@@ -251,10 +267,10 @@ pub(crate) trait ITxFn {
fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult;
}
-enum TxFnResult {
- Abort,
+pub(crate) enum TxFnResult {
Ok,
- Err,
+ Abort,
+ DbErr,
}
struct TxFn<F, R, E>
@@ -271,13 +287,13 @@ where
{
fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult {
let res = (self.function)(Transaction(tx));
- let retval = match &res {
+ let res2 = match &res {
Ok(_) => TxFnResult::Ok,
Err(TxError::Abort(_)) => TxFnResult::Abort,
- Err(TxError::Db(_)) => TxFnResult::Err,
+ Err(TxError::Db(_)) => TxFnResult::DbErr,
};
self.result.set(Some(res));
- retval
+ res2
}
}
diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs
index 3942317c..c296708b 100644
--- a/src/db/sled_adapter.rs
+++ b/src/db/sled_adapter.rs
@@ -44,50 +44,6 @@ impl SledDb {
.cloned()
.ok_or(Error("invalid tree id".into()))
}
-
- pub fn export<'a>(&'a self) -> Result<Exporter<'a>> {
- let mut trees = vec![];
- for name in self.db.tree_names() {
- let name = std::str::from_utf8(&name)
- .map_err(|e| Error(format!("{}", e).into()))?
- .to_string();
- let tree = self.open_tree(&name)?;
- let tree = self.trees.read().unwrap().0.get(tree).unwrap().clone();
- trees.push((name, tree));
- }
- let trees_exporter: Exporter<'a> = Box::new(trees.into_iter().map(|(name, tree)| {
- let iter: ValueIter<'a> = Box::new(tree.iter().map(|v| {
- v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into()))
- .map_err(Into::into)
- }));
- Ok((name.to_string(), iter))
- }));
- Ok(trees_exporter)
- }
-
- pub fn import<'a>(&self, ex: Exporter<'a>) -> Result<()> {
- for ex_tree in ex {
- let (name, data) = ex_tree?;
-
- let tree = self.open_tree(&name)?;
- let tree = self.trees.read().unwrap().0.get(tree).unwrap().clone();
- if !tree.is_empty() {
- return Err(Error(format!("tree {} already contains data", name).into()));
- }
-
- let mut i = 0;
- for item in data {
- let (k, v) = item?;
- tree.insert(k.as_ref(), v.as_ref())?;
- i += 1;
- if i % 1000 == 0 {
- println!("{}: imported {}", name, i);
- }
- }
- println!("{}: finished importing, {} items", name, i);
- }
- Ok(())
- }
}
impl IDb for SledDb {
@@ -104,6 +60,8 @@ impl IDb for SledDb {
}
}
+ // ----
+
fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>> {
let tree = self.get_tree(tree)?;
Ok(tree.get(key)?.map(|v| v.to_vec().into()))
@@ -168,6 +126,8 @@ impl IDb for SledDb {
)))
}
+ // ----
+
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
let trees = self.trees.read().unwrap();
let res = trees.0.transaction(|txtrees| {
@@ -184,12 +144,9 @@ impl IDb for SledDb {
assert!(tx.err.into_inner().is_none());
Err(ConflictableTransactionError::Abort(()))
}
- TxFnResult::Err => {
- let err = tx
- .err
- .into_inner()
- .expect("Transaction did not store error");
- Err(err.into())
+ TxFnResult::DbErr => {
+ let e = tx.err.into_inner().expect("No DB error");
+ Err(e.into())
}
}
});
@@ -199,6 +156,54 @@ impl IDb for SledDb {
Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())),
}
}
+
+ // ----
+
+ fn export<'a>(&'a self) -> Result<Exporter<'a>> {
+ let mut trees = vec![];
+ for name in self.db.tree_names() {
+ let name = std::str::from_utf8(&name)
+ .map_err(|e| Error(format!("{}", e).into()))?
+ .to_string();
+ let tree = self.open_tree(&name)?;
+ let tree = self.trees.read().unwrap().0.get(tree).unwrap().clone();
+ trees.push((name, tree));
+ }
+ let trees_exporter: Exporter<'a> = Box::new(trees.into_iter().map(|(name, tree)| {
+ let iter: ValueIter<'a> = Box::new(tree.iter().map(|v| {
+ v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into()))
+ .map_err(Into::into)
+ }));
+ Ok((name.to_string(), iter))
+ }));
+ Ok(trees_exporter)
+ }
+
+ fn import<'a>(&self, ex: Exporter<'a>) -> Result<()> {
+ for ex_tree in ex {
+ let (name, data) = ex_tree?;
+
+ let tree = self.open_tree(&name)?;
+ let tree = self.trees.read().unwrap().0.get(tree).unwrap().clone();
+ if !tree.is_empty() {
+ return Err(Error(format!("tree {} already contains data", name).into()));
+ }
+
+ let mut i = 0;
+ for item in data {
+ let (k, v) = item?;
+ tree.insert(k.as_ref(), v.as_ref())?;
+ i += 1;
+ if i % 1000 == 0 {
+ println!("{}: imported {}", name, i);
+ }
+ }
+ println!("{}: finished importing, {} items", name, i);
+ }
+ Ok(())
+ }
+
+ // ----
}
// ----
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
new file mode 100644
index 00000000..320684df
--- /dev/null
+++ b/src/db/sqlite_adapter.rs
@@ -0,0 +1,265 @@
+use core::ops::Bound;
+
+use std::cell::Cell;
+use std::sync::{Arc, Mutex, RwLock, MutexGuard};
+
+use ouroboros::self_referencing;
+
+use rusqlite::{params, Connection, Transaction};
+
+use crate::{
+ Db, Error, Exporter, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter,
+};
+
+pub use rusqlite;
+
+impl From<rusqlite::Error> for Error {
+ fn from(e: rusqlite::Error) -> Error {
+ Error(format!("{}", e).into())
+ }
+}
+
+impl<T> From<rusqlite::Error> for TxError<T> {
+ fn from(e: rusqlite::Error) -> TxError<T> {
+ TxError::Db(e.into())
+ }
+}
+
+pub struct SqliteDb {
+ db: Mutex<Connection>,
+ trees: RwLock<Vec<String>>,
+}
+
+impl SqliteDb {
+ pub fn new(db: rusqlite::Connection) -> Db {
+ let s = Self {
+ db: Mutex::new(db),
+ trees: RwLock::new(Vec::new()),
+ };
+ Db(Arc::new(s))
+ }
+
+ fn get_tree(&self, i: usize) -> Result<String> {
+ self.trees
+ .read()
+ .unwrap()
+ .get(i)
+ .cloned()
+ .ok_or(Error("invalid tree id".into()))
+ }
+}
+
+impl IDb for SqliteDb {
+ fn open_tree(&self, name: &str) -> Result<usize> {
+ let mut trees = self.trees.write().unwrap();
+ if let Some(i) = trees.iter().position(|x| x == name) {
+ Ok(i)
+ } else {
+ self.db.lock().unwrap().execute(
+ &format!(
+ "CREATE TABLE IF NOT EXISTS {} (
+ k BLOB PRIMARY KEY,
+ v BLOB
+ )",
+ name
+ ),
+ [],
+ )?;
+ let i = trees.len();
+ trees.push(name.to_string());
+ Ok(i)
+ }
+ }
+
+ // ----
+
+ fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>> {
+ let tree = self.get_tree(tree)?;
+ let db = self.db.lock().unwrap();
+ let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
+ let mut res_iter = stmt.query([key])?;
+ match res_iter.next()? {
+ None => Ok(None),
+ Some(v) => Ok(Some(v.get::<_, Vec<u8>>(0)?.into())),
+ }
+ }
+
+ fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
+ let tree = self.get_tree(tree)?;
+ let db = self.db.lock().unwrap();
+ let res = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
+ Ok(res > 0)
+ }
+
+ fn len(&self, tree: usize) -> Result<usize> {
+ let tree = self.get_tree(tree)?;
+ let db = self.db.lock().unwrap();
+ let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
+ let mut res_iter = stmt.query([])?;
+ match res_iter.next()? {
+ None => Ok(0),
+ Some(v) => Ok(v.get::<_, usize>(0)?.into()),
+ }
+ }
+
+ fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
+ let tree = self.get_tree(tree)?;
+ let db = self.db.lock().unwrap();
+ db.execute(
+ &format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree),
+ params![key, value],
+ )?;
+ Ok(())
+ }
+
+ fn iter<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> {
+ let tree = self.get_tree(tree)?;
+ let db = self.db.lock().unwrap();
+ let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
+ let mut stmt = db.prepare(&sql)?;
+ let res = stmt.query([])?;
+ unimplemented!();
+ }
+
+ fn iter_rev<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> {
+ let tree = self.get_tree(tree)?;
+ unimplemented!();
+ }
+
+ fn range<'a, 'r>(
+ &'a self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'a>> {
+ let tree = self.get_tree(tree)?;
+ unimplemented!();
+ }
+ fn range_rev<'a, 'r>(
+ &'a self,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'a>> {
+ let tree = self.get_tree(tree)?;
+ unimplemented!();
+ }
+
+ // ----
+
+ fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
+ let trees = self.trees.read().unwrap();
+ let mut db = self.db.lock().unwrap();
+ let tx = SqliteTx {
+ tx: db.transaction()?,
+ trees: trees.as_ref(),
+ };
+ match f.try_on(&tx) {
+ TxFnResult::Ok => {
+ tx.tx.commit()?;
+ Ok(())
+ }
+ TxFnResult::Abort => {
+ tx.tx.rollback()?;
+ Err(TxError::Abort(()))
+ }
+ TxFnResult::DbErr => {
+ tx.tx.rollback()?;
+ Err(TxError::Db(Error(
+ "(this message will be discarded)".into(),
+ )))
+ }
+ }
+ }
+
+ // ----
+
+ fn export<'a>(&'a self) -> Result<Exporter<'a>> {
+ unimplemented!()
+ }
+
+ fn import<'a>(&self, ex: Exporter<'a>) -> Result<()> {
+ unimplemented!()
+ }
+
+ // ----
+}
+
+// ----
+
+struct SqliteTx<'a> {
+ tx: Transaction<'a>,
+ trees: &'a [String],
+}
+
+impl<'a> SqliteTx<'a> {
+ fn get_tree(&self, i: usize) -> Result<String> {
+ self.trees.get(i).cloned().ok_or(Error(
+ "invalid tree id (it might have been openned after the transaction started)".into(),
+ ))
+ }
+}
+
+impl<'a> ITx<'a> for SqliteTx<'a> {
+ fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>> {
+ let tree = self.get_tree(tree)?;
+ let mut stmt = self
+ .tx
+ .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
+ let mut res_iter = stmt.query([key])?;
+ match res_iter.next()? {
+ None => Ok(None),
+ Some(v) => Ok(Some(v.get::<_, Vec<u8>>(0)?.into())),
+ }
+ }
+ fn len(&self, tree: usize) -> Result<usize> {
+ let tree = self.get_tree(tree)?;
+ let mut stmt = self.tx.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
+ let mut res_iter = stmt.query([])?;
+ match res_iter.next()? {
+ None => Ok(0),
+ Some(v) => Ok(v.get::<_, usize>(0)?.into()),
+ }
+ }
+
+ fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
+ let tree = self.get_tree(tree)?;
+ self.tx.execute(
+ &format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree),
+ params![key, value],
+ )?;
+ Ok(())
+ }
+ fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
+ let tree = self.get_tree(tree)?;
+ let res = self
+ .tx
+ .execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
+ Ok(res > 0)
+ }
+
+ fn iter(&self, _tree: usize) -> Result<ValueIter<'a>> {
+ unimplemented!();
+ }
+ fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'a>> {
+ unimplemented!();
+ }
+
+ fn range<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'a>> {
+ unimplemented!();
+ }
+ fn range_rev<'r>(
+ &self,
+ _tree: usize,
+ _low: Bound<&'r [u8]>,
+ _high: Bound<&'r [u8]>,
+ ) -> Result<ValueIter<'a>> {
+ unimplemented!();
+ }
+}
+
diff --git a/src/db/test.rs b/src/db/test.rs
index 69e1d12c..20ebd54b 100644
--- a/src/db/test.rs
+++ b/src/db/test.rs
@@ -1,9 +1,10 @@
use crate::*;
use crate::sled_adapter::SledDb;
+use crate::sqlite_adapter::SqliteDb;
-fn test_suite(db: Db) -> Result<()> {
- let tree = db.open_tree("tree")?;
+fn test_suite(db: Db) {
+ let tree = db.open_tree("tree").unwrap();
let ka: &[u8] = &b"test"[..];
let kb: &[u8] = &b"zwello"[..];
@@ -12,66 +13,69 @@ fn test_suite(db: Db) -> Result<()> {
let vb: &[u8] = &b"plip"[..];
let vc: &[u8] = &b"plup"[..];
- tree.insert(ka, va)?;
- assert_eq!(tree.get(ka)?, Some(va.into()));
+ tree.insert(ka, va).unwrap();
+ assert_eq!(tree.get(ka).unwrap(), Some(va.into()));
let res = db.transaction::<_, (), _>(|tx| {
- assert_eq!(tx.get(&tree, ka)?, Some(va.into()));
+ assert_eq!(tx.get(&tree, ka).unwrap(), Some(va.into()));
- tx.insert(&tree, ka, vb)?;
+ tx.insert(&tree, ka, vb).unwrap();
- assert_eq!(tx.get(&tree, ka)?, Some(vb.into()));
+ assert_eq!(tx.get(&tree, ka).unwrap(), Some(vb.into()));
tx.commit(12)
});
assert!(matches!(res, Ok(12)));
- assert_eq!(tree.get(ka)?, Some(vb.into()));
+ assert_eq!(tree.get(ka).unwrap(), Some(vb.into()));
let res = db.transaction::<(), _, _>(|tx| {
- assert_eq!(tx.get(&tree, ka)?, Some(vb.into()));
+ assert_eq!(tx.get(&tree, ka).unwrap(), Some(vb.into()));
- tx.insert(&tree, ka, vc)?;
+ tx.insert(&tree, ka, vc).unwrap();
- assert_eq!(tx.get(&tree, ka)?, Some(vc.into()));
+ assert_eq!(tx.get(&tree, ka).unwrap(), Some(vc.into()));
tx.abort(42)
});
assert!(matches!(res, Err(TxError::Abort(42))));
- assert_eq!(tree.get(ka)?, Some(vb.into()));
+ assert_eq!(tree.get(ka).unwrap(), Some(vb.into()));
- let mut iter = tree.iter()?;
+ let mut iter = tree.iter().unwrap();
assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into()));
assert!(iter.next().is_none());
- tree.insert(kb, vc)?;
- assert_eq!(tree.get(kb)?, Some(vc.into()));
+ tree.insert(kb, vc).unwrap();
+ assert_eq!(tree.get(kb).unwrap(), Some(vc.into()));
- let mut iter = tree.iter()?;
+ let mut iter = tree.iter().unwrap();
assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into()));
assert_eq!(iter.next().unwrap().unwrap(), (kb.into(), vc.into()));
assert!(iter.next().is_none());
- let mut iter = tree.range(kint..)?;
+ let mut iter = tree.range(kint..).unwrap();
assert_eq!(iter.next().unwrap().unwrap(), (kb.into(), vc.into()));
assert!(iter.next().is_none());
- let mut iter = tree.range_rev(..kint)?;
+ let mut iter = tree.range_rev(..kint).unwrap();
assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into()));
assert!(iter.next().is_none());
- let mut iter = tree.iter_rev()?;
+ let mut iter = tree.iter_rev().unwrap();
assert_eq!(iter.next().unwrap().unwrap(), (kb.into(), vc.into()));
assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into()));
assert!(iter.next().is_none());
-
- Ok(())
}
#[test]
-fn test_sled_db() -> Result<()> {
+fn test_sled_db() {
let path = mktemp::Temp::new_dir().unwrap();
let db = SledDb::new(sled::open(path.to_path_buf()).unwrap());
- test_suite(db)?;
+ test_suite(db);
drop(path);
- Ok(())
+}
+
+#[test]
+fn test_sqlite_db() {
+ let db = SqliteDb::new(rusqlite::Connection::open_in_memory().unwrap());
+ test_suite(db);
}