Diffstat (limited to 'src/block/resync.rs')
 src/block/resync.rs | 46 +++++++++++++++++++++++++++-------------------
 1 file changed, 27 insertions(+), 19 deletions(-)
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 7221b093..ab4604ad 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -15,7 +15,6 @@ use opentelemetry::{
};
use garage_db as db;
-use garage_db::counted_tree_hack::CountedTree;
use garage_util::background::*;
use garage_util::data::*;
@@ -47,9 +46,9 @@ pub(crate) const MAX_RESYNC_WORKERS: usize = 8;
const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
pub struct BlockResyncManager {
- pub(crate) queue: CountedTree,
+ pub(crate) queue: db::Tree,
pub(crate) notify: Arc<Notify>,
- pub(crate) errors: CountedTree,
+ pub(crate) errors: db::Tree,
busy_set: BusySet,
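The CountedTree type dropped above was a workaround from the Sled era, where computing a tree's length meant walking the whole tree: it paired a tree with an atomic counter kept in sync on every mutation, making len() O(1) at the cost of bookkeeping on every write. A minimal sketch of that kind of wrapper, with illustrative names that are not Garage's exact internals:

use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative stand-in for a tree whose len() would be O(n).
struct SlowTree;
impl SlowTree {
    fn insert(&self, _k: &[u8], _v: &[u8]) -> Option<Vec<u8>> { None }
    fn remove(&self, _k: &[u8]) -> Option<Vec<u8>> { None }
}

// Keep an O(1) length next to the tree, updated on every mutation.
struct CountedTreeSketch {
    tree: SlowTree,
    len: AtomicUsize,
}

impl CountedTreeSketch {
    fn insert(&self, k: &[u8], v: &[u8]) {
        // Only count keys that were not already present.
        if self.tree.insert(k, v).is_none() {
            self.len.fetch_add(1, Ordering::SeqCst);
        }
    }
    fn remove(&self, k: &[u8]) {
        if self.tree.remove(k).is_some() {
            self.len.fetch_sub(1, Ordering::SeqCst);
        }
    }
    fn len(&self) -> usize {
        self.len.load(Ordering::SeqCst)
    }
}

With db::Tree now exposing a fallible len() directly, the wrapper and its write-path overhead can go.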
@@ -90,12 +89,10 @@ impl BlockResyncManager {
let queue = db
.open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree");
- let queue = CountedTree::new(queue).expect("Could not count block_local_resync_queue");
let errors = db
.open_tree("block_local_resync_errors")
.expect("Unable to open block_local_resync_errors tree");
- let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors");
let persister = PersisterShared::new(&system.metadata_dir, "resync_cfg");
@@ -110,16 +107,12 @@ impl BlockResyncManager {
/// Get length of resync queue
pub fn queue_len(&self) -> Result<usize, Error> {
- // This currently can't return an error because the CountedTree hack
- // doesn't error on .len(), but this will change when we remove the hack
- // (hopefully someday!)
- Ok(self.queue.len())
+ Ok(self.queue.len()?)
}
/// Get number of blocks that have an error
pub fn errors_len(&self) -> Result<usize, Error> {
- // (see queue_len comment)
- Ok(self.errors.len())
+ Ok(self.errors.len()?)
}
/// Clear the error counter for a block and put it in queue immediately
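Because both methods already returned Result<usize, Error>, callers are untouched; only the bodies change, letting `?` propagate (and convert) the database error instead of wrapping an infallible value. A small sketch of why the `Ok(...?)` shape is needed, using placeholder types and assuming the crate-level Error implements From for the db error:

// Placeholder types, for illustration only.
struct DbError;
struct Error;
impl From<DbError> for Error {
    fn from(_: DbError) -> Error { Error }
}
struct Tree;
impl Tree {
    fn len(&self) -> Result<usize, DbError> { Ok(0) }
}

fn queue_len(queue: &Tree) -> Result<usize, Error> {
    // `?` converts DbError into Error via From, so the two error
    // types can differ; returning `queue.len()` directly would not
    // typecheck.
    Ok(queue.len()?)
}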
@@ -180,7 +173,7 @@ impl BlockResyncManager {
// deleted once the garbage collection delay has passed.
//
// Here are some explanations on how the resync queue works.
- // There are two Sled trees that are used to have information
+ // There are two db trees that are used to have information
// about the status of blocks that need to be resynchronized:
//
// - resync.queue: a tree that is ordered first by a timestamp
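The ordering the comment mentions is what makes the queue cheap to poll: the db tree iterates keys in byte order, so encoding the retry time as a big-endian integer prefix puts the earliest-due entries first. A sketch of that kind of key encoding, illustrative rather than Garage's exact layout:

// Build a queue key that sorts by due time: a big-endian timestamp
// prefix, then the block hash to keep keys unique.
fn resync_queue_key(due_unix_ms: u64, hash: &[u8; 32]) -> Vec<u8> {
    let mut key = Vec::with_capacity(8 + 32);
    key.extend_from_slice(&due_unix_ms.to_be_bytes());
    key.extend_from_slice(hash);
    key
}

Entries due sooner compare lower, so an in-order scan of the tree yields the next items to resynchronize first.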
@@ -374,10 +367,17 @@ impl BlockResyncManager {
}
if exists && rc.is_deletable() {
+ if manager.rc.recalculate_rc(hash)?.0 > 0 {
+ return Err(Error::Message(format!(
+ "Refcount for block {:?} was inconsistent, retrying later",
+ hash
+ )));
+ }
+
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
- let mut who = manager.replication.write_nodes(hash);
+ let mut who = manager.replication.storage_nodes(hash);
if who.len() < manager.replication.write_quorum() {
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
}
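The new guard re-derives the block's reference count from authoritative metadata right before the destructive path, so a stale zero in the cached refcount cannot cause live data to be offloaded and deleted; if the recomputed count is positive, the resync entry fails and is retried later. A sketch of this check-before-delete pattern, with hypothetical helper names:

// Hypothetical helper, for illustration: recompute the block's
// reference count from the authoritative metadata tables.
fn recalculate_rc(hash: &[u8; 32]) -> Result<u64, String> {
    let _ = hash;
    Ok(0)
}

fn offload_and_delete(hash: &[u8; 32]) -> Result<(), String> {
    // Double-check against freshly recomputed state before doing
    // anything destructive; err on the side of retrying later.
    if recalculate_rc(hash)? > 0 {
        return Err(format!(
            "Refcount for block {:?} was inconsistent, retrying later",
            hash
        ));
    }
    // ... offload to peers, then delete the local copy ...
    Ok(())
}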
@@ -385,7 +385,7 @@ impl BlockResyncManager {
let who_needs_resps = manager
.system
- .rpc
+ .rpc_helper()
.call_many(
&manager.endpoint,
&who,
@@ -431,10 +431,10 @@ impl BlockResyncManager {
.with_stream_from_buffer(bytes);
manager
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&manager.endpoint,
- &need_nodes[..],
+ &need_nodes,
put_block_message,
RequestStrategy::with_priority(PRIO_BACKGROUND | PRIO_SECONDARY)
.with_quorum(need_nodes.len()),
@@ -462,7 +462,15 @@ impl BlockResyncManager {
let block_data = manager
.rpc_get_raw_block(hash, PRIO_BACKGROUND | PRIO_SECONDARY, None)
- .await?;
+ .await;
+ if matches!(block_data, Err(Error::MissingBlock(_))) {
+ warn!(
+ "Could not fetch needed block {:?}, no node returned valid data. Checking that refcount is correct.",
+ hash
+ );
+ manager.rc.recalculate_rc(hash)?;
+ }
+ let block_data = block_data?;
manager.metrics.resync_recv_counter.add(1);
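In the same spirit, a fetch that fails because no peer returned the block no longer propagates immediately: the refcount is recalculated first, so a block recorded as needed only through a stale count stops producing resync errors forever. A sketch of peeking at the error before letting it propagate, with placeholder types standing in for Garage's Error:

// Placeholder error type, for illustration: the real code matches on
// Error::MissingBlock.
#[derive(Debug)]
enum FetchError {
    MissingBlock([u8; 32]),
    Other(String),
}

fn fetch_block(hash: &[u8; 32]) -> Result<Vec<u8>, FetchError> {
    Err(FetchError::MissingBlock(*hash))
}

fn resync_fetch(hash: &[u8; 32]) -> Result<Vec<u8>, FetchError> {
    let block_data = fetch_block(hash);
    // Peek at the error without consuming the Result (the pattern
    // binds nothing by value), run the consistency check, then return
    // the original Result unchanged.
    if matches!(block_data, Err(FetchError::MissingBlock(_))) {
        eprintln!("no node returned block {:?}; verifying refcount", hash);
        // recalculate_rc(hash)?;  // hypothetical check, as in the diff
    }
    block_data
}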
@@ -543,9 +551,9 @@ impl Worker for ResyncWorker {
Ok(WorkerState::Idle)
}
Err(e) => {
- // The errors that we have here are only Sled errors
+ // The errors that we have here are only db errors
// We don't really know how to handle them so just ¯\_(ツ)_/¯
- // (there is kind of an assumption that Sled won't error on us,
+ // (there is kind of an assumption that the db won't error on us,
// if it does there is not much we can do -- TODO should we just panic?)
// Here we just give the error to the worker manager,
// it will print it to the logs and increment a counter