diff options
Diffstat (limited to 'src/catalog.rs')
-rw-r--r-- | src/catalog.rs | 230 |
1 files changed, 230 insertions, 0 deletions
diff --git a/src/catalog.rs b/src/catalog.rs new file mode 100644 index 0000000..952e99e --- /dev/null +++ b/src/catalog.rs @@ -0,0 +1,230 @@ +use std::collections::HashMap; +use std::fmt::Write; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use futures::future::BoxFuture; +use futures::stream::futures_unordered::FuturesUnordered; +use futures::{FutureExt, StreamExt}; +use log::*; +use serde::{Deserialize, Serialize}; +use tokio::select; +use tokio::sync::watch; + +use crate::{Consul, WithIndex}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulNode { + pub node: String, + pub address: String, + pub meta: HashMap<String, String>, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulService { + pub service: String, + pub address: String, + pub port: u16, + pub tags: Vec<String>, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulCatalogNode { + pub node: ConsulNode, + pub services: HashMap<String, ConsulService>, +} + +pub type ConsulServiceList = HashMap<String, Vec<String>>; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulServiceNode { + pub node: String, + pub address: String, + pub node_meta: HashMap<String, String>, + pub service_name: String, + pub service_tags: Vec<String>, + pub service_address: String, + pub service_port: u16, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulHealthServiceNode { + pub node: ConsulNode, + pub service: ConsulService, + pub checks: Vec<ConsulHealthCheck>, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulHealthCheck { + pub node: String, + #[serde(rename = "CheckID")] + pub check_id: String, + pub name: String, + pub status: String, + pub output: String, + #[serde(rename = "Type")] + pub type_: String, +} + +pub type AllServiceHealth = HashMap<String, Arc<[ConsulHealthServiceNode]>>; + +impl Consul { + pub async fn catalog_node_list( + &self, + last_index: Option<usize>, + ) -> Result<WithIndex<Vec<ConsulNode>>> { + self.get_with_index(format!("{}/v1/catalog/nodes", self.url), last_index) + .await + } + + pub async fn catalog_node( + &self, + host: &str, + last_index: Option<usize>, + ) -> Result<WithIndex<Option<ConsulCatalogNode>>> { + self.get_with_index(format!("{}/v1/catalog/node/{}", self.url, host), last_index) + .await + } + + pub async fn catalog_service_list( + &self, + last_index: Option<usize>, + ) -> Result<WithIndex<ConsulServiceList>> { + self.get_with_index::<ConsulServiceList>( + format!("{}/v1/catalog/services", self.url), + last_index, + ) + .await + } + + pub async fn catalog_service_nodes( + &self, + service: &str, + last_index: Option<usize>, + ) -> Result<WithIndex<Vec<ConsulServiceNode>>> { + self.get_with_index( + format!("{}/v1/catalog/service/{}", self.url, service), + last_index, + ) + .await + } + + pub async fn health_service_instances( + &self, + service: &str, + last_index: Option<usize>, + ) -> Result<WithIndex<Vec<ConsulHealthServiceNode>>> { + self.get_with_index( + format!("{}/v1/health/service/{}", self.url, service), + last_index, + ) + .await + } + + pub fn watch_all_service_health(&self) -> watch::Receiver<AllServiceHealth> { + let (tx, rx) = watch::channel(HashMap::new()); + + tokio::spawn(do_watch_all_service_health(self.clone(), tx)); + + rx + } + + async fn get_with_index<T: for<'de> Deserialize<'de>>( + &self, + mut url: String, + last_index: Option<usize>, + ) -> Result<WithIndex<T>> { + if let Some(i) = last_index { + if url.contains('?') { + write!(&mut url, "&index={}", i).unwrap(); + } else { + write!(&mut url, "?index={}", i).unwrap(); + } + } + debug!("GET {} as {}", url, std::any::type_name::<T>()); + + let http = self.client.get(&url).send().await?; + + Ok(WithIndex::<T>::index_from(&http)?.value(http.json().await?)) + } +} + +async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServiceHealth>) { + let mut services = AllServiceHealth::new(); + let mut service_watchers = FuturesUnordered::<BoxFuture<(String, Result<_>)>>::new(); + let mut service_list: BoxFuture<Result<_>> = Box::pin(consul.catalog_service_list(None)); + + loop { + select! { + list_res = &mut service_list => { + match list_res { + Ok(list) => { + let list_index = list.index(); + for service in list.into_inner().keys() { + if !services.contains_key(service) { + services.insert(service.to_string(), Arc::new([])); + + let service = service.to_string(); + service_watchers.push(Box::pin(async { + let res = consul.health_service_instances(&service, None).await; + (service, res) + })); + } + } + service_list = Box::pin(consul.catalog_service_list(Some(list_index))); + } + Err(e) => { + warn!("Error listing services: {}", e); + service_list = Box::pin(async { + tokio::time::sleep(Duration::from_secs(30)).await; + consul.catalog_service_list(None).await + }); + } + } + } + (service, watch_res) = service_watchers.next().then(some_or_pending) => { + match watch_res { + Ok(nodes) => { + let index = nodes.index(); + services.insert(service.clone(), nodes.into_inner().into()); + + let consul = &consul; + service_watchers.push(Box::pin(async move { + let res = consul.health_service_instances(&service, Some(index)).await; + (service, res) + })); + + if tx.send(services.clone()).is_err() { + break; + } + } + Err(e) => { + warn!("Error getting service {}: {}", service, e); + service_watchers.push(Box::pin(async { + tokio::time::sleep(Duration::from_secs(30)).await; + let res = consul.health_service_instances(&service, None).await; + (service, res) + })); + } + } + } + _ = tx.closed() => { + break; + } + } + } +} + +async fn some_or_pending<T>(value: Option<T>) -> T { + match value { + Some(v) => v, + None => futures::future::pending().await, + } +} |