aboutsummaryrefslogtreecommitdiff
path: root/src/autodiscovery.rs
blob: 7b61da6dd9debd0dc2b1be2b258cbfbc08808f42 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
//! Fetch autodiscovered IP addresses stored by Diplonat into Consul

use std::collections::HashMap;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use anyhow::{anyhow, bail, Result};
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::{select, sync::watch};
use tracing::*;

use df_consul::*;

/// One autodiscovery record, generic over the address type `A`.
///
/// In this file it is decoded from the JSON value of a Consul key by
/// `parse_autodiscovered_address`, with `A` being `Ipv4Addr` or `Ipv6Addr`.
#[derive(Serialize, Deserialize, Debug)]
pub struct DiplonatAutodiscoveryResult<A> {
    /// Time attached to the record by its writer.
    /// NOTE(review): presumably seconds since the Unix epoch, like the value
    /// returned by `timestamp()` in this module — confirm against Diplonat.
    pub timestamp: u64,
    /// The recorded address, or `None` when the record carries no address.
    pub address: Option<A>,
}

/// Snapshot of all autodiscovery records read from Consul, split by address
/// family.
///
/// Both maps are keyed by the last path component of the Consul key (the
/// `node` capture in `parse_autodiscovered_address`). The `Default` value has
/// both maps empty.
#[derive(Default, Debug)]
pub struct AutodiscoveredAddresses {
    /// Records found under `.../autodiscovery/ipv4/<node>`.
    pub ipv4: HashMap<String, DiplonatAutodiscoveryResult<Ipv4Addr>>,
    /// Records found under `.../autodiscovery/ipv6/<node>`.
    pub ipv6: HashMap<String, DiplonatAutodiscoveryResult<Ipv6Addr>>,
}

/// Spawns a background task that repeatedly reads the
/// `diplonat/autodiscovery/` prefix from Consul and publishes every parsed
/// snapshot on a watch channel.
///
/// # Arguments
///
/// * `consul` - client used to query the key/value store.
/// * `must_exit` - shutdown signal; the task stops once it observes `true`,
///   or once the sending side of this channel has been dropped.
///
/// # Returns
///
/// A receiver that initially holds an empty [`AutodiscoveredAddresses`] and is
/// updated after each successful fetch. Entries that fail to parse are logged
/// and skipped; they do not prevent the rest of the snapshot from being
/// published.
///
/// The task also stops by itself when every receiver of the returned channel
/// has been dropped, since nobody can observe further updates.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime.
pub fn watch_autodiscovered_ips(
    consul: Consul,
    mut must_exit: watch::Receiver<bool>,
) -> watch::Receiver<Arc<AutodiscoveredAddresses>> {
    let (tx, rx) = watch::channel(Arc::new(AutodiscoveredAddresses::default()));

    tokio::spawn(async move {
        // Index of the last successful response, passed back to Consul on the
        // next query (presumably turning it into a blocking query — confirm
        // against df_consul).
        let mut last_index = None;
        let re = Regex::new(r".*autodiscovery/(ipv[46])/([^/]+)$")
            .expect("hard-coded autodiscovery key regex must be valid");

        while !*must_exit.borrow() {
            let r = select! {
                changed = must_exit.changed() => {
                    // An error means the shutdown sender is gone: `changed()`
                    // would then fail immediately on every iteration and the
                    // loop would spin without ever completing a query.
                    if changed.is_err() {
                        break;
                    }
                    // Otherwise re-evaluate the loop condition with the new value.
                    continue;
                }
                r = consul.kv_get_prefix("diplonat/autodiscovery/", last_index) => r,
            };

            let entries = match r {
                Err(e) => {
                    warn!("Error fetching diplonat autodiscovery consul prefix: {}", e);
                    // Back off before retrying so a failing Consul is not hammered.
                    tokio::time::sleep(Duration::from_secs(30)).await;
                    continue;
                }
                Ok(r) => {
                    last_index = Some(r.index());
                    r.into_inner()
                }
            };

            let mut addresses = AutodiscoveredAddresses::default();

            for (k, v) in entries {
                if let Err(e) = parse_autodiscovered_address(&re, &mut addresses, &k, &v) {
                    warn!(
                        "Invalid k/v pair in diplonat autodiscovery results: {} = {} ({})",
                        k,
                        std::str::from_utf8(&v).unwrap_or("<???>"),
                        e
                    );
                }
            }

            debug!("Autodiscovered addresses fetched from Consul: {:?}", addresses);
            if tx.send(Arc::new(addresses)).is_err() {
                // All receivers were dropped: actually stop instead of
                // continuing to poll Consul for nobody.
                info!("Autodiscovered addresses watcher terminating");
                break;
            }
        }
    });

    rx
}

/// Parses one Consul key/value pair and records it in `addresses`.
///
/// # Arguments
///
/// * `re` - regex applied to the key; capture group 1 must yield the address
///   family (`ipv4` or `ipv6`) and capture group 2 the node name.
/// * `addresses` - snapshot being built; the decoded record is inserted into
///   the map of the matching family under the node name, replacing any
///   previous record for that node.
/// * `k` - the Consul key.
/// * `v` - the raw value, expected to be the JSON encoding of a
///   [`DiplonatAutodiscoveryResult`].
///
/// # Errors
///
/// Fails when the key does not match `re`, when a capture group is missing,
/// when the family is neither `ipv4` nor `ipv6`, or when the value is not
/// valid JSON for the expected record type. `addresses` is left untouched in
/// all of these cases.
fn parse_autodiscovered_address(
    re: &Regex,
    addresses: &mut AutodiscoveredAddresses,
    k: &str,
    v: &[u8],
) -> Result<()> {
    // Build the error lazily so that nothing is allocated on the success path.
    let caps = re
        .captures(k)
        .ok_or_else(|| anyhow!("key does not match regex"))?;

    let (family, node) = match (caps.get(1), caps.get(2)) {
        (Some(family), Some(node)) => (family.as_str(), node.as_str()),
        _ => bail!("invalid regex captures {:?}", caps),
    };

    match family {
        "ipv4" => {
            let r: DiplonatAutodiscoveryResult<Ipv4Addr> = serde_json::from_slice(v)?;
            addresses.ipv4.insert(node.to_string(), r);
        }
        "ipv6" => {
            let r: DiplonatAutodiscoveryResult<Ipv6Addr> = serde_json::from_slice(v)?;
            addresses.ipv6.insert(node.to_string(), r);
        }
        _ => bail!("invalid address family {}", family),
    }

    Ok(())
}

/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics with `"clock error"` if the system clock reports a time earlier
/// than the Unix epoch.
pub fn timestamp() -> u64 {
    let since_epoch = SystemTime::UNIX_EPOCH.elapsed().expect("clock error");
    since_epoch.as_secs()
}