aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-02-22 14:52:41 +0100
committerAlex Auvolat <alex@adnab.me>2022-03-14 10:53:36 +0100
commit2a5609b292de019085f93a79b7b73f7a8341bf51 (patch)
tree7175cbeb1fc4eabcec936ccb8aee2945a92b1c60
parent818daa5c786813fdf50fecb6022e29b18e509b62 (diff)
downloadgarage-2a5609b292de019085f93a79b7b73f7a8341bf51.tar.gz
garage-2a5609b292de019085f93a79b7b73f7a8341bf51.zip
Add metrics to API endpoint
-rw-r--r--src/admin/metrics.rs2
-rw-r--r--src/api/api_server.rs113
-rw-r--r--src/garage/server.rs5
-rw-r--r--src/model/block.rs11
-rw-r--r--src/rpc/rpc_helper.rs7
-rw-r--r--src/table/table.rs6
-rw-r--r--src/util/metrics.rs29
7 files changed, 135 insertions, 38 deletions
diff --git a/src/admin/metrics.rs b/src/admin/metrics.rs
index 64721af2..02549fe9 100644
--- a/src/admin/metrics.rs
+++ b/src/admin/metrics.rs
@@ -1,7 +1,7 @@
use std::convert::Infallible;
+use std::net::SocketAddr;
use std::sync::Arc;
use std::time::SystemTime;
-use std::net::SocketAddr;
use futures::future::*;
use hyper::{
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index 1bab3aaa..8502d9d8 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -8,12 +8,15 @@ use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server};
use opentelemetry::{
+ global,
+ metrics::{Counter, ValueRecorder},
trace::{FutureExt, TraceContextExt, Tracer},
Context, KeyValue,
};
use garage_util::data::*;
use garage_util::error::Error as GarageError;
+use garage_util::metrics::RecordDuration;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
@@ -35,6 +38,34 @@ use crate::s3_put::*;
use crate::s3_router::{Authorization, Endpoint};
use crate::s3_website::*;
+struct ApiMetrics {
+ request_counter: Counter<u64>,
+ error_counter: Counter<u64>,
+ request_duration: ValueRecorder<f64>,
+}
+
+impl ApiMetrics {
+ fn new() -> Self {
+ let meter = global::meter("garage/api");
+ Self {
+ request_counter: meter
+ .u64_counter("api.request_counter")
+ .with_description("Number of API calls to the various S3 API endpoints")
+ .init(),
+ error_counter: meter
+ .u64_counter("api.error_counter")
+ .with_description(
+ "Number of API calls to the various S3 API endpoints that resulted in errors",
+ )
+ .init(),
+ request_duration: meter
+ .f64_value_recorder("api.request_duration")
+ .with_description("Duration of API calls to the various S3 API endpoints")
+ .init(),
+ }
+ }
+}
+
/// Run the S3 API server
pub async fn run_api_server(
garage: Arc<Garage>,
@@ -42,30 +73,19 @@ pub async fn run_api_server(
) -> Result<(), GarageError> {
let addr = &garage.config.s3_api.api_bind_addr;
+ let metrics = Arc::new(ApiMetrics::new());
+
let service = make_service_fn(|conn: &AddrStream| {
let garage = garage.clone();
+ let metrics = metrics.clone();
+
let client_addr = conn.remote_addr();
async move {
Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
let garage = garage.clone();
+ let metrics = metrics.clone();
- let tracer = opentelemetry::global::tracer("garage");
- let trace_id = gen_uuid();
- let span = tracer
- .span_builder("S3 API call (unknown)")
- .with_trace_id(
- opentelemetry::trace::TraceId::from_hex(&hex::encode(
- &trace_id.as_slice()[..16],
- ))
- .unwrap(),
- )
- .with_attributes(vec![
- KeyValue::new("method", format!("{}", req.method())),
- KeyValue::new("uri", req.uri().path().to_string()),
- ])
- .start(&tracer);
-
- handler(garage, req, client_addr).with_context(Context::current_with_span(span))
+ handler(garage, metrics, req, client_addr)
}))
}
});
@@ -81,13 +101,33 @@ pub async fn run_api_server(
async fn handler(
garage: Arc<Garage>,
+ metrics: Arc<ApiMetrics>,
req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<Body>, GarageError> {
let uri = req.uri().clone();
info!("{} {} {}", addr, req.method(), uri);
debug!("{:?}", req);
- match handler_inner(garage.clone(), req).await {
+
+ let tracer = opentelemetry::global::tracer("garage");
+ let trace_id = gen_uuid();
+ let span = tracer
+ .span_builder("S3 API call (unknown)")
+ .with_trace_id(
+ opentelemetry::trace::TraceId::from_hex(&hex::encode(&trace_id.as_slice()[..16]))
+ .unwrap(),
+ )
+ .with_attributes(vec![
+ KeyValue::new("method", format!("{}", req.method())),
+ KeyValue::new("uri", req.uri().path().to_string()),
+ ])
+ .start(&tracer);
+
+ let res = handler_stage2(garage.clone(), metrics, req)
+ .with_context(Context::current_with_span(span))
+ .await;
+
+ match res {
Ok(x) => {
debug!("{} {:?}", x.status(), x.headers());
Ok(x)
@@ -114,7 +154,11 @@ async fn handler(
}
}
-async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> {
+async fn handler_stage2(
+ garage: Arc<Garage>,
+ metrics: Arc<ApiMetrics>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
let authority = req
.headers()
.get(header::HOST)
@@ -137,6 +181,37 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
.span()
.update_name::<String>(format!("S3 API {}", endpoint.name()));
+ let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())];
+
+ let res = handler_stage3(garage, req, endpoint, bucket_name)
+ .record_duration(&metrics.request_duration, &metrics_tags[..])
+ .await;
+
+ metrics.request_counter.add(1, &metrics_tags[..]);
+
+ let status_code = match &res {
+ Ok(r) => r.status(),
+ Err(e) => e.http_status_code(),
+ };
+ if status_code.is_client_error() || status_code.is_server_error() {
+ metrics.error_counter.add(
+ 1,
+ &[
+ metrics_tags[0].clone(),
+ KeyValue::new("status_code", status_code.as_str().to_string()),
+ ],
+ );
+ }
+
+ res
+}
+
+async fn handler_stage3(
+ garage: Arc<Garage>,
+ req: Request<Body>,
+ endpoint: Endpoint,
+ bucket_name: Option<String>,
+) -> Result<Response<Body>, Error> {
// Some endpoints are processed early, before we even check for an API key
if let Endpoint::PostObject = endpoint {
return handle_post_object(garage, req, bucket_name.unwrap()).await;
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 739dedbe..b11f8417 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -69,8 +69,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
));
info!("Configure and run admin web server...");
- let admin_server =
- tokio::spawn(admin_server_init.run(config.admin_api.bind_addr, wait_from(watch_cancel.clone())));
+ let admin_server = tokio::spawn(
+ admin_server_init.run(config.admin_api.bind_addr, wait_from(watch_cancel.clone())),
+ );
// Stuff runs
diff --git a/src/model/block.rs b/src/model/block.rs
index 058c71fd..d97e64a8 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -1,7 +1,7 @@
use std::convert::TryInto;
use std::path::{Path, PathBuf};
use std::sync::Arc;
-use std::time::{Duration};
+use std::time::Duration;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
@@ -21,9 +21,9 @@ use opentelemetry::{
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::metrics::RecordDuration;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
-use garage_util::metrics::RecordDuration;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -409,11 +409,14 @@ impl BlockManager {
/// Read block from disk, verifying it's integrity
async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
- let data = self.read_block_internal(hash)
+ let data = self
+ .read_block_internal(hash)
.bound_record_duration(&self.metrics.block_read_duration)
.await?;
- self.metrics.bytes_read.add(data.inner_buffer().len() as u64);
+ self.metrics
+ .bytes_read
+ .add(data.inner_buffer().len() as u64);
Ok(BlockRpc::PutBlock { hash: *hash, data })
}
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index f8bef47f..099c6429 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -1,6 +1,6 @@
//! Contain structs related to making RPCs
use std::sync::Arc;
-use std::time::{Duration};
+use std::time::Duration;
use futures::future::join_all;
use futures::stream::futures_unordered::FuturesUnordered;
@@ -134,7 +134,7 @@ impl RpcHelper {
M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
- let metric_tags = [KeyValue::new("endpoint", endpoint.path().to_string())];
+ let metric_tags = [KeyValue::new("rpc_endpoint", endpoint.path().to_string())];
let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
let permit = self
@@ -147,7 +147,8 @@ impl RpcHelper {
self.0.metrics.rpc_counter.add(1, &metric_tags);
let node_id = to.into();
- let rpc_call = endpoint.call(&node_id, msg, strat.rs_priority)
+ let rpc_call = endpoint
+ .call(&node_id, msg, strat.rs_priority)
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
select! {
diff --git a/src/table/table.rs b/src/table/table.rs
index 9ba243c0..69cac41a 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -161,7 +161,8 @@ where
partition_key: &F::P,
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
- let res = self.get_internal(partition_key, sort_key)
+ let res = self
+ .get_internal(partition_key, sort_key)
.bound_record_duration(&self.data.metrics.get_request_duration)
.await?;
self.data.metrics.get_request_counter.add(1);
@@ -232,7 +233,8 @@ where
filter: Option<F::Filter>,
limit: usize,
) -> Result<Vec<F::E>, Error> {
- let res = self.get_range_internal(partition_key, begin_sort_key, filter, limit)
+ let res = self
+ .get_range_internal(partition_key, begin_sort_key, filter, limit)
.bound_record_duration(&self.data.metrics.get_request_duration)
.await?;
self.data.metrics.get_request_counter.add(1);
diff --git a/src/util/metrics.rs b/src/util/metrics.rs
index b3b1fc3c..cd5aa182 100644
--- a/src/util/metrics.rs
+++ b/src/util/metrics.rs
@@ -2,26 +2,40 @@ use std::time::SystemTime;
use futures::{future::BoxFuture, Future, FutureExt};
-use opentelemetry::{KeyValue, metrics::*};
+use opentelemetry::{metrics::*, KeyValue};
pub trait RecordDuration<'a>: 'a {
type Output;
- fn record_duration(self, r: &'a ValueRecorder<f64>, attributes: &'a [KeyValue]) -> BoxFuture<'a, Self::Output>;
+ fn record_duration(
+ self,
+ r: &'a ValueRecorder<f64>,
+ attributes: &'a [KeyValue],
+ ) -> BoxFuture<'a, Self::Output>;
fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output>;
}
impl<'a, T, O> RecordDuration<'a> for T
-where T: Future<Output=O> + Send + 'a {
+where
+ T: Future<Output = O> + Send + 'a,
+{
type Output = O;
- fn record_duration(self, r: &'a ValueRecorder<f64>, attributes: &'a [KeyValue]) -> BoxFuture<'a, Self::Output> {
+ fn record_duration(
+ self,
+ r: &'a ValueRecorder<f64>,
+ attributes: &'a [KeyValue],
+ ) -> BoxFuture<'a, Self::Output> {
async move {
let request_start = SystemTime::now();
let res = self.await;
- r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()), attributes);
+ r.record(
+ request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()),
+ attributes,
+ );
res
- }.boxed()
+ }
+ .boxed()
}
fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output> {
@@ -30,6 +44,7 @@ where T: Future<Output=O> + Send + 'a {
let res = self.await;
r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
res
- }.boxed()
+ }
+ .boxed()
}
}