summaryrefslogblamecommitdiff
path: root/src/catalog.rs
blob: cad7f5be369b52871058580969fc9cbb9249d8d5 (plain) (tree)
1
2
3
4
5
6




                                                                 
             







                                                         
                                                  






                                    
                                                                       
                                                                        

                                               
                 




                                      
                                                 

                                               
                    





                          

                                                                                                 

                                        


                                           

 


                                                                                 
 

                                                                                              

                                        
                        








                                           


                                                                                               

                                               



                                 

 
                                                   

                                               
                        









                                


                                                                               

             


                                                                            


                                   
                                       



                                                                                 


                                                                                                     



                                  
                                                 



                                                                                         


                                                                               


                                      
                                         

                                                                                                   

     


                                                                                        



                                       
                                              






                                                                   


                                                                                                   



                                          
                                                    






                                                                  


                                                                                           



                                            

                                                      




                                                 























                                                                        




                                        
                                               



                                                                                           












                                                                                   

                                                                                                   



                                                  
                                                                                                                   
                     





                                                                                                   











                                                                                     

                                                                                                  






                                                               





                                                                                                                            

















                                                    








                                                                                 
//! Contains structures to interact with the catalog API
//!
//! See <https://developer.hashicorp.com/consul/api-docs/catalog>
//! for the full definition of the API.

use std::cmp;
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use futures::future::BoxFuture;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::{FutureExt, StreamExt, TryFutureExt};
use log::*;
use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::sync::watch;

use crate::{Consul, WithIndex};

/// Node summary, as specified in response to "list nodes" API calls in
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes>
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct Node {
    pub node: String,
    pub address: String,
    pub meta: HashMap<String, String>,
}

/// One of the services returned in a CatalogNode
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct Service {
    pub service: String,
    pub address: String,
    pub port: u16,
    pub tags: Vec<String>,
}

/// Full node info, as specified in response to "retrieve map of services for a node" API call in
/// <https://developer.hashicorp.com/consul/api-docs/catalog#retrieve-map-of-services-for-a-node>
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "PascalCase")]
pub struct CatalogNode {
    pub node: Node,
    pub services: HashMap<String, Service>,
}

/// Concise service list, as specified in response to "list services" API call in
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-services>
pub type ServiceList = HashMap<String, Vec<String>>;

/// Node serving a service, as specified in response to "list nodes for a service" API call in
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes-for-service>
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "PascalCase")]
pub struct ServiceNode {
    pub node: String,
    pub address: String,
    pub node_meta: HashMap<String, String>,
    pub service_name: String,
    pub service_tags: Vec<String>,
    pub service_address: String,
    pub service_port: u16,
}

/// Node serving a service with health info,
/// as specified in response to "list service instances for a service" health API call in
/// <https://developer.hashicorp.com/consul/api-docs/health#list-service-instances-for-service>
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct HealthServiceNode {
    pub node: Node,
    pub service: Service,
    pub checks: Vec<HealthCheck>,
}

/// A health check as returned in HealthServiceNode
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct HealthCheck {
    pub node: String,
    #[serde(rename = "CheckID")]
    pub check_id: String,
    pub name: String,
    pub status: String,
    pub output: String,
    #[serde(rename = "Type")]
    pub type_: String,
}

/// Map containing all services and their associated nodes, with health checks,
/// returned by `watch_all_service_health`
pub type AllServiceHealth = HashMap<String, Arc<[HealthServiceNode]>>;

impl Consul {
    /// The "list nodes" API call of the Catalog API
    ///
    /// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes>
    pub async fn catalog_node_list(
        &self,
        last_index: Option<usize>,
    ) -> Result<WithIndex<Vec<Node>>> {
        self.get_with_index(format!("{}/v1/catalog/nodes", self.url), last_index)
            .await
    }

    /// The "retrieve map of services for a node" API call of the Catalog API
    ///
    /// <https://developer.hashicorp.com/consul/api-docs/catalog#retrieve-map-of-services-for-a-node>
    pub async fn catalog_node(
        &self,
        host: &str,
        last_index: Option<usize>,
    ) -> Result<WithIndex<Option<CatalogNode>>> {
        self.get_with_index(format!("{}/v1/catalog/node/{}", self.url, host), last_index)
            .await
    }

    /// The "list services" API call of the Catalog api
    ///
    /// <https://developer.hashicorp.com/consul/api-docs/catalog#list-services>
    pub async fn catalog_service_list(
        &self,
        last_index: Option<usize>,
    ) -> Result<WithIndex<ServiceList>> {
        self.get_with_index::<ServiceList>(format!("{}/v1/catalog/services", self.url), last_index)
            .await
    }

