aboutsummaryrefslogtreecommitdiff
path: root/src/web/web_server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/web/web_server.rs')
-rw-r--r--src/web/web_server.rs75
1 files changed, 19 insertions, 56 deletions
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index 668a897a..766e3829 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -4,16 +4,12 @@ use std::{convert::Infallible, sync::Arc};
use futures::future::Future;
-use hyper::server::conn::http1;
use hyper::{
body::Incoming as IncomingBody,
header::{HeaderValue, HOST},
- service::service_fn,
Method, Request, Response, StatusCode,
};
-use hyper_util::rt::TokioIo;
-use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, UnixListener};
use opentelemetry::{
@@ -25,6 +21,7 @@ use opentelemetry::{
use crate::error::*;
+use garage_api::generic_server::{server_loop, UnixListenerOn};
use garage_api::helpers::*;
use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket};
use garage_api::s3::error::{
@@ -75,35 +72,29 @@ pub struct WebServer {
impl WebServer {
/// Run a web server
- pub async fn run(
- garage: Arc<Garage>,
- bind_addr: UnixOrTCPSocketAddress,
- root_domain: String,
- shutdown_signal: impl Future<Output = ()>,
- ) -> Result<(), GarageError> {
+ pub fn new(garage: Arc<Garage>, root_domain: String) -> Arc<Self> {
let metrics = Arc::new(WebMetrics::new());
- let web_server = Arc::new(WebServer {
+ Arc::new(WebServer {
garage,
metrics,
root_domain,
- });
+ })
+ }
+ pub async fn run(
+ self: Arc<Self>,
+ bind_addr: UnixOrTCPSocketAddress,
+ shutdown_signal: impl Future<Output = ()>,
+ ) -> Result<(), GarageError> {
info!("Web server listening on {}", bind_addr);
- tokio::pin!(shutdown_signal);
-
match bind_addr {
UnixOrTCPSocketAddress::TCPSocket(addr) => {
let listener = TcpListener::bind(addr).await?;
- loop {
- let (stream, client_addr) = tokio::select! {
- acc = listener.accept() => acc?,
- _ = &mut shutdown_signal => break,
- };
-
- web_server.launch_handler(stream, client_addr.to_string());
- }
+ let handler =
+ move |stream, socketaddr| self.clone().handle_request(stream, socketaddr);
+ server_loop(listener, handler, shutdown_signal).await
}
UnixOrTCPSocketAddress::UnixSocket(ref path) => {
if path.exists() {
@@ -111,50 +102,22 @@ impl WebServer {
}
let listener = UnixListener::bind(path)?;
+ let listener = UnixListenerOn(listener, path.display().to_string());
fs::set_permissions(path, Permissions::from_mode(0o222))?;
- loop {
- let (stream, _) = tokio::select! {
- acc = listener.accept() => acc?,
- _ = &mut shutdown_signal => break,
- };
-
- web_server.launch_handler(stream, path.display().to_string());
- }
+ let handler =
+ move |stream, socketaddr| self.clone().handle_request(stream, socketaddr);
+ server_loop(listener, handler, shutdown_signal).await
}
- };
-
- Ok(())
- }
-
- fn launch_handler<S>(self: &Arc<Self>, stream: S, client_addr: String)
- where
- S: AsyncRead + AsyncWrite + Send + Sync + 'static,
- {
- let this = self.clone();
- let io = TokioIo::new(stream);
-
- let serve = move |req: Request<IncomingBody>| {
- this.clone().handle_request(req, client_addr.to_string())
- };
-
- tokio::task::spawn(async move {
- let io = Box::pin(io);
- if let Err(e) = http1::Builder::new()
- .serve_connection(io, service_fn(serve))
- .await
- {
- debug!("Error handling HTTP connection: {}", e);
- }
- });
+ }
}
async fn handle_request(
self: Arc<Self>,
req: Request<IncomingBody>,
addr: String,
- ) -> Result<Response<BoxBody<Error>>, Infallible> {
+ ) -> Result<Response<BoxBody<Error>>, http::Error> {
if let Ok(forwarded_for_ip_addr) =
forwarded_headers::handle_forwarded_for_headers(req.headers())
{