diff options
Diffstat (limited to 'src/dns_config.rs')
-rw-r--r-- | src/dns_config.rs | 281 |
1 files changed, 281 insertions, 0 deletions
diff --git a/src/dns_config.rs b/src/dns_config.rs new file mode 100644 index 0000000..188c6fc --- /dev/null +++ b/src/dns_config.rs @@ -0,0 +1,281 @@ +use std::collections::{HashMap, HashSet}; +use std::fmt; +use std::sync::Arc; +use std::{cmp, time::Duration}; + +use anyhow::Result; + +use futures::future::BoxFuture; +use futures::stream::{FuturesUnordered, StreamExt}; + +use log::*; +use tokio::{select, sync::watch, time::sleep}; + +use df_consul::*; + +const IPV4_TARGET_METADATA_TAG: &str = "public_ipv4"; +const IPV6_TARGET_METADATA_TAG: &str = "public_ipv6"; +const CNAME_TARGET_METADATA_TAG: &str = "cname_target"; + +// ---- Extract DNS config from Consul catalog ---- + +#[derive(Debug)] +pub struct DnsConfig { + pub entries: HashMap<DnsEntryKey, DnsEntryValue>, +} + +#[derive(Debug, Hash, PartialEq, Eq)] +pub struct DnsEntryKey { + pub domain: String, + pub subdomain: String, + pub record_type: DnsRecordType, +} + +#[derive(Debug, PartialEq, Eq)] +pub struct DnsEntryValue { + pub targets: HashSet<String>, +} + +#[derive(Debug, Hash, PartialEq, Eq)] +#[allow(clippy::upper_case_acronyms)] +pub enum DnsRecordType { + A, + AAAA, + CNAME, +} + +impl DnsConfig { + pub fn new() -> Self { + Self { + entries: HashMap::new(), + } + } + + fn add(&mut self, k: DnsEntryKey, v: DnsEntryValue) { + if let Some(ent) = self.entries.get_mut(&k) { + ent.targets.extend(v.targets); + } else { + self.entries.insert(k, v); + } + } +} + +fn parse_d53_tag(tag: &str, node: &ConsulNode) -> Option<(DnsEntryKey, DnsEntryValue)> { + let splits = tag.split(' ').collect::<Vec<_>>(); + if splits.len() != 3 { + return None; + } + + let (record_type, targets) = match splits[0] { + "d53-a" => match node.meta.get(IPV4_TARGET_METADATA_TAG) { + Some(tgt) => (DnsRecordType::A, [tgt.to_string()].into_iter().collect()), + None => { + warn!("Got d53-a tag `{}` but node {} does not have a {} metadata value. Tag is ignored.", tag, node.node, IPV4_TARGET_METADATA_TAG); + return None; + } + }, + "d53-aaaa" => match node.meta.get(IPV6_TARGET_METADATA_TAG) { + Some(tgt) => (DnsRecordType::AAAA, [tgt.to_string()].into_iter().collect()), + None => { + warn!("Got d53-aaaa tag `{}` but node {} does not have a {} metadata value. Tag is ignored.", tag, node.node, IPV6_TARGET_METADATA_TAG); + return None; + } + }, + "d53-cname" => match node.meta.get(CNAME_TARGET_METADATA_TAG) { + Some(tgt) => ( + DnsRecordType::CNAME, + [tgt.to_string()].into_iter().collect(), + ), + None => { + warn!("Got d53-cname tag `{}` but node {} does not have a {} metadata value. Tag is ignored.", tag, node.node, CNAME_TARGET_METADATA_TAG); + return None; + } + }, + _ => return None, + }; + + Some(( + DnsEntryKey { + domain: splits[1].to_string(), + subdomain: splits[2].to_string(), + record_type, + }, + DnsEntryValue { targets }, + )) +} + +fn parse_consul_catalog(catalog: &ConsulNodeCatalog, dns_config: &mut DnsConfig) { + trace!("Parsing node catalog: {:#?}", catalog); + + for (_, svc) in catalog.services.iter() { + for tag in svc.tags.iter() { + if let Some((k, v)) = parse_d53_tag(tag, &catalog.node) { + dns_config.add(k, v); + } + } + } +} + +#[derive(Default)] +struct NodeWatchState { + last_idx: Option<usize>, + last_catalog: Option<ConsulNodeCatalog>, + retries: u32, +} + +pub fn spawn_dns_config_task( + consul: Consul, + mut must_exit: watch::Receiver<bool>, +) -> watch::Receiver<Arc<DnsConfig>> { + let (tx, rx) = watch::channel(Arc::new(DnsConfig::new())); + + let consul = Arc::new(consul); + + tokio::spawn(async move { + let mut nodes = HashMap::new(); + let mut watches = FuturesUnordered::<BoxFuture<'static, (String, Result<_>)>>::new(); + + let mut node_site = HashMap::new(); + + while !*must_exit.borrow() { + let list_nodes = select! { + ln = consul.list_nodes() => ln, + _ = must_exit.changed() => continue, + }; + + match list_nodes { + Ok(consul_nodes) => { + debug!("Watched consul nodes: {:?}", consul_nodes); + for consul_node in consul_nodes { + let node = &consul_node.node; + if !nodes.contains_key(node) { + nodes.insert(node.clone(), NodeWatchState::default()); + + let node = node.to_string(); + let consul = consul.clone(); + + watches.push(Box::pin(async move { + let res = consul.watch_node(&node, None).await; + (node, res) + })); + } + if let Some(site) = consul_node.meta.get("site") { + node_site.insert(node.clone(), site.clone()); + } + } + } + Err(e) => { + error!("Could not get Consul node list: {}", e); + } + } + + let next_watch = select! { + nw = watches.next() => nw, + _ = must_exit.changed() => continue, + }; + + let (node, res): (String, Result<_>) = match next_watch { + Some(v) => v, + None => { + warn!("No nodes currently watched in dns_config.rs"); + sleep(Duration::from_secs(10)).await; + continue; + } + }; + + match res { + Ok((catalog, new_idx)) => { + let mut watch_state = nodes.get_mut(&node).unwrap(); + watch_state.last_idx = Some(new_idx); + watch_state.last_catalog = Some(catalog); + watch_state.retries = 0; + + let idx = watch_state.last_idx; + let consul = consul.clone(); + watches.push(Box::pin(async move { + let res = consul.watch_node(&node, idx).await; + (node, res) + })); + } + Err(e) => { + let mut watch_state = nodes.get_mut(&node).unwrap(); + watch_state.retries += 1; + watch_state.last_idx = None; + + let will_retry_in = + retry_to_time(watch_state.retries, Duration::from_secs(600)); + error!( + "Failed to query consul for node {}. Will retry in {}s. {}", + node, + will_retry_in.as_secs(), + e + ); + + let consul = consul.clone(); + watches.push(Box::pin(async move { + sleep(will_retry_in).await; + let res = consul.watch_node(&node, None).await; + (node, res) + })); + continue; + } + } + + let mut dns_config = DnsConfig::new(); + for (_, watch_state) in nodes.iter() { + if let Some(catalog) = &watch_state.last_catalog { + parse_consul_catalog(catalog, &mut dns_config); + } + } + + tx.send(Arc::new(dns_config)).expect("Internal error"); + } + }); + + rx +} + +fn retry_to_time(retries: u32, max_time: Duration) -> Duration { + // 1.2^x seems to be a good value to exponentially increase time at a good pace + // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5 + // minutes + Duration::from_secs(cmp::min( + max_time.as_secs(), + 1.2f64.powf(retries as f64) as u64, + )) +} + +// ---- Display impls ---- + +impl std::fmt::Display for DnsRecordType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + DnsRecordType::A => write!(f, "A"), + DnsRecordType::AAAA => write!(f, "AAAA"), + DnsRecordType::CNAME => write!(f, "CNAME"), + } + } +} + +impl std::fmt::Display for DnsEntryKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}.{} IN {}", + self.subdomain, self.domain, self.record_type + ) + } +} + +impl std::fmt::Display for DnsEntryValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "[")?; + for (i, tgt) in self.targets.iter().enumerate() { + if i > 0 { + write!(f, " ")?; + } + write!(f, "{}", tgt)?; + } + write!(f, "]") + } +} |