//! Contains structures to interact with the catalog API //! //! See //! for the full definition of the API. use std::cmp; use std::collections::HashMap; use std::fmt::Write; use std::sync::Arc; use std::time::Duration; use anyhow::Result; use futures::future::BoxFuture; use futures::stream::futures_unordered::FuturesUnordered; use futures::{FutureExt, StreamExt, TryFutureExt}; use log::*; use serde::{Deserialize, Serialize}; use tokio::select; use tokio::sync::watch; use crate::{Consul, WithIndex}; /// Node summary, as specified in response to "list nodes" API calls in /// #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub struct Node { pub node: String, pub address: String, pub meta: HashMap, } /// One of the services returned in a CatalogNode #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub struct Service { pub service: String, pub address: String, pub port: u16, pub tags: Vec, } /// Full node info, as specified in response to "retrieve map of services for a node" API call in /// #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub struct CatalogNode { pub node: Node, pub services: HashMap, } /// Concise service list, as specified in response to "list services" API call in /// pub type ServiceList = HashMap>; /// Node serving a service, as specified in response to "list nodes for a service" API call in /// #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub struct ServiceNode { pub node: String, pub address: String, pub node_meta: HashMap, pub service_name: String, pub service_tags: Vec, pub service_address: String, pub service_port: u16, } /// Node serving a service with health info, /// as specified in response to "list service instances for a service" health API call in /// #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub struct HealthServiceNode { pub node: Node, pub service: Service, pub checks: Vec, } /// A health check as returned in HealthServiceNode #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub struct HealthCheck { pub node: String, #[serde(rename = "CheckID")] pub check_id: String, pub name: String, pub status: String, pub output: String, #[serde(rename = "Type")] pub type_: String, } /// Map containing all services and their associated nodes, with health checks, /// returned by `watch_all_service_health` pub type AllServiceHealth = HashMap>; impl Consul { /// The "list nodes" API call of the Catalog API /// /// pub async fn catalog_node_list( &self, last_index: Option, ) -> Result>> { self.get_with_index(format!("{}/v1/catalog/nodes", self.url), last_index) .await } /// The "retrieve map of services for a node" API call of the Catalog API /// /// pub async fn catalog_node( &self, host: &str, last_index: Option, ) -> Result>> { self.get_with_index(format!("{}/v1/catalog/node/{}", self.url, host), last_index) .await } /// The "list services" API call of the Catalog api /// /// pub async fn catalog_service_list( &self, last_index: Option, ) -> Result> { self.get_with_index::(format!("{}/v1/catalog/services", self.url), last_index) .await } /// The "list nodes for a service" API call of the Catalog api /// /// pub async fn catalog_service_nodes( &self, service: &str, last_index: Option, ) -> Result>> { self.get_with_index( format!("{}/v1/catalog/service/{}", self.url, service), last_index, ) .await } /// The "list service instances for a service" API call of the Health api /// /// pub async fn health_service_instances( &self, service: &str, last_index: Option, ) -> Result>> { self.get_with_index( format!("{}/v1/health/service/{}", self.url, service), last_index, ) .await } /// Launches a background task that watches all services and the nodes that serve them, /// and make that info available in a tokio watch channel. /// The worker terminates when the channel is dropped. pub fn watch_all_service_health( &self, max_retry_interval: Duration, ) -> watch::Receiver { let (tx, rx) = watch::channel(HashMap::new()); tokio::spawn(do_watch_all_service_health( self.clone(), tx, max_retry_interval, )); rx } async fn get_with_index Deserialize<'de>>( &self, mut url: String, last_index: Option, ) -> Result> { if let Some(i) = last_index { if url.contains('?') { write!(&mut url, "&index={}", i).unwrap(); } else { write!(&mut url, "?index={}", i).unwrap(); } } debug!("GET {} as {}", url, std::any::type_name::()); let http = self.client.get(&url).send().await?; Ok(WithIndex::::index_from(&http)?.value(http.json().await?)) } } async fn do_watch_all_service_health( consul: Consul, tx: watch::Sender, max_retry_interval: Duration, ) { let mut services = AllServiceHealth::new(); let mut service_watchers = FuturesUnordered::)>>::new(); let mut service_list: BoxFuture> = Box::pin(consul.catalog_service_list(None).map_err(|e| (1, e))); loop { select! { list_res = &mut service_list => { match list_res { Ok(list) => { let list_index = list.index(); for service in list.into_inner().keys() { if !services.contains_key(service) { services.insert(service.to_string(), Arc::new([])); let service = service.to_string(); service_watchers.push(Box::pin(async { let res = consul.health_service_instances(&service, None).await .map_err(|e| (1, e)); (service, res) })); } } service_list = Box::pin(consul.catalog_service_list(Some(list_index)).map_err(|e| (1, e))); } Err((err_count, e)) => { warn!("Error listing services: {} ({} consecutive errors)", e, err_count); let consul = &consul; service_list = Box::pin(async move { tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await; consul.catalog_service_list(None).await.map_err(|e| (err_count + 1, e)) }); } } } (service, watch_res) = service_watchers.next().then(some_or_pending) => { match watch_res { Ok(nodes) => { let index = nodes.index(); let nodes = nodes.into_inner(); if services.get(&service).as_ref().map(|n| &n[..]) != Some(&nodes[..]) { services.insert(service.clone(), nodes.into()); if tx.send(services.clone()).is_err() { break; } } let consul = &consul; service_watchers.push(Box::pin(async move { let res = consul.health_service_instances(&service, Some(index)).await .map_err(|e| (1, e)); (service, res) })); } Err((err_count, e)) => { warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count); let consul = &consul; service_watchers.push(Box::pin(async move { tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await; let res = consul.health_service_instances(&service, None).await.map_err(|e| (err_count + 1, e)); (service, res) })); } } } _ = tx.closed() => { break; } } } } async fn some_or_pending(value: Option) -> T { match value { Some(v) => v, None => futures::future::pending().await, } } fn retry_to_time(retries: usize, max_time: Duration) -> Duration { // Exponential retry interval, starting at 2 seconds, maxing out at max_time, // with exponential increase of *1.5 each time cmp::min( max_time, Duration::from_secs_f64(2.0f64 * 1.5f64.powf(retries as f64)), ) }