diff options
author | Alex Auvolat <alex@adnab.me> | 2022-05-05 10:29:45 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-05-10 13:25:06 +0200 |
commit | 633958c7b1ce9c83df5159051fd299b484d0d797 (patch) | |
tree | 906246beab76ee6981af03fb31ce04421cd6b6ab /src | |
parent | 5768bf362262f78376af14517c4921941986192e (diff) | |
download | garage-633958c7b1ce9c83df5159051fd299b484d0d797.tar.gz garage-633958c7b1ce9c83df5159051fd299b484d0d797.zip |
Refactor admin API to be in api/admin and use common code
Diffstat (limited to 'src')
-rw-r--r-- | src/admin/Cargo.toml | 29 | ||||
-rw-r--r-- | src/admin/lib.rs | 6 | ||||
-rw-r--r-- | src/admin/metrics.rs | 146 | ||||
-rw-r--r-- | src/api/Cargo.toml | 3 | ||||
-rw-r--r-- | src/api/admin/api_server.rs | 128 | ||||
-rw-r--r-- | src/api/admin/mod.rs | 2 | ||||
-rw-r--r-- | src/api/admin/router.rs | 59 | ||||
-rw-r--r-- | src/api/lib.rs | 1 | ||||
-rw-r--r-- | src/garage/Cargo.toml | 6 | ||||
-rw-r--r-- | src/garage/main.rs | 1 | ||||
-rw-r--r-- | src/garage/server.rs | 35 | ||||
-rw-r--r-- | src/garage/tracing_setup.rs (renamed from src/admin/tracing_setup.rs) | 0 | ||||
-rw-r--r-- | src/rpc/Cargo.toml | 1 | ||||
-rw-r--r-- | src/util/config.rs | 4 |
14 files changed, 221 insertions, 200 deletions
diff --git a/src/admin/Cargo.toml b/src/admin/Cargo.toml deleted file mode 100644 index 2db4bb08..00000000 --- a/src/admin/Cargo.toml +++ /dev/null @@ -1,29 +0,0 @@ -[package] -name = "garage_admin" -version = "0.7.0" -authors = ["Maximilien Richer <code@mricher.fr>"] -edition = "2018" -license = "AGPL-3.0" -description = "Administration and metrics REST HTTP server for Garage" -repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" - -[lib] -path = "lib.rs" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -garage_util = { version = "0.7.0", path = "../util" } - -hex = "0.4" - -futures = "0.3" -futures-util = "0.3" -http = "0.2" -hyper = "0.14" -tracing = "0.1.30" - -opentelemetry = { version = "0.17", features = [ "rt-tokio" ] } -opentelemetry-prometheus = "0.10" -opentelemetry-otlp = "0.10" -prometheus = "0.13" diff --git a/src/admin/lib.rs b/src/admin/lib.rs deleted file mode 100644 index b5b0775b..00000000 --- a/src/admin/lib.rs +++ /dev/null @@ -1,6 +0,0 @@ -//! Crate for handling the admin and metric HTTP APIs -#[macro_use] -extern crate tracing; - -pub mod metrics; -pub mod tracing_setup; diff --git a/src/admin/metrics.rs b/src/admin/metrics.rs deleted file mode 100644 index 7edc36c6..00000000 --- a/src/admin/metrics.rs +++ /dev/null @@ -1,146 +0,0 @@ -use std::convert::Infallible; -use std::net::SocketAddr; -use std::sync::Arc; -use std::time::SystemTime; - -use futures::future::*; -use hyper::{ - header::CONTENT_TYPE, - service::{make_service_fn, service_fn}, - Body, Method, Request, Response, Server, -}; - -use opentelemetry::{ - global, - metrics::{BoundCounter, BoundValueRecorder}, - trace::{FutureExt, TraceContextExt, Tracer}, - Context, -}; -use opentelemetry_prometheus::PrometheusExporter; - -use prometheus::{Encoder, TextEncoder}; - -use garage_util::error::Error as GarageError; -use garage_util::metrics::*; - -// serve_req on metric endpoint -async fn serve_req( - req: Request<Body>, - admin_server: Arc<AdminServer>, -) -> Result<Response<Body>, hyper::Error> { - debug!("Receiving request at path {}", req.uri()); - let request_start = SystemTime::now(); - - admin_server.metrics.http_counter.add(1); - - let response = match (req.method(), req.uri().path()) { - (&Method::GET, "/metrics") => { - let mut buffer = vec![]; - let encoder = TextEncoder::new(); - - let tracer = opentelemetry::global::tracer("garage"); - let metric_families = tracer.in_span("admin/gather_metrics", |_| { - admin_server.exporter.registry().gather() - }); - - encoder.encode(&metric_families, &mut buffer).unwrap(); - admin_server - .metrics - .http_body_gauge - .record(buffer.len() as u64); - - Response::builder() - .status(200) - .header(CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer)) - .unwrap() - } - _ => Response::builder() - .status(404) - .body(Body::from("Not implemented")) - .unwrap(), - }; - - admin_server - .metrics - .http_req_histogram - .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); - Ok(response) -} - -// AdminServer hold the admin server internal admin_server and the metric exporter -pub struct AdminServer { - exporter: PrometheusExporter, - metrics: AdminServerMetrics, -} - -// GarageMetricadmin_server holds the metrics counter definition for Garage -// FIXME: we would rather have that split up among the different libraries? -struct AdminServerMetrics { - http_counter: BoundCounter<u64>, - http_body_gauge: BoundValueRecorder<u64>, - http_req_histogram: BoundValueRecorder<f64>, -} - -impl AdminServer { - /// init initilialize the AdminServer and background metric server - pub fn init() -> AdminServer { - let exporter = opentelemetry_prometheus::exporter().init(); - let meter = global::meter("garage/admin_server"); - AdminServer { - exporter, - metrics: AdminServerMetrics { - http_counter: meter - .u64_counter("admin.http_requests_total") - .with_description("Total number of HTTP requests made.") - .init() - .bind(&[]), - http_body_gauge: meter - .u64_value_recorder("admin.http_response_size_bytes") - .with_description("The metrics HTTP response sizes in bytes.") - .init() - .bind(&[]), - http_req_histogram: meter - .f64_value_recorder("admin.http_request_duration_seconds") - .with_description("The HTTP request latencies in seconds.") - .init() - .bind(&[]), - }, - } - } - /// run execute the admin server on the designated HTTP port and listen for requests - pub async fn run( - self, - bind_addr: SocketAddr, - shutdown_signal: impl Future<Output = ()>, - ) -> Result<(), GarageError> { - let admin_server = Arc::new(self); - // For every connection, we must make a `Service` to handle all - // incoming HTTP requests on said connection. - let make_svc = make_service_fn(move |_conn| { - let admin_server = admin_server.clone(); - // This is the `Service` that will handle the connection. - // `service_fn` is a helper to convert a function that - // returns a Response into a `Service`. - async move { - Ok::<_, Infallible>(service_fn(move |req| { - let tracer = opentelemetry::global::tracer("garage"); - let span = tracer - .span_builder("admin/request") - .with_trace_id(gen_trace_id()) - .start(&tracer); - - serve_req(req, admin_server.clone()) - .with_context(Context::current_with_span(span)) - })) - } - }); - - let server = Server::bind(&bind_addr).serve(make_svc); - let graceful = server.with_graceful_shutdown(shutdown_signal); - info!("Admin server listening on http://{}", bind_addr); - - graceful.await?; - Ok(()) - } -} diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 29b26e5e..db77cf38 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -54,6 +54,9 @@ quick-xml = { version = "0.21", features = [ "serialize" ] } url = "2.1" opentelemetry = "0.17" +opentelemetry-prometheus = "0.10" +opentelemetry-otlp = "0.10" +prometheus = "0.13" [features] k2v = [ "garage_util/k2v", "garage_model/k2v" ] diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs new file mode 100644 index 00000000..836b5158 --- /dev/null +++ b/src/api/admin/api_server.rs @@ -0,0 +1,128 @@ +use std::sync::Arc; + +use async_trait::async_trait; + +use futures::future::Future; +use http::header::CONTENT_TYPE; +use hyper::{Body, Request, Response}; + +use opentelemetry::trace::{SpanRef, Tracer}; +use opentelemetry_prometheus::PrometheusExporter; +use prometheus::{Encoder, TextEncoder}; + +use garage_model::garage::Garage; +use garage_util::error::Error as GarageError; + +use crate::error::*; +use crate::generic_server::*; + +use crate::admin::router::{Authorization, Endpoint}; + +pub struct AdminApiServer { + garage: Arc<Garage>, + exporter: PrometheusExporter, + metrics_token: Option<String>, + admin_token: Option<String>, +} + +impl AdminApiServer { + pub fn new(garage: Arc<Garage>) -> Self { + let exporter = opentelemetry_prometheus::exporter().init(); + let cfg = &garage.config.admin; + let metrics_token = cfg + .metrics_token + .as_ref() + .map(|tok| format!("Bearer {}", tok)); + let admin_token = cfg + .admin_token + .as_ref() + .map(|tok| format!("Bearer {}", tok)); + Self { + garage, + exporter, + metrics_token, + admin_token, + } + } + + pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<(), GarageError> { + if let Some(bind_addr) = self.garage.config.admin.api_bind_addr { + let region = self.garage.config.s3_api.s3_region.clone(); + ApiServer::new(region, self) + .run_server(bind_addr, shutdown_signal) + .await + } else { + Ok(()) + } + } + + fn handle_metrics(&self) -> Result<Response<Body>, Error> { + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + + let tracer = opentelemetry::global::tracer("garage"); + let metric_families = tracer.in_span("admin/gather_metrics", |_| { + self.exporter.registry().gather() + }); + + encoder + .encode(&metric_families, &mut buffer) + .ok_or_internal_error("Could not serialize metrics")?; + + Ok(Response::builder() + .status(200) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer))?) + } +} + +#[async_trait] +impl ApiHandler for AdminApiServer { + const API_NAME: &'static str = "admin"; + const API_NAME_DISPLAY: &'static str = "Admin"; + + type Endpoint = Endpoint; + + fn parse_endpoint(&self, req: &Request<Body>) -> Result<Endpoint, Error> { + Endpoint::from_request(req) + } + + async fn handle( + &self, + req: Request<Body>, + endpoint: Endpoint, + ) -> Result<Response<Body>, Error> { + let expected_auth_header = match endpoint.authorization_type() { + Authorization::MetricsToken => self.metrics_token.as_ref(), + Authorization::AdminToken => self.admin_token.as_ref(), + }; + + if let Some(h) = expected_auth_header { + match req.headers().get("Authorization") { + None => Err(Error::Forbidden( + "Authorization token must be provided".into(), + )), + Some(v) if v.to_str().map(|hv| hv == h).unwrap_or(false) => Ok(()), + _ => Err(Error::Forbidden( + "Invalid authorization token provided".into(), + )), + }?; + } + + match endpoint { + Endpoint::Metrics => self.handle_metrics(), + _ => Err(Error::NotImplemented(format!( + "Admin endpoint {} not implemented yet", + endpoint.name() + ))), + } + } +} + +impl ApiEndpoint for Endpoint { + fn name(&self) -> &'static str { + Endpoint::name(self) + } + + fn add_span_attributes(&self, _span: SpanRef<'_>) {} +} diff --git a/src/api/admin/mod.rs b/src/api/admin/mod.rs new file mode 100644 index 00000000..ff2cf4b1 --- /dev/null +++ b/src/api/admin/mod.rs @@ -0,0 +1,2 @@ +pub mod api_server; +mod router; diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs new file mode 100644 index 00000000..d0b30fc1 --- /dev/null +++ b/src/api/admin/router.rs @@ -0,0 +1,59 @@ +use crate::error::*; + +use hyper::{Method, Request}; + +use crate::router_macros::router_match; + +pub enum Authorization { + MetricsToken, + AdminToken, +} + +router_match! {@func + +/// List of all Admin API endpoints. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Endpoint { + Metrics, + Options, + GetClusterStatus, + GetClusterLayout, + UpdateClusterLayout, + ApplyClusterLayout, + RevertClusterLayout, +}} + +impl Endpoint { + /// Determine which S3 endpoint a request is for using the request, and a bucket which was + /// possibly extracted from the Host header. + /// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets + pub fn from_request<T>(req: &Request<T>) -> Result<Self, Error> { + let path = req.uri().path(); + + use Endpoint::*; + let res = match (req.method(), path) { + (&Method::OPTIONS, _) => Options, + (&Method::GET, "/metrics") => Metrics, + (&Method::GET, "/status") => GetClusterStatus, + (&Method::GET, "/layout") => GetClusterLayout, + (&Method::POST, "/layout") => UpdateClusterLayout, + (&Method::POST, "/layout/apply") => ApplyClusterLayout, + (&Method::POST, "/layout/revert") => RevertClusterLayout, + (m, p) => { + return Err(Error::BadRequest(format!( + "Unknown API endpoint: {} {}", + m, p + ))) + } + }; + + Ok(res) + } + /// Get the kind of authorization which is required to perform the operation. + pub fn authorization_type(&self) -> Authorization { + match self { + Self::Metrics => Authorization::MetricsToken, + _ => Authorization::AdminToken, + } + } +} diff --git a/src/api/lib.rs b/src/api/lib.rs index 0078f7b5..5c522799 100644 --- a/src/api/lib.rs +++ b/src/api/lib.rs @@ -12,6 +12,7 @@ mod router_macros; /// This mode is public only to help testing. Don't expect stability here pub mod signature; +pub mod admin; #[cfg(feature = "k2v")] pub mod k2v; pub mod s3; diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 3b69d7bc..59566358 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -27,7 +27,6 @@ garage_rpc = { version = "0.7.0", path = "../rpc" } garage_table = { version = "0.7.0", path = "../table" } garage_util = { version = "0.7.0", path = "../util" } garage_web = { version = "0.7.0", path = "../web" } -garage_admin = { version = "0.7.0", path = "../admin" } bytes = "1.0" git-version = "0.3.4" @@ -54,6 +53,11 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi #netapp = { version = "0.4", path = "../../../netapp" } netapp = "0.4" +opentelemetry = { version = "0.17", features = [ "rt-tokio" ] } +opentelemetry-prometheus = "0.10" +opentelemetry-otlp = "0.10" +prometheus = "0.13" + [dev-dependencies] aws-sdk-s3 = "0.8" chrono = "0.4" diff --git a/src/garage/main.rs b/src/garage/main.rs index e898e680..69ab1147 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -8,6 +8,7 @@ mod admin; mod cli; mod repair; mod server; +mod tracing_setup; use std::net::SocketAddr; use std::path::PathBuf; diff --git a/src/garage/server.rs b/src/garage/server.rs index 24bb25b3..4c0f8653 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -6,8 +6,7 @@ use garage_util::background::*; use garage_util::config::*; use garage_util::error::Error; -use garage_admin::metrics::*; -use garage_admin::tracing_setup::*; +use garage_api::admin::api_server::AdminApiServer; use garage_api::s3::api_server::S3ApiServer; use garage_model::garage::Garage; use garage_web::run_web_server; @@ -16,6 +15,7 @@ use garage_web::run_web_server; use garage_api::k2v::api_server::K2VApiServer; use crate::admin::*; +use crate::tracing_setup::*; async fn wait_from(mut chan: watch::Receiver<bool>) { while !*chan.borrow() { @@ -39,9 +39,6 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { .open() .expect("Unable to open sled DB"); - info!("Initialize admin web server and metric backend..."); - let admin_server_init = AdminServer::init(); - info!("Initializing background runner..."); let watch_cancel = netapp::util::watch_ctrl_c(); let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); @@ -54,6 +51,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { init_tracing(&export_to, garage.system.id)?; } + info!("Initialize Admin API server and metrics collector..."); + let admin_server = AdminApiServer::new(garage.clone()); + let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone())); info!("Create admin RPC handler..."); @@ -80,32 +80,32 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { wait_from(watch_cancel.clone()), )); - let admin_server = if let Some(admin_bind_addr) = config.admin.api_bind_addr { - info!("Configure and run admin web server..."); - Some(tokio::spawn( - admin_server_init.run(admin_bind_addr, wait_from(watch_cancel.clone())), - )) - } else { - None - }; + info!("Initializing Admin server..."); + let admin_server = tokio::spawn(admin_server.run(wait_from(watch_cancel.clone()))); // Stuff runs // When a cancel signal is sent, stuff stops if let Err(e) = s3_api_server.await? { warn!("S3 API server exited with error: {}", e); + } else { + info!("S3 API server exited without error."); } #[cfg(feature = "k2v")] if let Err(e) = k2v_api_server.await? { warn!("K2V API server exited with error: {}", e); + } else { + info!("K2V API server exited without error."); } if let Err(e) = web_server.await? { warn!("Web server exited with error: {}", e); + } else { + info!("Web server exited without error."); } - if let Some(a) = admin_server { - if let Err(e) = a.await? { - warn!("Admin web server exited with error: {}", e); - } + if let Err(e) = admin_server.await? { + warn!("Admin web server exited with error: {}", e); + } else { + info!("Admin API server exited without error."); } // Remove RPC handlers for system to break reference cycles @@ -113,6 +113,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { // Await for netapp RPC system to end run_system.await?; + info!("Netapp exited"); // Drop all references so that stuff can terminate properly drop(garage); diff --git a/src/admin/tracing_setup.rs b/src/garage/tracing_setup.rs index 55fc4094..55fc4094 100644 --- a/src/admin/tracing_setup.rs +++ b/src/garage/tracing_setup.rs diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index bed7f44a..80a1975c 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -15,7 +15,6 @@ path = "lib.rs" [dependencies] garage_util = { version = "0.7.0", path = "../util" } -garage_admin = { version = "0.7.0", path = "../admin" } arc-swap = "1.0" bytes = "1.0" diff --git a/src/util/config.rs b/src/util/config.rs index 4d66bfe4..99ebce31 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -121,6 +121,10 @@ pub struct WebConfig { pub struct AdminConfig { /// Address and port to bind for admin API serving pub api_bind_addr: Option<SocketAddr>, + /// Bearer token to use to scrape metrics + pub metrics_token: Option<String>, + /// Bearer token to use to access Admin API endpoints + pub admin_token: Option<String>, /// OTLP server to where to export traces pub trace_sink: Option<String>, } |