aboutsummaryrefslogblamecommitdiff
path: root/src/autodiscovery.rs
blob: aa33d016aeba75d9636fc7d47c00473162d4dc40 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15














                                                                   
                                                       




                                           
                                        









                                                                                
                         


                                  
                                                                            































                                                                                           



                                                                                

                                                                      
                       

































                                                                                          
                              















                                                      













                                                                                





                                               
//! Fetch autodiscovered IP addresses stored by Diplonat in Consul

use std::collections::HashMap;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use anyhow::{anyhow, bail, Result};
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::{select, sync::watch};
use tracing::*;

use df_consul::*;

/// One autodiscovery record, generic over the address type `A`.
///
/// In this file it is deserialized from the JSON value of a Consul key by
/// `parse_autodiscovered_address`, with `A` being either `Ipv4Addr` or
/// `Ipv6Addr` depending on the address family found in the key.
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct DiplonatAutodiscoveryResult<A> {
    /// Timestamp attached to the record by its writer.
    // NOTE(review): presumably seconds since the Unix epoch (the unit returned
    // by `timestamp()` in this file) -- confirm against the Diplonat writer.
    pub timestamp: u64,
    /// The discovered address, if any.
    // NOTE(review): `None` presumably means that no address could be
    // discovered for this family -- confirm against the Diplonat writer.
    pub address: Option<A>,
}

/// All autodiscovery records read from Consul, grouped by address family.
///
/// Both maps are keyed by the last path segment of the Consul key (the part
/// that `parse_autodiscovered_address` calls `node`).
#[derive(Default, Debug, Eq, PartialEq)]
pub struct AutodiscoveredAddresses {
    /// Records found under `.../autodiscovery/ipv4/<node>`.
    pub ipv4: HashMap<String, DiplonatAutodiscoveryResult<Ipv4Addr>>,
    /// Records found under `.../autodiscovery/ipv6/<node>`.
    pub ipv6: HashMap<String, DiplonatAutodiscoveryResult<Ipv6Addr>>,
}

pub fn watch_autodiscovered_ips(
    consul: Consul,
    mut must_exit: watch::Receiver<bool>,
) -> watch::Receiver<Arc<AutodiscoveredAddresses>> {
    let (tx, rx) = watch::channel(Arc::new(AutodiscoveredAddresses::default()));
    let rx2 = rx.clone();

    tokio::spawn(async move {
        let mut last_index = None;
        let re = Regex::new(r".*autodiscovery/(ipv[46])/([^/]+)$").unwrap();

        while !*must_exit.borrow() {
            let r = select! {
                _ = must_exit.changed() => continue,
                r = consul.kv_get_prefix("diplonat/autodiscovery/", last_index) => r,
            };

            let entries = match r {
                Err(e) => {
                    warn!("Error fetching diplonat autodiscovery consul prefix: {}", e);
                    tokio::time::sleep(Duration::from_secs(30)).await;
                    continue;
                }
                Ok(r) => {
                    last_index = Some(r.index());
                    r.into_inner()
                }
            };

            let mut addresses = AutodiscoveredAddresses::default();

            for (k, v) in entries {
                if let Err(e) = parse_autodiscovered_address(&re, &mut addresses, &k, &v) {
                    warn!(
                        "Invalid k/v pair in diplonat autodiscovery results: {} = {} ({})",
                        k,
                        std::str::from_utf8(&v).unwrap_or("<???>"),
                        e
                    );
                }
            }

            if addresses.strip_timestamps() != rx2.borrow().strip_timestamps() {
                addresses.dump();
            }

            if tx.send(Arc::new(addresses)).is_err() {
                info!("Autodiscovered addresses watcher terminating");
                return;
            }
        }
    });

    rx
}

/// Parse one Consul key/value pair and record it in `addresses`.
///
/// `re` must expose the address family (`ipv4` or `ipv6`) as capture group 1
/// and the node name as capture group 2. The value `v` is decoded as a JSON
/// [`DiplonatAutodiscoveryResult`] of the matching address type and inserted
/// into the corresponding map under the node name, replacing any previous
/// record for that node.
///
/// # Errors
/// Returns an error, leaving `addresses` untouched, when:
/// - `k` does not match `re`,
/// - one of the two capture groups is missing,
/// - the address family is neither `ipv4` nor `ipv6`, or
/// - `v` is not valid JSON for the expected record type.
fn parse_autodiscovered_address(
    re: &Regex,
    addresses: &mut AutodiscoveredAddresses,
    k: &str,
    v: &[u8],
) -> Result<()> {
    // Build the error lazily: this function runs for every key of every fetch,
    // and the error value is only needed when the key does not match.
    let caps = re
        .captures(k)
        .ok_or_else(|| anyhow!("key does not match regex"))?;

    let (family, node) = match (caps.get(1), caps.get(2)) {
        (Some(family), Some(node)) => (family.as_str(), node.as_str()),
        _ => bail!("invalid regex captures {:?}", caps),
    };

    match family {
        "ipv4" => {
            let r: DiplonatAutodiscoveryResult<Ipv4Addr> = serde_json::from_slice(v)?;
            addresses.ipv4.insert(node.to_string(), r);
        }
        "ipv6" => {
            let r: DiplonatAutodiscoveryResult<Ipv6Addr> = serde_json::from_slice(v)?;
            addresses.ipv6.insert(node.to_string(), r);
        }
        _ => bail!("invalid address family {}", family),
    }

    Ok(())
}

impl AutodiscoveredAddresses {
    /// Build a view of the records that keeps only the node name and the
    /// address, so that two sets can be compared without taking the
    /// timestamps into account.
    ///
    /// Returns the IPv4 view first and the IPv6 view second; keys borrow the
    /// node names from `self`.
    fn strip_timestamps(
        &self,
    ) -> (
        HashMap<&str, Option<Ipv4Addr>>,
        HashMap<&str, Option<Ipv6Addr>>,
    ) {
        let mut v4 = HashMap::new();
        for (node, record) in &self.ipv4 {
            v4.insert(node.as_str(), record.address);
        }

        let mut v6 = HashMap::new();
        for (node, record) in &self.ipv6 {
            v6.insert(node.as_str(), record.address);
        }

        (v4, v6)
    }

    /// Print every record to stdout: a header line, one line per IPv4 record,
    /// one line per IPv6 record, then an empty line.
    fn dump(&self) {
        println!("---- Autodiscovered addresses (fetched from DiploNAT): ----");
        self.ipv4.iter().for_each(|(node, record)| {
            println!("   IPv4 {} {} {:?}", node, record.timestamp, record.address)
        });
        self.ipv6.iter().for_each(|(node, record)| {
            println!("   IPv6 {} {} {:?}", node, record.timestamp, record.address)
        });
        println!("");
    }
}

/// Current wall-clock time as whole seconds elapsed since the Unix epoch.
///
/// # Panics
/// Panics with "clock error" if the system clock reports a time earlier than
/// the Unix epoch.
pub fn timestamp() -> u64 {
    let since_epoch = SystemTime::UNIX_EPOCH.elapsed().expect("clock error");
    since_epoch.as_secs()
}