aboutsummaryrefslogtreecommitdiff
path: root/src/model/block.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-03-01 14:55:37 +0100
committerAlex Auvolat <alex@adnab.me>2022-03-14 11:51:37 +0100
commitd78bf379fb85c0264c9971a26724f8b933a234ee (patch)
tree3d899b7e5ead760df0b0af6103b4579da88425be /src/model/block.rs
parentf7e6f4616f79726674d0fa5e1b19b06cacdc3a2a (diff)
downloadgarage-d78bf379fb85c0264c9971a26724f8b933a234ee.tar.gz
garage-d78bf379fb85c0264c9971a26724f8b933a234ee.zip
Fix resync queue to not drop items
Diffstat (limited to 'src/model/block.rs')
-rw-r--r--src/model/block.rs37
1 files changed, 21 insertions, 16 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index f0814eda..8329bb6f 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -562,9 +562,12 @@ impl BlockManager {
}
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> {
- if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
+ if let Some(first_pair_res) = self.resync_queue.iter().next() {
+ let (time_bytes, hash_bytes) = first_pair_res?;
+
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
+
if now >= time_msec {
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
@@ -575,6 +578,9 @@ impl BlockManager {
// don't do resync and return early, but still
// make sure the item is still in queue at expected time
self.put_to_resync_at(&hash, ec.next_try())?;
+ // ec.next_try() > now >= time_msec, so this remove
+ // is not removing the one we added just above
+ self.resync_queue.remove(time_bytes)?;
return Ok(false);
}
}
@@ -605,20 +611,25 @@ impl BlockManager {
warn!("Error when resyncing {:?}: {}", hash, e);
let err_counter = match self.resync_errors.get(hash.as_slice())? {
- Some(ec) => ErrorCounter::decode(ec).add1(),
- None => ErrorCounter::new(),
+ Some(ec) => ErrorCounter::decode(ec).add1(now + 1),
+ None => ErrorCounter::new(now + 1),
};
- self.put_to_resync_at(&hash, err_counter.next_try())?;
self.resync_errors
.insert(hash.as_slice(), err_counter.encode())?;
+
+ self.put_to_resync_at(&hash, err_counter.next_try())?;
+ // err_counter.next_try() >= now + 1 > now,
+ // the entry we remove from the queue is not
+ // the entry we inserted with put_to_resync_at
+ self.resync_queue.remove(time_bytes)?;
} else {
self.resync_errors.remove(hash.as_slice())?;
+ self.resync_queue.remove(time_bytes)?;
}
Ok(true)
} else {
- self.resync_queue.insert(time_bytes, hash_bytes)?;
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
select! {
_ = delay.fuse() => {},
@@ -1044,19 +1055,13 @@ struct ErrorCounter {
last_try: u64,
}
-impl Default for ErrorCounter {
- fn default() -> Self {
+impl ErrorCounter {
+ fn new(now: u64) -> Self {
Self {
errors: 1,
- last_try: now_msec(),
+ last_try: now,
}
}
-}
-
-impl ErrorCounter {
- fn new() -> Self {
- Self::default()
- }
fn decode(data: sled::IVec) -> Self {
Self {
@@ -1072,10 +1077,10 @@ impl ErrorCounter {
.concat()
}
- fn add1(self) -> Self {
+ fn add1(self, now: u64) -> Self {
Self {
errors: self.errors + 1,
- last_try: now_msec(),
+ last_try: now,
}
}