summaryrefslogtreecommitdiff
path: root/src/catalog.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/catalog.rs')
-rw-r--r--src/catalog.rs230
1 files changed, 230 insertions, 0 deletions
diff --git a/src/catalog.rs b/src/catalog.rs
new file mode 100644
index 0000000..952e99e
--- /dev/null
+++ b/src/catalog.rs
@@ -0,0 +1,230 @@
+use std::collections::HashMap;
+use std::fmt::Write;
+use std::sync::Arc;
+use std::time::Duration;
+
+use anyhow::Result;
+use futures::future::BoxFuture;
+use futures::stream::futures_unordered::FuturesUnordered;
+use futures::{FutureExt, StreamExt};
+use log::*;
+use serde::{Deserialize, Serialize};
+use tokio::select;
+use tokio::sync::watch;
+
+use crate::{Consul, WithIndex};
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(rename_all = "PascalCase")]
+pub struct ConsulNode {
+ pub node: String,
+ pub address: String,
+ pub meta: HashMap<String, String>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(rename_all = "PascalCase")]
+pub struct ConsulService {
+ pub service: String,
+ pub address: String,
+ pub port: u16,
+ pub tags: Vec<String>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct ConsulCatalogNode {
+ pub node: ConsulNode,
+ pub services: HashMap<String, ConsulService>,
+}
+
+pub type ConsulServiceList = HashMap<String, Vec<String>>;
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct ConsulServiceNode {
+ pub node: String,
+ pub address: String,
+ pub node_meta: HashMap<String, String>,
+ pub service_name: String,
+ pub service_tags: Vec<String>,
+ pub service_address: String,
+ pub service_port: u16,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(rename_all = "PascalCase")]
+pub struct ConsulHealthServiceNode {
+ pub node: ConsulNode,
+ pub service: ConsulService,
+ pub checks: Vec<ConsulHealthCheck>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(rename_all = "PascalCase")]
+pub struct ConsulHealthCheck {
+ pub node: String,
+ #[serde(rename = "CheckID")]
+ pub check_id: String,
+ pub name: String,
+ pub status: String,
+ pub output: String,
+ #[serde(rename = "Type")]
+ pub type_: String,
+}
+
+pub type AllServiceHealth = HashMap<String, Arc<[ConsulHealthServiceNode]>>;
+
+impl Consul {
+ pub async fn catalog_node_list(
+ &self,
+ last_index: Option<usize>,
+ ) -> Result<WithIndex<Vec<ConsulNode>>> {
+ self.get_with_index(format!("{}/v1/catalog/nodes", self.url), last_index)
+ .await
+ }
+
+ pub async fn catalog_node(
+ &self,
+ host: &str,
+ last_index: Option<usize>,
+ ) -> Result<WithIndex<Option<ConsulCatalogNode>>> {
+ self.get_with_index(format!("{}/v1/catalog/node/{}", self.url, host), last_index)
+ .await
+ }
+
+ pub async fn catalog_service_list(
+ &self,
+ last_index: Option<usize>,
+ ) -> Result<WithIndex<ConsulServiceList>> {
+ self.get_with_index::<ConsulServiceList>(
+ format!("{}/v1/catalog/services", self.url),
+ last_index,
+ )
+ .await
+ }
+
+ pub async fn catalog_service_nodes(
+ &self,
+ service: &str,
+ last_index: Option<usize>,
+ ) -> Result<WithIndex<Vec<ConsulServiceNode>>> {
+ self.get_with_index(
+ format!("{}/v1/catalog/service/{}", self.url, service),
+ last_index,
+ )
+ .await
+ }
+
+ pub async fn health_service_instances(
+ &self,
+ service: &str,
+ last_index: Option<usize>,
+ ) -> Result<WithIndex<Vec<ConsulHealthServiceNode>>> {
+ self.get_with_index(
+ format!("{}/v1/health/service/{}", self.url, service),
+ last_index,
+ )
+ .await
+ }
+
+ pub fn watch_all_service_health(&self) -> watch::Receiver<AllServiceHealth> {
+ let (tx, rx) = watch::channel(HashMap::new());
+
+ tokio::spawn(do_watch_all_service_health(self.clone(), tx));
+
+ rx
+ }
+
+ async fn get_with_index<T: for<'de> Deserialize<'de>>(
+ &self,
+ mut url: String,
+ last_index: Option<usize>,
+ ) -> Result<WithIndex<T>> {
+ if let Some(i) = last_index {
+ if url.contains('?') {
+ write!(&mut url, "&index={}", i).unwrap();
+ } else {
+ write!(&mut url, "?index={}", i).unwrap();
+ }
+ }
+ debug!("GET {} as {}", url, std::any::type_name::<T>());
+
+ let http = self.client.get(&url).send().await?;
+
+ Ok(WithIndex::<T>::index_from(&http)?.value(http.json().await?))
+ }
+}
+
+async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServiceHealth>) {
+ let mut services = AllServiceHealth::new();
+ let mut service_watchers = FuturesUnordered::<BoxFuture<(String, Result<_>)>>::new();
+ let mut service_list: BoxFuture<Result<_>> = Box::pin(consul.catalog_service_list(None));
+
+ loop {
+ select! {
+ list_res = &mut service_list => {
+ match list_res {
+ Ok(list) => {
+ let list_index = list.index();
+ for service in list.into_inner().keys() {
+ if !services.contains_key(service) {
+ services.insert(service.to_string(), Arc::new([]));
+
+ let service = service.to_string();
+ service_watchers.push(Box::pin(async {
+ let res = consul.health_service_instances(&service, None).await;
+ (service, res)
+ }));
+ }
+ }
+ service_list = Box::pin(consul.catalog_service_list(Some(list_index)));
+ }
+ Err(e) => {
+ warn!("Error listing services: {}", e);
+ service_list = Box::pin(async {
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ consul.catalog_service_list(None).await
+ });
+ }
+ }
+ }
+ (service, watch_res) = service_watchers.next().then(some_or_pending) => {
+ match watch_res {
+ Ok(nodes) => {
+ let index = nodes.index();
+ services.insert(service.clone(), nodes.into_inner().into());
+
+ let consul = &consul;
+ service_watchers.push(Box::pin(async move {
+ let res = consul.health_service_instances(&service, Some(index)).await;
+ (service, res)
+ }));
+
+ if tx.send(services.clone()).is_err() {
+ break;
+ }
+ }
+ Err(e) => {
+ warn!("Error getting service {}: {}", service, e);
+ service_watchers.push(Box::pin(async {
+ tokio::time::sleep(Duration::from_secs(30)).await;
+ let res = consul.health_service_instances(&service, None).await;
+ (service, res)
+ }));
+ }
+ }
+ }
+ _ = tx.closed() => {
+ break;
+ }
+ }
+ }
+}
+
+async fn some_or_pending<T>(value: Option<T>) -> T {
+ match value {
+ Some(v) => v,
+ None => futures::future::pending().await,
+ }
+}