diff options
author | Alex Auvolat <lx@deuxfleurs.fr> | 2025-01-31 18:18:04 +0100 |
---|---|---|
committer | Alex Auvolat <lx@deuxfleurs.fr> | 2025-01-31 18:18:29 +0100 |
commit | 9fa20d45bebab2a3f66b9721c3643dbd607d944d (patch) | |
tree | 4c5cc3dee19f7cbd9e146a90ef5cbddb052716d5 /src/api/generic_server.rs | |
parent | 9330fd79d3466051394f6d419a247d46da8f5151 (diff) | |
download | garage-9fa20d45bebab2a3f66b9721c3643dbd607d944d.tar.gz garage-9fa20d45bebab2a3f66b9721c3643dbd607d944d.zip |
wip: split garage_api into garage_api_{common,s3,k2v,admin}
Diffstat (limited to 'src/api/generic_server.rs')
-rw-r--r-- | src/api/generic_server.rs | 378 |
1 files changed, 0 insertions, 378 deletions
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs deleted file mode 100644 index 283abdd4..00000000 --- a/src/api/generic_server.rs +++ /dev/null @@ -1,378 +0,0 @@ -use std::convert::Infallible; -use std::fs::{self, Permissions}; -use std::os::unix::fs::PermissionsExt; -use std::sync::Arc; -use std::time::Duration; - -use async_trait::async_trait; - -use futures::future::Future; -use futures::stream::{futures_unordered::FuturesUnordered, StreamExt}; - -use http_body_util::BodyExt; -use hyper::header::HeaderValue; -use hyper::server::conn::http1; -use hyper::service::service_fn; -use hyper::{body::Incoming as IncomingBody, Request, Response}; -use hyper::{HeaderMap, StatusCode}; -use hyper_util::rt::TokioIo; - -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream}; -use tokio::sync::watch; -use tokio::time::{sleep_until, Instant}; - -use opentelemetry::{ - global, - metrics::{Counter, ValueRecorder}, - trace::{FutureExt, SpanRef, TraceContextExt, Tracer}, - Context, KeyValue, -}; - -use garage_util::error::Error as GarageError; -use garage_util::forwarded_headers; -use garage_util::metrics::{gen_trace_id, RecordDuration}; -use garage_util::socket_address::UnixOrTCPSocketAddress; - -use crate::helpers::{BoxBody, ErrorBody}; - -pub(crate) trait ApiEndpoint: Send + Sync + 'static { - fn name(&self) -> &'static str; - fn add_span_attributes(&self, span: SpanRef<'_>); -} - -pub trait ApiError: std::error::Error + Send + Sync + 'static { - fn http_status_code(&self) -> StatusCode; - fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>); - fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody; -} - -#[async_trait] -pub(crate) trait ApiHandler: Send + Sync + 'static { - const API_NAME: &'static str; - const API_NAME_DISPLAY: &'static str; - - type Endpoint: ApiEndpoint; - type Error: ApiError; - - fn parse_endpoint(&self, r: &Request<IncomingBody>) -> Result<Self::Endpoint, Self::Error>; - async fn handle( - &self, - req: Request<IncomingBody>, - endpoint: Self::Endpoint, - ) -> Result<Response<BoxBody<Self::Error>>, Self::Error>; -} - -pub(crate) struct ApiServer<A: ApiHandler> { - region: String, - api_handler: A, - - // Metrics - request_counter: Counter<u64>, - error_counter: Counter<u64>, - request_duration: ValueRecorder<f64>, -} - -impl<A: ApiHandler> ApiServer<A> { - pub fn new(region: String, api_handler: A) -> Arc<Self> { - let meter = global::meter("garage/api"); - Arc::new(Self { - region, - api_handler, - request_counter: meter - .u64_counter(format!("api.{}.request_counter", A::API_NAME)) - .with_description(format!( - "Number of API calls to the various {} API endpoints", - A::API_NAME_DISPLAY - )) - .init(), - error_counter: meter - .u64_counter(format!("api.{}.error_counter", A::API_NAME)) - .with_description(format!( - "Number of API calls to the various {} API endpoints that resulted in errors", - A::API_NAME_DISPLAY - )) - .init(), - request_duration: meter - .f64_value_recorder(format!("api.{}.request_duration", A::API_NAME)) - .with_description(format!( - "Duration of API calls to the various {} API endpoints", - A::API_NAME_DISPLAY - )) - .init(), - }) - } - - pub async fn run_server( - self: Arc<Self>, - bind_addr: UnixOrTCPSocketAddress, - unix_bind_addr_mode: Option<u32>, - must_exit: watch::Receiver<bool>, - ) -> Result<(), GarageError> { - let server_name = format!("{} API", A::API_NAME_DISPLAY); - info!("{} server listening on {}", server_name, bind_addr); - - match bind_addr { - UnixOrTCPSocketAddress::TCPSocket(addr) => { - let listener = TcpListener::bind(addr).await?; - - let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); - server_loop(server_name, listener, handler, must_exit).await - } - UnixOrTCPSocketAddress::UnixSocket(ref path) => { - if path.exists() { - fs::remove_file(path)? - } - - let listener = UnixListener::bind(path)?; - let listener = UnixListenerOn(listener, path.display().to_string()); - - fs::set_permissions( - path, - Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)), - )?; - - let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); - server_loop(server_name, listener, handler, must_exit).await - } - } - } - - async fn handler( - self: Arc<Self>, - req: Request<IncomingBody>, - addr: String, - ) -> Result<Response<BoxBody<A::Error>>, http::Error> { - let uri = req.uri().clone(); - - if let Ok(forwarded_for_ip_addr) = - forwarded_headers::handle_forwarded_for_headers(req.headers()) - { - info!( - "{} (via {}) {} {}", - forwarded_for_ip_addr, - addr, - req.method(), - uri - ); - } else { - info!("{} {} {}", addr, req.method(), uri); - } - debug!("{:?}", req); - - let tracer = opentelemetry::global::tracer("garage"); - let span = tracer - .span_builder(format!("{} API call (unknown)", A::API_NAME_DISPLAY)) - .with_trace_id(gen_trace_id()) - .with_attributes(vec![ - KeyValue::new("method", format!("{}", req.method())), - KeyValue::new("uri", req.uri().to_string()), - ]) - .start(&tracer); - - let res = self - .handler_stage2(req) - .with_context(Context::current_with_span(span)) - .await; - - match res { - Ok(x) => { - debug!("{} {:?}", x.status(), x.headers()); - Ok(x) - } - Err(e) => { - let body = e.http_body(&self.region, uri.path()); - let mut http_error_builder = Response::builder().status(e.http_status_code()); - - if let Some(header_map) = http_error_builder.headers_mut() { - e.add_http_headers(header_map) - } - - let http_error = http_error_builder.body(body)?; - - if e.http_status_code().is_server_error() { - warn!("Response: error {}, {}", e.http_status_code(), e); - } else { - info!("Response: error {}, {}", e.http_status_code(), e); - } - Ok(http_error - .map(|body| BoxBody::new(body.map_err(|_: Infallible| unreachable!())))) - } - } - } - - async fn handler_stage2( - &self, - req: Request<IncomingBody>, - ) -> Result<Response<BoxBody<A::Error>>, A::Error> { - let endpoint = self.api_handler.parse_endpoint(&req)?; - debug!("Endpoint: {}", endpoint.name()); - - let current_context = Context::current(); - let current_span = current_context.span(); - current_span.update_name::<String>(format!( - "{} API {}", - A::API_NAME_DISPLAY, - endpoint.name() - )); - current_span.set_attribute(KeyValue::new("endpoint", endpoint.name())); - endpoint.add_span_attributes(current_span); - - let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())]; - - let res = self - .api_handler - .handle(req, endpoint) - .record_duration(&self.request_duration, &metrics_tags[..]) - .await; - - self.request_counter.add(1, &metrics_tags[..]); - - let status_code = match &res { - Ok(r) => r.status(), - Err(e) => e.http_status_code(), - }; - if status_code.is_client_error() || status_code.is_server_error() { - self.error_counter.add( - 1, - &[ - metrics_tags[0].clone(), - KeyValue::new("status_code", status_code.as_str().to_string()), - ], - ); - } - - res - } -} - -// ==== helper functions ==== - -#[async_trait] -pub trait Accept: Send + Sync + 'static { - type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static; - async fn accept(&self) -> std::io::Result<(Self::Stream, String)>; -} - -#[async_trait] -impl Accept for TcpListener { - type Stream = TcpStream; - async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { - self.accept() - .await - .map(|(stream, addr)| (stream, addr.to_string())) - } -} - -pub struct UnixListenerOn(pub UnixListener, pub String); - -#[async_trait] -impl Accept for UnixListenerOn { - type Stream = UnixStream; - async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { - self.0 - .accept() - .await - .map(|(stream, _addr)| (stream, self.1.clone())) - } -} - -pub async fn server_loop<A, H, F, E>( - server_name: String, - listener: A, - handler: H, - mut must_exit: watch::Receiver<bool>, -) -> Result<(), GarageError> -where - A: Accept, - H: Fn(Request<IncomingBody>, String) -> F + Send + Sync + Clone + 'static, - F: Future<Output = Result<Response<BoxBody<E>>, http::Error>> + Send + 'static, - E: Send + Sync + std::error::Error + 'static, -{ - let (conn_in, mut conn_out) = tokio::sync::mpsc::unbounded_channel(); - let connection_collector = tokio::spawn({ - let server_name = server_name.clone(); - async move { - let mut connections = FuturesUnordered::<tokio::task::JoinHandle<()>>::new(); - loop { - let collect_next = async { - if connections.is_empty() { - futures::future::pending().await - } else { - connections.next().await - } - }; - tokio::select! { - result = collect_next => { - trace!("{} server: HTTP connection finished: {:?}", server_name, result); - } - new_fut = conn_out.recv() => { - match new_fut { - Some(f) => connections.push(f), - None => break, - } - } - } - } - let deadline = Instant::now() + Duration::from_secs(10); - while !connections.is_empty() { - info!( - "{} server: {} connections still open, deadline in {:.2}s", - server_name, - connections.len(), - (deadline - Instant::now()).as_secs_f32(), - ); - tokio::select! { - conn_res = connections.next() => { - trace!( - "{} server: HTTP connection finished: {:?}", - server_name, - conn_res.unwrap(), - ); - } - _ = sleep_until(deadline) => { - warn!("{} server: exit deadline reached with {} connections still open, killing them now", - server_name, - connections.len()); - for conn in connections.iter() { - conn.abort(); - } - for conn in connections { - assert!(conn.await.unwrap_err().is_cancelled()); - } - break; - } - } - } - } - }); - - while !*must_exit.borrow() { - let (stream, client_addr) = tokio::select! { - acc = listener.accept() => acc?, - _ = must_exit.changed() => continue, - }; - - let io = TokioIo::new(stream); - - let handler = handler.clone(); - let serve = move |req: Request<IncomingBody>| handler(req, client_addr.clone()); - - let fut = tokio::task::spawn(async move { - let io = Box::pin(io); - if let Err(e) = http1::Builder::new() - .serve_connection(io, service_fn(serve)) - .await - { - debug!("Error handling HTTP connection: {}", e); - } - }); - conn_in.send(fut)?; - } - - info!("{} server exiting", server_name); - drop(conn_in); - connection_collector.await?; - - Ok(()) -} |