aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-01-24 19:28:18 +0100
committerAlex Auvolat <alex@adnab.me>2022-01-24 19:28:18 +0100
commit5e5299a6d0addc6498be12a24451860b9e4c3445 (patch)
treeee09ba8e40ae0e14af3dbe2ec4e576a6f0b4aea0
parentd7511c683d47cdc6eb2d19f4276b359b1c6841f6 (diff)
downloadtricot-5e5299a6d0addc6498be12a24451860b9e4c3445.tar.gz
tricot-5e5299a6d0addc6498be12a24451860b9e4c3445.zip
Add graceful shutdown and memory tracing
-rw-r--r--.gitignore1
-rw-r--r--Cargo.lock96
-rw-r--r--Cargo.toml7
-rw-r--r--src/cert_store.rs12
-rw-r--r--src/http.rs11
-rw-r--r--src/https.rs52
-rw-r--r--src/main.rs95
-rw-r--r--src/proxy_config.rs23
8 files changed, 258 insertions, 39 deletions
diff --git a/.gitignore b/.gitignore
index 1c2b9da..b4e57fc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
/target
run_local.sh
+dhat-heap.json
diff --git a/Cargo.lock b/Cargo.lock
index 9d35b3f..a69f72a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -371,6 +371,21 @@ dependencies = [
]
[[package]]
+name = "dhat"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "47003dc9f6368a88e85956c3b2573a7e6872746a3e5d762a8885da3a136a0381"
+dependencies = [
+ "backtrace",
+ "lazy_static",
+ "parking_lot 0.11.2",
+ "rustc-hash",
+ "serde",
+ "serde_json",
+ "thousands",
+]
+
+[[package]]
name = "discard"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -852,6 +867,15 @@ dependencies = [
]
[[package]]
+name = "instant"
+version = "0.1.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
+dependencies = [
+ "cfg-if 1.0.0",
+]
+
+[[package]]
name = "iovec"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -922,6 +946,15 @@ dependencies = [
]
[[package]]
+name = "lock_api"
+version = "0.4.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109"
+dependencies = [
+ "scopeguard",
+]
+
+[[package]]
name = "log"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1129,12 +1162,23 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f842b1982eb6c2fe34036a4fbfb06dd185a3f5c8edfaacdf7d1ea10b07de6252"
dependencies = [
- "lock_api",
- "parking_lot_core",
+ "lock_api 0.3.4",
+ "parking_lot_core 0.6.2",
"rustc_version",
]
[[package]]
+name = "parking_lot"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
+dependencies = [
+ "instant",
+ "lock_api 0.4.5",
+ "parking_lot_core 0.8.5",
+]
+
+[[package]]
name = "parking_lot_core"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1143,9 +1187,23 @@ dependencies = [
"cfg-if 0.1.10",
"cloudabi",
"libc",
- "redox_syscall",
+ "redox_syscall 0.1.57",
"rustc_version",
- "smallvec",
+ "smallvec 0.6.14",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "parking_lot_core"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
+dependencies = [
+ "cfg-if 1.0.0",
+ "instant",
+ "libc",
+ "redox_syscall 0.2.10",
+ "smallvec 1.8.0",
"winapi 0.3.9",
]
@@ -1286,6 +1344,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
[[package]]
+name = "redox_syscall"
+version = "0.2.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
+dependencies = [
+ "bitflags",
+]
+
+[[package]]
name = "regex"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1360,6 +1427,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
[[package]]
+name = "rustc-hash"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
+
+[[package]]
name = "rustc_version"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1568,6 +1641,12 @@ dependencies = [
]
[[package]]
+name = "smallvec"
+version = "1.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
+
+[[package]]
name = "socket2"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1722,6 +1801,12 @@ dependencies = [
]
[[package]]
+name = "thousands"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3bf63baf9f5039dadc247375c29eb13706706cfde997d0330d05aa63a77d8820"
+
+[[package]]
name = "time"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1886,7 +1971,7 @@ dependencies = [
"log",
"mio 0.6.23",
"num_cpus",
- "parking_lot",
+ "parking_lot 0.9.0",
"slab",
"tokio-executor",
"tokio-io",
@@ -2007,6 +2092,7 @@ dependencies = [
"async-compression",
"bytes 1.1.0",
"chrono",
+ "dhat",
"envy",
"futures 0.3.18",
"futures-util",
diff --git a/Cargo.toml b/Cargo.toml
index d970836..dd91560 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,3 +35,10 @@ accept-encoding-fork = "0.2.0-alpha.3"
async-compression = { version = "0.3", features = ["tokio", "gzip", "zstd", "deflate", "brotli"] }
tokio-util = { version = "0.6", features = ["io"] }
uuid = { version = "0.8.2", features = ["v4"] }
+dhat = { version = "0.3", optional = true }
+
+[profile.release]
+debug = 1
+
+[features]
+dhat-heap = [ "dhat" ]
diff --git a/src/cert_store.rs b/src/cert_store.rs
index d561605..c1381db 100644
--- a/src/cert_store.rs
+++ b/src/cert_store.rs
@@ -4,7 +4,7 @@ use std::time::{Duration, Instant};
use anyhow::Result;
use chrono::Utc;
-use futures::TryFutureExt;
+use futures::{FutureExt, TryFutureExt};
use log::*;
use tokio::select;
use tokio::sync::{mpsc, watch};
@@ -16,7 +16,6 @@ use rustls::sign::CertifiedKey;
use crate::cert::{Cert, CertSer};
use crate::consul::*;
-use crate::exit_on_err;
use crate::proxy_config::*;
pub struct CertStore {
@@ -33,6 +32,7 @@ impl CertStore {
consul: Consul,
rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>,
letsencrypt_email: String,
+ exit_on_err: impl Fn(anyhow::Error) + Send + 'static,
) -> Arc<Self> {
let (tx, rx) = mpsc::unbounded_channel();
@@ -45,7 +45,13 @@ impl CertStore {
tx_need_cert: tx,
});
- tokio::spawn(cert_store.clone().certificate_loop(rx).map_err(exit_on_err));
+ tokio::spawn(
+ cert_store
+ .clone()
+ .certificate_loop(rx)
+ .map_err(exit_on_err)
+ .then(|_| async { info!("Certificate renewal task exited") }),
+ );
cert_store
}
diff --git a/src/http.rs b/src/http.rs
index 05d7440..973e77f 100644
--- a/src/http.rs
+++ b/src/http.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use anyhow::Result;
use log::*;
+use futures::future::Future;
use http::uri::Authority;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server, StatusCode, Uri};
@@ -12,7 +13,11 @@ use crate::consul::Consul;
const CHALLENGE_PREFIX: &str = "/.well-known/acme-challenge/";
-pub async fn serve_http(bind_addr: SocketAddr, consul: Consul) -> Result<()> {
+pub async fn serve_http(
+ bind_addr: SocketAddr,
+ consul: Consul,
+ shutdown_signal: impl Future<Output = ()>,
+) -> Result<()> {
let consul = Arc::new(consul);
// For every connection, we must make a `Service` to handle all
// incoming HTTP requests on said connection.
@@ -30,7 +35,9 @@ pub async fn serve_http(bind_addr: SocketAddr, consul: Consul) -> Result<()> {
});
info!("Listening on http://{}", bind_addr);
- let server = Server::bind(&bind_addr).serve(make_svc);
+ let server = Server::bind(&bind_addr)
+ .serve(make_svc)
+ .with_graceful_shutdown(shutdown_signal);
server.await?;
diff --git a/src/https.rs b/src/https.rs
index 34e3f85..6b1f5e7 100644
--- a/src/https.rs
+++ b/src/https.rs
@@ -7,6 +7,7 @@ use log::*;
use accept_encoding_fork::Encoding;
use async_compression::tokio::bufread::*;
+use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::TryStreamExt;
use http::header::{HeaderName, HeaderValue};
@@ -15,6 +16,7 @@ use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{header, Body, Request, Response, StatusCode};
use tokio::net::TcpListener;
+use tokio::select;
use tokio::sync::watch;
use tokio_rustls::TlsAcceptor;
use tokio_util::io::{ReaderStream, StreamReader};
@@ -33,6 +35,7 @@ pub async fn serve_https(
config: HttpsConfig,
cert_store: Arc<CertStore>,
rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>,
+ mut must_exit: watch::Receiver<bool>,
) -> Result<()> {
let config = Arc::new(config);
@@ -47,28 +50,43 @@ pub async fn serve_https(
info!("Starting to serve on https://{}.", config.bind_addr);
let tcp = TcpListener::bind(config.bind_addr).await?;
- loop {
- let (socket, remote_addr) = tcp.accept().await?;
+ let mut connections = FuturesUnordered::new();
+
+ while !*must_exit.borrow() {
+ let (socket, remote_addr) = select! {
+ a = tcp.accept() => a?,
+ _ = connections.next() => continue,
+ _ = must_exit.changed() => continue,
+ };
let rx_proxy_config = rx_proxy_config.clone();
let tls_acceptor = tls_acceptor.clone();
let config = config.clone();
- tokio::spawn(async move {
+ let mut must_exit_2 = must_exit.clone();
+ let conn = tokio::spawn(async move {
match tls_acceptor.accept(socket).await {
Ok(stream) => {
debug!("TLS handshake was successfull");
- let http_result = Http::new()
- .serve_connection(
- stream,
- service_fn(move |req: Request<Body>| {
- let https_config = config.clone();
- let proxy_config: Arc<ProxyConfig> =
- rx_proxy_config.borrow().clone();
- handle_outer(remote_addr, req, https_config, proxy_config)
- }),
+ let http_conn = Http::new().serve_connection(
+ stream,
+ service_fn(move |req: Request<Body>| {
+ let https_config = config.clone();
+ let proxy_config: Arc<ProxyConfig> = rx_proxy_config.borrow().clone();
+ handle_outer(remote_addr, req, https_config, proxy_config)
+ }),
+ );
+ tokio::pin!(http_conn);
+ let http_result = loop {
+ select! (
+ r = &mut http_conn => break r,
+ _ = must_exit_2.changed() => {
+ if *must_exit_2.borrow() {
+ http_conn.as_mut().graceful_shutdown();
+ }
+ }
)
- .await;
+ };
if let Err(http_err) = http_result {
warn!("HTTP error: {}", http_err);
}
@@ -76,7 +94,15 @@ pub async fn serve_https(
Err(e) => warn!("Error in TLS connection: {}", e),
}
});
+ connections.push(conn);
}
+
+ info!("HTTPS server shutting down, draining remaining connections...");
+ while !connections.is_empty() {
+ let _ = connections.next().await;
+ }
+
+ Ok(())
}
async fn handle_outer(
diff --git a/src/main.rs b/src/main.rs
index dcbd187..dada7e7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,9 +1,14 @@
#[macro_use]
extern crate anyhow;
-use futures::TryFutureExt;
+use log::*;
+use std::sync::Arc;
+
+use futures::{FutureExt, TryFutureExt};
use std::net::SocketAddr;
use structopt::StructOpt;
+use tokio::select;
+use tokio::sync::watch;
mod cert;
mod cert_store;
@@ -14,7 +19,11 @@ mod proxy_config;
mod reverse_proxy;
mod tls_util;
-use log::*;
+use proxy_config::ProxyConfig;
+
+#[cfg(feature = "dhat-heap")]
+#[global_allocator]
+static ALLOC: dhat::Alloc = dhat::Alloc;
#[derive(StructOpt, Debug)]
#[structopt(name = "tricot")]
@@ -86,6 +95,9 @@ struct Opt {
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() {
+ #[cfg(feature = "dhat-heap")]
+ let _profiler = dhat::Profiler::new_heap();
+
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "tricot=info")
}
@@ -101,6 +113,12 @@ async fn main() {
info!("Starting Tricot");
+ let (exit_signal, provoke_exit) = watch_ctrl_c();
+ let exit_on_err = move |err: anyhow::Error| {
+ error!("Error: {}", err);
+ let _ = provoke_exit.send(true);
+ };
+
let consul_config = consul::ConsulConfig {
addr: opt.consul_addr.clone(),
ca_cert: opt.consul_ca_cert.clone(),
@@ -110,15 +128,25 @@ async fn main() {
let consul = consul::Consul::new(consul_config, &opt.consul_kv_prefix, &opt.node_name)
.expect("Error creating Consul client");
- let mut rx_proxy_config = proxy_config::spawn_proxy_config_task(consul.clone());
+ let rx_proxy_config =
+ proxy_config::spawn_proxy_config_task(consul.clone(), exit_signal.clone());
let cert_store = cert_store::CertStore::new(
consul.clone(),
rx_proxy_config.clone(),
opt.letsencrypt_email.clone(),
+ exit_on_err.clone(),
);
- tokio::spawn(http::serve_http(opt.http_bind_addr, consul.clone()).map_err(exit_on_err));
+ let http_task = tokio::spawn(
+ http::serve_http(
+ opt.http_bind_addr,
+ consul.clone(),
+ wait_from(exit_signal.clone()),
+ )
+ .map_err(exit_on_err.clone())
+ .then(|_| async { info!("HTTP server exited") }),
+ );
let https_config = https::HttpsConfig {
bind_addr: opt.https_bind_addr,
@@ -129,12 +157,38 @@ async fn main() {
.map(|x| x.to_string())
.collect(),
};
- tokio::spawn(
- https::serve_https(https_config, cert_store.clone(), rx_proxy_config.clone())
- .map_err(exit_on_err),
+
+ let https_task = tokio::spawn(
+ https::serve_https(
+ https_config,
+ cert_store.clone(),
+ rx_proxy_config.clone(),
+ exit_signal.clone(),
+ )
+ .map_err(exit_on_err.clone())
+ .then(|_| async { info!("HTTPS server exited") }),
);
- while rx_proxy_config.changed().await.is_ok() {
+ let dump_task = tokio::spawn(dump_config_on_change(rx_proxy_config, exit_signal.clone()));
+
+ let _ = http_task.await.expect("Tokio task await failure");
+ let _ = https_task.await.expect("Tokio task await failure");
+ let _ = dump_task.await.expect("Tokio task await failure");
+}
+
+async fn dump_config_on_change(
+ mut rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>,
+ mut must_exit: watch::Receiver<bool>,
+) {
+ while !*must_exit.borrow() {
+ select!(
+ c = rx_proxy_config.changed() => {
+ if !c.is_ok() {
+ break;
+ }
+ }
+ _ = must_exit.changed() => continue,
+ );
println!("---- PROXY CONFIGURATION ----");
for ent in rx_proxy_config.borrow().entries.iter() {
println!(" {}", ent);
@@ -143,7 +197,26 @@ async fn main() {
}
}
-fn exit_on_err(e: anyhow::Error) {
- error!("{}", e);
- std::process::exit(1);
+/// Creates a watch that contains `false`, and that changes
+/// to `true` when a Ctrl+C signal is received.
+pub fn watch_ctrl_c() -> (watch::Receiver<bool>, Arc<watch::Sender<bool>>) {
+ let (send_cancel, watch_cancel) = watch::channel(false);
+ let send_cancel = Arc::new(send_cancel);
+ let send_cancel_2 = send_cancel.clone();
+ tokio::spawn(async move {
+ tokio::signal::ctrl_c()
+ .await
+ .expect("failed to install CTRL+C signal handler");
+ info!("Received CTRL+C, shutting down.");
+ send_cancel.send(true).unwrap();
+ });
+ (watch_cancel, send_cancel_2)
+}
+
+async fn wait_from(mut chan: watch::Receiver<bool>) {
+ while !*chan.borrow() {
+ if chan.changed().await.is_err() {
+ return;
+ }
+ }
}
diff --git a/src/proxy_config.rs b/src/proxy_config.rs
index 4add98c..e380885 100644
--- a/src/proxy_config.rs
+++ b/src/proxy_config.rs
@@ -9,7 +9,7 @@ use futures::future::BoxFuture;
use futures::stream::{FuturesUnordered, StreamExt};
use log::*;
-use tokio::{sync::watch, time::sleep};
+use tokio::{select, sync::watch, time::sleep};
use crate::consul::*;
@@ -231,7 +231,10 @@ struct NodeWatchState {
retries: u32,
}
-pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfig>> {
+pub fn spawn_proxy_config_task(
+ consul: Consul,
+ mut must_exit: watch::Receiver<bool>,
+) -> watch::Receiver<Arc<ProxyConfig>> {
let (tx, rx) = watch::channel(Arc::new(ProxyConfig {
entries: Vec::new(),
}));
@@ -244,8 +247,13 @@ pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfi
let mut node_site = HashMap::new();
- loop {
- match consul.list_nodes().await {
+ while !*must_exit.borrow() {
+ let list_nodes = select! {
+ ln = consul.list_nodes() => ln,
+ _ = must_exit.changed() => continue,
+ };
+
+ match list_nodes {
Ok(consul_nodes) => {
info!("Watched consul nodes: {:?}", consul_nodes);
for consul_node in consul_nodes {
@@ -271,7 +279,12 @@ pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfi
}
}
- let (node, res): (String, Result<_>) = match watches.next().await {
+ let next_watch = select! {
+ nw = watches.next() => nw,
+ _ = must_exit.changed() => continue,
+ };
+
+ let (node, res): (String, Result<_>) = match next_watch {
Some(v) => v,
None => {
warn!("No nodes currently watched in proxy_config.rs");