aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/block/manager.rs101
1 files changed, 96 insertions, 5 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 3215d27e..9cf72019 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -86,7 +86,7 @@ pub struct BlockManager {
mutation_lock: Mutex<BlockManagerLocked>,
- pub rc: BlockRc,
+ rc: BlockRc,
resync_queue: SledCountedTree,
resync_notify: Notify,
@@ -231,7 +231,10 @@ impl BlockManager {
// so that we can offload them if necessary and then delete them locally.
self.for_each_file(
(),
- move |_, hash| async move { self.put_to_resync(&hash, Duration::from_secs(0)) },
+ move |_, hash| async move {
+ self.put_to_resync(&hash, Duration::from_secs(0))
+ .map_err(Into::into)
+ },
must_exit,
)
.await
@@ -410,6 +413,77 @@ impl BlockManager {
// ---- Resync loop ----
+ // This part manages a queue of blocks that need to be
+ // "resynchronized", i.e. that need to have a check that
+ // they are at present if we need them, or that they are
+ // deleted once the garbage collection delay has passed.
+ //
+ // Here are some explanations on how the resync queue works.
+ // There are two Sled trees that are used to have information
+ // about the status of blocks that need to be resynchronized:
+ //
+ // - resync_queue: a tree that is ordered first by a timestamp
+ // (in milliseconds since Unix epoch) that is the time at which
+ // the resync must be done, and second by block hash.
+ // The key in this tree is just:
+ // concat(timestamp (8 bytes), hash (32 bytes))
+ // The value is the same 32-byte hash.
+ //
+ // - resync_errors: a tree that indicates for each block
+ // if the last resync resulted in an error, and if so,
+ // the following two informations (see the ErrorCounter struct):
+ // - how many consecutive resync errors for this block?
+ // - when was the last try?
+ // These two informations are used to implement an
+ // exponential backoff retry strategy.
+ // The key in this tree is the 32-byte hash of the block,
+ // and the value is the encoded ErrorCounter value.
+ //
+ // We need to have these two trees, because the resync queue
+ // is not just a queue of items to process, but a set of items
+ // that are waiting a specific delay until we can process them
+ // (the delay being necessary both internally for the exponential
+ // backoff strategy, and exposed as a parameter when adding items
+ // to the queue, e.g. to wait until the GC delay has passed).
+ // This is why we need one tree ordered by time, and one
+ // ordered by identifier of item to be processed (block hash).
+ //
+ // When the worker wants to process an item it takes from
+ // resync_queue, it checks in resync_errors that if there is an
+ // exponential back-off delay to await, it has passed before we
+ // process the item. If not, the item in the queue is skipped
+ // (but added back for later processing after the time of the
+ // delay).
+ //
+ // An alternative that would have seemed natural is to
+ // only add items to resync_queue with a processing time that is
+ // after the delay, but there are several issues with this:
+ // - This requires to synchronize updates to resync_queue and
+ // resync_errors (with the current model, there is only one thread,
+ // the worker thread, that accesses resync_errors,
+ // so no need to synchronize) by putting them both in a lock.
+ // This would mean that block_incref might need to take a lock
+ // before doing its thing, meaning it has much more chances of
+ // not completing successfully if something bad happens to Garage.
+ // Currently Garage is not able to recover from block_incref that
+ // doesn't complete successfully, because it is necessary to ensure
+ // the consistency between the state of the block manager and
+ // information in the BlockRef table.
+ // - If a resync fails, we put that block in the resync_errors table,
+ // and also add it back to resync_queue to be processed after
+ // the exponential back-off delay,
+ // but maybe the block is already scheduled to be resynced again
+ // at another time that is before the exponential back-off delay,
+ // and we have no way to check that easily. This means that
+ // in all cases, we need to check the resync_errors table
+ // in the resync loop at the time when a block is popped from
+ // the resync_queue.
+ // Overall, the current design is therefore simpler and more robust
+ // because it tolerates inconsistencies between the resync_queue
+ // and resync_errors table (items being scheduled in resync_queue
+ // for times that are earlier than the exponential back-off delay
+ // is a natural condition that is handled properly).
+
fn spawn_background_worker(self: Arc<Self>) {
// Launch a background workers for background resync loop processing
let background = self.system.background.clone();
@@ -421,12 +495,12 @@ impl BlockManager {
});
}
- fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> {
+ fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), sled::Error> {
let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
- fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), Error> {
+ fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), sled::Error> {
trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
@@ -461,7 +535,14 @@ impl BlockManager {
}
}
- async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> {
+ // The result of resync_iter is:
+ // - Ok(true) -> a block was processed (successfully or not)
+ // - Ok(false) -> no block was processed, but we are ready for the next iteration
+ // - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
+ async fn resync_iter(
+ &self,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<bool, sled::Error> {
if let Some(first_pair_res) = self.resync_queue.iter().next() {
let (time_bytes, hash_bytes) = first_pair_res?;
@@ -480,6 +561,8 @@ impl BlockManager {
self.put_to_resync_at(&hash, ec.next_try())?;
// ec.next_try() > now >= time_msec, so this remove
// is not removing the one we added just above
+ // (we want to do the remove after the insert to ensure
+ // that the item is not lost if we crash in-between)
self.resync_queue.remove(time_bytes)?;
return Ok(false);
}
@@ -539,7 +622,15 @@ impl BlockManager {
Ok(false)
}
} else {
+ // Here we wait either for a notification that an item has been
+ // added to the queue, or for a constant delay of 10 secs to expire.
+ // The delay avoids a race condition where the notification happens
+ // between the time we checked the queue and the first poll
+ // to resync_notify.notified(): if that happens, we'll just loop
+ // back 10 seconds later, which is fine.
+ let delay = tokio::time::sleep(Duration::from_secs(10));
select! {
+ _ = delay.fuse() => {},
_ = self.resync_notify.notified().fuse() => {},
_ = must_exit.changed().fuse() => {},
}