diff options
Diffstat (limited to 'src/api/api_server.rs')
-rw-r--r-- | src/api/api_server.rs | 113 |
1 files changed, 94 insertions, 19 deletions
diff --git a/src/api/api_server.rs b/src/api/api_server.rs index 1bab3aaa..8502d9d8 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -8,12 +8,15 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server}; use opentelemetry::{ + global, + metrics::{Counter, ValueRecorder}, trace::{FutureExt, TraceContextExt, Tracer}, Context, KeyValue, }; use garage_util::data::*; use garage_util::error::Error as GarageError; +use garage_util::metrics::RecordDuration; use garage_model::garage::Garage; use garage_model::key_table::Key; @@ -35,6 +38,34 @@ use crate::s3_put::*; use crate::s3_router::{Authorization, Endpoint}; use crate::s3_website::*; +struct ApiMetrics { + request_counter: Counter<u64>, + error_counter: Counter<u64>, + request_duration: ValueRecorder<f64>, +} + +impl ApiMetrics { + fn new() -> Self { + let meter = global::meter("garage/api"); + Self { + request_counter: meter + .u64_counter("api.request_counter") + .with_description("Number of API calls to the various S3 API endpoints") + .init(), + error_counter: meter + .u64_counter("api.error_counter") + .with_description( + "Number of API calls to the various S3 API endpoints that resulted in errors", + ) + .init(), + request_duration: meter + .f64_value_recorder("api.request_duration") + .with_description("Duration of API calls to the various S3 API endpoints") + .init(), + } + } +} + /// Run the S3 API server pub async fn run_api_server( garage: Arc<Garage>, @@ -42,30 +73,19 @@ pub async fn run_api_server( ) -> Result<(), GarageError> { let addr = &garage.config.s3_api.api_bind_addr; + let metrics = Arc::new(ApiMetrics::new()); + let service = make_service_fn(|conn: &AddrStream| { let garage = garage.clone(); + let metrics = metrics.clone(); + let client_addr = conn.remote_addr(); async move { Ok::<_, GarageError>(service_fn(move |req: Request<Body>| { let garage = garage.clone(); + let metrics = metrics.clone(); - let tracer = opentelemetry::global::tracer("garage"); - let trace_id = gen_uuid(); - let span = tracer - .span_builder("S3 API call (unknown)") - .with_trace_id( - opentelemetry::trace::TraceId::from_hex(&hex::encode( - &trace_id.as_slice()[..16], - )) - .unwrap(), - ) - .with_attributes(vec![ - KeyValue::new("method", format!("{}", req.method())), - KeyValue::new("uri", req.uri().path().to_string()), - ]) - .start(&tracer); - - handler(garage, req, client_addr).with_context(Context::current_with_span(span)) + handler(garage, metrics, req, client_addr) })) } }); @@ -81,13 +101,33 @@ pub async fn run_api_server( async fn handler( garage: Arc<Garage>, + metrics: Arc<ApiMetrics>, req: Request<Body>, addr: SocketAddr, ) -> Result<Response<Body>, GarageError> { let uri = req.uri().clone(); info!("{} {} {}", addr, req.method(), uri); debug!("{:?}", req); - match handler_inner(garage.clone(), req).await { + + let tracer = opentelemetry::global::tracer("garage"); + let trace_id = gen_uuid(); + let span = tracer + .span_builder("S3 API call (unknown)") + .with_trace_id( + opentelemetry::trace::TraceId::from_hex(&hex::encode(&trace_id.as_slice()[..16])) + .unwrap(), + ) + .with_attributes(vec![ + KeyValue::new("method", format!("{}", req.method())), + KeyValue::new("uri", req.uri().path().to_string()), + ]) + .start(&tracer); + + let res = handler_stage2(garage.clone(), metrics, req) + .with_context(Context::current_with_span(span)) + .await; + + match res { Ok(x) => { debug!("{} {:?}", x.status(), x.headers()); Ok(x) @@ -114,7 +154,11 @@ async fn handler( } } -async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> { +async fn handler_stage2( + garage: Arc<Garage>, + metrics: Arc<ApiMetrics>, + req: Request<Body>, +) -> Result<Response<Body>, Error> { let authority = req .headers() .get(header::HOST) @@ -137,6 +181,37 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon .span() .update_name::<String>(format!("S3 API {}", endpoint.name())); + let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())]; + + let res = handler_stage3(garage, req, endpoint, bucket_name) + .record_duration(&metrics.request_duration, &metrics_tags[..]) + .await; + + metrics.request_counter.add(1, &metrics_tags[..]); + + let status_code = match &res { + Ok(r) => r.status(), + Err(e) => e.http_status_code(), + }; + if status_code.is_client_error() || status_code.is_server_error() { + metrics.error_counter.add( + 1, + &[ + metrics_tags[0].clone(), + KeyValue::new("status_code", status_code.as_str().to_string()), + ], + ); + } + + res +} + +async fn handler_stage3( + garage: Arc<Garage>, + req: Request<Body>, + endpoint: Endpoint, + bucket_name: Option<String>, +) -> Result<Response<Body>, Error> { // Some endpoints are processed early, before we even check for an API key if let Endpoint::PostObject = endpoint { return handle_post_object(garage, req, bucket_name.unwrap()).await; |