From fe1af5d98b8e6c87f39f1d94586e61bff100e7d2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Feb 2024 13:02:02 +0100 Subject: [dep-upgrade-202402] refactor dependencies: move all as workspace deps --- src/web/Cargo.toml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'src/web') diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml index cb4b7f2b..8eb35c25 100644 --- a/src/web/Cargo.toml +++ b/src/web/Cargo.toml @@ -19,16 +19,16 @@ garage_model.workspace = true garage_util.workspace = true garage_table.workspace = true -err-derive = "0.3" -tracing = "0.1" -percent-encoding = "2.1.0" +err-derive.workspace = true +tracing.workspace = true +percent-encoding.workspace = true -futures = "0.3" +futures.workspace = true -http = "0.2" -hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] } -hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] } +http.workspace = true +hyper.workspace = true +hyperlocal.workspace = true -tokio = { version = "1.0", default-features = false, features = ["net"] } +tokio.workspace = true -opentelemetry = "0.17" +opentelemetry.workspace = true -- cgit v1.2.3 From 0bb5b77530ad432e4c77f13b395fe74613812337 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Feb 2024 18:49:54 +0100 Subject: [dep-upgrade-202402] wip: port to http/hyper crates v1 --- src/web/Cargo.toml | 3 +- src/web/web_server.rs | 119 ++++++++++++++++++++++++++++---------------------- 2 files changed, 68 insertions(+), 54 deletions(-) (limited to 'src/web') diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml index 8eb35c25..3add5200 100644 --- a/src/web/Cargo.toml +++ b/src/web/Cargo.toml @@ -26,8 +26,9 @@ percent-encoding.workspace = true futures.workspace = true http.workspace = true +http-body-util.workspace = true hyper.workspace = true -hyperlocal.workspace = true +hyper-util.workspace = true tokio.workspace = true diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 73780efb..ce438469 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -4,16 +4,17 @@ use std::{convert::Infallible, sync::Arc}; use futures::future::Future; +use hyper::server::conn::http1; use hyper::{ + body::Incoming as IncomingBody, header::{HeaderValue, HOST}, - server::conn::AddrStream, - service::{make_service_fn, service_fn}, - Body, Method, Request, Response, Server, StatusCode, + service::service_fn, + Method, Request, Response, StatusCode, }; +use hyper_util::rt::TokioIo; -use hyperlocal::UnixServerExt; - -use tokio::net::UnixStream; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::{TcpListener, UnixListener}; use opentelemetry::{ global, @@ -24,7 +25,7 @@ use opentelemetry::{ use crate::error::*; -use garage_api::helpers::{authority_to_host, host_to_bucket}; +use garage_api::helpers::*; use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket}; use garage_api::s3::error::{ CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError, @@ -76,7 +77,7 @@ impl WebServer { /// Run a web server pub async fn run( garage: Arc, - addr: UnixOrTCPSocketAddress, + bind_addr: UnixOrTCPSocketAddress, root_domain: String, shutdown_signal: impl Future, ) -> Result<(), GarageError> { @@ -87,65 +88,73 @@ impl WebServer { root_domain, }); - let tcp_service = make_service_fn(|conn: &AddrStream| { - let web_server = web_server.clone(); - - let client_addr = conn.remote_addr(); - async move { - Ok::<_, Error>(service_fn(move |req: Request| { - let web_server = web_server.clone(); - - web_server.handle_request(req, client_addr.to_string()) - })) - } - }); - - let unix_service = make_service_fn(|_: &UnixStream| { - let web_server = web_server.clone(); + info!("Web server listening on {}", bind_addr); - let path = addr.to_string(); - async move { - Ok::<_, Error>(service_fn(move |req: Request| { - let web_server = web_server.clone(); + tokio::pin!(shutdown_signal); - web_server.handle_request(req, path.clone()) - })) - } - }); + match bind_addr { + UnixOrTCPSocketAddress::TCPSocket(addr) => { + let listener = TcpListener::bind(addr).await?; - info!("Web server listening on {}", addr); + loop { + let (stream, client_addr) = tokio::select! { + acc = listener.accept() => acc?, + _ = &mut shutdown_signal => break, + }; - match addr { - UnixOrTCPSocketAddress::TCPSocket(addr) => { - Server::bind(&addr) - .serve(tcp_service) - .with_graceful_shutdown(shutdown_signal) - .await? + web_server.launch_handler(stream, client_addr.to_string()); + } } UnixOrTCPSocketAddress::UnixSocket(ref path) => { if path.exists() { fs::remove_file(path)? } - let bound = Server::bind_unix(path)?; + let listener = UnixListener::bind(path)?; fs::set_permissions(path, Permissions::from_mode(0o222))?; - bound - .serve(unix_service) - .with_graceful_shutdown(shutdown_signal) - .await?; + loop { + let (stream, _) = tokio::select! { + acc = listener.accept() => acc?, + _ = &mut shutdown_signal => break, + }; + + web_server.launch_handler(stream, path.display().to_string()); + } } }; Ok(()) } + fn launch_handler(self: &Arc, stream: S, client_addr: String) + where + S: AsyncRead + AsyncWrite + Send + Sync + 'static, + { + let this = self.clone(); + let io = TokioIo::new(stream); + + let serve = move |req: Request| { + this.clone().handle_request(req, client_addr.to_string()) + }; + + tokio::task::spawn(async move { + let io = Box::pin(io); + if let Err(e) = http1::Builder::new() + .serve_connection(io, service_fn(serve)) + .await + { + debug!("Error handling HTTP connection: {}", e); + } + }); + } + async fn handle_request( self: Arc, - req: Request, + req: Request, addr: String, - ) -> Result, Infallible> { + ) -> Result>, Infallible> { if let Ok(forwarded_for_ip_addr) = forwarded_headers::handle_forwarded_for_headers(req.headers()) { @@ -187,7 +196,8 @@ impl WebServer { match res { Ok(res) => { debug!("{} {} {}", req.method(), res.status(), req.uri()); - Ok(res) + Ok(res + .map(|body| BoxBody::new(http_body_util::BodyExt::map_err(body, Error::from)))) } Err(error) => { info!( @@ -220,7 +230,10 @@ impl WebServer { Ok(exists) } - async fn serve_file(self: &Arc, req: &Request) -> Result, Error> { + async fn serve_file( + self: &Arc, + req: &Request, + ) -> Result>, Error> { // Get http authority string (eg. [::1]:3902 or garage.tld:80) let authority = req .headers() @@ -268,8 +281,8 @@ impl WebServer { let ret_doc = match *req.method() { Method::OPTIONS => handle_options_for_bucket(req, &bucket), - Method::HEAD => handle_head(self.garage.clone(), req, bucket_id, &key, None).await, - Method::GET => handle_get(self.garage.clone(), req, bucket_id, &key, None).await, + Method::HEAD => handle_head(self.garage.clone(), &req, bucket_id, &key, None).await, + Method::GET => handle_get(self.garage.clone(), &req, bucket_id, &key, None).await, _ => Err(ApiError::bad_request("HTTP method not supported")), }; @@ -281,7 +294,7 @@ impl WebServer { Ok(Response::builder() .status(StatusCode::FOUND) .header("Location", url) - .body(Body::empty()) + .body(empty_body()) .unwrap()) } _ => ret_doc, @@ -310,7 +323,7 @@ impl WebServer { // Create a fake HTTP request with path = the error document let req2 = Request::builder() .uri(format!("http://{}/{}", host, &error_document)) - .body(Body::empty()) + .body(empty_body::()) .unwrap(); match handle_get(self.garage.clone(), &req2, bucket_id, &error_document, None).await @@ -358,7 +371,7 @@ impl WebServer { } } -fn error_to_res(e: Error) -> Response { +fn error_to_res(e: Error) -> Response> { // If we are here, it is either that: // - there was an error before trying to get the requested URL // from the bucket (e.g. bucket not found) @@ -366,7 +379,7 @@ fn error_to_res(e: Error) -> Response { // was a HEAD request or we couldn't get the error document) // We do NOT enter this code path when returning the bucket's // error document (this is handled in serve_file) - let body = Body::from(format!("{}\n", e)); + let body = string_body(format!("{}\n", e)); let mut http_error = Response::new(body); *http_error.status_mut() = e.http_status_code(); e.add_headers(http_error.headers_mut()); -- cgit v1.2.3 From a22bd319202f05bce4ad13072238c7ba81d518fb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Feb 2024 19:27:12 +0100 Subject: [dep-upgrade-202402] migration to http/hyper 1.0 for k2v api --- src/web/web_server.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'src/web') diff --git a/src/web/web_server.rs b/src/web/web_server.rs index ce438469..668a897a 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -280,7 +280,9 @@ impl WebServer { ); let ret_doc = match *req.method() { - Method::OPTIONS => handle_options_for_bucket(req, &bucket), + Method::OPTIONS => handle_options_for_bucket(req, &bucket) + .map_err(ApiError::from) + .map(|res| res.map(|_empty_body: EmptyBody| empty_body())), Method::HEAD => handle_head(self.garage.clone(), &req, bucket_id, &key, None).await, Method::GET => handle_get(self.garage.clone(), &req, bucket_id, &key, None).await, _ => Err(ApiError::bad_request("HTTP method not supported")), -- cgit v1.2.3 From fe48d60d2b63a9914297558bd5dbccae8d96c947 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 7 Feb 2024 14:33:07 +0100 Subject: [dep-upgrade-202402] refactor http listener code --- src/web/web_server.rs | 75 +++++++++++++-------------------------------------- 1 file changed, 19 insertions(+), 56 deletions(-) (limited to 'src/web') diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 668a897a..766e3829 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -4,16 +4,12 @@ use std::{convert::Infallible, sync::Arc}; use futures::future::Future; -use hyper::server::conn::http1; use hyper::{ body::Incoming as IncomingBody, header::{HeaderValue, HOST}, - service::service_fn, Method, Request, Response, StatusCode, }; -use hyper_util::rt::TokioIo; -use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, UnixListener}; use opentelemetry::{ @@ -25,6 +21,7 @@ use opentelemetry::{ use crate::error::*; +use garage_api::generic_server::{server_loop, UnixListenerOn}; use garage_api::helpers::*; use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket}; use garage_api::s3::error::{ @@ -75,35 +72,29 @@ pub struct WebServer { impl WebServer { /// Run a web server - pub async fn run( - garage: Arc, - bind_addr: UnixOrTCPSocketAddress, - root_domain: String, - shutdown_signal: impl Future, - ) -> Result<(), GarageError> { + pub fn new(garage: Arc, root_domain: String) -> Arc { let metrics = Arc::new(WebMetrics::new()); - let web_server = Arc::new(WebServer { + Arc::new(WebServer { garage, metrics, root_domain, - }); + }) + } + pub async fn run( + self: Arc, + bind_addr: UnixOrTCPSocketAddress, + shutdown_signal: impl Future, + ) -> Result<(), GarageError> { info!("Web server listening on {}", bind_addr); - tokio::pin!(shutdown_signal); - match bind_addr { UnixOrTCPSocketAddress::TCPSocket(addr) => { let listener = TcpListener::bind(addr).await?; - loop { - let (stream, client_addr) = tokio::select! { - acc = listener.accept() => acc?, - _ = &mut shutdown_signal => break, - }; - - web_server.launch_handler(stream, client_addr.to_string()); - } + let handler = + move |stream, socketaddr| self.clone().handle_request(stream, socketaddr); + server_loop(listener, handler, shutdown_signal).await } UnixOrTCPSocketAddress::UnixSocket(ref path) => { if path.exists() { @@ -111,50 +102,22 @@ impl WebServer { } let listener = UnixListener::bind(path)?; + let listener = UnixListenerOn(listener, path.display().to_string()); fs::set_permissions(path, Permissions::from_mode(0o222))?; - loop { - let (stream, _) = tokio::select! { - acc = listener.accept() => acc?, - _ = &mut shutdown_signal => break, - }; - - web_server.launch_handler(stream, path.display().to_string()); - } + let handler = + move |stream, socketaddr| self.clone().handle_request(stream, socketaddr); + server_loop(listener, handler, shutdown_signal).await } - }; - - Ok(()) - } - - fn launch_handler(self: &Arc, stream: S, client_addr: String) - where - S: AsyncRead + AsyncWrite + Send + Sync + 'static, - { - let this = self.clone(); - let io = TokioIo::new(stream); - - let serve = move |req: Request| { - this.clone().handle_request(req, client_addr.to_string()) - }; - - tokio::task::spawn(async move { - let io = Box::pin(io); - if let Err(e) = http1::Builder::new() - .serve_connection(io, service_fn(serve)) - .await - { - debug!("Error handling HTTP connection: {}", e); - } - }); + } } async fn handle_request( self: Arc, req: Request, addr: String, - ) -> Result>, Infallible> { + ) -> Result>, http::Error> { if let Ok(forwarded_for_ip_addr) = forwarded_headers::handle_forwarded_for_headers(req.headers()) { -- cgit v1.2.3