aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/api/admin/api_server.rs19
-rw-r--r--src/api/k2v/api_server.rs14
-rw-r--r--src/api/s3/api_server.rs14
-rw-r--r--src/garage/server.rs112
-rw-r--r--src/util/config.rs6
-rw-r--r--src/web/lib.rs2
-rw-r--r--src/web/web_server.rs409
7 files changed, 301 insertions, 275 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index d871d4e2..fb0078cc 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -1,3 +1,4 @@
+use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
@@ -52,15 +53,15 @@ impl AdminApiServer {
}
}
- pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<(), GarageError> {
- if let Some(bind_addr) = self.garage.config.admin.api_bind_addr {
- let region = self.garage.config.s3_api.s3_region.clone();
- ApiServer::new(region, self)
- .run_server(bind_addr, shutdown_signal)
- .await
- } else {
- Ok(())
- }
+ pub async fn run(
+ self,
+ bind_addr: SocketAddr,
+ shutdown_signal: impl Future<Output = ()>,
+ ) -> Result<(), GarageError> {
+ let region = self.garage.config.s3_api.s3_region.clone();
+ ApiServer::new(region, self)
+ .run_server(bind_addr, shutdown_signal)
+ .await
}
fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> {
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
index eb0fbdd7..084867b5 100644
--- a/src/api/k2v/api_server.rs
+++ b/src/api/k2v/api_server.rs
@@ -1,3 +1,4 @@
+use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
@@ -36,20 +37,13 @@ pub(crate) struct K2VApiEndpoint {
impl K2VApiServer {
pub async fn run(
garage: Arc<Garage>,
+ bind_addr: SocketAddr,
+ s3_region: String,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
- if let Some(cfg) = &garage.config.k2v_api {
- let bind_addr = cfg.api_bind_addr;
-
- ApiServer::new(
- garage.config.s3_api.s3_region.clone(),
- K2VApiServer { garage },
- )
+ ApiServer::new(s3_region, K2VApiServer { garage })
.run_server(bind_addr, shutdown_signal)
.await
- } else {
- Ok(())
- }
}
}
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index 78dfeeac..27837297 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -1,3 +1,4 @@
+use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
@@ -43,16 +44,13 @@ pub(crate) struct S3ApiEndpoint {
impl S3ApiServer {
pub async fn run(
garage: Arc<Garage>,
+ addr: SocketAddr,
+ s3_region: String,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
- let addr = garage.config.s3_api.api_bind_addr;
-
- ApiServer::new(
- garage.config.s3_api.s3_region.clone(),
- S3ApiServer { garage },
- )
- .run_server(addr, shutdown_signal)
- .await
+ ApiServer::new(s3_region, S3ApiServer { garage })
+ .run_server(addr, shutdown_signal)
+ .await
}
async fn handle_request_without_bucket(
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 0851738d..fb6d2279 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -9,7 +9,7 @@ use garage_util::error::Error;
use garage_api::admin::api_server::AdminApiServer;
use garage_api::s3::api_server::S3ApiServer;
use garage_model::garage::Garage;
-use garage_web::run_web_server;
+use garage_web::WebServer;
#[cfg(feature = "k2v")]
use garage_api::k2v::api_server::K2VApiServer;
@@ -30,6 +30,8 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Loading configuration...");
let config = read_config(config_file)?;
+ // ---- Initialize Garage internals ----
+
info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
@@ -44,7 +46,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
init_tracing(config.admin.trace_sink.as_ref().unwrap(), garage.system.id)?;
#[cfg(not(feature = "telemetry-otlp"))]
- warn!("Garage was built without OTLP exporter, admin.trace_sink is ignored.");
+ error!("Garage was built without OTLP exporter, admin.trace_sink is ignored.");
}
info!("Initialize Admin API server and metrics collector...");
@@ -56,53 +58,73 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Create admin RPC handler...");
AdminRpcHandler::new(garage.clone());
- info!("Initializing S3 API server...");
- let s3_api_server = tokio::spawn(S3ApiServer::run(
- garage.clone(),
- wait_from(watch_cancel.clone()),
- ));
-
- #[cfg(feature = "k2v")]
- let k2v_api_server = {
- info!("Initializing K2V API server...");
- tokio::spawn(K2VApiServer::run(
- garage.clone(),
- wait_from(watch_cancel.clone()),
- ))
- };
-
- info!("Initializing web server...");
- let web_server = tokio::spawn(run_web_server(
- garage.clone(),
- wait_from(watch_cancel.clone()),
- ));
-
- info!("Launching Admin API server...");
- let admin_server = tokio::spawn(admin_server.run(wait_from(watch_cancel.clone())));
-
- // Stuff runs
+ // ---- Launch public-facing API servers ----
+
+ let mut servers = vec![];
+
+ if let Some(s3_bind_addr) = &config.s3_api.api_bind_addr {
+ info!("Initializing S3 API server...");
+ servers.push((
+ "S3 API",
+ tokio::spawn(S3ApiServer::run(
+ garage.clone(),
+ *s3_bind_addr,
+ config.s3_api.s3_region.clone(),
+ wait_from(watch_cancel.clone()),
+ )),
+ ));
+ }
- // When a cancel signal is sent, stuff stops
- if let Err(e) = s3_api_server.await? {
- warn!("S3 API server exited with error: {}", e);
- } else {
- info!("S3 API server exited without error.");
+ if config.k2v_api.is_some() {
+ #[cfg(feature = "k2v")]
+ {
+ info!("Initializing K2V API server...");
+ servers.push((
+ "K2V API",
+ tokio::spawn(K2VApiServer::run(
+ garage.clone(),
+ config.k2v_api.as_ref().unwrap().api_bind_addr,
+ config.s3_api.s3_region.clone(),
+ wait_from(watch_cancel.clone()),
+ )),
+ ));
+ }
+ #[cfg(not(feature = "k2v"))]
+ error!("K2V is not enabled in this build, cannot start K2V API server");
}
- #[cfg(feature = "k2v")]
- if let Err(e) = k2v_api_server.await? {
- warn!("K2V API server exited with error: {}", e);
- } else {
- info!("K2V API server exited without error.");
+
+ if let Some(web_config) = &config.s3_web {
+ info!("Initializing web server...");
+ servers.push((
+ "Web",
+ tokio::spawn(WebServer::run(
+ garage.clone(),
+ web_config.bind_addr,
+ web_config.root_domain.clone(),
+ wait_from(watch_cancel.clone()),
+ )),
+ ));
}
- if let Err(e) = web_server.await? {
- warn!("Web server exited with error: {}", e);
- } else {
- info!("Web server exited without error.");
+
+ if let Some(admin_bind_addr) = &config.admin.api_bind_addr {
+ info!("Launching Admin API server...");
+ servers.push((
+ "Admin",
+ tokio::spawn(admin_server.run(*admin_bind_addr, wait_from(watch_cancel.clone()))),
+ ));
}
- if let Err(e) = admin_server.await? {
- warn!("Admin web server exited with error: {}", e);
- } else {
- info!("Admin API server exited without error.");
+
+ // Stuff runs
+
+ // When a cancel signal is sent, stuff stops
+
+ // Collect stuff
+ for (desc, join_handle) in servers {
+ if let Err(e) = join_handle.await? {
+ error!("{} server exited with error: {}", desc, e);
+ } else {
+ info!("{} server exited without error.", desc);
+ }
}
// Remove RPC handlers for system to break reference cycles
diff --git a/src/util/config.rs b/src/util/config.rs
index e8ef4fdd..46c5cb9d 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -81,11 +81,10 @@ pub struct Config {
pub s3_api: S3ApiConfig,
/// Configuration for K2V api
- #[cfg(feature = "k2v")]
pub k2v_api: Option<K2VApiConfig>,
/// Configuration for serving files as normal web server
- pub s3_web: WebConfig,
+ pub s3_web: Option<WebConfig>,
/// Configuration for the admin API endpoint
#[serde(default = "Default::default")]
@@ -96,7 +95,7 @@ pub struct Config {
#[derive(Deserialize, Debug, Clone)]
pub struct S3ApiConfig {
/// Address and port to bind for api serving
- pub api_bind_addr: SocketAddr,
+ pub api_bind_addr: Option<SocketAddr>,
/// S3 region to use
pub s3_region: String,
/// Suffix to remove from domain name to find bucket. If None,
@@ -105,7 +104,6 @@ pub struct S3ApiConfig {
}
/// Configuration for K2V api
-#[cfg(feature = "k2v")]
#[derive(Deserialize, Debug, Clone)]
pub struct K2VApiConfig {
/// Address and port to bind for api serving
diff --git a/src/web/lib.rs b/src/web/lib.rs
index 9b7c8573..7207c365 100644
--- a/src/web/lib.rs
+++ b/src/web/lib.rs
@@ -6,4 +6,4 @@ mod error;
pub use error::Error;
mod web_server;
-pub use web_server::run_web_server;
+pub use web_server::WebServer;
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index c30d8957..c2322073 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -57,90 +57,226 @@ impl WebMetrics {
}
}
-/// Run a web server
-pub async fn run_web_server(
+pub struct WebServer {
garage: Arc<Garage>,
- shutdown_signal: impl Future<Output = ()>,
-) -> Result<(), GarageError> {
- let addr = &garage.config.s3_web.bind_addr;
+ metrics: Arc<WebMetrics>,
+ root_domain: String,
+}
- let metrics = Arc::new(WebMetrics::new());
+impl WebServer {
+ /// Run a web server
+ pub async fn run(
+ garage: Arc<Garage>,
+ addr: SocketAddr,
+ root_domain: String,
+ shutdown_signal: impl Future<Output = ()>,
+ ) -> Result<(), GarageError> {
+ let metrics = Arc::new(WebMetrics::new());
+ let web_server = Arc::new(WebServer {
+ garage,
+ metrics,
+ root_domain,
+ });
+
+ let service = make_service_fn(|conn: &AddrStream| {
+ let web_server = web_server.clone();
+
+ let client_addr = conn.remote_addr();
+ async move {
+ Ok::<_, Error>(service_fn(move |req: Request<Body>| {
+ let web_server = web_server.clone();
+
+ web_server.handle_request(req, client_addr)
+ }))
+ }
+ });
- let service = make_service_fn(|conn: &AddrStream| {
- let garage = garage.clone();
- let metrics = metrics.clone();
+ let server = Server::bind(&addr).serve(service);
+ let graceful = server.with_graceful_shutdown(shutdown_signal);
+ info!("Web server listening on http://{}", addr);
- let client_addr = conn.remote_addr();
- async move {
- Ok::<_, Error>(service_fn(move |req: Request<Body>| {
- let garage = garage.clone();
- let metrics = metrics.clone();
+ graceful.await?;
+ Ok(())
+ }
- handle_request(garage, metrics, req, client_addr)
- }))
+ async fn handle_request(
+ self: Arc<Self>,
+ req: Request<Body>,
+ addr: SocketAddr,
+ ) -> Result<Response<Body>, Infallible> {
+ info!("{} {} {}", addr, req.method(), req.uri());
+
+ // Lots of instrumentation
+ let tracer = opentelemetry::global::tracer("garage");
+ let span = tracer
+ .span_builder(format!("Web {} request", req.method()))
+ .with_trace_id(gen_trace_id())
+ .with_attributes(vec![
+ KeyValue::new("method", format!("{}", req.method())),
+ KeyValue::new("uri", req.uri().to_string()),
+ ])
+ .start(&tracer);
+
+ let metrics_tags = &[KeyValue::new("method", req.method().to_string())];
+
+ // The actual handler
+ let res = self
+ .serve_file(&req)
+ .with_context(Context::current_with_span(span))
+ .record_duration(&self.metrics.request_duration, &metrics_tags[..])
+ .await;
+
+ // More instrumentation
+ self.metrics.request_counter.add(1, &metrics_tags[..]);
+
+ // Returning the result
+ match res {
+ Ok(res) => {
+ debug!("{} {} {}", req.method(), res.status(), req.uri());
+ Ok(res)
+ }
+ Err(error) => {
+ info!(
+ "{} {} {} {}",
+ req.method(),
+ error.http_status_code(),
+ req.uri(),
+ error
+ );
+ self.metrics.error_counter.add(
+ 1,
+ &[
+ metrics_tags[0].clone(),
+ KeyValue::new("status_code", error.http_status_code().to_string()),
+ ],
+ );
+ Ok(error_to_res(error))
+ }
}
- });
+ }
- let server = Server::bind(addr).serve(service);
- let graceful = server.with_graceful_shutdown(shutdown_signal);
- info!("Web server listening on http://{}", addr);
+ async fn serve_file(self: &Arc<Self>, req: &Request<Body>) -> Result<Response<Body>, Error> {
+ // Get http authority string (eg. [::1]:3902 or garage.tld:80)
+ let authority = req
+ .headers()
+ .get(HOST)
+ .ok_or_bad_request("HOST header required")?
+ .to_str()?;
+
+ // Get bucket
+ let host = authority_to_host(authority)?;
+
+ let bucket_name = host_to_bucket(&host, &self.root_domain).unwrap_or(&host);
+ let bucket_id = self
+ .garage
+ .bucket_alias_table
+ .get(&EmptyKey, &bucket_name.to_string())
+ .await?
+ .and_then(|x| x.state.take())
+ .ok_or(Error::NotFound)?;
+
+ // Check bucket isn't deleted and has website access enabled
+ let bucket = self
+ .garage
+ .bucket_table
+ .get(&EmptyKey, &bucket_id)
+ .await?
+ .ok_or(Error::NotFound)?;
+
+ let website_config = bucket
+ .params()
+ .ok_or(Error::NotFound)?
+ .website_config
+ .get()
+ .as_ref()
+ .ok_or(Error::NotFound)?;
+
+ // Get path
+ let path = req.uri().path().to_string();
+ let index = &website_config.index_document;
+ let key = path_to_key(&path, index)?;
+
+ debug!(
+ "Selected bucket: \"{}\" {:?}, selected key: \"{}\"",
+ bucket_name, bucket_id, key
+ );
+
+ let ret_doc = match *req.method() {
+ Method::OPTIONS => handle_options_for_bucket(req, &bucket),
+ Method::HEAD => handle_head(self.garage.clone(), req, bucket_id, &key, None).await,
+ Method::GET => handle_get(self.garage.clone(), req, bucket_id, &key, None).await,
+ _ => Err(ApiError::bad_request("HTTP method not supported")),
+ }
+ .map_err(Error::from);
+
+ match ret_doc {
+ Err(error) => {
+ // For a HEAD or OPTIONS method, and for non-4xx errors,
+ // we don't return the error document as content,
+ // we return above and just return the error message
+ // by relying on err_to_res that is called when we return an Err.
+ if *req.method() == Method::HEAD
+ || *req.method() == Method::OPTIONS
+ || !error.http_status_code().is_client_error()
+ {
+ return Err(error);
+ }
- graceful.await?;
- Ok(())
-}
+ // If no error document is set: just return the error directly
+ let error_document = match &website_config.error_document {
+ Some(ed) => ed.trim_start_matches('/').to_owned(),
+ None => return Err(error),
+ };
+
+ // We want to return the error document
+ // Create a fake HTTP request with path = the error document
+ let req2 = Request::builder()
+ .uri(format!("http://{}/{}", host, &error_document))
+ .body(Body::empty())
+ .unwrap();
+
+ match handle_get(self.garage.clone(), &req2, bucket_id, &error_document, None).await
+ {
+ Ok(mut error_doc) => {
+ // The error won't be logged back in handle_request,
+ // so log it here
+ info!(
+ "{} {} {} {}",
+ req.method(),
+ req.uri(),
+ error.http_status_code(),
+ error
+ );
+
+ *error_doc.status_mut() = error.http_status_code();
+ error.add_headers(error_doc.headers_mut());
+
+ // Preserve error message in a special header
+ for error_line in error.to_string().split('\n') {
+ if let Ok(v) = HeaderValue::from_bytes(error_line.as_bytes()) {
+ error_doc.headers_mut().append("X-Garage-Error", v);
+ }
+ }
-async fn handle_request(
- garage: Arc<Garage>,
- metrics: Arc<WebMetrics>,
- req: Request<Body>,
- addr: SocketAddr,
-) -> Result<Response<Body>, Infallible> {
- info!("{} {} {}", addr, req.method(), req.uri());
-
- // Lots of instrumentation
- let tracer = opentelemetry::global::tracer("garage");
- let span = tracer
- .span_builder(format!("Web {} request", req.method()))
- .with_trace_id(gen_trace_id())
- .with_attributes(vec![
- KeyValue::new("method", format!("{}", req.method())),
- KeyValue::new("uri", req.uri().to_string()),
- ])
- .start(&tracer);
-
- let metrics_tags = &[KeyValue::new("method", req.method().to_string())];
-
- // The actual handler
- let res = serve_file(garage, &req)
- .with_context(Context::current_with_span(span))
- .record_duration(&metrics.request_duration, &metrics_tags[..])
- .await;
-
- // More instrumentation
- metrics.request_counter.add(1, &metrics_tags[..]);
-
- // Returning the result
- match res {
- Ok(res) => {
- debug!("{} {} {}", req.method(), res.status(), req.uri());
- Ok(res)
- }
- Err(error) => {
- info!(
- "{} {} {} {}",
- req.method(),
- error.http_status_code(),
- req.uri(),
- error
- );
- metrics.error_counter.add(
- 1,
- &[
- metrics_tags[0].clone(),
- KeyValue::new("status_code", error.http_status_code().to_string()),
- ],
- );
- Ok(error_to_res(error))
+ Ok(error_doc)
+ }
+ Err(error_doc_error) => {
+ warn!(
+ "Couldn't get error document {} for bucket {:?}: {}",
+ error_document, bucket_id, error_doc_error
+ );
+ Err(error)
+ }
+ }
+ }
+ Ok(mut resp) => {
+ // Maybe add CORS headers
+ if let Some(rule) = find_matching_cors_rule(&bucket, req)? {
+ add_cors_headers(&mut resp, rule)
+ .ok_or_internal_error("Invalid bucket CORS configuration")?;
+ }
+ Ok(resp)
+ }
}
}
}
@@ -160,129 +296,6 @@ fn error_to_res(e: Error) -> Response<Body> {
http_error
}
-async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response<Body>, Error> {
- // Get http authority string (eg. [::1]:3902 or garage.tld:80)
- let authority = req
- .headers()
- .get(HOST)
- .ok_or_bad_request("HOST header required")?
- .to_str()?;
-
- // Get bucket
- let host = authority_to_host(authority)?;
- let root = &garage.config.s3_web.root_domain;
-
- let bucket_name = host_to_bucket(&host, root).unwrap_or(&host);
- let bucket_id = garage
- .bucket_alias_table
- .get(&EmptyKey, &bucket_name.to_string())
- .await?
- .and_then(|x| x.state.take())
- .ok_or(Error::NotFound)?;
-
- // Check bucket isn't deleted and has website access enabled
- let bucket = garage
- .bucket_table
- .get(&EmptyKey, &bucket_id)
- .await?
- .ok_or(Error::NotFound)?;
-
- let website_config = bucket
- .params()
- .ok_or(Error::NotFound)?
- .website_config
- .get()
- .as_ref()
- .ok_or(Error::NotFound)?;
-
- // Get path
- let path = req.uri().path().to_string();
- let index = &website_config.index_document;
- let key = path_to_key(&path, index)?;
-
- debug!(
- "Selected bucket: \"{}\" {:?}, selected key: \"{}\"",
- bucket_name, bucket_id, key
- );
-
- let ret_doc = match *req.method() {
- Method::OPTIONS => handle_options_for_bucket(req, &bucket),
- Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key, None).await,
- Method::GET => handle_get(garage.clone(), req, bucket_id, &key, None).await,
- _ => Err(ApiError::bad_request("HTTP method not supported")),
- }
- .map_err(Error::from);
-
- match ret_doc {
- Err(error) => {
- // For a HEAD or OPTIONS method, and for non-4xx errors,
- // we don't return the error document as content,
- // we return above and just return the error message
- // by relying on err_to_res that is called when we return an Err.
- if *req.method() == Method::HEAD
- || *req.method() == Method::OPTIONS
- || !error.http_status_code().is_client_error()
- {
- return Err(error);
- }
-
- // If no error document is set: just return the error directly
- let error_document = match &website_config.error_document {
- Some(ed) => ed.trim_start_matches('/').to_owned(),
- None => return Err(error),
- };
-
- // We want to return the error document
- // Create a fake HTTP request with path = the error document
- let req2 = Request::builder()
- .uri(format!("http://{}/{}", host, &error_document))
- .body(Body::empty())
- .unwrap();
-
- match handle_get(garage, &req2, bucket_id, &error_document, None).await {
- Ok(mut error_doc) => {
- // The error won't be logged back in handle_request,
- // so log it here
- info!(
- "{} {} {} {}",
- req.method(),
- req.uri(),
- error.http_status_code(),
- error
- );
-
- *error_doc.status_mut() = error.http_status_code();
- error.add_headers(error_doc.headers_mut());
-
- // Preserve error message in a special header
- for error_line in error.to_string().split('\n') {
- if let Ok(v) = HeaderValue::from_bytes(error_line.as_bytes()) {
- error_doc.headers_mut().append("X-Garage-Error", v);
- }
- }
-
- Ok(error_doc)
- }
- Err(error_doc_error) => {
- warn!(
- "Couldn't get error document {} for bucket {:?}: {}",
- error_document, bucket_id, error_doc_error
- );
- Err(error)
- }
- }
- }
- Ok(mut resp) => {
- // Maybe add CORS headers
- if let Some(rule) = find_matching_cors_rule(&bucket, req)? {
- add_cors_headers(&mut resp, rule)
- .ok_or_internal_error("Invalid bucket CORS configuration")?;
- }
- Ok(resp)
- }
- }
-}
-
/// Path to key
///
/// Convert the provided path to the internal key