aboutsummaryrefslogblamecommitdiff
path: root/src/api/admin/api_server.rs
blob: 9beeda1f43d8803ac8e7c53ee1651617513ad872 (plain) (tree)
1
2
3
4
5
6
7
8
9

                              
                         




                             
                                                                                     
                                                 
 


                                  
                                                 
                           


                                       

                                  











                                                    
                                   





                                      



                                                                         










                                                              
                                                   
                                 




                                      








                                                                         



                                                                                         
                                                       





                                                                                   























































































                                                                                                              
                                                                   
















                                                                                          
                                                       






                                                                                          





















                                                                                  
                                                            






















                                                                                                                           
                                                                 























                                                                                                               
                                                                                                           


































                                                                                                       
use std::collections::HashMap;
use std::fmt::Write;
use std::net::SocketAddr;
use std::sync::Arc;

use async_trait::async_trait;

use futures::future::Future;
use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
use hyper::{Body, Request, Response, StatusCode};

use opentelemetry::trace::SpanRef;

#[cfg(feature = "metrics")]
use opentelemetry_prometheus::PrometheusExporter;
#[cfg(feature = "metrics")]
use prometheus::{Encoder, TextEncoder};

use garage_model::garage::Garage;
use garage_rpc::layout::NodeRoleV;
use garage_util::data::Uuid;
use garage_util::error::Error as GarageError;

use crate::generic_server::*;

use crate::admin::bucket::*;
use crate::admin::cluster::*;
use crate::admin::error::*;
use crate::admin::key::*;
use crate::admin::router::{Authorization, Endpoint};

pub struct AdminApiServer {
	garage: Arc<Garage>,
	#[cfg(feature = "metrics")]
	exporter: PrometheusExporter,
	metrics_token: Option<String>,
	admin_token: Option<String>,
}

impl AdminApiServer {
	pub fn new(
		garage: Arc<Garage>,
		#[cfg(feature = "metrics")] exporter: PrometheusExporter,
	) -> Self {
		let cfg = &garage.config.admin;
		let metrics_token = cfg
			.metrics_token
			.as_ref()
			.map(|tok| format!("Bearer {}", tok));
		let admin_token = cfg
			.admin_token
			.as_ref()
			.map(|tok| format!("Bearer {}", tok));
		Self {
			garage,
			#[cfg(feature = "metrics")]
			exporter,
			metrics_token,
			admin_token,
		}
	}

	pub async fn run(
		self,
		bind_addr: SocketAddr,
		shutdown_signal: impl Future<Output = ()>,
	) -> Result<(), GarageError> {
		let region = self.garage.config.s3_api.s3_region.clone();
		ApiServer::new(region, self)
			.run_server(bind_addr, shutdown_signal)
			.await
	}

	fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> {
		Ok(Response::builder()
			.status(StatusCode::NO_CONTENT)
			.header(ALLOW, "OPTIONS, GET, POST")
			.header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST")
			.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
			.body(Body::empty())?)
	}

	fn handle_health(&self) -> Result<Response<Body>, Error> {
		let ring: Arc<_> = self.garage.system.ring.borrow().clone();
		let quorum = self.garage.replication_mode.write_quorum();
		let replication_factor = self.garage.replication_mode.replication_factor();

		let nodes = self
			.garage
			.system
			.get_known_nodes()
			.into_iter()
			.map(|n| (n.id, n))
			.collect::<HashMap<Uuid, _>>();
		let n_nodes_connected = nodes.iter().filter(|(_, n)| n.is_up).count();

		let storage_nodes = ring
			.layout
			.roles
			.items()
			.iter()
			.filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
			.collect::<Vec<_>>();
		let n_storage_nodes_ok = storage_nodes
			.iter()
			.filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
			.count();

		let partitions = ring.partitions();
		let partitions_n_up = partitions
			.iter()
			.map(|(_, h)| {
				let pn = ring.get_nodes(h, ring.replication_factor);
				pn.iter()
					.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
					.count()
			})
			.collect::<Vec<usize>>();
		let n_partitions_full_ok = partitions_n_up
			.iter()
			.filter(|c| **c == replication_factor)
			.count();
		let n_partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count();

		let (status, status_str) = if n_partitions_quorum == partitions.len()
			&& n_storage_nodes_ok == storage_nodes.len()
		{
			(StatusCode::OK, "Garage is fully operational")
		} else if n_partitions_quorum == partitions.len() {
			(
				StatusCode::OK,
				"Garage is operational but some storage nodes are unavailable",
			)
		} else {
			(
				StatusCode::SERVICE_UNAVAILABLE,
				"Quorum is not available for some/all partitions, reads and writes will fail",
			)
		};

		let mut buf = status_str.to_string();
		writeln!(
			&mut buf,
			"\nAll nodes: {} connected, {} known",
			n_nodes_connected,
			nodes.len()
		)
		.unwrap();
		writeln!(
			&mut buf,
			"Storage nodes: {} connected, {} in layout",
			n_storage_nodes_ok,
			storage_nodes.len()
		)
		.unwrap();
		writeln!(&mut buf, "Number of partitions: {}", partitions.len()).unwrap();
		writeln!(&mut buf, "Partitions with quorum: {}", n_partitions_quorum).unwrap();
		writeln!(
			&mut buf,
			"Partitions with all nodes available: {}",
			n_partitions_full_ok
		)
		.unwrap();

		Ok(Response::builder()
			.status(status)
			.header(http::header::CONTENT_TYPE, "text/plain")
			.body(Body::from(buf))?)
	}

