aboutsummaryrefslogtreecommitdiff
path: root/src/db
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-03-18 17:08:54 +0100
committerAlex Auvolat <alex@adnab.me>2024-03-18 18:05:25 +0100
commite8f9718ccd656dacc4fba2ed9fa5d8abf12ad37b (patch)
tree8896fd504d6fc1be97324c1d813309a24bf43e82 /src/db
parentfd2e19bf1bf301bc03aa29ffa3fe1e71008cbe50 (diff)
downloadgarage-e8f9718ccd656dacc4fba2ed9fa5d8abf12ad37b.tar.gz
garage-e8f9718ccd656dacc4fba2ed9fa5d8abf12ad37b.zip
[sqlite-r2d2] implement connection pooling in sqlite backend
Diffstat (limited to 'src/db')
-rw-r--r--src/db/Cargo.toml4
-rw-r--r--src/db/open.rs10
-rw-r--r--src/db/sqlite_adapter.rs204
-rw-r--r--src/db/test.rs3
4 files changed, 103 insertions, 118 deletions
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index 324de74c..baa94bae 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -18,6 +18,8 @@ tracing.workspace = true
heed = { workspace = true, optional = true }
rusqlite = { workspace = true, optional = true, features = ["backup"] }
+r2d2 = { workspace = true, optional = true }
+r2d2_sqlite = { workspace = true, optional = true }
sled = { workspace = true, optional = true }
[dev-dependencies]
@@ -27,4 +29,4 @@ mktemp.workspace = true
default = [ "sled", "lmdb", "sqlite" ]
bundled-libs = [ "rusqlite?/bundled" ]
lmdb = [ "heed" ]
-sqlite = [ "rusqlite" ]
+sqlite = [ "rusqlite", "r2d2", "r2d2_sqlite" ]
diff --git a/src/db/open.rs b/src/db/open.rs
index ae135c4e..59d06f2e 100644
--- a/src/db/open.rs
+++ b/src/db/open.rs
@@ -91,14 +91,8 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
#[cfg(feature = "sqlite")]
Engine::Sqlite => {
info!("Opening Sqlite database at: {}", path.display());
- let db = crate::sqlite_adapter::rusqlite::Connection::open(&path)?;
- db.pragma_update(None, "journal_mode", "WAL")?;
- if opt.fsync {
- db.pragma_update(None, "synchronous", "NORMAL")?;
- } else {
- db.pragma_update(None, "synchronous", "OFF")?;
- }
- Ok(crate::sqlite_adapter::SqliteDb::init(db))
+ let manager = r2d2_sqlite::SqliteConnectionManager::file(path);
+ Ok(crate::sqlite_adapter::SqliteDb::new(manager, opt.fsync)?)
}
// ---- LMDB DB ----
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 827f3cc3..3eccfdde 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -1,13 +1,14 @@
use core::ops::Bound;
-use std::borrow::BorrowMut;
use std::marker::PhantomPinned;
use std::path::PathBuf;
use std::pin::Pin;
use std::ptr::NonNull;
-use std::sync::{Arc, Mutex, MutexGuard};
+use std::sync::{Arc, Mutex, RwLock};
-use rusqlite::{params, Connection, Rows, Statement, Transaction};
+use r2d2::Pool;
+use r2d2_sqlite::SqliteConnectionManager;
+use rusqlite::{params, Rows, Statement, Transaction};
use crate::{
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
@@ -16,6 +17,8 @@ use crate::{
pub use rusqlite;
+type Connection = r2d2::PooledConnection<SqliteConnectionManager>;
+
// --- err
impl From<rusqlite::Error> for Error {
@@ -24,6 +27,12 @@ impl From<rusqlite::Error> for Error {
}
}
+impl From<r2d2::Error> for Error {
+ fn from(e: r2d2::Error) -> Error {
+ Error(format!("Sqlite: {}", e).into())
+ }
+}
+
impl From<rusqlite::Error> for TxOpError {
fn from(e: rusqlite::Error) -> TxOpError {
TxOpError(e.into())
@@ -32,35 +41,47 @@ impl From<rusqlite::Error> for TxOpError {
// -- db
-pub struct SqliteDb(Mutex<SqliteDbInner>);
-
-struct SqliteDbInner {
- db: Connection,
- trees: Vec<String>,
+pub struct SqliteDb {
+ db: Pool<SqliteConnectionManager>,
+ trees: RwLock<Vec<Arc<str>>>,
+ // All operations that might write on the DB must take this lock first.
+ // This emulates LMDB's approach where a single writer can be
+ // active at once.
+ write_lock: Mutex<()>,
}
impl SqliteDb {
- pub fn init(db: rusqlite::Connection) -> Db {
- let s = Self(Mutex::new(SqliteDbInner {
- db,
- trees: Vec::new(),
- }));
- Db(Arc::new(s))
+ pub fn new(manager: SqliteConnectionManager, sync_mode: bool) -> Result<Db> {
+ let manager = manager.with_init(move |db| {
+ db.pragma_update(None, "journal_mode", "WAL")?;
+ if sync_mode {
+ db.pragma_update(None, "synchronous", "NORMAL")?;
+ } else {
+ db.pragma_update(None, "synchronous", "OFF")?;
+ }
+ Ok(())
+ });
+ let s = Self {
+ db: Pool::builder().build(manager)?,
+ trees: RwLock::new(vec![]),
+ write_lock: Mutex::new(()),
+ };
+ Ok(Db(Arc::new(s)))
}
}
-impl SqliteDbInner {
- fn get_tree(&self, i: usize) -> Result<&'_ str> {
+impl SqliteDb {
+ fn get_tree(&self, i: usize) -> Result<Arc<str>> {
self.trees
+ .read()
+ .unwrap()
.get(i)
- .map(String::as_str)
+ .cloned()
.ok_or_else(|| Error("invalid tree id".into()))
}
- fn internal_get(&self, tree: &str, key: &[u8]) -> Result<Option<Value>> {
- let mut stmt = self
- .db
- .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
+ fn internal_get(&self, db: &Connection, tree: &str, key: &[u8]) -> Result<Option<Value>> {
+ let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
let mut res_iter = stmt.query([key])?;
match res_iter.next()? {
None => Ok(None),
@@ -76,13 +97,14 @@ impl IDb for SqliteDb {
fn open_tree(&self, name: &str) -> Result<usize> {
let name = format!("tree_{}", name.replace(':', "_COLON_"));
- let mut this = self.0.lock().unwrap();
+ let mut trees = self.trees.write().unwrap();
- if let Some(i) = this.trees.iter().position(|x| x == &name) {
+ if let Some(i) = trees.iter().position(|x| x.as_ref() == &name) {
Ok(i)
} else {
+ let db = self.db.get()?;
trace!("create table {}", name);
- this.db.execute(
+ db.execute(
&format!(
"CREATE TABLE IF NOT EXISTS {} (
k BLOB PRIMARY KEY,
@@ -94,8 +116,8 @@ impl IDb for SqliteDb {
)?;
trace!("table created: {}, unlocking", name);
- let i = this.trees.len();
- this.trees.push(name.to_string());
+ let i = trees.len();
+ trees.push(name.to_string().into_boxed_str().into());
Ok(i)
}
}
@@ -103,11 +125,8 @@ impl IDb for SqliteDb {
fn list_trees(&self) -> Result<Vec<String>> {
let mut trees = vec![];
- trace!("list_trees: lock db");
- let this = self.0.lock().unwrap();
- trace!("list_trees: lock acquired");
-
- let mut stmt = this.db.prepare(
+ let db = self.db.get()?;
+ let mut stmt = db.prepare(
"SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'",
)?;
let mut rows = stmt.query([])?;
@@ -125,8 +144,8 @@ impl IDb for SqliteDb {
let percent = (p.pagecount - p.remaining) * 100 / p.pagecount;
info!("Sqlite snapshot progres: {}%", percent);
}
- let this = self.0.lock().unwrap();
- this.db
+ self.db
+ .get()?
.backup(rusqlite::DatabaseName::Main, to, Some(progress))?;
Ok(())
}
@@ -134,21 +153,15 @@ impl IDb for SqliteDb {
// ----
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
- trace!("get {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("get {}: lock acquired", tree);
-
- let tree = this.get_tree(tree)?;
- this.internal_get(tree, key)
+ let tree = self.get_tree(tree)?;
+ self.internal_get(&self.db.get()?, &tree, key)
}
fn len(&self, tree: usize) -> Result<usize> {
- trace!("len {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("len {}: lock acquired", tree);
+ let tree = self.get_tree(tree)?;
+ let db = self.db.get()?;
- let tree = this.get_tree(tree)?;
- let mut stmt = this.db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
+ let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
let mut res_iter = stmt.query([])?;
match res_iter.next()? {
None => Ok(0),
@@ -161,69 +174,60 @@ impl IDb for SqliteDb {
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> {
- trace!("insert {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("insert {}: lock acquired", tree);
+ let tree = self.get_tree(tree)?;
+ let db = self.db.get()?;
+ let lock = self.write_lock.lock();
- let tree = this.get_tree(tree)?;
- let old_val = this.internal_get(tree, key)?;
+ let old_val = self.internal_get(&db, &tree, key)?;
let sql = match &old_val {
Some(_) => format!("UPDATE {} SET v = ?2 WHERE k = ?1", tree),
None => format!("INSERT INTO {} (k, v) VALUES (?1, ?2)", tree),
};
- let n = this.db.execute(&sql, params![key, value])?;
+ let n = db.execute(&sql, params![key, value])?;
assert_eq!(n, 1);
+ drop(lock);
Ok(old_val)
}
fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
- trace!("remove {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("remove {}: lock acquired", tree);
+ let tree = self.get_tree(tree)?;
+ let db = self.db.get()?;
+ let lock = self.write_lock.lock();
- let tree = this.get_tree(tree)?;
- let old_val = this.internal_get(tree, key)?;
+ let old_val = self.internal_get(&db, &tree, key)?;
if old_val.is_some() {
- let n = this
- .db
- .execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
+ let n = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
assert_eq!(n, 1);
}
+ drop(lock);
Ok(old_val)
}
fn clear(&self, tree: usize) -> Result<()> {
- trace!("clear {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("clear {}: lock acquired", tree);
+ let tree = self.get_tree(tree)?;
+ let db = self.db.get()?;
+ let lock = self.write_lock.lock();
+
+ db.execute(&format!("DELETE FROM {}", tree), [])?;
- let tree = this.get_tree(tree)?;
- this.db.execute(&format!("DELETE FROM {}", tree), [])?;
+ drop(lock);
Ok(())
}
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
- trace!("iter {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("iter {}: lock acquired", tree);
-
- let tree = this.get_tree(tree)?;
+ let tree = self.get_tree(tree)?;
let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
- DbValueIterator::make(this, &sql, [])
+ DbValueIterator::make(self.db.get()?, &sql, [])
}
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
- trace!("iter_rev {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("iter_rev {}: lock acquired", tree);
-
- let tree = this.get_tree(tree)?;
+ let tree = self.get_tree(tree)?;
let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
- DbValueIterator::make(this, &sql, [])
+ DbValueIterator::make(self.db.get()?, &sql, [])
}
fn range<'r>(
@@ -232,11 +236,7 @@ impl IDb for SqliteDb {
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
- trace!("range {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("range {}: lock acquired", tree);
-
- let tree = this.get_tree(tree)?;
+ let tree = self.get_tree(tree)?;
let (bounds_sql, params) = bounds_sql(low, high);
let sql = format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql);
@@ -246,7 +246,7 @@ impl IDb for SqliteDb {
.map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>();
- DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
+ DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(self.db.get()?, &sql, params.as_ref())
}
fn range_rev<'r>(
&self,
@@ -254,11 +254,7 @@ impl IDb for SqliteDb {
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
- trace!("range_rev {}: lock db", tree);
- let this = self.0.lock().unwrap();
- trace!("range_rev {}: lock acquired", tree);
-
- let tree = this.get_tree(tree)?;
+ let tree = self.get_tree(tree)?;
let (bounds_sql, params) = bounds_sql(low, high);
let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql);
@@ -268,25 +264,20 @@ impl IDb for SqliteDb {
.map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>();
- DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
+ DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(self.db.get()?, &sql, params.as_ref())
}
// ----
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
- trace!("transaction: lock db");
- let mut this = self.0.lock().unwrap();
- trace!("transaction: lock acquired");
-
- let this_mut_ref: &mut SqliteDbInner = this.borrow_mut();
+ let mut db = self.db.get().map_err(Error::from).map_err(TxError::Db)?;
+ let trees = self.trees.read().unwrap();
+ let lock = self.write_lock.lock();
+ trace!("trying transaction");
let mut tx = SqliteTx {
- tx: this_mut_ref
- .db
- .transaction()
- .map_err(Error::from)
- .map_err(TxError::Db)?,
- trees: &this_mut_ref.trees,
+ tx: db.transaction().map_err(Error::from).map_err(TxError::Db)?,
+ trees: &trees,
};
let res = match f.try_on(&mut tx) {
TxFnResult::Ok(on_commit) => {
@@ -306,7 +297,8 @@ impl IDb for SqliteDb {
};
trace!("transaction done");
- res
+ drop(lock);
+ return res;
}
}
@@ -314,12 +306,12 @@ impl IDb for SqliteDb {
struct SqliteTx<'a> {
tx: Transaction<'a>,
- trees: &'a [String],
+ trees: &'a [Arc<str>],
}
impl<'a> SqliteTx<'a> {
fn get_tree(&self, i: usize) -> TxOpResult<&'_ str> {
- self.trees.get(i).map(String::as_ref).ok_or_else(|| {
+ self.trees.get(i).map(Arc::as_ref).ok_or_else(|| {
TxOpError(Error(
"invalid tree id (it might have been openned after the transaction started)".into(),
))
@@ -408,18 +400,14 @@ impl<'a> ITx for SqliteTx<'a> {
// ----
struct DbValueIterator<'a> {
- db: MutexGuard<'a, SqliteDbInner>,
+ db: Connection,
stmt: Option<Statement<'a>>,
iter: Option<Rows<'a>>,
_pin: PhantomPinned,
}
impl<'a> DbValueIterator<'a> {
- fn make<P: rusqlite::Params>(
- db: MutexGuard<'a, SqliteDbInner>,
- sql: &str,
- args: P,
- ) -> Result<ValueIter<'a>> {
+ fn make<P: rusqlite::Params>(db: Connection, sql: &str, args: P) -> Result<ValueIter<'a>> {
let res = DbValueIterator {
db,
stmt: None,
@@ -431,7 +419,7 @@ impl<'a> DbValueIterator<'a> {
unsafe {
let db = NonNull::from(&boxed.db);
- let stmt = db.as_ref().db.prepare(sql)?;
+ let stmt = db.as_ref().prepare(sql)?;
let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed);
Pin::get_unchecked_mut(mut_ref).stmt = Some(stmt);
diff --git a/src/db/test.rs b/src/db/test.rs
index cd99eafa..cad25f4d 100644
--- a/src/db/test.rs
+++ b/src/db/test.rs
@@ -106,6 +106,7 @@ fn test_sled_db() {
fn test_sqlite_db() {
use crate::sqlite_adapter::SqliteDb;
- let db = SqliteDb::init(rusqlite::Connection::open_in_memory().unwrap());
+ let manager = r2d2_sqlite::SqliteConnectionManager::memory();
+ let db = SqliteDb::new(manager, false).unwrap();
test_suite(db);
}