aboutsummaryrefslogtreecommitdiff
path: root/src/api/generic_server.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-02-08 23:43:59 +0100
committerAlex Auvolat <alex@adnab.me>2024-02-08 23:43:59 +0100
commit5c63193d1de909cdecc501b482f5dc269a84874d (patch)
treecf34b848df170b3e3289754c77c4a9f61b97a260 /src/api/generic_server.rs
parentbcbd15da84181ad94ece7e794933a6ebd388f5bc (diff)
downloadgarage-5c63193d1de909cdecc501b482f5dc269a84874d.tar.gz
garage-5c63193d1de909cdecc501b482f5dc269a84874d.zip
[dep-upgrade-202402] fix shutdown issue introduced when upgrading hyperdep-upgrade-202402
Diffstat (limited to 'src/api/generic_server.rs')
-rw-r--r--src/api/generic_server.rs88
1 files changed, 52 insertions, 36 deletions
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs
index 7b37417e..9c49fdf3 100644
--- a/src/api/generic_server.rs
+++ b/src/api/generic_server.rs
@@ -18,6 +18,7 @@ use hyper_util::rt::TokioIo;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
+use tokio::sync::watch;
use opentelemetry::{
global,
@@ -104,20 +105,17 @@ impl<A: ApiHandler> ApiServer<A> {
self: Arc<Self>,
bind_addr: UnixOrTCPSocketAddress,
unix_bind_addr_mode: Option<u32>,
- shutdown_signal: impl Future<Output = ()>,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
- info!(
- "{} API server listening on {}",
- A::API_NAME_DISPLAY,
- bind_addr
- );
+ let server_name = format!("{} API", A::API_NAME_DISPLAY);
+ info!("{} server listening on {}", server_name, bind_addr);
match bind_addr {
UnixOrTCPSocketAddress::TCPSocket(addr) => {
let listener = TcpListener::bind(addr).await?;
let handler = move |request, socketaddr| self.clone().handler(request, socketaddr);
- server_loop(listener, handler, shutdown_signal).await
+ server_loop(server_name, listener, handler, must_exit).await
}
UnixOrTCPSocketAddress::UnixSocket(ref path) => {
if path.exists() {
@@ -133,7 +131,7 @@ impl<A: ApiHandler> ApiServer<A> {
)?;
let handler = move |request, socketaddr| self.clone().handler(request, socketaddr);
- server_loop(listener, handler, shutdown_signal).await
+ server_loop(server_name, listener, handler, must_exit).await
}
}
}
@@ -278,9 +276,10 @@ impl Accept for UnixListenerOn {
}
pub async fn server_loop<A, H, F, E>(
+ server_name: String,
listener: A,
handler: H,
- shutdown_signal: impl Future<Output = ()>,
+ mut must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError>
where
A: Accept,
@@ -288,42 +287,57 @@ where
F: Future<Output = Result<Response<BoxBody<E>>, http::Error>> + Send + 'static,
E: Send + Sync + std::error::Error + 'static,
{
- tokio::pin!(shutdown_signal);
-
let (conn_in, mut conn_out) = tokio::sync::mpsc::unbounded_channel();
- let connection_collector = tokio::spawn(async move {
- let mut collection = FuturesUnordered::new();
- loop {
- let collect_next = async {
- if collection.is_empty() {
- futures::future::pending().await
- } else {
- collection.next().await
- }
- };
- tokio::select! {
- result = collect_next => {
- trace!("HTTP connection finished: {:?}", result);
- }
- new_fut = conn_out.recv() => {
- match new_fut {
- Some(f) => collection.push(f),
- None => break,
+ let connection_collector = tokio::spawn({
+ let server_name = server_name.clone();
+ async move {
+ let mut connections = FuturesUnordered::new();
+ loop {
+ let collect_next = async {
+ if connections.is_empty() {
+ futures::future::pending().await
+ } else {
+ connections.next().await
+ }
+ };
+ tokio::select! {
+ result = collect_next => {
+ trace!("{} server: HTTP connection finished: {:?}", server_name, result);
+ }
+ new_fut = conn_out.recv() => {
+ match new_fut {
+ Some(f) => connections.push(f),
+ None => break,
+ }
}
}
}
+ if !connections.is_empty() {
+ info!(
+ "{} server: {} connections still open",
+ server_name,
+ connections.len()
+ );
+ while let Some(conn_res) = connections.next().await {
+ trace!(
+ "{} server: HTTP connection finished: {:?}",
+ server_name,
+ conn_res
+ );
+ info!(
+ "{} server: {} connections still open",
+ server_name,
+ connections.len()
+ );
+ }
+ }
}
- debug!("Collecting last open HTTP connections.");
- while let Some(conn_res) = collection.next().await {
- trace!("HTTP connection finished: {:?}", conn_res);
- }
- debug!("No more HTTP connections to collect");
});
- loop {
+ while !*must_exit.borrow() {
let (stream, client_addr) = tokio::select! {
acc = listener.accept() => acc?,
- _ = &mut shutdown_signal => break,
+ _ = must_exit.changed() => continue,
};
let io = TokioIo::new(stream);
@@ -343,6 +357,8 @@ where
conn_in.send(fut)?;
}
+ info!("{} server exiting", server_name);
+ drop(conn_in);
connection_collector.await?;
Ok(())