aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-12-07 15:35:12 +0100
committerAlex Auvolat <alex@adnab.me>2022-12-07 16:09:38 +0100
commited2653ae7dba9c072dcca1aed03b7cda0d910c85 (patch)
tree84452e729d4dbe8c27e3b7b7d4b3160833cfa40e /src
downloadD53-ed2653ae7dba9c072dcca1aed03b7cda0d910c85.tar.gz
D53-ed2653ae7dba9c072dcca1aed03b7cda0d910c85.zip
First version of D53 that does something
First working version
Diffstat (limited to 'src')
-rw-r--r--src/dns_config.rs281
-rw-r--r--src/dns_updater.rs98
-rw-r--r--src/main.rs143
-rw-r--r--src/provider/gandi.rs102
-rw-r--r--src/provider/mod.rs20
5 files changed, 644 insertions, 0 deletions
diff --git a/src/dns_config.rs b/src/dns_config.rs
new file mode 100644
index 0000000..188c6fc
--- /dev/null
+++ b/src/dns_config.rs
@@ -0,0 +1,281 @@
+use std::collections::{HashMap, HashSet};
+use std::fmt;
+use std::sync::Arc;
+use std::{cmp, time::Duration};
+
+use anyhow::Result;
+
+use futures::future::BoxFuture;
+use futures::stream::{FuturesUnordered, StreamExt};
+
+use log::*;
+use tokio::{select, sync::watch, time::sleep};
+
+use df_consul::*;
+
+const IPV4_TARGET_METADATA_TAG: &str = "public_ipv4";
+const IPV6_TARGET_METADATA_TAG: &str = "public_ipv6";
+const CNAME_TARGET_METADATA_TAG: &str = "cname_target";
+
+// ---- Extract DNS config from Consul catalog ----
+
+#[derive(Debug)]
+pub struct DnsConfig {
+ pub entries: HashMap<DnsEntryKey, DnsEntryValue>,
+}
+
+#[derive(Debug, Hash, PartialEq, Eq)]
+pub struct DnsEntryKey {
+ pub domain: String,
+ pub subdomain: String,
+ pub record_type: DnsRecordType,
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub struct DnsEntryValue {
+ pub targets: HashSet<String>,
+}
+
+#[derive(Debug, Hash, PartialEq, Eq)]
+#[allow(clippy::upper_case_acronyms)]
+pub enum DnsRecordType {
+ A,
+ AAAA,
+ CNAME,
+}
+
+impl DnsConfig {
+ pub fn new() -> Self {
+ Self {
+ entries: HashMap::new(),
+ }
+ }
+
+ fn add(&mut self, k: DnsEntryKey, v: DnsEntryValue) {
+ if let Some(ent) = self.entries.get_mut(&k) {
+ ent.targets.extend(v.targets);
+ } else {
+ self.entries.insert(k, v);
+ }
+ }
+}
+
+fn parse_d53_tag(tag: &str, node: &ConsulNode) -> Option<(DnsEntryKey, DnsEntryValue)> {
+ let splits = tag.split(' ').collect::<Vec<_>>();
+ if splits.len() != 3 {
+ return None;
+ }
+
+ let (record_type, targets) = match splits[0] {
+ "d53-a" => match node.meta.get(IPV4_TARGET_METADATA_TAG) {
+ Some(tgt) => (DnsRecordType::A, [tgt.to_string()].into_iter().collect()),
+ None => {
+ warn!("Got d53-a tag `{}` but node {} does not have a {} metadata value. Tag is ignored.", tag, node.node, IPV4_TARGET_METADATA_TAG);
+ return None;
+ }
+ },
+ "d53-aaaa" => match node.meta.get(IPV6_TARGET_METADATA_TAG) {
+ Some(tgt) => (DnsRecordType::AAAA, [tgt.to_string()].into_iter().collect()),
+ None => {
+ warn!("Got d53-aaaa tag `{}` but node {} does not have a {} metadata value. Tag is ignored.", tag, node.node, IPV6_TARGET_METADATA_TAG);
+ return None;
+ }
+ },
+ "d53-cname" => match node.meta.get(CNAME_TARGET_METADATA_TAG) {
+ Some(tgt) => (
+ DnsRecordType::CNAME,
+ [tgt.to_string()].into_iter().collect(),
+ ),
+ None => {
+ warn!("Got d53-cname tag `{}` but node {} does not have a {} metadata value. Tag is ignored.", tag, node.node, CNAME_TARGET_METADATA_TAG);
+ return None;
+ }
+ },
+ _ => return None,
+ };
+
+ Some((
+ DnsEntryKey {
+ domain: splits[1].to_string(),
+ subdomain: splits[2].to_string(),
+ record_type,
+ },
+ DnsEntryValue { targets },
+ ))
+}
+
+fn parse_consul_catalog(catalog: &ConsulNodeCatalog, dns_config: &mut DnsConfig) {
+ trace!("Parsing node catalog: {:#?}", catalog);
+
+ for (_, svc) in catalog.services.iter() {
+ for tag in svc.tags.iter() {
+ if let Some((k, v)) = parse_d53_tag(tag, &catalog.node) {
+ dns_config.add(k, v);
+ }
+ }
+ }
+}
+
+#[derive(Default)]
+struct NodeWatchState {
+ last_idx: Option<usize>,
+ last_catalog: Option<ConsulNodeCatalog>,
+ retries: u32,
+}
+
+pub fn spawn_dns_config_task(
+ consul: Consul,
+ mut must_exit: watch::Receiver<bool>,
+) -> watch::Receiver<Arc<DnsConfig>> {
+ let (tx, rx) = watch::channel(Arc::new(DnsConfig::new()));
+
+ let consul = Arc::new(consul);
+
+ tokio::spawn(async move {
+ let mut nodes = HashMap::new();
+ let mut watches = FuturesUnordered::<BoxFuture<'static, (String, Result<_>)>>::new();
+
+ let mut node_site = HashMap::new();
+
+ while !*must_exit.borrow() {
+ let list_nodes = select! {
+ ln = consul.list_nodes() => ln,
+ _ = must_exit.changed() => continue,
+ };
+
+ match list_nodes {
+ Ok(consul_nodes) => {
+ debug!("Watched consul nodes: {:?}", consul_nodes);
+ for consul_node in consul_nodes {
+ let node = &consul_node.node;
+ if !nodes.contains_key(node) {
+ nodes.insert(node.clone(), NodeWatchState::default());
+
+ let node = node.to_string();
+ let consul = consul.clone();
+
+ watches.push(Box::pin(async move {
+ let res = consul.watch_node(&node, None).await;
+ (node, res)
+ }));
+ }
+ if let Some(site) = consul_node.meta.get("site") {
+ node_site.insert(node.clone(), site.clone());
+ }
+ }
+ }
+ Err(e) => {
+ error!("Could not get Consul node list: {}", e);
+ }
+ }
+
+ let next_watch = select! {
+ nw = watches.next() => nw,
+ _ = must_exit.changed() => continue,
+ };
+
+ let (node, res): (String, Result<_>) = match next_watch {
+ Some(v) => v,
+ None => {
+ warn!("No nodes currently watched in dns_config.rs");
+ sleep(Duration::from_secs(10)).await;
+ continue;
+ }
+ };
+
+ match res {
+ Ok((catalog, new_idx)) => {
+ let mut watch_state = nodes.get_mut(&node).unwrap();
+ watch_state.last_idx = Some(new_idx);
+ watch_state.last_catalog = Some(catalog);
+ watch_state.retries = 0;
+
+ let idx = watch_state.last_idx;
+ let consul = consul.clone();
+ watches.push(Box::pin(async move {
+ let res = consul.watch_node(&node, idx).await;
+ (node, res)
+ }));
+ }
+ Err(e) => {
+ let mut watch_state = nodes.get_mut(&node).unwrap();
+ watch_state.retries += 1;
+ watch_state.last_idx = None;
+
+ let will_retry_in =
+ retry_to_time(watch_state.retries, Duration::from_secs(600));
+ error!(
+ "Failed to query consul for node {}. Will retry in {}s. {}",
+ node,
+ will_retry_in.as_secs(),
+ e
+ );
+
+ let consul = consul.clone();
+ watches.push(Box::pin(async move {
+ sleep(will_retry_in).await;
+ let res = consul.watch_node(&node, None).await;
+ (node, res)
+ }));
+ continue;
+ }
+ }
+
+ let mut dns_config = DnsConfig::new();
+ for (_, watch_state) in nodes.iter() {
+ if let Some(catalog) = &watch_state.last_catalog {
+ parse_consul_catalog(catalog, &mut dns_config);
+ }
+ }
+
+ tx.send(Arc::new(dns_config)).expect("Internal error");
+ }
+ });
+
+ rx
+}
+
+fn retry_to_time(retries: u32, max_time: Duration) -> Duration {
+ // 1.2^x seems to be a good value to exponentially increase time at a good pace
+ // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5
+ // minutes
+ Duration::from_secs(cmp::min(
+ max_time.as_secs(),
+ 1.2f64.powf(retries as f64) as u64,
+ ))
+}
+
+// ---- Display impls ----
+
+impl std::fmt::Display for DnsRecordType {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ DnsRecordType::A => write!(f, "A"),
+ DnsRecordType::AAAA => write!(f, "AAAA"),
+ DnsRecordType::CNAME => write!(f, "CNAME"),
+ }
+ }
+}
+
+impl std::fmt::Display for DnsEntryKey {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ f,
+ "{}.{} IN {}",
+ self.subdomain, self.domain, self.record_type
+ )
+ }
+}
+
+impl std::fmt::Display for DnsEntryValue {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "[")?;
+ for (i, tgt) in self.targets.iter().enumerate() {
+ if i > 0 {
+ write!(f, " ")?;
+ }
+ write!(f, "{}", tgt)?;
+ }
+ write!(f, "]")
+ }
+}
diff --git a/src/dns_updater.rs b/src/dns_updater.rs
new file mode 100644
index 0000000..06ba408
--- /dev/null
+++ b/src/dns_updater.rs
@@ -0,0 +1,98 @@
+use std::net::{Ipv4Addr, Ipv6Addr};
+use std::sync::Arc;
+
+use anyhow::{anyhow, bail, Result};
+use log::*;
+use tokio::select;
+use tokio::sync::watch;
+
+use crate::dns_config::*;
+use crate::provider::DnsProvider;
+
+pub async fn dns_updater_task(
+ mut rx_dns_config: watch::Receiver<Arc<DnsConfig>>,
+ provider: Box<dyn DnsProvider>,
+ allowed_domains: Vec<String>,
+ mut must_exit: watch::Receiver<bool>,
+) {
+ let mut config = Arc::new(DnsConfig::new());
+ while !*must_exit.borrow() {
+ select!(
+ c = rx_dns_config.changed() => {
+ if c.is_err() {
+ break;
+ }
+ }
+ _ = must_exit.changed() => continue,
+ );
+ let new_config: Arc<DnsConfig> = rx_dns_config.borrow().clone();
+
+ for (k, v) in new_config.entries.iter() {
+ if config.entries.get(k) != Some(v) {
+ let fulldomain = format!("{}.{}", k.subdomain, k.domain);
+ if !allowed_domains.iter().any(|d| fulldomain.ends_with(d)) {
+ error!(
+ "Got an entry for domain {} which is not in allowed list",
+ k.domain
+ );
+ continue;
+ }
+
+ info!("Updating {} {}", k, v);
+ if let Err(e) = update_dns_entry(k, v, provider.as_ref()).await {
+ error!("Unable to update entry {} {}: {}", k, v, e);
+ }
+ }
+ }
+
+ config = new_config;
+ }
+}
+
+async fn update_dns_entry(
+ key: &DnsEntryKey,
+ value: &DnsEntryValue,
+ provider: &dyn DnsProvider,
+) -> Result<()> {
+ if value.targets.is_empty() {
+ bail!("zero targets (internal error)");
+ }
+
+ match key.record_type {
+ DnsRecordType::A => {
+ let mut targets = vec![];
+ for tgt in value.targets.iter() {
+ targets.push(
+ tgt.parse::<Ipv4Addr>()
+ .map_err(|_| anyhow!("Invalid ipv4 address: {}", tgt))?,
+ );
+ }
+ provider
+ .update_a(&key.domain, &key.subdomain, &targets)
+ .await?;
+ }
+ DnsRecordType::AAAA => {
+ let mut targets = vec![];
+ for tgt in value.targets.iter() {
+ targets.push(
+ tgt.parse::<Ipv6Addr>()
+ .map_err(|_| anyhow!("Invalid ipv6 address: {}", tgt))?,
+ );
+ }
+ provider
+ .update_aaaa(&key.domain, &key.subdomain, &targets)
+ .await?;
+ }
+ DnsRecordType::CNAME => {
+ let mut targets = value.targets.iter().cloned().collect::<Vec<_>>();
+ if targets.len() > 1 {
+ targets.sort();
+ warn!("Several CNAME targets for {}: {:?}. Taking first one in alphabetical order. Consider switching to a single global target instead.", key, targets);
+ }
+ provider
+ .update_cname(&key.domain, &key.subdomain, &targets[0])
+ .await?;
+ }
+ }
+ Ok(())
+}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..a55f271
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,143 @@
+use std::sync::Arc;
+
+use log::*;
+use structopt::StructOpt;
+use tokio::select;
+use tokio::sync::watch;
+
+mod dns_config;
+mod dns_updater;
+mod provider;
+
+#[derive(StructOpt, Debug)]
+#[structopt(name = "d53")]
+pub struct Opt {
+ /// Address of consul server
+ #[structopt(
+ long = "consul-addr",
+ env = "D53_CONSUL_HOST",
+ default_value = "http://127.0.0.1:8500"
+ )]
+ pub consul_addr: String,
+
+ /// CA certificate for Consul server with TLS
+ #[structopt(long = "consul-ca-cert", env = "D53_CONSUL_CA_CERT")]
+ pub consul_ca_cert: Option<String>,
+
+ /// Skip TLS verification for Consul
+ #[structopt(long = "consul-tls-skip-verify", env = "D53_CONSUL_TLS_SKIP_VERIFY")]
+ pub consul_tls_skip_verify: bool,
+
+ /// Client certificate for Consul server with TLS
+ #[structopt(long = "consul-client-cert", env = "D53_CONSUL_CLIENT_CERT")]
+ pub consul_client_cert: Option<String>,
+
+ /// Client key for Consul server with TLS
+ #[structopt(long = "consul-client-key", env = "D53_CONSUL_CLIENT_KEY")]
+ pub consul_client_key: Option<String>,
+
+ /// DNS provider
+ #[structopt(long = "provider", env = "D53_PROVIDER")]
+ pub provider: String,
+
+ /// Allowed domains
+ #[structopt(long = "allowed-domains", env = "D53_ALLOWED_DOMAINS")]
+ pub allowed_domains: String,
+
+ /// API key for Gandi DNS provider
+ #[structopt(long = "gandi-api-key", env = "D53_GANDI_API_KEY")]
+ pub gandi_api_key: Option<String>,
+}
+
+#[tokio::main]
+async fn main() {
+ if std::env::var("RUST_LOG").is_err() {
+ std::env::set_var("RUST_LOG", "tricot=info")
+ }
+ pretty_env_logger::init();
+
+ // Abort on panic (same behavior as in Go)
+ std::panic::set_hook(Box::new(|panic_info| {
+ error!("{}", panic_info.to_string());
+ std::process::abort();
+ }));
+
+ let opt = Opt::from_args();
+
+ info!("Starting D53");
+
+ let (exit_signal, _) = watch_ctrl_c();
+
+ let consul_config = df_consul::ConsulConfig {
+ addr: opt.consul_addr.clone(),
+ ca_cert: opt.consul_ca_cert.clone(),
+ tls_skip_verify: opt.consul_tls_skip_verify,
+ client_cert: opt.consul_client_cert.clone(),
+ client_key: opt.consul_client_key.clone(),
+ };
+
+ let consul = df_consul::Consul::new(consul_config, "").expect("Cannot build Consul");
+
+ let provider: Box<dyn provider::DnsProvider> = match opt.provider.as_str() {
+ "gandi" => Box::new(
+ provider::gandi::GandiProvider::new(&opt).expect("Cannot initialize Gandi provier"),
+ ),
+ p => panic!("Unsupported DNS provider: {}", p),
+ };
+
+ let allowed_domains = opt
+ .allowed_domains
+ .split(',')
+ .map(ToString::to_string)
+ .collect::<Vec<_>>();
+
+ let rx_dns_config = dns_config::spawn_dns_config_task(consul.clone(), exit_signal.clone());
+
+ let updater_task = tokio::spawn(dns_updater::dns_updater_task(
+ rx_dns_config.clone(),
+ provider,
+ allowed_domains,
+ exit_signal.clone(),
+ ));
+ let dump_task = tokio::spawn(dump_config_on_change(rx_dns_config, exit_signal));
+
+ updater_task.await.expect("Tokio task await failure");
+ dump_task.await.expect("Tokio task await failure");
+}
+
+async fn dump_config_on_change(
+ mut rx_dns_config: watch::Receiver<Arc<dns_config::DnsConfig>>,
+ mut must_exit: watch::Receiver<bool>,
+) {
+ while !*must_exit.borrow() {
+ select!(
+ c = rx_dns_config.changed() => {
+ if c.is_err() {
+ break;
+ }
+ }
+ _ = must_exit.changed() => continue,
+ );
+ println!("---- DNS CONFIGURATION ----");
+ for (k, v) in rx_dns_config.borrow().entries.iter() {
+ println!(" {} {}", k, v);
+ }
+ println!();
+ }
+}
+
+/// Creates a watch that contains `false`, and that changes
+/// to `true` when a Ctrl+C signal is received.
+pub fn watch_ctrl_c() -> (watch::Receiver<bool>, Arc<watch::Sender<bool>>) {
+ let (send_cancel, watch_cancel) = watch::channel(false);
+ let send_cancel = Arc::new(send_cancel);
+ let send_cancel_2 = send_cancel.clone();
+ tokio::spawn(async move {
+ tokio::signal::ctrl_c()
+ .await
+ .expect("failed to install CTRL+C signal handler");
+ info!("Received CTRL+C, shutting down.");
+ send_cancel.send(true).unwrap();
+ });
+ (watch_cancel, send_cancel_2)
+}
diff --git a/src/provider/gandi.rs b/src/provider/gandi.rs
new file mode 100644
index 0000000..1f4f51f
--- /dev/null
+++ b/src/provider/gandi.rs
@@ -0,0 +1,102 @@
+use std::net::{Ipv4Addr, Ipv6Addr};
+
+use anyhow::{anyhow, Result};
+use async_trait::async_trait;
+use log::{info, warn};
+use reqwest::header;
+use serde::Serialize;
+
+use crate::provider::DnsProvider;
+use crate::Opt;
+
+pub struct GandiProvider {
+ client: reqwest::Client,
+}
+
+impl GandiProvider {
+ pub fn new(opts: &Opt) -> Result<Self> {
+ let api_key = opts
+ .gandi_api_key
+ .clone()
+ .ok_or_else(|| anyhow!("Must specify D53_GANDI_API_KEY"))?;
+
+ let mut headers = header::HeaderMap::new();
+ let mut auth_value = header::HeaderValue::from_str(&format!("Apikey {}", api_key))?;
+ auth_value.set_sensitive(true);
+ headers.insert(header::AUTHORIZATION, auth_value);
+
+ let client = reqwest::Client::builder()
+ .default_headers(headers)
+ .use_rustls_tls()
+ .build()?;
+
+ Ok(Self { client })
+ }
+
+ async fn put_rrset(&self, url: &str, rrset: &GandiRrset) -> Result<()> {
+ info!("PUT {} with {:?}", url, rrset);
+ let http = self.client.put(url).json(rrset).send().await?;
+
+ if !http.status().is_success() {
+ warn!("PUT {} returned {}", url, http.status());
+ }
+
+ http.error_for_status()?;
+ Ok(())
+ }
+}
+
+#[async_trait]
+impl DnsProvider for GandiProvider {
+ fn provider(&self) -> &'static str {
+ "gandi"
+ }
+
+ async fn update_a(&self, domain: &str, subdomain: &str, targets: &[Ipv4Addr]) -> Result<()> {
+ let url = format!(
+ "https://api.gandi.net/v5/livedns/domains/{}/records/{}/A",
+ domain, subdomain
+ );
+
+ let rrset = GandiRrset {
+ rrset_values: targets.iter().map(ToString::to_string).collect::<Vec<_>>(),
+ rrset_ttl: 300,
+ };
+
+ self.put_rrset(&url, &rrset).await
+ }
+
+ async fn update_aaaa(&self, domain: &str, subdomain: &str, targets: &[Ipv6Addr]) -> Result<()> {
+ let url = format!(
+ "https://api.gandi.net/v5/livedns/domains/{}/records/{}/AAAA",
+ domain, subdomain
+ );
+
+ let rrset = GandiRrset {
+ rrset_values: targets.iter().map(ToString::to_string).collect::<Vec<_>>(),
+ rrset_ttl: 300,
+ };
+
+ self.put_rrset(&url, &rrset).await
+ }
+
+ async fn update_cname(&self, domain: &str, subdomain: &str, target: &str) -> Result<()> {
+ let url = format!(
+ "https://api.gandi.net/v5/livedns/domains/{}/records/{}/CNAME",
+ domain, subdomain
+ );
+
+ let rrset = GandiRrset {
+ rrset_values: vec![target.to_string()],
+ rrset_ttl: 300,
+ };
+
+ self.put_rrset(&url, &rrset).await
+ }
+}
+
+#[derive(Serialize, Debug)]
+struct GandiRrset {
+ rrset_values: Vec<String>,
+ rrset_ttl: u32,
+}
diff --git a/src/provider/mod.rs b/src/provider/mod.rs
new file mode 100644
index 0000000..6527631
--- /dev/null
+++ b/src/provider/mod.rs
@@ -0,0 +1,20 @@
+pub mod gandi;
+
+use std::net::{Ipv4Addr, Ipv6Addr};
+
+use anyhow::Result;
+use async_trait::async_trait;
+
+#[async_trait]
+pub trait DnsProvider: Send + Sync {
+ fn provider(&self) -> &'static str;
+ async fn update_a(&self, domain: &str, subdomain: &str, targets: &[Ipv4Addr]) -> Result<()>;
+ async fn update_aaaa(&self, domain: &str, subdomain: &str, targets: &[Ipv6Addr]) -> Result<()>;
+ async fn update_cname(&self, domain: &str, subdomain: &str, target: &str) -> Result<()>;
+}
+
+impl std::fmt::Debug for dyn DnsProvider {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
+ write!(f, "DnsProvider({})", self.provider())
+ }
+}