    /// The "list nodes for a service" API call of the Catalog api
    ///
    /// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes-for-service>
    pub async fn catalog_service_nodes(
        &self,
        service: &str,
        last_index: Option<usize>,
    ) -> Result<WithIndex<Vec<ServiceNode>>> {
        self.get_with_index(
            format!("{}/v1/catalog/service/{}", self.url, service),
            last_index,
        )
        .await
    }

    /// The "list service instances for a service" API call of the Health api
    ///
    /// <https://developer.hashicorp.com/consul/api-docs/health#list-service-instances-for-service>
    pub async fn health_service_instances(
        &self,
        service: &str,
        last_index: Option<usize>,
    ) -> Result<WithIndex<Vec<HealthServiceNode>>> {
        self.get_with_index(
            format!("{}/v1/health/service/{}", self.url, service),
            last_index,
        )
        .await
    }

    /// Launches a background task that watches all services and the nodes that serve them,
    /// and make that info available in a tokio watch channel.
    /// The worker terminates when the channel is dropped.
    pub fn watch_all_service_health(
        &self,
        max_retry_interval: Duration,
    ) -> watch::Receiver<AllServiceHealth> {
        let (tx, rx) = watch::channel(HashMap::new());

        tokio::spawn(do_watch_all_service_health(
            self.clone(),
            tx,
            max_retry_interval,
        ));

        rx
    }

    async fn get_with_index<T: for<'de> Deserialize<'de>>(
        &self,
        mut url: String,
        last_index: Option<usize>,
    ) -> Result<WithIndex<T>> {
        if let Some(i) = last_index {
            if url.contains('?') {
                write!(&mut url, "&index={}", i).unwrap();
            } else {
                write!(&mut url, "?index={}", i).unwrap();
            }
        }
        debug!("GET {} as {}", url, std::any::type_name::<T>());

        let http = self.client.get(&url).send().await?;

        Ok(WithIndex::<T>::index_from(&http)?.value(http.json().await?))
    }
}

async fn do_watch_all_service_health(
    consul: Consul,
    tx: watch::Sender<AllServiceHealth>,
    max_retry_interval: Duration,
) {
    let mut services = AllServiceHealth::new();
    let mut service_watchers =
        FuturesUnordered::<BoxFuture<(String, std::result::Result<_, (usize, _)>)>>::new();
    let mut service_list: BoxFuture<std::result::Result<_, (usize, _)>> =
        Box::pin(consul.catalog_service_list(None).map_err(|e| (1, e)));

    loop {
        select! {
            list_res = &mut service_list => {
                match list_res {
                    Ok(list) => {
                        let list_index = list.index();
                        for service in list.into_inner().keys() {
                            if !services.contains_key(service) {
                                services.insert(service.to_string(), Arc::new([]));

                                let service = service.to_string();
                                service_watchers.push(Box::pin(async {
                                    let res = consul.health_service_instances(&service, None).await
                                        .map_err(|e| (1, e));
                                    (service, res)
                                }));
                            }
                        }
                        service_list = Box::pin(consul.catalog_service_list(Some(list_index)).map_err(|e| (1, e)));
                    }
                    Err((err_count, e)) => {
                        warn!("Error listing services: {} ({} consecutive errors)", e, err_count);
                        let consul = &consul;
                        service_list = Box::pin(async move {
                            tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
                            consul.catalog_service_list(None).await.map_err(|e| (err_count + 1, e))
                        });
                    }
                }
            }
            (service, watch_res) = service_watchers.next().then(some_or_pending) => {
                match watch_res {
                    Ok(nodes) => {
                        let index = nodes.index();
                        services.insert(service.clone(), nodes.into_inner().into());

                        let consul = &consul;
                        service_watchers.push(Box::pin(async move {
                            let res = consul.health_service_instances(&service, Some(index)).await
                                .map_err(|e| (1, e));
                            (service, res)
                        }));

                        if tx.send(services.clone()).is_err() {
                            break;
                        }
                    }
                    Err((err_count, e)) => {
                        warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count);
                        let consul = &consul;
                        service_watchers.push(Box::pin(async move {
                            tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
                            let res = consul.health_service_instances(&service, None).await.map_err(|e| (err_count + 1, e));
                            (service, res)
                        }));
                    }
                }
            }
            _ = tx.closed() => {
                break;
            }
        }
    }
}

async fn some_or_pending<T>(value: Option<T>) -> T {
    match value {
        Some(v) => v,
        None => futures::future::pending().await,
    }
}

fn retry_to_time(retries: usize, max_time: Duration) -> Duration {
    // Exponential retry interval, starting at 2 seconds, maxing out at max_time,
    // with exponential increase of *1.5 each time
    cmp::min(
        max_time,
        Duration::from_secs_f64(2.0f64 * 1.5f64.powf(retries as f64)),
    )
}