From c439cb11a908ee5405ed0a3a721e9c8c0e299ad2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 2 Jun 2022 23:14:10 +0200 Subject: Sqlite iter with unsafe code --- Cargo.lock | 43 ---------------------- src/block/manager.rs | 14 ++++++-- src/db/Cargo.toml | 1 - src/db/lib.rs | 6 ++-- src/db/sqlite_adapter.rs | 94 +++++++++++++++++++++++++++++++++++++++++++----- src/garage/repair.rs | 57 ++++++++++++++++++----------- src/model/migrate.rs | 4 +++ src/table/merkle.rs | 46 ++++++++++++------------ 8 files changed, 162 insertions(+), 103 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7b8c84e8..d5309a79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,12 +2,6 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "Inflector" -version = "0.11.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" - [[package]] name = "ahash" version = "0.7.6" @@ -28,12 +22,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "aliasable" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" - [[package]] name = "anyhow" version = "1.0.56" @@ -1028,7 +1016,6 @@ version = "0.8.0" dependencies = [ "err-derive 0.3.1", "mktemp", - "ouroboros", "rusqlite", "sled", ] @@ -2211,30 +2198,6 @@ version = "6.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "029d8d0b2f198229de29dca79676f2738ff952edf3fde542eb8bf94d8c21b435" -[[package]] -name = "ouroboros" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f31a3b678685b150cba82b702dcdc5e155893f63610cf388d30cd988d4ca2bf" -dependencies = [ - "aliasable", - "ouroboros_macro", - "stable_deref_trait", -] - -[[package]] -name = "ouroboros_macro" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "084fd65d5dd8b3772edccb5ffd1e4b7eba43897ecd0f9401e330e8c542959408" -dependencies = [ - "Inflector", - "proc-macro-error", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "parking_lot" version = "0.11.2" @@ -3104,12 +3067,6 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5" -[[package]] -name = "stable_deref_trait" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" - [[package]] name = "static_init" version = "1.0.2" diff --git a/src/block/manager.rs b/src/block/manager.rs index 50039d2b..f34d13d0 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -547,9 +547,7 @@ impl BlockManager { // - Ok(false) -> no block was processed, but we are ready for the next iteration // - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors async fn resync_iter(&self, must_exit: &mut watch::Receiver) -> Result { - if let Some(first_pair_res) = self.resync_queue.iter()?.next() { - let (time_bytes, hash_bytes) = first_pair_res?; - + if let Some((time_bytes, hash_bytes)) = self.resync_get_next()? { let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let now = now_msec(); @@ -642,6 +640,16 @@ impl BlockManager { } } + fn resync_get_next(&self) -> Result, Vec)>, db::Error> { + match self.resync_queue.iter()?.next() { + None => Ok(None), + Some(v) => { + let (time_bytes, hash_bytes) = v?; + Ok(Some((time_bytes.into_owned(), hash_bytes.into_owned()))) + } + } + } + async fn resync_block(&self, hash: &Hash) -> Result<(), Error> { let BlockStatus { exists, needed } = self .mutation_lock diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index 34d483ec..f627208b 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -15,7 +15,6 @@ path = "lib.rs" [dependencies] err-derive = "0.3" -ouroboros = "0.15" sled = "0.34" rusqlite = "0.27" diff --git a/src/db/lib.rs b/src/db/lib.rs index ce1b1c8b..3a2e1d13 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -22,11 +22,9 @@ pub struct Transaction<'a>(pub(crate) &'a dyn ITx<'a>); pub struct Tree(pub(crate) Arc, pub(crate) usize); pub type Value<'a> = Cow<'a, [u8]>; -pub type ValueIter<'a> = - Box, Value<'a>)>> + Send + 'a>; +pub type ValueIter<'a> = Box, Value<'a>)>> + 'a>; -pub type Exporter<'a> = - Box)>> + 'a>; +pub type Exporter<'a> = Box)>> + 'a>; // ---- diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 320684df..bed72e6b 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -1,11 +1,12 @@ use core::ops::Bound; use std::cell::Cell; -use std::sync::{Arc, Mutex, RwLock, MutexGuard}; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::ptr::NonNull; +use std::sync::{Arc, Mutex, MutexGuard, RwLock}; -use ouroboros::self_referencing; - -use rusqlite::{params, Connection, Transaction}; +use rusqlite::{params, Connection, Rows, Statement, Transaction}; use crate::{ Db, Error, Exporter, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter, @@ -114,16 +115,14 @@ impl IDb for SqliteDb { fn iter<'a>(&'a self, tree: usize) -> Result> { let tree = self.get_tree(tree)?; - let db = self.db.lock().unwrap(); let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree); - let mut stmt = db.prepare(&sql)?; - let res = stmt.query([])?; - unimplemented!(); + DbValueIterator::new(self.db.lock().unwrap(), &sql, []) } fn iter_rev<'a>(&'a self, tree: usize) -> Result> { let tree = self.get_tree(tree)?; - unimplemented!(); + let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree); + DbValueIterator::new(self.db.lock().unwrap(), &sql, []) } fn range<'a, 'r>( @@ -263,3 +262,80 @@ impl<'a> ITx<'a> for SqliteTx<'a> { } } +// ---- + +struct DbValueIterator<'a> { + db: Option>, + stmt: Option>, + iter: Option>, + _pin: PhantomPinned, +} + +impl<'a> DbValueIterator<'a> { + fn new( + db: MutexGuard<'a, Connection>, + sql: &str, + args: P, + ) -> Result> { + let res = DbValueIterator { + db: Some(db), + stmt: None, + iter: None, + _pin: PhantomPinned, + }; + let mut boxed = Box::pin(res); + + unsafe { + let db = NonNull::from(&boxed.db); + let stmt = db.as_ref().as_ref().unwrap().prepare(&sql)?; + + let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed); + Pin::get_unchecked_mut(mut_ref).stmt = Some(stmt); + + let mut stmt = NonNull::from(&boxed.stmt); + let iter = stmt.as_mut().as_mut().unwrap().query(args)?; + + let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed); + Pin::get_unchecked_mut(mut_ref).iter = Some(iter); + } + + Ok(Box::new(DbValueIteratorPin(boxed))) + } +} + +struct DbValueIteratorPin<'a>(Pin>>); + +impl<'a> Iterator for DbValueIteratorPin<'a> { + type Item = Result<(Value<'a>, Value<'a>)>; + + fn next(&mut self) -> Option { + let next = unsafe { + let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut self.0); + Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next() + }; + let row = match next { + Err(e) => return Some(Err(e.into())), + Ok(None) => { + // finished, drop everything + unsafe { + let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut self.0); + let t = Pin::get_unchecked_mut(mut_ref); + drop(t.iter.take()); + drop(t.stmt.take()); + drop(t.db.take()); + } + return None; + } + Ok(Some(r)) => r, + }; + let k = match row.get::<_, Vec>(0) { + Err(e) => return Some(Err(e.into())), + Ok(x) => x, + }; + let v = match row.get::<_, Vec>(1) { + Err(e) => return Some(Err(e.into())), + Ok(y) => y, + }; + Some(Ok((k.into(), v.into()))) + } +} diff --git a/src/garage/repair.rs b/src/garage/repair.rs index 04d9ee72..762a8300 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -66,18 +66,10 @@ impl Repair { async fn repair_versions(&self, must_exit: &watch::Receiver) -> Result<(), Error> { let mut pos = vec![]; - while let Some(item) = self - .garage - .version_table - .data - .store - .range((Bound::Excluded(pos), Bound::Unbounded))? - .next() - { - let (item_key, item_bytes) = item?; - pos = item_key.to_vec(); + while let Some((item_key, item_bytes)) = self.get_next_version_after(&pos)? { + pos = item_key; - let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?; + let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; if version.deleted.get() { continue; } @@ -113,22 +105,30 @@ impl Repair { Ok(()) } - async fn repair_block_ref(&self, must_exit: &watch::Receiver) -> Result<(), Error> { - let mut pos = vec![]; - - while let Some(item) = self + fn get_next_version_after(&self, pos: &[u8]) -> Result, Vec)>, Error> { + match self .garage - .block_ref_table + .version_table .data .store - .range((Bound::Excluded(pos), Bound::Unbounded))? + .range::<&[u8], _>((Bound::Excluded(pos), Bound::Unbounded))? .next() { - let (item_key, item_bytes) = item?; + None => Ok(None), + Some(item) => { + let (item_key, item_bytes) = item?; + Ok(Some((item_key.into_owned(), item_bytes.into_owned()))) + } + } + } - pos = item_key.to_vec(); + async fn repair_block_ref(&self, must_exit: &watch::Receiver) -> Result<(), Error> { + let mut pos = vec![]; - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?; + while let Some((item_key, item_bytes)) = self.get_next_block_ref_after(&pos)? { + pos = item_key; + + let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; if block_ref.deleted.get() { continue; } @@ -160,4 +160,21 @@ impl Repair { } Ok(()) } + + fn get_next_block_ref_after(&self, pos: &[u8]) -> Result, Vec)>, Error> { + match self + .garage + .block_ref_table + .data + .store + .range::<&[u8], _>((Bound::Excluded(pos), Bound::Unbounded))? + .next() + { + None => Ok(None), + Some(item) => { + let (item_key, item_bytes) = item?; + Ok(Some((item_key.into_owned(), item_bytes.into_owned()))) + } + } + } } diff --git a/src/model/migrate.rs b/src/model/migrate.rs index 1f063265..25acb4b0 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -25,11 +25,15 @@ impl Migrate { .open_tree("bucket:table") .map_err(GarageError::from)?; + let mut old_buckets = vec![]; for res in tree.iter().map_err(GarageError::from)? { let (_k, v) = res.map_err(GarageError::from)?; let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..]) .map_err(GarageError::from)?; + old_buckets.push(bucket); + } + for bucket in old_buckets { if let old_bucket::BucketState::Present(p) = bucket.state.get() { self.migrate_buckets050_do_bucket(&bucket, p).await?; } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 4b0b44ce..6e0c2f7e 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -89,36 +89,36 @@ where async fn updater_loop(self: Arc, mut must_exit: watch::Receiver) { while !*must_exit.borrow() { - if let Some(x) = self.data.merkle_todo.iter().unwrap().next() { - // TODO unwrap to remove - match x { - Ok((key, valhash)) => { - if let Err(e) = self.update_item(&key[..], &valhash[..]) { - warn!( - "({}) Error while updating Merkle tree item: {}", - F::TABLE_NAME, - e - ); - } - } - Err(e) => { - warn!( - "({}) Error while iterating on Merkle todo tree: {}", - F::TABLE_NAME, - e - ); - tokio::time::sleep(Duration::from_secs(10)).await; + match self.updater_loop_iter() { + Ok(true) => (), + Ok(false) => { + select! { + _ = self.data.merkle_todo_notify.notified().fuse() => {}, + _ = must_exit.changed().fuse() => {}, } } - } else { - select! { - _ = self.data.merkle_todo_notify.notified().fuse() => {}, - _ = must_exit.changed().fuse() => {}, + Err(e) => { + warn!( + "({}) Error while updating Merkle tree item: {}", + F::TABLE_NAME, + e + ); + tokio::time::sleep(Duration::from_secs(10)).await; } } } } + fn updater_loop_iter(&self) -> Result { + if let Some(x) = self.data.merkle_todo.iter()?.next() { + let (key, valhash) = x?; + self.update_item(&key[..], &valhash[..])?; + Ok(true) + } else { + Ok(false) + } + } + fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> { let khash = blake2sum(k); -- cgit v1.2.3