diff options
author | Alex Auvolat <alex@adnab.me> | 2022-01-24 19:28:18 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-01-24 19:28:18 +0100 |
commit | 5e5299a6d0addc6498be12a24451860b9e4c3445 (patch) | |
tree | ee09ba8e40ae0e14af3dbe2ec4e576a6f0b4aea0 /src/main.rs | |
parent | d7511c683d47cdc6eb2d19f4276b359b1c6841f6 (diff) | |
download | tricot-5e5299a6d0addc6498be12a24451860b9e4c3445.tar.gz tricot-5e5299a6d0addc6498be12a24451860b9e4c3445.zip |
Add graceful shutdown and memory tracing
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 95 |
1 files changed, 84 insertions, 11 deletions
diff --git a/src/main.rs b/src/main.rs index dcbd187..dada7e7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,14 @@ #[macro_use] extern crate anyhow; -use futures::TryFutureExt; +use log::*; +use std::sync::Arc; + +use futures::{FutureExt, TryFutureExt}; use std::net::SocketAddr; use structopt::StructOpt; +use tokio::select; +use tokio::sync::watch; mod cert; mod cert_store; @@ -14,7 +19,11 @@ mod proxy_config; mod reverse_proxy; mod tls_util; -use log::*; +use proxy_config::ProxyConfig; + +#[cfg(feature = "dhat-heap")] +#[global_allocator] +static ALLOC: dhat::Alloc = dhat::Alloc; #[derive(StructOpt, Debug)] #[structopt(name = "tricot")] @@ -86,6 +95,9 @@ struct Opt { #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() { + #[cfg(feature = "dhat-heap")] + let _profiler = dhat::Profiler::new_heap(); + if std::env::var("RUST_LOG").is_err() { std::env::set_var("RUST_LOG", "tricot=info") } @@ -101,6 +113,12 @@ async fn main() { info!("Starting Tricot"); + let (exit_signal, provoke_exit) = watch_ctrl_c(); + let exit_on_err = move |err: anyhow::Error| { + error!("Error: {}", err); + let _ = provoke_exit.send(true); + }; + let consul_config = consul::ConsulConfig { addr: opt.consul_addr.clone(), ca_cert: opt.consul_ca_cert.clone(), @@ -110,15 +128,25 @@ async fn main() { let consul = consul::Consul::new(consul_config, &opt.consul_kv_prefix, &opt.node_name) .expect("Error creating Consul client"); - let mut rx_proxy_config = proxy_config::spawn_proxy_config_task(consul.clone()); + let rx_proxy_config = + proxy_config::spawn_proxy_config_task(consul.clone(), exit_signal.clone()); let cert_store = cert_store::CertStore::new( consul.clone(), rx_proxy_config.clone(), opt.letsencrypt_email.clone(), + exit_on_err.clone(), ); - tokio::spawn(http::serve_http(opt.http_bind_addr, consul.clone()).map_err(exit_on_err)); + let http_task = tokio::spawn( + http::serve_http( + opt.http_bind_addr, + consul.clone(), + wait_from(exit_signal.clone()), + ) + .map_err(exit_on_err.clone()) + .then(|_| async { info!("HTTP server exited") }), + ); let https_config = https::HttpsConfig { bind_addr: opt.https_bind_addr, @@ -129,12 +157,38 @@ async fn main() { .map(|x| x.to_string()) .collect(), }; - tokio::spawn( - https::serve_https(https_config, cert_store.clone(), rx_proxy_config.clone()) - .map_err(exit_on_err), + + let https_task = tokio::spawn( + https::serve_https( + https_config, + cert_store.clone(), + rx_proxy_config.clone(), + exit_signal.clone(), + ) + .map_err(exit_on_err.clone()) + .then(|_| async { info!("HTTPS server exited") }), ); - while rx_proxy_config.changed().await.is_ok() { + let dump_task = tokio::spawn(dump_config_on_change(rx_proxy_config, exit_signal.clone())); + + let _ = http_task.await.expect("Tokio task await failure"); + let _ = https_task.await.expect("Tokio task await failure"); + let _ = dump_task.await.expect("Tokio task await failure"); +} + +async fn dump_config_on_change( + mut rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>, + mut must_exit: watch::Receiver<bool>, +) { + while !*must_exit.borrow() { + select!( + c = rx_proxy_config.changed() => { + if !c.is_ok() { + break; + } + } + _ = must_exit.changed() => continue, + ); println!("---- PROXY CONFIGURATION ----"); for ent in rx_proxy_config.borrow().entries.iter() { println!(" {}", ent); @@ -143,7 +197,26 @@ async fn main() { } } -fn exit_on_err(e: anyhow::Error) { - error!("{}", e); - std::process::exit(1); +/// Creates a watch that contains `false`, and that changes +/// to `true` when a Ctrl+C signal is received. +pub fn watch_ctrl_c() -> (watch::Receiver<bool>, Arc<watch::Sender<bool>>) { + let (send_cancel, watch_cancel) = watch::channel(false); + let send_cancel = Arc::new(send_cancel); + let send_cancel_2 = send_cancel.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); + info!("Received CTRL+C, shutting down."); + send_cancel.send(true).unwrap(); + }); + (watch_cancel, send_cancel_2) +} + +async fn wait_from(mut chan: watch::Receiver<bool>) { + while !*chan.borrow() { + if chan.changed().await.is_err() { + return; + } + } } |