diff options
Diffstat (limited to 'src/web/web_server.rs')
-rw-r--r-- | src/web/web_server.rs | 409 |
1 files changed, 211 insertions, 198 deletions
diff --git a/src/web/web_server.rs b/src/web/web_server.rs index c30d8957..c2322073 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -57,90 +57,226 @@ impl WebMetrics { } } -/// Run a web server -pub async fn run_web_server( +pub struct WebServer { garage: Arc<Garage>, - shutdown_signal: impl Future<Output = ()>, -) -> Result<(), GarageError> { - let addr = &garage.config.s3_web.bind_addr; + metrics: Arc<WebMetrics>, + root_domain: String, +} - let metrics = Arc::new(WebMetrics::new()); +impl WebServer { + /// Run a web server + pub async fn run( + garage: Arc<Garage>, + addr: SocketAddr, + root_domain: String, + shutdown_signal: impl Future<Output = ()>, + ) -> Result<(), GarageError> { + let metrics = Arc::new(WebMetrics::new()); + let web_server = Arc::new(WebServer { + garage, + metrics, + root_domain, + }); + + let service = make_service_fn(|conn: &AddrStream| { + let web_server = web_server.clone(); + + let client_addr = conn.remote_addr(); + async move { + Ok::<_, Error>(service_fn(move |req: Request<Body>| { + let web_server = web_server.clone(); + + web_server.handle_request(req, client_addr) + })) + } + }); - let service = make_service_fn(|conn: &AddrStream| { - let garage = garage.clone(); - let metrics = metrics.clone(); + let server = Server::bind(&addr).serve(service); + let graceful = server.with_graceful_shutdown(shutdown_signal); + info!("Web server listening on http://{}", addr); - let client_addr = conn.remote_addr(); - async move { - Ok::<_, Error>(service_fn(move |req: Request<Body>| { - let garage = garage.clone(); - let metrics = metrics.clone(); + graceful.await?; + Ok(()) + } - handle_request(garage, metrics, req, client_addr) - })) + async fn handle_request( + self: Arc<Self>, + req: Request<Body>, + addr: SocketAddr, + ) -> Result<Response<Body>, Infallible> { + info!("{} {} {}", addr, req.method(), req.uri()); + + // Lots of instrumentation + let tracer = opentelemetry::global::tracer("garage"); + let span = tracer + .span_builder(format!("Web {} request", req.method())) + .with_trace_id(gen_trace_id()) + .with_attributes(vec![ + KeyValue::new("method", format!("{}", req.method())), + KeyValue::new("uri", req.uri().to_string()), + ]) + .start(&tracer); + + let metrics_tags = &[KeyValue::new("method", req.method().to_string())]; + + // The actual handler + let res = self + .serve_file(&req) + .with_context(Context::current_with_span(span)) + .record_duration(&self.metrics.request_duration, &metrics_tags[..]) + .await; + + // More instrumentation + self.metrics.request_counter.add(1, &metrics_tags[..]); + + // Returning the result + match res { + Ok(res) => { + debug!("{} {} {}", req.method(), res.status(), req.uri()); + Ok(res) + } + Err(error) => { + info!( + "{} {} {} {}", + req.method(), + error.http_status_code(), + req.uri(), + error + ); + self.metrics.error_counter.add( + 1, + &[ + metrics_tags[0].clone(), + KeyValue::new("status_code", error.http_status_code().to_string()), + ], + ); + Ok(error_to_res(error)) + } } - }); + } - let server = Server::bind(addr).serve(service); - let graceful = server.with_graceful_shutdown(shutdown_signal); - info!("Web server listening on http://{}", addr); + async fn serve_file(self: &Arc<Self>, req: &Request<Body>) -> Result<Response<Body>, Error> { + // Get http authority string (eg. [::1]:3902 or garage.tld:80) + let authority = req + .headers() + .get(HOST) + .ok_or_bad_request("HOST header required")? + .to_str()?; + + // Get bucket + let host = authority_to_host(authority)?; + + let bucket_name = host_to_bucket(&host, &self.root_domain).unwrap_or(&host); + let bucket_id = self + .garage + .bucket_alias_table + .get(&EmptyKey, &bucket_name.to_string()) + .await? + .and_then(|x| x.state.take()) + .ok_or(Error::NotFound)?; + + // Check bucket isn't deleted and has website access enabled + let bucket = self + .garage + .bucket_table + .get(&EmptyKey, &bucket_id) + .await? + .ok_or(Error::NotFound)?; + + let website_config = bucket + .params() + .ok_or(Error::NotFound)? + .website_config + .get() + .as_ref() + .ok_or(Error::NotFound)?; + + // Get path + let path = req.uri().path().to_string(); + let index = &website_config.index_document; + let key = path_to_key(&path, index)?; + + debug!( + "Selected bucket: \"{}\" {:?}, selected key: \"{}\"", + bucket_name, bucket_id, key + ); + + let ret_doc = match *req.method() { + Method::OPTIONS => handle_options_for_bucket(req, &bucket), + Method::HEAD => handle_head(self.garage.clone(), req, bucket_id, &key, None).await, + Method::GET => handle_get(self.garage.clone(), req, bucket_id, &key, None).await, + _ => Err(ApiError::bad_request("HTTP method not supported")), + } + .map_err(Error::from); + + match ret_doc { + Err(error) => { + // For a HEAD or OPTIONS method, and for non-4xx errors, + // we don't return the error document as content, + // we return above and just return the error message + // by relying on err_to_res that is called when we return an Err. + if *req.method() == Method::HEAD + || *req.method() == Method::OPTIONS + || !error.http_status_code().is_client_error() + { + return Err(error); + } - graceful.await?; - Ok(()) -} + // If no error document is set: just return the error directly + let error_document = match &website_config.error_document { + Some(ed) => ed.trim_start_matches('/').to_owned(), + None => return Err(error), + }; + + // We want to return the error document + // Create a fake HTTP request with path = the error document + let req2 = Request::builder() + .uri(format!("http://{}/{}", host, &error_document)) + .body(Body::empty()) + .unwrap(); + + match handle_get(self.garage.clone(), &req2, bucket_id, &error_document, None).await + { + Ok(mut error_doc) => { + // The error won't be logged back in handle_request, + // so log it here + info!( + "{} {} {} {}", + req.method(), + req.uri(), + error.http_status_code(), + error + ); + + *error_doc.status_mut() = error.http_status_code(); + error.add_headers(error_doc.headers_mut()); + + // Preserve error message in a special header + for error_line in error.to_string().split('\n') { + if let Ok(v) = HeaderValue::from_bytes(error_line.as_bytes()) { + error_doc.headers_mut().append("X-Garage-Error", v); + } + } -async fn handle_request( - garage: Arc<Garage>, - metrics: Arc<WebMetrics>, - req: Request<Body>, - addr: SocketAddr, -) -> Result<Response<Body>, Infallible> { - info!("{} {} {}", addr, req.method(), req.uri()); - - // Lots of instrumentation - let tracer = opentelemetry::global::tracer("garage"); - let span = tracer - .span_builder(format!("Web {} request", req.method())) - .with_trace_id(gen_trace_id()) - .with_attributes(vec![ - KeyValue::new("method", format!("{}", req.method())), - KeyValue::new("uri", req.uri().to_string()), - ]) - .start(&tracer); - - let metrics_tags = &[KeyValue::new("method", req.method().to_string())]; - - // The actual handler - let res = serve_file(garage, &req) - .with_context(Context::current_with_span(span)) - .record_duration(&metrics.request_duration, &metrics_tags[..]) - .await; - - // More instrumentation - metrics.request_counter.add(1, &metrics_tags[..]); - - // Returning the result - match res { - Ok(res) => { - debug!("{} {} {}", req.method(), res.status(), req.uri()); - Ok(res) - } - Err(error) => { - info!( - "{} {} {} {}", - req.method(), - error.http_status_code(), - req.uri(), - error - ); - metrics.error_counter.add( - 1, - &[ - metrics_tags[0].clone(), - KeyValue::new("status_code", error.http_status_code().to_string()), - ], - ); - Ok(error_to_res(error)) + Ok(error_doc) + } + Err(error_doc_error) => { + warn!( + "Couldn't get error document {} for bucket {:?}: {}", + error_document, bucket_id, error_doc_error + ); + Err(error) + } + } + } + Ok(mut resp) => { + // Maybe add CORS headers + if let Some(rule) = find_matching_cors_rule(&bucket, req)? { + add_cors_headers(&mut resp, rule) + .ok_or_internal_error("Invalid bucket CORS configuration")?; + } + Ok(resp) + } } } } @@ -160,129 +296,6 @@ fn error_to_res(e: Error) -> Response<Body> { http_error } -async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response<Body>, Error> { - // Get http authority string (eg. [::1]:3902 or garage.tld:80) - let authority = req - .headers() - .get(HOST) - .ok_or_bad_request("HOST header required")? - .to_str()?; - - // Get bucket - let host = authority_to_host(authority)?; - let root = &garage.config.s3_web.root_domain; - - let bucket_name = host_to_bucket(&host, root).unwrap_or(&host); - let bucket_id = garage - .bucket_alias_table - .get(&EmptyKey, &bucket_name.to_string()) - .await? - .and_then(|x| x.state.take()) - .ok_or(Error::NotFound)?; - - // Check bucket isn't deleted and has website access enabled - let bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .ok_or(Error::NotFound)?; - - let website_config = bucket - .params() - .ok_or(Error::NotFound)? - .website_config - .get() - .as_ref() - .ok_or(Error::NotFound)?; - - // Get path - let path = req.uri().path().to_string(); - let index = &website_config.index_document; - let key = path_to_key(&path, index)?; - - debug!( - "Selected bucket: \"{}\" {:?}, selected key: \"{}\"", - bucket_name, bucket_id, key - ); - - let ret_doc = match *req.method() { - Method::OPTIONS => handle_options_for_bucket(req, &bucket), - Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key, None).await, - Method::GET => handle_get(garage.clone(), req, bucket_id, &key, None).await, - _ => Err(ApiError::bad_request("HTTP method not supported")), - } - .map_err(Error::from); - - match ret_doc { - Err(error) => { - // For a HEAD or OPTIONS method, and for non-4xx errors, - // we don't return the error document as content, - // we return above and just return the error message - // by relying on err_to_res that is called when we return an Err. - if *req.method() == Method::HEAD - || *req.method() == Method::OPTIONS - || !error.http_status_code().is_client_error() - { - return Err(error); - } - - // If no error document is set: just return the error directly - let error_document = match &website_config.error_document { - Some(ed) => ed.trim_start_matches('/').to_owned(), - None => return Err(error), - }; - - // We want to return the error document - // Create a fake HTTP request with path = the error document - let req2 = Request::builder() - .uri(format!("http://{}/{}", host, &error_document)) - .body(Body::empty()) - .unwrap(); - - match handle_get(garage, &req2, bucket_id, &error_document, None).await { - Ok(mut error_doc) => { - // The error won't be logged back in handle_request, - // so log it here - info!( - "{} {} {} {}", - req.method(), - req.uri(), - error.http_status_code(), - error - ); - - *error_doc.status_mut() = error.http_status_code(); - error.add_headers(error_doc.headers_mut()); - - // Preserve error message in a special header - for error_line in error.to_string().split('\n') { - if let Ok(v) = HeaderValue::from_bytes(error_line.as_bytes()) { - error_doc.headers_mut().append("X-Garage-Error", v); - } - } - - Ok(error_doc) - } - Err(error_doc_error) => { - warn!( - "Couldn't get error document {} for bucket {:?}: {}", - error_document, bucket_id, error_doc_error - ); - Err(error) - } - } - } - Ok(mut resp) => { - // Maybe add CORS headers - if let Some(rule) = find_matching_cors_rule(&bucket, req)? { - add_cors_headers(&mut resp, rule) - .ok_or_internal_error("Invalid bucket CORS configuration")?; - } - Ok(resp) - } - } -} - /// Path to key /// /// Convert the provided path to the internal key |