diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/db/sqlite_adapter.rs | 133 | ||||
-rw-r--r-- | src/table/gc.rs | 20 |
2 files changed, 79 insertions, 74 deletions
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 10e3bfee..e945c31c 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -1,9 +1,10 @@ use core::ops::Bound; +use std::borrow::BorrowMut; use std::marker::PhantomPinned; use std::pin::Pin; use std::ptr::NonNull; -use std::sync::{Arc, Mutex, MutexGuard, RwLock}; +use std::sync::{Arc, Mutex, MutexGuard}; use log::trace; @@ -29,24 +30,26 @@ impl<T> From<rusqlite::Error> for TxError<T> { // -- db -pub struct SqliteDb { - db: Mutex<Connection>, - trees: RwLock<Vec<String>>, +pub struct SqliteDb(Mutex<SqliteDbInner>); + +struct SqliteDbInner { + db: Connection, + trees: Vec<String>, } impl SqliteDb { pub fn init(db: rusqlite::Connection) -> Db { - let s = Self { - db: Mutex::new(db), - trees: RwLock::new(Vec::new()), - }; + let s = Self(Mutex::new(SqliteDbInner { + db, + trees: Vec::new(), + })); Db(Arc::new(s)) } +} +impl SqliteDbInner { fn get_tree(&self, i: usize) -> Result<String> { self.trees - .read() - .unwrap() .get(i) .cloned() .ok_or_else(|| Error("invalid tree id".into())) @@ -56,16 +59,13 @@ impl SqliteDb { impl IDb for SqliteDb { fn open_tree(&self, name: &str) -> Result<usize> { let name = format!("tree_{}", name.replace(':', "_COLON_")); + let mut this = self.0.lock().unwrap(); - let mut trees = self.trees.write().unwrap(); - if let Some(i) = trees.iter().position(|x| x == &name) { + if let Some(i) = this.trees.iter().position(|x| x == &name) { Ok(i) } else { - trace!("open tree {}: lock db", name); - let db = self.db.lock().unwrap(); trace!("create table {}", name); - - db.execute( + this.db.execute( &format!( "CREATE TABLE IF NOT EXISTS {} ( k BLOB PRIMARY KEY, @@ -75,10 +75,10 @@ impl IDb for SqliteDb { ), [], )?; - trace!("table created: {}", name); + trace!("table created: {}, unlocking", name); - let i = trees.len(); - trees.push(name.to_string()); + let i = this.trees.len(); + this.trees.push(name.to_string()); Ok(i) } } @@ -87,10 +87,10 @@ impl IDb for SqliteDb { let mut trees = vec![]; trace!("list_trees: lock db"); - let db = self.db.lock().unwrap(); + let this = self.0.lock().unwrap(); trace!("list_trees: lock acquired"); - let mut stmt = db.prepare( + let mut stmt = this.db.prepare( "SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'", )?; let mut rows = stmt.query([])?; @@ -106,13 +106,15 @@ impl IDb for SqliteDb { // ---- fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> { - let tree = self.get_tree(tree)?; - trace!("get {}: lock db", tree); - let db = self.db.lock().unwrap(); + let this = self.0.lock().unwrap(); trace!("get {}: lock acquired", tree); - let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; + let tree = this.get_tree(tree)?; + + let mut stmt = this + .db + .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; let mut res_iter = stmt.query([key])?; match res_iter.next()? { None => Ok(None), @@ -121,24 +123,24 @@ impl IDb for SqliteDb { } fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> { - let tree = self.get_tree(tree)?; - trace!("remove {}: lock db", tree); - let db = self.db.lock().unwrap(); + let this = self.0.lock().unwrap(); trace!("remove {}: lock acquired", tree); - let res = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?; + let tree = this.get_tree(tree)?; + let res = this + .db + .execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?; Ok(res > 0) } fn len(&self, tree: usize) -> Result<usize> { - let tree = self.get_tree(tree)?; - trace!("len {}: lock db", tree); - let db = self.db.lock().unwrap(); + let this = self.0.lock().unwrap(); trace!("len {}: lock acquired", tree); - let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?; + let tree = this.get_tree(tree)?; + let mut stmt = this.db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?; let mut res_iter = stmt.query([])?; match res_iter.next()? { None => Ok(0), @@ -147,13 +149,12 @@ impl IDb for SqliteDb { } fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { - let tree = self.get_tree(tree)?; - trace!("insert {}: lock db", tree); - let db = self.db.lock().unwrap(); + let this = self.0.lock().unwrap(); trace!("insert {}: lock acquired", tree); - db.execute( + let tree = this.get_tree(tree)?; + this.db.execute( &format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree), params![key, value], )?; @@ -161,25 +162,23 @@ impl IDb for SqliteDb { } fn iter(&self, tree: usize) -> Result<ValueIter<'_>> { - let tree = self.get_tree(tree)?; - let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree); - trace!("iter {}: lock db", tree); - let db = self.db.lock().unwrap(); + let this = self.0.lock().unwrap(); trace!("iter {}: lock acquired", tree); - DbValueIterator::make(db, &sql, []) + let tree = this.get_tree(tree)?; + let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree); + DbValueIterator::make(this, &sql, []) } fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> { - let tree = self.get_tree(tree)?; - let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree); - trace!("iter_rev {}: lock db", tree); - let db = self.db.lock().unwrap(); + let this = self.0.lock().unwrap(); trace!("iter_rev {}: lock acquired", tree); - DbValueIterator::make(db, &sql, []) + let tree = this.get_tree(tree)?; + let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree); + DbValueIterator::make(this, &sql, []) } fn range<'r>( @@ -188,7 +187,11 @@ impl IDb for SqliteDb { low: Bound<&'r [u8]>, high: Bound<&'r [u8]>, ) -> Result<ValueIter<'_>> { - let tree = self.get_tree(tree)?; + trace!("range {}: lock db", tree); + let this = self.0.lock().unwrap(); + trace!("range {}: lock acquired", tree); + + let tree = this.get_tree(tree)?; let (bounds_sql, params) = bounds_sql(low, high); let sql = format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql); @@ -198,11 +201,7 @@ impl IDb for SqliteDb { .map(|x| x as &dyn rusqlite::ToSql) .collect::<Vec<_>>(); - trace!("range {}: lock db", tree); - let db = self.db.lock().unwrap(); - trace!("range {}: lock acquired", tree); - - DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(db, &sql, params.as_ref()) + DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref()) } fn range_rev<'r>( &self, @@ -210,7 +209,11 @@ impl IDb for SqliteDb { low: Bound<&'r [u8]>, high: Bound<&'r [u8]>, ) -> Result<ValueIter<'_>> { - let tree = self.get_tree(tree)?; + trace!("range_rev {}: lock db", tree); + let this = self.0.lock().unwrap(); + trace!("range_rev {}: lock acquired", tree); + + let tree = this.get_tree(tree)?; let (bounds_sql, params) = bounds_sql(low, high); let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql); @@ -220,25 +223,21 @@ impl IDb for SqliteDb { .map(|x| x as &dyn rusqlite::ToSql) .collect::<Vec<_>>(); - trace!("range_rev {}: lock db", tree); - let db = self.db.lock().unwrap(); - trace!("range_rev {}: lock acquired", tree); - - DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(db, &sql, params.as_ref()) + DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref()) } // ---- fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { - let trees = self.trees.read().unwrap(); - trace!("transaction: lock db"); - let mut db = self.db.lock().unwrap(); + let mut this = self.0.lock().unwrap(); trace!("transaction: lock acquired"); + let this_mut_ref: &mut SqliteDbInner = this.borrow_mut(); + let mut tx = SqliteTx { - tx: db.transaction()?, - trees: trees.as_ref(), + tx: this_mut_ref.db.transaction()?, + trees: &this_mut_ref.trees, }; let res = match f.try_on(&mut tx) { TxFnResult::Ok => { @@ -345,7 +344,7 @@ impl<'a> ITx for SqliteTx<'a> { // ---- struct DbValueIterator<'a> { - db: MutexGuard<'a, Connection>, + db: MutexGuard<'a, SqliteDbInner>, stmt: Option<Statement<'a>>, iter: Option<Rows<'a>>, _pin: PhantomPinned, @@ -353,7 +352,7 @@ struct DbValueIterator<'a> { impl<'a> DbValueIterator<'a> { fn make<P: rusqlite::Params>( - db: MutexGuard<'a, Connection>, + db: MutexGuard<'a, SqliteDbInner>, sql: &str, args: P, ) -> Result<ValueIter<'a>> { @@ -368,7 +367,7 @@ impl<'a> DbValueIterator<'a> { unsafe { let db = NonNull::from(&boxed.db); - let stmt = db.as_ref().prepare(sql)?; + let stmt = db.as_ref().db.prepare(sql)?; let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed); Pin::get_unchecked_mut(mut_ref).stmt = Some(stmt); diff --git a/src/table/gc.rs b/src/table/gc.rs index e2611389..2a23f4cc 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -101,18 +101,16 @@ where async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> { let now = now_msec(); - let mut entries = vec![]; - let mut excluded = vec![]; - // List entries in the GC todo list // These entries are put there when a tombstone is inserted in the table // (see update_entry in data.rs) + let mut candidates = vec![]; for entry_kv in self.data.gc_todo.iter()? { let (k, vhash) = entry_kv?; - let mut todo_entry = GcTodoEntry::parse(&k, &vhash); + let todo_entry = GcTodoEntry::parse(&k, &vhash); if todo_entry.deletion_time() > now { - if entries.is_empty() && excluded.is_empty() { + if candidates.is_empty() { // If the earliest entry in the todo list shouldn't yet be processed, // return a duration to wait in the loop return Ok(Some(Duration::from_millis( @@ -124,15 +122,23 @@ where } } - let vhash = Hash::try_from(&vhash[..]).unwrap(); + candidates.push(todo_entry); + if candidates.len() >= 2 * TABLE_GC_BATCH_SIZE { + break; + } + } + let mut entries = vec![]; + let mut excluded = vec![]; + for mut todo_entry in candidates { // Check if the tombstone is still the current value of the entry. // If not, we don't actually want to GC it, and we will remove it // from the gc_todo table later (below). + let vhash = todo_entry.value_hash; todo_entry.value = self .data .store - .get(&k[..])? + .get(&todo_entry.key[..])? .filter(|v| blake2sum(&v[..]) == vhash) .map(|v| v.to_vec()); |