From e8f9718ccd656dacc4fba2ed9fa5d8abf12ad37b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 18 Mar 2024 17:08:54 +0100 Subject: [sqlite-r2d2] implement connection pooling in sqlite backend --- src/db/Cargo.toml | 4 +- src/db/open.rs | 10 +-- src/db/sqlite_adapter.rs | 204 ++++++++++++++++++++++------------------------- src/db/test.rs | 3 +- 4 files changed, 103 insertions(+), 118 deletions(-) (limited to 'src') diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index 324de74c..baa94bae 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -18,6 +18,8 @@ tracing.workspace = true heed = { workspace = true, optional = true } rusqlite = { workspace = true, optional = true, features = ["backup"] } +r2d2 = { workspace = true, optional = true } +r2d2_sqlite = { workspace = true, optional = true } sled = { workspace = true, optional = true } [dev-dependencies] @@ -27,4 +29,4 @@ mktemp.workspace = true default = [ "sled", "lmdb", "sqlite" ] bundled-libs = [ "rusqlite?/bundled" ] lmdb = [ "heed" ] -sqlite = [ "rusqlite" ] +sqlite = [ "rusqlite", "r2d2", "r2d2_sqlite" ] diff --git a/src/db/open.rs b/src/db/open.rs index ae135c4e..59d06f2e 100644 --- a/src/db/open.rs +++ b/src/db/open.rs @@ -91,14 +91,8 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result { #[cfg(feature = "sqlite")] Engine::Sqlite => { info!("Opening Sqlite database at: {}", path.display()); - let db = crate::sqlite_adapter::rusqlite::Connection::open(&path)?; - db.pragma_update(None, "journal_mode", "WAL")?; - if opt.fsync { - db.pragma_update(None, "synchronous", "NORMAL")?; - } else { - db.pragma_update(None, "synchronous", "OFF")?; - } - Ok(crate::sqlite_adapter::SqliteDb::init(db)) + let manager = r2d2_sqlite::SqliteConnectionManager::file(path); + Ok(crate::sqlite_adapter::SqliteDb::new(manager, opt.fsync)?) } // ---- LMDB DB ---- diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 827f3cc3..3eccfdde 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -1,13 +1,14 @@ use core::ops::Bound; -use std::borrow::BorrowMut; use std::marker::PhantomPinned; use std::path::PathBuf; use std::pin::Pin; use std::ptr::NonNull; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::{Arc, Mutex, RwLock}; -use rusqlite::{params, Connection, Rows, Statement, Transaction}; +use r2d2::Pool; +use r2d2_sqlite::SqliteConnectionManager; +use rusqlite::{params, Rows, Statement, Transaction}; use crate::{ Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult, @@ -16,6 +17,8 @@ use crate::{ pub use rusqlite; +type Connection = r2d2::PooledConnection; + // --- err impl From for Error { @@ -24,6 +27,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: r2d2::Error) -> Error { + Error(format!("Sqlite: {}", e).into()) + } +} + impl From for TxOpError { fn from(e: rusqlite::Error) -> TxOpError { TxOpError(e.into()) @@ -32,35 +41,47 @@ impl From for TxOpError { // -- db -pub struct SqliteDb(Mutex); - -struct SqliteDbInner { - db: Connection, - trees: Vec, +pub struct SqliteDb { + db: Pool, + trees: RwLock>>, + // All operations that might write on the DB must take this lock first. + // This emulates LMDB's approach where a single writer can be + // active at once. + write_lock: Mutex<()>, } impl SqliteDb { - pub fn init(db: rusqlite::Connection) -> Db { - let s = Self(Mutex::new(SqliteDbInner { - db, - trees: Vec::new(), - })); - Db(Arc::new(s)) + pub fn new(manager: SqliteConnectionManager, sync_mode: bool) -> Result { + let manager = manager.with_init(move |db| { + db.pragma_update(None, "journal_mode", "WAL")?; + if sync_mode { + db.pragma_update(None, "synchronous", "NORMAL")?; + } else { + db.pragma_update(None, "synchronous", "OFF")?; + } + Ok(()) + }); + let s = Self { + db: Pool::builder().build(manager)?, + trees: RwLock::new(vec![]), + write_lock: Mutex::new(()), + }; + Ok(Db(Arc::new(s))) } } -impl SqliteDbInner { - fn get_tree(&self, i: usize) -> Result<&'_ str> { +impl SqliteDb { + fn get_tree(&self, i: usize) -> Result> { self.trees + .read() + .unwrap() .get(i) - .map(String::as_str) + .cloned() .ok_or_else(|| Error("invalid tree id".into())) } - fn internal_get(&self, tree: &str, key: &[u8]) -> Result> { - let mut stmt = self - .db - .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; + fn internal_get(&self, db: &Connection, tree: &str, key: &[u8]) -> Result> { + let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; let mut res_iter = stmt.query([key])?; match res_iter.next()? { None => Ok(None), @@ -76,13 +97,14 @@ impl IDb for SqliteDb { fn open_tree(&self, name: &str) -> Result { let name = format!("tree_{}", name.replace(':', "_COLON_")); - let mut this = self.0.lock().unwrap(); + let mut trees = self.trees.write().unwrap(); - if let Some(i) = this.trees.iter().position(|x| x == &name) { + if let Some(i) = trees.iter().position(|x| x.as_ref() == &name) { Ok(i) } else { + let db = self.db.get()?; trace!("create table {}", name); - this.db.execute( + db.execute( &format!( "CREATE TABLE IF NOT EXISTS {} ( k BLOB PRIMARY KEY, @@ -94,8 +116,8 @@ impl IDb for SqliteDb { )?; trace!("table created: {}, unlocking", name); - let i = this.trees.len(); - this.trees.push(name.to_string()); + let i = trees.len(); + trees.push(name.to_string().into_boxed_str().into()); Ok(i) } } @@ -103,11 +125,8 @@ impl IDb for SqliteDb { fn list_trees(&self) -> Result> { let mut trees = vec![]; - trace!("list_trees: lock db"); - let this = self.0.lock().unwrap(); - trace!("list_trees: lock acquired"); - - let mut stmt = this.db.prepare( + let db = self.db.get()?; + let mut stmt = db.prepare( "SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'", )?; let mut rows = stmt.query([])?; @@ -125,8 +144,8 @@ impl IDb for SqliteDb { let percent = (p.pagecount - p.remaining) * 100 / p.pagecount; info!("Sqlite snapshot progres: {}%", percent); } - let this = self.0.lock().unwrap(); - this.db + self.db + .get()? .backup(rusqlite::DatabaseName::Main, to, Some(progress))?; Ok(()) } @@ -134,21 +153,15 @@ impl IDb for SqliteDb { // ---- fn get(&self, tree: usize, key: &[u8]) -> Result> { - trace!("get {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("get {}: lock acquired", tree); - - let tree = this.get_tree(tree)?; - this.internal_get(tree, key) + let tree = self.get_tree(tree)?; + self.internal_get(&self.db.get()?, &tree, key) } fn len(&self, tree: usize) -> Result { - trace!("len {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("len {}: lock acquired", tree); + let tree = self.get_tree(tree)?; + let db = self.db.get()?; - let tree = this.get_tree(tree)?; - let mut stmt = this.db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?; + let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?; let mut res_iter = stmt.query([])?; match res_iter.next()? { None => Ok(0), @@ -161,69 +174,60 @@ impl IDb for SqliteDb { } fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result> { - trace!("insert {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("insert {}: lock acquired", tree); + let tree = self.get_tree(tree)?; + let db = self.db.get()?; + let lock = self.write_lock.lock(); - let tree = this.get_tree(tree)?; - let old_val = this.internal_get(tree, key)?; + let old_val = self.internal_get(&db, &tree, key)?; let sql = match &old_val { Some(_) => format!("UPDATE {} SET v = ?2 WHERE k = ?1", tree), None => format!("INSERT INTO {} (k, v) VALUES (?1, ?2)", tree), }; - let n = this.db.execute(&sql, params![key, value])?; + let n = db.execute(&sql, params![key, value])?; assert_eq!(n, 1); + drop(lock); Ok(old_val) } fn remove(&self, tree: usize, key: &[u8]) -> Result> { - trace!("remove {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("remove {}: lock acquired", tree); + let tree = self.get_tree(tree)?; + let db = self.db.get()?; + let lock = self.write_lock.lock(); - let tree = this.get_tree(tree)?; - let old_val = this.internal_get(tree, key)?; + let old_val = self.internal_get(&db, &tree, key)?; if old_val.is_some() { - let n = this - .db - .execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?; + let n = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?; assert_eq!(n, 1); } + drop(lock); Ok(old_val) } fn clear(&self, tree: usize) -> Result<()> { - trace!("clear {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("clear {}: lock acquired", tree); + let tree = self.get_tree(tree)?; + let db = self.db.get()?; + let lock = self.write_lock.lock(); + + db.execute(&format!("DELETE FROM {}", tree), [])?; - let tree = this.get_tree(tree)?; - this.db.execute(&format!("DELETE FROM {}", tree), [])?; + drop(lock); Ok(()) } fn iter(&self, tree: usize) -> Result> { - trace!("iter {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("iter {}: lock acquired", tree); - - let tree = this.get_tree(tree)?; + let tree = self.get_tree(tree)?; let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree); - DbValueIterator::make(this, &sql, []) + DbValueIterator::make(self.db.get()?, &sql, []) } fn iter_rev(&self, tree: usize) -> Result> { - trace!("iter_rev {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("iter_rev {}: lock acquired", tree); - - let tree = this.get_tree(tree)?; + let tree = self.get_tree(tree)?; let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree); - DbValueIterator::make(this, &sql, []) + DbValueIterator::make(self.db.get()?, &sql, []) } fn range<'r>( @@ -232,11 +236,7 @@ impl IDb for SqliteDb { low: Bound<&'r [u8]>, high: Bound<&'r [u8]>, ) -> Result> { - trace!("range {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("range {}: lock acquired", tree); - - let tree = this.get_tree(tree)?; + let tree = self.get_tree(tree)?; let (bounds_sql, params) = bounds_sql(low, high); let sql = format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql); @@ -246,7 +246,7 @@ impl IDb for SqliteDb { .map(|x| x as &dyn rusqlite::ToSql) .collect::>(); - DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref()) + DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(self.db.get()?, &sql, params.as_ref()) } fn range_rev<'r>( &self, @@ -254,11 +254,7 @@ impl IDb for SqliteDb { low: Bound<&'r [u8]>, high: Bound<&'r [u8]>, ) -> Result> { - trace!("range_rev {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("range_rev {}: lock acquired", tree); - - let tree = this.get_tree(tree)?; + let tree = self.get_tree(tree)?; let (bounds_sql, params) = bounds_sql(low, high); let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql); @@ -268,25 +264,20 @@ impl IDb for SqliteDb { .map(|x| x as &dyn rusqlite::ToSql) .collect::>(); - DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref()) + DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(self.db.get()?, &sql, params.as_ref()) } // ---- fn transaction(&self, f: &dyn ITxFn) -> TxResult { - trace!("transaction: lock db"); - let mut this = self.0.lock().unwrap(); - trace!("transaction: lock acquired"); - - let this_mut_ref: &mut SqliteDbInner = this.borrow_mut(); + let mut db = self.db.get().map_err(Error::from).map_err(TxError::Db)?; + let trees = self.trees.read().unwrap(); + let lock = self.write_lock.lock(); + trace!("trying transaction"); let mut tx = SqliteTx { - tx: this_mut_ref - .db - .transaction() - .map_err(Error::from) - .map_err(TxError::Db)?, - trees: &this_mut_ref.trees, + tx: db.transaction().map_err(Error::from).map_err(TxError::Db)?, + trees: &trees, }; let res = match f.try_on(&mut tx) { TxFnResult::Ok(on_commit) => { @@ -306,7 +297,8 @@ impl IDb for SqliteDb { }; trace!("transaction done"); - res + drop(lock); + return res; } } @@ -314,12 +306,12 @@ impl IDb for SqliteDb { struct SqliteTx<'a> { tx: Transaction<'a>, - trees: &'a [String], + trees: &'a [Arc], } impl<'a> SqliteTx<'a> { fn get_tree(&self, i: usize) -> TxOpResult<&'_ str> { - self.trees.get(i).map(String::as_ref).ok_or_else(|| { + self.trees.get(i).map(Arc::as_ref).ok_or_else(|| { TxOpError(Error( "invalid tree id (it might have been openned after the transaction started)".into(), )) @@ -408,18 +400,14 @@ impl<'a> ITx for SqliteTx<'a> { // ---- struct DbValueIterator<'a> { - db: MutexGuard<'a, SqliteDbInner>, + db: Connection, stmt: Option>, iter: Option>, _pin: PhantomPinned, } impl<'a> DbValueIterator<'a> { - fn make( - db: MutexGuard<'a, SqliteDbInner>, - sql: &str, - args: P, - ) -> Result> { + fn make(db: Connection, sql: &str, args: P) -> Result> { let res = DbValueIterator { db, stmt: None, @@ -431,7 +419,7 @@ impl<'a> DbValueIterator<'a> { unsafe { let db = NonNull::from(&boxed.db); - let stmt = db.as_ref().db.prepare(sql)?; + let stmt = db.as_ref().prepare(sql)?; let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed); Pin::get_unchecked_mut(mut_ref).stmt = Some(stmt); diff --git a/src/db/test.rs b/src/db/test.rs index cd99eafa..cad25f4d 100644 --- a/src/db/test.rs +++ b/src/db/test.rs @@ -106,6 +106,7 @@ fn test_sled_db() { fn test_sqlite_db() { use crate::sqlite_adapter::SqliteDb; - let db = SqliteDb::init(rusqlite::Connection::open_in_memory().unwrap()); + let manager = r2d2_sqlite::SqliteConnectionManager::memory(); + let db = SqliteDb::new(manager, false).unwrap(); test_suite(db); } -- cgit v1.2.3 From b55f52a9b75359b02938ac003a6ea853b36a4f3e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 18 Mar 2024 17:58:34 +0100 Subject: [sqlite-r2d2] run integration test with all db engines --- src/garage/tests/common/garage.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs index d1f0867a..006337ee 100644 --- a/src/garage/tests/common/garage.rs +++ b/src/garage/tests/common/garage.rs @@ -42,6 +42,10 @@ impl Instance { .ok() .unwrap_or_else(|| env::temp_dir().join(format!("garage-integ-test-{}", port))); + let db_engine = env::var("GARAGE_TEST_INTEGRATION_DB_ENGINE") + .ok() + .unwrap_or_else(|| "lmdb".into()); + // Clean test runtime directory if path.exists() { fs::remove_dir_all(&path).expect("Could not clean test runtime directory"); @@ -52,7 +56,7 @@ impl Instance { r#" metadata_dir = "{path}/meta" data_dir = "{path}/data" -db_engine = "lmdb" +db_engine = "{db_engine}" replication_mode = "1" -- cgit v1.2.3