aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-12-07 14:28:29 +0100
committerAlex Auvolat <alex@adnab.me>2022-12-07 14:28:29 +0100
commitfad172e54af7a4d043410fe6cdc1f8b6112e9c0f (patch)
treee2b4a63a43d19161fb36b625e6e24af37e1f709b
parent752593e2747f64a8f14de3484ab085ed5f65cd40 (diff)
downloadtricot-fad172e54af7a4d043410fe6cdc1f8b6112e9c0f.tar.gz
tricot-fad172e54af7a4d043410fe6cdc1f8b6112e9c0f.zip
Externalise Consul module to df-consul crate
-rw-r--r--Cargo.lock14
-rw-r--r--Cargo.toml1
-rw-r--r--src/cert_store.rs5
-rw-r--r--src/consul.rs261
-rw-r--r--src/main.rs12
-rw-r--r--src/proxy_config.rs12
6 files changed, 33 insertions, 272 deletions
diff --git a/Cargo.lock b/Cargo.lock
index e2acbd6..1752101 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -396,6 +396,19 @@ dependencies = [
]
[[package]]
+name = "df-consul"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5ec5111d5daddfbab491780564dc8270c29c90553fab98d677489600a8623c76"
+dependencies = [
+ "anyhow",
+ "bytes",
+ "log",
+ "reqwest",
+ "serde",
+]
+
+[[package]]
name = "dhat"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1945,6 +1958,7 @@ dependencies = [
"async-compression",
"bytes",
"chrono",
+ "df-consul",
"dhat",
"envy",
"futures",
diff --git a/Cargo.toml b/Cargo.toml
index 416038f..0678819 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,6 +37,7 @@ uuid = { version = "1.2", features = ["v4"] }
opentelemetry = "0.17"
opentelemetry-prometheus = "0.10"
prometheus = "0.13"
+df-consul = "0.1.0"
dhat = { version = "0.3", optional = true }
diff --git a/src/cert_store.rs b/src/cert_store.rs
index 97fbf32..2d3fb90 100644
--- a/src/cert_store.rs
+++ b/src/cert_store.rs
@@ -20,6 +20,7 @@ use crate::proxy_config::*;
pub struct CertStore {
consul: Consul,
+ node_name: String,
letsencrypt_email: String,
certs: RwLock<HashMap<String, Arc<Cert>>>,
self_signed_certs: RwLock<HashMap<String, Arc<Cert>>>,
@@ -30,6 +31,7 @@ pub struct CertStore {
impl CertStore {
pub fn new(
consul: Consul,
+ node_name: String,
rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>,
letsencrypt_email: String,
exit_on_err: impl Fn(anyhow::Error) + Send + 'static,
@@ -38,6 +40,7 @@ impl CertStore {
let cert_store = Arc::new(Self {
consul,
+ node_name,
certs: RwLock::new(HashMap::new()),
self_signed_certs: RwLock::new(HashMap::new()),
rx_proxy_config,
@@ -190,7 +193,7 @@ impl CertStore {
// that delay expires
let lock_path = format!("renew_lock/{}", domain);
- let lock_name = format!("tricot/renew:{}@{}", domain, self.consul.local_node.clone());
+ let lock_name = format!("tricot/renew:{}@{}", domain, self.node_name);
let session = self
.consul
.create_session(&ConsulSessionRequest {
diff --git a/src/consul.rs b/src/consul.rs
deleted file mode 100644
index 13b99d8..0000000
--- a/src/consul.rs
+++ /dev/null
@@ -1,261 +0,0 @@
-use std::collections::HashMap;
-use std::fs::File;
-use std::io::Read;
-
-use anyhow::{bail, Result};
-use bytes::Bytes;
-use log::*;
-use reqwest::StatusCode;
-use serde::{Deserialize, Serialize};
-
-pub struct ConsulConfig {
- pub addr: String,
- pub ca_cert: Option<String>,
- pub tls_skip_verify: bool,
- pub client_cert: Option<String>,
- pub client_key: Option<String>,
-}
-
-// ---- Watch and retrieve Consul catalog ----
-//
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ConsulNode {
- #[serde(rename = "Node")]
- pub node: String,
- #[serde(rename = "Address")]
- pub address: String,
- #[serde(rename = "Meta")]
- pub meta: HashMap<String, String>,
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ConsulServiceEntry {
- #[serde(rename = "Service")]
- pub service: String,
-
- #[serde(rename = "Address")]
- pub address: String,
-
- #[serde(rename = "Port")]
- pub port: u16,
-
- #[serde(rename = "Tags")]
- pub tags: Vec<String>,
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ConsulNodeCatalog {
- #[serde(rename = "Node")]
- pub node: ConsulNode,
- #[serde(rename = "Services")]
- pub services: HashMap<String, ConsulServiceEntry>,
-}
-
-// ---- Consul session management ----
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ConsulSessionRequest {
- #[serde(rename = "Name")]
- pub name: String,
-
- #[serde(rename = "Node")]
- pub node: Option<String>,
-
- #[serde(rename = "LockDelay")]
- pub lock_delay: Option<String>,
-
- #[serde(rename = "TTL")]
- pub ttl: Option<String>,
-
- #[serde(rename = "Behavior")]
- pub behavior: Option<String>,
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ConsulSessionResponse {
- #[serde(rename = "ID")]
- pub id: String,
-}
-
-#[derive(Clone)]
-pub struct Consul {
- client: reqwest::Client,
-
- url: String,
- kv_prefix: String,
-
- pub local_node: String,
-}
-
-impl Consul {
- pub fn new(config: ConsulConfig, kv_prefix: &str, local_node: &str) -> Result<Self> {
- let client = match (&config.client_cert, &config.client_key) {
- (Some(client_cert), Some(client_key)) => {
- let mut client_cert_buf = vec![];
- File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
-
- let mut client_key_buf = vec![];
- File::open(client_key)?.read_to_end(&mut client_key_buf)?;
-
- let identity = reqwest::Identity::from_pem(
- &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
- )?;
-
- if config.tls_skip_verify {
- reqwest::Client::builder()
- .use_rustls_tls()
- .danger_accept_invalid_certs(true)
- .identity(identity)
- .build()?
- } else if let Some(ca_cert) = &config.ca_cert {
- let mut ca_cert_buf = vec![];
- File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
-
- reqwest::Client::builder()
- .use_rustls_tls()
- .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
- .identity(identity)
- .build()?
- } else {
- reqwest::Client::builder()
- .use_rustls_tls()
- .identity(identity)
- .build()?
- }
- }
- (None, None) => reqwest::Client::new(),
- _ => bail!("Incomplete Consul TLS configuration parameters"),
- };
-
- Ok(Self {
- client,
- url: config.addr.trim_end_matches('/').to_string(),
- kv_prefix: kv_prefix.to_string(),
- local_node: local_node.into(),
- })
- }
-
- pub async fn list_nodes(&self) -> Result<Vec<ConsulNode>> {
- debug!("list_nodes");
-
- let url = format!("{}/v1/catalog/nodes", self.url);
-
- let http = self.client.get(&url).send().await?;
- let resp: Vec<ConsulNode> = http.json().await?;
- Ok(resp)
- }
-
- pub async fn watch_node(
- &self,
- host: &str,
- idx: Option<usize>,
- ) -> Result<(ConsulNodeCatalog, usize)> {
- debug!("watch_node {} {:?}", host, idx);
-
- let url = match idx {
- Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i),
- None => format!("{}/v1/catalog/node/{}", self.url, host),
- };
-
- let http = self.client.get(&url).send().await?;
- let new_idx = match http.headers().get("X-Consul-Index") {
- Some(v) => v.to_str()?.parse::<usize>()?,
- None => bail!("X-Consul-Index header not found"),
- };
-
- let resp: ConsulNodeCatalog = http.json().await?;
- Ok((resp, new_idx))
- }
-
- // ---- KV get and put ----
-
- pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> {
- debug!("kv_get {}", key);
-
- let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
- let http = self.client.get(&url).send().await?;
- match http.status() {
- StatusCode::OK => Ok(Some(http.bytes().await?)),
- StatusCode::NOT_FOUND => Ok(None),
- _ => Err(anyhow!(
- "Consul request failed: {:?}",
- http.error_for_status()
- )),
- }
- }
-
- pub async fn kv_get_json<T: for<'de> Deserialize<'de>>(&self, key: &str) -> Result<Option<T>> {
- debug!("kv_get_json {}", key);
-
- let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
- let http = self.client.get(&url).send().await?;
- match http.status() {
- StatusCode::OK => Ok(Some(http.json().await?)),
- StatusCode::NOT_FOUND => Ok(None),
- _ => Err(anyhow!(
- "Consul request failed: {:?}",
- http.error_for_status()
- )),
- }
- }
-
- pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> {
- debug!("kv_put {}", key);
-
- let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
- let http = self.client.put(&url).body(bytes).send().await?;
- http.error_for_status()?;
- Ok(())
- }
-
- pub async fn kv_put_json<T: Serialize>(&self, key: &str, value: &T) -> Result<()> {
- debug!("kv_put_json {}", key);
-
- let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
- let http = self.client.put(&url).json(value).send().await?;
- http.error_for_status()?;
- Ok(())
- }
-
- pub async fn kv_delete(&self, key: &str) -> Result<()> {
- let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
- let http = self.client.delete(&url).send().await?;
- http.error_for_status()?;
- Ok(())
- }
-
- // ---- Locking ----
-
- pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result<String> {
- debug!("create_session {:?}", req);
-
- let url = format!("{}/v1/session/create", self.url);
- let http = self.client.put(&url).json(req).send().await?;
- let resp: ConsulSessionResponse = http.json().await?;
- Ok(resp.id)
- }
-
- pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result<bool> {
- debug!("acquire {}", key);
-
- let url = format!(
- "{}/v1/kv/{}{}?acquire={}",
- self.url, self.kv_prefix, key, session
- );
- let http = self.client.put(&url).body(bytes).send().await?;
- let resp: bool = http.json().await?;
- Ok(resp)
- }
-
- pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> {
- debug!("release {}", key);
-
- let url = format!(
- "{}/v1/kv/{}{}?release={}",
- self.url, self.kv_prefix, key, session
- );
- let http = self.client.put(&url).body(bytes).send().await?;
- http.error_for_status()?;
- Ok(())
- }
-}
diff --git a/src/main.rs b/src/main.rs
index 79c4a9b..c97bddf 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -13,7 +13,6 @@ use tokio::sync::watch;
mod cert;
mod cert_store;
-mod consul;
mod http;
mod https;
mod metrics;
@@ -21,6 +20,7 @@ mod proxy_config;
mod reverse_proxy;
mod tls_util;
+pub use df_consul as consul;
use proxy_config::ProxyConfig;
#[cfg(feature = "dhat-heap")]
@@ -139,13 +139,17 @@ async fn main() {
client_key: opt.consul_client_key.clone(),
};
- let consul = consul::Consul::new(consul_config, &opt.consul_kv_prefix, &opt.node_name)
+ let consul = consul::Consul::new(consul_config, &opt.consul_kv_prefix)
.expect("Error creating Consul client");
- let rx_proxy_config =
- proxy_config::spawn_proxy_config_task(consul.clone(), exit_signal.clone());
+ let rx_proxy_config = proxy_config::spawn_proxy_config_task(
+ consul.clone(),
+ opt.node_name.clone(),
+ exit_signal.clone(),
+ );
let cert_store = cert_store::CertStore::new(
consul.clone(),
+ opt.node_name.clone(),
rx_proxy_config.clone(),
opt.letsencrypt_email.clone(),
exit_on_err.clone(),
diff --git a/src/proxy_config.rs b/src/proxy_config.rs
index ac37229..af1f576 100644
--- a/src/proxy_config.rs
+++ b/src/proxy_config.rs
@@ -252,6 +252,7 @@ struct NodeWatchState {
pub fn spawn_proxy_config_task(
consul: Consul,
+ local_node: String,
mut must_exit: watch::Receiver<bool>,
) -> watch::Receiver<Arc<ProxyConfig>> {
let (tx, rx) = watch::channel(Arc::new(ProxyConfig {
@@ -354,12 +355,11 @@ pub fn spawn_proxy_config_task(
let mut entries = vec![];
for (node_name, watch_state) in nodes.iter() {
if let Some(catalog) = &watch_state.last_catalog {
- let same_node = *node_name == consul.local_node;
- let same_site =
- match (node_site.get(node_name), node_site.get(&consul.local_node)) {
- (Some(s1), Some(s2)) => s1 == s2,
- _ => false,
- };
+ let same_node = *node_name == local_node;
+ let same_site = match (node_site.get(node_name), node_site.get(&local_node)) {
+ (Some(s1), Some(s2)) => s1 == s2,
+ _ => false,
+ };
entries.extend(parse_consul_catalog(catalog, same_node, same_site));
}