From 5d38f2cf7f312de69a3106556fc43219ca1473c3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Dec 2022 18:43:48 +0100 Subject: Add basic support for metrics --- src/https.rs | 72 ++++++++++++++++++++++++++++++++++++++++++---- src/main.rs | 15 ++++++++++ src/metrics.rs | 83 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/proxy_config.rs | 70 ++++++++++++++++++++++++++++++++++++++------ 4 files changed, 226 insertions(+), 14 deletions(-) create mode 100644 src/metrics.rs (limited to 'src') diff --git a/src/https.rs b/src/https.rs index ba4365d..cb5e1c8 100644 --- a/src/https.rs +++ b/src/https.rs @@ -21,6 +21,8 @@ use tokio::sync::watch; use tokio_rustls::TlsAcceptor; use tokio_util::io::{ReaderStream, StreamReader}; +use opentelemetry::{metrics, KeyValue}; + use crate::cert_store::{CertStore, StoreResolver}; use crate::proxy_config::ProxyConfig; use crate::reverse_proxy; @@ -33,6 +35,11 @@ pub struct HttpsConfig { pub compress_mime_types: Vec, } +struct HttpsMetrics { + requests_received: metrics::Counter, + requests_served: metrics::Counter, +} + pub async fn serve_https( config: HttpsConfig, cert_store: Arc, @@ -41,6 +48,18 @@ pub async fn serve_https( ) -> Result<()> { let config = Arc::new(config); + let meter = opentelemetry::global::meter("tricot"); + let metrics = Arc::new(HttpsMetrics { + requests_received: meter + .u64_counter("https_requests_received") + .with_description("Total number of requests received over HTTPS") + .init(), + requests_served: meter + .u64_counter("https_requests_served") + .with_description("Total number of requests served over HTTPS") + .init(), + }); + let mut tls_cfg = rustls::ServerConfig::builder() .with_safe_defaults() .with_no_client_auth() @@ -71,6 +90,7 @@ pub async fn serve_https( let rx_proxy_config = rx_proxy_config.clone(); let tls_acceptor = tls_acceptor.clone(); let config = config.clone(); + let metrics = metrics.clone(); let mut must_exit_2 = must_exit.clone(); let conn = tokio::spawn(async move { @@ -84,7 +104,8 @@ pub async fn serve_https( let https_config = config.clone(); let proxy_config: Arc = rx_proxy_config.borrow().clone(); - handle_outer(remote_addr, req, https_config, proxy_config) + let metrics = metrics.clone(); + handle_outer(remote_addr, req, https_config, proxy_config, metrics) }), ) .with_upgrades(); @@ -124,17 +145,43 @@ async fn handle_outer( req: Request, https_config: Arc, proxy_config: Arc, + metrics: Arc, ) -> Result, Infallible> { - match handle(remote_addr, req, https_config, proxy_config).await { + let mut tags = vec![ + KeyValue::new("method", req.method().to_string()), + KeyValue::new( + "host", + req.uri() + .authority() + .map(|auth| auth.to_string()) + .or_else(|| { + req.headers() + .get("host") + .map(|host| host.to_str().unwrap_or_default().to_string()) + }) + .unwrap_or_default(), + ), + ]; + metrics.requests_received.add(1, &tags); + + let resp = match handle(remote_addr, req, https_config, proxy_config, &mut tags).await { Err(e) => { warn!("Handler error: {}", e); - Ok(Response::builder() + Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(Body::from(format!("{}", e))) - .unwrap()) + .unwrap() } - Ok(r) => Ok(r), - } + Ok(r) => r, + }; + + tags.push(KeyValue::new( + "response_code", + resp.status().as_u16().to_string(), + )); + metrics.requests_served.add(1, &tags); + + Ok(resp) } // Custom echo service, handling two different routes and a @@ -144,6 +191,7 @@ async fn handle( req: Request, https_config: Arc, proxy_config: Arc, + tags: &mut Vec, ) -> Result, anyhow::Error> { let method = req.method().clone(); let uri = req.uri().to_string(); @@ -184,6 +232,18 @@ async fn handle( }); if let Some(proxy_to) = best_match { + tags.push(KeyValue::new("service_name", proxy_to.service_name.clone())); + tags.push(KeyValue::new( + "target_addr", + proxy_to.target_addr.to_string(), + )); + tags.push(KeyValue::new( + "https_target", + proxy_to.https_target.to_string(), + )); + tags.push(KeyValue::new("same_node", proxy_to.same_node.to_string())); + tags.push(KeyValue::new("same_site", proxy_to.same_site.to_string())); + proxy_to.calls.fetch_add(1, Ordering::SeqCst); debug!("{}{} -> {}", host, path, proxy_to); diff --git a/src/main.rs b/src/main.rs index edc79b4..cb39c49 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,6 +15,7 @@ mod cert_store; mod consul; mod http; mod https; +mod metrics; mod proxy_config; mod reverse_proxy; mod tls_util; @@ -80,6 +81,10 @@ struct Opt { )] pub https_bind_addr: SocketAddr, + /// Bind address for metrics server (Prometheus format over HTTP) + #[structopt(long = "metrics-bind-addr", env = "TRICOT_METRICS_BIND_ADDR")] + pub metrics_bind_addr: Option, + /// E-mail address for Let's Encrypt certificate requests #[structopt(long = "letsencrypt-email", env = "TRICOT_LETSENCRYPT_EMAIL")] pub letsencrypt_email: String, @@ -123,6 +128,8 @@ async fn main() { let _ = provoke_exit.send(true); }; + let metrics_server = metrics::MetricsServer::init(opt.metrics_bind_addr); + let consul_config = consul::ConsulConfig { addr: opt.consul_addr.clone(), ca_cert: opt.consul_ca_cert.clone(), @@ -143,6 +150,13 @@ async fn main() { exit_on_err.clone(), ); + let metrics_task = tokio::spawn( + metrics_server + .run(wait_from(exit_signal.clone())) + .map_err(exit_on_err.clone()) + .then(|_| async { info!("Metrics server exited") }), + ); + let http_task = tokio::spawn( http::serve_http( opt.http_bind_addr, @@ -176,6 +190,7 @@ async fn main() { let dump_task = tokio::spawn(dump_config_on_change(rx_proxy_config, exit_signal.clone())); + let _ = metrics_task.await.expect("Tokio task await failure"); let _ = http_task.await.expect("Tokio task await failure"); let _ = https_task.await.expect("Tokio task await failure"); let _ = dump_task.await.expect("Tokio task await failure"); diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..c635269 --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,83 @@ +use std::convert::Infallible; +use std::net::SocketAddr; +use std::sync::Arc; + +use anyhow::Result; +use futures::future::*; +use log::*; + +use hyper::{ + header::CONTENT_TYPE, + service::{make_service_fn, service_fn}, + Body, Method, Request, Response, Server, +}; +use opentelemetry_prometheus::PrometheusExporter; +use prometheus::{Encoder, TextEncoder}; + +pub struct MetricsServer { + bind_addr: Option, + exporter: PrometheusExporter, +} + +impl MetricsServer { + pub fn init(bind_addr: Option) -> MetricsServer { + let exporter = opentelemetry_prometheus::exporter().init(); + Self { + bind_addr, + exporter, + } + } + + pub async fn run(self, shutdown_signal: impl Future) -> Result<()> { + if let Some(addr) = self.bind_addr { + let metrics_server = Arc::new(self); + + let make_svc = make_service_fn(move |_conn| { + let metrics_server = metrics_server.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |req| { + metrics_server.clone().serve_req(req) + })) + } + }); + + let server = Server::bind(&addr).serve(make_svc); + let graceful = server.with_graceful_shutdown(shutdown_signal); + info!("Metrics server listening on http://{}", addr); + + graceful.await?; + } else { + info!("Metrics server is disabled"); + } + + Ok(()) + } + + async fn serve_req( + self: Arc, + req: Request, + ) -> Result, hyper::Error> { + debug!("{} {}", req.method(), req.uri()); + + let response = match (req.method(), req.uri().path()) { + (&Method::GET, "/metrics") => { + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + let metric_families = self.exporter.registry().gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + Response::builder() + .status(200) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap() + } + _ => Response::builder() + .status(404) + .body(Body::from("Not implemented")) + .unwrap(), + }; + + Ok(response) + } +} diff --git a/src/proxy_config.rs b/src/proxy_config.rs index e45cc7b..24ade8b 100644 --- a/src/proxy_config.rs +++ b/src/proxy_config.rs @@ -4,6 +4,7 @@ use std::sync::{atomic, Arc}; use std::{cmp, time::Duration}; use anyhow::Result; +use opentelemetry::{metrics, KeyValue}; use futures::future::BoxFuture; use futures::stream::{FuturesUnordered, StreamExt}; @@ -38,6 +39,15 @@ impl HostDescription { } } +impl std::fmt::Display for HostDescription { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + HostDescription::Hostname(h) => write!(f, "{}", h), + HostDescription::Pattern(p) => write!(f, "Pattern('{}')", p.as_str()), + } + } +} + #[derive(Debug)] pub struct ProxyEntry { /// Publicly exposed TLS hostnames for matching this rule @@ -47,6 +57,8 @@ pub struct ProxyEntry { /// Priority with which this rule is considered (highest first) pub priority: u32, + /// Consul service name + pub service_name: String, /// Node address (ip+port) to handle requests that match this entry pub target_addr: SocketAddr, /// Is the target serving HTTPS instead of HTTP? @@ -75,14 +87,11 @@ impl std::fmt::Display for ProxyEntry { write!(f, "https://")?; } write!(f, "{} ", self.target_addr)?; - match &self.host { - HostDescription::Hostname(h) => write!(f, "{}", h)?, - HostDescription::Pattern(p) => write!(f, "Pattern('{}')", p.as_str())?, - } write!( f, - "{} {}", - self.path_prefix.as_ref().unwrap_or(&String::new()), + "{}{} {}", + self.host, + self.path_prefix.as_deref().unwrap_or_default(), self.priority )?; if self.same_node { @@ -113,6 +122,7 @@ fn retry_to_time(retries: u32, max_time: Duration) -> Duration { } fn parse_tricot_tag( + service_name: String, tag: &str, target_addr: SocketAddr, add_headers: &[(String, String)], @@ -148,6 +158,7 @@ fn parse_tricot_tag( }; Some(ProxyEntry { + service_name, target_addr, https_target: (splits[0] == "tricot-https"), host, @@ -178,7 +189,7 @@ fn parse_consul_catalog( let mut entries = vec![]; - for (_, svc) in catalog.services.iter() { + for (service_name, svc) in catalog.services.iter() { let ip_addr = match svc.address.parse() { Ok(ip) => ip, _ => match catalog.node.address.parse() { @@ -210,7 +221,14 @@ fn parse_consul_catalog( } for tag in svc.tags.iter() { - if let Some(ent) = parse_tricot_tag(tag, addr, &add_headers[..], same_node, same_site) { + if let Some(ent) = parse_tricot_tag( + service_name.clone(), + tag, + addr, + &add_headers[..], + same_node, + same_site, + ) { entries.push(ent); } } @@ -239,6 +257,7 @@ pub fn spawn_proxy_config_task( entries: Vec::new(), })); + let metrics = ProxyConfigMetrics::new(rx.clone()); let consul = Arc::new(consul); tokio::spawn(async move { @@ -348,11 +367,46 @@ pub fn spawn_proxy_config_task( tx.send(Arc::new(config)).expect("Internal error"); } + + drop(metrics); // ensure Metrics lives up to here }); rx } +// ---- + +struct ProxyConfigMetrics { + _proxy_config_entries: metrics::ValueObserver, +} + +impl ProxyConfigMetrics { + fn new(rx: watch::Receiver>) -> Self { + let meter = opentelemetry::global::meter("tricot"); + Self { + _proxy_config_entries: meter + .u64_value_observer("proxy_config_entries", move |observer| { + let mut patterns = HashMap::new(); + for ent in rx.borrow().entries.iter() { + let pat = format!( + "{}{}", + ent.host, + ent.path_prefix.as_deref().unwrap_or_default() + ); + *patterns.entry(pat).or_default() += 1; + } + for (pat, num) in patterns { + observer.observe(num, &[KeyValue::new("host", pat)]); + } + }) + .with_description("Number of proxy entries (back-ends) configured in Tricot") + .init(), + } + } +} + +// ---- + #[cfg(test)] mod tests { use super::*; -- cgit v1.2.3