	fn handle_metrics(&self) -> Result<Response<Body>, Error> {
		#[cfg(feature = "metrics")]
		{
			use opentelemetry::trace::Tracer;

			let mut buffer = vec![];
			let encoder = TextEncoder::new();

			let tracer = opentelemetry::global::tracer("garage");
			let metric_families = tracer.in_span("admin/gather_metrics", |_| {
				self.exporter.registry().gather()
			});

			encoder
				.encode(&metric_families, &mut buffer)
				.ok_or_internal_error("Could not serialize metrics")?;

			Ok(Response::builder()
				.status(StatusCode::OK)
				.header(http::header::CONTENT_TYPE, encoder.format_type())
				.body(Body::from(buffer))?)
		}
		#[cfg(not(feature = "metrics"))]
		Err(Error::bad_request(
			"Garage was built without the metrics feature".to_string(),
		))
	}
}

#[async_trait]
impl ApiHandler for AdminApiServer {
	const API_NAME: &'static str = "admin";
	const API_NAME_DISPLAY: &'static str = "Admin";

	type Endpoint = Endpoint;
	type Error = Error;

	fn parse_endpoint(&self, req: &Request<Body>) -> Result<Endpoint, Error> {
		Endpoint::from_request(req)
	}

	async fn handle(
		&self,
		req: Request<Body>,
		endpoint: Endpoint,
	) -> Result<Response<Body>, Error> {
		let expected_auth_header =
			match endpoint.authorization_type() {
				Authorization::None => None,
				Authorization::MetricsToken => self.metrics_token.as_ref(),
				Authorization::AdminToken => match &self.admin_token {
					None => return Err(Error::forbidden(
						"Admin token isn't configured, admin API access is disabled for security.",
					)),
					Some(t) => Some(t),
				},
			};

		if let Some(h) = expected_auth_header {
			match req.headers().get("Authorization") {
				None => return Err(Error::forbidden("Authorization token must be provided")),
				Some(v) => {
					let authorized = v.to_str().map(|hv| hv.trim() == h).unwrap_or(false);
					if !authorized {
						return Err(Error::forbidden("Invalid authorization token provided"));
					}
				}
			}
		}

		match endpoint {
			Endpoint::Options => self.handle_options(&req),
			Endpoint::Health => self.handle_health(),
			Endpoint::Metrics => self.handle_metrics(),
			Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
			Endpoint::ConnectClusterNodes => handle_connect_cluster_nodes(&self.garage, req).await,
			// Layout
			Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await,
			Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await,
			Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await,
			Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await,
			// Keys
			Endpoint::ListKeys => handle_list_keys(&self.garage).await,
			Endpoint::GetKeyInfo { id, search } => {
				handle_get_key_info(&self.garage, id, search).await
			}
			Endpoint::CreateKey => handle_create_key(&self.garage, req).await,
			Endpoint::ImportKey => handle_import_key(&self.garage, req).await,
			Endpoint::UpdateKey { id } => handle_update_key(&self.garage, id, req).await,
			Endpoint::DeleteKey { id } => handle_delete_key(&self.garage, id).await,
			// Buckets
			Endpoint::ListBuckets => handle_list_buckets(&self.garage).await,
			Endpoint::GetBucketInfo { id, global_alias } => {
				handle_get_bucket_info(&self.garage, id, global_alias).await
			}
			Endpoint::CreateBucket => handle_create_bucket(&self.garage, req).await,
			Endpoint::DeleteBucket { id } => handle_delete_bucket(&self.garage, id).await,
			Endpoint::UpdateBucket { id } => handle_update_bucket(&self.garage, id, req).await,
			// Bucket-key permissions
			Endpoint::BucketAllowKey => {
				handle_bucket_change_key_perm(&self.garage, req, true).await
			}
			Endpoint::BucketDenyKey => {
				handle_bucket_change_key_perm(&self.garage, req, false).await
			}
			// Bucket aliasing
			Endpoint::GlobalAliasBucket { id, alias } => {
				handle_global_alias_bucket(&self.garage, id, alias).await
			}
			Endpoint::GlobalUnaliasBucket { id, alias } => {
				handle_global_unalias_bucket(&self.garage, id, alias).await
			}
			Endpoint::LocalAliasBucket {
				id,
				access_key_id,
				alias,
			} => handle_local_alias_bucket(&self.garage, id, access_key_id, alias).await,
			Endpoint::LocalUnaliasBucket {
				id,
				access_key_id,
				alias,
			} => handle_local_unalias_bucket(&self.garage, id, access_key_id, alias).await,
		}
	}
}

impl ApiEndpoint for Endpoint {
	fn name(&self) -> &'static str {
		Endpoint::name(self)
	}

	fn add_span_attributes(&self, _span: SpanRef<'_>) {}
}