summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-02-02 16:24:30 +0100
committerAlex Auvolat <alex@adnab.me>2023-02-02 16:24:30 +0100
commita11ed03031cb59b412950d62960fc27cf919e2bf (patch)
treee08fb386d8bd24977ae119c8b214ed3932c5a035
parentff14118db7ac838910f4ace03bc24f6dbafc62b3 (diff)
downloaddf-consul-a11ed03031cb59b412950d62960fc27cf919e2bf.tar.gz
df-consul-a11ed03031cb59b412950d62960fc27cf919e2bf.zip
Avoid sending useless changes to downstream listener
-rw-r--r--Cargo.toml2
-rw-r--r--examples/watch_test.rs3
-rw-r--r--src/catalog.rs25
3 files changed, 17 insertions, 13 deletions
diff --git a/Cargo.toml b/Cargo.toml
index b844a1f..3f967b4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,7 +2,7 @@
name = "df-consul"
description = "Deuxfleurs' async Rust bindings for (a subset of) the Consul HTTP API"
authors = [ "Alex Auvolat <alex@adnab.me>" ]
-version = "0.3.2"
+version = "0.3.3"
edition = "2021"
license = "MIT"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/df-consul"
diff --git a/examples/watch_test.rs b/examples/watch_test.rs
index d4d48a0..c697a53 100644
--- a/examples/watch_test.rs
+++ b/examples/watch_test.rs
@@ -22,6 +22,7 @@ async fn main() {
if watch.changed().await.is_err() {
break;
}
- println!("\n{:?}", watch.borrow_and_update());
+ //println!("\n{:?}", watch.borrow_and_update());
+ println!("changed, {} values", watch.borrow_and_update().len());
}
}
diff --git a/src/catalog.rs b/src/catalog.rs
index cad7f5b..042f745 100644
--- a/src/catalog.rs
+++ b/src/catalog.rs
@@ -22,7 +22,7 @@ use crate::{Consul, WithIndex};
/// Node summary, as specified in response to "list nodes" API calls in
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes>
-#[derive(Serialize, Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
pub struct Node {
pub node: String,
@@ -31,7 +31,7 @@ pub struct Node {
}
/// One of the services returned in a CatalogNode
-#[derive(Serialize, Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
pub struct Service {
pub service: String,
@@ -42,7 +42,7 @@ pub struct Service {
/// Full node info, as specified in response to "retrieve map of services for a node" API call in
/// <https://developer.hashicorp.com/consul/api-docs/catalog#retrieve-map-of-services-for-a-node>
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
pub struct CatalogNode {
pub node: Node,
@@ -55,7 +55,7 @@ pub type ServiceList = HashMap<String, Vec<String>>;
/// Node serving a service, as specified in response to "list nodes for a service" API call in
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes-for-service>
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
pub struct ServiceNode {
pub node: String,
@@ -70,7 +70,7 @@ pub struct ServiceNode {
/// Node serving a service with health info,
/// as specified in response to "list service instances for a service" health API call in
/// <https://developer.hashicorp.com/consul/api-docs/health#list-service-instances-for-service>
-#[derive(Serialize, Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
pub struct HealthServiceNode {
pub node: Node,
@@ -79,7 +79,7 @@ pub struct HealthServiceNode {
}
/// A health check as returned in HealthServiceNode
-#[derive(Serialize, Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
pub struct HealthCheck {
pub node: String,
@@ -244,7 +244,14 @@ async fn do_watch_all_service_health(
match watch_res {
Ok(nodes) => {
let index = nodes.index();
- services.insert(service.clone(), nodes.into_inner().into());
+
+ let nodes = nodes.into_inner();
+ if services.get(&service).as_ref().map(|n| &n[..]) != Some(&nodes[..]) {
+ services.insert(service.clone(), nodes.into());
+ if tx.send(services.clone()).is_err() {
+ break;
+ }
+ }
let consul = &consul;
service_watchers.push(Box::pin(async move {
@@ -252,10 +259,6 @@ async fn do_watch_all_service_health(
.map_err(|e| (1, e));
(service, res)
}));
-
- if tx.send(services.clone()).is_err() {
- break;
- }
}
Err((err_count, e)) => {
warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count);