summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.toml3
-rw-r--r--examples/test.rs13
-rw-r--r--examples/watch_test.rs27
-rw-r--r--src/catalog.rs74
-rw-r--r--src/lib.rs2
5 files changed, 83 insertions, 36 deletions
diff --git a/Cargo.toml b/Cargo.toml
index fc616df..b844a1f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,7 +2,7 @@
name = "df-consul"
description = "Deuxfleurs' async Rust bindings for (a subset of) the Consul HTTP API"
authors = [ "Alex Auvolat <alex@adnab.me>" ]
-version = "0.3.1"
+version = "0.3.2"
edition = "2021"
license = "MIT"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/df-consul"
@@ -20,3 +20,4 @@ futures = "0.3.25"
[dev-dependencies]
tokio = { version = "1.23", features = ["rt", "rt-multi-thread", "macros"] }
+pretty_env_logger = "0.4.0"
diff --git a/examples/test.rs b/examples/test.rs
index b3e6455..7e38c66 100644
--- a/examples/test.rs
+++ b/examples/test.rs
@@ -1,7 +1,11 @@
+use std::time::Duration;
+
use df_consul::*;
#[tokio::main]
async fn main() {
+ pretty_env_logger::init();
+
let config = Config {
addr: "http://localhost:8500".into(),
ca_cert: None,
@@ -41,13 +45,4 @@ async fn main() {
.unwrap()
);
}
-
- println!("== WATCHING EVERYTHING ==");
- let mut watch = consul.watch_all_service_health();
- loop {
- if watch.changed().await.is_err() {
- break;
- }
- println!("\n{:?}", watch.borrow_and_update());
- }
}
diff --git a/examples/watch_test.rs b/examples/watch_test.rs
new file mode 100644
index 0000000..d4d48a0
--- /dev/null
+++ b/examples/watch_test.rs
@@ -0,0 +1,27 @@
+use std::time::Duration;
+
+use df_consul::*;
+
+#[tokio::main]
+async fn main() {
+ pretty_env_logger::init();
+
+ let config = Config {
+ addr: "http://localhost:8500".into(),
+ ca_cert: None,
+ tls_skip_verify: false,
+ client_cert: None,
+ client_key: None,
+ };
+
+ let consul = Consul::new(config, "").unwrap();
+
+ println!("== WATCHING EVERYTHING ==");
+ let mut watch = consul.watch_all_service_health(Duration::from_secs(30));
+ loop {
+ if watch.changed().await.is_err() {
+ break;
+ }
+ println!("\n{:?}", watch.borrow_and_update());
+ }
+}
diff --git a/src/catalog.rs b/src/catalog.rs
index 0f0ecf5..cad7f5b 100644
--- a/src/catalog.rs
+++ b/src/catalog.rs
@@ -3,6 +3,7 @@
//! See <https://developer.hashicorp.com/consul/api-docs/catalog>
//! for the full definition of the API.
+use std::cmp;
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;
@@ -11,7 +12,7 @@ use std::time::Duration;
use anyhow::Result;
use futures::future::BoxFuture;
use futures::stream::futures_unordered::FuturesUnordered;
-use futures::{FutureExt, StreamExt};
+use futures::{FutureExt, StreamExt, TryFutureExt};
use log::*;
use serde::{Deserialize, Serialize};
use tokio::select;
@@ -19,7 +20,7 @@ use tokio::sync::watch;
use crate::{Consul, WithIndex};
-/// Node summary, as specified in response to "list nodes" API calls in
+/// Node summary, as specified in response to "list nodes" API calls in
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes>
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "PascalCase")]
@@ -126,11 +127,8 @@ impl Consul {
&self,
last_index: Option<usize>,
) -> Result<WithIndex<ServiceList>> {
- self.get_with_index::<ServiceList>(
- format!("{}/v1/catalog/services", self.url),
- last_index,
- )
- .await
+ self.get_with_index::<ServiceList>(format!("{}/v1/catalog/services", self.url), last_index)
+ .await
}
/// The "list nodes for a service" API call of the Catalog api
@@ -166,10 +164,17 @@ impl Consul {
/// Launches a background task that watches all services and the nodes that serve them,
/// and make that info available in a tokio watch channel.
/// The worker terminates when the channel is dropped.
- pub fn watch_all_service_health(&self) -> watch::Receiver<AllServiceHealth> {
+ pub fn watch_all_service_health(
+ &self,
+ max_retry_interval: Duration,
+ ) -> watch::Receiver<AllServiceHealth> {
let (tx, rx) = watch::channel(HashMap::new());
- tokio::spawn(do_watch_all_service_health(self.clone(), tx));
+ tokio::spawn(do_watch_all_service_health(
+ self.clone(),
+ tx,
+ max_retry_interval,
+ ));
rx
}
@@ -194,10 +199,16 @@ impl Consul {
}
}
-async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServiceHealth>) {
+async fn do_watch_all_service_health(
+ consul: Consul,
+ tx: watch::Sender<AllServiceHealth>,
+ max_retry_interval: Duration,
+) {
let mut services = AllServiceHealth::new();
- let mut service_watchers = FuturesUnordered::<BoxFuture<(String, Result<_>)>>::new();
- let mut service_list: BoxFuture<Result<_>> = Box::pin(consul.catalog_service_list(None));
+ let mut service_watchers =
+ FuturesUnordered::<BoxFuture<(String, std::result::Result<_, (usize, _)>)>>::new();
+ let mut service_list: BoxFuture<std::result::Result<_, (usize, _)>> =
+ Box::pin(consul.catalog_service_list(None).map_err(|e| (1, e)));
loop {
select! {
@@ -211,18 +222,20 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServic
let service = service.to_string();
service_watchers.push(Box::pin(async {
- let res = consul.health_service_instances(&service, None).await;
+ let res = consul.health_service_instances(&service, None).await
+ .map_err(|e| (1, e));
(service, res)
}));
}
}
- service_list = Box::pin(consul.catalog_service_list(Some(list_index)));
+ service_list = Box::pin(consul.catalog_service_list(Some(list_index)).map_err(|e| (1, e)));
}
- Err(e) => {
- warn!("Error listing services: {}", e);
- service_list = Box::pin(async {
- tokio::time::sleep(Duration::from_secs(30)).await;
- consul.catalog_service_list(None).await
+ Err((err_count, e)) => {
+ warn!("Error listing services: {} ({} consecutive errors)", e, err_count);
+ let consul = &consul;
+ service_list = Box::pin(async move {
+ tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
+ consul.catalog_service_list(None).await.map_err(|e| (err_count + 1, e))
});
}
}
@@ -235,7 +248,8 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServic
let consul = &consul;
service_watchers.push(Box::pin(async move {
- let res = consul.health_service_instances(&service, Some(index)).await;
+ let res = consul.health_service_instances(&service, Some(index)).await
+ .map_err(|e| (1, e));
(service, res)
}));
@@ -243,11 +257,12 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServic
break;
}
}
- Err(e) => {
- warn!("Error getting service {}: {}", service, e);
- service_watchers.push(Box::pin(async {
- tokio::time::sleep(Duration::from_secs(30)).await;
- let res = consul.health_service_instances(&service, None).await;
+ Err((err_count, e)) => {
+ warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count);
+ let consul = &consul;
+ service_watchers.push(Box::pin(async move {
+ tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
+ let res = consul.health_service_instances(&service, None).await.map_err(|e| (err_count + 1, e));
(service, res)
}));
}
@@ -266,3 +281,12 @@ async fn some_or_pending<T>(value: Option<T>) -> T {
None => futures::future::pending().await,
}
}
+
+fn retry_to_time(retries: usize, max_time: Duration) -> Duration {
+ // Exponential retry interval, starting at 2 seconds, maxing out at max_time,
+ // with exponential increase of *1.5 each time
+ cmp::min(
+ max_time,
+ Duration::from_secs_f64(2.0f64 * 1.5f64.powf(retries as f64)),
+ )
+}
diff --git a/src/lib.rs b/src/lib.rs
index c320936..583f106 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,6 +1,6 @@
pub mod catalog;
-pub mod locking;
mod kv;
+pub mod locking;
mod with_index;
use std::fs::File;