diff options
author | Alex Auvolat <alex@adnab.me> | 2022-01-24 19:28:18 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-01-24 19:28:18 +0100 |
commit | 5e5299a6d0addc6498be12a24451860b9e4c3445 (patch) | |
tree | ee09ba8e40ae0e14af3dbe2ec4e576a6f0b4aea0 /src/https.rs | |
parent | d7511c683d47cdc6eb2d19f4276b359b1c6841f6 (diff) | |
download | tricot-5e5299a6d0addc6498be12a24451860b9e4c3445.tar.gz tricot-5e5299a6d0addc6498be12a24451860b9e4c3445.zip |
Add graceful shutdown and memory tracing
Diffstat (limited to 'src/https.rs')
-rw-r--r-- | src/https.rs | 52 |
1 files changed, 39 insertions, 13 deletions
diff --git a/src/https.rs b/src/https.rs index 34e3f85..6b1f5e7 100644 --- a/src/https.rs +++ b/src/https.rs @@ -7,6 +7,7 @@ use log::*; use accept_encoding_fork::Encoding; use async_compression::tokio::bufread::*; +use futures::stream::FuturesUnordered; use futures::StreamExt; use futures::TryStreamExt; use http::header::{HeaderName, HeaderValue}; @@ -15,6 +16,7 @@ use hyper::server::conn::Http; use hyper::service::service_fn; use hyper::{header, Body, Request, Response, StatusCode}; use tokio::net::TcpListener; +use tokio::select; use tokio::sync::watch; use tokio_rustls::TlsAcceptor; use tokio_util::io::{ReaderStream, StreamReader}; @@ -33,6 +35,7 @@ pub async fn serve_https( config: HttpsConfig, cert_store: Arc<CertStore>, rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>, + mut must_exit: watch::Receiver<bool>, ) -> Result<()> { let config = Arc::new(config); @@ -47,28 +50,43 @@ pub async fn serve_https( info!("Starting to serve on https://{}.", config.bind_addr); let tcp = TcpListener::bind(config.bind_addr).await?; - loop { - let (socket, remote_addr) = tcp.accept().await?; + let mut connections = FuturesUnordered::new(); + + while !*must_exit.borrow() { + let (socket, remote_addr) = select! { + a = tcp.accept() => a?, + _ = connections.next() => continue, + _ = must_exit.changed() => continue, + }; let rx_proxy_config = rx_proxy_config.clone(); let tls_acceptor = tls_acceptor.clone(); let config = config.clone(); - tokio::spawn(async move { + let mut must_exit_2 = must_exit.clone(); + let conn = tokio::spawn(async move { match tls_acceptor.accept(socket).await { Ok(stream) => { debug!("TLS handshake was successfull"); - let http_result = Http::new() - .serve_connection( - stream, - service_fn(move |req: Request<Body>| { - let https_config = config.clone(); - let proxy_config: Arc<ProxyConfig> = - rx_proxy_config.borrow().clone(); - handle_outer(remote_addr, req, https_config, proxy_config) - }), + let http_conn = Http::new().serve_connection( + stream, + service_fn(move |req: Request<Body>| { + let https_config = config.clone(); + let proxy_config: Arc<ProxyConfig> = rx_proxy_config.borrow().clone(); + handle_outer(remote_addr, req, https_config, proxy_config) + }), + ); + tokio::pin!(http_conn); + let http_result = loop { + select! ( + r = &mut http_conn => break r, + _ = must_exit_2.changed() => { + if *must_exit_2.borrow() { + http_conn.as_mut().graceful_shutdown(); + } + } ) - .await; + }; if let Err(http_err) = http_result { warn!("HTTP error: {}", http_err); } @@ -76,7 +94,15 @@ pub async fn serve_https( Err(e) => warn!("Error in TLS connection: {}", e), } }); + connections.push(conn); } + + info!("HTTPS server shutting down, draining remaining connections..."); + while !connections.is_empty() { + let _ = connections.next().await; + } + + Ok(()) } async fn handle_outer( |