diff options
author | Alex Auvolat <alex@adnab.me> | 2024-08-25 20:04:56 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-08-25 20:04:56 +0200 |
commit | baf32c95759dac1873eaf30bbfa018150f56003a (patch) | |
tree | cb98fd446980292933945dab54ada881ff8971c5 /src/api | |
parent | 3dda1ee4f6ab85d668aaf810667e611832fccc0c (diff) | |
download | garage-baf32c95759dac1873eaf30bbfa018150f56003a.tar.gz garage-baf32c95759dac1873eaf30bbfa018150f56003a.zip |
api servers: kill opened connections after SIGINT after 10s deadline (fix #806)
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/generic_server.rs | 43 |
1 files changed, 28 insertions, 15 deletions
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index 9c49fdf3..283abdd4 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -2,6 +2,7 @@ use std::convert::Infallible; use std::fs::{self, Permissions}; use std::os::unix::fs::PermissionsExt; use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; @@ -19,6 +20,7 @@ use hyper_util::rt::TokioIo; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream}; use tokio::sync::watch; +use tokio::time::{sleep_until, Instant}; use opentelemetry::{ global, @@ -291,7 +293,7 @@ where let connection_collector = tokio::spawn({ let server_name = server_name.clone(); async move { - let mut connections = FuturesUnordered::new(); + let mut connections = FuturesUnordered::<tokio::task::JoinHandle<()>>::new(); loop { let collect_next = async { if connections.is_empty() { @@ -312,23 +314,34 @@ where } } } - if !connections.is_empty() { + let deadline = Instant::now() + Duration::from_secs(10); + while !connections.is_empty() { info!( - "{} server: {} connections still open", + "{} server: {} connections still open, deadline in {:.2}s", server_name, - connections.len() + connections.len(), + (deadline - Instant::now()).as_secs_f32(), ); - while let Some(conn_res) = connections.next().await { - trace!( - "{} server: HTTP connection finished: {:?}", - server_name, - conn_res - ); - info!( - "{} server: {} connections still open", - server_name, - connections.len() - ); + tokio::select! { + conn_res = connections.next() => { + trace!( + "{} server: HTTP connection finished: {:?}", + server_name, + conn_res.unwrap(), + ); + } + _ = sleep_until(deadline) => { + warn!("{} server: exit deadline reached with {} connections still open, killing them now", + server_name, + connections.len()); + for conn in connections.iter() { + conn.abort(); + } + for conn in connections { + assert!(conn.await.unwrap_err().is_cancelled()); + } + break; + } } } } |