summaryrefslogtreecommitdiff
path: root/src/catalog.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/catalog.rs')
-rw-r--r--src/catalog.rs74
1 files changed, 49 insertions, 25 deletions
diff --git a/src/catalog.rs b/src/catalog.rs
index 0f0ecf5..cad7f5b 100644
--- a/src/catalog.rs
+++ b/src/catalog.rs
@@ -3,6 +3,7 @@
//! See <https://developer.hashicorp.com/consul/api-docs/catalog>
//! for the full definition of the API.
+use std::cmp;
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;
@@ -11,7 +12,7 @@ use std::time::Duration;
use anyhow::Result;
use futures::future::BoxFuture;
use futures::stream::futures_unordered::FuturesUnordered;
-use futures::{FutureExt, StreamExt};
+use futures::{FutureExt, StreamExt, TryFutureExt};
use log::*;
use serde::{Deserialize, Serialize};
use tokio::select;
@@ -19,7 +20,7 @@ use tokio::sync::watch;
use crate::{Consul, WithIndex};
-/// Node summary, as specified in response to "list nodes" API calls in
+/// Node summary, as specified in response to "list nodes" API calls in
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes>
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "PascalCase")]
@@ -126,11 +127,8 @@ impl Consul {
&self,
last_index: Option<usize>,
) -> Result<WithIndex<ServiceList>> {
- self.get_with_index::<ServiceList>(
- format!("{}/v1/catalog/services", self.url),
- last_index,
- )
- .await
+ self.get_with_index::<ServiceList>(format!("{}/v1/catalog/services", self.url), last_index)
+ .await
}
/// The "list nodes for a service" API call of the Catalog api
@@ -166,10 +164,17 @@ impl Consul {
/// Launches a background task that watches all services and the nodes that serve them,
/// and make that info available in a tokio watch channel.
/// The worker terminates when the channel is dropped.
- pub fn watch_all_service_health(&self) -> watch::Receiver<AllServiceHealth> {
+ pub fn watch_all_service_health(
+ &self,
+ max_retry_interval: Duration,
+ ) -> watch::Receiver<AllServiceHealth> {
let (tx, rx) = watch::channel(HashMap::new());
- tokio::spawn(do_watch_all_service_health(self.clone(), tx));
+ tokio::spawn(do_watch_all_service_health(
+ self.clone(),
+ tx,
+ max_retry_interval,
+ ));
rx
}
@@ -194,10 +199,16 @@ impl Consul {
}
}
-async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServiceHealth>) {
+async fn do_watch_all_service_health(
+ consul: Consul,
+ tx: watch::Sender<AllServiceHealth>,
+ max_retry_interval: Duration,
+) {
let mut services = AllServiceHealth::new();
- let mut service_watchers = FuturesUnordered::<BoxFuture<(String, Result<_>)>>::new();
- let mut service_list: BoxFuture<Result<_>> = Box::pin(consul.catalog_service_list(None));
+ let mut service_watchers =
+ FuturesUnordered::<BoxFuture<(String, std::result::Result<_, (usize, _)>)>>::new();
+ let mut service_list: BoxFuture<std::result::Result<_, (usize, _)>> =
+ Box::pin(consul.catalog_service_list(None).map_err(|e| (1, e)));
loop {
select! {
@@ -211,18 +222,20 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServic
let service = service.to_string();
service_watchers.push(Box::pin(async {
- let res = consul.health_service_instances(&service, None).await;
+ let res = consul.health_service_instances(&service, None).await
+ .map_err(|e| (1, e));
(service, res)
}));
}
}
- service_list = Box::pin(consul.catalog_service_list(Some(list_index)));
+ service_list = Box::pin(consul.catalog_service_list(Some(list_index)).map_err(|e| (1, e)));
}
- Err(e) => {
- warn!("Error listing services: {}", e);
- service_list = Box::pin(async {
- tokio::time::sleep(Duration::from_secs(30)).await;
- consul.catalog_service_list(None).await
+ Err((err_count, e)) => {
+ warn!("Error listing services: {} ({} consecutive errors)", e, err_count);
+ let consul = &consul;
+ service_list = Box::pin(async move {
+ tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
+ consul.catalog_service_list(None).await.map_err(|e| (err_count + 1, e))
});
}
}
@@ -235,7 +248,8 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServic
let consul = &consul;
service_watchers.push(Box::pin(async move {
- let res = consul.health_service_instances(&service, Some(index)).await;
+ let res = consul.health_service_instances(&service, Some(index)).await
+ .map_err(|e| (1, e));
(service, res)
}));
@@ -243,11 +257,12 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServic
break;
}
}
- Err(e) => {
- warn!("Error getting service {}: {}", service, e);
- service_watchers.push(Box::pin(async {
- tokio::time::sleep(Duration::from_secs(30)).await;
- let res = consul.health_service_instances(&service, None).await;
+ Err((err_count, e)) => {
+ warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count);
+ let consul = &consul;
+ service_watchers.push(Box::pin(async move {
+ tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
+ let res = consul.health_service_instances(&service, None).await.map_err(|e| (err_count + 1, e));
(service, res)
}));
}
@@ -266,3 +281,12 @@ async fn some_or_pending<T>(value: Option<T>) -> T {
None => futures::future::pending().await,
}
}
+
+fn retry_to_time(retries: usize, max_time: Duration) -> Duration {
+ // Exponential retry interval, starting at 2 seconds, maxing out at max_time,
+ // with exponential increase of *1.5 each time
+ cmp::min(
+ max_time,
+ Duration::from_secs_f64(2.0f64 * 1.5f64.powf(retries as f64)),
+ )
+}