use std::collections::HashMap; use std::fmt::Write; use std::sync::Arc; use std::time::Duration; use anyhow::Result; use futures::future::BoxFuture; use futures::stream::futures_unordered::FuturesUnordered; use futures::{FutureExt, StreamExt}; use log::*; use serde::{Deserialize, Serialize}; use tokio::select; use tokio::sync::watch; use crate::{Consul, WithIndex}; #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "PascalCase")] pub struct ConsulNode { pub node: String, pub address: String, pub meta: HashMap, } #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "PascalCase")] pub struct ConsulService { pub service: String, pub address: String, pub port: u16, pub tags: Vec, } #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "PascalCase")] pub struct ConsulCatalogNode { pub node: ConsulNode, pub services: HashMap, } pub type ConsulServiceList = HashMap>; #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "PascalCase")] pub struct ConsulServiceNode { pub node: String, pub address: String, pub node_meta: HashMap, pub service_name: String, pub service_tags: Vec, pub service_address: String, pub service_port: u16, } #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "PascalCase")] pub struct ConsulHealthServiceNode { pub node: ConsulNode, pub service: ConsulService, pub checks: Vec, } #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "PascalCase")] pub struct ConsulHealthCheck { pub node: String, #[serde(rename = "CheckID")] pub check_id: String, pub name: String, pub status: String, pub output: String, #[serde(rename = "Type")] pub type_: String, } pub type AllServiceHealth = HashMap>; impl Consul { pub async fn catalog_node_list( &self, last_index: Option, ) -> Result>> { self.get_with_index(format!("{}/v1/catalog/nodes", self.url), last_index) .await } pub async fn catalog_node( &self, host: &str, last_index: Option, ) -> Result>> { self.get_with_index(format!("{}/v1/catalog/node/{}", self.url, host), last_index) .await } pub async fn catalog_service_list( &self, last_index: Option, ) -> Result> { self.get_with_index::( format!("{}/v1/catalog/services", self.url), last_index, ) .await } pub async fn catalog_service_nodes( &self, service: &str, last_index: Option, ) -> Result>> { self.get_with_index( format!("{}/v1/catalog/service/{}", self.url, service), last_index, ) .await } pub async fn health_service_instances( &self, service: &str, last_index: Option, ) -> Result>> { self.get_with_index( format!("{}/v1/health/service/{}", self.url, service), last_index, ) .await } pub fn watch_all_service_health(&self) -> watch::Receiver { let (tx, rx) = watch::channel(HashMap::new()); tokio::spawn(do_watch_all_service_health(self.clone(), tx)); rx } async fn get_with_index Deserialize<'de>>( &self, mut url: String, last_index: Option, ) -> Result> { if let Some(i) = last_index { if url.contains('?') { write!(&mut url, "&index={}", i).unwrap(); } else { write!(&mut url, "?index={}", i).unwrap(); } } debug!("GET {} as {}", url, std::any::type_name::()); let http = self.client.get(&url).send().await?; Ok(WithIndex::::index_from(&http)?.value(http.json().await?)) } } async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender) { let mut services = AllServiceHealth::new(); let mut service_watchers = FuturesUnordered::)>>::new(); let mut service_list: BoxFuture> = Box::pin(consul.catalog_service_list(None)); loop { select! { list_res = &mut service_list => { match list_res { Ok(list) => { let list_index = list.index(); for service in list.into_inner().keys() { if !services.contains_key(service) { services.insert(service.to_string(), Arc::new([])); let service = service.to_string(); service_watchers.push(Box::pin(async { let res = consul.health_service_instances(&service, None).await; (service, res) })); } } service_list = Box::pin(consul.catalog_service_list(Some(list_index))); } Err(e) => { warn!("Error listing services: {}", e); service_list = Box::pin(async { tokio::time::sleep(Duration::from_secs(30)).await; consul.catalog_service_list(None).await }); } } } (service, watch_res) = service_watchers.next().then(some_or_pending) => { match watch_res { Ok(nodes) => { let index = nodes.index(); services.insert(service.clone(), nodes.into_inner().into()); let consul = &consul; service_watchers.push(Box::pin(async move { let res = consul.health_service_instances(&service, Some(index)).await; (service, res) })); if tx.send(services.clone()).is_err() { break; } } Err(e) => { warn!("Error getting service {}: {}", service, e); service_watchers.push(Box::pin(async { tokio::time::sleep(Duration::from_secs(30)).await; let res = consul.health_service_instances(&service, None).await; (service, res) })); } } } _ = tx.closed() => { break; } } } } async fn some_or_pending(value: Option) -> T { match value { Some(v) => v, None => futures::future::pending().await, } }