aboutsummaryrefslogtreecommitdiff
path: root/src/web
diff options
context:
space:
mode:
Diffstat (limited to 'src/web')
-rw-r--r--src/web/Cargo.toml19
-rw-r--r--src/web/web_server.rs134
2 files changed, 75 insertions, 78 deletions
diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml
index 9f7720da..49549c9b 100644
--- a/src/web/Cargo.toml
+++ b/src/web/Cargo.toml
@@ -19,16 +19,17 @@ garage_model.workspace = true
garage_util.workspace = true
garage_table.workspace = true
-err-derive = "0.3"
-tracing = "0.1"
-percent-encoding = "2.1.0"
+err-derive.workspace = true
+tracing.workspace = true
+percent-encoding.workspace = true
-futures = "0.3"
+futures.workspace = true
-http = "0.2"
-hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
-hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] }
+http.workspace = true
+http-body-util.workspace = true
+hyper.workspace = true
+hyper-util.workspace = true
-tokio = { version = "1.0", default-features = false, features = ["net"] }
+tokio.workspace = true
-opentelemetry = "0.17"
+opentelemetry.workspace = true
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index 73780efb..0f9b5dc8 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -2,19 +2,15 @@ use std::fs::{self, Permissions};
use std::os::unix::prelude::PermissionsExt;
use std::{convert::Infallible, sync::Arc};
-use futures::future::Future;
+use tokio::net::{TcpListener, UnixListener};
+use tokio::sync::watch;
use hyper::{
+ body::Incoming as IncomingBody,
header::{HeaderValue, HOST},
- server::conn::AddrStream,
- service::{make_service_fn, service_fn},
- Body, Method, Request, Response, Server, StatusCode,
+ Method, Request, Response, StatusCode,
};
-use hyperlocal::UnixServerExt;
-
-use tokio::net::UnixStream;
-
use opentelemetry::{
global,
metrics::{Counter, ValueRecorder},
@@ -24,7 +20,8 @@ use opentelemetry::{
use crate::error::*;
-use garage_api::helpers::{authority_to_host, host_to_bucket};
+use garage_api::generic_server::{server_loop, UnixListenerOn};
+use garage_api::helpers::*;
use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket};
use garage_api::s3::error::{
CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError,
@@ -74,78 +71,53 @@ pub struct WebServer {
impl WebServer {
/// Run a web server
- pub async fn run(
- garage: Arc<Garage>,
- addr: UnixOrTCPSocketAddress,
- root_domain: String,
- shutdown_signal: impl Future<Output = ()>,
- ) -> Result<(), GarageError> {
+ pub fn new(garage: Arc<Garage>, root_domain: String) -> Arc<Self> {
let metrics = Arc::new(WebMetrics::new());
- let web_server = Arc::new(WebServer {
+ Arc::new(WebServer {
garage,
metrics,
root_domain,
- });
-
- let tcp_service = make_service_fn(|conn: &AddrStream| {
- let web_server = web_server.clone();
-
- let client_addr = conn.remote_addr();
- async move {
- Ok::<_, Error>(service_fn(move |req: Request<Body>| {
- let web_server = web_server.clone();
-
- web_server.handle_request(req, client_addr.to_string())
- }))
- }
- });
-
- let unix_service = make_service_fn(|_: &UnixStream| {
- let web_server = web_server.clone();
-
- let path = addr.to_string();
- async move {
- Ok::<_, Error>(service_fn(move |req: Request<Body>| {
- let web_server = web_server.clone();
-
- web_server.handle_request(req, path.clone())
- }))
- }
- });
+ })
+ }
- info!("Web server listening on {}", addr);
+ pub async fn run(
+ self: Arc<Self>,
+ bind_addr: UnixOrTCPSocketAddress,
+ must_exit: watch::Receiver<bool>,
+ ) -> Result<(), GarageError> {
+ let server_name = "Web".into();
+ info!("Web server listening on {}", bind_addr);
- match addr {
+ match bind_addr {
UnixOrTCPSocketAddress::TCPSocket(addr) => {
- Server::bind(&addr)
- .serve(tcp_service)
- .with_graceful_shutdown(shutdown_signal)
- .await?
+ let listener = TcpListener::bind(addr).await?;
+
+ let handler =
+ move |stream, socketaddr| self.clone().handle_request(stream, socketaddr);
+ server_loop(server_name, listener, handler, must_exit).await
}
UnixOrTCPSocketAddress::UnixSocket(ref path) => {
if path.exists() {
fs::remove_file(path)?
}
- let bound = Server::bind_unix(path)?;
+ let listener = UnixListener::bind(path)?;
+ let listener = UnixListenerOn(listener, path.display().to_string());
fs::set_permissions(path, Permissions::from_mode(0o222))?;
- bound
- .serve(unix_service)
- .with_graceful_shutdown(shutdown_signal)
- .await?;
+ let handler =
+ move |stream, socketaddr| self.clone().handle_request(stream, socketaddr);
+ server_loop(server_name, listener, handler, must_exit).await
}
- };
-
- Ok(())
+ }
}
async fn handle_request(
self: Arc<Self>,
- req: Request<Body>,
+ req: Request<IncomingBody>,
addr: String,
- ) -> Result<Response<Body>, Infallible> {
+ ) -> Result<Response<BoxBody<Error>>, http::Error> {
if let Ok(forwarded_for_ip_addr) =
forwarded_headers::handle_forwarded_for_headers(req.headers())
{
@@ -187,7 +159,8 @@ impl WebServer {
match res {
Ok(res) => {
debug!("{} {} {}", req.method(), res.status(), req.uri());
- Ok(res)
+ Ok(res
+ .map(|body| BoxBody::new(http_body_util::BodyExt::map_err(body, Error::from))))
}
Err(error) => {
info!(
@@ -220,7 +193,10 @@ impl WebServer {
Ok(exists)
}
- async fn serve_file(self: &Arc<Self>, req: &Request<Body>) -> Result<Response<Body>, Error> {
+ async fn serve_file(
+ self: &Arc<Self>,
+ req: &Request<IncomingBody>,
+ ) -> Result<Response<BoxBody<ApiError>>, Error> {
// Get http authority string (eg. [::1]:3902 or garage.tld:80)
let authority = req
.headers()
@@ -267,9 +243,21 @@ impl WebServer {
);
let ret_doc = match *req.method() {
- Method::OPTIONS => handle_options_for_bucket(req, &bucket),
- Method::HEAD => handle_head(self.garage.clone(), req, bucket_id, &key, None).await,
- Method::GET => handle_get(self.garage.clone(), req, bucket_id, &key, None).await,
+ Method::OPTIONS => handle_options_for_bucket(req, &bucket)
+ .map_err(ApiError::from)
+ .map(|res| res.map(|_empty_body: EmptyBody| empty_body())),
+ Method::HEAD => handle_head(self.garage.clone(), &req, bucket_id, &key, None).await,
+ Method::GET => {
+ handle_get(
+ self.garage.clone(),
+ &req,
+ bucket_id,
+ &key,
+ None,
+ Default::default(),
+ )
+ .await
+ }
_ => Err(ApiError::bad_request("HTTP method not supported")),
};
@@ -281,7 +269,7 @@ impl WebServer {
Ok(Response::builder()
.status(StatusCode::FOUND)
.header("Location", url)
- .body(Body::empty())
+ .body(empty_body())
.unwrap())
}
_ => ret_doc,
@@ -310,10 +298,18 @@ impl WebServer {
// Create a fake HTTP request with path = the error document
let req2 = Request::builder()
.uri(format!("http://{}/{}", host, &error_document))
- .body(Body::empty())
+ .body(empty_body::<Infallible>())
.unwrap();
- match handle_get(self.garage.clone(), &req2, bucket_id, &error_document, None).await
+ match handle_get(
+ self.garage.clone(),
+ &req2,
+ bucket_id,
+ &error_document,
+ None,
+ Default::default(),
+ )
+ .await
{
Ok(mut error_doc) => {
// The error won't be logged back in handle_request,
@@ -358,7 +354,7 @@ impl WebServer {
}
}
-fn error_to_res(e: Error) -> Response<Body> {
+fn error_to_res(e: Error) -> Response<BoxBody<Error>> {
// If we are here, it is either that:
// - there was an error before trying to get the requested URL
// from the bucket (e.g. bucket not found)
@@ -366,7 +362,7 @@ fn error_to_res(e: Error) -> Response<Body> {
// was a HEAD request or we couldn't get the error document)
// We do NOT enter this code path when returning the bucket's
// error document (this is handled in serve_file)
- let body = Body::from(format!("{}\n", e));
+ let body = string_body(format!("{}\n", e));
let mut http_error = Response::new(body);
*http_error.status_mut() = e.http_status_code();
e.add_headers(http_error.headers_mut());