From 8d1162f20694d5d8551879e7ba9b34c817f0caed Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Dec 2022 14:02:32 +0100 Subject: Change scheduling algo to deprioritize slow backends + refactoring --- src/https.rs | 139 +++++++++++++++++++++++++++++----------------------- src/main.rs | 2 + src/proxy_config.rs | 13 ++--- 3 files changed, 87 insertions(+), 67 deletions(-) (limited to 'src') diff --git a/src/https.rs b/src/https.rs index 807dbb8..ce9c61f 100644 --- a/src/https.rs +++ b/src/https.rs @@ -24,7 +24,7 @@ use tokio_util::io::{ReaderStream, StreamReader}; use opentelemetry::{metrics, KeyValue}; use crate::cert_store::{CertStore, StoreResolver}; -use crate::proxy_config::ProxyConfig; +use crate::proxy_config::{ProxyConfig, ProxyEntry}; use crate::reverse_proxy; const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(24 * 3600); @@ -33,6 +33,7 @@ pub struct HttpsConfig { pub bind_addr: SocketAddr, pub enable_compression: bool, pub compress_mime_types: Vec, + pub time_origin: Instant, } struct HttpsMetrics { @@ -110,7 +111,13 @@ pub async fn serve_https( let proxy_config: Arc = rx_proxy_config.borrow().clone(); let metrics = metrics.clone(); - handle_outer(remote_addr, req, https_config, proxy_config, metrics) + handle_request( + remote_addr, + req, + https_config, + proxy_config, + metrics, + ) }), ) .with_upgrades(); @@ -145,7 +152,7 @@ pub async fn serve_https( Ok(()) } -async fn handle_outer( +async fn handle_request( remote_addr: SocketAddr, req: Request, https_config: Arc, @@ -169,25 +176,15 @@ async fn handle_outer( ]; metrics.requests_received.add(1, &tags); - let resp = match handle( + let resp = select_target_and_proxy( + &https_config, + &proxy_config, + &metrics, remote_addr, req, - https_config, - proxy_config, &mut tags, - &metrics, ) - .await - { - Err(e) => { - warn!("Handler error: {}", e); - Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::from(format!("{}", e))) - .unwrap() - } - Ok(r) => r, - }; + .await; tags.push(KeyValue::new( "status_code", @@ -200,14 +197,14 @@ async fn handle_outer( // Custom echo service, handling two different routes and a // catch-all 404 responder. -async fn handle( +async fn select_target_and_proxy( + https_config: &HttpsConfig, + proxy_config: &ProxyConfig, + metrics: &HttpsMetrics, remote_addr: SocketAddr, req: Request, - https_config: Arc, - proxy_config: Arc, tags: &mut Vec, - metrics: &HttpsMetrics, -) -> Result, anyhow::Error> { +) -> Response { let received_time = Instant::now(); let method = req.method().clone(); @@ -216,13 +213,17 @@ async fn handle( let host = if let Some(auth) = req.uri().authority() { auth.as_str() } else { - req.headers() - .get("host") - .ok_or_else(|| anyhow!("Missing host header"))? - .to_str()? + match req.headers().get("host").map(|x| x.to_str().ok()).flatten() { + Some(host) => host, + None => { + return Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from("Missing Host header")) + .unwrap(); + } + } }; let path = req.uri().path(); - let accept_encoding = accept_encoding_fork::encodings(req.headers()).unwrap_or_else(|_| vec![]); let best_match = proxy_config .entries @@ -244,7 +245,8 @@ async fn handle( .unwrap_or(0), ent.same_node, ent.same_site, - -ent.calls.load(Ordering::SeqCst), + -ent.calls_in_progress.load(Ordering::SeqCst), + -ent.last_call.load(Ordering::SeqCst), ) }); @@ -257,59 +259,74 @@ async fn handle( tags.push(KeyValue::new("same_node", proxy_to.same_node.to_string())); tags.push(KeyValue::new("same_site", proxy_to.same_site.to_string())); - proxy_to.calls.fetch_add(1, Ordering::SeqCst); + proxy_to.last_call.fetch_max( + (received_time - https_config.time_origin).as_millis() as i64, + Ordering::Relaxed, + ); + proxy_to.calls_in_progress.fetch_add(1, Ordering::SeqCst); debug!("{}{} -> {}", host, path, proxy_to); trace!("Request: {:?}", req); - let mut response = if proxy_to.https_target { - let to_addr = format!("https://{}", proxy_to.target_addr); - handle_error(reverse_proxy::call_https(remote_addr.ip(), &to_addr, req).await) - } else { - let to_addr = format!("http://{}", proxy_to.target_addr); - handle_error(reverse_proxy::call(remote_addr.ip(), &to_addr, req).await) + let response = match do_proxy(&https_config, remote_addr, req, proxy_to).await { + Ok(resp) => resp, + Err(e) => Response::builder() + .status(StatusCode::BAD_GATEWAY) + .body(Body::from(format!("Proxy error: {}", e))) + .unwrap(), }; + proxy_to.calls_in_progress.fetch_sub(1, Ordering::SeqCst); metrics .request_proxy_duration .record(received_time.elapsed().as_secs_f64(), &tags); - if response.status().is_success() { - // (TODO: maybe we want to add these headers even if it's not a success?) - for (header, value) in proxy_to.add_headers.iter() { - response.headers_mut().insert( - HeaderName::from_bytes(header.as_bytes())?, - HeaderValue::from_str(value)?, - ); - } - } - - if https_config.enable_compression { - response = - try_compress(response, method.clone(), accept_encoding, &https_config).await? - }; - trace!("Final response: {:?}", response); info!("{} {} {}", method, response.status().as_u16(), uri); - Ok(response) + response } else { debug!("{}{} -> NOT FOUND", host, path); info!("{} 404 {}", method, uri); - Ok(Response::builder() + Response::builder() .status(StatusCode::NOT_FOUND) - .body(Body::from("No matching proxy entry"))?) + .body(Body::from("No matching proxy entry")) + .unwrap() } } -fn handle_error(resp: Result>) -> Response { - match resp { - Ok(resp) => resp, - Err(e) => Response::builder() - .status(StatusCode::BAD_GATEWAY) - .body(Body::from(format!("Proxy error: {}", e))) - .unwrap(), +async fn do_proxy( + https_config: &HttpsConfig, + remote_addr: SocketAddr, + req: Request, + proxy_to: &ProxyEntry, +) -> Result> { + let method = req.method().clone(); + let accept_encoding = accept_encoding_fork::encodings(req.headers()).unwrap_or_else(|_| vec![]); + + let mut response = if proxy_to.https_target { + let to_addr = format!("https://{}", proxy_to.target_addr); + reverse_proxy::call_https(remote_addr.ip(), &to_addr, req).await? + } else { + let to_addr = format!("http://{}", proxy_to.target_addr); + reverse_proxy::call(remote_addr.ip(), &to_addr, req).await? + }; + + if response.status().is_success() { + // (TODO: maybe we want to add these headers even if it's not a success?) + for (header, value) in proxy_to.add_headers.iter() { + response.headers_mut().insert( + HeaderName::from_bytes(header.as_bytes())?, + HeaderValue::from_str(value)?, + ); + } } + + if https_config.enable_compression { + response = try_compress(response, method, accept_encoding, &https_config).await? + }; + + Ok(response) } async fn try_compress( diff --git a/src/main.rs b/src/main.rs index cb39c49..34c80f8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ extern crate anyhow; use log::*; use std::sync::Arc; +use std::time::Instant; use futures::{FutureExt, TryFutureExt}; use std::net::SocketAddr; @@ -175,6 +176,7 @@ async fn main() { .split(',') .map(|x| x.to_string()) .collect(), + time_origin: Instant::now(), }; let https_task = tokio::spawn( diff --git a/src/proxy_config.rs b/src/proxy_config.rs index 2ce462e..ac37229 100644 --- a/src/proxy_config.rs +++ b/src/proxy_config.rs @@ -75,10 +75,10 @@ pub struct ProxyEntry { /// when matching this rule pub add_headers: Vec<(String, String)>, - // Counts the number of times this proxy server has been called to - // This implements a round-robin load balancer if there are multiple - // entries for the same host and same path prefix. - pub calls: atomic::AtomicI64, + /// Number of calls in progress, used to deprioritize slow back-ends + pub calls_in_progress: atomic::AtomicI64, + /// Time of last call, used for round-robin selection + pub last_call: atomic::AtomicI64, } impl std::fmt::Display for ProxyEntry { @@ -102,7 +102,7 @@ impl std::fmt::Display for ProxyEntry { if !self.add_headers.is_empty() { write!(f, " +Headers: {:?}", self.add_headers)?; } - write!(f, " ({})", self.calls.load(atomic::Ordering::Relaxed)) + Ok(()) } } @@ -167,7 +167,8 @@ fn parse_tricot_tag( path_prefix, priority, add_headers: add_headers.to_vec(), - calls: atomic::AtomicI64::from(0), + last_call: atomic::AtomicI64::from(0), + calls_in_progress: atomic::AtomicI64::from(0), }) } -- cgit v1.2.3