diff options
author | Alex Auvolat <alex@adnab.me> | 2022-01-24 19:28:18 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-01-24 19:28:18 +0100 |
commit | 5e5299a6d0addc6498be12a24451860b9e4c3445 (patch) | |
tree | ee09ba8e40ae0e14af3dbe2ec4e576a6f0b4aea0 | |
parent | d7511c683d47cdc6eb2d19f4276b359b1c6841f6 (diff) | |
download | tricot-5e5299a6d0addc6498be12a24451860b9e4c3445.tar.gz tricot-5e5299a6d0addc6498be12a24451860b9e4c3445.zip |
Add graceful shutdown and memory tracing
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | Cargo.lock | 96 | ||||
-rw-r--r-- | Cargo.toml | 7 | ||||
-rw-r--r-- | src/cert_store.rs | 12 | ||||
-rw-r--r-- | src/http.rs | 11 | ||||
-rw-r--r-- | src/https.rs | 52 | ||||
-rw-r--r-- | src/main.rs | 95 | ||||
-rw-r--r-- | src/proxy_config.rs | 23 |
8 files changed, 258 insertions, 39 deletions
@@ -1,2 +1,3 @@ /target run_local.sh +dhat-heap.json @@ -371,6 +371,21 @@ dependencies = [ ] [[package]] +name = "dhat" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47003dc9f6368a88e85956c3b2573a7e6872746a3e5d762a8885da3a136a0381" +dependencies = [ + "backtrace", + "lazy_static", + "parking_lot 0.11.2", + "rustc-hash", + "serde", + "serde_json", + "thousands", +] + +[[package]] name = "discard" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -852,6 +867,15 @@ dependencies = [ ] [[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if 1.0.0", +] + +[[package]] name = "iovec" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -922,6 +946,15 @@ dependencies = [ ] [[package]] +name = "lock_api" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109" +dependencies = [ + "scopeguard", +] + +[[package]] name = "log" version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1129,12 +1162,23 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f842b1982eb6c2fe34036a4fbfb06dd185a3f5c8edfaacdf7d1ea10b07de6252" dependencies = [ - "lock_api", - "parking_lot_core", + "lock_api 0.3.4", + "parking_lot_core 0.6.2", "rustc_version", ] [[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api 0.4.5", + "parking_lot_core 0.8.5", +] + +[[package]] name = "parking_lot_core" version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1143,9 +1187,23 @@ dependencies = [ "cfg-if 0.1.10", "cloudabi", "libc", - "redox_syscall", + "redox_syscall 0.1.57", "rustc_version", - "smallvec", + "smallvec 0.6.14", + "winapi 0.3.9", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" +dependencies = [ + "cfg-if 1.0.0", + "instant", + "libc", + "redox_syscall 0.2.10", + "smallvec 1.8.0", "winapi 0.3.9", ] @@ -1286,6 +1344,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" [[package]] +name = "redox_syscall" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" +dependencies = [ + "bitflags", +] + +[[package]] name = "regex" version = "1.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1360,6 +1427,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" [[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + +[[package]] name = "rustc_version" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1568,6 +1641,12 @@ dependencies = [ ] [[package]] +name = "smallvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" + +[[package]] name = "socket2" version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1722,6 +1801,12 @@ dependencies = [ ] [[package]] +name = "thousands" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bf63baf9f5039dadc247375c29eb13706706cfde997d0330d05aa63a77d8820" + +[[package]] name = "time" version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1886,7 +1971,7 @@ dependencies = [ "log", "mio 0.6.23", "num_cpus", - "parking_lot", + "parking_lot 0.9.0", "slab", "tokio-executor", "tokio-io", @@ -2007,6 +2092,7 @@ dependencies = [ "async-compression", "bytes 1.1.0", "chrono", + "dhat", "envy", "futures 0.3.18", "futures-util", @@ -35,3 +35,10 @@ accept-encoding-fork = "0.2.0-alpha.3" async-compression = { version = "0.3", features = ["tokio", "gzip", "zstd", "deflate", "brotli"] } tokio-util = { version = "0.6", features = ["io"] } uuid = { version = "0.8.2", features = ["v4"] } +dhat = { version = "0.3", optional = true } + +[profile.release] +debug = 1 + +[features] +dhat-heap = [ "dhat" ] diff --git a/src/cert_store.rs b/src/cert_store.rs index d561605..c1381db 100644 --- a/src/cert_store.rs +++ b/src/cert_store.rs @@ -4,7 +4,7 @@ use std::time::{Duration, Instant}; use anyhow::Result; use chrono::Utc; -use futures::TryFutureExt; +use futures::{FutureExt, TryFutureExt}; use log::*; use tokio::select; use tokio::sync::{mpsc, watch}; @@ -16,7 +16,6 @@ use rustls::sign::CertifiedKey; use crate::cert::{Cert, CertSer}; use crate::consul::*; -use crate::exit_on_err; use crate::proxy_config::*; pub struct CertStore { @@ -33,6 +32,7 @@ impl CertStore { consul: Consul, rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>, letsencrypt_email: String, + exit_on_err: impl Fn(anyhow::Error) + Send + 'static, ) -> Arc<Self> { let (tx, rx) = mpsc::unbounded_channel(); @@ -45,7 +45,13 @@ impl CertStore { tx_need_cert: tx, }); - tokio::spawn(cert_store.clone().certificate_loop(rx).map_err(exit_on_err)); + tokio::spawn( + cert_store + .clone() + .certificate_loop(rx) + .map_err(exit_on_err) + .then(|_| async { info!("Certificate renewal task exited") }), + ); cert_store } diff --git a/src/http.rs b/src/http.rs index 05d7440..973e77f 100644 --- a/src/http.rs +++ b/src/http.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use anyhow::Result; use log::*; +use futures::future::Future; use http::uri::Authority; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server, StatusCode, Uri}; @@ -12,7 +13,11 @@ use crate::consul::Consul; const CHALLENGE_PREFIX: &str = "/.well-known/acme-challenge/"; -pub async fn serve_http(bind_addr: SocketAddr, consul: Consul) -> Result<()> { +pub async fn serve_http( + bind_addr: SocketAddr, + consul: Consul, + shutdown_signal: impl Future<Output = ()>, +) -> Result<()> { let consul = Arc::new(consul); // For every connection, we must make a `Service` to handle all // incoming HTTP requests on said connection. @@ -30,7 +35,9 @@ pub async fn serve_http(bind_addr: SocketAddr, consul: Consul) -> Result<()> { }); info!("Listening on http://{}", bind_addr); - let server = Server::bind(&bind_addr).serve(make_svc); + let server = Server::bind(&bind_addr) + .serve(make_svc) + .with_graceful_shutdown(shutdown_signal); server.await?; diff --git a/src/https.rs b/src/https.rs index 34e3f85..6b1f5e7 100644 --- a/src/https.rs +++ b/src/https.rs @@ -7,6 +7,7 @@ use log::*; use accept_encoding_fork::Encoding; use async_compression::tokio::bufread::*; +use futures::stream::FuturesUnordered; use futures::StreamExt; use futures::TryStreamExt; use http::header::{HeaderName, HeaderValue}; @@ -15,6 +16,7 @@ use hyper::server::conn::Http; use hyper::service::service_fn; use hyper::{header, Body, Request, Response, StatusCode}; use tokio::net::TcpListener; +use tokio::select; use tokio::sync::watch; use tokio_rustls::TlsAcceptor; use tokio_util::io::{ReaderStream, StreamReader}; @@ -33,6 +35,7 @@ pub async fn serve_https( config: HttpsConfig, cert_store: Arc<CertStore>, rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>, + mut must_exit: watch::Receiver<bool>, ) -> Result<()> { let config = Arc::new(config); @@ -47,28 +50,43 @@ pub async fn serve_https( info!("Starting to serve on https://{}.", config.bind_addr); let tcp = TcpListener::bind(config.bind_addr).await?; - loop { - let (socket, remote_addr) = tcp.accept().await?; + let mut connections = FuturesUnordered::new(); + + while !*must_exit.borrow() { + let (socket, remote_addr) = select! { + a = tcp.accept() => a?, + _ = connections.next() => continue, + _ = must_exit.changed() => continue, + }; let rx_proxy_config = rx_proxy_config.clone(); let tls_acceptor = tls_acceptor.clone(); let config = config.clone(); - tokio::spawn(async move { + let mut must_exit_2 = must_exit.clone(); + let conn = tokio::spawn(async move { match tls_acceptor.accept(socket).await { Ok(stream) => { debug!("TLS handshake was successfull"); - let http_result = Http::new() - .serve_connection( - stream, - service_fn(move |req: Request<Body>| { - let https_config = config.clone(); - let proxy_config: Arc<ProxyConfig> = - rx_proxy_config.borrow().clone(); - handle_outer(remote_addr, req, https_config, proxy_config) - }), + let http_conn = Http::new().serve_connection( + stream, + service_fn(move |req: Request<Body>| { + let https_config = config.clone(); + let proxy_config: Arc<ProxyConfig> = rx_proxy_config.borrow().clone(); + handle_outer(remote_addr, req, https_config, proxy_config) + }), + ); + tokio::pin!(http_conn); + let http_result = loop { + select! ( + r = &mut http_conn => break r, + _ = must_exit_2.changed() => { + if *must_exit_2.borrow() { + http_conn.as_mut().graceful_shutdown(); + } + } ) - .await; + }; if let Err(http_err) = http_result { warn!("HTTP error: {}", http_err); } @@ -76,7 +94,15 @@ pub async fn serve_https( Err(e) => warn!("Error in TLS connection: {}", e), } }); + connections.push(conn); } + + info!("HTTPS server shutting down, draining remaining connections..."); + while !connections.is_empty() { + let _ = connections.next().await; + } + + Ok(()) } async fn handle_outer( diff --git a/src/main.rs b/src/main.rs index dcbd187..dada7e7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,14 @@ #[macro_use] extern crate anyhow; -use futures::TryFutureExt; +use log::*; +use std::sync::Arc; + +use futures::{FutureExt, TryFutureExt}; use std::net::SocketAddr; use structopt::StructOpt; +use tokio::select; +use tokio::sync::watch; mod cert; mod cert_store; @@ -14,7 +19,11 @@ mod proxy_config; mod reverse_proxy; mod tls_util; -use log::*; +use proxy_config::ProxyConfig; + +#[cfg(feature = "dhat-heap")] +#[global_allocator] +static ALLOC: dhat::Alloc = dhat::Alloc; #[derive(StructOpt, Debug)] #[structopt(name = "tricot")] @@ -86,6 +95,9 @@ struct Opt { #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() { + #[cfg(feature = "dhat-heap")] + let _profiler = dhat::Profiler::new_heap(); + if std::env::var("RUST_LOG").is_err() { std::env::set_var("RUST_LOG", "tricot=info") } @@ -101,6 +113,12 @@ async fn main() { info!("Starting Tricot"); + let (exit_signal, provoke_exit) = watch_ctrl_c(); + let exit_on_err = move |err: anyhow::Error| { + error!("Error: {}", err); + let _ = provoke_exit.send(true); + }; + let consul_config = consul::ConsulConfig { addr: opt.consul_addr.clone(), ca_cert: opt.consul_ca_cert.clone(), @@ -110,15 +128,25 @@ async fn main() { let consul = consul::Consul::new(consul_config, &opt.consul_kv_prefix, &opt.node_name) .expect("Error creating Consul client"); - let mut rx_proxy_config = proxy_config::spawn_proxy_config_task(consul.clone()); + let rx_proxy_config = + proxy_config::spawn_proxy_config_task(consul.clone(), exit_signal.clone()); let cert_store = cert_store::CertStore::new( consul.clone(), rx_proxy_config.clone(), opt.letsencrypt_email.clone(), + exit_on_err.clone(), ); - tokio::spawn(http::serve_http(opt.http_bind_addr, consul.clone()).map_err(exit_on_err)); + let http_task = tokio::spawn( + http::serve_http( + opt.http_bind_addr, + consul.clone(), + wait_from(exit_signal.clone()), + ) + .map_err(exit_on_err.clone()) + .then(|_| async { info!("HTTP server exited") }), + ); let https_config = https::HttpsConfig { bind_addr: opt.https_bind_addr, @@ -129,12 +157,38 @@ async fn main() { .map(|x| x.to_string()) .collect(), }; - tokio::spawn( - https::serve_https(https_config, cert_store.clone(), rx_proxy_config.clone()) - .map_err(exit_on_err), + + let https_task = tokio::spawn( + https::serve_https( + https_config, + cert_store.clone(), + rx_proxy_config.clone(), + exit_signal.clone(), + ) + .map_err(exit_on_err.clone()) + .then(|_| async { info!("HTTPS server exited") }), ); - while rx_proxy_config.changed().await.is_ok() { + let dump_task = tokio::spawn(dump_config_on_change(rx_proxy_config, exit_signal.clone())); + + let _ = http_task.await.expect("Tokio task await failure"); + let _ = https_task.await.expect("Tokio task await failure"); + let _ = dump_task.await.expect("Tokio task await failure"); +} + +async fn dump_config_on_change( + mut rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>, + mut must_exit: watch::Receiver<bool>, +) { + while !*must_exit.borrow() { + select!( + c = rx_proxy_config.changed() => { + if !c.is_ok() { + break; + } + } + _ = must_exit.changed() => continue, + ); println!("---- PROXY CONFIGURATION ----"); for ent in rx_proxy_config.borrow().entries.iter() { println!(" {}", ent); @@ -143,7 +197,26 @@ async fn main() { } } -fn exit_on_err(e: anyhow::Error) { - error!("{}", e); - std::process::exit(1); +/// Creates a watch that contains `false`, and that changes +/// to `true` when a Ctrl+C signal is received. +pub fn watch_ctrl_c() -> (watch::Receiver<bool>, Arc<watch::Sender<bool>>) { + let (send_cancel, watch_cancel) = watch::channel(false); + let send_cancel = Arc::new(send_cancel); + let send_cancel_2 = send_cancel.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); + info!("Received CTRL+C, shutting down."); + send_cancel.send(true).unwrap(); + }); + (watch_cancel, send_cancel_2) +} + +async fn wait_from(mut chan: watch::Receiver<bool>) { + while !*chan.borrow() { + if chan.changed().await.is_err() { + return; + } + } } diff --git a/src/proxy_config.rs b/src/proxy_config.rs index 4add98c..e380885 100644 --- a/src/proxy_config.rs +++ b/src/proxy_config.rs @@ -9,7 +9,7 @@ use futures::future::BoxFuture; use futures::stream::{FuturesUnordered, StreamExt}; use log::*; -use tokio::{sync::watch, time::sleep}; +use tokio::{select, sync::watch, time::sleep}; use crate::consul::*; @@ -231,7 +231,10 @@ struct NodeWatchState { retries: u32, } -pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfig>> { +pub fn spawn_proxy_config_task( + consul: Consul, + mut must_exit: watch::Receiver<bool>, +) -> watch::Receiver<Arc<ProxyConfig>> { let (tx, rx) = watch::channel(Arc::new(ProxyConfig { entries: Vec::new(), })); @@ -244,8 +247,13 @@ pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfi let mut node_site = HashMap::new(); - loop { - match consul.list_nodes().await { + while !*must_exit.borrow() { + let list_nodes = select! { + ln = consul.list_nodes() => ln, + _ = must_exit.changed() => continue, + }; + + match list_nodes { Ok(consul_nodes) => { info!("Watched consul nodes: {:?}", consul_nodes); for consul_node in consul_nodes { @@ -271,7 +279,12 @@ pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfi } } - let (node, res): (String, Result<_>) = match watches.next().await { + let next_watch = select! { + nw = watches.next() => nw, + _ = must_exit.changed() => continue, + }; + + let (node, res): (String, Result<_>) = match next_watch { Some(v) => v, None => { warn!("No nodes currently watched in proxy_config.rs"); |