aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/consul.rs33
-rw-r--r--src/main.rs23
-rw-r--r--src/proxy_config.rs99
3 files changed, 129 insertions, 26 deletions
diff --git a/src/consul.rs b/src/consul.rs
index 6fc031c..240c177 100644
--- a/src/consul.rs
+++ b/src/consul.rs
@@ -7,6 +7,12 @@ use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
// ---- Watch and retrieve Consul catalog ----
+//
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ConsulNodeListNode {
+ #[serde(rename = "Node")]
+ pub node: String,
+}
#[derive(Serialize, Deserialize, Debug)]
pub struct ConsulServiceEntry {
@@ -59,8 +65,6 @@ pub struct Consul {
url: String,
kv_prefix: String,
- idx: Option<u64>,
-
pub local_node: String,
}
@@ -70,29 +74,36 @@ impl Consul {
client: reqwest::Client::new(),
url: url.to_string(),
kv_prefix: kv_prefix.to_string(),
- idx: None,
local_node: local_node.into(),
};
}
- pub fn watch_node_reset(&mut self) -> () {
- self.idx = None;
+ pub async fn list_nodes(&self) -> Result<Vec<String>> {
+ debug!("list_nodes");
+
+ let url = format!("{}/v1/catalog/nodes", self.url);
+
+ let http = self.client.get(&url).send().await?;
+ let resp: Vec<ConsulNodeListNode> = http.json().await?;
+ Ok(resp.into_iter().map(|n| n.node).collect::<Vec<_>>())
}
- pub async fn watch_node(&mut self, host: &str) -> Result<ConsulNodeCatalog> {
- let url = match self.idx {
+ pub async fn watch_node(&self, host: &str, idx: Option<usize>) -> Result<(ConsulNodeCatalog, usize)> {
+ debug!("watch_node {} {:?}", host, idx);
+
+ let url = match idx {
Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i),
None => format!("{}/v1/catalog/node/{}", self.url, host),
};
let http = self.client.get(&url).send().await?;
- self.idx = match http.headers().get("X-Consul-Index") {
- Some(v) => Some(v.to_str()?.parse::<u64>()?),
- None => return Err(anyhow!("X-Consul-Index header not found")),
+ let new_idx = match http.headers().get("X-Consul-Index") {
+ Some(v) => v.to_str()?.parse::<usize>()?,
+ None => bail!("X-Consul-Index header not found"),
};
let resp: ConsulNodeCatalog = http.json().await?;
- return Ok(resp);
+ return Ok((resp, new_idx));
}
// ---- KV get and put ----
diff --git a/src/main.rs b/src/main.rs
index 9d710b2..4a0c0ec 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,8 @@
#[macro_use]
extern crate anyhow;
+use structopt::StructOpt;
+
mod cert;
mod cert_store;
mod consul;
@@ -11,15 +13,34 @@ mod reverse_proxy;
use log::*;
+#[derive(StructOpt, Debug)]
+#[structopt(name = "tricot")]
+struct Opt {
+ /// Address of consul server
+ #[structopt(long = "consul-addr", env = "TRICOT_CONSUL_HOST", default_value = "http://127.0.0.1:8500/")]
+ pub consul_addr: String,
+
+ /// Prefix of Tricot's entries in Consul KV space
+ #[structopt(long = "consul-kv-prefix", env = "TRICOT_CONSUL_KV_PREFIX", default_value = "tricot/")]
+ pub consul_kv_prefix: String,
+
+ /// Node name
+ #[structopt(long = "node-name", env = "TRICOT_NODE_NAME", default_value = "<none>")]
+ pub node_name: String,
+}
+
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "tricot=debug")
}
pretty_env_logger::init();
+
+ let opt = Opt::from_args();
+
info!("Starting Tricot");
- let consul = consul::Consul::new("http://10.42.0.21:8500", "tricot/", "carcajou");
+ let consul = consul::Consul::new(&opt.consul_addr, &opt.consul_kv_prefix, &opt.node_name);
let mut rx_proxy_config = proxy_config::spawn_proxy_config_task(consul.clone());
let cert_store = cert_store::CertStore::new(consul.clone());
diff --git a/src/proxy_config.rs b/src/proxy_config.rs
index ba58484..31a2659 100644
--- a/src/proxy_config.rs
+++ b/src/proxy_config.rs
@@ -1,7 +1,13 @@
use std::net::SocketAddr;
use std::sync::{atomic, Arc};
+use std::collections::HashMap;
use std::{cmp, time::Duration};
+use anyhow::Result;
+
+use futures::stream::{FuturesUnordered, StreamExt};
+use futures::future::{BoxFuture};
+
use log::*;
use tokio::{sync::watch, time::sleep};
@@ -63,7 +69,7 @@ fn parse_tricot_tag(target_addr: SocketAddr, tag: &str) -> Option<ProxyEntry> {
})
}
-fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> ProxyConfig {
+fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> Vec<ProxyEntry> {
let mut entries = vec![];
for (_, svc) in catalog.services.iter() {
@@ -79,37 +85,102 @@ fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> ProxyConfig {
}
}
- ProxyConfig { entries }
+ entries
}
-pub fn spawn_proxy_config_task(mut consul: Consul) -> watch::Receiver<Arc<ProxyConfig>> {
+#[derive(Default)]
+struct NodeWatchState {
+ last_idx: Option<usize>,
+ last_catalog: Option<ConsulNodeCatalog>,
+ retries: u32,
+}
+
+pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfig>> {
let (tx, rx) = watch::channel(Arc::new(ProxyConfig {
entries: Vec::new(),
}));
+
+ let consul = Arc::new(consul);
tokio::spawn(async move {
- let mut retries = 0;
- let node = consul.local_node.clone();
+ let mut nodes = HashMap::new();
+ let mut watches = FuturesUnordered::<BoxFuture<'static, (String, Result<_>)>>::new();
loop {
- let catalog = match consul.watch_node(&node).await {
- Ok(c) => c,
+ match consul.list_nodes().await {
+ Ok(consul_nodes) => {
+ info!("Watched consul nodes: {:?}", consul_nodes);
+ for node in consul_nodes {
+ if !nodes.contains_key(&node) {
+ nodes.insert(node.clone(), NodeWatchState::default());
+
+ let node = node.to_string();
+ let consul = consul.clone();
+
+ watches.push(Box::pin(async move {
+ let res = consul.watch_node(&node, None).await;
+ (node, res)
+ }));
+ }
+ }
+ }
+ Err(e) => {
+ warn!("Could not get Consul node list: {}", e);
+ }
+ }
+
+ let (node, res): (String, Result<_>) = match watches.next().await {
+ Some(v) => v,
+ None => {
+ warn!("No nodes currently watched in proxy_config.rs");
+ sleep(Duration::from_secs(10)).await;
+ continue;
+ }
+ };
+
+ match res {
+ Ok((catalog, new_idx)) => {
+ let mut watch_state = nodes.get_mut(&node).unwrap();
+ watch_state.last_idx = Some(new_idx);
+ watch_state.last_catalog = Some(catalog);
+ watch_state.retries = 0;
+
+ let idx = watch_state.last_idx;
+ let consul = consul.clone();
+ watches.push(Box::pin(async move {
+ let res = consul.watch_node(&node, idx).await;
+ (node, res)
+ }));
+ }
Err(e) => {
- consul.watch_node_reset();
- retries = cmp::min(std::u32::MAX - 1, retries) + 1;
- let will_retry_in = retry_to_time(retries, Duration::from_secs(600));
+ let mut watch_state = nodes.get_mut(&node).unwrap();
+ watch_state.retries += 1;
+ watch_state.last_idx = None;
+
+ let will_retry_in = retry_to_time(watch_state.retries, Duration::from_secs(600));
error!(
"Failed to query consul. Will retry in {}s. {}",
will_retry_in.as_secs(),
e
);
- sleep(will_retry_in).await;
+
+ let consul = consul.clone();
+ watches.push(Box::pin(async move {
+ sleep(will_retry_in).await;
+ let res = consul.watch_node(&node, None).await;
+ (node, res)
+ }));
continue;
}
- };
- retries = 0;
+ }
- let config = parse_consul_catalog(&catalog);
+ let mut entries = vec![];
+ for (_, watch_state) in nodes.iter() {
+ if let Some(catalog) = &watch_state.last_catalog {
+ entries.extend(parse_consul_catalog(catalog));
+ }
+ }
+ let config = ProxyConfig { entries };
debug!("Extracted configuration: {:#?}", config);
tx.send(Arc::new(config)).expect("Internal error");