diff options
Diffstat (limited to 'src/proxy_config.rs')
-rw-r--r-- | src/proxy_config.rs | 228 |
1 files changed, 74 insertions, 154 deletions
diff --git a/src/proxy_config.rs b/src/proxy_config.rs index 18f9d53..848c99c 100644 --- a/src/proxy_config.rs +++ b/src/proxy_config.rs @@ -1,16 +1,13 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{atomic, Arc}; -use std::{cmp, time::Duration}; +use std::time::Duration; use anyhow::Result; use opentelemetry::{metrics, KeyValue}; -use futures::future::BoxFuture; -use futures::stream::{FuturesUnordered, StreamExt}; - use log::*; -use tokio::{select, sync::watch, time::sleep}; +use tokio::{select, sync::watch}; use crate::consul; @@ -93,6 +90,9 @@ impl Eq for ProxyEntry {} #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub struct ProxyEntryFlags { + /// Is the target healthy? + pub healthy: bool, + /// Is the target the same node as we are running on? /// (if yes priorize it over other matching targets) pub same_node: bool, @@ -119,6 +119,9 @@ impl std::fmt::Display for ProxyEntry { self.path_prefix.as_deref().unwrap_or_default(), self.priority )?; + if !self.flags.healthy { + write!(f, " UNHEALTHY")?; + } if self.flags.same_node { write!(f, " OURSELF")?; } else if self.flags.same_site { @@ -141,16 +144,6 @@ pub struct ProxyConfig { pub entries: Vec<ProxyEntry>, } -fn retry_to_time(retries: u32, max_time: Duration) -> Duration { - // 1.2^x seems to be a good value to exponentially increase time at a good pace - // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5 - // minutes - Duration::from_secs(cmp::min( - max_time.as_secs(), - 1.2f64.powf(retries as f64) as u64, - )) -} - fn parse_tricot_tag( service_name: String, tag: &str, @@ -209,63 +202,55 @@ fn parse_tricot_add_header_tag(tag: &str) -> Option<(String, String)> { } } -fn parse_consul_catalog( - catalog: &consul::catalog::CatalogNode, - same_node: bool, - same_site: bool, +fn parse_consul_service( + s: &consul::catalog::HealthServiceNode, + mut flags: ProxyEntryFlags, ) -> Vec<ProxyEntry> { - trace!("Parsing node catalog: {:#?}", catalog); + trace!("Parsing service: {:#?}", s); let mut entries = vec![]; - for (_, svc) in catalog.services.iter() { - let ip_addr = match svc.address.parse() { + let ip_addr = match s.service.address.parse() { + Ok(ip) => ip, + _ => match s.node.address.parse() { Ok(ip) => ip, - _ => match catalog.node.address.parse() { - Ok(ip) => ip, - _ => { - warn!( - "Could not get address for service {} at node {}", - svc.service, catalog.node.node - ); - continue; - } - }, - }; - let addr = SocketAddr::new(ip_addr, svc.port); - - let (site_lb, global_lb) = if svc.tags.contains(&"tricot-global-lb".into()) { - (false, true) - } else if svc.tags.contains(&"tricot-site-lb".into()) { - (true, false) - } else { - (false, false) - }; - - let flags = ProxyEntryFlags { - same_node, - same_site, - site_lb, - global_lb, - }; - - let mut add_headers = vec![]; - for tag in svc.tags.iter() { - if let Some(pair) = parse_tricot_add_header_tag(tag) { - add_headers.push(pair); + _ => { + warn!( + "Could not get address for service {} at node {}", + s.service.service, s.node.node + ); + return vec![]; } + }, + }; + let addr = SocketAddr::new(ip_addr, s.service.port); + + if s.service.tags.contains(&"tricot-global-lb".into()) { + flags.global_lb = true; + } else if s.service.tags.contains(&"tricot-site-lb".into()) { + flags.site_lb = true; + }; + + let mut add_headers = vec![]; + for tag in s.service.tags.iter() { + if let Some(pair) = parse_tricot_add_header_tag(tag) { + add_headers.push(pair); } + } - for tag in svc.tags.iter() { - if let Some(ent) = - parse_tricot_tag(svc.service.clone(), tag, addr, &add_headers[..], flags) - { - entries.push(ent); - } + for tag in s.service.tags.iter() { + if let Some(ent) = parse_tricot_tag( + s.service.service.clone(), + tag, + addr, + &add_headers[..], + flags, + ) { + entries.push(ent); } } - trace!("Result of parsing catalog:"); + trace!("Result of parsing service:"); for ent in entries.iter() { trace!(" {}", ent); } @@ -273,13 +258,6 @@ fn parse_consul_catalog( entries } -#[derive(Default)] -struct NodeWatchState { - last_idx: Option<usize>, - last_catalog: Option<consul::catalog::CatalogNode>, - retries: u32, -} - pub fn spawn_proxy_config_task( consul: consul::Consul, local_node: String, @@ -293,108 +271,50 @@ pub fn spawn_proxy_config_task( let consul = Arc::new(consul); tokio::spawn(async move { - let mut nodes = HashMap::new(); - let mut watches = FuturesUnordered::<BoxFuture<'static, (String, Result<_>)>>::new(); - - let mut node_site = HashMap::new(); + let mut catalog_rx = consul.watch_all_service_health(Duration::from_secs(300)); + let mut local_node_site = None; while !*must_exit.borrow() { - let list_nodes = select! { - ln = consul.catalog_node_list(None) => ln, + select! { + _ = catalog_rx.changed() => (), _ = must_exit.changed() => continue, }; - match list_nodes { - Ok(consul_nodes) => { - info!("Watched consul nodes: {:?}", consul_nodes); - for consul_node in consul_nodes.into_inner() { - let node = &consul_node.node; - if !nodes.contains_key(node) { - nodes.insert(node.clone(), NodeWatchState::default()); - - let node = node.to_string(); - let consul = consul.clone(); - - watches.push(Box::pin(async move { - let res = consul.catalog_node(&node, None).await; - (node, res) - })); - } - if let Some(site) = consul_node.meta.get("site") { - node_site.insert(node.clone(), site.clone()); + let services = catalog_rx.borrow_and_update().clone(); + if local_node_site.is_none() { + for (_, svcnodes) in services.iter() { + for svcnode in svcnodes.iter() { + if svcnode.node.node == local_node { + if let Some(site) = svcnode.node.meta.get("site") { + local_node_site = Some(site.to_string()); + } } } } - Err(e) => { - warn!("Could not get Consul node list: {}", e); - } } - let next_watch = select! { - nw = watches.next() => nw, - _ = must_exit.changed() => continue, - }; - - let (node, res): (String, Result<_>) = match next_watch { - Some(v) => v, - None => { - warn!("No nodes currently watched in proxy_config.rs"); - sleep(Duration::from_secs(10)).await; - continue; - } - }; + let mut entries = vec![]; - match res { - Ok(res) => { - let new_idx = res.index(); - let catalog = res.into_inner(); - - let mut watch_state = nodes.get_mut(&node).unwrap(); - watch_state.last_idx = Some(new_idx); - watch_state.last_catalog = catalog; - watch_state.retries = 0; - - let idx = watch_state.last_idx; - let consul = consul.clone(); - watches.push(Box::pin(async move { - let res = consul.catalog_node(&node, idx).await; - (node, res) - })); - } - Err(e) => { - let mut watch_state = nodes.get_mut(&node).unwrap(); - watch_state.retries += 1; - watch_state.last_idx = None; - - let will_retry_in = - retry_to_time(watch_state.retries, Duration::from_secs(600)); - error!( - "Failed to query consul for node {}. Will retry in {}s. {}", - node, - will_retry_in.as_secs(), - e - ); - - let consul = consul.clone(); - watches.push(Box::pin(async move { - sleep(will_retry_in).await; - let res = consul.catalog_node(&node, None).await; - (node, res) - })); - continue; - } - } + for (_service, svcnodes) in services.iter() { + for svcnode in svcnodes.iter() { + let healthy = !svcnode.checks.iter().any(|x| x.status == "critical"); - let mut entries = vec![]; - for (node_name, watch_state) in nodes.iter() { - if let Some(catalog) = &watch_state.last_catalog { - let same_node = *node_name == local_node; - let same_site = match (node_site.get(node_name), node_site.get(&local_node)) { + let same_node = svcnode.node.node == local_node; + let same_site = match (svcnode.node.meta.get("site"), local_node_site.as_ref()) + { (Some(s1), Some(s2)) => s1 == s2, _ => false, }; - entries.extend(parse_consul_catalog(catalog, same_node, same_site)); + let flags = ProxyEntryFlags { + healthy, + same_node, + same_site, + site_lb: false, + global_lb: false, + }; + + entries.extend(parse_consul_service(&svcnode, flags)); } } |