aboutsummaryrefslogtreecommitdiff
path: root/src/metrics.rs
blob: 16720f0967b52c95421c7c4532c2d042ae50f345 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;

use anyhow::Result;
use futures::future::*;
use tracing::*;

use hyper::{
	header::CONTENT_TYPE,
	service::{make_service_fn, service_fn},
	Body, Method, Request, Response, Server,
};
use opentelemetry::sdk::metrics;
use prometheus::{Encoder, TextEncoder};

pub struct MetricsServer {
	bind_addr: Option<SocketAddr>,
	registry: prometheus::Registry,
}

impl MetricsServer {
	pub fn init(bind_addr: Option<SocketAddr>) -> MetricsServer {
		let registry = prometheus::Registry::new();
		let exporter = opentelemetry_prometheus::exporter()
			.with_registry(registry.clone())
			.with_aggregation_selector(AggregationSelector)
			.without_counter_suffixes()
			.build()
			.expect("build prometheus registry");
		let mp = metrics::MeterProvider::builder()
			.with_reader(exporter)
			.build();
		opentelemetry::global::set_meter_provider(mp);
		Self {
			bind_addr,
			registry,
		}
	}

	pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<()> {
		if let Some(addr) = self.bind_addr {
			let metrics_server = Arc::new(self);

			let make_svc = make_service_fn(move |_conn| {
				let metrics_server = metrics_server.clone();
				async move {
					Ok::<_, Infallible>(service_fn(move |req| {
						metrics_server.clone().serve_req(req)
					}))
				}
			});

			let server = Server::bind(&addr).serve(make_svc);
			let graceful = server.with_graceful_shutdown(shutdown_signal);
			info!("Metrics server listening on http://{}", addr);

			graceful.await?;
		} else {
			info!("Metrics server is disabled");
		}

		Ok(())
	}

	async fn serve_req(
		self: Arc<MetricsServer>,
		req: Request<Body>,
	) -> Result<Response<Body>, hyper::Error> {
		debug!("{} {}", req.method(), req.uri());

		let response = match (req.method(), req.uri().path()) {
            (&Method::GET, "/hprof") => {
                let buff = common_mem_prof::dump_profile().await.unwrap();
                tokio::fs::write("memdump.hprof", buff).await.unwrap();
                Response::builder().status(204).body(Body::from(vec![])).unwrap()
            },
			(&Method::GET, "/metrics") => {
				let mut buffer = vec![];
				let encoder = TextEncoder::new();
				let metric_families = self.registry.gather();
				encoder.encode(&metric_families, &mut buffer).unwrap();

				Response::builder()
					.status(200)
					.header(CONTENT_TYPE, encoder.format_type())
					.body(Body::from(buffer))
					.unwrap()
			}
			_ => Response::builder()
				.status(404)
				.body(Body::from("Not implemented"))
				.unwrap(),
		};

		Ok(response)
	}
}

struct AggregationSelector;

impl metrics::reader::AggregationSelector for AggregationSelector {
	fn aggregation(&self, kind: metrics::InstrumentKind) -> metrics::Aggregation {
		match kind {
			metrics::InstrumentKind::Histogram => metrics::Aggregation::ExplicitBucketHistogram {
				boundaries: vec![
					0.001, 0.0015, 0.002, 0.003, 0.005, 0.007, 0.01, 0.015, 0.02, 0.03, 0.05, 0.07,
					0.1, 0.15, 0.2, 0.3, 0.5, 0.7, 1., 1.5, 2., 3., 5., 7., 10., 15., 20., 30.,
					40., 50., 60., 70., 100.,
				],
				record_min_max: true,
			},
			_ => metrics::reader::DefaultAggregationSelector::new().aggregation(kind),
		}
	}
}