author    Alex Auvolat <alex@adnab.me>    2022-12-06 14:02:32 +0100
committer Alex Auvolat <alex@adnab.me>    2022-12-06 14:02:32 +0100
commit    8d1162f20694d5d8551879e7ba9b34c817f0caed (patch)
tree      fdb6f52059554a2e713dff05eff5cbc645eb1a24
parent    ba5bf133f61c3a56728c2ab73e11abf47ef8348c (diff)
Change scheduling algo to deprioritize slow backends + refactoring
-rw-r--r--  Dockerfile             2
-rw-r--r--  src/https.rs         139
-rw-r--r--  src/main.rs            2
-rw-r--r--  src/proxy_config.rs   13
4 files changed, 88 insertions, 68 deletions
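
The core of the change is the backend-selection key in src/https.rs: the single round-robin counter calls is replaced by two atomics, calls_in_progress and last_call. Selection still prefers higher priority, longer matching path prefix, and locality (same node, then same site), but among otherwise equivalent entries it now picks the one with the fewest in-flight calls, and only then the one that was called longest ago. A slow backend keeps its in-flight count high and is therefore automatically deprioritized. The sketch below illustrates that ordering with a simplified stand-in for ProxyEntry (the Backend struct and select function are illustrative only, and the path-prefix-length component of the real key is omitted for brevity):

use std::sync::atomic::{AtomicI64, Ordering};

// Simplified stand-in for tricot's ProxyEntry; only calls_in_progress and
// last_call correspond directly to the fields added in this commit.
struct Backend {
    priority: u8,
    same_node: bool,
    same_site: bool,
    calls_in_progress: AtomicI64,
    last_call: AtomicI64, // milliseconds since a fixed time_origin
}

// Among the entries that matched the request, prefer (in order): higher
// priority, same node, same site, fewest calls currently in progress
// (slow backends accumulate in-flight calls and sink in this ordering),
// and finally the entry that was called least recently.
fn select(candidates: &[Backend]) -> Option<&Backend> {
    candidates.iter().max_by_key(|ent| {
        (
            ent.priority,
            ent.same_node,
            ent.same_site,
            -ent.calls_in_progress.load(Ordering::SeqCst),
            -ent.last_call.load(Ordering::SeqCst),
        )
    })
}
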
diff --git a/Dockerfile b/Dockerfile
index 3a6ebe7..e1eb70d 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM rust:1.58-buster as builder
+FROM rust:1.65-buster as builder
RUN apt-get update && \
apt-get install -y libssl-dev pkg-config
diff --git a/src/https.rs b/src/https.rs
index 807dbb8..ce9c61f 100644
--- a/src/https.rs
+++ b/src/https.rs
@@ -24,7 +24,7 @@ use tokio_util::io::{ReaderStream, StreamReader};
use opentelemetry::{metrics, KeyValue};
use crate::cert_store::{CertStore, StoreResolver};
-use crate::proxy_config::ProxyConfig;
+use crate::proxy_config::{ProxyConfig, ProxyEntry};
use crate::reverse_proxy;
const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(24 * 3600);
@@ -33,6 +33,7 @@ pub struct HttpsConfig {
pub bind_addr: SocketAddr,
pub enable_compression: bool,
pub compress_mime_types: Vec<String>,
+ pub time_origin: Instant,
}
struct HttpsMetrics {
@@ -110,7 +111,13 @@ pub async fn serve_https(
let proxy_config: Arc<ProxyConfig> =
rx_proxy_config.borrow().clone();
let metrics = metrics.clone();
- handle_outer(remote_addr, req, https_config, proxy_config, metrics)
+ handle_request(
+ remote_addr,
+ req,
+ https_config,
+ proxy_config,
+ metrics,
+ )
}),
)
.with_upgrades();
@@ -145,7 +152,7 @@ pub async fn serve_https(
Ok(())
}
-async fn handle_outer(
+async fn handle_request(
remote_addr: SocketAddr,
req: Request<Body>,
https_config: Arc<HttpsConfig>,
@@ -169,25 +176,15 @@ async fn handle_outer(
];
metrics.requests_received.add(1, &tags);
- let resp = match handle(
+ let resp = select_target_and_proxy(
+ &https_config,
+ &proxy_config,
+ &metrics,
remote_addr,
req,
- https_config,
- proxy_config,
&mut tags,
- &metrics,
)
- .await
- {
- Err(e) => {
- warn!("Handler error: {}", e);
- Response::builder()
- .status(StatusCode::INTERNAL_SERVER_ERROR)
- .body(Body::from(format!("{}", e)))
- .unwrap()
- }
- Ok(r) => r,
- };
+ .await;
tags.push(KeyValue::new(
"status_code",
@@ -200,14 +197,14 @@ async fn handle_outer(
// Custom echo service, handling two different routes and a
// catch-all 404 responder.
-async fn handle(
+async fn select_target_and_proxy(
+ https_config: &HttpsConfig,
+ proxy_config: &ProxyConfig,
+ metrics: &HttpsMetrics,
remote_addr: SocketAddr,
req: Request<Body>,
- https_config: Arc<HttpsConfig>,
- proxy_config: Arc<ProxyConfig>,
tags: &mut Vec<KeyValue>,
- metrics: &HttpsMetrics,
-) -> Result<Response<Body>, anyhow::Error> {
+) -> Response<Body> {
let received_time = Instant::now();
let method = req.method().clone();
@@ -216,13 +213,17 @@ async fn handle(
let host = if let Some(auth) = req.uri().authority() {
auth.as_str()
} else {
- req.headers()
- .get("host")
- .ok_or_else(|| anyhow!("Missing host header"))?
- .to_str()?
+ match req.headers().get("host").map(|x| x.to_str().ok()).flatten() {
+ Some(host) => host,
+ None => {
+ return Response::builder()
+ .status(StatusCode::BAD_REQUEST)
+ .body(Body::from("Missing Host header"))
+ .unwrap();
+ }
+ }
};
let path = req.uri().path();
- let accept_encoding = accept_encoding_fork::encodings(req.headers()).unwrap_or_else(|_| vec![]);
let best_match = proxy_config
.entries
@@ -244,7 +245,8 @@ async fn handle(
.unwrap_or(0),
ent.same_node,
ent.same_site,
- -ent.calls.load(Ordering::SeqCst),
+ -ent.calls_in_progress.load(Ordering::SeqCst),
+ -ent.last_call.load(Ordering::SeqCst),
)
});
@@ -257,59 +259,74 @@ async fn handle(
tags.push(KeyValue::new("same_node", proxy_to.same_node.to_string()));
tags.push(KeyValue::new("same_site", proxy_to.same_site.to_string()));
- proxy_to.calls.fetch_add(1, Ordering::SeqCst);
+ proxy_to.last_call.fetch_max(
+ (received_time - https_config.time_origin).as_millis() as i64,
+ Ordering::Relaxed,
+ );
+ proxy_to.calls_in_progress.fetch_add(1, Ordering::SeqCst);
debug!("{}{} -> {}", host, path, proxy_to);
trace!("Request: {:?}", req);
- let mut response = if proxy_to.https_target {
- let to_addr = format!("https://{}", proxy_to.target_addr);
- handle_error(reverse_proxy::call_https(remote_addr.ip(), &to_addr, req).await)
- } else {
- let to_addr = format!("http://{}", proxy_to.target_addr);
- handle_error(reverse_proxy::call(remote_addr.ip(), &to_addr, req).await)
+ let response = match do_proxy(&https_config, remote_addr, req, proxy_to).await {
+ Ok(resp) => resp,
+ Err(e) => Response::builder()
+ .status(StatusCode::BAD_GATEWAY)
+ .body(Body::from(format!("Proxy error: {}", e)))
+ .unwrap(),
};
+ proxy_to.calls_in_progress.fetch_sub(1, Ordering::SeqCst);
metrics
.request_proxy_duration
.record(received_time.elapsed().as_secs_f64(), &tags);
- if response.status().is_success() {
- // (TODO: maybe we want to add these headers even if it's not a success?)
- for (header, value) in proxy_to.add_headers.iter() {
- response.headers_mut().insert(
- HeaderName::from_bytes(header.as_bytes())?,
- HeaderValue::from_str(value)?,
- );
- }
- }
-
- if https_config.enable_compression {
- response =
- try_compress(response, method.clone(), accept_encoding, &https_config).await?
- };
-
trace!("Final response: {:?}", response);
info!("{} {} {}", method, response.status().as_u16(), uri);
- Ok(response)
+ response
} else {
debug!("{}{} -> NOT FOUND", host, path);
info!("{} 404 {}", method, uri);
- Ok(Response::builder()
+ Response::builder()
.status(StatusCode::NOT_FOUND)
- .body(Body::from("No matching proxy entry"))?)
+ .body(Body::from("No matching proxy entry"))
+ .unwrap()
}
}
-fn handle_error(resp: Result<Response<Body>>) -> Response<Body> {
- match resp {
- Ok(resp) => resp,
- Err(e) => Response::builder()
- .status(StatusCode::BAD_GATEWAY)
- .body(Body::from(format!("Proxy error: {}", e)))
- .unwrap(),
+async fn do_proxy(
+ https_config: &HttpsConfig,
+ remote_addr: SocketAddr,
+ req: Request<Body>,
+ proxy_to: &ProxyEntry,
+) -> Result<Response<Body>> {
+ let method = req.method().clone();
+ let accept_encoding = accept_encoding_fork::encodings(req.headers()).unwrap_or_else(|_| vec![]);
+
+ let mut response = if proxy_to.https_target {
+ let to_addr = format!("https://{}", proxy_to.target_addr);
+ reverse_proxy::call_https(remote_addr.ip(), &to_addr, req).await?
+ } else {
+ let to_addr = format!("http://{}", proxy_to.target_addr);
+ reverse_proxy::call(remote_addr.ip(), &to_addr, req).await?
+ };
+
+ if response.status().is_success() {
+ // (TODO: maybe we want to add these headers even if it's not a success?)
+ for (header, value) in proxy_to.add_headers.iter() {
+ response.headers_mut().insert(
+ HeaderName::from_bytes(header.as_bytes())?,
+ HeaderValue::from_str(value)?,
+ );
+ }
}
+
+ if https_config.enable_compression {
+ response = try_compress(response, method, accept_encoding, &https_config).await?
+ };
+
+ Ok(response)
}
async fn try_compress(
diff --git a/src/main.rs b/src/main.rs
index cb39c49..34c80f8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,6 +3,7 @@ extern crate anyhow;
use log::*;
use std::sync::Arc;
+use std::time::Instant;
use futures::{FutureExt, TryFutureExt};
use std::net::SocketAddr;
@@ -175,6 +176,7 @@ async fn main() {
.split(',')
.map(|x| x.to_string())
.collect(),
+ time_origin: Instant::now(),
};
let https_task = tokio::spawn(
diff --git a/src/proxy_config.rs b/src/proxy_config.rs
index 2ce462e..ac37229 100644
--- a/src/proxy_config.rs
+++ b/src/proxy_config.rs
@@ -75,10 +75,10 @@ pub struct ProxyEntry {
/// when matching this rule
pub add_headers: Vec<(String, String)>,
- // Counts the number of times this proxy server has been called to
- // This implements a round-robin load balancer if there are multiple
- // entries for the same host and same path prefix.
- pub calls: atomic::AtomicI64,
+ /// Number of calls in progress, used to deprioritize slow back-ends
+ pub calls_in_progress: atomic::AtomicI64,
+ /// Time of last call, used for round-robin selection
+ pub last_call: atomic::AtomicI64,
}
impl std::fmt::Display for ProxyEntry {
@@ -102,7 +102,7 @@ impl std::fmt::Display for ProxyEntry {
if !self.add_headers.is_empty() {
write!(f, " +Headers: {:?}", self.add_headers)?;
}
- write!(f, " ({})", self.calls.load(atomic::Ordering::Relaxed))
+ Ok(())
}
}
@@ -167,7 +167,8 @@ fn parse_tricot_tag(
path_prefix,
priority,
add_headers: add_headers.to_vec(),
- calls: atomic::AtomicI64::from(0),
+ last_call: atomic::AtomicI64::from(0),
+ calls_in_progress: atomic::AtomicI64::from(0),
})
}
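
For reference, the request path in src/https.rs feeds these two fields as follows: last_call is set to the request's arrival time in milliseconds since the process-wide time_origin (added to HttpsConfig and initialized in main.rs), and calls_in_progress is incremented before do_proxy runs and decremented once it returns. A minimal sketch of that bookkeeping, reusing the simplified Backend type from the note above (the function name and signature are illustrative, not tricot's actual API):

use std::sync::atomic::Ordering;
use std::time::Instant;

// Per-request bookkeeping for the two counters, reusing the Backend type
// from the sketch above; the actual proxying step is elided.
async fn proxy_one_request(backend: &Backend, time_origin: Instant) {
    // Stamp this entry as most-recently-used. fetch_max keeps the counter
    // monotonic even when several requests race on the same entry.
    backend
        .last_call
        .fetch_max(time_origin.elapsed().as_millis() as i64, Ordering::Relaxed);

    // While the call is in flight, the entry looks "busier" and is
    // deprioritized by the selection above, so slow backends naturally
    // receive fewer new requests.
    backend.calls_in_progress.fetch_add(1, Ordering::SeqCst);

    // ... reverse_proxy::call(...) / call_https(...) would happen here ...

    backend.calls_in_progress.fetch_sub(1, Ordering::SeqCst);
}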