aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-05-05 10:29:45 +0200
committerAlex Auvolat <alex@adnab.me>2022-05-10 13:25:06 +0200
commit633958c7b1ce9c83df5159051fd299b484d0d797 (patch)
tree906246beab76ee6981af03fb31ce04421cd6b6ab /src
parent5768bf362262f78376af14517c4921941986192e (diff)
downloadgarage-633958c7b1ce9c83df5159051fd299b484d0d797.tar.gz
garage-633958c7b1ce9c83df5159051fd299b484d0d797.zip
Refactor admin API to be in api/admin and use common code
Diffstat (limited to 'src')
-rw-r--r--src/admin/Cargo.toml29
-rw-r--r--src/admin/lib.rs6
-rw-r--r--src/admin/metrics.rs146
-rw-r--r--src/api/Cargo.toml3
-rw-r--r--src/api/admin/api_server.rs128
-rw-r--r--src/api/admin/mod.rs2
-rw-r--r--src/api/admin/router.rs59
-rw-r--r--src/api/lib.rs1
-rw-r--r--src/garage/Cargo.toml6
-rw-r--r--src/garage/main.rs1
-rw-r--r--src/garage/server.rs35
-rw-r--r--src/garage/tracing_setup.rs (renamed from src/admin/tracing_setup.rs)0
-rw-r--r--src/rpc/Cargo.toml1
-rw-r--r--src/util/config.rs4
14 files changed, 221 insertions, 200 deletions
diff --git a/src/admin/Cargo.toml b/src/admin/Cargo.toml
deleted file mode 100644
index 2db4bb08..00000000
--- a/src/admin/Cargo.toml
+++ /dev/null
@@ -1,29 +0,0 @@
-[package]
-name = "garage_admin"
-version = "0.7.0"
-authors = ["Maximilien Richer <code@mricher.fr>"]
-edition = "2018"
-license = "AGPL-3.0"
-description = "Administration and metrics REST HTTP server for Garage"
-repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
-
-[lib]
-path = "lib.rs"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-garage_util = { version = "0.7.0", path = "../util" }
-
-hex = "0.4"
-
-futures = "0.3"
-futures-util = "0.3"
-http = "0.2"
-hyper = "0.14"
-tracing = "0.1.30"
-
-opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
-opentelemetry-prometheus = "0.10"
-opentelemetry-otlp = "0.10"
-prometheus = "0.13"
diff --git a/src/admin/lib.rs b/src/admin/lib.rs
deleted file mode 100644
index b5b0775b..00000000
--- a/src/admin/lib.rs
+++ /dev/null
@@ -1,6 +0,0 @@
-//! Crate for handling the admin and metric HTTP APIs
-#[macro_use]
-extern crate tracing;
-
-pub mod metrics;
-pub mod tracing_setup;
diff --git a/src/admin/metrics.rs b/src/admin/metrics.rs
deleted file mode 100644
index 7edc36c6..00000000
--- a/src/admin/metrics.rs
+++ /dev/null
@@ -1,146 +0,0 @@
-use std::convert::Infallible;
-use std::net::SocketAddr;
-use std::sync::Arc;
-use std::time::SystemTime;
-
-use futures::future::*;
-use hyper::{
- header::CONTENT_TYPE,
- service::{make_service_fn, service_fn},
- Body, Method, Request, Response, Server,
-};
-
-use opentelemetry::{
- global,
- metrics::{BoundCounter, BoundValueRecorder},
- trace::{FutureExt, TraceContextExt, Tracer},
- Context,
-};
-use opentelemetry_prometheus::PrometheusExporter;
-
-use prometheus::{Encoder, TextEncoder};
-
-use garage_util::error::Error as GarageError;
-use garage_util::metrics::*;
-
-// serve_req on metric endpoint
-async fn serve_req(
- req: Request<Body>,
- admin_server: Arc<AdminServer>,
-) -> Result<Response<Body>, hyper::Error> {
- debug!("Receiving request at path {}", req.uri());
- let request_start = SystemTime::now();
-
- admin_server.metrics.http_counter.add(1);
-
- let response = match (req.method(), req.uri().path()) {
- (&Method::GET, "/metrics") => {
- let mut buffer = vec![];
- let encoder = TextEncoder::new();
-
- let tracer = opentelemetry::global::tracer("garage");
- let metric_families = tracer.in_span("admin/gather_metrics", |_| {
- admin_server.exporter.registry().gather()
- });
-
- encoder.encode(&metric_families, &mut buffer).unwrap();
- admin_server
- .metrics
- .http_body_gauge
- .record(buffer.len() as u64);
-
- Response::builder()
- .status(200)
- .header(CONTENT_TYPE, encoder.format_type())
- .body(Body::from(buffer))
- .unwrap()
- }
- _ => Response::builder()
- .status(404)
- .body(Body::from("Not implemented"))
- .unwrap(),
- };
-
- admin_server
- .metrics
- .http_req_histogram
- .record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
- Ok(response)
-}
-
-// AdminServer hold the admin server internal admin_server and the metric exporter
-pub struct AdminServer {
- exporter: PrometheusExporter,
- metrics: AdminServerMetrics,
-}
-
-// GarageMetricadmin_server holds the metrics counter definition for Garage
-// FIXME: we would rather have that split up among the different libraries?
-struct AdminServerMetrics {
- http_counter: BoundCounter<u64>,
- http_body_gauge: BoundValueRecorder<u64>,
- http_req_histogram: BoundValueRecorder<f64>,
-}
-
-impl AdminServer {
- /// init initilialize the AdminServer and background metric server
- pub fn init() -> AdminServer {
- let exporter = opentelemetry_prometheus::exporter().init();
- let meter = global::meter("garage/admin_server");
- AdminServer {
- exporter,
- metrics: AdminServerMetrics {
- http_counter: meter
- .u64_counter("admin.http_requests_total")
- .with_description("Total number of HTTP requests made.")
- .init()
- .bind(&[]),
- http_body_gauge: meter
- .u64_value_recorder("admin.http_response_size_bytes")
- .with_description("The metrics HTTP response sizes in bytes.")
- .init()
- .bind(&[]),
- http_req_histogram: meter
- .f64_value_recorder("admin.http_request_duration_seconds")
- .with_description("The HTTP request latencies in seconds.")
- .init()
- .bind(&[]),
- },
- }
- }
- /// run execute the admin server on the designated HTTP port and listen for requests
- pub async fn run(
- self,
- bind_addr: SocketAddr,
- shutdown_signal: impl Future<Output = ()>,
- ) -> Result<(), GarageError> {
- let admin_server = Arc::new(self);
- // For every connection, we must make a `Service` to handle all
- // incoming HTTP requests on said connection.
- let make_svc = make_service_fn(move |_conn| {
- let admin_server = admin_server.clone();
- // This is the `Service` that will handle the connection.
- // `service_fn` is a helper to convert a function that
- // returns a Response into a `Service`.
- async move {
- Ok::<_, Infallible>(service_fn(move |req| {
- let tracer = opentelemetry::global::tracer("garage");
- let span = tracer
- .span_builder("admin/request")
- .with_trace_id(gen_trace_id())
- .start(&tracer);
-
- serve_req(req, admin_server.clone())
- .with_context(Context::current_with_span(span))
- }))
- }
- });
-
- let server = Server::bind(&bind_addr).serve(make_svc);
- let graceful = server.with_graceful_shutdown(shutdown_signal);
- info!("Admin server listening on http://{}", bind_addr);
-
- graceful.await?;
- Ok(())
- }
-}
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 29b26e5e..db77cf38 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -54,6 +54,9 @@ quick-xml = { version = "0.21", features = [ "serialize" ] }
url = "2.1"
opentelemetry = "0.17"
+opentelemetry-prometheus = "0.10"
+opentelemetry-otlp = "0.10"
+prometheus = "0.13"
[features]
k2v = [ "garage_util/k2v", "garage_model/k2v" ]
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
new file mode 100644
index 00000000..836b5158
--- /dev/null
+++ b/src/api/admin/api_server.rs
@@ -0,0 +1,128 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use futures::future::Future;
+use http::header::CONTENT_TYPE;
+use hyper::{Body, Request, Response};
+
+use opentelemetry::trace::{SpanRef, Tracer};
+use opentelemetry_prometheus::PrometheusExporter;
+use prometheus::{Encoder, TextEncoder};
+
+use garage_model::garage::Garage;
+use garage_util::error::Error as GarageError;
+
+use crate::error::*;
+use crate::generic_server::*;
+
+use crate::admin::router::{Authorization, Endpoint};
+
+pub struct AdminApiServer {
+ garage: Arc<Garage>,
+ exporter: PrometheusExporter,
+ metrics_token: Option<String>,
+ admin_token: Option<String>,
+}
+
+impl AdminApiServer {
+ pub fn new(garage: Arc<Garage>) -> Self {
+ let exporter = opentelemetry_prometheus::exporter().init();
+ let cfg = &garage.config.admin;
+ let metrics_token = cfg
+ .metrics_token
+ .as_ref()
+ .map(|tok| format!("Bearer {}", tok));
+ let admin_token = cfg
+ .admin_token
+ .as_ref()
+ .map(|tok| format!("Bearer {}", tok));
+ Self {
+ garage,
+ exporter,
+ metrics_token,
+ admin_token,
+ }
+ }
+
+ pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<(), GarageError> {
+ if let Some(bind_addr) = self.garage.config.admin.api_bind_addr {
+ let region = self.garage.config.s3_api.s3_region.clone();
+ ApiServer::new(region, self)
+ .run_server(bind_addr, shutdown_signal)
+ .await
+ } else {
+ Ok(())
+ }
+ }
+
+ fn handle_metrics(&self) -> Result<Response<Body>, Error> {
+ let mut buffer = vec![];
+ let encoder = TextEncoder::new();
+
+ let tracer = opentelemetry::global::tracer("garage");
+ let metric_families = tracer.in_span("admin/gather_metrics", |_| {
+ self.exporter.registry().gather()
+ });
+
+ encoder
+ .encode(&metric_families, &mut buffer)
+ .ok_or_internal_error("Could not serialize metrics")?;
+
+ Ok(Response::builder()
+ .status(200)
+ .header(CONTENT_TYPE, encoder.format_type())
+ .body(Body::from(buffer))?)
+ }
+}
+
+#[async_trait]
+impl ApiHandler for AdminApiServer {
+ const API_NAME: &'static str = "admin";
+ const API_NAME_DISPLAY: &'static str = "Admin";
+
+ type Endpoint = Endpoint;
+
+ fn parse_endpoint(&self, req: &Request<Body>) -> Result<Endpoint, Error> {
+ Endpoint::from_request(req)
+ }
+
+ async fn handle(
+ &self,
+ req: Request<Body>,
+ endpoint: Endpoint,
+ ) -> Result<Response<Body>, Error> {
+ let expected_auth_header = match endpoint.authorization_type() {
+ Authorization::MetricsToken => self.metrics_token.as_ref(),
+ Authorization::AdminToken => self.admin_token.as_ref(),
+ };
+
+ if let Some(h) = expected_auth_header {
+ match req.headers().get("Authorization") {
+ None => Err(Error::Forbidden(
+ "Authorization token must be provided".into(),
+ )),
+ Some(v) if v.to_str().map(|hv| hv == h).unwrap_or(false) => Ok(()),
+ _ => Err(Error::Forbidden(
+ "Invalid authorization token provided".into(),
+ )),
+ }?;
+ }
+
+ match endpoint {
+ Endpoint::Metrics => self.handle_metrics(),
+ _ => Err(Error::NotImplemented(format!(
+ "Admin endpoint {} not implemented yet",
+ endpoint.name()
+ ))),
+ }
+ }
+}
+
+impl ApiEndpoint for Endpoint {
+ fn name(&self) -> &'static str {
+ Endpoint::name(self)
+ }
+
+ fn add_span_attributes(&self, _span: SpanRef<'_>) {}
+}
diff --git a/src/api/admin/mod.rs b/src/api/admin/mod.rs
new file mode 100644
index 00000000..ff2cf4b1
--- /dev/null
+++ b/src/api/admin/mod.rs
@@ -0,0 +1,2 @@
+pub mod api_server;
+mod router;
diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs
new file mode 100644
index 00000000..d0b30fc1
--- /dev/null
+++ b/src/api/admin/router.rs
@@ -0,0 +1,59 @@
+use crate::error::*;
+
+use hyper::{Method, Request};
+
+use crate::router_macros::router_match;
+
+pub enum Authorization {
+ MetricsToken,
+ AdminToken,
+}
+
+router_match! {@func
+
+/// List of all Admin API endpoints.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Endpoint {
+ Metrics,
+ Options,
+ GetClusterStatus,
+ GetClusterLayout,
+ UpdateClusterLayout,
+ ApplyClusterLayout,
+ RevertClusterLayout,
+}}
+
+impl Endpoint {
+ /// Determine which S3 endpoint a request is for using the request, and a bucket which was
+ /// possibly extracted from the Host header.
+ /// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets
+ pub fn from_request<T>(req: &Request<T>) -> Result<Self, Error> {
+ let path = req.uri().path();
+
+ use Endpoint::*;
+ let res = match (req.method(), path) {
+ (&Method::OPTIONS, _) => Options,
+ (&Method::GET, "/metrics") => Metrics,
+ (&Method::GET, "/status") => GetClusterStatus,
+ (&Method::GET, "/layout") => GetClusterLayout,
+ (&Method::POST, "/layout") => UpdateClusterLayout,
+ (&Method::POST, "/layout/apply") => ApplyClusterLayout,
+ (&Method::POST, "/layout/revert") => RevertClusterLayout,
+ (m, p) => {
+ return Err(Error::BadRequest(format!(
+ "Unknown API endpoint: {} {}",
+ m, p
+ )))
+ }
+ };
+
+ Ok(res)
+ }
+ /// Get the kind of authorization which is required to perform the operation.
+ pub fn authorization_type(&self) -> Authorization {
+ match self {
+ Self::Metrics => Authorization::MetricsToken,
+ _ => Authorization::AdminToken,
+ }
+ }
+}
diff --git a/src/api/lib.rs b/src/api/lib.rs
index 0078f7b5..5c522799 100644
--- a/src/api/lib.rs
+++ b/src/api/lib.rs
@@ -12,6 +12,7 @@ mod router_macros;
/// This mode is public only to help testing. Don't expect stability here
pub mod signature;
+pub mod admin;
#[cfg(feature = "k2v")]
pub mod k2v;
pub mod s3;
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 3b69d7bc..59566358 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -27,7 +27,6 @@ garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }
garage_util = { version = "0.7.0", path = "../util" }
garage_web = { version = "0.7.0", path = "../web" }
-garage_admin = { version = "0.7.0", path = "../admin" }
bytes = "1.0"
git-version = "0.3.4"
@@ -54,6 +53,11 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
#netapp = { version = "0.4", path = "../../../netapp" }
netapp = "0.4"
+opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
+opentelemetry-prometheus = "0.10"
+opentelemetry-otlp = "0.10"
+prometheus = "0.13"
+
[dev-dependencies]
aws-sdk-s3 = "0.8"
chrono = "0.4"
diff --git a/src/garage/main.rs b/src/garage/main.rs
index e898e680..69ab1147 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -8,6 +8,7 @@ mod admin;
mod cli;
mod repair;
mod server;
+mod tracing_setup;
use std::net::SocketAddr;
use std::path::PathBuf;
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 24bb25b3..4c0f8653 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -6,8 +6,7 @@ use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
-use garage_admin::metrics::*;
-use garage_admin::tracing_setup::*;
+use garage_api::admin::api_server::AdminApiServer;
use garage_api::s3::api_server::S3ApiServer;
use garage_model::garage::Garage;
use garage_web::run_web_server;
@@ -16,6 +15,7 @@ use garage_web::run_web_server;
use garage_api::k2v::api_server::K2VApiServer;
use crate::admin::*;
+use crate::tracing_setup::*;
async fn wait_from(mut chan: watch::Receiver<bool>) {
while !*chan.borrow() {
@@ -39,9 +39,6 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
.open()
.expect("Unable to open sled DB");
- info!("Initialize admin web server and metric backend...");
- let admin_server_init = AdminServer::init();
-
info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
@@ -54,6 +51,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
init_tracing(&export_to, garage.system.id)?;
}
+ info!("Initialize Admin API server and metrics collector...");
+ let admin_server = AdminApiServer::new(garage.clone());
+
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Create admin RPC handler...");
@@ -80,32 +80,32 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
wait_from(watch_cancel.clone()),
));
- let admin_server = if let Some(admin_bind_addr) = config.admin.api_bind_addr {
- info!("Configure and run admin web server...");
- Some(tokio::spawn(
- admin_server_init.run(admin_bind_addr, wait_from(watch_cancel.clone())),
- ))
- } else {
- None
- };
+ info!("Initializing Admin server...");
+ let admin_server = tokio::spawn(admin_server.run(wait_from(watch_cancel.clone())));
// Stuff runs
// When a cancel signal is sent, stuff stops
if let Err(e) = s3_api_server.await? {
warn!("S3 API server exited with error: {}", e);
+ } else {
+ info!("S3 API server exited without error.");
}
#[cfg(feature = "k2v")]
if let Err(e) = k2v_api_server.await? {
warn!("K2V API server exited with error: {}", e);
+ } else {
+ info!("K2V API server exited without error.");
}
if let Err(e) = web_server.await? {
warn!("Web server exited with error: {}", e);
+ } else {
+ info!("Web server exited without error.");
}
- if let Some(a) = admin_server {
- if let Err(e) = a.await? {
- warn!("Admin web server exited with error: {}", e);
- }
+ if let Err(e) = admin_server.await? {
+ warn!("Admin web server exited with error: {}", e);
+ } else {
+ info!("Admin API server exited without error.");
}
// Remove RPC handlers for system to break reference cycles
@@ -113,6 +113,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
// Await for netapp RPC system to end
run_system.await?;
+ info!("Netapp exited");
// Drop all references so that stuff can terminate properly
drop(garage);
diff --git a/src/admin/tracing_setup.rs b/src/garage/tracing_setup.rs
index 55fc4094..55fc4094 100644
--- a/src/admin/tracing_setup.rs
+++ b/src/garage/tracing_setup.rs
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index bed7f44a..80a1975c 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -15,7 +15,6 @@ path = "lib.rs"
[dependencies]
garage_util = { version = "0.7.0", path = "../util" }
-garage_admin = { version = "0.7.0", path = "../admin" }
arc-swap = "1.0"
bytes = "1.0"
diff --git a/src/util/config.rs b/src/util/config.rs
index 4d66bfe4..99ebce31 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -121,6 +121,10 @@ pub struct WebConfig {
pub struct AdminConfig {
/// Address and port to bind for admin API serving
pub api_bind_addr: Option<SocketAddr>,
+ /// Bearer token to use to scrape metrics
+ pub metrics_token: Option<String>,
+ /// Bearer token to use to access Admin API endpoints
+ pub admin_token: Option<String>,
/// OTLP server to where to export traces
pub trace_sink: Option<String>,
}