From ad5ce968d212d951f7459c36bcbdd78ce39be585 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 8 Feb 2024 18:57:18 +0100 Subject: [dep-upgrade-202402] remove useless mut --- src/api/s3/put.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/api') diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 17424862..5ac5fb6b 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -7,7 +7,7 @@ use futures::try_join; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; -use hyper::body::{Body, Bytes}; +use hyper::body::Bytes; use hyper::header::{HeaderMap, HeaderValue}; use hyper::{Request, Response}; -- cgit v1.2.3 From 5c63193d1de909cdecc501b482f5dc269a84874d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 8 Feb 2024 23:43:59 +0100 Subject: [dep-upgrade-202402] fix shutdown issue introduced when upgrading hyper --- src/api/admin/api_server.rs | 6 ++-- src/api/generic_server.rs | 88 ++++++++++++++++++++++++++------------------- src/api/k2v/api_server.rs | 6 ++-- src/api/s3/api_server.rs | 6 ++-- 4 files changed, 61 insertions(+), 45 deletions(-) (limited to 'src/api') diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index d5e1c777..50813d11 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -3,9 +3,9 @@ use std::sync::Arc; use async_trait::async_trait; -use futures::future::Future; use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW}; use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; +use tokio::sync::watch; use opentelemetry::trace::SpanRef; @@ -65,11 +65,11 @@ impl AdminApiServer { pub async fn run( self, bind_addr: UnixOrTCPSocketAddress, - shutdown_signal: impl Future, + must_exit: watch::Receiver, ) -> Result<(), GarageError> { let region = self.garage.config.s3_api.s3_region.clone(); ApiServer::new(region, self) - .run_server(bind_addr, Some(0o220), shutdown_signal) + .run_server(bind_addr, Some(0o220), must_exit) .await } diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index 7b37417e..9c49fdf3 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -18,6 +18,7 @@ use hyper_util::rt::TokioIo; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream}; +use tokio::sync::watch; use opentelemetry::{ global, @@ -104,20 +105,17 @@ impl ApiServer { self: Arc, bind_addr: UnixOrTCPSocketAddress, unix_bind_addr_mode: Option, - shutdown_signal: impl Future, + must_exit: watch::Receiver, ) -> Result<(), GarageError> { - info!( - "{} API server listening on {}", - A::API_NAME_DISPLAY, - bind_addr - ); + let server_name = format!("{} API", A::API_NAME_DISPLAY); + info!("{} server listening on {}", server_name, bind_addr); match bind_addr { UnixOrTCPSocketAddress::TCPSocket(addr) => { let listener = TcpListener::bind(addr).await?; let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); - server_loop(listener, handler, shutdown_signal).await + server_loop(server_name, listener, handler, must_exit).await } UnixOrTCPSocketAddress::UnixSocket(ref path) => { if path.exists() { @@ -133,7 +131,7 @@ impl ApiServer { )?; let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); - server_loop(listener, handler, shutdown_signal).await + server_loop(server_name, listener, handler, must_exit).await } } } @@ -278,9 +276,10 @@ impl Accept for UnixListenerOn { } pub async fn server_loop( + server_name: String, listener: A, handler: H, - shutdown_signal: impl Future, + mut must_exit: watch::Receiver, ) -> Result<(), GarageError> where A: Accept, @@ -288,42 +287,57 @@ where F: Future>, http::Error>> + Send + 'static, E: Send + Sync + std::error::Error + 'static, { - tokio::pin!(shutdown_signal); - let (conn_in, mut conn_out) = tokio::sync::mpsc::unbounded_channel(); - let connection_collector = tokio::spawn(async move { - let mut collection = FuturesUnordered::new(); - loop { - let collect_next = async { - if collection.is_empty() { - futures::future::pending().await - } else { - collection.next().await - } - }; - tokio::select! { - result = collect_next => { - trace!("HTTP connection finished: {:?}", result); - } - new_fut = conn_out.recv() => { - match new_fut { - Some(f) => collection.push(f), - None => break, + let connection_collector = tokio::spawn({ + let server_name = server_name.clone(); + async move { + let mut connections = FuturesUnordered::new(); + loop { + let collect_next = async { + if connections.is_empty() { + futures::future::pending().await + } else { + connections.next().await + } + }; + tokio::select! { + result = collect_next => { + trace!("{} server: HTTP connection finished: {:?}", server_name, result); + } + new_fut = conn_out.recv() => { + match new_fut { + Some(f) => connections.push(f), + None => break, + } } } } + if !connections.is_empty() { + info!( + "{} server: {} connections still open", + server_name, + connections.len() + ); + while let Some(conn_res) = connections.next().await { + trace!( + "{} server: HTTP connection finished: {:?}", + server_name, + conn_res + ); + info!( + "{} server: {} connections still open", + server_name, + connections.len() + ); + } + } } - debug!("Collecting last open HTTP connections."); - while let Some(conn_res) = collection.next().await { - trace!("HTTP connection finished: {:?}", conn_res); - } - debug!("No more HTTP connections to collect"); }); - loop { + while !*must_exit.borrow() { let (stream, client_addr) = tokio::select! { acc = listener.accept() => acc?, - _ = &mut shutdown_signal => break, + _ = must_exit.changed() => continue, }; let io = TokioIo::new(stream); @@ -343,6 +357,8 @@ where conn_in.send(fut)?; } + info!("{} server exiting", server_name); + drop(conn_in); connection_collector.await?; Ok(()) diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 128742c4..e97da2af 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use async_trait::async_trait; -use futures::future::Future; use hyper::{body::Incoming as IncomingBody, Method, Request, Response}; +use tokio::sync::watch; use opentelemetry::{trace::SpanRef, KeyValue}; @@ -42,10 +42,10 @@ impl K2VApiServer { garage: Arc, bind_addr: UnixOrTCPSocketAddress, s3_region: String, - shutdown_signal: impl Future, + must_exit: watch::Receiver, ) -> Result<(), GarageError> { ApiServer::new(s3_region, K2VApiServer { garage }) - .run_server(bind_addr, None, shutdown_signal) + .run_server(bind_addr, None, must_exit) .await } } diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 495c5832..4b815f79 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -2,9 +2,9 @@ use std::sync::Arc; use async_trait::async_trait; -use futures::future::Future; use hyper::header; use hyper::{body::Incoming as IncomingBody, Request, Response}; +use tokio::sync::watch; use opentelemetry::{trace::SpanRef, KeyValue}; @@ -51,10 +51,10 @@ impl S3ApiServer { garage: Arc, addr: UnixOrTCPSocketAddress, s3_region: String, - shutdown_signal: impl Future, + must_exit: watch::Receiver, ) -> Result<(), GarageError> { ApiServer::new(s3_region, S3ApiServer { garage }) - .run_server(addr, None, shutdown_signal) + .run_server(addr, None, must_exit) .await } -- cgit v1.2.3