From bb77e7c459a624bb2b4ea043145fd6ea75771105 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 7 Dec 2021 17:05:25 +0100 Subject: Locking to avoid flooding Let's encrypt --- src/cert_store.rs | 27 ++++++++++++++++++++- src/consul.rs | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++++- src/main.rs | 4 +-- src/proxy_config.rs | 8 ++---- 4 files changed, 99 insertions(+), 10 deletions(-) (limited to 'src') diff --git a/src/cert_store.rs b/src/cert_store.rs index 1b1a478..a58288c 100644 --- a/src/cert_store.rs +++ b/src/cert_store.rs @@ -13,7 +13,7 @@ use acme_micro::{Directory, DirectoryUrl}; use rustls::sign::CertifiedKey; use crate::cert::{Cert, CertSer}; -use crate::consul::Consul; +use crate::consul::*; use crate::proxy_config::ProxyConfig; pub struct CertStore { @@ -86,6 +86,30 @@ impl CertStore { pub async fn renew_cert(self: &Arc, domain: &str) -> Result> { info!("Renewing certificate for {}", domain); + // ---- Acquire lock ---- + + let lock_path = format!("renew_lock/{}", domain); + let lock_name = format!("tricot/renew:{}@{}", domain, self.consul.local_node.clone()); + let session = self + .consul + .create_session(&ConsulSessionRequest { + name: lock_name.clone(), + node: Some(self.consul.local_node.clone()), + lock_delay: Some("30s".into()), + ttl: Some("1m".into()), + behavior: Some("delete".into()), + }) + .await?; + if !self + .consul + .acquire(&lock_path, lock_name.clone().into(), &session) + .await? + { + bail!("Lock is already taken, not renewing for now."); + } + + // ---- Do let's encrypt stuff ---- + let dir = Directory::from_url(DirectoryUrl::LetsEncrypt)?; let contact = vec!["mailto:alex@adnab.me".to_string()]; @@ -149,6 +173,7 @@ impl CertStore { self.consul .kv_put_json(&format!("certs/{}", domain), &certser) .await?; + self.consul.release(&lock_path, "".into(), &session).await?; let cert = Arc::new(Cert::new(certser)?); self.certs diff --git a/src/consul.rs b/src/consul.rs index 5ef96c5..6fc031c 100644 --- a/src/consul.rs +++ b/src/consul.rs @@ -26,21 +26,52 @@ pub struct ConsulNodeCatalog { pub services: HashMap, } +// ---- Consul session management ---- + +#[derive(Serialize, Deserialize, Debug)] +pub struct ConsulSessionRequest { + #[serde(rename = "Name")] + pub name: String, + + #[serde(rename = "Node")] + pub node: Option, + + #[serde(rename = "LockDelay")] + pub lock_delay: Option, + + #[serde(rename = "TTL")] + pub ttl: Option, + + #[serde(rename = "Behavior")] + pub behavior: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ConsulSessionResponse { + #[serde(rename = "ID")] + pub id: String, +} + #[derive(Clone)] pub struct Consul { client: reqwest::Client, + url: String, kv_prefix: String, + idx: Option, + + pub local_node: String, } impl Consul { - pub fn new(url: &str, kv_prefix: &str) -> Self { + pub fn new(url: &str, kv_prefix: &str, local_node: &str) -> Self { return Self { client: reqwest::Client::new(), url: url.to_string(), kv_prefix: kv_prefix.to_string(), idx: None, + local_node: local_node.into(), }; } @@ -64,6 +95,8 @@ impl Consul { return Ok(resp); } + // ---- KV get and put ---- + pub async fn kv_get(&self, key: &str) -> Result> { debug!("kv_get {}", key); @@ -118,4 +151,39 @@ impl Consul { http.error_for_status()?; Ok(()) } + + // ---- Locking ---- + + pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result { + debug!("create_session {:?}", req); + + let url = format!("{}/v1/session/create", self.url); + let http = self.client.put(&url).json(req).send().await?; + let resp: ConsulSessionResponse = http.json().await?; + Ok(resp.id) + } + + pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result { + debug!("acquire {}", key); + + let url = format!( + "{}/v1/kv/{}{}?acquire={}", + self.url, self.kv_prefix, key, session + ); + let http = self.client.put(&url).body(bytes).send().await?; + let resp: bool = http.json().await?; + Ok(resp) + } + + pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> { + debug!("release {}", key); + + let url = format!( + "{}/v1/kv/{}{}?release={}", + self.url, self.kv_prefix, key, session + ); + let http = self.client.put(&url).body(bytes).send().await?; + http.error_for_status()?; + Ok(()) + } } diff --git a/src/main.rs b/src/main.rs index df0845d..9d710b2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,8 +19,8 @@ async fn main() { pretty_env_logger::init(); info!("Starting Tricot"); - let consul = consul::Consul::new("http://10.42.0.21:8500", "tricot/"); - let mut rx_proxy_config = proxy_config::spawn_proxy_config_task(consul.clone(), "carcajou"); + let consul = consul::Consul::new("http://10.42.0.21:8500", "tricot/", "carcajou"); + let mut rx_proxy_config = proxy_config::spawn_proxy_config_task(consul.clone()); let cert_store = cert_store::CertStore::new(consul.clone()); tokio::spawn( diff --git a/src/proxy_config.rs b/src/proxy_config.rs index 8c2444a..ba58484 100644 --- a/src/proxy_config.rs +++ b/src/proxy_config.rs @@ -82,18 +82,14 @@ fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> ProxyConfig { ProxyConfig { entries } } -pub fn spawn_proxy_config_task( - mut consul: Consul, - node: &str, -) -> watch::Receiver> { +pub fn spawn_proxy_config_task(mut consul: Consul) -> watch::Receiver> { let (tx, rx) = watch::channel(Arc::new(ProxyConfig { entries: Vec::new(), })); - let node = node.to_string(); - tokio::spawn(async move { let mut retries = 0; + let node = consul.local_node.clone(); loop { let catalog = match consul.watch_node(&node).await { -- cgit v1.2.3