diff options
author | Alex Auvolat <alex@adnab.me> | 2022-03-01 14:55:37 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-03-14 11:51:37 +0100 |
commit | d78bf379fb85c0264c9971a26724f8b933a234ee (patch) | |
tree | 3d899b7e5ead760df0b0af6103b4579da88425be | |
parent | f7e6f4616f79726674d0fa5e1b19b06cacdc3a2a (diff) | |
download | garage-d78bf379fb85c0264c9971a26724f8b933a234ee.tar.gz garage-d78bf379fb85c0264c9971a26724f8b933a234ee.zip |
Fix resync queue to not drop items
-rw-r--r-- | src/model/block.rs | 37 |
1 files changed, 21 insertions, 16 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index f0814eda..8329bb6f 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -562,9 +562,12 @@ impl BlockManager { } async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> { - if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { + if let Some(first_pair_res) = self.resync_queue.iter().next() { + let (time_bytes, hash_bytes) = first_pair_res?; + let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let now = now_msec(); + if now >= time_msec { let hash = Hash::try_from(&hash_bytes[..]).unwrap(); @@ -575,6 +578,9 @@ impl BlockManager { // don't do resync and return early, but still // make sure the item is still in queue at expected time self.put_to_resync_at(&hash, ec.next_try())?; + // ec.next_try() > now >= time_msec, so this remove + // is not removing the one we added just above + self.resync_queue.remove(time_bytes)?; return Ok(false); } } @@ -605,20 +611,25 @@ impl BlockManager { warn!("Error when resyncing {:?}: {}", hash, e); let err_counter = match self.resync_errors.get(hash.as_slice())? { - Some(ec) => ErrorCounter::decode(ec).add1(), - None => ErrorCounter::new(), + Some(ec) => ErrorCounter::decode(ec).add1(now + 1), + None => ErrorCounter::new(now + 1), }; - self.put_to_resync_at(&hash, err_counter.next_try())?; self.resync_errors .insert(hash.as_slice(), err_counter.encode())?; + + self.put_to_resync_at(&hash, err_counter.next_try())?; + // err_counter.next_try() >= now + 1 > now, + // the entry we remove from the queue is not + // the entry we inserted with put_to_resync_at + self.resync_queue.remove(time_bytes)?; } else { self.resync_errors.remove(hash.as_slice())?; + self.resync_queue.remove(time_bytes)?; } Ok(true) } else { - self.resync_queue.insert(time_bytes, hash_bytes)?; let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); select! { _ = delay.fuse() => {}, @@ -1044,19 +1055,13 @@ struct ErrorCounter { last_try: u64, } -impl Default for ErrorCounter { - fn default() -> Self { +impl ErrorCounter { + fn new(now: u64) -> Self { Self { errors: 1, - last_try: now_msec(), + last_try: now, } } -} - -impl ErrorCounter { - fn new() -> Self { - Self::default() - } fn decode(data: sled::IVec) -> Self { Self { @@ -1072,10 +1077,10 @@ impl ErrorCounter { .concat() } - fn add1(self) -> Self { + fn add1(self, now: u64) -> Self { Self { errors: self.errors + 1, - last_try: now_msec(), + last_try: now, } } |