diff options
Diffstat (limited to 'src/autodiscovery.rs')
-rw-r--r-- | src/autodiscovery.rs | 110 |
1 files changed, 110 insertions, 0 deletions
diff --git a/src/autodiscovery.rs b/src/autodiscovery.rs new file mode 100644 index 0000000..9fcc094 --- /dev/null +++ b/src/autodiscovery.rs @@ -0,0 +1,110 @@ +//! Fetch autodiscoverd IP addresses stored by Diplonat into Consul + +use std::collections::HashMap; +use std::net::{Ipv4Addr, Ipv6Addr}; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; + +use anyhow::{anyhow, bail, Result}; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use tokio::{select, sync::watch}; +use tracing::*; + +use df_consul::*; + +#[derive(Serialize, Deserialize, Debug)] +pub struct DiplonatAutodiscoveryResult<A> { + pub timestamp: u64, + pub address: Option<A>, +} + +#[derive(Default, Debug)] +pub struct AutodiscoveredAddresses { + pub ipv4: HashMap<String, DiplonatAutodiscoveryResult<Ipv4Addr>>, + pub ipv6: HashMap<String, DiplonatAutodiscoveryResult<Ipv6Addr>>, +} + +pub fn watch_autodiscovered_ips( + consul: Consul, + mut must_exit: watch::Receiver<bool>, +) -> watch::Receiver<Arc<AutodiscoveredAddresses>> { + let (tx, rx) = watch::channel(Arc::new(AutodiscoveredAddresses::default())); + + tokio::spawn(async move { + let mut last_index = None; + let re = Regex::new(r".*autodiscovery/(\w+)/(\w+)$").unwrap(); + + while !*must_exit.borrow() { + let r = select! { + _ = must_exit.changed() => continue, + r = consul.kv_get_prefix("diplonat/autodiscovery/", last_index) => r, + }; + + let entries = match r { + Err(e) => { + warn!("Error fetching diplonat autodiscovery consul prefix: {}", e); + tokio::time::sleep(Duration::from_secs(30)).await; + continue; + } + Ok(r) => { + last_index = Some(r.index()); + r.into_inner() + } + }; + + let mut addresses = AutodiscoveredAddresses::default(); + + for (k, v) in entries { + if let Err(e) = parse_autodiscovered_address(&re, &mut addresses, &k, &v) { + warn!( + "Invalid k/v pair in diplonat autodiscovery results: {} = {} ({})", + k, + std::str::from_utf8(&v).unwrap_or("<???>"), + e + ); + } + } + + if tx.send(Arc::new(addresses)).is_err() { + info!("Autodiscovered addresses watcher terminating"); + } + } + }); + + rx +} + +fn parse_autodiscovered_address( + re: &Regex, + addresses: &mut AutodiscoveredAddresses, + k: &str, + v: &[u8], +) -> Result<()> { + let caps = re.captures(k).ok_or(anyhow!("key does not match regex"))?; + + if let (Some(family), Some(node)) = (caps.get(1), caps.get(2)) { + match family.as_str() { + "ipv4" => { + let r: DiplonatAutodiscoveryResult<Ipv4Addr> = serde_json::from_slice(v)?; + addresses.ipv4.insert(node.as_str().to_string(), r); + } + "ipv6" => { + let r: DiplonatAutodiscoveryResult<Ipv6Addr> = serde_json::from_slice(v)?; + addresses.ipv6.insert(node.as_str().to_string(), r); + } + _ => bail!("invalid address family {}", family.as_str()), + } + } else { + bail!("invalid regex captures {:?}", caps); + } + + Ok(()) +} + +pub fn timestamp() -> u64 { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("clock error") + .as_secs() +} |