diff options
author | Alex Auvolat <alex@adnab.me> | 2023-10-03 18:40:37 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-10-03 18:40:37 +0200 |
commit | 2e656b541b1dd1492798e1ed764fa40868da4d6a (patch) | |
tree | 66fbf4c11a248d3c92cf44c4d3f18663670acbbe /src | |
parent | 9ac1d5be0eba1b3b35f7fb2f99fe8df549044197 (diff) | |
parent | 1243db87f2090a3302c7c8beb386e68ddf9b66b5 (diff) | |
download | garage-2e656b541b1dd1492798e1ed764fa40868da4d6a.tar.gz garage-2e656b541b1dd1492798e1ed764fa40868da4d6a.zip |
Merge branch 'main' into nextv0.9.0-rc1
Diffstat (limited to 'src')
-rw-r--r-- | src/api/Cargo.toml | 1 | ||||
-rw-r--r-- | src/api/admin/api_server.rs | 6 | ||||
-rw-r--r-- | src/api/generic_server.rs | 60 | ||||
-rw-r--r-- | src/api/k2v/api_server.rs | 6 | ||||
-rw-r--r-- | src/api/s3/api_server.rs | 6 | ||||
-rw-r--r-- | src/garage/server.rs | 10 | ||||
-rw-r--r-- | src/util/config.rs | 9 | ||||
-rw-r--r-- | src/util/lib.rs | 1 | ||||
-rw-r--r-- | src/util/socket_address.rs | 44 | ||||
-rw-r--r-- | src/web/Cargo.toml | 3 | ||||
-rw-r--r-- | src/web/web_server.rs | 58 |
11 files changed, 169 insertions, 35 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 6425591f..cb9e2e55 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -45,6 +45,7 @@ http = "0.2" httpdate = "1.0" http-range = "0.1" hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] } +hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] } multer = "2.0" percent-encoding = "2.1.0" roxmltree = "0.18" diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index cc04d81f..53503220 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::net::SocketAddr; use std::sync::Arc; use async_trait::async_trait; @@ -18,6 +17,7 @@ use prometheus::{Encoder, TextEncoder}; use garage_model::garage::Garage; use garage_rpc::system::ClusterHealthStatus; use garage_util::error::Error as GarageError; +use garage_util::socket_address::UnixOrTCPSocketAddress; use crate::generic_server::*; @@ -61,12 +61,12 @@ impl AdminApiServer { pub async fn run( self, - bind_addr: SocketAddr, + bind_addr: UnixOrTCPSocketAddress, shutdown_signal: impl Future<Output = ()>, ) -> Result<(), GarageError> { let region = self.garage.config.s3_api.s3_region.clone(); ApiServer::new(region, self) - .run_server(bind_addr, shutdown_signal) + .run_server(bind_addr, Some(0o220), shutdown_signal) .await } diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index 757b85ec..fa346f48 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -1,4 +1,5 @@ -use std::net::SocketAddr; +use std::fs::{self, Permissions}; +use std::os::unix::fs::PermissionsExt; use std::sync::Arc; use async_trait::async_trait; @@ -11,6 +12,10 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; use hyper::{HeaderMap, StatusCode}; +use hyperlocal::UnixServerExt; + +use tokio::net::UnixStream; + use opentelemetry::{ global, metrics::{Counter, ValueRecorder}, @@ -21,6 +26,7 @@ use opentelemetry::{ use garage_util::error::Error as GarageError; use garage_util::forwarded_headers; use garage_util::metrics::{gen_trace_id, RecordDuration}; +use garage_util::socket_address::UnixOrTCPSocketAddress; pub(crate) trait ApiEndpoint: Send + Sync + 'static { fn name(&self) -> &'static str; @@ -91,10 +97,11 @@ impl<A: ApiHandler> ApiServer<A> { pub async fn run_server( self: Arc<Self>, - bind_addr: SocketAddr, + bind_addr: UnixOrTCPSocketAddress, + unix_bind_addr_mode: Option<u32>, shutdown_signal: impl Future<Output = ()>, ) -> Result<(), GarageError> { - let service = make_service_fn(|conn: &AddrStream| { + let tcp_service = make_service_fn(|conn: &AddrStream| { let this = self.clone(); let client_addr = conn.remote_addr(); @@ -102,28 +109,63 @@ impl<A: ApiHandler> ApiServer<A> { Ok::<_, GarageError>(service_fn(move |req: Request<Body>| { let this = this.clone(); - this.handler(req, client_addr) + this.handler(req, client_addr.to_string()) })) } }); - let server = Server::bind(&bind_addr).serve(service); + let unix_service = make_service_fn(|_: &UnixStream| { + let this = self.clone(); + + let path = bind_addr.to_string(); + async move { + Ok::<_, GarageError>(service_fn(move |req: Request<Body>| { + let this = this.clone(); + + this.handler(req, path.clone()) + })) + } + }); - let graceful = server.with_graceful_shutdown(shutdown_signal); info!( - "{} API server listening on http://{}", + "{} API server listening on {}", A::API_NAME_DISPLAY, bind_addr ); - graceful.await?; + match bind_addr { + UnixOrTCPSocketAddress::TCPSocket(addr) => { + Server::bind(&addr) + .serve(tcp_service) + .with_graceful_shutdown(shutdown_signal) + .await? + } + UnixOrTCPSocketAddress::UnixSocket(ref path) => { + if path.exists() { + fs::remove_file(path)? + } + + let bound = Server::bind_unix(path)?; + + fs::set_permissions( + path, + Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)), + )?; + + bound + .serve(unix_service) + .with_graceful_shutdown(shutdown_signal) + .await?; + } + }; + Ok(()) } async fn handler( self: Arc<Self>, req: Request<Body>, - addr: SocketAddr, + addr: String, ) -> Result<Response<Body>, GarageError> { let uri = req.uri().clone(); diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index bb85b2e7..3a032aba 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -1,4 +1,3 @@ -use std::net::SocketAddr; use std::sync::Arc; use async_trait::async_trait; @@ -9,6 +8,7 @@ use hyper::{Body, Method, Request, Response}; use opentelemetry::{trace::SpanRef, KeyValue}; use garage_util::error::Error as GarageError; +use garage_util::socket_address::UnixOrTCPSocketAddress; use garage_model::garage::Garage; @@ -37,12 +37,12 @@ pub(crate) struct K2VApiEndpoint { impl K2VApiServer { pub async fn run( garage: Arc<Garage>, - bind_addr: SocketAddr, + bind_addr: UnixOrTCPSocketAddress, s3_region: String, shutdown_signal: impl Future<Output = ()>, ) -> Result<(), GarageError> { ApiServer::new(s3_region, K2VApiServer { garage }) - .run_server(bind_addr, shutdown_signal) + .run_server(bind_addr, None, shutdown_signal) .await } } diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 3f995d34..d675ab61 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -1,4 +1,3 @@ -use std::net::SocketAddr; use std::sync::Arc; use async_trait::async_trait; @@ -10,6 +9,7 @@ use hyper::{Body, Request, Response}; use opentelemetry::{trace::SpanRef, KeyValue}; use garage_util::error::Error as GarageError; +use garage_util::socket_address::UnixOrTCPSocketAddress; use garage_model::garage::Garage; use garage_model::key_table::Key; @@ -46,12 +46,12 @@ pub(crate) struct S3ApiEndpoint { impl S3ApiServer { pub async fn run( garage: Arc<Garage>, - addr: SocketAddr, + addr: UnixOrTCPSocketAddress, s3_region: String, shutdown_signal: impl Future<Output = ()>, ) -> Result<(), GarageError> { ApiServer::new(s3_region, S3ApiServer { garage }) - .run_server(addr, shutdown_signal) + .run_server(addr, None, shutdown_signal) .await } diff --git a/src/garage/server.rs b/src/garage/server.rs index 472616c7..3ad10b72 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -79,7 +79,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er "S3 API", tokio::spawn(S3ApiServer::run( garage.clone(), - *s3_bind_addr, + s3_bind_addr.clone(), config.s3_api.s3_region.clone(), wait_from(watch_cancel.clone()), )), @@ -94,7 +94,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er "K2V API", tokio::spawn(K2VApiServer::run( garage.clone(), - config.k2v_api.as_ref().unwrap().api_bind_addr, + config.k2v_api.as_ref().unwrap().api_bind_addr.clone(), config.s3_api.s3_region.clone(), wait_from(watch_cancel.clone()), )), @@ -110,7 +110,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er "Web", tokio::spawn(WebServer::run( garage.clone(), - web_config.bind_addr, + web_config.bind_addr.clone(), web_config.root_domain.clone(), wait_from(watch_cancel.clone()), )), @@ -121,7 +121,9 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er info!("Launching Admin API server..."); servers.push(( "Admin", - tokio::spawn(admin_server.run(*admin_bind_addr, wait_from(watch_cancel.clone()))), + tokio::spawn( + admin_server.run(admin_bind_addr.clone(), wait_from(watch_cancel.clone())), + ), )); } diff --git a/src/util/config.rs b/src/util/config.rs index cf31c87c..ad5c8e1f 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -7,6 +7,7 @@ use std::path::PathBuf; use serde::{de, Deserialize}; use crate::error::Error; +use crate::socket_address::UnixOrTCPSocketAddress; /// Represent the whole configuration #[derive(Deserialize, Debug, Clone)] @@ -129,7 +130,7 @@ pub struct DataDir { #[derive(Deserialize, Debug, Clone)] pub struct S3ApiConfig { /// Address and port to bind for api serving - pub api_bind_addr: Option<SocketAddr>, + pub api_bind_addr: Option<UnixOrTCPSocketAddress>, /// S3 region to use pub s3_region: String, /// Suffix to remove from domain name to find bucket. If None, @@ -141,14 +142,14 @@ pub struct S3ApiConfig { #[derive(Deserialize, Debug, Clone)] pub struct K2VApiConfig { /// Address and port to bind for api serving - pub api_bind_addr: SocketAddr, + pub api_bind_addr: UnixOrTCPSocketAddress, } /// Configuration for serving files as normal web server #[derive(Deserialize, Debug, Clone)] pub struct WebConfig { /// Address and port to bind for web serving - pub bind_addr: SocketAddr, + pub bind_addr: UnixOrTCPSocketAddress, /// Suffix to remove from domain name to find bucket pub root_domain: String, } @@ -157,7 +158,7 @@ pub struct WebConfig { #[derive(Deserialize, Debug, Clone, Default)] pub struct AdminConfig { /// Address and port to bind for admin API serving - pub api_bind_addr: Option<SocketAddr>, + pub api_bind_addr: Option<UnixOrTCPSocketAddress>, /// Bearer token to use to scrape metrics pub metrics_token: Option<String>, diff --git a/src/util/lib.rs b/src/util/lib.rs index 15f0f829..7df77959 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -14,6 +14,7 @@ pub mod forwarded_headers; pub mod metrics; pub mod migrate; pub mod persister; +pub mod socket_address; pub mod time; pub mod tranquilizer; pub mod version; diff --git a/src/util/socket_address.rs b/src/util/socket_address.rs new file mode 100644 index 00000000..f01225f6 --- /dev/null +++ b/src/util/socket_address.rs @@ -0,0 +1,44 @@ +use std::fmt::{Debug, Display, Formatter}; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::str::FromStr; + +use serde::de::Error; +use serde::{Deserialize, Deserializer}; + +#[derive(Debug, Clone)] +pub enum UnixOrTCPSocketAddress { + TCPSocket(SocketAddr), + UnixSocket(PathBuf), +} + +impl Display for UnixOrTCPSocketAddress { + fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result { + match self { + UnixOrTCPSocketAddress::TCPSocket(address) => write!(formatter, "http://{}", address), + UnixOrTCPSocketAddress::UnixSocket(path) => { + write!(formatter, "http+unix://{}", path.to_string_lossy()) + } + } + } +} + +impl<'de> Deserialize<'de> for UnixOrTCPSocketAddress { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + let string = String::deserialize(deserializer)?; + let string = string.as_str(); + + if string.starts_with("/") { + Ok(UnixOrTCPSocketAddress::UnixSocket( + PathBuf::from_str(string).map_err(Error::custom)?, + )) + } else { + Ok(UnixOrTCPSocketAddress::TCPSocket( + SocketAddr::from_str(string).map_err(Error::custom)?, + )) + } + } +} diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml index 6d0eba3a..eec47bcd 100644 --- a/src/web/Cargo.toml +++ b/src/web/Cargo.toml @@ -27,5 +27,8 @@ futures = "0.3" http = "0.2" hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] } +hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] } + +tokio = { version = "1.0", default-features = false, features = ["net"] } opentelemetry = "0.17" diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 287aef1a..73780efb 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -1,4 +1,6 @@ -use std::{convert::Infallible, net::SocketAddr, sync::Arc}; +use std::fs::{self, Permissions}; +use std::os::unix::prelude::PermissionsExt; +use std::{convert::Infallible, sync::Arc}; use futures::future::Future; @@ -9,6 +11,10 @@ use hyper::{ Body, Method, Request, Response, Server, StatusCode, }; +use hyperlocal::UnixServerExt; + +use tokio::net::UnixStream; + use opentelemetry::{ global, metrics::{Counter, ValueRecorder}, @@ -32,6 +38,7 @@ use garage_util::data::Uuid; use garage_util::error::Error as GarageError; use garage_util::forwarded_headers; use garage_util::metrics::{gen_trace_id, RecordDuration}; +use garage_util::socket_address::UnixOrTCPSocketAddress; struct WebMetrics { request_counter: Counter<u64>, @@ -69,7 +76,7 @@ impl WebServer { /// Run a web server pub async fn run( garage: Arc<Garage>, - addr: SocketAddr, + addr: UnixOrTCPSocketAddress, root_domain: String, shutdown_signal: impl Future<Output = ()>, ) -> Result<(), GarageError> { @@ -80,7 +87,7 @@ impl WebServer { root_domain, }); - let service = make_service_fn(|conn: &AddrStream| { + let tcp_service = make_service_fn(|conn: &AddrStream| { let web_server = web_server.clone(); let client_addr = conn.remote_addr(); @@ -88,23 +95,56 @@ impl WebServer { Ok::<_, Error>(service_fn(move |req: Request<Body>| { let web_server = web_server.clone(); - web_server.handle_request(req, client_addr) + web_server.handle_request(req, client_addr.to_string()) + })) + } + }); + + let unix_service = make_service_fn(|_: &UnixStream| { + let web_server = web_server.clone(); + + let path = addr.to_string(); + async move { + Ok::<_, Error>(service_fn(move |req: Request<Body>| { + let web_server = web_server.clone(); + + web_server.handle_request(req, path.clone()) })) } }); - let server = Server::bind(&addr).serve(service); - let graceful = server.with_graceful_shutdown(shutdown_signal); - info!("Web server listening on http://{}", addr); + info!("Web server listening on {}", addr); + + match addr { + UnixOrTCPSocketAddress::TCPSocket(addr) => { + Server::bind(&addr) + .serve(tcp_service) + .with_graceful_shutdown(shutdown_signal) + .await? + } + UnixOrTCPSocketAddress::UnixSocket(ref path) => { + if path.exists() { + fs::remove_file(path)? + } + + let bound = Server::bind_unix(path)?; + + fs::set_permissions(path, Permissions::from_mode(0o222))?; + + bound + .serve(unix_service) + .with_graceful_shutdown(shutdown_signal) + .await?; + } + }; - graceful.await?; Ok(()) } async fn handle_request( self: Arc<Self>, req: Request<Body>, - addr: SocketAddr, + addr: String, ) -> Result<Response<Body>, Infallible> { if let Ok(forwarded_for_ip_addr) = forwarded_headers::handle_forwarded_for_headers(req.headers()) |