diff options
Diffstat (limited to 'src/proxy_config.rs')
-rw-r--r-- | src/proxy_config.rs | 113 |
1 files changed, 113 insertions, 0 deletions
diff --git a/src/proxy_config.rs b/src/proxy_config.rs new file mode 100644 index 0000000..e118a33 --- /dev/null +++ b/src/proxy_config.rs @@ -0,0 +1,113 @@ +use std::net::SocketAddr; +use std::{cmp, time::Duration}; + +use log::*; +use tokio::{sync::watch, time::sleep}; + +use crate::consul::*; + +// ---- Extract proxy config from Consul catalog ---- + +#[derive(Debug)] +pub struct ProxyEntry { + pub target_addr: SocketAddr, + pub host: String, + pub path_prefix: Option<String>, + pub priority: u32, +} + +#[derive(Debug)] +pub struct ProxyConfig { + pub entries: Vec<ProxyEntry>, +} + +fn retry_to_time(retries: u32, max_time: Duration) -> Duration { + // 1.2^x seems to be a good value to exponentially increase time at a good pace + // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5 + // minutes + return Duration::from_secs(cmp::min( + max_time.as_secs(), + 1.2f64.powf(retries as f64) as u64, + )); +} + +fn parse_tricot_tag(target_addr: SocketAddr, tag: &str) -> Option<ProxyEntry> { + let splits = tag.split(' ').collect::<Vec<_>>(); + if (splits.len() != 2 && splits.len() != 3) || splits[0] != "tricot" { + return None; + } + + let (host, path_prefix) = match splits[1].split_once('/') { + Some((h, p)) => (h, Some(p.to_string())), + None => (splits[1], None), + }; + + let priority = match splits.len() { + 3 => splits[2].parse().ok()?, + _ => 100, + }; + + Some(ProxyEntry { + target_addr, + host: host.to_string(), + path_prefix, + priority, + }) +} + +fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> ProxyConfig { + let mut entries = vec![]; + + for (_, svc) in catalog.services.iter() { + let ip_addr = match svc.address.parse() { + Ok(ip) => ip, + _ => continue, + }; + let addr = SocketAddr::new(ip_addr, svc.port); + for tag in svc.tags.iter() { + if let Some(ent) = parse_tricot_tag(addr, tag) { + entries.push(ent); + } + } + } + + ProxyConfig { entries } +} + +pub fn spawn_proxy_config_task(mut consul: Consul, node: &str) -> watch::Receiver<ProxyConfig> { + let (tx, rx) = watch::channel(ProxyConfig { + entries: Vec::new(), + }); + + let node = node.to_string(); + + tokio::spawn(async move { + let mut retries = 0; + + loop { + let catalog = match consul.watch_node(&node).await { + Ok(c) => c, + Err(e) => { + consul.watch_node_reset(); + retries = cmp::min(std::u32::MAX - 1, retries) + 1; + let will_retry_in = retry_to_time(retries, Duration::from_secs(600)); + error!( + "Failed to query consul. Will retry in {}s. {}", + will_retry_in.as_secs(), + e + ); + sleep(will_retry_in).await; + continue; + } + }; + retries = 0; + + let config = parse_consul_catalog(&catalog); + debug!("Extracted configuration: {:#?}", config); + + tx.send(config).expect("Internal error"); + } + }); + + rx +} |