aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/admin/api_server.rs6
-rw-r--r--src/api/generic_server.rs88
-rw-r--r--src/api/k2v/api_server.rs6
-rw-r--r--src/api/s3/api_server.rs6
-rw-r--r--src/api/s3/put.rs2
-rw-r--r--src/garage/cli/convert_db.rs4
-rw-r--r--src/garage/secrets.rs4
-rw-r--r--src/garage/server.rs13
-rw-r--r--src/garage/tests/common/custom_requester.rs4
-rw-r--r--src/garage/tests/s3/website.rs6
-rw-r--r--src/k2v-client/lib.rs2
-rw-r--r--src/model/helper/bucket.rs14
-rw-r--r--src/model/index_counter.rs4
-rw-r--r--src/web/web_server.rs12
14 files changed, 90 insertions, 81 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index d5e1c777..50813d11 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -3,9 +3,9 @@ use std::sync::Arc;
use async_trait::async_trait;
-use futures::future::Future;
use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
+use tokio::sync::watch;
use opentelemetry::trace::SpanRef;
@@ -65,11 +65,11 @@ impl AdminApiServer {
pub async fn run(
self,
bind_addr: UnixOrTCPSocketAddress,
- shutdown_signal: impl Future<Output = ()>,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
let region = self.garage.config.s3_api.s3_region.clone();
ApiServer::new(region, self)
- .run_server(bind_addr, Some(0o220), shutdown_signal)
+ .run_server(bind_addr, Some(0o220), must_exit)
.await
}
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs
index 7b37417e..9c49fdf3 100644
--- a/src/api/generic_server.rs
+++ b/src/api/generic_server.rs
@@ -18,6 +18,7 @@ use hyper_util::rt::TokioIo;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
+use tokio::sync::watch;
use opentelemetry::{
global,
@@ -104,20 +105,17 @@ impl<A: ApiHandler> ApiServer<A> {
self: Arc<Self>,
bind_addr: UnixOrTCPSocketAddress,
unix_bind_addr_mode: Option<u32>,
- shutdown_signal: impl Future<Output = ()>,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
- info!(
- "{} API server listening on {}",
- A::API_NAME_DISPLAY,
- bind_addr
- );
+ let server_name = format!("{} API", A::API_NAME_DISPLAY);
+ info!("{} server listening on {}", server_name, bind_addr);
match bind_addr {
UnixOrTCPSocketAddress::TCPSocket(addr) => {
let listener = TcpListener::bind(addr).await?;
let handler = move |request, socketaddr| self.clone().handler(request, socketaddr);
- server_loop(listener, handler, shutdown_signal).await
+ server_loop(server_name, listener, handler, must_exit).await
}
UnixOrTCPSocketAddress::UnixSocket(ref path) => {
if path.exists() {
@@ -133,7 +131,7 @@ impl<A: ApiHandler> ApiServer<A> {
)?;
let handler = move |request, socketaddr| self.clone().handler(request, socketaddr);
- server_loop(listener, handler, shutdown_signal).await
+ server_loop(server_name, listener, handler, must_exit).await
}
}
}
@@ -278,9 +276,10 @@ impl Accept for UnixListenerOn {
}
pub async fn server_loop<A, H, F, E>(
+ server_name: String,
listener: A,
handler: H,
- shutdown_signal: impl Future<Output = ()>,
+ mut must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError>
where
A: Accept,
@@ -288,42 +287,57 @@ where
F: Future<Output = Result<Response<BoxBody<E>>, http::Error>> + Send + 'static,
E: Send + Sync + std::error::Error + 'static,
{
- tokio::pin!(shutdown_signal);
-
let (conn_in, mut conn_out) = tokio::sync::mpsc::unbounded_channel();
- let connection_collector = tokio::spawn(async move {
- let mut collection = FuturesUnordered::new();
- loop {
- let collect_next = async {
- if collection.is_empty() {
- futures::future::pending().await
- } else {
- collection.next().await
- }
- };
- tokio::select! {
- result = collect_next => {
- trace!("HTTP connection finished: {:?}", result);
- }
- new_fut = conn_out.recv() => {
- match new_fut {
- Some(f) => collection.push(f),
- None => break,
+ let connection_collector = tokio::spawn({
+ let server_name = server_name.clone();
+ async move {
+ let mut connections = FuturesUnordered::new();
+ loop {
+ let collect_next = async {
+ if connections.is_empty() {
+ futures::future::pending().await
+ } else {
+ connections.next().await
+ }
+ };
+ tokio::select! {
+ result = collect_next => {
+ trace!("{} server: HTTP connection finished: {:?}", server_name, result);
+ }
+ new_fut = conn_out.recv() => {
+ match new_fut {
+ Some(f) => connections.push(f),
+ None => break,
+ }
}
}
}
+ if !connections.is_empty() {
+ info!(
+ "{} server: {} connections still open",
+ server_name,
+ connections.len()
+ );
+ while let Some(conn_res) = connections.next().await {
+ trace!(
+ "{} server: HTTP connection finished: {:?}",
+ server_name,
+ conn_res
+ );
+ info!(
+ "{} server: {} connections still open",
+ server_name,
+ connections.len()
+ );
+ }
+ }
}
- debug!("Collecting last open HTTP connections.");
- while let Some(conn_res) = collection.next().await {
- trace!("HTTP connection finished: {:?}", conn_res);
- }
- debug!("No more HTTP connections to collect");
});
- loop {
+ while !*must_exit.borrow() {
let (stream, client_addr) = tokio::select! {
acc = listener.accept() => acc?,
- _ = &mut shutdown_signal => break,
+ _ = must_exit.changed() => continue,
};
let io = TokioIo::new(stream);
@@ -343,6 +357,8 @@ where
conn_in.send(fut)?;
}
+ info!("{} server exiting", server_name);
+ drop(conn_in);
connection_collector.await?;
Ok(())
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
index 128742c4..e97da2af 100644
--- a/src/api/k2v/api_server.rs
+++ b/src/api/k2v/api_server.rs
@@ -2,8 +2,8 @@ use std::sync::Arc;
use async_trait::async_trait;
-use futures::future::Future;
use hyper::{body::Incoming as IncomingBody, Method, Request, Response};
+use tokio::sync::watch;
use opentelemetry::{trace::SpanRef, KeyValue};
@@ -42,10 +42,10 @@ impl K2VApiServer {
garage: Arc<Garage>,
bind_addr: UnixOrTCPSocketAddress,
s3_region: String,
- shutdown_signal: impl Future<Output = ()>,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
ApiServer::new(s3_region, K2VApiServer { garage })
- .run_server(bind_addr, None, shutdown_signal)
+ .run_server(bind_addr, None, must_exit)
.await
}
}
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index 495c5832..4b815f79 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -2,9 +2,9 @@ use std::sync::Arc;
use async_trait::async_trait;
-use futures::future::Future;
use hyper::header;
use hyper::{body::Incoming as IncomingBody, Request, Response};
+use tokio::sync::watch;
use opentelemetry::{trace::SpanRef, KeyValue};
@@ -51,10 +51,10 @@ impl S3ApiServer {
garage: Arc<Garage>,
addr: UnixOrTCPSocketAddress,
s3_region: String,
- shutdown_signal: impl Future<Output = ()>,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
ApiServer::new(s3_region, S3ApiServer { garage })
- .run_server(addr, None, shutdown_signal)
+ .run_server(addr, None, must_exit)
.await
}
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 17424862..5ac5fb6b 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -7,7 +7,7 @@ use futures::try_join;
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
-use hyper::body::{Body, Bytes};
+use hyper::body::Bytes;
use hyper::header::{HeaderMap, HeaderValue};
use hyper::{Request, Response};
diff --git a/src/garage/cli/convert_db.rs b/src/garage/cli/convert_db.rs
index 044ccbb9..6b854ccb 100644
--- a/src/garage/cli/convert_db.rs
+++ b/src/garage/cli/convert_db.rs
@@ -67,8 +67,8 @@ fn open_db(path: PathBuf, engine: Engine, open: &OpenDbOpt) -> Result<Db> {
#[cfg(feature = "sqlite")]
Engine::Sqlite => {
let db = sqlite_adapter::rusqlite::Connection::open(&path)?;
- db.pragma_update(None, "journal_mode", &"WAL")?;
- db.pragma_update(None, "synchronous", &"NORMAL")?;
+ db.pragma_update(None, "journal_mode", "WAL")?;
+ db.pragma_update(None, "synchronous", "NORMAL")?;
Ok(sqlite_adapter::SqliteDb::init(db))
}
#[cfg(feature = "lmdb")]
diff --git a/src/garage/secrets.rs b/src/garage/secrets.rs
index e96be9e4..8c89a262 100644
--- a/src/garage/secrets.rs
+++ b/src/garage/secrets.rs
@@ -213,10 +213,10 @@ mod tests {
};
use std::os::unix::fs::PermissionsExt;
- let metadata = std::fs::metadata(&path_secret_path)?;
+ let metadata = std::fs::metadata(path_secret_path)?;
let mut perm = metadata.permissions();
perm.set_mode(0o660);
- std::fs::set_permissions(&path_secret_path, perm)?;
+ std::fs::set_permissions(path_secret_path, perm)?;
// Config file that just specifies the path
let config = read_config(path_config.to_path_buf())?;
diff --git a/src/garage/server.rs b/src/garage/server.rs
index de8ac9e2..51b06b8e 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -88,7 +88,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
garage.clone(),
s3_bind_addr.clone(),
config.s3_api.s3_region.clone(),
- wait_from(watch_cancel.clone()),
+ watch_cancel.clone(),
)),
));
}
@@ -103,7 +103,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
garage.clone(),
config.k2v_api.as_ref().unwrap().api_bind_addr.clone(),
config.s3_api.s3_region.clone(),
- wait_from(watch_cancel.clone()),
+ watch_cancel.clone(),
)),
));
}
@@ -116,10 +116,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
let web_server = WebServer::new(garage.clone(), web_config.root_domain.clone());
servers.push((
"Web",
- tokio::spawn(web_server.run(
- web_config.bind_addr.clone(),
- wait_from(watch_cancel.clone()),
- )),
+ tokio::spawn(web_server.run(web_config.bind_addr.clone(), watch_cancel.clone())),
));
}
@@ -127,9 +124,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
info!("Launching Admin API server...");
servers.push((
"Admin",
- tokio::spawn(
- admin_server.run(admin_bind_addr.clone(), wait_from(watch_cancel.clone())),
- ),
+ tokio::spawn(admin_server.run(admin_bind_addr.clone(), watch_cancel.clone())),
));
}
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index 72fb1a46..e5f4cca1 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -205,8 +205,8 @@ impl<'a> RequestBuilder<'a> {
all_headers.insert("x-amz-content-sha256".to_owned(), body_sha.clone());
let mut signed_headers = all_headers
- .iter()
- .map(|(k, _)| k.as_ref())
+ .keys()
+ .map(|k| k.as_ref())
.collect::<Vec<&str>>();
signed_headers.sort();
let signed_headers = signed_headers.join(";");
diff --git a/src/garage/tests/s3/website.rs b/src/garage/tests/s3/website.rs
index 19f53fcd..0cadc388 100644
--- a/src/garage/tests/s3/website.rs
+++ b/src/garage/tests/s3/website.rs
@@ -61,8 +61,7 @@ async fn test_website() {
.method("GET")
.uri(format!(
"http://127.0.0.1:{0}/check?domain={1}",
- ctx.garage.admin_port,
- BCKT_NAME.to_string()
+ ctx.garage.admin_port, BCKT_NAME
))
.body(Body::new(Bytes::new()))
.unwrap()
@@ -136,8 +135,7 @@ async fn test_website() {
.method("GET")
.uri(format!(
"http://127.0.0.1:{0}/check?domain={1}",
- ctx.garage.admin_port,
- BCKT_NAME.to_string()
+ ctx.garage.admin_port, BCKT_NAME
))
.body(Body::new(Bytes::new()))
.unwrap()
diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs
index 13538909..852274a7 100644
--- a/src/k2v-client/lib.rs
+++ b/src/k2v-client/lib.rs
@@ -10,7 +10,7 @@ use http::header::{ACCEPT, CONTENT_TYPE};
use http::status::StatusCode;
use http::{HeaderName, HeaderValue, Request};
use http_body_util::{BodyExt, Full as FullBody};
-use hyper::{body::Body as BodyTrait, body::Bytes};
+use hyper::body::Bytes;
use hyper_rustls::HttpsConnector;
use hyper_util::client::legacy::{connect::HttpConnector, Client as HttpClient};
use hyper_util::rt::TokioExecutor;
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index 576d03f3..e9842a91 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -126,7 +126,7 @@ impl<'a> BucketHelper<'a> {
}
// Checks ok, add alias
- let mut bucket_p = bucket.state.as_option_mut().unwrap();
+ let bucket_p = bucket.state.as_option_mut().unwrap();
let alias_ts = increment_logical_clock_2(
bucket_p.aliases.get_timestamp(alias_name),
@@ -163,7 +163,7 @@ impl<'a> BucketHelper<'a> {
alias_name: &String,
) -> Result<(), Error> {
let mut bucket = self.get_existing_bucket(bucket_id).await?;
- let mut bucket_state = bucket.state.as_option_mut().unwrap();
+ let bucket_state = bucket.state.as_option_mut().unwrap();
let mut alias = self
.0
@@ -245,7 +245,7 @@ impl<'a> BucketHelper<'a> {
self.0.bucket_alias_table.insert(&alias).await?;
}
- if let Some(mut bucket_state) = bucket.state.as_option_mut() {
+ if let Some(bucket_state) = bucket.state.as_option_mut() {
bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
}
@@ -274,7 +274,7 @@ impl<'a> BucketHelper<'a> {
let mut bucket = self.get_existing_bucket(bucket_id).await?;
let mut key = key_helper.get_existing_key(key_id).await?;
- let mut key_param = key.state.as_option_mut().unwrap();
+ let key_param = key.state.as_option_mut().unwrap();
if let Some(Some(existing_alias)) = key_param.local_aliases.get(alias_name) {
if *existing_alias != bucket_id {
@@ -283,7 +283,7 @@ impl<'a> BucketHelper<'a> {
}
// Checks ok, add alias
- let mut bucket_p = bucket.state.as_option_mut().unwrap();
+ let bucket_p = bucket.state.as_option_mut().unwrap();
let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone());
// Calculate the timestamp to assign to this aliasing in the two local_aliases maps
@@ -326,7 +326,7 @@ impl<'a> BucketHelper<'a> {
let mut bucket = self.get_existing_bucket(bucket_id).await?;
let mut key = key_helper.get_existing_key(key_id).await?;
- let mut bucket_p = bucket.state.as_option_mut().unwrap();
+ let bucket_p = bucket.state.as_option_mut().unwrap();
if key
.state
@@ -359,7 +359,7 @@ impl<'a> BucketHelper<'a> {
}
// Checks ok, remove alias
- let mut key_param = key.state.as_option_mut().unwrap();
+ let key_param = key.state.as_option_mut().unwrap();
let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone());
let alias_ts = increment_logical_clock_2(
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index a46c165f..c0bf38d8 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -232,7 +232,7 @@ impl<T: CountedItem> IndexCounter<T> {
let now = now_msec();
for (s, inc) in counts.iter() {
- let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
+ let ent = entry.values.entry(s.to_string()).or_insert((0, 0));
ent.0 = std::cmp::max(ent.0 + 1, now);
ent.1 += *inc;
}
@@ -348,7 +348,7 @@ impl<T: CountedItem> IndexCounter<T> {
},
};
for (s, v) in counts.iter() {
- let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0));
+ let tv = local_counter.values.entry(s.to_string()).or_insert((0, 0));
tv.0 = std::cmp::max(tv.0 + 1, now);
tv.1 += v;
}
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index 766e3829..269f37f2 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -2,7 +2,8 @@ use std::fs::{self, Permissions};
use std::os::unix::prelude::PermissionsExt;
use std::{convert::Infallible, sync::Arc};
-use futures::future::Future;
+use tokio::net::{TcpListener, UnixListener};
+use tokio::sync::watch;
use hyper::{
body::Incoming as IncomingBody,
@@ -10,8 +11,6 @@ use hyper::{
Method, Request, Response, StatusCode,
};
-use tokio::net::{TcpListener, UnixListener};
-
use opentelemetry::{
global,
metrics::{Counter, ValueRecorder},
@@ -84,8 +83,9 @@ impl WebServer {
pub async fn run(
self: Arc<Self>,
bind_addr: UnixOrTCPSocketAddress,
- shutdown_signal: impl Future<Output = ()>,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
+ let server_name = "Web".into();
info!("Web server listening on {}", bind_addr);
match bind_addr {
@@ -94,7 +94,7 @@ impl WebServer {
let handler =
move |stream, socketaddr| self.clone().handle_request(stream, socketaddr);
- server_loop(listener, handler, shutdown_signal).await
+ server_loop(server_name, listener, handler, must_exit).await
}
UnixOrTCPSocketAddress::UnixSocket(ref path) => {
if path.exists() {
@@ -108,7 +108,7 @@ impl WebServer {
let handler =
move |stream, socketaddr| self.clone().handle_request(stream, socketaddr);
- server_loop(listener, handler, shutdown_signal).await
+ server_loop(server_name, listener, handler, must_exit).await
}
}
}