aboutsummaryrefslogtreecommitdiff
path: root/src/metrics.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/metrics.rs')
-rw-r--r--src/metrics.rs83
1 files changed, 83 insertions, 0 deletions
diff --git a/src/metrics.rs b/src/metrics.rs
new file mode 100644
index 0000000..c635269
--- /dev/null
+++ b/src/metrics.rs
@@ -0,0 +1,83 @@
+use std::convert::Infallible;
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use anyhow::Result;
+use futures::future::*;
+use log::*;
+
+use hyper::{
+ header::CONTENT_TYPE,
+ service::{make_service_fn, service_fn},
+ Body, Method, Request, Response, Server,
+};
+use opentelemetry_prometheus::PrometheusExporter;
+use prometheus::{Encoder, TextEncoder};
+
+pub struct MetricsServer {
+ bind_addr: Option<SocketAddr>,
+ exporter: PrometheusExporter,
+}
+
+impl MetricsServer {
+ pub fn init(bind_addr: Option<SocketAddr>) -> MetricsServer {
+ let exporter = opentelemetry_prometheus::exporter().init();
+ Self {
+ bind_addr,
+ exporter,
+ }
+ }
+
+ pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<()> {
+ if let Some(addr) = self.bind_addr {
+ let metrics_server = Arc::new(self);
+
+ let make_svc = make_service_fn(move |_conn| {
+ let metrics_server = metrics_server.clone();
+ async move {
+ Ok::<_, Infallible>(service_fn(move |req| {
+ metrics_server.clone().serve_req(req)
+ }))
+ }
+ });
+
+ let server = Server::bind(&addr).serve(make_svc);
+ let graceful = server.with_graceful_shutdown(shutdown_signal);
+ info!("Metrics server listening on http://{}", addr);
+
+ graceful.await?;
+ } else {
+ info!("Metrics server is disabled");
+ }
+
+ Ok(())
+ }
+
+ async fn serve_req(
+ self: Arc<MetricsServer>,
+ req: Request<Body>,
+ ) -> Result<Response<Body>, hyper::Error> {
+ debug!("{} {}", req.method(), req.uri());
+
+ let response = match (req.method(), req.uri().path()) {
+ (&Method::GET, "/metrics") => {
+ let mut buffer = vec![];
+ let encoder = TextEncoder::new();
+ let metric_families = self.exporter.registry().gather();
+ encoder.encode(&metric_families, &mut buffer).unwrap();
+
+ Response::builder()
+ .status(200)
+ .header(CONTENT_TYPE, encoder.format_type())
+ .body(Body::from(buffer))
+ .unwrap()
+ }
+ _ => Response::builder()
+ .status(404)
+ .body(Body::from("Not implemented"))
+ .unwrap(),
+ };
+
+ Ok(response)
+ }
+}