diff options
Diffstat (limited to 'src/catalog.rs')
-rw-r--r-- | src/catalog.rs | 74 |
1 files changed, 49 insertions, 25 deletions
diff --git a/src/catalog.rs b/src/catalog.rs index 0f0ecf5..cad7f5b 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -3,6 +3,7 @@ //! See <https://developer.hashicorp.com/consul/api-docs/catalog> //! for the full definition of the API. +use std::cmp; use std::collections::HashMap; use std::fmt::Write; use std::sync::Arc; @@ -11,7 +12,7 @@ use std::time::Duration; use anyhow::Result; use futures::future::BoxFuture; use futures::stream::futures_unordered::FuturesUnordered; -use futures::{FutureExt, StreamExt}; +use futures::{FutureExt, StreamExt, TryFutureExt}; use log::*; use serde::{Deserialize, Serialize}; use tokio::select; @@ -19,7 +20,7 @@ use tokio::sync::watch; use crate::{Consul, WithIndex}; -/// Node summary, as specified in response to "list nodes" API calls in +/// Node summary, as specified in response to "list nodes" API calls in /// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes> #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "PascalCase")] @@ -126,11 +127,8 @@ impl Consul { &self, last_index: Option<usize>, ) -> Result<WithIndex<ServiceList>> { - self.get_with_index::<ServiceList>( - format!("{}/v1/catalog/services", self.url), - last_index, - ) - .await + self.get_with_index::<ServiceList>(format!("{}/v1/catalog/services", self.url), last_index) + .await } /// The "list nodes for a service" API call of the Catalog api @@ -166,10 +164,17 @@ impl Consul { /// Launches a background task that watches all services and the nodes that serve them, /// and make that info available in a tokio watch channel. /// The worker terminates when the channel is dropped. - pub fn watch_all_service_health(&self) -> watch::Receiver<AllServiceHealth> { + pub fn watch_all_service_health( + &self, + max_retry_interval: Duration, + ) -> watch::Receiver<AllServiceHealth> { let (tx, rx) = watch::channel(HashMap::new()); - tokio::spawn(do_watch_all_service_health(self.clone(), tx)); + tokio::spawn(do_watch_all_service_health( + self.clone(), + tx, + max_retry_interval, + )); rx } @@ -194,10 +199,16 @@ impl Consul { } } -async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServiceHealth>) { +async fn do_watch_all_service_health( + consul: Consul, + tx: watch::Sender<AllServiceHealth>, + max_retry_interval: Duration, +) { let mut services = AllServiceHealth::new(); - let mut service_watchers = FuturesUnordered::<BoxFuture<(String, Result<_>)>>::new(); - let mut service_list: BoxFuture<Result<_>> = Box::pin(consul.catalog_service_list(None)); + let mut service_watchers = + FuturesUnordered::<BoxFuture<(String, std::result::Result<_, (usize, _)>)>>::new(); + let mut service_list: BoxFuture<std::result::Result<_, (usize, _)>> = + Box::pin(consul.catalog_service_list(None).map_err(|e| (1, e))); loop { select! { @@ -211,18 +222,20 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServic let service = service.to_string(); service_watchers.push(Box::pin(async { - let res = consul.health_service_instances(&service, None).await; + let res = consul.health_service_instances(&service, None).await + .map_err(|e| (1, e)); (service, res) })); } } - service_list = Box::pin(consul.catalog_service_list(Some(list_index))); + service_list = Box::pin(consul.catalog_service_list(Some(list_index)).map_err(|e| (1, e))); } - Err(e) => { - warn!("Error listing services: {}", e); - service_list = Box::pin(async { - tokio::time::sleep(Duration::from_secs(30)).await; - consul.catalog_service_list(None).await + Err((err_count, e)) => { + warn!("Error listing services: {} ({} consecutive errors)", e, err_count); + let consul = &consul; + service_list = Box::pin(async move { + tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await; + consul.catalog_service_list(None).await.map_err(|e| (err_count + 1, e)) }); } } @@ -235,7 +248,8 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServic let consul = &consul; service_watchers.push(Box::pin(async move { - let res = consul.health_service_instances(&service, Some(index)).await; + let res = consul.health_service_instances(&service, Some(index)).await + .map_err(|e| (1, e)); (service, res) })); @@ -243,11 +257,12 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServic break; } } - Err(e) => { - warn!("Error getting service {}: {}", service, e); - service_watchers.push(Box::pin(async { - tokio::time::sleep(Duration::from_secs(30)).await; - let res = consul.health_service_instances(&service, None).await; + Err((err_count, e)) => { + warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count); + let consul = &consul; + service_watchers.push(Box::pin(async move { + tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await; + let res = consul.health_service_instances(&service, None).await.map_err(|e| (err_count + 1, e)); (service, res) })); } @@ -266,3 +281,12 @@ async fn some_or_pending<T>(value: Option<T>) -> T { None => futures::future::pending().await, } } + +fn retry_to_time(retries: usize, max_time: Duration) -> Duration { + // Exponential retry interval, starting at 2 seconds, maxing out at max_time, + // with exponential increase of *1.5 each time + cmp::min( + max_time, + Duration::from_secs_f64(2.0f64 * 1.5f64.powf(retries as f64)), + ) +} |