diff options
Diffstat (limited to 'src/metrics.rs')
-rw-r--r-- | src/metrics.rs | 83 |
1 files changed, 83 insertions, 0 deletions
diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..c635269 --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,83 @@ +use std::convert::Infallible; +use std::net::SocketAddr; +use std::sync::Arc; + +use anyhow::Result; +use futures::future::*; +use log::*; + +use hyper::{ + header::CONTENT_TYPE, + service::{make_service_fn, service_fn}, + Body, Method, Request, Response, Server, +}; +use opentelemetry_prometheus::PrometheusExporter; +use prometheus::{Encoder, TextEncoder}; + +pub struct MetricsServer { + bind_addr: Option<SocketAddr>, + exporter: PrometheusExporter, +} + +impl MetricsServer { + pub fn init(bind_addr: Option<SocketAddr>) -> MetricsServer { + let exporter = opentelemetry_prometheus::exporter().init(); + Self { + bind_addr, + exporter, + } + } + + pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<()> { + if let Some(addr) = self.bind_addr { + let metrics_server = Arc::new(self); + + let make_svc = make_service_fn(move |_conn| { + let metrics_server = metrics_server.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |req| { + metrics_server.clone().serve_req(req) + })) + } + }); + + let server = Server::bind(&addr).serve(make_svc); + let graceful = server.with_graceful_shutdown(shutdown_signal); + info!("Metrics server listening on http://{}", addr); + + graceful.await?; + } else { + info!("Metrics server is disabled"); + } + + Ok(()) + } + + async fn serve_req( + self: Arc<MetricsServer>, + req: Request<Body>, + ) -> Result<Response<Body>, hyper::Error> { + debug!("{} {}", req.method(), req.uri()); + + let response = match (req.method(), req.uri().path()) { + (&Method::GET, "/metrics") => { + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + let metric_families = self.exporter.registry().gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + Response::builder() + .status(200) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap() + } + _ => Response::builder() + .status(404) + .body(Body::from("Not implemented")) + .unwrap(), + }; + + Ok(response) + } +} |