aboutsummaryrefslogtreecommitdiff
path: root/src/model/block.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/block.rs')
-rw-r--r--src/model/block.rs58
1 files changed, 30 insertions, 28 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 9fe6c76b..023ed3ab 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -258,46 +258,48 @@ impl BlockManager {
async fn resync_loop(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
- ) -> Result<(), Error> {
- let mut n_failures = 0usize;
+ ) {
while !*must_exit.borrow() {
- if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
- let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
- let now = now_msec();
- if now >= time_msec {
- let hash = Hash::try_from(&hash_bytes[..]).unwrap();
-
- if let Err(e) = self.resync_iter(&hash).await {
- warn!("Failed to resync block {:?}, retrying later: {}", hash, e);
- self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
- n_failures += 1;
- if n_failures >= 10 {
- warn!("Too many resync failures, throttling.");
- tokio::time::delay_for(Duration::from_secs(1)).await;
- }
- } else {
- n_failures = 0;
- }
- } else {
- self.resync_queue.insert(time_bytes, hash_bytes)?;
- let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now));
- select! {
- _ = delay.fuse() => (),
- _ = self.resync_notify.notified().fuse() => (),
- _ = must_exit.recv().fuse() => (),
- }
+ if let Err(e) = self.resync_iter(&mut must_exit).await {
+ warn!("Error in block resync loop: {}", e);
+ tokio::time::delay_for(Duration::from_secs(10)).await;
+ }
+ }
+ }
+
+
+ async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
+ if let Some(first_item) = self.resync_queue.iter().next() {
+ let (time_bytes, hash_bytes) = first_item?;
+ let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
+ let now = now_msec();
+ if now >= time_msec {
+ let hash = Hash::try_from(&hash_bytes[..]).unwrap();
+ let res = self.resync_block(&hash).await;
+ if let Err(e) = &res {
+ warn!("Error when resyncing {:?}: {}", hash, e);
+ self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
}
+ self.resync_queue.remove(&time_bytes)?;
+ res?; // propagate error to delay main loop
} else {
+ let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now));
select! {
+ _ = delay.fuse() => (),
_ = self.resync_notify.notified().fuse() => (),
_ = must_exit.recv().fuse() => (),
}
}
+ } else {
+ select! {
+ _ = self.resync_notify.notified().fuse() => (),
+ _ = must_exit.recv().fuse() => (),
+ }
}
Ok(())
}
- async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> {
+ async fn resync_block(&self, hash: &Hash) -> Result<(), Error> {
let lock = self.data_dir_lock.lock().await;
let path = self.block_path(hash);