aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/db/sqlite_adapter.rs133
-rw-r--r--src/table/gc.rs20
2 files changed, 79 insertions, 74 deletions
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 10e3bfee..e945c31c 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -1,9 +1,10 @@
use core::ops::Bound;
+use std::borrow::BorrowMut;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
-use std::sync::{Arc, Mutex, MutexGuard, RwLock};
+use std::sync::{Arc, Mutex, MutexGuard};
use log::trace;
@@ -29,24 +30,26 @@ impl<T> From<rusqlite::Error> for TxError<T> {
// -- db
-pub struct SqliteDb {
- db: Mutex<Connection>,
- trees: RwLock<Vec<String>>,
+pub struct SqliteDb(Mutex<SqliteDbInner>);
+
+struct SqliteDbInner {
+ db: Connection,
+ trees: Vec<String>,
}
impl SqliteDb {
pub fn init(db: rusqlite::Connection) -> Db {
- let s = Self {
- db: Mutex::new(db),
- trees: RwLock::new(Vec::new()),
- };
+ let s = Self(Mutex::new(SqliteDbInner {
+ db,
+ trees: Vec::new(),
+ }));
Db(Arc::new(s))
}
+}
+impl SqliteDbInner {
fn get_tree(&self, i: usize) -> Result<String> {
self.trees
- .read()
- .unwrap()
.get(i)
.cloned()
.ok_or_else(|| Error("invalid tree id".into()))
@@ -56,16 +59,13 @@ impl SqliteDb {
impl IDb for SqliteDb {
fn open_tree(&self, name: &str) -> Result<usize> {
let name = format!("tree_{}", name.replace(':', "_COLON_"));
+ let mut this = self.0.lock().unwrap();
- let mut trees = self.trees.write().unwrap();
- if let Some(i) = trees.iter().position(|x| x == &name) {
+ if let Some(i) = this.trees.iter().position(|x| x == &name) {
Ok(i)
} else {
- trace!("open tree {}: lock db", name);
- let db = self.db.lock().unwrap();
trace!("create table {}", name);
-
- db.execute(
+ this.db.execute(
&format!(
"CREATE TABLE IF NOT EXISTS {} (
k BLOB PRIMARY KEY,
@@ -75,10 +75,10 @@ impl IDb for SqliteDb {
),
[],
)?;
- trace!("table created: {}", name);
+ trace!("table created: {}, unlocking", name);
- let i = trees.len();
- trees.push(name.to_string());
+ let i = this.trees.len();
+ this.trees.push(name.to_string());
Ok(i)
}
}
@@ -87,10 +87,10 @@ impl IDb for SqliteDb {
let mut trees = vec![];
trace!("list_trees: lock db");
- let db = self.db.lock().unwrap();
+ let this = self.0.lock().unwrap();
trace!("list_trees: lock acquired");
- let mut stmt = db.prepare(
+ let mut stmt = this.db.prepare(
"SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'",
)?;
let mut rows = stmt.query([])?;
@@ -106,13 +106,15 @@ impl IDb for SqliteDb {
// ----
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
- let tree = self.get_tree(tree)?;
-
trace!("get {}: lock db", tree);
- let db = self.db.lock().unwrap();
+ let this = self.0.lock().unwrap();
trace!("get {}: lock acquired", tree);
- let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
+ let tree = this.get_tree(tree)?;
+
+ let mut stmt = this
+ .db
+ .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
let mut res_iter = stmt.query([key])?;
match res_iter.next()? {
None => Ok(None),
@@ -121,24 +123,24 @@ impl IDb for SqliteDb {
}
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
- let tree = self.get_tree(tree)?;
-
trace!("remove {}: lock db", tree);
- let db = self.db.lock().unwrap();
+ let this = self.0.lock().unwrap();
trace!("remove {}: lock acquired", tree);
- let res = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
+ let tree = this.get_tree(tree)?;
+ let res = this
+ .db
+ .execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
Ok(res > 0)
}
fn len(&self, tree: usize) -> Result<usize> {
- let tree = self.get_tree(tree)?;
-
trace!("len {}: lock db", tree);
- let db = self.db.lock().unwrap();
+ let this = self.0.lock().unwrap();
trace!("len {}: lock acquired", tree);
- let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
+ let tree = this.get_tree(tree)?;
+ let mut stmt = this.db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
let mut res_iter = stmt.query([])?;
match res_iter.next()? {
None => Ok(0),
@@ -147,13 +149,12 @@ impl IDb for SqliteDb {
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
- let tree = self.get_tree(tree)?;
-
trace!("insert {}: lock db", tree);
- let db = self.db.lock().unwrap();
+ let this = self.0.lock().unwrap();
trace!("insert {}: lock acquired", tree);
- db.execute(
+ let tree = this.get_tree(tree)?;
+ this.db.execute(
&format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree),
params![key, value],
)?;
@@ -161,25 +162,23 @@ impl IDb for SqliteDb {
}
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
- let tree = self.get_tree(tree)?;
- let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
-
trace!("iter {}: lock db", tree);
- let db = self.db.lock().unwrap();
+ let this = self.0.lock().unwrap();
trace!("iter {}: lock acquired", tree);
- DbValueIterator::make(db, &sql, [])
+ let tree = this.get_tree(tree)?;
+ let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
+ DbValueIterator::make(this, &sql, [])
}
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
- let tree = self.get_tree(tree)?;
- let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
-
trace!("iter_rev {}: lock db", tree);
- let db = self.db.lock().unwrap();
+ let this = self.0.lock().unwrap();
trace!("iter_rev {}: lock acquired", tree);
- DbValueIterator::make(db, &sql, [])
+ let tree = this.get_tree(tree)?;
+ let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
+ DbValueIterator::make(this, &sql, [])
}
fn range<'r>(
@@ -188,7 +187,11 @@ impl IDb for SqliteDb {
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
- let tree = self.get_tree(tree)?;
+ trace!("range {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("range {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
let (bounds_sql, params) = bounds_sql(low, high);
let sql = format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql);
@@ -198,11 +201,7 @@ impl IDb for SqliteDb {
.map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>();
- trace!("range {}: lock db", tree);
- let db = self.db.lock().unwrap();
- trace!("range {}: lock acquired", tree);
-
- DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(db, &sql, params.as_ref())
+ DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
}
fn range_rev<'r>(
&self,
@@ -210,7 +209,11 @@ impl IDb for SqliteDb {
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
- let tree = self.get_tree(tree)?;
+ trace!("range_rev {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("range_rev {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
let (bounds_sql, params) = bounds_sql(low, high);
let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql);
@@ -220,25 +223,21 @@ impl IDb for SqliteDb {
.map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>();
- trace!("range_rev {}: lock db", tree);
- let db = self.db.lock().unwrap();
- trace!("range_rev {}: lock acquired", tree);
-
- DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(db, &sql, params.as_ref())
+ DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
}
// ----
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
- let trees = self.trees.read().unwrap();
-
trace!("transaction: lock db");
- let mut db = self.db.lock().unwrap();
+ let mut this = self.0.lock().unwrap();
trace!("transaction: lock acquired");
+ let this_mut_ref: &mut SqliteDbInner = this.borrow_mut();
+
let mut tx = SqliteTx {
- tx: db.transaction()?,
- trees: trees.as_ref(),
+ tx: this_mut_ref.db.transaction()?,
+ trees: &this_mut_ref.trees,
};
let res = match f.try_on(&mut tx) {
TxFnResult::Ok => {
@@ -345,7 +344,7 @@ impl<'a> ITx for SqliteTx<'a> {
// ----
struct DbValueIterator<'a> {
- db: MutexGuard<'a, Connection>,
+ db: MutexGuard<'a, SqliteDbInner>,
stmt: Option<Statement<'a>>,
iter: Option<Rows<'a>>,
_pin: PhantomPinned,
@@ -353,7 +352,7 @@ struct DbValueIterator<'a> {
impl<'a> DbValueIterator<'a> {
fn make<P: rusqlite::Params>(
- db: MutexGuard<'a, Connection>,
+ db: MutexGuard<'a, SqliteDbInner>,
sql: &str,
args: P,
) -> Result<ValueIter<'a>> {
@@ -368,7 +367,7 @@ impl<'a> DbValueIterator<'a> {
unsafe {
let db = NonNull::from(&boxed.db);
- let stmt = db.as_ref().prepare(sql)?;
+ let stmt = db.as_ref().db.prepare(sql)?;
let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed);
Pin::get_unchecked_mut(mut_ref).stmt = Some(stmt);
diff --git a/src/table/gc.rs b/src/table/gc.rs
index e2611389..2a23f4cc 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -101,18 +101,16 @@ where
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
- let mut entries = vec![];
- let mut excluded = vec![];
-
// List entries in the GC todo list
// These entries are put there when a tombstone is inserted in the table
// (see update_entry in data.rs)
+ let mut candidates = vec![];
for entry_kv in self.data.gc_todo.iter()? {
let (k, vhash) = entry_kv?;
- let mut todo_entry = GcTodoEntry::parse(&k, &vhash);
+ let todo_entry = GcTodoEntry::parse(&k, &vhash);
if todo_entry.deletion_time() > now {
- if entries.is_empty() && excluded.is_empty() {
+ if candidates.is_empty() {
// If the earliest entry in the todo list shouldn't yet be processed,
// return a duration to wait in the loop
return Ok(Some(Duration::from_millis(
@@ -124,15 +122,23 @@ where
}
}
- let vhash = Hash::try_from(&vhash[..]).unwrap();
+ candidates.push(todo_entry);
+ if candidates.len() >= 2 * TABLE_GC_BATCH_SIZE {
+ break;
+ }
+ }
+ let mut entries = vec![];
+ let mut excluded = vec![];
+ for mut todo_entry in candidates {
// Check if the tombstone is still the current value of the entry.
// If not, we don't actually want to GC it, and we will remove it
// from the gc_todo table later (below).
+ let vhash = todo_entry.value_hash;
todo_entry.value = self
.data
.store
- .get(&k[..])?
+ .get(&todo_entry.key[..])?
.filter(|v| blake2sum(&v[..]) == vhash)
.map(|v| v.to_vec());