diff options
author | Alex Auvolat <alex@adnab.me> | 2021-12-06 23:08:22 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-12-06 23:08:22 +0100 |
commit | 2895a8ae2e99f11290800c14f038feda90e1787a (patch) | |
tree | 927ce24939b08613f74b9354456308f69286cfb7 /src | |
download | tricot-2895a8ae2e99f11290800c14f038feda90e1787a.tar.gz tricot-2895a8ae2e99f11290800c14f038feda90e1787a.zip |
First commit to Tricot, a replacement for Traefik v1
Diffstat (limited to 'src')
-rw-r--r-- | src/consul.rs | 62 | ||||
-rw-r--r-- | src/main.rs | 20 | ||||
-rw-r--r-- | src/proxy_config.rs | 113 |
3 files changed, 195 insertions, 0 deletions
diff --git a/src/consul.rs b/src/consul.rs new file mode 100644 index 0000000..81074f4 --- /dev/null +++ b/src/consul.rs @@ -0,0 +1,62 @@ +use std::collections::HashMap; + +use anyhow::Result; +use log::*; +use serde::{Deserialize, Serialize}; + +// ---- Watch and retrieve Consul catalog ---- + +#[derive(Serialize, Deserialize, Debug)] +pub struct ConsulServiceEntry { + #[serde(rename = "Address")] + pub address: String, + + #[serde(rename = "Port")] + pub port: u16, + + #[serde(rename = "Tags")] + pub tags: Vec<String>, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ConsulNodeCatalog { + #[serde(rename = "Services")] + pub services: HashMap<String, ConsulServiceEntry>, +} + +#[derive(Clone)] +pub struct Consul { + client: reqwest::Client, + url: String, + idx: Option<u64>, +} + +impl Consul { + pub fn new(url: &str) -> Self { + return Self { + client: reqwest::Client::new(), + url: url.to_string(), + idx: None, + }; + } + + pub fn watch_node_reset(&mut self) -> () { + self.idx = None; + } + + pub async fn watch_node(&mut self, host: &str) -> Result<ConsulNodeCatalog> { + let url = match self.idx { + Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i), + None => format!("{}/v1/catalog/node/{}", self.url, host), + }; + + let http = self.client.get(&url).send().await?; + self.idx = match http.headers().get("X-Consul-Index") { + Some(v) => Some(v.to_str()?.parse::<u64>()?), + None => return Err(anyhow!("X-Consul-Index header not found")), + }; + + let resp: ConsulNodeCatalog = http.json().await?; + return Ok(resp); + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..3bd8928 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,20 @@ +#[macro_use] +extern crate anyhow; + +mod consul; +mod proxy_config; + +use log::*; + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + info!("Starting Tricot"); + + let consul = consul::Consul::new("http://10.42.0.21:8500"); + let mut rx_proxy_config = proxy_config::spawn_proxy_config_task(consul.clone(), "carcajou"); + + while rx_proxy_config.changed().await.is_ok() { + info!("Proxy config: {:#?}", *rx_proxy_config.borrow()); + } +} diff --git a/src/proxy_config.rs b/src/proxy_config.rs new file mode 100644 index 0000000..e118a33 --- /dev/null +++ b/src/proxy_config.rs @@ -0,0 +1,113 @@ +use std::net::SocketAddr; +use std::{cmp, time::Duration}; + +use log::*; +use tokio::{sync::watch, time::sleep}; + +use crate::consul::*; + +// ---- Extract proxy config from Consul catalog ---- + +#[derive(Debug)] +pub struct ProxyEntry { + pub target_addr: SocketAddr, + pub host: String, + pub path_prefix: Option<String>, + pub priority: u32, +} + +#[derive(Debug)] +pub struct ProxyConfig { + pub entries: Vec<ProxyEntry>, +} + +fn retry_to_time(retries: u32, max_time: Duration) -> Duration { + // 1.2^x seems to be a good value to exponentially increase time at a good pace + // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5 + // minutes + return Duration::from_secs(cmp::min( + max_time.as_secs(), + 1.2f64.powf(retries as f64) as u64, + )); +} + +fn parse_tricot_tag(target_addr: SocketAddr, tag: &str) -> Option<ProxyEntry> { + let splits = tag.split(' ').collect::<Vec<_>>(); + if (splits.len() != 2 && splits.len() != 3) || splits[0] != "tricot" { + return None; + } + + let (host, path_prefix) = match splits[1].split_once('/') { + Some((h, p)) => (h, Some(p.to_string())), + None => (splits[1], None), + }; + + let priority = match splits.len() { + 3 => splits[2].parse().ok()?, + _ => 100, + }; + + Some(ProxyEntry { + target_addr, + host: host.to_string(), + path_prefix, + priority, + }) +} + +fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> ProxyConfig { + let mut entries = vec![]; + + for (_, svc) in catalog.services.iter() { + let ip_addr = match svc.address.parse() { + Ok(ip) => ip, + _ => continue, + }; + let addr = SocketAddr::new(ip_addr, svc.port); + for tag in svc.tags.iter() { + if let Some(ent) = parse_tricot_tag(addr, tag) { + entries.push(ent); + } + } + } + + ProxyConfig { entries } +} + +pub fn spawn_proxy_config_task(mut consul: Consul, node: &str) -> watch::Receiver<ProxyConfig> { + let (tx, rx) = watch::channel(ProxyConfig { + entries: Vec::new(), + }); + + let node = node.to_string(); + + tokio::spawn(async move { + let mut retries = 0; + + loop { + let catalog = match consul.watch_node(&node).await { + Ok(c) => c, + Err(e) => { + consul.watch_node_reset(); + retries = cmp::min(std::u32::MAX - 1, retries) + 1; + let will_retry_in = retry_to_time(retries, Duration::from_secs(600)); + error!( + "Failed to query consul. Will retry in {}s. {}", + will_retry_in.as_secs(), + e + ); + sleep(will_retry_in).await; + continue; + } + }; + retries = 0; + + let config = parse_consul_catalog(&catalog); + debug!("Extracted configuration: {:#?}", config); + + tx.send(config).expect("Internal error"); + } + }); + + rx +} |