//! Fetch autodiscoverd IP addresses stored by Diplonat into Consul
use std::collections::HashMap;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use anyhow::{anyhow, bail, Result};
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::{select, sync::watch};
use tracing::*;
use df_consul::*;
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct DiplonatAutodiscoveryResult<A> {
pub timestamp: u64,
pub address: Option<A>,
}
#[derive(Default, Debug, Eq, PartialEq)]
pub struct AutodiscoveredAddresses {
pub ipv4: HashMap<String, DiplonatAutodiscoveryResult<Ipv4Addr>>,
pub ipv6: HashMap<String, DiplonatAutodiscoveryResult<Ipv6Addr>>,
}
pub fn watch_autodiscovered_ips(
consul: Consul,
mut must_exit: watch::Receiver<bool>,
) -> watch::Receiver<Arc<AutodiscoveredAddresses>> {
let (tx, rx) = watch::channel(Arc::new(AutodiscoveredAddresses::default()));
let rx2 = rx.clone();
tokio::spawn(async move {
let mut last_index = None;
let re = Regex::new(r".*autodiscovery/(ipv[46])/([^/]+)$").unwrap();
while !*must_exit.borrow() {
let r = select! {
_ = must_exit.changed() => continue,
r = consul.kv_get_prefix("diplonat/autodiscovery/", last_index) => r,
};
let entries = match r {
Err(e) => {
warn!("Error fetching diplonat autodiscovery consul prefix: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
continue;
}
Ok(r) => {
last_index = Some(r.index());
r.into_inner()
}
};
let mut addresses = AutodiscoveredAddresses::default();
for (k, v) in entries {
if let Err(e) = parse_autodiscovered_address(&re, &mut addresses, &k, &v) {
warn!(
"Invalid k/v pair in diplonat autodiscovery results: {} = {} ({})",
k,
std::str::from_utf8(&v).unwrap_or("<???>"),
e
);
}
}
if addresses.strip_timestamps() != rx2.borrow().strip_timestamps() {
addresses.dump();
}
if tx.send(Arc::new(addresses)).is_err() {
info!("Autodiscovered addresses watcher terminating");
return;
}
}
});
rx
}
fn parse_autodiscovered_address(
re: &Regex,
addresses: &mut AutodiscoveredAddresses,
k: &str,
v: &[u8],
) -> Result<()> {
let caps = re.captures(k).ok_or(anyhow!("key does not match regex"))?;
if let (Some(family), Some(node)) = (caps.get(1), caps.get(2)) {
match family.as_str() {
"ipv4" => {
let r: DiplonatAutodiscoveryResult<Ipv4Addr> = serde_json::from_slice(v)?;
addresses.ipv4.insert(node.as_str().to_string(), r);
}
"ipv6" => {
let r: DiplonatAutodiscoveryResult<Ipv6Addr> = serde_json::from_slice(v)?;
addresses.ipv6.insert(node.as_str().to_string(), r);
}
_ => bail!("invalid address family {}", family.as_str()),
}
} else {
bail!("invalid regex captures {:?}", caps);
}
Ok(())
}
impl AutodiscoveredAddresses {
fn strip_timestamps(
&self,
) -> (
HashMap<&str, Option<Ipv4Addr>>,
HashMap<&str, Option<Ipv6Addr>>,
) {
(
self.ipv4
.iter()
.map(|(k, v)| (k.as_str(), v.address))
.collect(),
self.ipv6
.iter()
.map(|(k, v)| (k.as_str(), v.address))
.collect(),
)
}
fn dump(&self) {
println!("---- Autodiscovered addresses (fetched from DiploNAT): ----");
for (k, v) in self.ipv4.iter() {
println!(" IPv4 {} {} {:?}", k, v.timestamp, v.address);
}
for (k, v) in self.ipv6.iter() {
println!(" IPv6 {} {} {:?}", k, v.timestamp, v.address);
}
println!("");
}
}
pub fn timestamp() -> u64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("clock error")
.as_secs()
}