diff options
author | Alex Auvolat <alex@adnab.me> | 2022-06-02 23:14:10 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-06-02 23:14:10 +0200 |
commit | c439cb11a908ee5405ed0a3a721e9c8c0e299ad2 (patch) | |
tree | 65835f8c9b47d0670d1461fc43f358fc73850414 /src | |
parent | 364061453cd0a5a2e4f76f4194e6b5887aae7ed8 (diff) | |
download | garage-c439cb11a908ee5405ed0a3a721e9c8c0e299ad2.tar.gz garage-c439cb11a908ee5405ed0a3a721e9c8c0e299ad2.zip |
Sqlite iter with unsafe code
Diffstat (limited to 'src')
-rw-r--r-- | src/block/manager.rs | 14 | ||||
-rw-r--r-- | src/db/Cargo.toml | 1 | ||||
-rw-r--r-- | src/db/lib.rs | 6 | ||||
-rw-r--r-- | src/db/sqlite_adapter.rs | 94 | ||||
-rw-r--r-- | src/garage/repair.rs | 57 | ||||
-rw-r--r-- | src/model/migrate.rs | 4 | ||||
-rw-r--r-- | src/table/merkle.rs | 46 |
7 files changed, 162 insertions, 60 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 50039d2b..f34d13d0 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -547,9 +547,7 @@ impl BlockManager { // - Ok(false) -> no block was processed, but we are ready for the next iteration // - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> { - if let Some(first_pair_res) = self.resync_queue.iter()?.next() { - let (time_bytes, hash_bytes) = first_pair_res?; - + if let Some((time_bytes, hash_bytes)) = self.resync_get_next()? { let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let now = now_msec(); @@ -642,6 +640,16 @@ impl BlockManager { } } + fn resync_get_next(&self) -> Result<Option<(Vec<u8>, Vec<u8>)>, db::Error> { + match self.resync_queue.iter()?.next() { + None => Ok(None), + Some(v) => { + let (time_bytes, hash_bytes) = v?; + Ok(Some((time_bytes.into_owned(), hash_bytes.into_owned()))) + } + } + } + async fn resync_block(&self, hash: &Hash) -> Result<(), Error> { let BlockStatus { exists, needed } = self .mutation_lock diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index 34d483ec..f627208b 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -15,7 +15,6 @@ path = "lib.rs" [dependencies] err-derive = "0.3" -ouroboros = "0.15" sled = "0.34" rusqlite = "0.27" diff --git a/src/db/lib.rs b/src/db/lib.rs index ce1b1c8b..3a2e1d13 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -22,11 +22,9 @@ pub struct Transaction<'a>(pub(crate) &'a dyn ITx<'a>); pub struct Tree(pub(crate) Arc<dyn IDb>, pub(crate) usize); pub type Value<'a> = Cow<'a, [u8]>; -pub type ValueIter<'a> = - Box<dyn std::iter::Iterator<Item = Result<(Value<'a>, Value<'a>)>> + Send + 'a>; +pub type ValueIter<'a> = Box<dyn std::iter::Iterator<Item = Result<(Value<'a>, Value<'a>)>> + 'a>; -pub type Exporter<'a> = - Box<dyn std::iter::Iterator<Item = Result<(String, ValueIter<'a>)>> + 'a>; +pub type Exporter<'a> = Box<dyn std::iter::Iterator<Item = Result<(String, ValueIter<'a>)>> + 'a>; // ---- diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 320684df..bed72e6b 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -1,11 +1,12 @@ use core::ops::Bound; use std::cell::Cell; -use std::sync::{Arc, Mutex, RwLock, MutexGuard}; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::ptr::NonNull; +use std::sync::{Arc, Mutex, MutexGuard, RwLock}; -use ouroboros::self_referencing; - -use rusqlite::{params, Connection, Transaction}; +use rusqlite::{params, Connection, Rows, Statement, Transaction}; use crate::{ Db, Error, Exporter, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter, @@ -114,16 +115,14 @@ impl IDb for SqliteDb { fn iter<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> { let tree = self.get_tree(tree)?; - let db = self.db.lock().unwrap(); let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree); - let mut stmt = db.prepare(&sql)?; - let res = stmt.query([])?; - unimplemented!(); + DbValueIterator::new(self.db.lock().unwrap(), &sql, []) } fn iter_rev<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> { let tree = self.get_tree(tree)?; - unimplemented!(); + let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree); + DbValueIterator::new(self.db.lock().unwrap(), &sql, []) } fn range<'a, 'r>( @@ -263,3 +262,80 @@ impl<'a> ITx<'a> for SqliteTx<'a> { } } +// ---- + +struct DbValueIterator<'a> { + db: Option<MutexGuard<'a, Connection>>, + stmt: Option<Statement<'a>>, + iter: Option<Rows<'a>>, + _pin: PhantomPinned, +} + +impl<'a> DbValueIterator<'a> { + fn new<P: rusqlite::Params>( + db: MutexGuard<'a, Connection>, + sql: &str, + args: P, + ) -> Result<ValueIter<'a>> { + let res = DbValueIterator { + db: Some(db), + stmt: None, + iter: None, + _pin: PhantomPinned, + }; + let mut boxed = Box::pin(res); + + unsafe { + let db = NonNull::from(&boxed.db); + let stmt = db.as_ref().as_ref().unwrap().prepare(&sql)?; + + let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed); + Pin::get_unchecked_mut(mut_ref).stmt = Some(stmt); + + let mut stmt = NonNull::from(&boxed.stmt); + let iter = stmt.as_mut().as_mut().unwrap().query(args)?; + + let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed); + Pin::get_unchecked_mut(mut_ref).iter = Some(iter); + } + + Ok(Box::new(DbValueIteratorPin(boxed))) + } +} + +struct DbValueIteratorPin<'a>(Pin<Box<DbValueIterator<'a>>>); + +impl<'a> Iterator for DbValueIteratorPin<'a> { + type Item = Result<(Value<'a>, Value<'a>)>; + + fn next(&mut self) -> Option<Self::Item> { + let next = unsafe { + let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut self.0); + Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next() + }; + let row = match next { + Err(e) => return Some(Err(e.into())), + Ok(None) => { + // finished, drop everything + unsafe { + let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut self.0); + let t = Pin::get_unchecked_mut(mut_ref); + drop(t.iter.take()); + drop(t.stmt.take()); + drop(t.db.take()); + } + return None; + } + Ok(Some(r)) => r, + }; + let k = match row.get::<_, Vec<u8>>(0) { + Err(e) => return Some(Err(e.into())), + Ok(x) => x, + }; + let v = match row.get::<_, Vec<u8>>(1) { + Err(e) => return Some(Err(e.into())), + Ok(y) => y, + }; + Some(Ok((k.into(), v.into()))) + } +} diff --git a/src/garage/repair.rs b/src/garage/repair.rs index 04d9ee72..762a8300 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -66,18 +66,10 @@ impl Repair { async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { let mut pos = vec![]; - while let Some(item) = self - .garage - .version_table - .data - .store - .range((Bound::Excluded(pos), Bound::Unbounded))? - .next() - { - let (item_key, item_bytes) = item?; - pos = item_key.to_vec(); + while let Some((item_key, item_bytes)) = self.get_next_version_after(&pos)? { + pos = item_key; - let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?; + let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; if version.deleted.get() { continue; } @@ -113,22 +105,30 @@ impl Repair { Ok(()) } - async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { - let mut pos = vec![]; - - while let Some(item) = self + fn get_next_version_after(&self, pos: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>, Error> { + match self .garage - .block_ref_table + .version_table .data .store - .range((Bound::Excluded(pos), Bound::Unbounded))? + .range::<&[u8], _>((Bound::Excluded(pos), Bound::Unbounded))? .next() { - let (item_key, item_bytes) = item?; + None => Ok(None), + Some(item) => { + let (item_key, item_bytes) = item?; + Ok(Some((item_key.into_owned(), item_bytes.into_owned()))) + } + } + } - pos = item_key.to_vec(); + async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { + let mut pos = vec![]; - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?; + while let Some((item_key, item_bytes)) = self.get_next_block_ref_after(&pos)? { + pos = item_key; + + let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; if block_ref.deleted.get() { continue; } @@ -160,4 +160,21 @@ impl Repair { } Ok(()) } + + fn get_next_block_ref_after(&self, pos: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>, Error> { + match self + .garage + .block_ref_table + .data + .store + .range::<&[u8], _>((Bound::Excluded(pos), Bound::Unbounded))? + .next() + { + None => Ok(None), + Some(item) => { + let (item_key, item_bytes) = item?; + Ok(Some((item_key.into_owned(), item_bytes.into_owned()))) + } + } + } } diff --git a/src/model/migrate.rs b/src/model/migrate.rs index 1f063265..25acb4b0 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -25,11 +25,15 @@ impl Migrate { .open_tree("bucket:table") .map_err(GarageError::from)?; + let mut old_buckets = vec![]; for res in tree.iter().map_err(GarageError::from)? { let (_k, v) = res.map_err(GarageError::from)?; let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..]) .map_err(GarageError::from)?; + old_buckets.push(bucket); + } + for bucket in old_buckets { if let old_bucket::BucketState::Present(p) = bucket.state.get() { self.migrate_buckets050_do_bucket(&bucket, p).await?; } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 4b0b44ce..6e0c2f7e 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -89,36 +89,36 @@ where async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { while !*must_exit.borrow() { - if let Some(x) = self.data.merkle_todo.iter().unwrap().next() { - // TODO unwrap to remove - match x { - Ok((key, valhash)) => { - if let Err(e) = self.update_item(&key[..], &valhash[..]) { - warn!( - "({}) Error while updating Merkle tree item: {}", - F::TABLE_NAME, - e - ); - } - } - Err(e) => { - warn!( - "({}) Error while iterating on Merkle todo tree: {}", - F::TABLE_NAME, - e - ); - tokio::time::sleep(Duration::from_secs(10)).await; + match self.updater_loop_iter() { + Ok(true) => (), + Ok(false) => { + select! { + _ = self.data.merkle_todo_notify.notified().fuse() => {}, + _ = must_exit.changed().fuse() => {}, } } - } else { - select! { - _ = self.data.merkle_todo_notify.notified().fuse() => {}, - _ = must_exit.changed().fuse() => {}, + Err(e) => { + warn!( + "({}) Error while updating Merkle tree item: {}", + F::TABLE_NAME, + e + ); + tokio::time::sleep(Duration::from_secs(10)).await; } } } } + fn updater_loop_iter(&self) -> Result<bool, Error> { + if let Some(x) = self.data.merkle_todo.iter()?.next() { + let (key, valhash) = x?; + self.update_item(&key[..], &valhash[..])?; + Ok(true) + } else { + Ok(false) + } + } + fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> { let khash = blake2sum(k); |