aboutsummaryrefslogtreecommitdiff
path: root/src/autodiscovery.rs
blob: aa33d016aeba75d9636fc7d47c00473162d4dc40 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
//! Fetch autodiscoverd IP addresses stored by Diplonat into Consul

use std::collections::HashMap;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use anyhow::{anyhow, bail, Result};
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::{select, sync::watch};
use tracing::*;

use df_consul::*;

#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct DiplonatAutodiscoveryResult<A> {
    pub timestamp: u64,
    pub address: Option<A>,
}

#[derive(Default, Debug, Eq, PartialEq)]
pub struct AutodiscoveredAddresses {
    pub ipv4: HashMap<String, DiplonatAutodiscoveryResult<Ipv4Addr>>,
    pub ipv6: HashMap<String, DiplonatAutodiscoveryResult<Ipv6Addr>>,
}

pub fn watch_autodiscovered_ips(
    consul: Consul,
    mut must_exit: watch::Receiver<bool>,
) -> watch::Receiver<Arc<AutodiscoveredAddresses>> {
    let (tx, rx) = watch::channel(Arc::new(AutodiscoveredAddresses::default()));
    let rx2 = rx.clone();

    tokio::spawn(async move {
        let mut last_index = None;
        let re = Regex::new(r".*autodiscovery/(ipv[46])/([^/]+)$").unwrap();

        while !*must_exit.borrow() {
            let r = select! {
                _ = must_exit.changed() => continue,
                r = consul.kv_get_prefix("diplonat/autodiscovery/", last_index) => r,
            };

            let entries = match r {
                Err(e) => {
                    warn!("Error fetching diplonat autodiscovery consul prefix: {}", e);
                    tokio::time::sleep(Duration::from_secs(30)).await;
                    continue;
                }
                Ok(r) => {
                    last_index = Some(r.index());
                    r.into_inner()
                }
            };

            let mut addresses = AutodiscoveredAddresses::default();

            for (k, v) in entries {
                if let Err(e) = parse_autodiscovered_address(&re, &mut addresses, &k, &v) {
                    warn!(
                        "Invalid k/v pair in diplonat autodiscovery results: {} = {} ({})",
                        k,
                        std::str::from_utf8(&v).unwrap_or("<???>"),
                        e
                    );
                }
            }

            if addresses.strip_timestamps() != rx2.borrow().strip_timestamps() {
                addresses.dump();
            }

            if tx.send(Arc::new(addresses)).is_err() {
                info!("Autodiscovered addresses watcher terminating");
                return;
            }
        }
    });

    rx
}

fn parse_autodiscovered_address(
    re: &Regex,
    addresses: &mut AutodiscoveredAddresses,
    k: &str,
    v: &[u8],
) -> Result<()> {
    let caps = re.captures(k).ok_or(anyhow!("key does not match regex"))?;

    if let (Some(family), Some(node)) = (caps.get(1), caps.get(2)) {
        match family.as_str() {
            "ipv4" => {
                let r: DiplonatAutodiscoveryResult<Ipv4Addr> = serde_json::from_slice(v)?;
                addresses.ipv4.insert(node.as_str().to_string(), r);
            }
            "ipv6" => {
                let r: DiplonatAutodiscoveryResult<Ipv6Addr> = serde_json::from_slice(v)?;
                addresses.ipv6.insert(node.as_str().to_string(), r);
            }
            _ => bail!("invalid address family {}", family.as_str()),
        }
    } else {
        bail!("invalid regex captures {:?}", caps);
    }

    Ok(())
}

impl AutodiscoveredAddresses {
    fn strip_timestamps(
        &self,
    ) -> (
        HashMap<&str, Option<Ipv4Addr>>,
        HashMap<&str, Option<Ipv6Addr>>,
    ) {
        (
            self.ipv4
                .iter()
                .map(|(k, v)| (k.as_str(), v.address))
                .collect(),
            self.ipv6
                .iter()
                .map(|(k, v)| (k.as_str(), v.address))
                .collect(),
        )
    }

    fn dump(&self) {
        println!("---- Autodiscovered addresses (fetched from DiploNAT): ----");
        for (k, v) in self.ipv4.iter() {
            println!("   IPv4 {} {} {:?}", k, v.timestamp, v.address);
        }
        for (k, v) in self.ipv6.iter() {
            println!("   IPv6 {} {} {:?}", k, v.timestamp, v.address);
        }
        println!("");
    }
}

pub fn timestamp() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("clock error")
        .as_secs()
}