aboutsummaryrefslogtreecommitdiff
path: root/src/api
diff options
context:
space:
mode:
Diffstat (limited to 'src/api')
-rw-r--r--src/api/Cargo.toml1
-rw-r--r--src/api/admin/api_server.rs6
-rw-r--r--src/api/generic_server.rs60
-rw-r--r--src/api/k2v/api_server.rs6
-rw-r--r--src/api/s3/api_server.rs6
5 files changed, 61 insertions, 18 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 6425591f..cb9e2e55 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -45,6 +45,7 @@ http = "0.2"
httpdate = "1.0"
http-range = "0.1"
hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
+hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] }
multer = "2.0"
percent-encoding = "2.1.0"
roxmltree = "0.18"
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 50c79120..6f1e44e5 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -1,5 +1,4 @@
use std::collections::HashMap;
-use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
@@ -18,6 +17,7 @@ use prometheus::{Encoder, TextEncoder};
use garage_model::garage::Garage;
use garage_rpc::system::ClusterHealthStatus;
use garage_util::error::Error as GarageError;
+use garage_util::socket_address::UnixOrTCPSocketAddress;
use crate::generic_server::*;
@@ -61,12 +61,12 @@ impl AdminApiServer {
pub async fn run(
self,
- bind_addr: SocketAddr,
+ bind_addr: UnixOrTCPSocketAddress,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
let region = self.garage.config.s3_api.s3_region.clone();
ApiServer::new(region, self)
- .run_server(bind_addr, shutdown_signal)
+ .run_server(bind_addr, Some(0o220), shutdown_signal)
.await
}
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs
index 757b85ec..fa346f48 100644
--- a/src/api/generic_server.rs
+++ b/src/api/generic_server.rs
@@ -1,4 +1,5 @@
-use std::net::SocketAddr;
+use std::fs::{self, Permissions};
+use std::os::unix::fs::PermissionsExt;
use std::sync::Arc;
use async_trait::async_trait;
@@ -11,6 +12,10 @@ use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use hyper::{HeaderMap, StatusCode};
+use hyperlocal::UnixServerExt;
+
+use tokio::net::UnixStream;
+
use opentelemetry::{
global,
metrics::{Counter, ValueRecorder},
@@ -21,6 +26,7 @@ use opentelemetry::{
use garage_util::error::Error as GarageError;
use garage_util::forwarded_headers;
use garage_util::metrics::{gen_trace_id, RecordDuration};
+use garage_util::socket_address::UnixOrTCPSocketAddress;
pub(crate) trait ApiEndpoint: Send + Sync + 'static {
fn name(&self) -> &'static str;
@@ -91,10 +97,11 @@ impl<A: ApiHandler> ApiServer<A> {
pub async fn run_server(
self: Arc<Self>,
- bind_addr: SocketAddr,
+ bind_addr: UnixOrTCPSocketAddress,
+ unix_bind_addr_mode: Option<u32>,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
- let service = make_service_fn(|conn: &AddrStream| {
+ let tcp_service = make_service_fn(|conn: &AddrStream| {
let this = self.clone();
let client_addr = conn.remote_addr();
@@ -102,28 +109,63 @@ impl<A: ApiHandler> ApiServer<A> {
Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
let this = this.clone();
- this.handler(req, client_addr)
+ this.handler(req, client_addr.to_string())
}))
}
});
- let server = Server::bind(&bind_addr).serve(service);
+ let unix_service = make_service_fn(|_: &UnixStream| {
+ let this = self.clone();
+
+ let path = bind_addr.to_string();
+ async move {
+ Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
+ let this = this.clone();
+
+ this.handler(req, path.clone())
+ }))
+ }
+ });
- let graceful = server.with_graceful_shutdown(shutdown_signal);
info!(
- "{} API server listening on http://{}",
+ "{} API server listening on {}",
A::API_NAME_DISPLAY,
bind_addr
);
- graceful.await?;
+ match bind_addr {
+ UnixOrTCPSocketAddress::TCPSocket(addr) => {
+ Server::bind(&addr)
+ .serve(tcp_service)
+ .with_graceful_shutdown(shutdown_signal)
+ .await?
+ }
+ UnixOrTCPSocketAddress::UnixSocket(ref path) => {
+ if path.exists() {
+ fs::remove_file(path)?
+ }
+
+ let bound = Server::bind_unix(path)?;
+
+ fs::set_permissions(
+ path,
+ Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)),
+ )?;
+
+ bound
+ .serve(unix_service)
+ .with_graceful_shutdown(shutdown_signal)
+ .await?;
+ }
+ };
+
Ok(())
}
async fn handler(
self: Arc<Self>,
req: Request<Body>,
- addr: SocketAddr,
+ addr: String,
) -> Result<Response<Body>, GarageError> {
let uri = req.uri().clone();
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
index bb85b2e7..3a032aba 100644
--- a/src/api/k2v/api_server.rs
+++ b/src/api/k2v/api_server.rs
@@ -1,4 +1,3 @@
-use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
@@ -9,6 +8,7 @@ use hyper::{Body, Method, Request, Response};
use opentelemetry::{trace::SpanRef, KeyValue};
use garage_util::error::Error as GarageError;
+use garage_util::socket_address::UnixOrTCPSocketAddress;
use garage_model::garage::Garage;
@@ -37,12 +37,12 @@ pub(crate) struct K2VApiEndpoint {
impl K2VApiServer {
pub async fn run(
garage: Arc<Garage>,
- bind_addr: SocketAddr,
+ bind_addr: UnixOrTCPSocketAddress,
s3_region: String,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
ApiServer::new(s3_region, K2VApiServer { garage })
- .run_server(bind_addr, shutdown_signal)
+ .run_server(bind_addr, None, shutdown_signal)
.await
}
}
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index 27837297..ecfb48b6 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -1,4 +1,3 @@
-use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
@@ -10,6 +9,7 @@ use hyper::{Body, Request, Response};
use opentelemetry::{trace::SpanRef, KeyValue};
use garage_util::error::Error as GarageError;
+use garage_util::socket_address::UnixOrTCPSocketAddress;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
@@ -44,12 +44,12 @@ pub(crate) struct S3ApiEndpoint {
impl S3ApiServer {
pub async fn run(
garage: Arc<Garage>,
- addr: SocketAddr,
+ addr: UnixOrTCPSocketAddress,
s3_region: String,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
ApiServer::new(s3_region, S3ApiServer { garage })
- .run_server(addr, shutdown_signal)
+ .run_server(addr, None, shutdown_signal)
.await
}