diff options
author | Alex Auvolat <alex@adnab.me> | 2022-09-07 17:54:16 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-09-07 17:54:16 +0200 |
commit | 2559f63e9bb58a66da70f33e852ebbd5f909876e (patch) | |
tree | c052d8d8978acd1396e3559015d0212fb1dbeb33 | |
parent | 28d86e76021bed674ca78684b9522cfb664a8ae2 (diff) | |
download | garage-2559f63e9bb58a66da70f33e852ebbd5f909876e.tar.gz garage-2559f63e9bb58a66da70f33e852ebbd5f909876e.zip |
Make all HTTP services optionnal
-rw-r--r-- | src/api/admin/api_server.rs | 19 | ||||
-rw-r--r-- | src/api/k2v/api_server.rs | 14 | ||||
-rw-r--r-- | src/api/s3/api_server.rs | 14 | ||||
-rw-r--r-- | src/garage/server.rs | 112 | ||||
-rw-r--r-- | src/util/config.rs | 6 | ||||
-rw-r--r-- | src/web/lib.rs | 2 | ||||
-rw-r--r-- | src/web/web_server.rs | 409 |
7 files changed, 301 insertions, 275 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index d871d4e2..fb0078cc 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -1,3 +1,4 @@ +use std::net::SocketAddr; use std::sync::Arc; use async_trait::async_trait; @@ -52,15 +53,15 @@ impl AdminApiServer { } } - pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<(), GarageError> { - if let Some(bind_addr) = self.garage.config.admin.api_bind_addr { - let region = self.garage.config.s3_api.s3_region.clone(); - ApiServer::new(region, self) - .run_server(bind_addr, shutdown_signal) - .await - } else { - Ok(()) - } + pub async fn run( + self, + bind_addr: SocketAddr, + shutdown_signal: impl Future<Output = ()>, + ) -> Result<(), GarageError> { + let region = self.garage.config.s3_api.s3_region.clone(); + ApiServer::new(region, self) + .run_server(bind_addr, shutdown_signal) + .await } fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> { diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index eb0fbdd7..084867b5 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -1,3 +1,4 @@ +use std::net::SocketAddr; use std::sync::Arc; use async_trait::async_trait; @@ -36,20 +37,13 @@ pub(crate) struct K2VApiEndpoint { impl K2VApiServer { pub async fn run( garage: Arc<Garage>, + bind_addr: SocketAddr, + s3_region: String, shutdown_signal: impl Future<Output = ()>, ) -> Result<(), GarageError> { - if let Some(cfg) = &garage.config.k2v_api { - let bind_addr = cfg.api_bind_addr; - - ApiServer::new( - garage.config.s3_api.s3_region.clone(), - K2VApiServer { garage }, - ) + ApiServer::new(s3_region, K2VApiServer { garage }) .run_server(bind_addr, shutdown_signal) .await - } else { - Ok(()) - } } } diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 78dfeeac..27837297 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -1,3 +1,4 @@ +use std::net::SocketAddr; use std::sync::Arc; use async_trait::async_trait; @@ -43,16 +44,13 @@ pub(crate) struct S3ApiEndpoint { impl S3ApiServer { pub async fn run( garage: Arc<Garage>, + addr: SocketAddr, + s3_region: String, shutdown_signal: impl Future<Output = ()>, ) -> Result<(), GarageError> { - let addr = garage.config.s3_api.api_bind_addr; - - ApiServer::new( - garage.config.s3_api.s3_region.clone(), - S3ApiServer { garage }, - ) - .run_server(addr, shutdown_signal) - .await + ApiServer::new(s3_region, S3ApiServer { garage }) + .run_server(addr, shutdown_signal) + .await } async fn handle_request_without_bucket( diff --git a/src/garage/server.rs b/src/garage/server.rs index 0851738d..fb6d2279 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -9,7 +9,7 @@ use garage_util::error::Error; use garage_api::admin::api_server::AdminApiServer; use garage_api::s3::api_server::S3ApiServer; use garage_model::garage::Garage; -use garage_web::run_web_server; +use garage_web::WebServer; #[cfg(feature = "k2v")] use garage_api::k2v::api_server::K2VApiServer; @@ -30,6 +30,8 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Loading configuration..."); let config = read_config(config_file)?; + // ---- Initialize Garage internals ---- + info!("Initializing background runner..."); let watch_cancel = netapp::util::watch_ctrl_c(); let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); @@ -44,7 +46,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { init_tracing(config.admin.trace_sink.as_ref().unwrap(), garage.system.id)?; #[cfg(not(feature = "telemetry-otlp"))] - warn!("Garage was built without OTLP exporter, admin.trace_sink is ignored."); + error!("Garage was built without OTLP exporter, admin.trace_sink is ignored."); } info!("Initialize Admin API server and metrics collector..."); @@ -56,53 +58,73 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Create admin RPC handler..."); AdminRpcHandler::new(garage.clone()); - info!("Initializing S3 API server..."); - let s3_api_server = tokio::spawn(S3ApiServer::run( - garage.clone(), - wait_from(watch_cancel.clone()), - )); - - #[cfg(feature = "k2v")] - let k2v_api_server = { - info!("Initializing K2V API server..."); - tokio::spawn(K2VApiServer::run( - garage.clone(), - wait_from(watch_cancel.clone()), - )) - }; - - info!("Initializing web server..."); - let web_server = tokio::spawn(run_web_server( - garage.clone(), - wait_from(watch_cancel.clone()), - )); - - info!("Launching Admin API server..."); - let admin_server = tokio::spawn(admin_server.run(wait_from(watch_cancel.clone()))); - - // Stuff runs + // ---- Launch public-facing API servers ---- + + let mut servers = vec![]; + + if let Some(s3_bind_addr) = &config.s3_api.api_bind_addr { + info!("Initializing S3 API server..."); + servers.push(( + "S3 API", + tokio::spawn(S3ApiServer::run( + garage.clone(), + *s3_bind_addr, + config.s3_api.s3_region.clone(), + wait_from(watch_cancel.clone()), + )), + )); + } - // When a cancel signal is sent, stuff stops - if let Err(e) = s3_api_server.await? { - warn!("S3 API server exited with error: {}", e); - } else { - info!("S3 API server exited without error."); + if config.k2v_api.is_some() { + #[cfg(feature = "k2v")] + { + info!("Initializing K2V API server..."); + servers.push(( + "K2V API", + tokio::spawn(K2VApiServer::run( + garage.clone(), + config.k2v_api.as_ref().unwrap().api_bind_addr, + config.s3_api.s3_region.clone(), + wait_from(watch_cancel.clone()), + )), + )); + } + #[cfg(not(feature = "k2v"))] + error!("K2V is not enabled in this build, cannot start K2V API server"); } - #[cfg(feature = "k2v")] - if let Err(e) = k2v_api_server.await? { - warn!("K2V API server exited with error: {}", e); - } else { - info!("K2V API server exited without error."); + + if let Some(web_config) = &config.s3_web { + info!("Initializing web server..."); + servers.push(( + "Web", + tokio::spawn(WebServer::run( + garage.clone(), + web_config.bind_addr, + web_config.root_domain.clone(), + wait_from(watch_cancel.clone()), + )), + )); } - if let Err(e) = web_server.await? { - warn!("Web server exited with error: {}", e); - } else { - info!("Web server exited without error."); + + if let Some(admin_bind_addr) = &config.admin.api_bind_addr { + info!("Launching Admin API server..."); + servers.push(( + "Admin", + tokio::spawn(admin_server.run(*admin_bind_addr, wait_from(watch_cancel.clone()))), + )); } - if let Err(e) = admin_server.await? { - warn!("Admin web server exited with error: {}", e); - } else { - info!("Admin API server exited without error."); + + // Stuff runs + + // When a cancel signal is sent, stuff stops + + // Collect stuff + for (desc, join_handle) in servers { + if let Err(e) = join_handle.await? { + error!("{} server exited with error: {}", desc, e); + } else { + info!("{} server exited without error.", desc); + } } // Remove RPC handlers for system to break reference cycles diff --git a/src/util/config.rs b/src/util/config.rs index e8ef4fdd..46c5cb9d 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -81,11 +81,10 @@ pub struct Config { pub s3_api: S3ApiConfig, /// Configuration for K2V api - #[cfg(feature = "k2v")] pub k2v_api: Option<K2VApiConfig>, /// Configuration for serving files as normal web server - pub s3_web: WebConfig, + pub s3_web: Option<WebConfig>, /// Configuration for the admin API endpoint #[serde(default = "Default::default")] @@ -96,7 +95,7 @@ pub struct Config { #[derive(Deserialize, Debug, Clone)] pub struct S3ApiConfig { /// Address and port to bind for api serving - pub api_bind_addr: SocketAddr, + pub api_bind_addr: Option<SocketAddr>, /// S3 region to use pub s3_region: String, /// Suffix to remove from domain name to find bucket. If None, @@ -105,7 +104,6 @@ pub struct S3ApiConfig { } /// Configuration for K2V api -#[cfg(feature = "k2v")] #[derive(Deserialize, Debug, Clone)] pub struct K2VApiConfig { /// Address and port to bind for api serving diff --git a/src/web/lib.rs b/src/web/lib.rs index 9b7c8573..7207c365 100644 --- a/src/web/lib.rs +++ b/src/web/lib.rs @@ -6,4 +6,4 @@ mod error; pub use error::Error; mod web_server; -pub use web_server::run_web_server; +pub use web_server::WebServer; diff --git a/src/web/web_server.rs b/src/web/web_server.rs index c30d8957..c2322073 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -57,90 +57,226 @@ impl WebMetrics { } } -/// Run a web server -pub async fn run_web_server( +pub struct WebServer { garage: Arc<Garage>, - shutdown_signal: impl Future<Output = ()>, -) -> Result<(), GarageError> { - let addr = &garage.config.s3_web.bind_addr; + metrics: Arc<WebMetrics>, + root_domain: String, +} - let metrics = Arc::new(WebMetrics::new()); +impl WebServer { + /// Run a web server + pub async fn run( + garage: Arc<Garage>, + addr: SocketAddr, + root_domain: String, + shutdown_signal: impl Future<Output = ()>, + ) -> Result<(), GarageError> { + let metrics = Arc::new(WebMetrics::new()); + let web_server = Arc::new(WebServer { + garage, + metrics, + root_domain, + }); + + let service = make_service_fn(|conn: &AddrStream| { + let web_server = web_server.clone(); + + let client_addr = conn.remote_addr(); + async move { + Ok::<_, Error>(service_fn(move |req: Request<Body>| { + let web_server = web_server.clone(); + + web_server.handle_request(req, client_addr) + })) + } + }); - let service = make_service_fn(|conn: &AddrStream| { - let garage = garage.clone(); - let metrics = metrics.clone(); + let server = Server::bind(&addr).serve(service); + let graceful = server.with_graceful_shutdown(shutdown_signal); + info!("Web server listening on http://{}", addr); - let client_addr = conn.remote_addr(); - async move { - Ok::<_, Error>(service_fn(move |req: Request<Body>| { - let garage = garage.clone(); - let metrics = metrics.clone(); + graceful.await?; + Ok(()) + } - handle_request(garage, metrics, req, client_addr) - })) + async fn handle_request( + self: Arc<Self>, + req: Request<Body>, + addr: SocketAddr, + ) -> Result<Response<Body>, Infallible> { + info!("{} {} {}", addr, req.method(), req.uri()); + + // Lots of instrumentation + let tracer = opentelemetry::global::tracer("garage"); + let span = tracer + .span_builder(format!("Web {} request", req.method())) + .with_trace_id(gen_trace_id()) + .with_attributes(vec![ + KeyValue::new("method", format!("{}", req.method())), + KeyValue::new("uri", req.uri().to_string()), + ]) + .start(&tracer); + + let metrics_tags = &[KeyValue::new("method", req.method().to_string())]; + + // The actual handler + let res = self + .serve_file(&req) + .with_context(Context::current_with_span(span)) + .record_duration(&self.metrics.request_duration, &metrics_tags[..]) + .await; + + // More instrumentation + self.metrics.request_counter.add(1, &metrics_tags[..]); + + // Returning the result + match res { + Ok(res) => { + debug!("{} {} {}", req.method(), res.status(), req.uri()); + Ok(res) + } + Err(error) => { + info!( + "{} {} {} {}", + req.method(), + error.http_status_code(), + req.uri(), + error + ); + self.metrics.error_counter.add( + 1, + &[ + metrics_tags[0].clone(), + KeyValue::new("status_code", error.http_status_code().to_string()), + ], + ); + Ok(error_to_res(error)) + } } - }); + } - let server = Server::bind(addr).serve(service); - let graceful = server.with_graceful_shutdown(shutdown_signal); - info!("Web server listening on http://{}", addr); + async fn serve_file(self: &Arc<Self>, req: &Request<Body>) -> Result<Response<Body>, Error> { + // Get http authority string (eg. [::1]:3902 or garage.tld:80) + let authority = req + .headers() + .get(HOST) + .ok_or_bad_request("HOST header required")? + .to_str()?; + + // Get bucket + let host = authority_to_host(authority)?; + + let bucket_name = host_to_bucket(&host, &self.root_domain).unwrap_or(&host); + let bucket_id = self + .garage + .bucket_alias_table + .get(&EmptyKey, &bucket_name.to_string()) + .await? + .and_then(|x| x.state.take()) + .ok_or(Error::NotFound)?; + + // Check bucket isn't deleted and has website access enabled + let bucket = self + .garage + .bucket_table + .get(&EmptyKey, &bucket_id) + .await? + .ok_or(Error::NotFound)?; + + let website_config = bucket + .params() + .ok_or(Error::NotFound)? + .website_config + .get() + .as_ref() + .ok_or(Error::NotFound)?; + + // Get path + let path = req.uri().path().to_string(); + let index = &website_config.index_document; + let key = path_to_key(&path, index)?; + + debug!( + "Selected bucket: \"{}\" {:?}, selected key: \"{}\"", + bucket_name, bucket_id, key + ); + + let ret_doc = match *req.method() { + Method::OPTIONS => handle_options_for_bucket(req, &bucket), + Method::HEAD => handle_head(self.garage.clone(), req, bucket_id, &key, None).await, + Method::GET => handle_get(self.garage.clone(), req, bucket_id, &key, None).await, + _ => Err(ApiError::bad_request("HTTP method not supported")), + } + .map_err(Error::from); + + match ret_doc { + Err(error) => { + // For a HEAD or OPTIONS method, and for non-4xx errors, + // we don't return the error document as content, + // we return above and just return the error message + // by relying on err_to_res that is called when we return an Err. + if *req.method() == Method::HEAD + || *req.method() == Method::OPTIONS + || !error.http_status_code().is_client_error() + { + return Err(error); + } - graceful.await?; - Ok(()) -} + // If no error document is set: just return the error directly + let error_document = match &website_config.error_document { + Some(ed) => ed.trim_start_matches('/').to_owned(), + None => return Err(error), + }; + + // We want to return the error document + // Create a fake HTTP request with path = the error document + let req2 = Request::builder() + .uri(format!("http://{}/{}", host, &error_document)) + .body(Body::empty()) + .unwrap(); + + match handle_get(self.garage.clone(), &req2, bucket_id, &error_document, None).await + { + Ok(mut error_doc) => { + // The error won't be logged back in handle_request, + // so log it here + info!( + "{} {} {} {}", + req.method(), + req.uri(), + error.http_status_code(), + error + ); + + *error_doc.status_mut() = error.http_status_code(); + error.add_headers(error_doc.headers_mut()); + + // Preserve error message in a special header + for error_line in error.to_string().split('\n') { + if let Ok(v) = HeaderValue::from_bytes(error_line.as_bytes()) { + error_doc.headers_mut().append("X-Garage-Error", v); + } + } -async fn handle_request( - garage: Arc<Garage>, - metrics: Arc<WebMetrics>, - req: Request<Body>, - addr: SocketAddr, -) -> Result<Response<Body>, Infallible> { - info!("{} {} {}", addr, req.method(), req.uri()); - - // Lots of instrumentation - let tracer = opentelemetry::global::tracer("garage"); - let span = tracer - .span_builder(format!("Web {} request", req.method())) - .with_trace_id(gen_trace_id()) - .with_attributes(vec![ - KeyValue::new("method", format!("{}", req.method())), - KeyValue::new("uri", req.uri().to_string()), - ]) - .start(&tracer); - - let metrics_tags = &[KeyValue::new("method", req.method().to_string())]; - - // The actual handler - let res = serve_file(garage, &req) - .with_context(Context::current_with_span(span)) - .record_duration(&metrics.request_duration, &metrics_tags[..]) - .await; - - // More instrumentation - metrics.request_counter.add(1, &metrics_tags[..]); - - // Returning the result - match res { - Ok(res) => { - debug!("{} {} {}", req.method(), res.status(), req.uri()); - Ok(res) - } - Err(error) => { - info!( - "{} {} {} {}", - req.method(), - error.http_status_code(), - req.uri(), - error - ); - metrics.error_counter.add( - 1, - &[ - metrics_tags[0].clone(), - KeyValue::new("status_code", error.http_status_code().to_string()), - ], - ); - Ok(error_to_res(error)) + Ok(error_doc) + } + Err(error_doc_error) => { + warn!( + "Couldn't get error document {} for bucket {:?}: {}", + error_document, bucket_id, error_doc_error + ); + Err(error) + } + } + } + Ok(mut resp) => { + // Maybe add CORS headers + if let Some(rule) = find_matching_cors_rule(&bucket, req)? { + add_cors_headers(&mut resp, rule) + .ok_or_internal_error("Invalid bucket CORS configuration")?; + } + Ok(resp) + } } } } @@ -160,129 +296,6 @@ fn error_to_res(e: Error) -> Response<Body> { http_error } -async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response<Body>, Error> { - // Get http authority string (eg. [::1]:3902 or garage.tld:80) - let authority = req - .headers() - .get(HOST) - .ok_or_bad_request("HOST header required")? - .to_str()?; - - // Get bucket - let host = authority_to_host(authority)?; - let root = &garage.config.s3_web.root_domain; - - let bucket_name = host_to_bucket(&host, root).unwrap_or(&host); - let bucket_id = garage - .bucket_alias_table - .get(&EmptyKey, &bucket_name.to_string()) - .await? - .and_then(|x| x.state.take()) - .ok_or(Error::NotFound)?; - - // Check bucket isn't deleted and has website access enabled - let bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .ok_or(Error::NotFound)?; - - let website_config = bucket - .params() - .ok_or(Error::NotFound)? - .website_config - .get() - .as_ref() - .ok_or(Error::NotFound)?; - - // Get path - let path = req.uri().path().to_string(); - let index = &website_config.index_document; - let key = path_to_key(&path, index)?; - - debug!( - "Selected bucket: \"{}\" {:?}, selected key: \"{}\"", - bucket_name, bucket_id, key - ); - - let ret_doc = match *req.method() { - Method::OPTIONS => handle_options_for_bucket(req, &bucket), - Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key, None).await, - Method::GET => handle_get(garage.clone(), req, bucket_id, &key, None).await, - _ => Err(ApiError::bad_request("HTTP method not supported")), - } - .map_err(Error::from); - - match ret_doc { - Err(error) => { - // For a HEAD or OPTIONS method, and for non-4xx errors, - // we don't return the error document as content, - // we return above and just return the error message - // by relying on err_to_res that is called when we return an Err. - if *req.method() == Method::HEAD - || *req.method() == Method::OPTIONS - || !error.http_status_code().is_client_error() - { - return Err(error); - } - - // If no error document is set: just return the error directly - let error_document = match &website_config.error_document { - Some(ed) => ed.trim_start_matches('/').to_owned(), - None => return Err(error), - }; - - // We want to return the error document - // Create a fake HTTP request with path = the error document - let req2 = Request::builder() - .uri(format!("http://{}/{}", host, &error_document)) - .body(Body::empty()) - .unwrap(); - - match handle_get(garage, &req2, bucket_id, &error_document, None).await { - Ok(mut error_doc) => { - // The error won't be logged back in handle_request, - // so log it here - info!( - "{} {} {} {}", - req.method(), - req.uri(), - error.http_status_code(), - error - ); - - *error_doc.status_mut() = error.http_status_code(); - error.add_headers(error_doc.headers_mut()); - - // Preserve error message in a special header - for error_line in error.to_string().split('\n') { - if let Ok(v) = HeaderValue::from_bytes(error_line.as_bytes()) { - error_doc.headers_mut().append("X-Garage-Error", v); - } - } - - Ok(error_doc) - } - Err(error_doc_error) => { - warn!( - "Couldn't get error document {} for bucket {:?}: {}", - error_document, bucket_id, error_doc_error - ); - Err(error) - } - } - } - Ok(mut resp) => { - // Maybe add CORS headers - if let Some(rule) = find_matching_cors_rule(&bucket, req)? { - add_cors_headers(&mut resp, rule) - .ok_or_internal_error("Invalid bucket CORS configuration")?; - } - Ok(resp) - } - } -} - /// Path to key /// /// Convert the provided path to the internal key |