diff options
author | Alex Auvolat <alex@adnab.me> | 2024-02-07 14:33:07 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-02-07 14:34:40 +0100 |
commit | fe48d60d2b63a9914297558bd5dbccae8d96c947 (patch) | |
tree | 6666ae0d756a21b0a6b055da42d5bf473dbedd95 /src/web/web_server.rs | |
parent | 22332e6c3536159656e773a85d656352882ffc32 (diff) | |
download | garage-fe48d60d2b63a9914297558bd5dbccae8d96c947.tar.gz garage-fe48d60d2b63a9914297558bd5dbccae8d96c947.zip |
[dep-upgrade-202402] refactor http listener code
Diffstat (limited to 'src/web/web_server.rs')
-rw-r--r-- | src/web/web_server.rs | 75 |
1 files changed, 19 insertions, 56 deletions
diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 668a897a..766e3829 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -4,16 +4,12 @@ use std::{convert::Infallible, sync::Arc}; use futures::future::Future; -use hyper::server::conn::http1; use hyper::{ body::Incoming as IncomingBody, header::{HeaderValue, HOST}, - service::service_fn, Method, Request, Response, StatusCode, }; -use hyper_util::rt::TokioIo; -use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, UnixListener}; use opentelemetry::{ @@ -25,6 +21,7 @@ use opentelemetry::{ use crate::error::*; +use garage_api::generic_server::{server_loop, UnixListenerOn}; use garage_api::helpers::*; use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket}; use garage_api::s3::error::{ @@ -75,35 +72,29 @@ pub struct WebServer { impl WebServer { /// Run a web server - pub async fn run( - garage: Arc<Garage>, - bind_addr: UnixOrTCPSocketAddress, - root_domain: String, - shutdown_signal: impl Future<Output = ()>, - ) -> Result<(), GarageError> { + pub fn new(garage: Arc<Garage>, root_domain: String) -> Arc<Self> { let metrics = Arc::new(WebMetrics::new()); - let web_server = Arc::new(WebServer { + Arc::new(WebServer { garage, metrics, root_domain, - }); + }) + } + pub async fn run( + self: Arc<Self>, + bind_addr: UnixOrTCPSocketAddress, + shutdown_signal: impl Future<Output = ()>, + ) -> Result<(), GarageError> { info!("Web server listening on {}", bind_addr); - tokio::pin!(shutdown_signal); - match bind_addr { UnixOrTCPSocketAddress::TCPSocket(addr) => { let listener = TcpListener::bind(addr).await?; - loop { - let (stream, client_addr) = tokio::select! { - acc = listener.accept() => acc?, - _ = &mut shutdown_signal => break, - }; - - web_server.launch_handler(stream, client_addr.to_string()); - } + let handler = + move |stream, socketaddr| self.clone().handle_request(stream, socketaddr); + server_loop(listener, handler, shutdown_signal).await } UnixOrTCPSocketAddress::UnixSocket(ref path) => { if path.exists() { @@ -111,50 +102,22 @@ impl WebServer { } let listener = UnixListener::bind(path)?; + let listener = UnixListenerOn(listener, path.display().to_string()); fs::set_permissions(path, Permissions::from_mode(0o222))?; - loop { - let (stream, _) = tokio::select! { - acc = listener.accept() => acc?, - _ = &mut shutdown_signal => break, - }; - - web_server.launch_handler(stream, path.display().to_string()); - } + let handler = + move |stream, socketaddr| self.clone().handle_request(stream, socketaddr); + server_loop(listener, handler, shutdown_signal).await } - }; - - Ok(()) - } - - fn launch_handler<S>(self: &Arc<Self>, stream: S, client_addr: String) - where - S: AsyncRead + AsyncWrite + Send + Sync + 'static, - { - let this = self.clone(); - let io = TokioIo::new(stream); - - let serve = move |req: Request<IncomingBody>| { - this.clone().handle_request(req, client_addr.to_string()) - }; - - tokio::task::spawn(async move { - let io = Box::pin(io); - if let Err(e) = http1::Builder::new() - .serve_connection(io, service_fn(serve)) - .await - { - debug!("Error handling HTTP connection: {}", e); - } - }); + } } async fn handle_request( self: Arc<Self>, req: Request<IncomingBody>, addr: String, - ) -> Result<Response<BoxBody<Error>>, Infallible> { + ) -> Result<Response<BoxBody<Error>>, http::Error> { if let Ok(forwarded_for_ip_addr) = forwarded_headers::handle_forwarded_for_headers(req.headers()) { |