From 6086a3fa072571c9f2a9903175a9ca36ee497f0d Mon Sep 17 00:00:00 2001 From: networkException Date: Fri, 29 Sep 2023 18:36:50 +0200 Subject: cargo: add hyperlocal as a dependency --- src/api/Cargo.toml | 1 + src/web/Cargo.toml | 3 +++ 2 files changed, 4 insertions(+) (limited to 'src') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 6425591f..cb9e2e55 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -45,6 +45,7 @@ http = "0.2" httpdate = "1.0" http-range = "0.1" hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] } +hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] } multer = "2.0" percent-encoding = "2.1.0" roxmltree = "0.18" diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml index 6d0eba3a..eec47bcd 100644 --- a/src/web/Cargo.toml +++ b/src/web/Cargo.toml @@ -27,5 +27,8 @@ futures = "0.3" http = "0.2" hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] } +hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] } + +tokio = { version = "1.0", default-features = false, features = ["net"] } opentelemetry = "0.17" -- cgit v1.2.3 From 10195f1567e82a630e808bc2700424bce28238c0 Mon Sep 17 00:00:00 2001 From: networkException Date: Fri, 29 Sep 2023 18:37:36 +0200 Subject: util: add helper sum type for unix and tcp socket addresses this patch introduces a new sum type that can represent either a tcp socket address or a unix domain socket path. --- src/util/lib.rs | 1 + src/util/socket_address.rs | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) create mode 100644 src/util/socket_address.rs (limited to 'src') diff --git a/src/util/lib.rs b/src/util/lib.rs index 15f0f829..7df77959 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -14,6 +14,7 @@ pub mod forwarded_headers; pub mod metrics; pub mod migrate; pub mod persister; +pub mod socket_address; pub mod time; pub mod tranquilizer; pub mod version; diff --git a/src/util/socket_address.rs b/src/util/socket_address.rs new file mode 100644 index 00000000..f01225f6 --- /dev/null +++ b/src/util/socket_address.rs @@ -0,0 +1,44 @@ +use std::fmt::{Debug, Display, Formatter}; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::str::FromStr; + +use serde::de::Error; +use serde::{Deserialize, Deserializer}; + +#[derive(Debug, Clone)] +pub enum UnixOrTCPSocketAddress { + TCPSocket(SocketAddr), + UnixSocket(PathBuf), +} + +impl Display for UnixOrTCPSocketAddress { + fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result { + match self { + UnixOrTCPSocketAddress::TCPSocket(address) => write!(formatter, "http://{}", address), + UnixOrTCPSocketAddress::UnixSocket(path) => { + write!(formatter, "http+unix://{}", path.to_string_lossy()) + } + } + } +} + +impl<'de> Deserialize<'de> for UnixOrTCPSocketAddress { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let string = String::deserialize(deserializer)?; + let string = string.as_str(); + + if string.starts_with("/") { + Ok(UnixOrTCPSocketAddress::UnixSocket( + PathBuf::from_str(string).map_err(Error::custom)?, + )) + } else { + Ok(UnixOrTCPSocketAddress::TCPSocket( + SocketAddr::from_str(string).map_err(Error::custom)?, + )) + } + } +} -- cgit v1.2.3 From 7353038a64cc53cc01c4ec7f21671d3443177707 Mon Sep 17 00:00:00 2001 From: networkException Date: Fri, 29 Sep 2023 18:38:30 +0200 Subject: config: allow using paths for unix domain sockets in various places this patch updates the config format to also allow paths in bind addresses for unix domain sockets. this has been added to all apis except rpc. --- src/util/config.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/util/config.rs b/src/util/config.rs index 724f9404..ebcb5fbe 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -7,6 +7,7 @@ use std::path::PathBuf; use serde::{de, Deserialize}; use crate::error::Error; +use crate::socket_address::UnixOrTCPSocketAddress; /// Represent the whole configuration #[derive(Deserialize, Debug, Clone)] @@ -102,7 +103,7 @@ pub struct Config { #[derive(Deserialize, Debug, Clone)] pub struct S3ApiConfig { /// Address and port to bind for api serving - pub api_bind_addr: Option, + pub api_bind_addr: Option, /// S3 region to use pub s3_region: String, /// Suffix to remove from domain name to find bucket. If None, @@ -114,14 +115,14 @@ pub struct S3ApiConfig { #[derive(Deserialize, Debug, Clone)] pub struct K2VApiConfig { /// Address and port to bind for api serving - pub api_bind_addr: SocketAddr, + pub api_bind_addr: UnixOrTCPSocketAddress, } /// Configuration for serving files as normal web server #[derive(Deserialize, Debug, Clone)] pub struct WebConfig { /// Address and port to bind for web serving - pub bind_addr: SocketAddr, + pub bind_addr: UnixOrTCPSocketAddress, /// Suffix to remove from domain name to find bucket pub root_domain: String, } @@ -130,7 +131,7 @@ pub struct WebConfig { #[derive(Deserialize, Debug, Clone, Default)] pub struct AdminConfig { /// Address and port to bind for admin API serving - pub api_bind_addr: Option, + pub api_bind_addr: Option, /// Bearer token to use to scrape metrics pub metrics_token: Option, -- cgit v1.2.3 From 8ec6a53b35e5bb742640a77ff282f2474c63af54 Mon Sep 17 00:00:00 2001 From: networkException Date: Fri, 29 Sep 2023 18:41:00 +0200 Subject: everywhere: support unix sockets when binding in various places this patch implements binding to paths as a unix socket for generic server and web server. --- src/api/admin/api_server.rs | 4 ++-- src/api/generic_server.rs | 56 ++++++++++++++++++++++++++++++++++++------- src/api/k2v/api_server.rs | 4 ++-- src/api/s3/api_server.rs | 4 ++-- src/garage/server.rs | 10 ++++---- src/web/web_server.rs | 58 ++++++++++++++++++++++++++++++++++++++------- 6 files changed, 108 insertions(+), 28 deletions(-) (limited to 'src') diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 50c79120..43497bad 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::net::SocketAddr; use std::sync::Arc; use async_trait::async_trait; @@ -18,6 +17,7 @@ use prometheus::{Encoder, TextEncoder}; use garage_model::garage::Garage; use garage_rpc::system::ClusterHealthStatus; use garage_util::error::Error as GarageError; +use garage_util::socket_address::UnixOrTCPSocketAddress; use crate::generic_server::*; @@ -61,7 +61,7 @@ impl AdminApiServer { pub async fn run( self, - bind_addr: SocketAddr, + bind_addr: UnixOrTCPSocketAddress, shutdown_signal: impl Future, ) -> Result<(), GarageError> { let region = self.garage.config.s3_api.s3_region.clone(); diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index 757b85ec..42c65ab7 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -1,4 +1,5 @@ -use std::net::SocketAddr; +use std::fs::{self, Permissions}; +use std::os::unix::fs::PermissionsExt; use std::sync::Arc; use async_trait::async_trait; @@ -11,6 +12,10 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; use hyper::{HeaderMap, StatusCode}; +use hyperlocal::UnixServerExt; + +use tokio::net::UnixStream; + use opentelemetry::{ global, metrics::{Counter, ValueRecorder}, @@ -21,6 +26,7 @@ use opentelemetry::{ use garage_util::error::Error as GarageError; use garage_util::forwarded_headers; use garage_util::metrics::{gen_trace_id, RecordDuration}; +use garage_util::socket_address::UnixOrTCPSocketAddress; pub(crate) trait ApiEndpoint: Send + Sync + 'static { fn name(&self) -> &'static str; @@ -91,10 +97,10 @@ impl ApiServer { pub async fn run_server( self: Arc, - bind_addr: SocketAddr, + bind_addr: UnixOrTCPSocketAddress, shutdown_signal: impl Future, ) -> Result<(), GarageError> { - let service = make_service_fn(|conn: &AddrStream| { + let tcp_service = make_service_fn(|conn: &AddrStream| { let this = self.clone(); let client_addr = conn.remote_addr(); @@ -102,28 +108,60 @@ impl ApiServer { Ok::<_, GarageError>(service_fn(move |req: Request| { let this = this.clone(); - this.handler(req, client_addr) + this.handler(req, client_addr.to_string()) })) } }); - let server = Server::bind(&bind_addr).serve(service); + let unix_service = make_service_fn(|_: &UnixStream| { + let this = self.clone(); + + let path = bind_addr.to_string(); + async move { + Ok::<_, GarageError>(service_fn(move |req: Request| { + let this = this.clone(); + + this.handler(req, path.clone()) + })) + } + }); - let graceful = server.with_graceful_shutdown(shutdown_signal); info!( - "{} API server listening on http://{}", + "{} API server listening on {}", A::API_NAME_DISPLAY, bind_addr ); - graceful.await?; + match bind_addr { + UnixOrTCPSocketAddress::TCPSocket(addr) => { + Server::bind(&addr) + .serve(tcp_service) + .with_graceful_shutdown(shutdown_signal) + .await? + } + UnixOrTCPSocketAddress::UnixSocket(ref path) => { + if path.exists() { + fs::remove_file(path)? + } + + let bound = Server::bind_unix(path)?; + + fs::set_permissions(path, Permissions::from_mode(0o222))?; + + bound + .serve(unix_service) + .with_graceful_shutdown(shutdown_signal) + .await?; + } + }; + Ok(()) } async fn handler( self: Arc, req: Request, - addr: SocketAddr, + addr: String, ) -> Result, GarageError> { let uri = req.uri().clone(); diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index bb85b2e7..413fd61c 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -1,4 +1,3 @@ -use std::net::SocketAddr; use std::sync::Arc; use async_trait::async_trait; @@ -9,6 +8,7 @@ use hyper::{Body, Method, Request, Response}; use opentelemetry::{trace::SpanRef, KeyValue}; use garage_util::error::Error as GarageError; +use garage_util::socket_address::UnixOrTCPSocketAddress; use garage_model::garage::Garage; @@ -37,7 +37,7 @@ pub(crate) struct K2VApiEndpoint { impl K2VApiServer { pub async fn run( garage: Arc, - bind_addr: SocketAddr, + bind_addr: UnixOrTCPSocketAddress, s3_region: String, shutdown_signal: impl Future, ) -> Result<(), GarageError> { diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 27837297..cc2091b8 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -1,4 +1,3 @@ -use std::net::SocketAddr; use std::sync::Arc; use async_trait::async_trait; @@ -10,6 +9,7 @@ use hyper::{Body, Request, Response}; use opentelemetry::{trace::SpanRef, KeyValue}; use garage_util::error::Error as GarageError; +use garage_util::socket_address::UnixOrTCPSocketAddress; use garage_model::garage::Garage; use garage_model::key_table::Key; @@ -44,7 +44,7 @@ pub(crate) struct S3ApiEndpoint { impl S3ApiServer { pub async fn run( garage: Arc, - addr: SocketAddr, + addr: UnixOrTCPSocketAddress, s3_region: String, shutdown_signal: impl Future, ) -> Result<(), GarageError> { diff --git a/src/garage/server.rs b/src/garage/server.rs index 472616c7..3ad10b72 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -79,7 +79,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er "S3 API", tokio::spawn(S3ApiServer::run( garage.clone(), - *s3_bind_addr, + s3_bind_addr.clone(), config.s3_api.s3_region.clone(), wait_from(watch_cancel.clone()), )), @@ -94,7 +94,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er "K2V API", tokio::spawn(K2VApiServer::run( garage.clone(), - config.k2v_api.as_ref().unwrap().api_bind_addr, + config.k2v_api.as_ref().unwrap().api_bind_addr.clone(), config.s3_api.s3_region.clone(), wait_from(watch_cancel.clone()), )), @@ -110,7 +110,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er "Web", tokio::spawn(WebServer::run( garage.clone(), - web_config.bind_addr, + web_config.bind_addr.clone(), web_config.root_domain.clone(), wait_from(watch_cancel.clone()), )), @@ -121,7 +121,9 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er info!("Launching Admin API server..."); servers.push(( "Admin", - tokio::spawn(admin_server.run(*admin_bind_addr, wait_from(watch_cancel.clone()))), + tokio::spawn( + admin_server.run(admin_bind_addr.clone(), wait_from(watch_cancel.clone())), + ), )); } diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 287aef1a..73780efb 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -1,4 +1,6 @@ -use std::{convert::Infallible, net::SocketAddr, sync::Arc}; +use std::fs::{self, Permissions}; +use std::os::unix::prelude::PermissionsExt; +use std::{convert::Infallible, sync::Arc}; use futures::future::Future; @@ -9,6 +11,10 @@ use hyper::{ Body, Method, Request, Response, Server, StatusCode, }; +use hyperlocal::UnixServerExt; + +use tokio::net::UnixStream; + use opentelemetry::{ global, metrics::{Counter, ValueRecorder}, @@ -32,6 +38,7 @@ use garage_util::data::Uuid; use garage_util::error::Error as GarageError; use garage_util::forwarded_headers; use garage_util::metrics::{gen_trace_id, RecordDuration}; +use garage_util::socket_address::UnixOrTCPSocketAddress; struct WebMetrics { request_counter: Counter, @@ -69,7 +76,7 @@ impl WebServer { /// Run a web server pub async fn run( garage: Arc, - addr: SocketAddr, + addr: UnixOrTCPSocketAddress, root_domain: String, shutdown_signal: impl Future, ) -> Result<(), GarageError> { @@ -80,7 +87,7 @@ impl WebServer { root_domain, }); - let service = make_service_fn(|conn: &AddrStream| { + let tcp_service = make_service_fn(|conn: &AddrStream| { let web_server = web_server.clone(); let client_addr = conn.remote_addr(); @@ -88,23 +95,56 @@ impl WebServer { Ok::<_, Error>(service_fn(move |req: Request| { let web_server = web_server.clone(); - web_server.handle_request(req, client_addr) + web_server.handle_request(req, client_addr.to_string()) + })) + } + }); + + let unix_service = make_service_fn(|_: &UnixStream| { + let web_server = web_server.clone(); + + let path = addr.to_string(); + async move { + Ok::<_, Error>(service_fn(move |req: Request| { + let web_server = web_server.clone(); + + web_server.handle_request(req, path.clone()) })) } }); - let server = Server::bind(&addr).serve(service); - let graceful = server.with_graceful_shutdown(shutdown_signal); - info!("Web server listening on http://{}", addr); + info!("Web server listening on {}", addr); + + match addr { + UnixOrTCPSocketAddress::TCPSocket(addr) => { + Server::bind(&addr) + .serve(tcp_service) + .with_graceful_shutdown(shutdown_signal) + .await? + } + UnixOrTCPSocketAddress::UnixSocket(ref path) => { + if path.exists() { + fs::remove_file(path)? + } + + let bound = Server::bind_unix(path)?; + + fs::set_permissions(path, Permissions::from_mode(0o222))?; + + bound + .serve(unix_service) + .with_graceful_shutdown(shutdown_signal) + .await?; + } + }; - graceful.await?; Ok(()) } async fn handle_request( self: Arc, req: Request, - addr: SocketAddr, + addr: String, ) -> Result, Infallible> { if let Ok(forwarded_for_ip_addr) = forwarded_headers::handle_forwarded_for_headers(req.headers()) -- cgit v1.2.3 From 7907a09acc812f857f283d64c988c69d08acd041 Mon Sep 17 00:00:00 2001 From: networkException Date: Tue, 3 Oct 2023 17:31:40 +0200 Subject: api: allow custom unix bind mode and use 0o220 for admin server --- src/api/admin/api_server.rs | 2 +- src/api/generic_server.rs | 6 +++++- src/api/k2v/api_server.rs | 2 +- src/api/s3/api_server.rs | 2 +- 4 files changed, 8 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 43497bad..6f1e44e5 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -66,7 +66,7 @@ impl AdminApiServer { ) -> Result<(), GarageError> { let region = self.garage.config.s3_api.s3_region.clone(); ApiServer::new(region, self) - .run_server(bind_addr, shutdown_signal) + .run_server(bind_addr, Some(0o220), shutdown_signal) .await } diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index 42c65ab7..fa346f48 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -98,6 +98,7 @@ impl ApiServer { pub async fn run_server( self: Arc, bind_addr: UnixOrTCPSocketAddress, + unix_bind_addr_mode: Option, shutdown_signal: impl Future, ) -> Result<(), GarageError> { let tcp_service = make_service_fn(|conn: &AddrStream| { @@ -146,7 +147,10 @@ impl ApiServer { let bound = Server::bind_unix(path)?; - fs::set_permissions(path, Permissions::from_mode(0o222))?; + fs::set_permissions( + path, + Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)), + )?; bound .serve(unix_service) diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 413fd61c..3a032aba 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -42,7 +42,7 @@ impl K2VApiServer { shutdown_signal: impl Future, ) -> Result<(), GarageError> { ApiServer::new(s3_region, K2VApiServer { garage }) - .run_server(bind_addr, shutdown_signal) + .run_server(bind_addr, None, shutdown_signal) .await } } diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index cc2091b8..ecfb48b6 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -49,7 +49,7 @@ impl S3ApiServer { shutdown_signal: impl Future, ) -> Result<(), GarageError> { ApiServer::new(s3_region, S3ApiServer { garage }) - .run_server(addr, shutdown_signal) + .run_server(addr, None, shutdown_signal) .await } -- cgit v1.2.3