diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/block/manager.rs | 19 | ||||
-rw-r--r-- | src/db/lib.rs | 9 | ||||
-rw-r--r-- | src/db/sqlite_adapter.rs | 16 | ||||
-rw-r--r-- | src/garage/repair.rs | 68 | ||||
-rw-r--r-- | src/table/merkle.rs | 8 |
5 files changed, 42 insertions, 78 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index fbca74e2..53baede5 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -56,8 +56,6 @@ const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6; // to delete the block locally. pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); -type OptKVPair = Option<(Vec<u8>, Vec<u8>)>; - /// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] pub enum BlockRpc { @@ -549,7 +547,12 @@ impl BlockManager { // - Ok(false) -> no block was processed, but we are ready for the next iteration // - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> { - if let Some((time_bytes, hash_bytes)) = self.resync_get_next()? { + let next = match self.resync_queue.first()? { + Some((k, v)) => Some((k.into_vec(), v.into_vec())), + None => None, + }; + + if let Some((time_bytes, hash_bytes)) = next { let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let now = now_msec(); @@ -642,16 +645,6 @@ impl BlockManager { } } - fn resync_get_next(&self) -> Result<OptKVPair, db::Error> { - match self.resync_queue.iter()?.next() { - None => Ok(None), - Some(v) => { - let (time_bytes, hash_bytes) = v?; - Ok(Some((time_bytes.into_vec(), hash_bytes.into_vec()))) - } - } - } - async fn resync_block(&self, hash: &Hash) -> Result<(), Error> { let BlockStatus { exists, needed } = self .mutation_lock diff --git a/src/db/lib.rs b/src/db/lib.rs index 1b31df43..045c16c5 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -226,6 +226,15 @@ impl Tree { self.0.len(self.1) } + pub fn first(&self) -> Result<Option<(Value<'_>, Value<'_>)>> { + self.iter()?.next().transpose() + } + pub fn get_gt<T: AsRef<[u8]>>(&self, from: T) -> Result<Option<(Value<'_>, Value<'_>)>> { + self.range((Bound::Excluded(from), Bound::Unbounded))? + .next() + .transpose() + } + pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> { self.0.insert(self.1, key.as_ref(), value.as_ref()) } diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 49c07562..ab7efbf8 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -107,9 +107,9 @@ impl IDb for SqliteDb { fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> { let tree = self.get_tree(tree)?; - trace!("get: lock db"); + trace!("get {}: lock db", tree); let db = self.db.lock().unwrap(); - trace!("get: lock acquired"); + trace!("get {}: lock acquired", tree); let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; let mut res_iter = stmt.query([key])?; @@ -122,9 +122,9 @@ impl IDb for SqliteDb { fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> { let tree = self.get_tree(tree)?; - trace!("remove: lock db"); + trace!("remove {}: lock db", tree); let db = self.db.lock().unwrap(); - trace!("remove: lock acquired"); + trace!("remove {}: lock acquired", tree); let res = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?; Ok(res > 0) @@ -133,9 +133,9 @@ impl IDb for SqliteDb { fn len(&self, tree: usize) -> Result<usize> { let tree = self.get_tree(tree)?; - trace!("len: lock db"); + trace!("len {}: lock db", tree); let db = self.db.lock().unwrap(); - trace!("len: lock acquired"); + trace!("len {}: lock acquired", tree); let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?; let mut res_iter = stmt.query([])?; @@ -148,9 +148,9 @@ impl IDb for SqliteDb { fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { let tree = self.get_tree(tree)?; - trace!("insert: lock db"); + trace!("insert {}: lock db", tree); let db = self.db.lock().unwrap(); - trace!("insert: lock acquired"); + trace!("insert {}: lock acquired", tree); db.execute( &format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree), diff --git a/src/garage/repair.rs b/src/garage/repair.rs index 5735d6d0..a90b4148 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -1,4 +1,3 @@ -use core::ops::Bound; use std::sync::Arc; use tokio::sync::watch; @@ -16,8 +15,6 @@ pub struct Repair { pub garage: Arc<Garage>, } -type OptKVPair = Option<(Vec<u8>, Vec<u8>)>; - impl Repair { pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) { if let Err(e) = self.repair_worker_aux(opt, must_exit).await { @@ -68,8 +65,15 @@ impl Repair { async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { let mut pos = vec![]; - while let Some((item_key, item_bytes)) = self.get_next_version_after(&pos)? { - pos = item_key; + while *must_exit.borrow() { + let item_bytes = { + let (k, v) = match self.garage.version_table.data.store.get_gt(pos)? { + Some(pair) => pair, + None => break, + }; + pos = k.into_vec(); + v.into_vec() + }; let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; if version.deleted.get() { @@ -99,36 +103,22 @@ impl Repair { )) .await?; } - - if *must_exit.borrow() { - break; - } } Ok(()) } - fn get_next_version_after(&self, pos: &[u8]) -> Result<OptKVPair, Error> { - match self - .garage - .version_table - .data - .store - .range::<&[u8], _>((Bound::Excluded(pos), Bound::Unbounded))? - .next() - { - None => Ok(None), - Some(item) => { - let (item_key, item_bytes) = item?; - Ok(Some((item_key.into_vec(), item_bytes.into_vec()))) - } - } - } - async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { let mut pos = vec![]; - while let Some((item_key, item_bytes)) = self.get_next_block_ref_after(&pos)? { - pos = item_key; + while *must_exit.borrow() { + let item_bytes = { + let (k, v) = match self.garage.block_ref_table.data.store.get_gt(pos)? { + Some(pair) => pair, + None => break, + }; + pos = k.into_vec(); + v.into_vec() + }; let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; if block_ref.deleted.get() { @@ -155,29 +145,7 @@ impl Repair { }) .await?; } - - if *must_exit.borrow() { - break; - } } Ok(()) } - - #[allow(clippy::type_complexity)] - fn get_next_block_ref_after(&self, pos: &[u8]) -> Result<OptKVPair, Error> { - match self - .garage - .block_ref_table - .data - .store - .range::<&[u8], _>((Bound::Excluded(pos), Bound::Unbounded))? - .next() - { - None => Ok(None), - Some(item) => { - let (item_key, item_bytes) = item?; - Ok(Some((item_key.into_vec(), item_bytes.into_vec()))) - } - } - } } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 48d2c5dd..c6653a64 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -110,13 +110,7 @@ where } fn updater_loop_iter(&self) -> Result<bool, Error> { - // TODO undo this iter hack - let mut iter = self.data.merkle_todo.iter()?; - if let Some(x) = iter.next() { - let (key, valhash) = x?; - let key = key.to_vec(); - let valhash = valhash.to_vec(); - drop(iter); + if let Some((key, valhash)) = self.data.merkle_todo.first()? { self.update_item(&key, &valhash)?; Ok(true) } else { |