aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-12-05 18:43:48 +0100
committerAlex Auvolat <alex@adnab.me>2022-12-05 18:43:48 +0100
commit5d38f2cf7f312de69a3106556fc43219ca1473c3 (patch)
tree2ecc767ffeae5dd9b4a528f8294a61385a1fb5b1 /src
parent43a0fe14b2ae6a3e97f39169cda7ca5579a3259c (diff)
downloadtricot-5d38f2cf7f312de69a3106556fc43219ca1473c3.tar.gz
tricot-5d38f2cf7f312de69a3106556fc43219ca1473c3.zip
Add basic support for metrics
Diffstat (limited to 'src')
-rw-r--r--src/https.rs72
-rw-r--r--src/main.rs15
-rw-r--r--src/metrics.rs83
-rw-r--r--src/proxy_config.rs70
4 files changed, 226 insertions, 14 deletions
diff --git a/src/https.rs b/src/https.rs
index ba4365d..cb5e1c8 100644
--- a/src/https.rs
+++ b/src/https.rs
@@ -21,6 +21,8 @@ use tokio::sync::watch;
use tokio_rustls::TlsAcceptor;
use tokio_util::io::{ReaderStream, StreamReader};
+use opentelemetry::{metrics, KeyValue};
+
use crate::cert_store::{CertStore, StoreResolver};
use crate::proxy_config::ProxyConfig;
use crate::reverse_proxy;
@@ -33,6 +35,11 @@ pub struct HttpsConfig {
pub compress_mime_types: Vec<String>,
}
+struct HttpsMetrics {
+ requests_received: metrics::Counter<u64>,
+ requests_served: metrics::Counter<u64>,
+}
+
pub async fn serve_https(
config: HttpsConfig,
cert_store: Arc<CertStore>,
@@ -41,6 +48,18 @@ pub async fn serve_https(
) -> Result<()> {
let config = Arc::new(config);
+ let meter = opentelemetry::global::meter("tricot");
+ let metrics = Arc::new(HttpsMetrics {
+ requests_received: meter
+ .u64_counter("https_requests_received")
+ .with_description("Total number of requests received over HTTPS")
+ .init(),
+ requests_served: meter
+ .u64_counter("https_requests_served")
+ .with_description("Total number of requests served over HTTPS")
+ .init(),
+ });
+
let mut tls_cfg = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
@@ -71,6 +90,7 @@ pub async fn serve_https(
let rx_proxy_config = rx_proxy_config.clone();
let tls_acceptor = tls_acceptor.clone();
let config = config.clone();
+ let metrics = metrics.clone();
let mut must_exit_2 = must_exit.clone();
let conn = tokio::spawn(async move {
@@ -84,7 +104,8 @@ pub async fn serve_https(
let https_config = config.clone();
let proxy_config: Arc<ProxyConfig> =
rx_proxy_config.borrow().clone();
- handle_outer(remote_addr, req, https_config, proxy_config)
+ let metrics = metrics.clone();
+ handle_outer(remote_addr, req, https_config, proxy_config, metrics)
}),
)
.with_upgrades();
@@ -124,17 +145,43 @@ async fn handle_outer(
req: Request<Body>,
https_config: Arc<HttpsConfig>,
proxy_config: Arc<ProxyConfig>,
+ metrics: Arc<HttpsMetrics>,
) -> Result<Response<Body>, Infallible> {
- match handle(remote_addr, req, https_config, proxy_config).await {
+ let mut tags = vec![
+ KeyValue::new("method", req.method().to_string()),
+ KeyValue::new(
+ "host",
+ req.uri()
+ .authority()
+ .map(|auth| auth.to_string())
+ .or_else(|| {
+ req.headers()
+ .get("host")
+ .map(|host| host.to_str().unwrap_or_default().to_string())
+ })
+ .unwrap_or_default(),
+ ),
+ ];
+ metrics.requests_received.add(1, &tags);
+
+ let resp = match handle(remote_addr, req, https_config, proxy_config, &mut tags).await {
Err(e) => {
warn!("Handler error: {}", e);
- Ok(Response::builder()
+ Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{}", e)))
- .unwrap())
+ .unwrap()
}
- Ok(r) => Ok(r),
- }
+ Ok(r) => r,
+ };
+
+ tags.push(KeyValue::new(
+ "response_code",
+ resp.status().as_u16().to_string(),
+ ));
+ metrics.requests_served.add(1, &tags);
+
+ Ok(resp)
}
// Custom echo service, handling two different routes and a
@@ -144,6 +191,7 @@ async fn handle(
req: Request<Body>,
https_config: Arc<HttpsConfig>,
proxy_config: Arc<ProxyConfig>,
+ tags: &mut Vec<KeyValue>,
) -> Result<Response<Body>, anyhow::Error> {
let method = req.method().clone();
let uri = req.uri().to_string();
@@ -184,6 +232,18 @@ async fn handle(
});
if let Some(proxy_to) = best_match {
+ tags.push(KeyValue::new("service_name", proxy_to.service_name.clone()));
+ tags.push(KeyValue::new(
+ "target_addr",
+ proxy_to.target_addr.to_string(),
+ ));
+ tags.push(KeyValue::new(
+ "https_target",
+ proxy_to.https_target.to_string(),
+ ));
+ tags.push(KeyValue::new("same_node", proxy_to.same_node.to_string()));
+ tags.push(KeyValue::new("same_site", proxy_to.same_site.to_string()));
+
proxy_to.calls.fetch_add(1, Ordering::SeqCst);
debug!("{}{} -> {}", host, path, proxy_to);
diff --git a/src/main.rs b/src/main.rs
index edc79b4..cb39c49 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -15,6 +15,7 @@ mod cert_store;
mod consul;
mod http;
mod https;
+mod metrics;
mod proxy_config;
mod reverse_proxy;
mod tls_util;
@@ -80,6 +81,10 @@ struct Opt {
)]
pub https_bind_addr: SocketAddr,
+ /// Bind address for metrics server (Prometheus format over HTTP)
+ #[structopt(long = "metrics-bind-addr", env = "TRICOT_METRICS_BIND_ADDR")]
+ pub metrics_bind_addr: Option<SocketAddr>,
+
/// E-mail address for Let's Encrypt certificate requests
#[structopt(long = "letsencrypt-email", env = "TRICOT_LETSENCRYPT_EMAIL")]
pub letsencrypt_email: String,
@@ -123,6 +128,8 @@ async fn main() {
let _ = provoke_exit.send(true);
};
+ let metrics_server = metrics::MetricsServer::init(opt.metrics_bind_addr);
+
let consul_config = consul::ConsulConfig {
addr: opt.consul_addr.clone(),
ca_cert: opt.consul_ca_cert.clone(),
@@ -143,6 +150,13 @@ async fn main() {
exit_on_err.clone(),
);
+ let metrics_task = tokio::spawn(
+ metrics_server
+ .run(wait_from(exit_signal.clone()))
+ .map_err(exit_on_err.clone())
+ .then(|_| async { info!("Metrics server exited") }),
+ );
+
let http_task = tokio::spawn(
http::serve_http(
opt.http_bind_addr,
@@ -176,6 +190,7 @@ async fn main() {
let dump_task = tokio::spawn(dump_config_on_change(rx_proxy_config, exit_signal.clone()));
+ let _ = metrics_task.await.expect("Tokio task await failure");
let _ = http_task.await.expect("Tokio task await failure");
let _ = https_task.await.expect("Tokio task await failure");
let _ = dump_task.await.expect("Tokio task await failure");
diff --git a/src/metrics.rs b/src/metrics.rs
new file mode 100644
index 0000000..c635269
--- /dev/null
+++ b/src/metrics.rs
@@ -0,0 +1,83 @@
+use std::convert::Infallible;
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use anyhow::Result;
+use futures::future::*;
+use log::*;
+
+use hyper::{
+ header::CONTENT_TYPE,
+ service::{make_service_fn, service_fn},
+ Body, Method, Request, Response, Server,
+};
+use opentelemetry_prometheus::PrometheusExporter;
+use prometheus::{Encoder, TextEncoder};
+
+pub struct MetricsServer {
+ bind_addr: Option<SocketAddr>,
+ exporter: PrometheusExporter,
+}
+
+impl MetricsServer {
+ pub fn init(bind_addr: Option<SocketAddr>) -> MetricsServer {
+ let exporter = opentelemetry_prometheus::exporter().init();
+ Self {
+ bind_addr,
+ exporter,
+ }
+ }
+
+ pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<()> {
+ if let Some(addr) = self.bind_addr {
+ let metrics_server = Arc::new(self);
+
+ let make_svc = make_service_fn(move |_conn| {
+ let metrics_server = metrics_server.clone();
+ async move {
+ Ok::<_, Infallible>(service_fn(move |req| {
+ metrics_server.clone().serve_req(req)
+ }))
+ }
+ });
+
+ let server = Server::bind(&addr).serve(make_svc);
+ let graceful = server.with_graceful_shutdown(shutdown_signal);
+ info!("Metrics server listening on http://{}", addr);
+
+ graceful.await?;
+ } else {
+ info!("Metrics server is disabled");
+ }
+
+ Ok(())
+ }
+
+ async fn serve_req(
+ self: Arc<MetricsServer>,
+ req: Request<Body>,
+ ) -> Result<Response<Body>, hyper::Error> {
+ debug!("{} {}", req.method(), req.uri());
+
+ let response = match (req.method(), req.uri().path()) {
+ (&Method::GET, "/metrics") => {
+ let mut buffer = vec![];
+ let encoder = TextEncoder::new();
+ let metric_families = self.exporter.registry().gather();
+ encoder.encode(&metric_families, &mut buffer).unwrap();
+
+ Response::builder()
+ .status(200)
+ .header(CONTENT_TYPE, encoder.format_type())
+ .body(Body::from(buffer))
+ .unwrap()
+ }
+ _ => Response::builder()
+ .status(404)
+ .body(Body::from("Not implemented"))
+ .unwrap(),
+ };
+
+ Ok(response)
+ }
+}
diff --git a/src/proxy_config.rs b/src/proxy_config.rs
index e45cc7b..24ade8b 100644
--- a/src/proxy_config.rs
+++ b/src/proxy_config.rs
@@ -4,6 +4,7 @@ use std::sync::{atomic, Arc};
use std::{cmp, time::Duration};
use anyhow::Result;
+use opentelemetry::{metrics, KeyValue};
use futures::future::BoxFuture;
use futures::stream::{FuturesUnordered, StreamExt};
@@ -38,6 +39,15 @@ impl HostDescription {
}
}
+impl std::fmt::Display for HostDescription {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ HostDescription::Hostname(h) => write!(f, "{}", h),
+ HostDescription::Pattern(p) => write!(f, "Pattern('{}')", p.as_str()),
+ }
+ }
+}
+
#[derive(Debug)]
pub struct ProxyEntry {
/// Publicly exposed TLS hostnames for matching this rule
@@ -47,6 +57,8 @@ pub struct ProxyEntry {
/// Priority with which this rule is considered (highest first)
pub priority: u32,
+ /// Consul service name
+ pub service_name: String,
/// Node address (ip+port) to handle requests that match this entry
pub target_addr: SocketAddr,
/// Is the target serving HTTPS instead of HTTP?
@@ -75,14 +87,11 @@ impl std::fmt::Display for ProxyEntry {
write!(f, "https://")?;
}
write!(f, "{} ", self.target_addr)?;
- match &self.host {
- HostDescription::Hostname(h) => write!(f, "{}", h)?,
- HostDescription::Pattern(p) => write!(f, "Pattern('{}')", p.as_str())?,
- }
write!(
f,
- "{} {}",
- self.path_prefix.as_ref().unwrap_or(&String::new()),
+ "{}{} {}",
+ self.host,
+ self.path_prefix.as_deref().unwrap_or_default(),
self.priority
)?;
if self.same_node {
@@ -113,6 +122,7 @@ fn retry_to_time(retries: u32, max_time: Duration) -> Duration {
}
fn parse_tricot_tag(
+ service_name: String,
tag: &str,
target_addr: SocketAddr,
add_headers: &[(String, String)],
@@ -148,6 +158,7 @@ fn parse_tricot_tag(
};
Some(ProxyEntry {
+ service_name,
target_addr,
https_target: (splits[0] == "tricot-https"),
host,
@@ -178,7 +189,7 @@ fn parse_consul_catalog(
let mut entries = vec![];
- for (_, svc) in catalog.services.iter() {
+ for (service_name, svc) in catalog.services.iter() {
let ip_addr = match svc.address.parse() {
Ok(ip) => ip,
_ => match catalog.node.address.parse() {
@@ -210,7 +221,14 @@ fn parse_consul_catalog(
}
for tag in svc.tags.iter() {
- if let Some(ent) = parse_tricot_tag(tag, addr, &add_headers[..], same_node, same_site) {
+ if let Some(ent) = parse_tricot_tag(
+ service_name.clone(),
+ tag,
+ addr,
+ &add_headers[..],
+ same_node,
+ same_site,
+ ) {
entries.push(ent);
}
}
@@ -239,6 +257,7 @@ pub fn spawn_proxy_config_task(
entries: Vec::new(),
}));
+ let metrics = ProxyConfigMetrics::new(rx.clone());
let consul = Arc::new(consul);
tokio::spawn(async move {
@@ -348,11 +367,46 @@ pub fn spawn_proxy_config_task(
tx.send(Arc::new(config)).expect("Internal error");
}
+
+ drop(metrics); // ensure Metrics lives up to here
});
rx
}
+// ----
+
+struct ProxyConfigMetrics {
+ _proxy_config_entries: metrics::ValueObserver<u64>,
+}
+
+impl ProxyConfigMetrics {
+ fn new(rx: watch::Receiver<Arc<ProxyConfig>>) -> Self {
+ let meter = opentelemetry::global::meter("tricot");
+ Self {
+ _proxy_config_entries: meter
+ .u64_value_observer("proxy_config_entries", move |observer| {
+ let mut patterns = HashMap::new();
+ for ent in rx.borrow().entries.iter() {
+ let pat = format!(
+ "{}{}",
+ ent.host,
+ ent.path_prefix.as_deref().unwrap_or_default()
+ );
+ *patterns.entry(pat).or_default() += 1;
+ }
+ for (pat, num) in patterns {
+ observer.observe(num, &[KeyValue::new("host", pat)]);
+ }
+ })
+ .with_description("Number of proxy entries (back-ends) configured in Tricot")
+ .init(),
+ }
+ }
+}
+
+// ----
+
#[cfg(test)]
mod tests {
use super::*;