aboutsummaryrefslogtreecommitdiff
path: root/src/api
diff options
context:
space:
mode:
Diffstat (limited to 'src/api')
-rw-r--r--src/api/api_server.rs113
1 files changed, 94 insertions, 19 deletions
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index 1bab3aaa..8502d9d8 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -8,12 +8,15 @@ use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server};
use opentelemetry::{
+ global,
+ metrics::{Counter, ValueRecorder},
trace::{FutureExt, TraceContextExt, Tracer},
Context, KeyValue,
};
use garage_util::data::*;
use garage_util::error::Error as GarageError;
+use garage_util::metrics::RecordDuration;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
@@ -35,6 +38,34 @@ use crate::s3_put::*;
use crate::s3_router::{Authorization, Endpoint};
use crate::s3_website::*;
+struct ApiMetrics {
+ request_counter: Counter<u64>,
+ error_counter: Counter<u64>,
+ request_duration: ValueRecorder<f64>,
+}
+
+impl ApiMetrics {
+ fn new() -> Self {
+ let meter = global::meter("garage/api");
+ Self {
+ request_counter: meter
+ .u64_counter("api.request_counter")
+ .with_description("Number of API calls to the various S3 API endpoints")
+ .init(),
+ error_counter: meter
+ .u64_counter("api.error_counter")
+ .with_description(
+ "Number of API calls to the various S3 API endpoints that resulted in errors",
+ )
+ .init(),
+ request_duration: meter
+ .f64_value_recorder("api.request_duration")
+ .with_description("Duration of API calls to the various S3 API endpoints")
+ .init(),
+ }
+ }
+}
+
/// Run the S3 API server
pub async fn run_api_server(
garage: Arc<Garage>,
@@ -42,30 +73,19 @@ pub async fn run_api_server(
) -> Result<(), GarageError> {
let addr = &garage.config.s3_api.api_bind_addr;
+ let metrics = Arc::new(ApiMetrics::new());
+
let service = make_service_fn(|conn: &AddrStream| {
let garage = garage.clone();
+ let metrics = metrics.clone();
+
let client_addr = conn.remote_addr();
async move {
Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
let garage = garage.clone();
+ let metrics = metrics.clone();
- let tracer = opentelemetry::global::tracer("garage");
- let trace_id = gen_uuid();
- let span = tracer
- .span_builder("S3 API call (unknown)")
- .with_trace_id(
- opentelemetry::trace::TraceId::from_hex(&hex::encode(
- &trace_id.as_slice()[..16],
- ))
- .unwrap(),
- )
- .with_attributes(vec![
- KeyValue::new("method", format!("{}", req.method())),
- KeyValue::new("uri", req.uri().path().to_string()),
- ])
- .start(&tracer);
-
- handler(garage, req, client_addr).with_context(Context::current_with_span(span))
+ handler(garage, metrics, req, client_addr)
}))
}
});
@@ -81,13 +101,33 @@ pub async fn run_api_server(
async fn handler(
garage: Arc<Garage>,
+ metrics: Arc<ApiMetrics>,
req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<Body>, GarageError> {
let uri = req.uri().clone();
info!("{} {} {}", addr, req.method(), uri);
debug!("{:?}", req);
- match handler_inner(garage.clone(), req).await {
+
+ let tracer = opentelemetry::global::tracer("garage");
+ let trace_id = gen_uuid();
+ let span = tracer
+ .span_builder("S3 API call (unknown)")
+ .with_trace_id(
+ opentelemetry::trace::TraceId::from_hex(&hex::encode(&trace_id.as_slice()[..16]))
+ .unwrap(),
+ )
+ .with_attributes(vec![
+ KeyValue::new("method", format!("{}", req.method())),
+ KeyValue::new("uri", req.uri().path().to_string()),
+ ])
+ .start(&tracer);
+
+ let res = handler_stage2(garage.clone(), metrics, req)
+ .with_context(Context::current_with_span(span))
+ .await;
+
+ match res {
Ok(x) => {
debug!("{} {:?}", x.status(), x.headers());
Ok(x)
@@ -114,7 +154,11 @@ async fn handler(
}
}
-async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> {
+async fn handler_stage2(
+ garage: Arc<Garage>,
+ metrics: Arc<ApiMetrics>,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
let authority = req
.headers()
.get(header::HOST)
@@ -137,6 +181,37 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
.span()
.update_name::<String>(format!("S3 API {}", endpoint.name()));
+ let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())];
+
+ let res = handler_stage3(garage, req, endpoint, bucket_name)
+ .record_duration(&metrics.request_duration, &metrics_tags[..])
+ .await;
+
+ metrics.request_counter.add(1, &metrics_tags[..]);
+
+ let status_code = match &res {
+ Ok(r) => r.status(),
+ Err(e) => e.http_status_code(),
+ };
+ if status_code.is_client_error() || status_code.is_server_error() {
+ metrics.error_counter.add(
+ 1,
+ &[
+ metrics_tags[0].clone(),
+ KeyValue::new("status_code", status_code.as_str().to_string()),
+ ],
+ );
+ }
+
+ res
+}
+
+async fn handler_stage3(
+ garage: Arc<Garage>,
+ req: Request<Body>,
+ endpoint: Endpoint,
+ bucket_name: Option<String>,
+) -> Result<Response<Body>, Error> {
// Some endpoints are processed early, before we even check for an API key
if let Endpoint::PostObject = endpoint {
return handle_post_object(garage, req, bucket_name.unwrap()).await;