From fe48d60d2b63a9914297558bd5dbccae8d96c947 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 7 Feb 2024 14:33:07 +0100 Subject: [dep-upgrade-202402] refactor http listener code --- src/api/generic_server.rs | 155 +++++++++++++++++++++++++++++++++------------- 1 file changed, 111 insertions(+), 44 deletions(-) (limited to 'src/api') diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index 832f2da3..e3005f8a 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use async_trait::async_trait; use futures::future::Future; +use futures::stream::{futures_unordered::FuturesUnordered, StreamExt}; use http_body_util::BodyExt; use hyper::header::HeaderValue; @@ -15,7 +16,7 @@ use hyper::{HeaderMap, StatusCode}; use hyper_util::rt::TokioIo; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::net::{TcpListener, UnixListener}; +use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream}; use opentelemetry::{ global, @@ -110,20 +111,12 @@ impl ApiServer { bind_addr ); - tokio::pin!(shutdown_signal); - match bind_addr { UnixOrTCPSocketAddress::TCPSocket(addr) => { let listener = TcpListener::bind(addr).await?; - loop { - let (stream, client_addr) = tokio::select! { - acc = listener.accept() => acc?, - _ = &mut shutdown_signal => break, - }; - - self.launch_handler(stream, client_addr.to_string()); - } + let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); + server_loop(listener, handler, shutdown_signal).await } UnixOrTCPSocketAddress::UnixSocket(ref path) => { if path.exists() { @@ -131,52 +124,24 @@ impl ApiServer { } let listener = UnixListener::bind(path)?; + let listener = UnixListenerOn(listener, path.display().to_string()); fs::set_permissions( path, Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)), )?; - loop { - let (stream, _) = tokio::select! { - acc = listener.accept() => acc?, - _ = &mut shutdown_signal => break, - }; - - self.launch_handler(stream, path.display().to_string()); - } - } - }; - - Ok(()) - } - - fn launch_handler(self: &Arc, stream: S, client_addr: String) - where - S: AsyncRead + AsyncWrite + Send + Sync + 'static, - { - let this = self.clone(); - let io = TokioIo::new(stream); - - let serve = - move |req: Request| this.clone().handler(req, client_addr.to_string()); - - tokio::task::spawn(async move { - let io = Box::pin(io); - if let Err(e) = http1::Builder::new() - .serve_connection(io, service_fn(serve)) - .await - { - debug!("Error handling HTTP connection: {}", e); + let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); + server_loop(listener, handler, shutdown_signal).await } - }); + } } async fn handler( self: Arc, req: Request, addr: String, - ) -> Result>, GarageError> { + ) -> Result>, http::Error> { let uri = req.uri().clone(); if let Ok(forwarded_for_ip_addr) = @@ -278,3 +243,105 @@ impl ApiServer { res } } + +// ==== helper functions ==== + +#[async_trait] +pub trait Accept: Send + Sync + 'static { + type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static; + async fn accept(&self) -> std::io::Result<(Self::Stream, String)>; +} + +#[async_trait] +impl Accept for TcpListener { + type Stream = TcpStream; + async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { + self.accept() + .await + .map(|(stream, addr)| (stream, addr.to_string())) + } +} + +pub struct UnixListenerOn(pub UnixListener, pub String); + +#[async_trait] +impl Accept for UnixListenerOn { + type Stream = UnixStream; + async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { + self.0 + .accept() + .await + .map(|(stream, _addr)| (stream, self.1.clone())) + } +} + +pub async fn server_loop( + listener: A, + handler: H, + shutdown_signal: impl Future, +) -> Result<(), GarageError> +where + A: Accept, + H: Fn(Request, String) -> F + Send + Sync + Clone + 'static, + F: Future>, http::Error>> + Send + 'static, + E: Send + Sync + std::error::Error + 'static, +{ + tokio::pin!(shutdown_signal); + + let (conn_in, mut conn_out) = tokio::sync::mpsc::unbounded_channel(); + let connection_collector = tokio::spawn(async move { + let mut collection = FuturesUnordered::new(); + loop { + let collect_next = async { + if collection.is_empty() { + futures::future::pending().await + } else { + collection.next().await + } + }; + tokio::select! { + result = collect_next => { + trace!("HTTP connection finished: {:?}", result); + } + new_fut = conn_out.recv() => { + match new_fut { + Some(f) => collection.push(f), + None => break, + } + } + } + } + debug!("Collecting last open HTTP connections."); + while let Some(conn_res) = collection.next().await { + trace!("HTTP connection finished: {:?}", conn_res); + } + debug!("No more HTTP connections to collect"); + }); + + loop { + let (stream, client_addr) = tokio::select! { + acc = listener.accept() => acc?, + _ = &mut shutdown_signal => break, + }; + + let io = TokioIo::new(stream); + + let handler = handler.clone(); + let serve = move |req: Request| handler(req, client_addr.clone()); + + let fut = tokio::task::spawn(async move { + let io = Box::pin(io); + if let Err(e) = http1::Builder::new() + .serve_connection(io, service_fn(serve)) + .await + { + debug!("Error handling HTTP connection: {}", e); + } + }); + conn_in.send(fut)?; + } + + connection_collector.await?; + + Ok(()) +} -- cgit v1.2.3