about summary refs log tree commit diff
diff options
context:
space:
mode:
author Alex Auvolat <alex@adnab.me> 2024-03-08 16:38:01 +0100
committer Alex Auvolat <alex@adnab.me> 2024-03-08 16:38:01 +0100
commit b942949940b5a0dec8e8640c44a2705a4482a2e4 (patch)
tree 5d59eb2a6fd9d62f075e8bf094ba3e78a0dbbaf7
parent 66c23890c1a6e73fd6c5246642e087cd2866451e (diff)
download garage-b942949940b5a0dec8e8640c44a2705a4482a2e4.tar.gz
garage-b942949940b5a0dec8e8640c44a2705a4482a2e4.zip
[rm-sled] Implement iterators in sqlite & lmdb transactions
with way too much unsafe code
-rw-r--r-- src/db/lib.rs 1
-rw-r--r-- src/db/lmdb_adapter.rs 48
-rw-r--r-- src/db/sqlite_adapter.rs 125
-rw-r--r-- src/db/test.rs 49
4 files changed, 195 insertions, 28 deletions
diff --git a/src/db/lib.rs b/src/db/lib.rs
index 5881954d..ff511b5f 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -51,6 +51,7 @@ pub type Result<T> = std::result::Result<T, Error>;
pub struct TxOpError(pub(crate) Error);
pub type TxOpResult<T> = std::result::Result<T, TxOpError>;
+#[derive(Debug)]
pub enum TxError<E> {
Abort(E),
Db(Error),
diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs
index 01c360b4..ddfb6ed5 100644
--- a/src/db/lmdb_adapter.rs
+++ b/src/db/lmdb_adapter.rs
@@ -261,32 +261,42 @@ impl<'a> ITx for LmdbTx<'a> {
Ok(())
}
- fn iter(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
- unimplemented!("Iterators in transactions not supported with LMDB backend");
+ fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ let tree = *self.get_tree(tree)?;
+ Ok(Box::new(tree.iter(&self.tx)?.map(tx_iter_item)))
}
- fn iter_rev(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
- unimplemented!("Iterators in transactions not supported with LMDB backend");
+ fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ let tree = *self.get_tree(tree)?;
+ Ok(Box::new(tree.rev_iter(&self.tx)?.map(tx_iter_item)))
}
fn range<'r>(
&self,
- _tree: usize,
- _low: Bound<&'r [u8]>,
- _high: Bound<&'r [u8]>,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> {
- unimplemented!("Iterators in transactions not supported with LMDB backend");
+ let tree = *self.get_tree(tree)?;
+ Ok(Box::new(
+ tree.range(&self.tx, &(low, high))?.map(tx_iter_item),
+ ))
}
fn range_rev<'r>(
&self,
- _tree: usize,
- _low: Bound<&'r [u8]>,
- _high: Bound<&'r [u8]>,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> {
- unimplemented!("Iterators in transactions not supported with LMDB backend");
+ let tree = *self.get_tree(tree)?;
+ Ok(Box::new(
+ tree.rev_range(&self.tx, &(low, high))?.map(tx_iter_item),
+ ))
}
}
-// ----
+// ---- iterators outside transactions ----
+// complicated, they must hold the transaction object
+// therefore a bit of unsafe code (it is a self-referential struct)
type IteratorItem<'a> = heed::Result<(
<ByteSlice as BytesDecode<'a>>::DItem,
@@ -323,6 +333,7 @@ where
I: Iterator<Item = IteratorItem<'a>> + 'a,
{
fn drop(&mut self) {
+ // ensure the iterator is dropped before the RoTxn it references
drop(self.iter.take());
}
}
@@ -342,7 +353,16 @@ where
}
}
-// ----
+// ---- iterators within transactions ----
+
+fn tx_iter_item<'a>(
+ item: std::result::Result<(&'a [u8], &'a [u8]), heed::Error>,
+) -> TxOpResult<(Vec<u8>, Vec<u8>)> {
+ item.map(|(k, v)| (k.to_vec(), v.to_vec()))
+ .map_err(|e| TxOpError(Error::from(e)))
+}
+
+// ---- utility ----
#[cfg(target_pointer_width = "64")]
pub fn recommended_map_size() -> usize {
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index d1394355..077c1f1b 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -369,32 +369,58 @@ impl<'a> ITx for SqliteTx<'a> {
Ok(())
}
- fn iter(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
- unimplemented!();
+ fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
+ TxValueIterator::make(self, &sql, [])
}
- fn iter_rev(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> {
- unimplemented!();
+ fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
+ let tree = self.get_tree(tree)?;
+ let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
+ TxValueIterator::make(self, &sql, [])
}
fn range<'r>(
&self,
- _tree: usize,
- _low: Bound<&'r [u8]>,
- _high: Bound<&'r [u8]>,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> {
- unimplemented!();
+ let tree = self.get_tree(tree)?;
+
+ let (bounds_sql, params) = bounds_sql(low, high);
+ let sql = format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql);
+
+ let params = params
+ .iter()
+ .map(|x| x as &dyn rusqlite::ToSql)
+ .collect::<Vec<_>>();
+
+ TxValueIterator::make::<&[&dyn rusqlite::ToSql]>(self, &sql, params.as_ref())
}
fn range_rev<'r>(
&self,
- _tree: usize,
- _low: Bound<&'r [u8]>,
- _high: Bound<&'r [u8]>,
+ tree: usize,
+ low: Bound<&'r [u8]>,
+ high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> {
- unimplemented!();
+ let tree = self.get_tree(tree)?;
+
+ let (bounds_sql, params) = bounds_sql(low, high);
+ let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql);
+
+ let params = params
+ .iter()
+ .map(|x| x as &dyn rusqlite::ToSql)
+ .collect::<Vec<_>>();
+
+ TxValueIterator::make::<&[&dyn rusqlite::ToSql]>(self, &sql, params.as_ref())
}
}
-// ----
+// ---- iterators outside transactions ----
+// complicated, they must hold the Statement and Row objects
+// therefore quite some unsafe code (it is a self-referential struct)
struct DbValueIterator<'a> {
db: MutexGuard<'a, SqliteDbInner>,
@@ -471,7 +497,78 @@ impl<'a> Iterator for DbValueIteratorPin<'a> {
}
}
-// ----
+// ---- iterators within transactions ----
+// it's the same except we don't hold a mutex guard,
+// only a Statement and a Rows object
+
+struct TxValueIterator<'a> {
+ stmt: Statement<'a>,
+ iter: Option<Rows<'a>>,
+ _pin: PhantomPinned,
+}
+
+impl<'a> TxValueIterator<'a> {
+ fn make<P: rusqlite::Params>(
+ tx: &'a SqliteTx<'a>,
+ sql: &str,
+ args: P,
+ ) -> TxOpResult<TxValueIter<'a>> {
+ let stmt = tx.tx.prepare(sql)?;
+ let res = TxValueIterator {
+ stmt,
+ iter: None,
+ _pin: PhantomPinned,
+ };
+ let mut boxed = Box::pin(res);
+ trace!("make iterator with sql: {}", sql);
+
+ unsafe {
+ let mut stmt = NonNull::from(&boxed.stmt);
+ let iter = stmt.as_mut().query(args)?;
+
+ let mut_ref: Pin<&mut TxValueIterator<'a>> = Pin::as_mut(&mut boxed);
+ Pin::get_unchecked_mut(mut_ref).iter = Some(iter);
+ }
+
+ Ok(Box::new(TxValueIteratorPin(boxed)))
+ }
+}
+
+impl<'a> Drop for TxValueIterator<'a> {
+ fn drop(&mut self) {
+ trace!("drop iter");
+ drop(self.iter.take());
+ }
+}
+
+struct TxValueIteratorPin<'a>(Pin<Box<TxValueIterator<'a>>>);
+
+impl<'a> Iterator for TxValueIteratorPin<'a> {
+ type Item = TxOpResult<(Value, Value)>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ let next = unsafe {
+ let mut_ref: Pin<&mut TxValueIterator<'a>> = Pin::as_mut(&mut self.0);
+ Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next()
+ };
+ let row = match next {
+ Err(e) => return Some(Err(e.into())),
+ Ok(None) => return None,
+ Ok(Some(r)) => r,
+ };
+ let k = match row.get::<_, Vec<u8>>(0) {
+ Err(e) => return Some(Err(e.into())),
+ Ok(x) => x,
+ };
+ let v = match row.get::<_, Vec<u8>>(1) {
+ Err(e) => return Some(Err(e.into())),
+ Ok(y) => y,
+ };
+ Some(Ok((k, v)))
+ }
+}
+
+// ---- utility ----
fn bounds_sql<'r>(low: Bound<&'r [u8]>, high: Bound<&'r [u8]>) -> (String, Vec<Vec<u8>>) {
let mut sql = String::new();
diff --git a/src/db/test.rs b/src/db/test.rs
index d4c875f0..3add89fb 100644
--- a/src/db/test.rs
+++ b/src/db/test.rs
@@ -10,8 +10,13 @@ fn test_suite(db: Db) {
let vb: &[u8] = &b"plip"[..];
let vc: &[u8] = &b"plup"[..];
+ // ---- test simple insert/delete ----
+
assert!(tree.insert(ka, va).unwrap().is_none());
assert_eq!(tree.get(ka).unwrap().unwrap(), va);
+ assert_eq!(tree.len().unwrap(), 1);
+
+ // ---- test transaction logic ----
let res = db.transaction::<_, (), _>(|tx| {
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va);
@@ -37,6 +42,8 @@ fn test_suite(db: Db) {
assert!(matches!(res, Err(TxError::Abort(42))));
assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
+ // ---- test iteration outside of transactions ----
+
let mut iter = tree.iter().unwrap();
let next = iter.next().unwrap().unwrap();
assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
@@ -73,6 +80,48 @@ fn test_suite(db: Db) {
assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
assert!(iter.next().is_none());
drop(iter);
+
+ // ---- test iteration within transactions ----
+
+ db.transaction::<_, (), _>(|tx| {
+ let mut iter = tx.iter(&tree).unwrap();
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc));
+ assert!(iter.next().is_none());
+ Ok(())
+ })
+ .unwrap();
+
+ db.transaction::<_, (), _>(|tx| {
+ let mut iter = tx.range(&tree, kint..).unwrap();
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc));
+ assert!(iter.next().is_none());
+ Ok(())
+ })
+ .unwrap();
+
+ db.transaction::<_, (), _>(|tx| {
+ let mut iter = tx.range_rev(&tree, ..kint).unwrap();
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
+ assert!(iter.next().is_none());
+ Ok(())
+ })
+ .unwrap();
+
+ db.transaction::<_, (), _>(|tx| {
+ let mut iter = tx.iter_rev(&tree).unwrap();
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc));
+ let next = iter.next().unwrap().unwrap();
+ assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
+ assert!(iter.next().is_none());
+ Ok(())
+ })
+ .unwrap();
}
#[test]