diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/dns_config.rs | 281 | ||||
-rw-r--r-- | src/dns_updater.rs | 98 | ||||
-rw-r--r-- | src/main.rs | 143 | ||||
-rw-r--r-- | src/provider/gandi.rs | 102 | ||||
-rw-r--r-- | src/provider/mod.rs | 20 |
5 files changed, 644 insertions, 0 deletions
diff --git a/src/dns_config.rs b/src/dns_config.rs new file mode 100644 index 0000000..188c6fc --- /dev/null +++ b/src/dns_config.rs @@ -0,0 +1,281 @@ +use std::collections::{HashMap, HashSet}; +use std::fmt; +use std::sync::Arc; +use std::{cmp, time::Duration}; + +use anyhow::Result; + +use futures::future::BoxFuture; +use futures::stream::{FuturesUnordered, StreamExt}; + +use log::*; +use tokio::{select, sync::watch, time::sleep}; + +use df_consul::*; + +const IPV4_TARGET_METADATA_TAG: &str = "public_ipv4"; +const IPV6_TARGET_METADATA_TAG: &str = "public_ipv6"; +const CNAME_TARGET_METADATA_TAG: &str = "cname_target"; + +// ---- Extract DNS config from Consul catalog ---- + +#[derive(Debug)] +pub struct DnsConfig { + pub entries: HashMap<DnsEntryKey, DnsEntryValue>, +} + +#[derive(Debug, Hash, PartialEq, Eq)] +pub struct DnsEntryKey { + pub domain: String, + pub subdomain: String, + pub record_type: DnsRecordType, +} + +#[derive(Debug, PartialEq, Eq)] +pub struct DnsEntryValue { + pub targets: HashSet<String>, +} + +#[derive(Debug, Hash, PartialEq, Eq)] +#[allow(clippy::upper_case_acronyms)] +pub enum DnsRecordType { + A, + AAAA, + CNAME, +} + +impl DnsConfig { + pub fn new() -> Self { + Self { + entries: HashMap::new(), + } + } + + fn add(&mut self, k: DnsEntryKey, v: DnsEntryValue) { + if let Some(ent) = self.entries.get_mut(&k) { + ent.targets.extend(v.targets); + } else { + self.entries.insert(k, v); + } + } +} + +fn parse_d53_tag(tag: &str, node: &ConsulNode) -> Option<(DnsEntryKey, DnsEntryValue)> { + let splits = tag.split(' ').collect::<Vec<_>>(); + if splits.len() != 3 { + return None; + } + + let (record_type, targets) = match splits[0] { + "d53-a" => match node.meta.get(IPV4_TARGET_METADATA_TAG) { + Some(tgt) => (DnsRecordType::A, [tgt.to_string()].into_iter().collect()), + None => { + warn!("Got d53-a tag `{}` but node {} does not have a {} metadata value. Tag is ignored.", tag, node.node, IPV4_TARGET_METADATA_TAG); + return None; + } + }, + "d53-aaaa" => match node.meta.get(IPV6_TARGET_METADATA_TAG) { + Some(tgt) => (DnsRecordType::AAAA, [tgt.to_string()].into_iter().collect()), + None => { + warn!("Got d53-aaaa tag `{}` but node {} does not have a {} metadata value. Tag is ignored.", tag, node.node, IPV6_TARGET_METADATA_TAG); + return None; + } + }, + "d53-cname" => match node.meta.get(CNAME_TARGET_METADATA_TAG) { + Some(tgt) => ( + DnsRecordType::CNAME, + [tgt.to_string()].into_iter().collect(), + ), + None => { + warn!("Got d53-cname tag `{}` but node {} does not have a {} metadata value. Tag is ignored.", tag, node.node, CNAME_TARGET_METADATA_TAG); + return None; + } + }, + _ => return None, + }; + + Some(( + DnsEntryKey { + domain: splits[1].to_string(), + subdomain: splits[2].to_string(), + record_type, + }, + DnsEntryValue { targets }, + )) +} + +fn parse_consul_catalog(catalog: &ConsulNodeCatalog, dns_config: &mut DnsConfig) { + trace!("Parsing node catalog: {:#?}", catalog); + + for (_, svc) in catalog.services.iter() { + for tag in svc.tags.iter() { + if let Some((k, v)) = parse_d53_tag(tag, &catalog.node) { + dns_config.add(k, v); + } + } + } +} + +#[derive(Default)] +struct NodeWatchState { + last_idx: Option<usize>, + last_catalog: Option<ConsulNodeCatalog>, + retries: u32, +} + +pub fn spawn_dns_config_task( + consul: Consul, + mut must_exit: watch::Receiver<bool>, +) -> watch::Receiver<Arc<DnsConfig>> { + let (tx, rx) = watch::channel(Arc::new(DnsConfig::new())); + + let consul = Arc::new(consul); + + tokio::spawn(async move { + let mut nodes = HashMap::new(); + let mut watches = FuturesUnordered::<BoxFuture<'static, (String, Result<_>)>>::new(); + + let mut node_site = HashMap::new(); + + while !*must_exit.borrow() { + let list_nodes = select! { + ln = consul.list_nodes() => ln, + _ = must_exit.changed() => continue, + }; + + match list_nodes { + Ok(consul_nodes) => { + debug!("Watched consul nodes: {:?}", consul_nodes); + for consul_node in consul_nodes { + let node = &consul_node.node; + if !nodes.contains_key(node) { + nodes.insert(node.clone(), NodeWatchState::default()); + + let node = node.to_string(); + let consul = consul.clone(); + + watches.push(Box::pin(async move { + let res = consul.watch_node(&node, None).await; + (node, res) + })); + } + if let Some(site) = consul_node.meta.get("site") { + node_site.insert(node.clone(), site.clone()); + } + } + } + Err(e) => { + error!("Could not get Consul node list: {}", e); + } + } + + let next_watch = select! { + nw = watches.next() => nw, + _ = must_exit.changed() => continue, + }; + + let (node, res): (String, Result<_>) = match next_watch { + Some(v) => v, + None => { + warn!("No nodes currently watched in dns_config.rs"); + sleep(Duration::from_secs(10)).await; + continue; + } + }; + + match res { + Ok((catalog, new_idx)) => { + let mut watch_state = nodes.get_mut(&node).unwrap(); + watch_state.last_idx = Some(new_idx); + watch_state.last_catalog = Some(catalog); + watch_state.retries = 0; + + let idx = watch_state.last_idx; + let consul = consul.clone(); + watches.push(Box::pin(async move { + let res = consul.watch_node(&node, idx).await; + (node, res) + })); + } + Err(e) => { + let mut watch_state = nodes.get_mut(&node).unwrap(); + watch_state.retries += 1; + watch_state.last_idx = None; + + let will_retry_in = + retry_to_time(watch_state.retries, Duration::from_secs(600)); + error!( + "Failed to query consul for node {}. Will retry in {}s. {}", + node, + will_retry_in.as_secs(), + e + ); + + let consul = consul.clone(); + watches.push(Box::pin(async move { + sleep(will_retry_in).await; + let res = consul.watch_node(&node, None).await; + (node, res) + })); + continue; + } + } + + let mut dns_config = DnsConfig::new(); + for (_, watch_state) in nodes.iter() { + if let Some(catalog) = &watch_state.last_catalog { + parse_consul_catalog(catalog, &mut dns_config); + } + } + + tx.send(Arc::new(dns_config)).expect("Internal error"); + } + }); + + rx +} + +fn retry_to_time(retries: u32, max_time: Duration) -> Duration { + // 1.2^x seems to be a good value to exponentially increase time at a good pace + // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5 + // minutes + Duration::from_secs(cmp::min( + max_time.as_secs(), + 1.2f64.powf(retries as f64) as u64, + )) +} + +// ---- Display impls ---- + +impl std::fmt::Display for DnsRecordType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + DnsRecordType::A => write!(f, "A"), + DnsRecordType::AAAA => write!(f, "AAAA"), + DnsRecordType::CNAME => write!(f, "CNAME"), + } + } +} + +impl std::fmt::Display for DnsEntryKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}.{} IN {}", + self.subdomain, self.domain, self.record_type + ) + } +} + +impl std::fmt::Display for DnsEntryValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "[")?; + for (i, tgt) in self.targets.iter().enumerate() { + if i > 0 { + write!(f, " ")?; + } + write!(f, "{}", tgt)?; + } + write!(f, "]") + } +} diff --git a/src/dns_updater.rs b/src/dns_updater.rs new file mode 100644 index 0000000..06ba408 --- /dev/null +++ b/src/dns_updater.rs @@ -0,0 +1,98 @@ +use std::net::{Ipv4Addr, Ipv6Addr}; +use std::sync::Arc; + +use anyhow::{anyhow, bail, Result}; +use log::*; +use tokio::select; +use tokio::sync::watch; + +use crate::dns_config::*; +use crate::provider::DnsProvider; + +pub async fn dns_updater_task( + mut rx_dns_config: watch::Receiver<Arc<DnsConfig>>, + provider: Box<dyn DnsProvider>, + allowed_domains: Vec<String>, + mut must_exit: watch::Receiver<bool>, +) { + let mut config = Arc::new(DnsConfig::new()); + while !*must_exit.borrow() { + select!( + c = rx_dns_config.changed() => { + if c.is_err() { + break; + } + } + _ = must_exit.changed() => continue, + ); + let new_config: Arc<DnsConfig> = rx_dns_config.borrow().clone(); + + for (k, v) in new_config.entries.iter() { + if config.entries.get(k) != Some(v) { + let fulldomain = format!("{}.{}", k.subdomain, k.domain); + if !allowed_domains.iter().any(|d| fulldomain.ends_with(d)) { + error!( + "Got an entry for domain {} which is not in allowed list", + k.domain + ); + continue; + } + + info!("Updating {} {}", k, v); + if let Err(e) = update_dns_entry(k, v, provider.as_ref()).await { + error!("Unable to update entry {} {}: {}", k, v, e); + } + } + } + + config = new_config; + } +} + +async fn update_dns_entry( + key: &DnsEntryKey, + value: &DnsEntryValue, + provider: &dyn DnsProvider, +) -> Result<()> { + if value.targets.is_empty() { + bail!("zero targets (internal error)"); + } + + match key.record_type { + DnsRecordType::A => { + let mut targets = vec![]; + for tgt in value.targets.iter() { + targets.push( + tgt.parse::<Ipv4Addr>() + .map_err(|_| anyhow!("Invalid ipv4 address: {}", tgt))?, + ); + } + provider + .update_a(&key.domain, &key.subdomain, &targets) + .await?; + } + DnsRecordType::AAAA => { + let mut targets = vec![]; + for tgt in value.targets.iter() { + targets.push( + tgt.parse::<Ipv6Addr>() + .map_err(|_| anyhow!("Invalid ipv6 address: {}", tgt))?, + ); + } + provider + .update_aaaa(&key.domain, &key.subdomain, &targets) + .await?; + } + DnsRecordType::CNAME => { + let mut targets = value.targets.iter().cloned().collect::<Vec<_>>(); + if targets.len() > 1 { + targets.sort(); + warn!("Several CNAME targets for {}: {:?}. Taking first one in alphabetical order. Consider switching to a single global target instead.", key, targets); + } + provider + .update_cname(&key.domain, &key.subdomain, &targets[0]) + .await?; + } + } + Ok(()) +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..a55f271 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,143 @@ +use std::sync::Arc; + +use log::*; +use structopt::StructOpt; +use tokio::select; +use tokio::sync::watch; + +mod dns_config; +mod dns_updater; +mod provider; + +#[derive(StructOpt, Debug)] +#[structopt(name = "d53")] +pub struct Opt { + /// Address of consul server + #[structopt( + long = "consul-addr", + env = "D53_CONSUL_HOST", + default_value = "http://127.0.0.1:8500" + )] + pub consul_addr: String, + + /// CA certificate for Consul server with TLS + #[structopt(long = "consul-ca-cert", env = "D53_CONSUL_CA_CERT")] + pub consul_ca_cert: Option<String>, + + /// Skip TLS verification for Consul + #[structopt(long = "consul-tls-skip-verify", env = "D53_CONSUL_TLS_SKIP_VERIFY")] + pub consul_tls_skip_verify: bool, + + /// Client certificate for Consul server with TLS + #[structopt(long = "consul-client-cert", env = "D53_CONSUL_CLIENT_CERT")] + pub consul_client_cert: Option<String>, + + /// Client key for Consul server with TLS + #[structopt(long = "consul-client-key", env = "D53_CONSUL_CLIENT_KEY")] + pub consul_client_key: Option<String>, + + /// DNS provider + #[structopt(long = "provider", env = "D53_PROVIDER")] + pub provider: String, + + /// Allowed domains + #[structopt(long = "allowed-domains", env = "D53_ALLOWED_DOMAINS")] + pub allowed_domains: String, + + /// API key for Gandi DNS provider + #[structopt(long = "gandi-api-key", env = "D53_GANDI_API_KEY")] + pub gandi_api_key: Option<String>, +} + +#[tokio::main] +async fn main() { + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "tricot=info") + } + pretty_env_logger::init(); + + // Abort on panic (same behavior as in Go) + std::panic::set_hook(Box::new(|panic_info| { + error!("{}", panic_info.to_string()); + std::process::abort(); + })); + + let opt = Opt::from_args(); + + info!("Starting D53"); + + let (exit_signal, _) = watch_ctrl_c(); + + let consul_config = df_consul::ConsulConfig { + addr: opt.consul_addr.clone(), + ca_cert: opt.consul_ca_cert.clone(), + tls_skip_verify: opt.consul_tls_skip_verify, + client_cert: opt.consul_client_cert.clone(), + client_key: opt.consul_client_key.clone(), + }; + + let consul = df_consul::Consul::new(consul_config, "").expect("Cannot build Consul"); + + let provider: Box<dyn provider::DnsProvider> = match opt.provider.as_str() { + "gandi" => Box::new( + provider::gandi::GandiProvider::new(&opt).expect("Cannot initialize Gandi provier"), + ), + p => panic!("Unsupported DNS provider: {}", p), + }; + + let allowed_domains = opt + .allowed_domains + .split(',') + .map(ToString::to_string) + .collect::<Vec<_>>(); + + let rx_dns_config = dns_config::spawn_dns_config_task(consul.clone(), exit_signal.clone()); + + let updater_task = tokio::spawn(dns_updater::dns_updater_task( + rx_dns_config.clone(), + provider, + allowed_domains, + exit_signal.clone(), + )); + let dump_task = tokio::spawn(dump_config_on_change(rx_dns_config, exit_signal)); + + updater_task.await.expect("Tokio task await failure"); + dump_task.await.expect("Tokio task await failure"); +} + +async fn dump_config_on_change( + mut rx_dns_config: watch::Receiver<Arc<dns_config::DnsConfig>>, + mut must_exit: watch::Receiver<bool>, +) { + while !*must_exit.borrow() { + select!( + c = rx_dns_config.changed() => { + if c.is_err() { + break; + } + } + _ = must_exit.changed() => continue, + ); + println!("---- DNS CONFIGURATION ----"); + for (k, v) in rx_dns_config.borrow().entries.iter() { + println!(" {} {}", k, v); + } + println!(); + } +} + +/// Creates a watch that contains `false`, and that changes +/// to `true` when a Ctrl+C signal is received. +pub fn watch_ctrl_c() -> (watch::Receiver<bool>, Arc<watch::Sender<bool>>) { + let (send_cancel, watch_cancel) = watch::channel(false); + let send_cancel = Arc::new(send_cancel); + let send_cancel_2 = send_cancel.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); + info!("Received CTRL+C, shutting down."); + send_cancel.send(true).unwrap(); + }); + (watch_cancel, send_cancel_2) +} diff --git a/src/provider/gandi.rs b/src/provider/gandi.rs new file mode 100644 index 0000000..1f4f51f --- /dev/null +++ b/src/provider/gandi.rs @@ -0,0 +1,102 @@ +use std::net::{Ipv4Addr, Ipv6Addr}; + +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use log::{info, warn}; +use reqwest::header; +use serde::Serialize; + +use crate::provider::DnsProvider; +use crate::Opt; + +pub struct GandiProvider { + client: reqwest::Client, +} + +impl GandiProvider { + pub fn new(opts: &Opt) -> Result<Self> { + let api_key = opts + .gandi_api_key + .clone() + .ok_or_else(|| anyhow!("Must specify D53_GANDI_API_KEY"))?; + + let mut headers = header::HeaderMap::new(); + let mut auth_value = header::HeaderValue::from_str(&format!("Apikey {}", api_key))?; + auth_value.set_sensitive(true); + headers.insert(header::AUTHORIZATION, auth_value); + + let client = reqwest::Client::builder() + .default_headers(headers) + .use_rustls_tls() + .build()?; + + Ok(Self { client }) + } + + async fn put_rrset(&self, url: &str, rrset: &GandiRrset) -> Result<()> { + info!("PUT {} with {:?}", url, rrset); + let http = self.client.put(url).json(rrset).send().await?; + + if !http.status().is_success() { + warn!("PUT {} returned {}", url, http.status()); + } + + http.error_for_status()?; + Ok(()) + } +} + +#[async_trait] +impl DnsProvider for GandiProvider { + fn provider(&self) -> &'static str { + "gandi" + } + + async fn update_a(&self, domain: &str, subdomain: &str, targets: &[Ipv4Addr]) -> Result<()> { + let url = format!( + "https://api.gandi.net/v5/livedns/domains/{}/records/{}/A", + domain, subdomain + ); + + let rrset = GandiRrset { + rrset_values: targets.iter().map(ToString::to_string).collect::<Vec<_>>(), + rrset_ttl: 300, + }; + + self.put_rrset(&url, &rrset).await + } + + async fn update_aaaa(&self, domain: &str, subdomain: &str, targets: &[Ipv6Addr]) -> Result<()> { + let url = format!( + "https://api.gandi.net/v5/livedns/domains/{}/records/{}/AAAA", + domain, subdomain + ); + + let rrset = GandiRrset { + rrset_values: targets.iter().map(ToString::to_string).collect::<Vec<_>>(), + rrset_ttl: 300, + }; + + self.put_rrset(&url, &rrset).await + } + + async fn update_cname(&self, domain: &str, subdomain: &str, target: &str) -> Result<()> { + let url = format!( + "https://api.gandi.net/v5/livedns/domains/{}/records/{}/CNAME", + domain, subdomain + ); + + let rrset = GandiRrset { + rrset_values: vec![target.to_string()], + rrset_ttl: 300, + }; + + self.put_rrset(&url, &rrset).await + } +} + +#[derive(Serialize, Debug)] +struct GandiRrset { + rrset_values: Vec<String>, + rrset_ttl: u32, +} diff --git a/src/provider/mod.rs b/src/provider/mod.rs new file mode 100644 index 0000000..6527631 --- /dev/null +++ b/src/provider/mod.rs @@ -0,0 +1,20 @@ +pub mod gandi; + +use std::net::{Ipv4Addr, Ipv6Addr}; + +use anyhow::Result; +use async_trait::async_trait; + +#[async_trait] +pub trait DnsProvider: Send + Sync { + fn provider(&self) -> &'static str; + async fn update_a(&self, domain: &str, subdomain: &str, targets: &[Ipv4Addr]) -> Result<()>; + async fn update_aaaa(&self, domain: &str, subdomain: &str, targets: &[Ipv6Addr]) -> Result<()>; + async fn update_cname(&self, domain: &str, subdomain: &str, target: &str) -> Result<()>; +} + +impl std::fmt::Debug for dyn DnsProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(f, "DnsProvider({})", self.provider()) + } +} |