aboutsummaryrefslogtreecommitdiff
path: root/src/dns_config.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/dns_config.rs')
-rw-r--r--src/dns_config.rs281
1 files changed, 281 insertions, 0 deletions
diff --git a/src/dns_config.rs b/src/dns_config.rs
new file mode 100644
index 0000000..188c6fc
--- /dev/null
+++ b/src/dns_config.rs
@@ -0,0 +1,281 @@
+use std::collections::{HashMap, HashSet};
+use std::fmt;
+use std::sync::Arc;
+use std::{cmp, time::Duration};
+
+use anyhow::Result;
+
+use futures::future::BoxFuture;
+use futures::stream::{FuturesUnordered, StreamExt};
+
+use log::*;
+use tokio::{select, sync::watch, time::sleep};
+
+use df_consul::*;
+
+const IPV4_TARGET_METADATA_TAG: &str = "public_ipv4";
+const IPV6_TARGET_METADATA_TAG: &str = "public_ipv6";
+const CNAME_TARGET_METADATA_TAG: &str = "cname_target";
+
+// ---- Extract DNS config from Consul catalog ----
+
+#[derive(Debug)]
+pub struct DnsConfig {
+ pub entries: HashMap<DnsEntryKey, DnsEntryValue>,
+}
+
+#[derive(Debug, Hash, PartialEq, Eq)]
+pub struct DnsEntryKey {
+ pub domain: String,
+ pub subdomain: String,
+ pub record_type: DnsRecordType,
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub struct DnsEntryValue {
+ pub targets: HashSet<String>,
+}
+
+#[derive(Debug, Hash, PartialEq, Eq)]
+#[allow(clippy::upper_case_acronyms)]
+pub enum DnsRecordType {
+ A,
+ AAAA,
+ CNAME,
+}
+
+impl DnsConfig {
+ pub fn new() -> Self {
+ Self {
+ entries: HashMap::new(),
+ }
+ }
+
+ fn add(&mut self, k: DnsEntryKey, v: DnsEntryValue) {
+ if let Some(ent) = self.entries.get_mut(&k) {
+ ent.targets.extend(v.targets);
+ } else {
+ self.entries.insert(k, v);
+ }
+ }
+}
+
+fn parse_d53_tag(tag: &str, node: &ConsulNode) -> Option<(DnsEntryKey, DnsEntryValue)> {
+ let splits = tag.split(' ').collect::<Vec<_>>();
+ if splits.len() != 3 {
+ return None;
+ }
+
+ let (record_type, targets) = match splits[0] {
+ "d53-a" => match node.meta.get(IPV4_TARGET_METADATA_TAG) {
+ Some(tgt) => (DnsRecordType::A, [tgt.to_string()].into_iter().collect()),
+ None => {
+ warn!("Got d53-a tag `{}` but node {} does not have a {} metadata value. Tag is ignored.", tag, node.node, IPV4_TARGET_METADATA_TAG);
+ return None;
+ }
+ },
+ "d53-aaaa" => match node.meta.get(IPV6_TARGET_METADATA_TAG) {
+ Some(tgt) => (DnsRecordType::AAAA, [tgt.to_string()].into_iter().collect()),
+ None => {
+ warn!("Got d53-aaaa tag `{}` but node {} does not have a {} metadata value. Tag is ignored.", tag, node.node, IPV6_TARGET_METADATA_TAG);
+ return None;
+ }
+ },
+ "d53-cname" => match node.meta.get(CNAME_TARGET_METADATA_TAG) {
+ Some(tgt) => (
+ DnsRecordType::CNAME,
+ [tgt.to_string()].into_iter().collect(),
+ ),
+ None => {
+ warn!("Got d53-cname tag `{}` but node {} does not have a {} metadata value. Tag is ignored.", tag, node.node, CNAME_TARGET_METADATA_TAG);
+ return None;
+ }
+ },
+ _ => return None,
+ };
+
+ Some((
+ DnsEntryKey {
+ domain: splits[1].to_string(),
+ subdomain: splits[2].to_string(),
+ record_type,
+ },
+ DnsEntryValue { targets },
+ ))
+}
+
+fn parse_consul_catalog(catalog: &ConsulNodeCatalog, dns_config: &mut DnsConfig) {
+ trace!("Parsing node catalog: {:#?}", catalog);
+
+ for (_, svc) in catalog.services.iter() {
+ for tag in svc.tags.iter() {
+ if let Some((k, v)) = parse_d53_tag(tag, &catalog.node) {
+ dns_config.add(k, v);
+ }
+ }
+ }
+}
+
+#[derive(Default)]
+struct NodeWatchState {
+ last_idx: Option<usize>,
+ last_catalog: Option<ConsulNodeCatalog>,
+ retries: u32,
+}
+
+pub fn spawn_dns_config_task(
+ consul: Consul,
+ mut must_exit: watch::Receiver<bool>,
+) -> watch::Receiver<Arc<DnsConfig>> {
+ let (tx, rx) = watch::channel(Arc::new(DnsConfig::new()));
+
+ let consul = Arc::new(consul);
+
+ tokio::spawn(async move {
+ let mut nodes = HashMap::new();
+ let mut watches = FuturesUnordered::<BoxFuture<'static, (String, Result<_>)>>::new();
+
+ let mut node_site = HashMap::new();
+
+ while !*must_exit.borrow() {
+ let list_nodes = select! {
+ ln = consul.list_nodes() => ln,
+ _ = must_exit.changed() => continue,
+ };
+
+ match list_nodes {
+ Ok(consul_nodes) => {
+ debug!("Watched consul nodes: {:?}", consul_nodes);
+ for consul_node in consul_nodes {
+ let node = &consul_node.node;
+ if !nodes.contains_key(node) {
+ nodes.insert(node.clone(), NodeWatchState::default());
+
+ let node = node.to_string();
+ let consul = consul.clone();
+
+ watches.push(Box::pin(async move {
+ let res = consul.watch_node(&node, None).await;
+ (node, res)
+ }));
+ }
+ if let Some(site) = consul_node.meta.get("site") {
+ node_site.insert(node.clone(), site.clone());
+ }
+ }
+ }
+ Err(e) => {
+ error!("Could not get Consul node list: {}", e);
+ }
+ }
+
+ let next_watch = select! {
+ nw = watches.next() => nw,
+ _ = must_exit.changed() => continue,
+ };
+
+ let (node, res): (String, Result<_>) = match next_watch {
+ Some(v) => v,
+ None => {
+ warn!("No nodes currently watched in dns_config.rs");
+ sleep(Duration::from_secs(10)).await;
+ continue;
+ }
+ };
+
+ match res {
+ Ok((catalog, new_idx)) => {
+ let mut watch_state = nodes.get_mut(&node).unwrap();
+ watch_state.last_idx = Some(new_idx);
+ watch_state.last_catalog = Some(catalog);
+ watch_state.retries = 0;
+
+ let idx = watch_state.last_idx;
+ let consul = consul.clone();
+ watches.push(Box::pin(async move {
+ let res = consul.watch_node(&node, idx).await;
+ (node, res)
+ }));
+ }
+ Err(e) => {
+ let mut watch_state = nodes.get_mut(&node).unwrap();
+ watch_state.retries += 1;
+ watch_state.last_idx = None;
+
+ let will_retry_in =
+ retry_to_time(watch_state.retries, Duration::from_secs(600));
+ error!(
+ "Failed to query consul for node {}. Will retry in {}s. {}",
+ node,
+ will_retry_in.as_secs(),
+ e
+ );
+
+ let consul = consul.clone();
+ watches.push(Box::pin(async move {
+ sleep(will_retry_in).await;
+ let res = consul.watch_node(&node, None).await;
+ (node, res)
+ }));
+ continue;
+ }
+ }
+
+ let mut dns_config = DnsConfig::new();
+ for (_, watch_state) in nodes.iter() {
+ if let Some(catalog) = &watch_state.last_catalog {
+ parse_consul_catalog(catalog, &mut dns_config);
+ }
+ }
+
+ tx.send(Arc::new(dns_config)).expect("Internal error");
+ }
+ });
+
+ rx
+}
+
+fn retry_to_time(retries: u32, max_time: Duration) -> Duration {
+ // 1.2^x seems to be a good value to exponentially increase time at a good pace
+ // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5
+ // minutes
+ Duration::from_secs(cmp::min(
+ max_time.as_secs(),
+ 1.2f64.powf(retries as f64) as u64,
+ ))
+}
+
+// ---- Display impls ----
+
+impl std::fmt::Display for DnsRecordType {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ DnsRecordType::A => write!(f, "A"),
+ DnsRecordType::AAAA => write!(f, "AAAA"),
+ DnsRecordType::CNAME => write!(f, "CNAME"),
+ }
+ }
+}
+
+impl std::fmt::Display for DnsEntryKey {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ f,
+ "{}.{} IN {}",
+ self.subdomain, self.domain, self.record_type
+ )
+ }
+}
+
+impl std::fmt::Display for DnsEntryValue {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "[")?;
+ for (i, tgt) in self.targets.iter().enumerate() {
+ if i > 0 {
+ write!(f, " ")?;
+ }
+ write!(f, "{}", tgt)?;
+ }
+ write!(f, "]")
+ }
+}