aboutsummaryrefslogtreecommitdiff
path: root/src/model/index_counter.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/index_counter.rs')
-rw-r--r--src/model/index_counter.rs12
1 files changed, 8 insertions, 4 deletions
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 4318a064..9e29b421 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -2,7 +2,6 @@ use core::ops::Bound;
use std::collections::{hash_map, BTreeMap, HashMap};
use std::marker::PhantomData;
use std::sync::Arc;
-use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
@@ -408,6 +407,10 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
format!("{} index counter propagator", T::COUNTER_TABLE_NAME)
}
+ fn info(&self) -> Option<String> {
+ Some(format!("{} items in queue", self.buf.len()))
+ }
+
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> {
// This loop batches updates to counters to be sent all at once.
// They are sent once the propagate_rx channel has been emptied (or is closed).
@@ -429,9 +432,10 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e);
return Ok(WorkerStatus::Done);
}
- warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, self.buf.len(), e, self.errors);
- tokio::time::sleep(Duration::from_secs(5)).await;
- return Ok(WorkerStatus::Busy);
+ // Propagate error up to worker manager, it will log it, increment a counter,
+ // and sleep for a certain delay (with exponential backoff), waiting for
+ // things to go back to normal
+ return Err(e);
} else {
self.buf.clear();
self.errors = 0;