aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/block/manager.rs54
-rw-r--r--src/garage/admin.rs44
-rw-r--r--src/table/data.rs4
-rw-r--r--src/table/merkle.rs8
4 files changed, 70 insertions, 40 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index b7dcaf8a..ea984646 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,3 +1,5 @@
+use core::ops::Bound;
+
use std::convert::TryInto;
use std::path::{Path, PathBuf};
use std::sync::Arc;
@@ -218,19 +220,35 @@ impl BlockManager {
/// to fix any mismatch between the two.
pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
// 1. Repair blocks from RC table.
- // TODO don't do this like this
- let mut hashes = vec![];
- for (i, entry) in self.rc.rc.iter()?.enumerate() {
- let (hash, _) = entry?;
- let hash = Hash::try_from(&hash[..]).unwrap();
- hashes.push(hash);
- if i & 0xFF == 0 && *must_exit.borrow() {
- return Ok(());
+ let mut next_start: Option<Hash> = None;
+ loop {
+ let mut batch_of_hashes = vec![];
+ let start_bound = match next_start.as_ref() {
+ None => Bound::Unbounded,
+ Some(x) => Bound::Excluded(x.as_slice()),
+ };
+ for entry in self
+ .rc
+ .rc
+ .range::<&[u8], _>((start_bound, Bound::Unbounded))?
+ {
+ let (hash, _) = entry?;
+ let hash = Hash::try_from(&hash[..]).unwrap();
+ batch_of_hashes.push(hash);
+ if batch_of_hashes.len() >= 1000 {
+ break;
+ }
}
- }
- for (i, hash) in hashes.into_iter().enumerate() {
- self.put_to_resync(&hash, Duration::from_secs(0))?;
- if i & 0xFF == 0 && *must_exit.borrow() {
+ if batch_of_hashes.is_empty() {
+ break;
+ }
+
+ for hash in batch_of_hashes.into_iter() {
+ self.put_to_resync(&hash, Duration::from_secs(0))?;
+ next_start = Some(hash)
+ }
+
+ if *must_exit.borrow() {
return Ok(());
}
}
@@ -271,18 +289,18 @@ impl BlockManager {
}
/// Get lenght of resync queue
- pub fn resync_queue_len(&self) -> usize {
- self.resync_queue.len().unwrap() // TODO fix unwrap
+ pub fn resync_queue_len(&self) -> Result<usize, Error> {
+ Ok(self.resync_queue.len()?)
}
/// Get number of blocks that have an error
- pub fn resync_errors_len(&self) -> usize {
- self.resync_errors.len().unwrap() // TODO fix unwrap
+ pub fn resync_errors_len(&self) -> Result<usize, Error> {
+ Ok(self.resync_errors.len()?)
}
/// Get number of items in the refcount table
- pub fn rc_len(&self) -> usize {
- self.rc.rc.len().unwrap() // TODO fix unwrap
+ pub fn rc_len(&self) -> Result<usize, Error> {
+ Ok(self.rc.rc.len()?)
}
//// ----- Managing the reference counter ----
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index cce88b35..3af8b046 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -660,11 +660,11 @@ impl AdminRpcHandler {
}
Ok(AdminRpc::Ok(ret))
} else {
- Ok(AdminRpc::Ok(self.gather_stats_local(opt)))
+ Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
}
}
- fn gather_stats_local(&self, opt: StatsOpt) -> String {
+ fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
let mut ret = String::new();
writeln!(
&mut ret,
@@ -689,59 +689,71 @@ impl AdminRpcHandler {
writeln!(&mut ret, " {:?} {}", n, c).unwrap();
}
- self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.key_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.object_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.version_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt);
+ self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.key_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.object_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.version_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?;
writeln!(&mut ret, "\nBlock manager stats:").unwrap();
if opt.detailed {
writeln!(
&mut ret,
" number of RC entries (~= number of blocks): {}",
- self.garage.block_manager.rc_len()
+ self.garage.block_manager.rc_len()?
)
.unwrap();
}
writeln!(
&mut ret,
" resync queue length: {}",
- self.garage.block_manager.resync_queue_len()
+ self.garage.block_manager.resync_queue_len()?
)
.unwrap();
writeln!(
&mut ret,
" blocks with resync errors: {}",
- self.garage.block_manager.resync_errors_len()
+ self.garage.block_manager.resync_errors_len()?
)
.unwrap();
- ret
+ Ok(ret)
}
- fn gather_table_stats<F, R>(&self, to: &mut String, t: &Arc<Table<F, R>>, opt: &StatsOpt)
+ fn gather_table_stats<F, R>(
+ &self,
+ to: &mut String,
+ t: &Arc<Table<F, R>>,
+ opt: &StatsOpt,
+ ) -> Result<(), Error>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap();
if opt.detailed {
- writeln!(to, " number of items: {}", t.data.store.len().unwrap()).unwrap(); // TODO fix len unwrap
+ writeln!(
+ to,
+ " number of items: {}",
+ t.data.store.len().map_err(GarageError::from)?
+ )
+ .unwrap();
writeln!(
to,
" Merkle tree size: {}",
- t.merkle_updater.merkle_tree_len()
+ t.merkle_updater.merkle_tree_len()?
)
.unwrap();
}
writeln!(
to,
" Merkle updater todo queue length: {}",
- t.merkle_updater.todo_len()
+ t.merkle_updater.todo_len()?
)
.unwrap();
- writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap();
+ writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()?).unwrap();
+
+ Ok(())
}
}
diff --git a/src/table/data.rs b/src/table/data.rs
index e688168f..839dae94 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -318,7 +318,7 @@ where
}
}
- pub fn gc_todo_len(&self) -> usize {
- self.gc_todo.len().unwrap() // TODO fix unwrap
+ pub fn gc_todo_len(&self) -> Result<usize, Error> {
+ Ok(self.gc_todo.len()?)
}
}
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index e7f2442e..7685b193 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -316,12 +316,12 @@ where
MerkleNode::decode_opt(&ent)
}
- pub fn merkle_tree_len(&self) -> usize {
- self.data.merkle_tree.len().unwrap() // TODO fix unwrap
+ pub fn merkle_tree_len(&self) -> Result<usize, Error> {
+ Ok(self.data.merkle_tree.len()?)
}
- pub fn todo_len(&self) -> usize {
- self.data.merkle_todo.len().unwrap() // TODO fix unwrap
+ pub fn todo_len(&self) -> Result<usize, Error> {
+ Ok(self.data.merkle_todo.len()?)
}
}