diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/acme.rs | 41 | ||||
-rw-r--r-- | src/cert.rs | 59 | ||||
-rw-r--r-- | src/cert_store.rs | 159 | ||||
-rw-r--r-- | src/consul.rs | 37 | ||||
-rw-r--r-- | src/http.rs | 75 | ||||
-rw-r--r-- | src/main.rs | 18 | ||||
-rw-r--r-- | src/proxy_config.rs | 12 |
7 files changed, 350 insertions, 51 deletions
diff --git a/src/acme.rs b/src/acme.rs deleted file mode 100644 index c6dbc5b..0000000 --- a/src/acme.rs +++ /dev/null @@ -1,41 +0,0 @@ -use std::collections::HashSet; - -use log::*; -use anyhow::Result; -use tokio::{sync::watch, time::sleep}; - -use acme_micro::{Error, Certificate, Directory, DirectoryUrl}; -use acme_micro::create_p384_key; - -use crate::consul::Consul; -use crate::proxy_config::ProxyConfig; - -pub async fn acme_task(mut consul: Consul, mut rx_proxy_config: watch::Receiver<ProxyConfig>) { - while rx_proxy_config.changed().await.is_ok() { - let mut domains: HashSet<String> = HashSet::new(); - - for ent in rx_proxy_config.borrow().entries.iter() { - domains.insert(ent.host.clone()); - } - info!("Ensuring we have certs for domains: {:#?}", domains); - - let results = futures::future::join_all( - domains.iter() - .map(|dom| renew_cert(dom, &consul)) - ).await; - - for (res, dom) in results.iter().zip(domains.iter()) { - if let Err(e) = res { - error!("{}: {}", dom, e); - } - } - } -} - -async fn renew_cert(dom: &str, consul: &Consul) -> Result<()> { - let dir = Directory::from_url(DirectoryUrl::LetsEncrypt)?; - let contact = vec!["mailto:alex@adnab.me".to_string()]; - let acc = dir.register_account(contact.clone())?; - // TODO - unimplemented!() -} diff --git a/src/cert.rs b/src/cert.rs new file mode 100644 index 0000000..de0d821 --- /dev/null +++ b/src/cert.rs @@ -0,0 +1,59 @@ +use anyhow::Result; + +use chrono::{Date, NaiveDate, Utc}; +use rustls::sign::CertifiedKey; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug)] +pub struct CertSer { + pub hostname: String, + pub date: NaiveDate, + pub valid_days: i64, + + pub key_pem: String, + pub cert_pem: String, +} + +pub struct Cert { + pub ser: CertSer, + + pub certkey: CertifiedKey, +} + +impl Cert { + pub fn new(ser: CertSer) -> Result<Self> { + let pem_certs = rustls_pemfile::read_all(&mut ser.cert_pem.as_bytes())?; + let certs = pem_certs + .into_iter() + .filter_map(|cert| match cert { + rustls_pemfile::Item::X509Certificate(cert) => Some(rustls::Certificate(cert)), + _ => None, + }) + .collect::<Vec<_>>(); + + let pem_keys = rustls_pemfile::read_all(&mut ser.key_pem.as_bytes())?; + let keys = pem_keys + .into_iter() + .filter_map(|key| match key { + rustls_pemfile::Item::RSAKey(bytes) | rustls_pemfile::Item::PKCS8Key(bytes) => { + Some(rustls::sign::any_supported_type(&rustls::PrivateKey(bytes)).ok()?) + } + _ => None, + }) + .collect::<Vec<_>>(); + + if keys.len() != 1 { + bail!("{} keys present in pem file", keys.len()); + } + + let certkey = CertifiedKey::new(certs, keys.into_iter().next().unwrap()); + + Ok(Cert { ser, certkey }) + } + + pub fn is_old(&self) -> bool { + let date = Date::<Utc>::from_utc(self.ser.date, Utc); + let today = Utc::today(); + today - date > chrono::Duration::days(self.ser.valid_days / 2) + } +} diff --git a/src/cert_store.rs b/src/cert_store.rs new file mode 100644 index 0000000..6529395 --- /dev/null +++ b/src/cert_store.rs @@ -0,0 +1,159 @@ +use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, RwLock}; +use std::time::Duration; + +use anyhow::Result; +use chrono::Utc; +use log::*; +use tokio::sync::watch; + +use acme_micro::create_p384_key; +use acme_micro::{Directory, DirectoryUrl}; + +use crate::cert::{Cert, CertSer}; +use crate::consul::Consul; +use crate::proxy_config::ProxyConfig; + +pub struct CertStore { + consul: Consul, + certs: RwLock<HashMap<String, Arc<Cert>>>, +} + +impl CertStore { + pub fn new(consul: Consul) -> Arc<Self> { + Arc::new(Self { + consul, + certs: RwLock::new(HashMap::new()), + }) + } + + pub async fn watch_proxy_config( + self: Arc<Self>, + mut rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>, + ) { + while rx_proxy_config.changed().await.is_ok() { + let mut domains: HashSet<String> = HashSet::new(); + + let proxy_config: Arc<ProxyConfig> = rx_proxy_config.borrow().clone(); + for ent in proxy_config.entries.iter() { + domains.insert(ent.host.clone()); + } + info!("Ensuring we have certs for domains: {:#?}", domains); + + for dom in domains.iter() { + if let Err(e) = self.get_cert(dom).await { + warn!("Error get_cert {}: {}", dom, e); + } + } + } + } + + pub async fn get_cert(self: &Arc<Self>, domain: &str) -> Result<Arc<Cert>> { + // First, try locally. + { + let certs = self.certs.read().unwrap(); + if let Some(cert) = certs.get(domain) { + if !cert.is_old() { + return Ok(cert.clone()); + } + } + } + + // Second, try from Consul. + if let Some(consul_cert) = self + .consul + .kv_get_json::<CertSer>(&format!("certs/{}", domain)) + .await? + { + if let Ok(cert) = Cert::new(consul_cert) { + let cert = Arc::new(cert); + if !cert.is_old() { + self.certs + .write() + .unwrap() + .insert(domain.to_string(), cert.clone()); + return Ok(cert); + } + } + } + + // Third, ask from Let's Encrypt + self.renew_cert(domain).await + } + + pub async fn renew_cert(self: &Arc<Self>, domain: &str) -> Result<Arc<Cert>> { + info!("Renewing certificate for {}", domain); + + let dir = Directory::from_url(DirectoryUrl::LetsEncrypt)?; + let contact = vec!["mailto:alex@adnab.me".to_string()]; + + let acc = + if let Some(acc_privkey) = self.consul.kv_get("letsencrypt_account_key.pem").await? { + info!("Using existing Let's encrypt account"); + dir.load_account(std::str::from_utf8(&acc_privkey)?, contact)? + } else { + info!("Creating new Let's encrypt account"); + let acc = dir.register_account(contact.clone())?; + self.consul + .kv_put( + "letsencrypt_account_key.pem", + acc.acme_private_key_pem()?.into_bytes().into(), + ) + .await?; + acc + }; + + let mut ord_new = acc.new_order(domain, &[])?; + let ord_csr = loop { + if let Some(ord_csr) = ord_new.confirm_validations() { + break ord_csr; + } + + let auths = ord_new.authorizations()?; + + info!("Creating challenge and storing in Consul"); + let chall = auths[0].http_challenge().unwrap(); + let chall_key = format!("challenge/{}", chall.http_token()); + self.consul + .kv_put(&chall_key, chall.http_proof()?.into()) + .await?; + + info!("Validating challenge"); + chall.validate(Duration::from_millis(5000))?; + + info!("Deleting challenge"); + self.consul.kv_delete(&chall_key).await?; + + ord_new.refresh()?; + }; + + let pkey_pri = create_p384_key()?; + let ord_cert = ord_csr.finalize_pkey(pkey_pri, Duration::from_millis(5000))?; + let cert = ord_cert.download_cert()?; + + info!("Keys and certificate obtained"); + let key_pem = cert.private_key().to_string(); + let cert_pem = cert.certificate().to_string(); + + let certser = CertSer { + hostname: domain.to_string(), + date: Utc::today().naive_utc(), + valid_days: cert.valid_days_left()?, + key_pem, + cert_pem, + }; + + self.consul + .kv_put_json(&format!("certs/{}", domain), &certser) + .await?; + + let cert = Arc::new(Cert::new(certser)?); + self.certs + .write() + .unwrap() + .insert(domain.to_string(), cert.clone()); + + info!("Cert successfully renewed: {}", domain); + Ok(cert) + } +} diff --git a/src/consul.rs b/src/consul.rs index 0f7d7c1..5ef96c5 100644 --- a/src/consul.rs +++ b/src/consul.rs @@ -1,10 +1,10 @@ use std::collections::HashMap; use anyhow::Result; -use log::*; -use serde::{Deserialize, Serialize}; use bytes::Bytes; +use log::*; use reqwest::StatusCode; +use serde::{Deserialize, Serialize}; // ---- Watch and retrieve Consul catalog ---- @@ -65,22 +65,53 @@ impl Consul { } pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> { + debug!("kv_get {}", key); + let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key); let http = self.client.get(&url).send().await?; match http.status() { StatusCode::OK => Ok(Some(http.bytes().await?)), StatusCode::NOT_FOUND => Ok(None), - _ => Err(anyhow!("Consul request failed: {:?}", http.error_for_status())), + _ => Err(anyhow!( + "Consul request failed: {:?}", + http.error_for_status() + )), + } + } + + pub async fn kv_get_json<T: for<'de> Deserialize<'de>>(&self, key: &str) -> Result<Option<T>> { + debug!("kv_get_json {}", key); + + let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key); + let http = self.client.get(&url).send().await?; + match http.status() { + StatusCode::OK => Ok(Some(http.json().await?)), + StatusCode::NOT_FOUND => Ok(None), + _ => Err(anyhow!( + "Consul request failed: {:?}", + http.error_for_status() + )), } } pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> { + debug!("kv_put {}", key); + let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); let http = self.client.put(&url).body(bytes).send().await?; http.error_for_status()?; Ok(()) } + pub async fn kv_put_json<T: Serialize>(&self, key: &str, value: &T) -> Result<()> { + debug!("kv_put_json {}", key); + + let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); + let http = self.client.put(&url).json(value).send().await?; + http.error_for_status()?; + Ok(()) + } + pub async fn kv_delete(&self, key: &str) -> Result<()> { let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); let http = self.client.delete(&url).send().await?; diff --git a/src/http.rs b/src/http.rs new file mode 100644 index 0000000..385456a --- /dev/null +++ b/src/http.rs @@ -0,0 +1,75 @@ +use std::sync::Arc; + +use anyhow::Result; +use log::*; + +use http::uri::Authority; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Request, Response, Server, StatusCode, Uri}; + +use crate::consul::Consul; + +const CHALLENGE_PREFIX: &str = "/.well-known/acme-challenge/"; + +async fn handle(req: Request<Body>, consul: Arc<Consul>) -> Result<Response<Body>> { + let path = req.uri().path(); + info!("HTTP request {}", path); + + if let Some(token) = path.strip_prefix(CHALLENGE_PREFIX) { + let response = consul.kv_get(&format!("challenge/{}", token)).await?; + match response { + Some(r) => Ok(Response::new(Body::from(r))), + None => Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from(""))?), + } + } else { + // Redirect to HTTPS + let uri2 = req.uri().clone(); + let mut parts = uri2.into_parts(); + + let host = req + .headers() + .get("Host") + .map(|h| h.to_str()) + .ok_or_else(|| anyhow!("Missing host header"))?? + .to_string(); + + parts.authority = Some(Authority::from_maybe_shared(host)?); + parts.scheme = Some("https".parse().unwrap()); + let uri2 = Uri::from_parts(parts)?; + + Ok(Response::builder() + .status(StatusCode::MOVED_PERMANENTLY) + .header("Location", uri2.to_string()) + .body(Body::from(""))?) + } +} + +pub async fn serve_http(consul: Consul) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { + let consul = Arc::new(consul); + // For every connection, we must make a `Service` to handle all + // incoming HTTP requests on said connection. + let make_svc = make_service_fn(|_conn| { + let consul = consul.clone(); + // This is the `Service` that will handle the connection. + // `service_fn` is a helper to convert a function that + // returns a Response into a `Service`. + async move { + Ok::<_, anyhow::Error>(service_fn(move |req: Request<Body>| { + let consul = consul.clone(); + handle(req, consul) + })) + } + }); + + let addr = ([0, 0, 0, 0], 1080).into(); + + let server = Server::bind(&addr).serve(make_svc); + + println!("Listening on http://{}", addr); + + server.await?; + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 3289c46..d7f1e24 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,33 @@ #[macro_use] extern crate anyhow; +mod cert; +mod cert_store; mod consul; +mod http; mod proxy_config; -mod acme; use log::*; -#[tokio::main] +#[tokio::main(flavor = "multi_thread")] async fn main() { + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "tricot=debug") + } pretty_env_logger::init(); info!("Starting Tricot"); let consul = consul::Consul::new("http://10.42.0.21:8500", "tricot/"); let mut rx_proxy_config = proxy_config::spawn_proxy_config_task(consul.clone(), "carcajou"); - tokio::spawn(acme::acme_task(consul.clone(), rx_proxy_config.clone())); + let cert_store = cert_store::CertStore::new(consul.clone()); + tokio::spawn( + cert_store + .clone() + .watch_proxy_config(rx_proxy_config.clone()), + ); + + tokio::spawn(http::serve_http(consul.clone())); while rx_proxy_config.changed().await.is_ok() { info!("Proxy config: {:#?}", *rx_proxy_config.borrow()); diff --git a/src/proxy_config.rs b/src/proxy_config.rs index e118a33..891fdef 100644 --- a/src/proxy_config.rs +++ b/src/proxy_config.rs @@ -1,4 +1,5 @@ use std::net::SocketAddr; +use std::sync::Arc; use std::{cmp, time::Duration}; use log::*; @@ -74,10 +75,13 @@ fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> ProxyConfig { ProxyConfig { entries } } -pub fn spawn_proxy_config_task(mut consul: Consul, node: &str) -> watch::Receiver<ProxyConfig> { - let (tx, rx) = watch::channel(ProxyConfig { +pub fn spawn_proxy_config_task( + mut consul: Consul, + node: &str, +) -> watch::Receiver<Arc<ProxyConfig>> { + let (tx, rx) = watch::channel(Arc::new(ProxyConfig { entries: Vec::new(), - }); + })); let node = node.to_string(); @@ -105,7 +109,7 @@ pub fn spawn_proxy_config_task(mut consul: Consul, node: &str) -> watch::Receive let config = parse_consul_catalog(&catalog); debug!("Extracted configuration: {:#?}", config); - tx.send(config).expect("Internal error"); + tx.send(Arc::new(config)).expect("Internal error"); } }); |