diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api/admin/api_server.rs | 6 | ||||
-rw-r--r-- | src/api/generic_server.rs | 88 | ||||
-rw-r--r-- | src/api/k2v/api_server.rs | 6 | ||||
-rw-r--r-- | src/api/s3/api_server.rs | 6 | ||||
-rw-r--r-- | src/api/s3/put.rs | 2 | ||||
-rw-r--r-- | src/garage/cli/convert_db.rs | 4 | ||||
-rw-r--r-- | src/garage/secrets.rs | 4 | ||||
-rw-r--r-- | src/garage/server.rs | 13 | ||||
-rw-r--r-- | src/garage/tests/common/custom_requester.rs | 4 | ||||
-rw-r--r-- | src/garage/tests/s3/website.rs | 6 | ||||
-rw-r--r-- | src/k2v-client/lib.rs | 2 | ||||
-rw-r--r-- | src/model/helper/bucket.rs | 14 | ||||
-rw-r--r-- | src/model/index_counter.rs | 4 | ||||
-rw-r--r-- | src/web/web_server.rs | 12 |
14 files changed, 90 insertions, 81 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index d5e1c777..50813d11 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -3,9 +3,9 @@ use std::sync::Arc; use async_trait::async_trait; -use futures::future::Future; use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW}; use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; +use tokio::sync::watch; use opentelemetry::trace::SpanRef; @@ -65,11 +65,11 @@ impl AdminApiServer { pub async fn run( self, bind_addr: UnixOrTCPSocketAddress, - shutdown_signal: impl Future<Output = ()>, + must_exit: watch::Receiver<bool>, ) -> Result<(), GarageError> { let region = self.garage.config.s3_api.s3_region.clone(); ApiServer::new(region, self) - .run_server(bind_addr, Some(0o220), shutdown_signal) + .run_server(bind_addr, Some(0o220), must_exit) .await } diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index 7b37417e..9c49fdf3 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -18,6 +18,7 @@ use hyper_util::rt::TokioIo; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream}; +use tokio::sync::watch; use opentelemetry::{ global, @@ -104,20 +105,17 @@ impl<A: ApiHandler> ApiServer<A> { self: Arc<Self>, bind_addr: UnixOrTCPSocketAddress, unix_bind_addr_mode: Option<u32>, - shutdown_signal: impl Future<Output = ()>, + must_exit: watch::Receiver<bool>, ) -> Result<(), GarageError> { - info!( - "{} API server listening on {}", - A::API_NAME_DISPLAY, - bind_addr - ); + let server_name = format!("{} API", A::API_NAME_DISPLAY); + info!("{} server listening on {}", server_name, bind_addr); match bind_addr { UnixOrTCPSocketAddress::TCPSocket(addr) => { let listener = TcpListener::bind(addr).await?; let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); - server_loop(listener, handler, shutdown_signal).await + server_loop(server_name, listener, handler, must_exit).await } UnixOrTCPSocketAddress::UnixSocket(ref path) => { if path.exists() { @@ -133,7 +131,7 @@ impl<A: ApiHandler> ApiServer<A> { )?; let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); - server_loop(listener, handler, shutdown_signal).await + server_loop(server_name, listener, handler, must_exit).await } } } @@ -278,9 +276,10 @@ impl Accept for UnixListenerOn { } pub async fn server_loop<A, H, F, E>( + server_name: String, listener: A, handler: H, - shutdown_signal: impl Future<Output = ()>, + mut must_exit: watch::Receiver<bool>, ) -> Result<(), GarageError> where A: Accept, @@ -288,42 +287,57 @@ where F: Future<Output = Result<Response<BoxBody<E>>, http::Error>> + Send + 'static, E: Send + Sync + std::error::Error + 'static, { - tokio::pin!(shutdown_signal); - let (conn_in, mut conn_out) = tokio::sync::mpsc::unbounded_channel(); - let connection_collector = tokio::spawn(async move { - let mut collection = FuturesUnordered::new(); - loop { - let collect_next = async { - if collection.is_empty() { - futures::future::pending().await - } else { - collection.next().await - } - }; - tokio::select! { - result = collect_next => { - trace!("HTTP connection finished: {:?}", result); - } - new_fut = conn_out.recv() => { - match new_fut { - Some(f) => collection.push(f), - None => break, + let connection_collector = tokio::spawn({ + let server_name = server_name.clone(); + async move { + let mut connections = FuturesUnordered::new(); + loop { + let collect_next = async { + if connections.is_empty() { + futures::future::pending().await + } else { + connections.next().await + } + }; + tokio::select! { + result = collect_next => { + trace!("{} server: HTTP connection finished: {:?}", server_name, result); + } + new_fut = conn_out.recv() => { + match new_fut { + Some(f) => connections.push(f), + None => break, + } } } } + if !connections.is_empty() { + info!( + "{} server: {} connections still open", + server_name, + connections.len() + ); + while let Some(conn_res) = connections.next().await { + trace!( + "{} server: HTTP connection finished: {:?}", + server_name, + conn_res + ); + info!( + "{} server: {} connections still open", + server_name, + connections.len() + ); + } + } } - debug!("Collecting last open HTTP connections."); - while let Some(conn_res) = collection.next().await { - trace!("HTTP connection finished: {:?}", conn_res); - } - debug!("No more HTTP connections to collect"); }); - loop { + while !*must_exit.borrow() { let (stream, client_addr) = tokio::select! { acc = listener.accept() => acc?, - _ = &mut shutdown_signal => break, + _ = must_exit.changed() => continue, }; let io = TokioIo::new(stream); @@ -343,6 +357,8 @@ where conn_in.send(fut)?; } + info!("{} server exiting", server_name); + drop(conn_in); connection_collector.await?; Ok(()) diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 128742c4..e97da2af 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use async_trait::async_trait; -use futures::future::Future; use hyper::{body::Incoming as IncomingBody, Method, Request, Response}; +use tokio::sync::watch; use opentelemetry::{trace::SpanRef, KeyValue}; @@ -42,10 +42,10 @@ impl K2VApiServer { garage: Arc<Garage>, bind_addr: UnixOrTCPSocketAddress, s3_region: String, - shutdown_signal: impl Future<Output = ()>, + must_exit: watch::Receiver<bool>, ) -> Result<(), GarageError> { ApiServer::new(s3_region, K2VApiServer { garage }) - .run_server(bind_addr, None, shutdown_signal) + .run_server(bind_addr, None, must_exit) .await } } diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 495c5832..4b815f79 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -2,9 +2,9 @@ use std::sync::Arc; use async_trait::async_trait; -use futures::future::Future; use hyper::header; use hyper::{body::Incoming as IncomingBody, Request, Response}; +use tokio::sync::watch; use opentelemetry::{trace::SpanRef, KeyValue}; @@ -51,10 +51,10 @@ impl S3ApiServer { garage: Arc<Garage>, addr: UnixOrTCPSocketAddress, s3_region: String, - shutdown_signal: impl Future<Output = ()>, + must_exit: watch::Receiver<bool>, ) -> Result<(), GarageError> { ApiServer::new(s3_region, S3ApiServer { garage }) - .run_server(addr, None, shutdown_signal) + .run_server(addr, None, must_exit) .await } diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 17424862..5ac5fb6b 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -7,7 +7,7 @@ use futures::try_join; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; -use hyper::body::{Body, Bytes}; +use hyper::body::Bytes; use hyper::header::{HeaderMap, HeaderValue}; use hyper::{Request, Response}; diff --git a/src/garage/cli/convert_db.rs b/src/garage/cli/convert_db.rs index 044ccbb9..6b854ccb 100644 --- a/src/garage/cli/convert_db.rs +++ b/src/garage/cli/convert_db.rs @@ -67,8 +67,8 @@ fn open_db(path: PathBuf, engine: Engine, open: &OpenDbOpt) -> Result<Db> { #[cfg(feature = "sqlite")] Engine::Sqlite => { let db = sqlite_adapter::rusqlite::Connection::open(&path)?; - db.pragma_update(None, "journal_mode", &"WAL")?; - db.pragma_update(None, "synchronous", &"NORMAL")?; + db.pragma_update(None, "journal_mode", "WAL")?; + db.pragma_update(None, "synchronous", "NORMAL")?; Ok(sqlite_adapter::SqliteDb::init(db)) } #[cfg(feature = "lmdb")] diff --git a/src/garage/secrets.rs b/src/garage/secrets.rs index e96be9e4..8c89a262 100644 --- a/src/garage/secrets.rs +++ b/src/garage/secrets.rs @@ -213,10 +213,10 @@ mod tests { }; use std::os::unix::fs::PermissionsExt; - let metadata = std::fs::metadata(&path_secret_path)?; + let metadata = std::fs::metadata(path_secret_path)?; let mut perm = metadata.permissions(); perm.set_mode(0o660); - std::fs::set_permissions(&path_secret_path, perm)?; + std::fs::set_permissions(path_secret_path, perm)?; // Config file that just specifies the path let config = read_config(path_config.to_path_buf())?; diff --git a/src/garage/server.rs b/src/garage/server.rs index de8ac9e2..51b06b8e 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -88,7 +88,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er garage.clone(), s3_bind_addr.clone(), config.s3_api.s3_region.clone(), - wait_from(watch_cancel.clone()), + watch_cancel.clone(), )), )); } @@ -103,7 +103,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er garage.clone(), config.k2v_api.as_ref().unwrap().api_bind_addr.clone(), config.s3_api.s3_region.clone(), - wait_from(watch_cancel.clone()), + watch_cancel.clone(), )), )); } @@ -116,10 +116,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er let web_server = WebServer::new(garage.clone(), web_config.root_domain.clone()); servers.push(( "Web", - tokio::spawn(web_server.run( - web_config.bind_addr.clone(), - wait_from(watch_cancel.clone()), - )), + tokio::spawn(web_server.run(web_config.bind_addr.clone(), watch_cancel.clone())), )); } @@ -127,9 +124,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er info!("Launching Admin API server..."); servers.push(( "Admin", - tokio::spawn( - admin_server.run(admin_bind_addr.clone(), wait_from(watch_cancel.clone())), - ), + tokio::spawn(admin_server.run(admin_bind_addr.clone(), watch_cancel.clone())), )); } diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs index 72fb1a46..e5f4cca1 100644 --- a/src/garage/tests/common/custom_requester.rs +++ b/src/garage/tests/common/custom_requester.rs @@ -205,8 +205,8 @@ impl<'a> RequestBuilder<'a> { all_headers.insert("x-amz-content-sha256".to_owned(), body_sha.clone()); let mut signed_headers = all_headers - .iter() - .map(|(k, _)| k.as_ref()) + .keys() + .map(|k| k.as_ref()) .collect::<Vec<&str>>(); signed_headers.sort(); let signed_headers = signed_headers.join(";"); diff --git a/src/garage/tests/s3/website.rs b/src/garage/tests/s3/website.rs index 19f53fcd..0cadc388 100644 --- a/src/garage/tests/s3/website.rs +++ b/src/garage/tests/s3/website.rs @@ -61,8 +61,7 @@ async fn test_website() { .method("GET") .uri(format!( "http://127.0.0.1:{0}/check?domain={1}", - ctx.garage.admin_port, - BCKT_NAME.to_string() + ctx.garage.admin_port, BCKT_NAME )) .body(Body::new(Bytes::new())) .unwrap() @@ -136,8 +135,7 @@ async fn test_website() { .method("GET") .uri(format!( "http://127.0.0.1:{0}/check?domain={1}", - ctx.garage.admin_port, - BCKT_NAME.to_string() + ctx.garage.admin_port, BCKT_NAME )) .body(Body::new(Bytes::new())) .unwrap() diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs index 13538909..852274a7 100644 --- a/src/k2v-client/lib.rs +++ b/src/k2v-client/lib.rs @@ -10,7 +10,7 @@ use http::header::{ACCEPT, CONTENT_TYPE}; use http::status::StatusCode; use http::{HeaderName, HeaderValue, Request}; use http_body_util::{BodyExt, Full as FullBody}; -use hyper::{body::Body as BodyTrait, body::Bytes}; +use hyper::body::Bytes; use hyper_rustls::HttpsConnector; use hyper_util::client::legacy::{connect::HttpConnector, Client as HttpClient}; use hyper_util::rt::TokioExecutor; diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 576d03f3..e9842a91 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -126,7 +126,7 @@ impl<'a> BucketHelper<'a> { } // Checks ok, add alias - let mut bucket_p = bucket.state.as_option_mut().unwrap(); + let bucket_p = bucket.state.as_option_mut().unwrap(); let alias_ts = increment_logical_clock_2( bucket_p.aliases.get_timestamp(alias_name), @@ -163,7 +163,7 @@ impl<'a> BucketHelper<'a> { alias_name: &String, ) -> Result<(), Error> { let mut bucket = self.get_existing_bucket(bucket_id).await?; - let mut bucket_state = bucket.state.as_option_mut().unwrap(); + let bucket_state = bucket.state.as_option_mut().unwrap(); let mut alias = self .0 @@ -245,7 +245,7 @@ impl<'a> BucketHelper<'a> { self.0.bucket_alias_table.insert(&alias).await?; } - if let Some(mut bucket_state) = bucket.state.as_option_mut() { + if let Some(bucket_state) = bucket.state.as_option_mut() { bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false); self.0.bucket_table.insert(&bucket).await?; } @@ -274,7 +274,7 @@ impl<'a> BucketHelper<'a> { let mut bucket = self.get_existing_bucket(bucket_id).await?; let mut key = key_helper.get_existing_key(key_id).await?; - let mut key_param = key.state.as_option_mut().unwrap(); + let key_param = key.state.as_option_mut().unwrap(); if let Some(Some(existing_alias)) = key_param.local_aliases.get(alias_name) { if *existing_alias != bucket_id { @@ -283,7 +283,7 @@ impl<'a> BucketHelper<'a> { } // Checks ok, add alias - let mut bucket_p = bucket.state.as_option_mut().unwrap(); + let bucket_p = bucket.state.as_option_mut().unwrap(); let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone()); // Calculate the timestamp to assign to this aliasing in the two local_aliases maps @@ -326,7 +326,7 @@ impl<'a> BucketHelper<'a> { let mut bucket = self.get_existing_bucket(bucket_id).await?; let mut key = key_helper.get_existing_key(key_id).await?; - let mut bucket_p = bucket.state.as_option_mut().unwrap(); + let bucket_p = bucket.state.as_option_mut().unwrap(); if key .state @@ -359,7 +359,7 @@ impl<'a> BucketHelper<'a> { } // Checks ok, remove alias - let mut key_param = key.state.as_option_mut().unwrap(); + let key_param = key.state.as_option_mut().unwrap(); let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone()); let alias_ts = increment_logical_clock_2( diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index a46c165f..c0bf38d8 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -232,7 +232,7 @@ impl<T: CountedItem> IndexCounter<T> { let now = now_msec(); for (s, inc) in counts.iter() { - let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); + let ent = entry.values.entry(s.to_string()).or_insert((0, 0)); ent.0 = std::cmp::max(ent.0 + 1, now); ent.1 += *inc; } @@ -348,7 +348,7 @@ impl<T: CountedItem> IndexCounter<T> { }, }; for (s, v) in counts.iter() { - let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0)); + let tv = local_counter.values.entry(s.to_string()).or_insert((0, 0)); tv.0 = std::cmp::max(tv.0 + 1, now); tv.1 += v; } diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 766e3829..269f37f2 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -2,7 +2,8 @@ use std::fs::{self, Permissions}; use std::os::unix::prelude::PermissionsExt; use std::{convert::Infallible, sync::Arc}; -use futures::future::Future; +use tokio::net::{TcpListener, UnixListener}; +use tokio::sync::watch; use hyper::{ body::Incoming as IncomingBody, @@ -10,8 +11,6 @@ use hyper::{ Method, Request, Response, StatusCode, }; -use tokio::net::{TcpListener, UnixListener}; - use opentelemetry::{ global, metrics::{Counter, ValueRecorder}, @@ -84,8 +83,9 @@ impl WebServer { pub async fn run( self: Arc<Self>, bind_addr: UnixOrTCPSocketAddress, - shutdown_signal: impl Future<Output = ()>, + must_exit: watch::Receiver<bool>, ) -> Result<(), GarageError> { + let server_name = "Web".into(); info!("Web server listening on {}", bind_addr); match bind_addr { @@ -94,7 +94,7 @@ impl WebServer { let handler = move |stream, socketaddr| self.clone().handle_request(stream, socketaddr); - server_loop(listener, handler, shutdown_signal).await + server_loop(server_name, listener, handler, must_exit).await } UnixOrTCPSocketAddress::UnixSocket(ref path) => { if path.exists() { @@ -108,7 +108,7 @@ impl WebServer { let handler = move |stream, socketaddr| self.clone().handle_request(stream, socketaddr); - server_loop(listener, handler, shutdown_signal).await + server_loop(server_name, listener, handler, must_exit).await } } } |