aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/block/manager.rs19
-rw-r--r--src/db/lib.rs9
-rw-r--r--src/db/sqlite_adapter.rs16
-rw-r--r--src/garage/repair.rs68
-rw-r--r--src/table/merkle.rs8
5 files changed, 42 insertions, 78 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index fbca74e2..53baede5 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -56,8 +56,6 @@ const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
// to delete the block locally.
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
-type OptKVPair = Option<(Vec<u8>, Vec<u8>)>;
-
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
pub enum BlockRpc {
@@ -549,7 +547,12 @@ impl BlockManager {
// - Ok(false) -> no block was processed, but we are ready for the next iteration
// - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> {
- if let Some((time_bytes, hash_bytes)) = self.resync_get_next()? {
+ let next = match self.resync_queue.first()? {
+ Some((k, v)) => Some((k.into_vec(), v.into_vec())),
+ None => None,
+ };
+
+ if let Some((time_bytes, hash_bytes)) = next {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
@@ -642,16 +645,6 @@ impl BlockManager {
}
}
- fn resync_get_next(&self) -> Result<OptKVPair, db::Error> {
- match self.resync_queue.iter()?.next() {
- None => Ok(None),
- Some(v) => {
- let (time_bytes, hash_bytes) = v?;
- Ok(Some((time_bytes.into_vec(), hash_bytes.into_vec())))
- }
- }
- }
-
async fn resync_block(&self, hash: &Hash) -> Result<(), Error> {
let BlockStatus { exists, needed } = self
.mutation_lock
diff --git a/src/db/lib.rs b/src/db/lib.rs
index 1b31df43..045c16c5 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -226,6 +226,15 @@ impl Tree {
self.0.len(self.1)
}
+ pub fn first(&self) -> Result<Option<(Value<'_>, Value<'_>)>> {
+ self.iter()?.next().transpose()
+ }
+ pub fn get_gt<T: AsRef<[u8]>>(&self, from: T) -> Result<Option<(Value<'_>, Value<'_>)>> {
+ self.range((Bound::Excluded(from), Bound::Unbounded))?
+ .next()
+ .transpose()
+ }
+
pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> {
self.0.insert(self.1, key.as_ref(), value.as_ref())
}
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 49c07562..ab7efbf8 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -107,9 +107,9 @@ impl IDb for SqliteDb {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
let tree = self.get_tree(tree)?;
- trace!("get: lock db");
+ trace!("get {}: lock db", tree);
let db = self.db.lock().unwrap();
- trace!("get: lock acquired");
+ trace!("get {}: lock acquired", tree);
let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
let mut res_iter = stmt.query([key])?;
@@ -122,9 +122,9 @@ impl IDb for SqliteDb {
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?;
- trace!("remove: lock db");
+ trace!("remove {}: lock db", tree);
let db = self.db.lock().unwrap();
- trace!("remove: lock acquired");
+ trace!("remove {}: lock acquired", tree);
let res = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
Ok(res > 0)
@@ -133,9 +133,9 @@ impl IDb for SqliteDb {
fn len(&self, tree: usize) -> Result<usize> {
let tree = self.get_tree(tree)?;
- trace!("len: lock db");
+ trace!("len {}: lock db", tree);
let db = self.db.lock().unwrap();
- trace!("len: lock acquired");
+ trace!("len {}: lock acquired", tree);
let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
let mut res_iter = stmt.query([])?;
@@ -148,9 +148,9 @@ impl IDb for SqliteDb {
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let tree = self.get_tree(tree)?;
- trace!("insert: lock db");
+ trace!("insert {}: lock db", tree);
let db = self.db.lock().unwrap();
- trace!("insert: lock acquired");
+ trace!("insert {}: lock acquired", tree);
db.execute(
&format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree),
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index 5735d6d0..a90b4148 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -1,4 +1,3 @@
-use core::ops::Bound;
use std::sync::Arc;
use tokio::sync::watch;
@@ -16,8 +15,6 @@ pub struct Repair {
pub garage: Arc<Garage>,
}
-type OptKVPair = Option<(Vec<u8>, Vec<u8>)>;
-
impl Repair {
pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
@@ -68,8 +65,15 @@ impl Repair {
async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
- while let Some((item_key, item_bytes)) = self.get_next_version_after(&pos)? {
- pos = item_key;
+ while *must_exit.borrow() {
+ let item_bytes = {
+ let (k, v) = match self.garage.version_table.data.store.get_gt(pos)? {
+ Some(pair) => pair,
+ None => break,
+ };
+ pos = k.into_vec();
+ v.into_vec()
+ };
let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
if version.deleted.get() {
@@ -99,36 +103,22 @@ impl Repair {
))
.await?;
}
-
- if *must_exit.borrow() {
- break;
- }
}
Ok(())
}
- fn get_next_version_after(&self, pos: &[u8]) -> Result<OptKVPair, Error> {
- match self
- .garage
- .version_table
- .data
- .store
- .range::<&[u8], _>((Bound::Excluded(pos), Bound::Unbounded))?
- .next()
- {
- None => Ok(None),
- Some(item) => {
- let (item_key, item_bytes) = item?;
- Ok(Some((item_key.into_vec(), item_bytes.into_vec())))
- }
- }
- }
-
async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
- while let Some((item_key, item_bytes)) = self.get_next_block_ref_after(&pos)? {
- pos = item_key;
+ while *must_exit.borrow() {
+ let item_bytes = {
+ let (k, v) = match self.garage.block_ref_table.data.store.get_gt(pos)? {
+ Some(pair) => pair,
+ None => break,
+ };
+ pos = k.into_vec();
+ v.into_vec()
+ };
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
if block_ref.deleted.get() {
@@ -155,29 +145,7 @@ impl Repair {
})
.await?;
}
-
- if *must_exit.borrow() {
- break;
- }
}
Ok(())
}
-
- #[allow(clippy::type_complexity)]
- fn get_next_block_ref_after(&self, pos: &[u8]) -> Result<OptKVPair, Error> {
- match self
- .garage
- .block_ref_table
- .data
- .store
- .range::<&[u8], _>((Bound::Excluded(pos), Bound::Unbounded))?
- .next()
- {
- None => Ok(None),
- Some(item) => {
- let (item_key, item_bytes) = item?;
- Ok(Some((item_key.into_vec(), item_bytes.into_vec())))
- }
- }
- }
}
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 48d2c5dd..c6653a64 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -110,13 +110,7 @@ where
}
fn updater_loop_iter(&self) -> Result<bool, Error> {
- // TODO undo this iter hack
- let mut iter = self.data.merkle_todo.iter()?;
- if let Some(x) = iter.next() {
- let (key, valhash) = x?;
- let key = key.to_vec();
- let valhash = valhash.to_vec();
- drop(iter);
+ if let Some((key, valhash)) = self.data.merkle_todo.first()? {
self.update_item(&key, &valhash)?;
Ok(true)
} else {