aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-02 23:14:10 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-02 23:14:10 +0200
commitc439cb11a908ee5405ed0a3a721e9c8c0e299ad2 (patch)
tree65835f8c9b47d0670d1461fc43f358fc73850414
parent364061453cd0a5a2e4f76f4194e6b5887aae7ed8 (diff)
downloadgarage-c439cb11a908ee5405ed0a3a721e9c8c0e299ad2.tar.gz
garage-c439cb11a908ee5405ed0a3a721e9c8c0e299ad2.zip
Sqlite iter with unsafe code
-rw-r--r--Cargo.lock43
-rw-r--r--src/block/manager.rs14
-rw-r--r--src/db/Cargo.toml1
-rw-r--r--src/db/lib.rs6
-rw-r--r--src/db/sqlite_adapter.rs94
-rw-r--r--src/garage/repair.rs57
-rw-r--r--src/model/migrate.rs4
-rw-r--r--src/table/merkle.rs46
8 files changed, 162 insertions, 103 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 7b8c84e8..d5309a79 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3,12 +3,6 @@
version = 3
[[package]]
-name = "Inflector"
-version = "0.11.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3"
-
-[[package]]
name = "ahash"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -29,12 +23,6 @@ dependencies = [
]
[[package]]
-name = "aliasable"
-version = "0.1.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd"
-
-[[package]]
name = "anyhow"
version = "1.0.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1028,7 +1016,6 @@ version = "0.8.0"
dependencies = [
"err-derive 0.3.1",
"mktemp",
- "ouroboros",
"rusqlite",
"sled",
]
@@ -2212,30 +2199,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "029d8d0b2f198229de29dca79676f2738ff952edf3fde542eb8bf94d8c21b435"
[[package]]
-name = "ouroboros"
-version = "0.15.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f31a3b678685b150cba82b702dcdc5e155893f63610cf388d30cd988d4ca2bf"
-dependencies = [
- "aliasable",
- "ouroboros_macro",
- "stable_deref_trait",
-]
-
-[[package]]
-name = "ouroboros_macro"
-version = "0.15.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "084fd65d5dd8b3772edccb5ffd1e4b7eba43897ecd0f9401e330e8c542959408"
-dependencies = [
- "Inflector",
- "proc-macro-error",
- "proc-macro2",
- "quote",
- "syn",
-]
-
-[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3105,12 +3068,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5"
[[package]]
-name = "stable_deref_trait"
-version = "1.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
-
-[[package]]
name = "static_init"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 50039d2b..f34d13d0 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -547,9 +547,7 @@ impl BlockManager {
// - Ok(false) -> no block was processed, but we are ready for the next iteration
// - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> {
- if let Some(first_pair_res) = self.resync_queue.iter()?.next() {
- let (time_bytes, hash_bytes) = first_pair_res?;
-
+ if let Some((time_bytes, hash_bytes)) = self.resync_get_next()? {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
@@ -642,6 +640,16 @@ impl BlockManager {
}
}
+ fn resync_get_next(&self) -> Result<Option<(Vec<u8>, Vec<u8>)>, db::Error> {
+ match self.resync_queue.iter()?.next() {
+ None => Ok(None),
+ Some(v) => {
+ let (time_bytes, hash_bytes) = v?;
+ Ok(Some((time_bytes.into_owned(), hash_bytes.into_owned())))
+ }
+ }
+ }
+
async fn resync_block(&self, hash: &Hash) -> Result<(), Error> {
let BlockStatus { exists, needed } = self
.mutation_lock
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index 34d483ec..f627208b 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -15,7 +15,6 @@ path = "lib.rs"
[dependencies]
err-derive = "0.3"
-ouroboros = "0.15"
sled = "0.34"
rusqlite = "0.27"
diff --git a/src/db/lib.rs b/src/db/lib.rs
index ce1b1c8b..3a2e1d13 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -22,11 +22,9 @@ pub struct Transaction<'a>(pub(crate) &'a dyn ITx<'a>);
pub struct Tree(pub(crate) Arc<dyn IDb>, pub(crate) usize);
pub type Value<'a> = Cow<'a, [u8]>;
-pub type ValueIter<'a> =
- Box<dyn std::iter::Iterator<Item = Result<(Value<'a>, Value<'a>)>> + Send + 'a>;
+pub type ValueIter<'a> = Box<dyn std::iter::Iterator<Item = Result<(Value<'a>, Value<'a>)>> + 'a>;
-pub type Exporter<'a> =
- Box<dyn std::iter::Iterator<Item = Result<(String, ValueIter<'a>)>> + 'a>;
+pub type Exporter<'a> = Box<dyn std::iter::Iterator<Item = Result<(String, ValueIter<'a>)>> + 'a>;
// ----
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 320684df..bed72e6b 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -1,11 +1,12 @@
use core::ops::Bound;
use std::cell::Cell;
-use std::sync::{Arc, Mutex, RwLock, MutexGuard};
+use std::marker::PhantomPinned;
+use std::pin::Pin;
+use std::ptr::NonNull;
+use std::sync::{Arc, Mutex, MutexGuard, RwLock};
-use ouroboros::self_referencing;
-
-use rusqlite::{params, Connection, Transaction};
+use rusqlite::{params, Connection, Rows, Statement, Transaction};
use crate::{
Db, Error, Exporter, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter,
@@ -114,16 +115,14 @@ impl IDb for SqliteDb {
fn iter<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> {
let tree = self.get_tree(tree)?;
- let db = self.db.lock().unwrap();
let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
- let mut stmt = db.prepare(&sql)?;
- let res = stmt.query([])?;
- unimplemented!();
+ DbValueIterator::new(self.db.lock().unwrap(), &sql, [])
}
fn iter_rev<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> {
let tree = self.get_tree(tree)?;
- unimplemented!();
+ let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
+ DbValueIterator::new(self.db.lock().unwrap(), &sql, [])
}
fn range<'a, 'r>(
@@ -263,3 +262,80 @@ impl<'a> ITx<'a> for SqliteTx<'a> {
}
}
+// ----
+
+struct DbValueIterator<'a> {
+ db: Option<MutexGuard<'a, Connection>>,
+ stmt: Option<Statement<'a>>,
+ iter: Option<Rows<'a>>,
+ _pin: PhantomPinned,
+}
+
+impl<'a> DbValueIterator<'a> {
+ fn new<P: rusqlite::Params>(
+ db: MutexGuard<'a, Connection>,
+ sql: &str,
+ args: P,
+ ) -> Result<ValueIter<'a>> {
+ let res = DbValueIterator {
+ db: Some(db),
+ stmt: None,
+ iter: None,
+ _pin: PhantomPinned,
+ };
+ let mut boxed = Box::pin(res);
+
+ unsafe {
+ let db = NonNull::from(&boxed.db);
+ let stmt = db.as_ref().as_ref().unwrap().prepare(&sql)?;
+
+ let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed);
+ Pin::get_unchecked_mut(mut_ref).stmt = Some(stmt);
+
+ let mut stmt = NonNull::from(&boxed.stmt);
+ let iter = stmt.as_mut().as_mut().unwrap().query(args)?;
+
+ let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed);
+ Pin::get_unchecked_mut(mut_ref).iter = Some(iter);
+ }
+
+ Ok(Box::new(DbValueIteratorPin(boxed)))
+ }
+}
+
+struct DbValueIteratorPin<'a>(Pin<Box<DbValueIterator<'a>>>);
+
+impl<'a> Iterator for DbValueIteratorPin<'a> {
+ type Item = Result<(Value<'a>, Value<'a>)>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ let next = unsafe {
+ let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut self.0);
+ Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next()
+ };
+ let row = match next {
+ Err(e) => return Some(Err(e.into())),
+ Ok(None) => {
+ // finished, drop everything
+ unsafe {
+ let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut self.0);
+ let t = Pin::get_unchecked_mut(mut_ref);
+ drop(t.iter.take());
+ drop(t.stmt.take());
+ drop(t.db.take());
+ }
+ return None;
+ }
+ Ok(Some(r)) => r,
+ };
+ let k = match row.get::<_, Vec<u8>>(0) {
+ Err(e) => return Some(Err(e.into())),
+ Ok(x) => x,
+ };
+ let v = match row.get::<_, Vec<u8>>(1) {
+ Err(e) => return Some(Err(e.into())),
+ Ok(y) => y,
+ };
+ Some(Ok((k.into(), v.into())))
+ }
+}
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index 04d9ee72..762a8300 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -66,18 +66,10 @@ impl Repair {
async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
- while let Some(item) = self
- .garage
- .version_table
- .data
- .store
- .range((Bound::Excluded(pos), Bound::Unbounded))?
- .next()
- {
- let (item_key, item_bytes) = item?;
- pos = item_key.to_vec();
+ while let Some((item_key, item_bytes)) = self.get_next_version_after(&pos)? {
+ pos = item_key;
- let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
+ let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
if version.deleted.get() {
continue;
}
@@ -113,22 +105,30 @@ impl Repair {
Ok(())
}
- async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
- let mut pos = vec![];
-
- while let Some(item) = self
+ fn get_next_version_after(&self, pos: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>, Error> {
+ match self
.garage
- .block_ref_table
+ .version_table
.data
.store
- .range((Bound::Excluded(pos), Bound::Unbounded))?
+ .range::<&[u8], _>((Bound::Excluded(pos), Bound::Unbounded))?
.next()
{
- let (item_key, item_bytes) = item?;
+ None => Ok(None),
+ Some(item) => {
+ let (item_key, item_bytes) = item?;
+ Ok(Some((item_key.into_owned(), item_bytes.into_owned())))
+ }
+ }
+ }
- pos = item_key.to_vec();
+ async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
+ let mut pos = vec![];
- let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
+ while let Some((item_key, item_bytes)) = self.get_next_block_ref_after(&pos)? {
+ pos = item_key;
+
+ let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
if block_ref.deleted.get() {
continue;
}
@@ -160,4 +160,21 @@ impl Repair {
}
Ok(())
}
+
+ fn get_next_block_ref_after(&self, pos: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>, Error> {
+ match self
+ .garage
+ .block_ref_table
+ .data
+ .store
+ .range::<&[u8], _>((Bound::Excluded(pos), Bound::Unbounded))?
+ .next()
+ {
+ None => Ok(None),
+ Some(item) => {
+ let (item_key, item_bytes) = item?;
+ Ok(Some((item_key.into_owned(), item_bytes.into_owned())))
+ }
+ }
+ }
}
diff --git a/src/model/migrate.rs b/src/model/migrate.rs
index 1f063265..25acb4b0 100644
--- a/src/model/migrate.rs
+++ b/src/model/migrate.rs
@@ -25,11 +25,15 @@ impl Migrate {
.open_tree("bucket:table")
.map_err(GarageError::from)?;
+ let mut old_buckets = vec![];
for res in tree.iter().map_err(GarageError::from)? {
let (_k, v) = res.map_err(GarageError::from)?;
let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..])
.map_err(GarageError::from)?;
+ old_buckets.push(bucket);
+ }
+ for bucket in old_buckets {
if let old_bucket::BucketState::Present(p) = bucket.state.get() {
self.migrate_buckets050_do_bucket(&bucket, p).await?;
}
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 4b0b44ce..6e0c2f7e 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -89,36 +89,36 @@ where
async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
- if let Some(x) = self.data.merkle_todo.iter().unwrap().next() {
- // TODO unwrap to remove
- match x {
- Ok((key, valhash)) => {
- if let Err(e) = self.update_item(&key[..], &valhash[..]) {
- warn!(
- "({}) Error while updating Merkle tree item: {}",
- F::TABLE_NAME,
- e
- );
- }
- }
- Err(e) => {
- warn!(
- "({}) Error while iterating on Merkle todo tree: {}",
- F::TABLE_NAME,
- e
- );
- tokio::time::sleep(Duration::from_secs(10)).await;
+ match self.updater_loop_iter() {
+ Ok(true) => (),
+ Ok(false) => {
+ select! {
+ _ = self.data.merkle_todo_notify.notified().fuse() => {},
+ _ = must_exit.changed().fuse() => {},
}
}
- } else {
- select! {
- _ = self.data.merkle_todo_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
+ Err(e) => {
+ warn!(
+ "({}) Error while updating Merkle tree item: {}",
+ F::TABLE_NAME,
+ e
+ );
+ tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
}
+ fn updater_loop_iter(&self) -> Result<bool, Error> {
+ if let Some(x) = self.data.merkle_todo.iter()?.next() {
+ let (key, valhash) = x?;
+ self.update_item(&key[..], &valhash[..])?;
+ Ok(true)
+ } else {
+ Ok(false)
+ }
+ }
+
fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> {
let khash = blake2sum(k);