From ff14118db7ac838910f4ace03bc24f6dbafc62b3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 2 Feb 2023 16:10:56 +0100 Subject: Exponential backoff retry on catalog watcher --- src/catalog.rs | 74 ++++++++++++++++++++++++++++++++++++++-------------------- src/lib.rs | 2 +- 2 files changed, 50 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/src/catalog.rs b/src/catalog.rs index 0f0ecf5..cad7f5b 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -3,6 +3,7 @@ //! See //! for the full definition of the API. +use std::cmp; use std::collections::HashMap; use std::fmt::Write; use std::sync::Arc; @@ -11,7 +12,7 @@ use std::time::Duration; use anyhow::Result; use futures::future::BoxFuture; use futures::stream::futures_unordered::FuturesUnordered; -use futures::{FutureExt, StreamExt}; +use futures::{FutureExt, StreamExt, TryFutureExt}; use log::*; use serde::{Deserialize, Serialize}; use tokio::select; @@ -19,7 +20,7 @@ use tokio::sync::watch; use crate::{Consul, WithIndex}; -/// Node summary, as specified in response to "list nodes" API calls in +/// Node summary, as specified in response to "list nodes" API calls in /// #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "PascalCase")] @@ -126,11 +127,8 @@ impl Consul { &self, last_index: Option, ) -> Result> { - self.get_with_index::( - format!("{}/v1/catalog/services", self.url), - last_index, - ) - .await + self.get_with_index::(format!("{}/v1/catalog/services", self.url), last_index) + .await } /// The "list nodes for a service" API call of the Catalog api @@ -166,10 +164,17 @@ impl Consul { /// Launches a background task that watches all services and the nodes that serve them, /// and make that info available in a tokio watch channel. /// The worker terminates when the channel is dropped. - pub fn watch_all_service_health(&self) -> watch::Receiver { + pub fn watch_all_service_health( + &self, + max_retry_interval: Duration, + ) -> watch::Receiver { let (tx, rx) = watch::channel(HashMap::new()); - tokio::spawn(do_watch_all_service_health(self.clone(), tx)); + tokio::spawn(do_watch_all_service_health( + self.clone(), + tx, + max_retry_interval, + )); rx } @@ -194,10 +199,16 @@ impl Consul { } } -async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender) { +async fn do_watch_all_service_health( + consul: Consul, + tx: watch::Sender, + max_retry_interval: Duration, +) { let mut services = AllServiceHealth::new(); - let mut service_watchers = FuturesUnordered::)>>::new(); - let mut service_list: BoxFuture> = Box::pin(consul.catalog_service_list(None)); + let mut service_watchers = + FuturesUnordered::)>>::new(); + let mut service_list: BoxFuture> = + Box::pin(consul.catalog_service_list(None).map_err(|e| (1, e))); loop { select! { @@ -211,18 +222,20 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender { - warn!("Error listing services: {}", e); - service_list = Box::pin(async { - tokio::time::sleep(Duration::from_secs(30)).await; - consul.catalog_service_list(None).await + Err((err_count, e)) => { + warn!("Error listing services: {} ({} consecutive errors)", e, err_count); + let consul = &consul; + service_list = Box::pin(async move { + tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await; + consul.catalog_service_list(None).await.map_err(|e| (err_count + 1, e)) }); } } @@ -235,7 +248,8 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender { - warn!("Error getting service {}: {}", service, e); - service_watchers.push(Box::pin(async { - tokio::time::sleep(Duration::from_secs(30)).await; - let res = consul.health_service_instances(&service, None).await; + Err((err_count, e)) => { + warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count); + let consul = &consul; + service_watchers.push(Box::pin(async move { + tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await; + let res = consul.health_service_instances(&service, None).await.map_err(|e| (err_count + 1, e)); (service, res) })); } @@ -266,3 +281,12 @@ async fn some_or_pending(value: Option) -> T { None => futures::future::pending().await, } } + +fn retry_to_time(retries: usize, max_time: Duration) -> Duration { + // Exponential retry interval, starting at 2 seconds, maxing out at max_time, + // with exponential increase of *1.5 each time + cmp::min( + max_time, + Duration::from_secs_f64(2.0f64 * 1.5f64.powf(retries as f64)), + ) +} diff --git a/src/lib.rs b/src/lib.rs index c320936..583f106 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,6 @@ pub mod catalog; -pub mod locking; mod kv; +pub mod locking; mod with_index; use std::fs::File; -- cgit v1.2.3