diff options
author | Alex Auvolat <alex@adnab.me> | 2023-02-02 16:24:30 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-02-02 16:24:30 +0100 |
commit | a11ed03031cb59b412950d62960fc27cf919e2bf (patch) | |
tree | e08fb386d8bd24977ae119c8b214ed3932c5a035 | |
parent | ff14118db7ac838910f4ace03bc24f6dbafc62b3 (diff) | |
download | df-consul-a11ed03031cb59b412950d62960fc27cf919e2bf.tar.gz df-consul-a11ed03031cb59b412950d62960fc27cf919e2bf.zip |
Avoid sending useless changes to downstream listener
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | examples/watch_test.rs | 3 | ||||
-rw-r--r-- | src/catalog.rs | 25 |
3 files changed, 17 insertions, 13 deletions
@@ -2,7 +2,7 @@ name = "df-consul" description = "Deuxfleurs' async Rust bindings for (a subset of) the Consul HTTP API" authors = [ "Alex Auvolat <alex@adnab.me>" ] -version = "0.3.2" +version = "0.3.3" edition = "2021" license = "MIT" repository = "https://git.deuxfleurs.fr/Deuxfleurs/df-consul" diff --git a/examples/watch_test.rs b/examples/watch_test.rs index d4d48a0..c697a53 100644 --- a/examples/watch_test.rs +++ b/examples/watch_test.rs @@ -22,6 +22,7 @@ async fn main() { if watch.changed().await.is_err() { break; } - println!("\n{:?}", watch.borrow_and_update()); + //println!("\n{:?}", watch.borrow_and_update()); + println!("changed, {} values", watch.borrow_and_update().len()); } } diff --git a/src/catalog.rs b/src/catalog.rs index cad7f5b..042f745 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -22,7 +22,7 @@ use crate::{Consul, WithIndex}; /// Node summary, as specified in response to "list nodes" API calls in /// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes> -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub struct Node { pub node: String, @@ -31,7 +31,7 @@ pub struct Node { } /// One of the services returned in a CatalogNode -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub struct Service { pub service: String, @@ -42,7 +42,7 @@ pub struct Service { /// Full node info, as specified in response to "retrieve map of services for a node" API call in /// <https://developer.hashicorp.com/consul/api-docs/catalog#retrieve-map-of-services-for-a-node> -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub struct CatalogNode { pub node: Node, @@ -55,7 +55,7 @@ pub type ServiceList = HashMap<String, Vec<String>>; /// Node serving a service, as specified in response to "list nodes for a service" API call in /// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes-for-service> -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub struct ServiceNode { pub node: String, @@ -70,7 +70,7 @@ pub struct ServiceNode { /// Node serving a service with health info, /// as specified in response to "list service instances for a service" health API call in /// <https://developer.hashicorp.com/consul/api-docs/health#list-service-instances-for-service> -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub struct HealthServiceNode { pub node: Node, @@ -79,7 +79,7 @@ pub struct HealthServiceNode { } /// A health check as returned in HealthServiceNode -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub struct HealthCheck { pub node: String, @@ -244,7 +244,14 @@ async fn do_watch_all_service_health( match watch_res { Ok(nodes) => { let index = nodes.index(); - services.insert(service.clone(), nodes.into_inner().into()); + + let nodes = nodes.into_inner(); + if services.get(&service).as_ref().map(|n| &n[..]) != Some(&nodes[..]) { + services.insert(service.clone(), nodes.into()); + if tx.send(services.clone()).is_err() { + break; + } + } let consul = &consul; service_watchers.push(Box::pin(async move { @@ -252,10 +259,6 @@ async fn do_watch_all_service_health( .map_err(|e| (1, e)); (service, res) })); - - if tx.send(services.clone()).is_err() { - break; - } } Err((err_count, e)) => { warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count); |