aboutsummaryrefslogtreecommitdiff
path: root/src/db/sqlite_adapter.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/db/sqlite_adapter.rs')
-rw-r--r--src/db/sqlite_adapter.rs212
1 files changed, 106 insertions, 106 deletions
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 6c556c97..a91b9011 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -1,12 +1,14 @@
use core::ops::Bound;
-use std::borrow::BorrowMut;
use std::marker::PhantomPinned;
+use std::path::PathBuf;
use std::pin::Pin;
use std::ptr::NonNull;
-use std::sync::{Arc, Mutex, MutexGuard};
+use std::sync::{Arc, Mutex, RwLock};
-use rusqlite::{params, Connection, Rows, Statement, Transaction};
+use r2d2::Pool;
+use r2d2_sqlite::SqliteConnectionManager;
+use rusqlite::{params, Rows, Statement, Transaction};
use crate::{
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
@@ -15,6 +17,8 @@ use crate::{
pub use rusqlite;
+type Connection = r2d2::PooledConnection<SqliteConnectionManager>;
+
// --- err
impl From<rusqlite::Error> for Error {
@@ -23,6 +27,12 @@ impl From<rusqlite::Error> for Error {
}
}
+impl From<r2d2::Error> for Error {
+ fn from(e: r2d2::Error) -> Error {
+ Error(format!("Sqlite: {}", e).into())
+ }
+}
+
impl From<rusqlite::Error> for TxOpError {
fn from(e: rusqlite::Error) -> TxOpError {
TxOpError(e.into())
@@ -31,35 +41,47 @@ impl From<rusqlite::Error> for TxOpError {
// -- db
-pub struct SqliteDb(Mutex<SqliteDbInner>);
-
-struct SqliteDbInner {
- db: Connection,
- trees: Vec<String>,
+pub struct SqliteDb {
+ db: Pool<SqliteConnectionManager>,
+ trees: RwLock<Vec<Arc<str>>>,
+ // All operations that might write on the DB must take this lock first.
+ // This emulates LMDB's approach where a single writer can be
+ // active at once.
+ write_lock: Mutex<()>,
}
impl SqliteDb {
- pub fn init(db: rusqlite::Connection) -> Db {
- let s = Self(Mutex::new(SqliteDbInner {
- db,
- trees: Vec::new(),
- }));
- Db(Arc::new(s))
+ pub fn new(manager: SqliteConnectionManager, sync_mode: bool) -> Result<Db> {
+ let manager = manager.with_init(move |db| {
+ db.pragma_update(None, "journal_mode", "WAL")?;
+ if sync_mode {
+ db.pragma_update(None, "synchronous", "NORMAL")?;
+ } else {
+ db.pragma_update(None, "synchronous", "OFF")?;
+ }
+ Ok(())
+ });
+ let s = Self {
+ db: Pool::builder().build(manager)?,
+ trees: RwLock::new(vec![]),
+ write_lock: Mutex::new(()),
+ };
+ Ok(Db(Arc::new(s)))
}
}
-impl SqliteDbInner {
- fn get_tree(&self, i: usize) -> Result<&'_ str> {
+impl SqliteDb {
+ fn get_tree(&self, i: usize) -> Result<Arc<str>> {
self.trees
+ .read()
+ .unwrap()
.get(i)
- .map(String::as_str)
+ .cloned()
.ok_or_else(|| Error("invalid tree id".into()))
}
- fn internal_get(&self, tree: &str, key: &[u8]) -> Result<Option<Value>> {
- let mut stmt = self
- .db
- .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
+ fn internal_get(&self, db: &Connection, tree: &str, key: &[u8]) -> Result<Option<Value>> {
+ let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
let mut res_iter = stmt.query([key])?;
match res_iter.next()? {
None => Ok(None),
@@ -75,13 +97,14 @@ impl IDb for SqliteDb {
fn open_tree(&self, name: &str) -> Result<usize> {
let name = format!("tree_{}", name.replace(':', "_COLON_"));
- let mut this = self.0.lock().unwrap();
+ let mut trees = self.trees.write().unwrap();
- if let Some(i) = this.trees.iter().position(|x| x == &name) {
+ if let Some(i) = trees.iter().position(|x| x.as_ref() == &name) {
Ok(i)
} else {
+ let db = self.db.get()?;
trace!("create table {}", name);
- this.db.execute(
+ db.execute(
&format!(
"CREATE TABLE IF NOT EXISTS {} (
k BLOB PRIMARY KEY,
@@ -93,8 +116,8 @@ impl IDb for SqliteDb {
)?;
trace!("table created: {}, unlocking", name);
- let i = this.trees.len();
- this.trees.push(name.to_string());
+ let i = trees.len();
+ trees.push(name.to_string().into_boxed_str().into());
Ok(i)
}
}
@@ -102,11 +125,8 @@ impl IDb for SqliteDb {
fn list_trees(&self) -> Result<Vec<String>> {
let mut trees = vec![];
- trace!("list_trees: lock db");
- let this = self.0.lock().unwrap();
- trace!("list_trees: lock acquired");
-
- let mut stmt = this.db.prepare(
+ let db = self.db.get()?;
+ let mut stmt = db.prepare(
"SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'",
)?;
let mut rows = stmt.query([])?;
@@ -119,24 +139,29 @@ impl IDb for SqliteDb {
Ok(trees)
}
+ fn snapshot(&self, to: &PathBuf) -> Result<()> {
+ fn progress(p: rusqlite::backup::Progress) {
+ let percent = (p.pagecount - p.remaining) * 100 / p.pagecount;
+ info!("Sqlite snapshot progres: {}%", percent);
+ }
+ self.db
+ .get()?
+ .backup(rusqlite::DatabaseName::Main, to, Some(progress))?;
+ Ok(())
+ }
+
// ----
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
- trace!("get {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("get {}: lock acquired", tree);
-
- let tree = this.get_tree(tree)?;
- this.internal_get(tree, key)
+ let tree = self.get_tree(tree)?;
+ self.internal_get(&self.db.get()?, &tree, key)
}
fn len(&self, tree: usize) -> Result<usize> {
- trace!("len {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("len {}: lock acquired", tree);
+ let tree = self.get_tree(tree)?;
+ let db = self.db.get()?;
- let tree = this.get_tree(tree)?;
- let mut stmt = this.db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
+ let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
let mut res_iter = stmt.query([])?;
match res_iter.next()? {
None => Ok(0),
@@ -145,69 +170,60 @@ impl IDb for SqliteDb {
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> {
- trace!("insert {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("insert {}: lock acquired", tree);
+ let tree = self.get_tree(tree)?;
+ let db = self.db.get()?;
+ let lock = self.write_lock.lock();
- let tree = this.get_tree(tree)?;
- let old_val = this.internal_get(tree, key)?;
+ let old_val = self.internal_get(&db, &tree, key)?;
let sql = match &old_val {
Some(_) => format!("UPDATE {} SET v = ?2 WHERE k = ?1", tree),
None => format!("INSERT INTO {} (k, v) VALUES (?1, ?2)", tree),
};
- let n = this.db.execute(&sql, params![key, value])?;
+ let n = db.execute(&sql, params![key, value])?;
assert_eq!(n, 1);
+ drop(lock);
Ok(old_val)
}
fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
- trace!("remove {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("remove {}: lock acquired", tree);
+ let tree = self.get_tree(tree)?;
+ let db = self.db.get()?;
+ let lock = self.write_lock.lock();
- let tree = this.get_tree(tree)?;
- let old_val = this.internal_get(tree, key)?;
+ let old_val = self.internal_get(&db, &tree, key)?;
if old_val.is_some() {
- let n = this
- .db
- .execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
+ let n = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
assert_eq!(n, 1);
}
+ drop(lock);
Ok(old_val)
}
fn clear(&self, tree: usize) -> Result<()> {
- trace!("clear {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("clear {}: lock acquired", tree);
+ let tree = self.get_tree(tree)?;
+ let db = self.db.get()?;
+ let lock = self.write_lock.lock();
+
+ db.execute(&format!("DELETE FROM {}", tree), [])?;
- let tree = this.get_tree(tree)?;
- this.db.execute(&format!("DELETE FROM {}", tree), [])?;
+ drop(lock);
Ok(())
}
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
- trace!("iter {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("iter {}: lock acquired", tree);
-
- let tree = this.get_tree(tree)?;
+ let tree = self.get_tree(tree)?;
let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
- DbValueIterator::make(this, &sql, [])
+ DbValueIterator::make(self.db.get()?, &sql, [])
}
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
- trace!("iter_rev {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("iter_rev {}: lock acquired", tree);
-
- let tree = this.get_tree(tree)?;
+ let tree = self.get_tree(tree)?;
let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
- DbValueIterator::make(this, &sql, [])
+ DbValueIterator::make(self.db.get()?, &sql, [])
}
fn range<'r>(
@@ -216,11 +232,7 @@ impl IDb for SqliteDb {
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
- trace!("range {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("range {}: lock acquired", tree);
-
- let tree = this.get_tree(tree)?;
+ let tree = self.get_tree(tree)?;
let (bounds_sql, params) = bounds_sql(low, high);
let sql = format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql);
@@ -230,7 +242,7 @@ impl IDb for SqliteDb {
.map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>();
- DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
+ DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(self.db.get()?, &sql, params.as_ref())
}
fn range_rev<'r>(
&self,
@@ -238,11 +250,7 @@ impl IDb for SqliteDb {
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
- trace!("range_rev {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("range_rev {}: lock acquired", tree);
-
- let tree = this.get_tree(tree)?;
+ let tree = self.get_tree(tree)?;
let (bounds_sql, params) = bounds_sql(low, high);
let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql);
@@ -252,25 +260,20 @@ impl IDb for SqliteDb {
.map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>();
- DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
+ DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(self.db.get()?, &sql, params.as_ref())
}
// ----
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
- trace!("transaction: lock db");
- let mut this = self.0.lock().unwrap();
- trace!("transaction: lock acquired");
-
- let this_mut_ref: &mut SqliteDbInner = this.borrow_mut();
+ let mut db = self.db.get().map_err(Error::from).map_err(TxError::Db)?;
+ let trees = self.trees.read().unwrap();
+ let lock = self.write_lock.lock();
+ trace!("trying transaction");
let mut tx = SqliteTx {
- tx: this_mut_ref
- .db
- .transaction()
- .map_err(Error::from)
- .map_err(TxError::Db)?,
- trees: &this_mut_ref.trees,
+ tx: db.transaction().map_err(Error::from).map_err(TxError::Db)?,
+ trees: &trees,
};
let res = match f.try_on(&mut tx) {
TxFnResult::Ok(on_commit) => {
@@ -290,7 +293,8 @@ impl IDb for SqliteDb {
};
trace!("transaction done");
- res
+ drop(lock);
+ return res;
}
}
@@ -298,12 +302,12 @@ impl IDb for SqliteDb {
struct SqliteTx<'a> {
tx: Transaction<'a>,
- trees: &'a [String],
+ trees: &'a [Arc<str>],
}
impl<'a> SqliteTx<'a> {
fn get_tree(&self, i: usize) -> TxOpResult<&'_ str> {
- self.trees.get(i).map(String::as_ref).ok_or_else(|| {
+ self.trees.get(i).map(Arc::as_ref).ok_or_else(|| {
TxOpError(Error(
"invalid tree id (it might have been openned after the transaction started)".into(),
))
@@ -423,18 +427,14 @@ impl<'a> ITx for SqliteTx<'a> {
// therefore quite some unsafe code (it is a self-referential struct)
struct DbValueIterator<'a> {
- db: MutexGuard<'a, SqliteDbInner>,
+ db: Connection,
stmt: Option<Statement<'a>>,
iter: Option<Rows<'a>>,
_pin: PhantomPinned,
}
impl<'a> DbValueIterator<'a> {
- fn make<P: rusqlite::Params>(
- db: MutexGuard<'a, SqliteDbInner>,
- sql: &str,
- args: P,
- ) -> Result<ValueIter<'a>> {
+ fn make<P: rusqlite::Params>(db: Connection, sql: &str, args: P) -> Result<ValueIter<'a>> {
let res = DbValueIterator {
db,
stmt: None,
@@ -446,7 +446,7 @@ impl<'a> DbValueIterator<'a> {
// This unsafe allows us to bypass lifetime checks
let db = unsafe { NonNull::from(&boxed.db).as_ref() };
- let stmt = db.db.prepare(sql)?;
+ let stmt = db.prepare(sql)?;
let mut_ref = Pin::as_mut(&mut boxed);
// This unsafe allows us to write in a field of the pinned struct