diff options
Diffstat (limited to 'src/model/index_counter.rs')
-rw-r--r-- | src/model/index_counter.rs | 12 |
1 files changed, 8 insertions, 4 deletions
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 4318a064..9e29b421 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -2,7 +2,6 @@ use core::ops::Bound; use std::collections::{hash_map, BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; -use std::time::Duration; use async_trait::async_trait; use serde::{Deserialize, Serialize}; @@ -408,6 +407,10 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> { format!("{} index counter propagator", T::COUNTER_TABLE_NAME) } + fn info(&self) -> Option<String> { + Some(format!("{} items in queue", self.buf.len())) + } + async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> { // This loop batches updates to counters to be sent all at once. // They are sent once the propagate_rx channel has been emptied (or is closed). @@ -429,9 +432,10 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> { error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e); return Ok(WorkerStatus::Done); } - warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, self.buf.len(), e, self.errors); - tokio::time::sleep(Duration::from_secs(5)).await; - return Ok(WorkerStatus::Busy); + // Propagate error up to worker manager, it will log it, increment a counter, + // and sleep for a certain delay (with exponential backoff), waiting for + // things to go back to normal + return Err(e); } else { self.buf.clear(); self.errors = 0; |