aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock7
-rw-r--r--Cargo.toml1
-rw-r--r--src/cert_store.rs27
-rw-r--r--src/consul.rs70
-rw-r--r--src/main.rs4
-rw-r--r--src/proxy_config.rs8
6 files changed, 99 insertions, 18 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 5c7e156..ef90926 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1799,7 +1799,6 @@ dependencies = [
"tokio 1.14.0",
"tokio-rustls",
"unicase",
- "uuid",
]
[[package]]
@@ -1876,12 +1875,6 @@ dependencies = [
]
[[package]]
-name = "uuid"
-version = "0.8.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
-
-[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 9c7ae8a..d5f7aa9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,7 +19,6 @@ serde_json = "1.0.53"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
bytes = "1"
acme-micro = "0.12"
-uuid = "0.8"
rustls = "0.20"
rustls-pemfile = "0.2"
chrono = { version = "0.4", features = [ "serde" ] }
diff --git a/src/cert_store.rs b/src/cert_store.rs
index 1b1a478..a58288c 100644
--- a/src/cert_store.rs
+++ b/src/cert_store.rs
@@ -13,7 +13,7 @@ use acme_micro::{Directory, DirectoryUrl};
use rustls::sign::CertifiedKey;
use crate::cert::{Cert, CertSer};
-use crate::consul::Consul;
+use crate::consul::*;
use crate::proxy_config::ProxyConfig;
pub struct CertStore {
@@ -86,6 +86,30 @@ impl CertStore {
pub async fn renew_cert(self: &Arc<Self>, domain: &str) -> Result<Arc<Cert>> {
info!("Renewing certificate for {}", domain);
+ // ---- Acquire lock ----
+
+ let lock_path = format!("renew_lock/{}", domain);
+ let lock_name = format!("tricot/renew:{}@{}", domain, self.consul.local_node.clone());
+ let session = self
+ .consul
+ .create_session(&ConsulSessionRequest {
+ name: lock_name.clone(),
+ node: Some(self.consul.local_node.clone()),
+ lock_delay: Some("30s".into()),
+ ttl: Some("1m".into()),
+ behavior: Some("delete".into()),
+ })
+ .await?;
+ if !self
+ .consul
+ .acquire(&lock_path, lock_name.clone().into(), &session)
+ .await?
+ {
+ bail!("Lock is already taken, not renewing for now.");
+ }
+
+ // ---- Do let's encrypt stuff ----
+
let dir = Directory::from_url(DirectoryUrl::LetsEncrypt)?;
let contact = vec!["mailto:alex@adnab.me".to_string()];
@@ -149,6 +173,7 @@ impl CertStore {
self.consul
.kv_put_json(&format!("certs/{}", domain), &certser)
.await?;
+ self.consul.release(&lock_path, "".into(), &session).await?;
let cert = Arc::new(Cert::new(certser)?);
self.certs
diff --git a/src/consul.rs b/src/consul.rs
index 5ef96c5..6fc031c 100644
--- a/src/consul.rs
+++ b/src/consul.rs
@@ -26,21 +26,52 @@ pub struct ConsulNodeCatalog {
pub services: HashMap<String, ConsulServiceEntry>,
}
+// ---- Consul session management ----
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ConsulSessionRequest {
+ #[serde(rename = "Name")]
+ pub name: String,
+
+ #[serde(rename = "Node")]
+ pub node: Option<String>,
+
+ #[serde(rename = "LockDelay")]
+ pub lock_delay: Option<String>,
+
+ #[serde(rename = "TTL")]
+ pub ttl: Option<String>,
+
+ #[serde(rename = "Behavior")]
+ pub behavior: Option<String>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ConsulSessionResponse {
+ #[serde(rename = "ID")]
+ pub id: String,
+}
+
#[derive(Clone)]
pub struct Consul {
client: reqwest::Client,
+
url: String,
kv_prefix: String,
+
idx: Option<u64>,
+
+ pub local_node: String,
}
impl Consul {
- pub fn new(url: &str, kv_prefix: &str) -> Self {
+ pub fn new(url: &str, kv_prefix: &str, local_node: &str) -> Self {
return Self {
client: reqwest::Client::new(),
url: url.to_string(),
kv_prefix: kv_prefix.to_string(),
idx: None,
+ local_node: local_node.into(),
};
}
@@ -64,6 +95,8 @@ impl Consul {
return Ok(resp);
}
+ // ---- KV get and put ----
+
pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> {
debug!("kv_get {}", key);
@@ -118,4 +151,39 @@ impl Consul {
http.error_for_status()?;
Ok(())
}
+
+ // ---- Locking ----
+
+ pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result<String> {
+ debug!("create_session {:?}", req);
+
+ let url = format!("{}/v1/session/create", self.url);
+ let http = self.client.put(&url).json(req).send().await?;
+ let resp: ConsulSessionResponse = http.json().await?;
+ Ok(resp.id)
+ }
+
+ pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result<bool> {
+ debug!("acquire {}", key);
+
+ let url = format!(
+ "{}/v1/kv/{}{}?acquire={}",
+ self.url, self.kv_prefix, key, session
+ );
+ let http = self.client.put(&url).body(bytes).send().await?;
+ let resp: bool = http.json().await?;
+ Ok(resp)
+ }
+
+ pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> {
+ debug!("release {}", key);
+
+ let url = format!(
+ "{}/v1/kv/{}{}?release={}",
+ self.url, self.kv_prefix, key, session
+ );
+ let http = self.client.put(&url).body(bytes).send().await?;
+ http.error_for_status()?;
+ Ok(())
+ }
}
diff --git a/src/main.rs b/src/main.rs
index df0845d..9d710b2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -19,8 +19,8 @@ async fn main() {
pretty_env_logger::init();
info!("Starting Tricot");
- let consul = consul::Consul::new("http://10.42.0.21:8500", "tricot/");
- let mut rx_proxy_config = proxy_config::spawn_proxy_config_task(consul.clone(), "carcajou");
+ let consul = consul::Consul::new("http://10.42.0.21:8500", "tricot/", "carcajou");
+ let mut rx_proxy_config = proxy_config::spawn_proxy_config_task(consul.clone());
let cert_store = cert_store::CertStore::new(consul.clone());
tokio::spawn(
diff --git a/src/proxy_config.rs b/src/proxy_config.rs
index 8c2444a..ba58484 100644
--- a/src/proxy_config.rs
+++ b/src/proxy_config.rs
@@ -82,18 +82,14 @@ fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> ProxyConfig {
ProxyConfig { entries }
}
-pub fn spawn_proxy_config_task(
- mut consul: Consul,
- node: &str,
-) -> watch::Receiver<Arc<ProxyConfig>> {
+pub fn spawn_proxy_config_task(mut consul: Consul) -> watch::Receiver<Arc<ProxyConfig>> {
let (tx, rx) = watch::channel(Arc::new(ProxyConfig {
entries: Vec::new(),
}));
- let node = node.to_string();
-
tokio::spawn(async move {
let mut retries = 0;
+ let node = consul.local_node.clone();
loop {
let catalog = match consul.watch_node(&node).await {