diff options
author | Alex Auvolat <alex@adnab.me> | 2024-03-18 20:17:54 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-03-18 20:19:30 +0100 |
commit | 0038ca8a78f147b9c0ec07ef0121773aaf110dc9 (patch) | |
tree | 43f39f30c63a6affa62eeea62cfec674f217c2b4 /src/db/sqlite_adapter.rs | |
parent | 81191d2d92e58ff82ace0f4d82b275c157673ade (diff) | |
parent | 1a0bffae3491fae6af5a8d4defc5c6b84839e197 (diff) | |
download | garage-0038ca8a78f147b9c0ec07ef0121773aaf110dc9.tar.gz garage-0038ca8a78f147b9c0ec07ef0121773aaf110dc9.zip |
Merge branch 'main' into next-0.10
Diffstat (limited to 'src/db/sqlite_adapter.rs')
-rw-r--r-- | src/db/sqlite_adapter.rs | 212 |
1 files changed, 106 insertions, 106 deletions
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 6c556c97..a91b9011 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -1,12 +1,14 @@ use core::ops::Bound; -use std::borrow::BorrowMut; use std::marker::PhantomPinned; +use std::path::PathBuf; use std::pin::Pin; use std::ptr::NonNull; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::{Arc, Mutex, RwLock}; -use rusqlite::{params, Connection, Rows, Statement, Transaction}; +use r2d2::Pool; +use r2d2_sqlite::SqliteConnectionManager; +use rusqlite::{params, Rows, Statement, Transaction}; use crate::{ Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult, @@ -15,6 +17,8 @@ use crate::{ pub use rusqlite; +type Connection = r2d2::PooledConnection<SqliteConnectionManager>; + // --- err impl From<rusqlite::Error> for Error { @@ -23,6 +27,12 @@ impl From<rusqlite::Error> for Error { } } +impl From<r2d2::Error> for Error { + fn from(e: r2d2::Error) -> Error { + Error(format!("Sqlite: {}", e).into()) + } +} + impl From<rusqlite::Error> for TxOpError { fn from(e: rusqlite::Error) -> TxOpError { TxOpError(e.into()) @@ -31,35 +41,47 @@ impl From<rusqlite::Error> for TxOpError { // -- db -pub struct SqliteDb(Mutex<SqliteDbInner>); - -struct SqliteDbInner { - db: Connection, - trees: Vec<String>, +pub struct SqliteDb { + db: Pool<SqliteConnectionManager>, + trees: RwLock<Vec<Arc<str>>>, + // All operations that might write on the DB must take this lock first. + // This emulates LMDB's approach where a single writer can be + // active at once. + write_lock: Mutex<()>, } impl SqliteDb { - pub fn init(db: rusqlite::Connection) -> Db { - let s = Self(Mutex::new(SqliteDbInner { - db, - trees: Vec::new(), - })); - Db(Arc::new(s)) + pub fn new(manager: SqliteConnectionManager, sync_mode: bool) -> Result<Db> { + let manager = manager.with_init(move |db| { + db.pragma_update(None, "journal_mode", "WAL")?; + if sync_mode { + db.pragma_update(None, "synchronous", "NORMAL")?; + } else { + db.pragma_update(None, "synchronous", "OFF")?; + } + Ok(()) + }); + let s = Self { + db: Pool::builder().build(manager)?, + trees: RwLock::new(vec![]), + write_lock: Mutex::new(()), + }; + Ok(Db(Arc::new(s))) } } -impl SqliteDbInner { - fn get_tree(&self, i: usize) -> Result<&'_ str> { +impl SqliteDb { + fn get_tree(&self, i: usize) -> Result<Arc<str>> { self.trees + .read() + .unwrap() .get(i) - .map(String::as_str) + .cloned() .ok_or_else(|| Error("invalid tree id".into())) } - fn internal_get(&self, tree: &str, key: &[u8]) -> Result<Option<Value>> { - let mut stmt = self - .db - .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; + fn internal_get(&self, db: &Connection, tree: &str, key: &[u8]) -> Result<Option<Value>> { + let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; let mut res_iter = stmt.query([key])?; match res_iter.next()? { None => Ok(None), @@ -75,13 +97,14 @@ impl IDb for SqliteDb { fn open_tree(&self, name: &str) -> Result<usize> { let name = format!("tree_{}", name.replace(':', "_COLON_")); - let mut this = self.0.lock().unwrap(); + let mut trees = self.trees.write().unwrap(); - if let Some(i) = this.trees.iter().position(|x| x == &name) { + if let Some(i) = trees.iter().position(|x| x.as_ref() == &name) { Ok(i) } else { + let db = self.db.get()?; trace!("create table {}", name); - this.db.execute( + db.execute( &format!( "CREATE TABLE IF NOT EXISTS {} ( k BLOB PRIMARY KEY, @@ -93,8 +116,8 @@ impl IDb for SqliteDb { )?; trace!("table created: {}, unlocking", name); - let i = this.trees.len(); - this.trees.push(name.to_string()); + let i = trees.len(); + trees.push(name.to_string().into_boxed_str().into()); Ok(i) } } @@ -102,11 +125,8 @@ impl IDb for SqliteDb { fn list_trees(&self) -> Result<Vec<String>> { let mut trees = vec![]; - trace!("list_trees: lock db"); - let this = self.0.lock().unwrap(); - trace!("list_trees: lock acquired"); - - let mut stmt = this.db.prepare( + let db = self.db.get()?; + let mut stmt = db.prepare( "SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'", )?; let mut rows = stmt.query([])?; @@ -119,24 +139,29 @@ impl IDb for SqliteDb { Ok(trees) } + fn snapshot(&self, to: &PathBuf) -> Result<()> { + fn progress(p: rusqlite::backup::Progress) { + let percent = (p.pagecount - p.remaining) * 100 / p.pagecount; + info!("Sqlite snapshot progres: {}%", percent); + } + self.db + .get()? + .backup(rusqlite::DatabaseName::Main, to, Some(progress))?; + Ok(()) + } + // ---- fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> { - trace!("get {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("get {}: lock acquired", tree); - - let tree = this.get_tree(tree)?; - this.internal_get(tree, key) + let tree = self.get_tree(tree)?; + self.internal_get(&self.db.get()?, &tree, key) } fn len(&self, tree: usize) -> Result<usize> { - trace!("len {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("len {}: lock acquired", tree); + let tree = self.get_tree(tree)?; + let db = self.db.get()?; - let tree = this.get_tree(tree)?; - let mut stmt = this.db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?; + let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?; let mut res_iter = stmt.query([])?; match res_iter.next()? { None => Ok(0), @@ -145,69 +170,60 @@ impl IDb for SqliteDb { } fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> { - trace!("insert {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("insert {}: lock acquired", tree); + let tree = self.get_tree(tree)?; + let db = self.db.get()?; + let lock = self.write_lock.lock(); - let tree = this.get_tree(tree)?; - let old_val = this.internal_get(tree, key)?; + let old_val = self.internal_get(&db, &tree, key)?; let sql = match &old_val { Some(_) => format!("UPDATE {} SET v = ?2 WHERE k = ?1", tree), None => format!("INSERT INTO {} (k, v) VALUES (?1, ?2)", tree), }; - let n = this.db.execute(&sql, params![key, value])?; + let n = db.execute(&sql, params![key, value])?; assert_eq!(n, 1); + drop(lock); Ok(old_val) } fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> { - trace!("remove {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("remove {}: lock acquired", tree); + let tree = self.get_tree(tree)?; + let db = self.db.get()?; + let lock = self.write_lock.lock(); - let tree = this.get_tree(tree)?; - let old_val = this.internal_get(tree, key)?; + let old_val = self.internal_get(&db, &tree, key)?; if old_val.is_some() { - let n = this - .db - .execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?; + let n = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?; assert_eq!(n, 1); } + drop(lock); Ok(old_val) } fn clear(&self, tree: usize) -> Result<()> { - trace!("clear {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("clear {}: lock acquired", tree); + let tree = self.get_tree(tree)?; + let db = self.db.get()?; + let lock = self.write_lock.lock(); + + db.execute(&format!("DELETE FROM {}", tree), [])?; - let tree = this.get_tree(tree)?; - this.db.execute(&format!("DELETE FROM {}", tree), [])?; + drop(lock); Ok(()) } fn iter(&self, tree: usize) -> Result<ValueIter<'_>> { - trace!("iter {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("iter {}: lock acquired", tree); - - let tree = this.get_tree(tree)?; + let tree = self.get_tree(tree)?; let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree); - DbValueIterator::make(this, &sql, []) + DbValueIterator::make(self.db.get()?, &sql, []) } fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> { - trace!("iter_rev {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("iter_rev {}: lock acquired", tree); - - let tree = this.get_tree(tree)?; + let tree = self.get_tree(tree)?; let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree); - DbValueIterator::make(this, &sql, []) + DbValueIterator::make(self.db.get()?, &sql, []) } fn range<'r>( @@ -216,11 +232,7 @@ impl IDb for SqliteDb { low: Bound<&'r [u8]>, high: Bound<&'r [u8]>, ) -> Result<ValueIter<'_>> { - trace!("range {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("range {}: lock acquired", tree); - - let tree = this.get_tree(tree)?; + let tree = self.get_tree(tree)?; let (bounds_sql, params) = bounds_sql(low, high); let sql = format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql); @@ -230,7 +242,7 @@ impl IDb for SqliteDb { .map(|x| x as &dyn rusqlite::ToSql) .collect::<Vec<_>>(); - DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref()) + DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(self.db.get()?, &sql, params.as_ref()) } fn range_rev<'r>( &self, @@ -238,11 +250,7 @@ impl IDb for SqliteDb { low: Bound<&'r [u8]>, high: Bound<&'r [u8]>, ) -> Result<ValueIter<'_>> { - trace!("range_rev {}: lock db", tree); - let this = self.0.lock().unwrap(); - trace!("range_rev {}: lock acquired", tree); - - let tree = this.get_tree(tree)?; + let tree = self.get_tree(tree)?; let (bounds_sql, params) = bounds_sql(low, high); let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql); @@ -252,25 +260,20 @@ impl IDb for SqliteDb { .map(|x| x as &dyn rusqlite::ToSql) .collect::<Vec<_>>(); - DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref()) + DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(self.db.get()?, &sql, params.as_ref()) } // ---- fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> { - trace!("transaction: lock db"); - let mut this = self.0.lock().unwrap(); - trace!("transaction: lock acquired"); - - let this_mut_ref: &mut SqliteDbInner = this.borrow_mut(); + let mut db = self.db.get().map_err(Error::from).map_err(TxError::Db)?; + let trees = self.trees.read().unwrap(); + let lock = self.write_lock.lock(); + trace!("trying transaction"); let mut tx = SqliteTx { - tx: this_mut_ref - .db - .transaction() - .map_err(Error::from) - .map_err(TxError::Db)?, - trees: &this_mut_ref.trees, + tx: db.transaction().map_err(Error::from).map_err(TxError::Db)?, + trees: &trees, }; let res = match f.try_on(&mut tx) { TxFnResult::Ok(on_commit) => { @@ -290,7 +293,8 @@ impl IDb for SqliteDb { }; trace!("transaction done"); - res + drop(lock); + return res; } } @@ -298,12 +302,12 @@ impl IDb for SqliteDb { struct SqliteTx<'a> { tx: Transaction<'a>, - trees: &'a [String], + trees: &'a [Arc<str>], } impl<'a> SqliteTx<'a> { fn get_tree(&self, i: usize) -> TxOpResult<&'_ str> { - self.trees.get(i).map(String::as_ref).ok_or_else(|| { + self.trees.get(i).map(Arc::as_ref).ok_or_else(|| { TxOpError(Error( "invalid tree id (it might have been openned after the transaction started)".into(), )) @@ -423,18 +427,14 @@ impl<'a> ITx for SqliteTx<'a> { // therefore quite some unsafe code (it is a self-referential struct) struct DbValueIterator<'a> { - db: MutexGuard<'a, SqliteDbInner>, + db: Connection, stmt: Option<Statement<'a>>, iter: Option<Rows<'a>>, _pin: PhantomPinned, } impl<'a> DbValueIterator<'a> { - fn make<P: rusqlite::Params>( - db: MutexGuard<'a, SqliteDbInner>, - sql: &str, - args: P, - ) -> Result<ValueIter<'a>> { + fn make<P: rusqlite::Params>(db: Connection, sql: &str, args: P) -> Result<ValueIter<'a>> { let res = DbValueIterator { db, stmt: None, @@ -446,7 +446,7 @@ impl<'a> DbValueIterator<'a> { // This unsafe allows us to bypass lifetime checks let db = unsafe { NonNull::from(&boxed.db).as_ref() }; - let stmt = db.db.prepare(sql)?; + let stmt = db.prepare(sql)?; let mut_ref = Pin::as_mut(&mut boxed); // This unsafe allows us to write in a field of the pinned struct |