diff options
author | Alex Auvolat <alex@adnab.me> | 2022-01-13 11:31:08 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-01-13 11:36:52 +0100 |
commit | ced324bc876eb592c0ed4aea889827c0000cf9ad (patch) | |
tree | f9c7fb31a0328ff28ad010ad706c245741a345c4 | |
parent | c030c4764570974b09d2387854b30538e81e7b86 (diff) | |
download | tricot-docker-28.tar.gz tricot-docker-28.zip |
Make nodes aware of where they are and use that to priorize backendsdocker-28
-rw-r--r-- | src/consul.rs | 6 | ||||
-rw-r--r-- | src/https.rs | 2 | ||||
-rw-r--r-- | src/proxy_config.rs | 59 |
3 files changed, 56 insertions, 11 deletions
diff --git a/src/consul.rs b/src/consul.rs index 14df1ee..cba435a 100644 --- a/src/consul.rs +++ b/src/consul.rs @@ -23,6 +23,8 @@ pub struct ConsulNode { pub node: String, #[serde(rename = "Address")] pub address: String, + #[serde(rename = "Meta")] + pub meta: HashMap<String, String>, } #[derive(Serialize, Deserialize, Debug)] @@ -117,14 +119,14 @@ impl Consul { }) } - pub async fn list_nodes(&self) -> Result<Vec<String>> { + pub async fn list_nodes(&self) -> Result<Vec<ConsulNode>> { debug!("list_nodes"); let url = format!("{}/v1/catalog/nodes", self.url); let http = self.client.get(&url).send().await?; let resp: Vec<ConsulNode> = http.json().await?; - Ok(resp.into_iter().map(|n| n.node).collect::<Vec<_>>()) + Ok(resp) } pub async fn watch_node( diff --git a/src/https.rs b/src/https.rs index a9f2cc1..34e3f85 100644 --- a/src/https.rs +++ b/src/https.rs @@ -137,6 +137,8 @@ async fn handle( .as_ref() .map(|x| x.len() as i32) .unwrap_or(0), + ent.same_node, + ent.same_site, -ent.calls.load(Ordering::SeqCst), ) }); diff --git a/src/proxy_config.rs b/src/proxy_config.rs index deb2ffe..a64e5aa 100644 --- a/src/proxy_config.rs +++ b/src/proxy_config.rs @@ -40,12 +40,27 @@ impl HostDescription { #[derive(Debug)] pub struct ProxyEntry { - pub target_addr: SocketAddr, - pub https_target: bool, - + /// Publicly exposed TLS hostnames for matching this rule pub host: HostDescription, + /// Path prefix for matching this rule pub path_prefix: Option<String>, + /// Priority with which this rule is considered (highest first) pub priority: u32, + + /// Node address (ip+port) to handle requests that match this entry + pub target_addr: SocketAddr, + /// Is the target serving HTTPS instead of HTTP? + pub https_target: bool, + + /// Is the target the same node as we are running on? + /// (if yes priorize it over other matching targets) + pub same_node: bool, + /// Is the target the same site as this node? + /// (if yes priorize it over other matching targets) + pub same_site: bool, + + /// Add the following headers to all responses returned + /// when matching this rule pub add_headers: Vec<(String, String)>, // Counts the number of times this proxy server has been called to @@ -70,6 +85,11 @@ impl std::fmt::Display for ProxyEntry { self.path_prefix.as_ref().unwrap_or(&String::new()), self.priority )?; + if self.same_node { + write!(f, " OURSELF")?; + } else if self.same_site { + write!(f, " SAME_SITE")?; + } if !self.add_headers.is_empty() { write!(f, " +Headers: {:?}", self.add_headers)?; } @@ -96,6 +116,8 @@ fn parse_tricot_tag( tag: &str, target_addr: SocketAddr, add_headers: &[(String, String)], + same_node: bool, + same_site: bool, ) -> Option<ProxyEntry> { let splits = tag.split(' ').collect::<Vec<_>>(); if (splits.len() != 2 && splits.len() != 3) @@ -129,6 +151,8 @@ fn parse_tricot_tag( target_addr, https_target: (splits[0] == "tricot-https"), host, + same_node, + same_site, path_prefix, priority, add_headers: add_headers.to_vec(), @@ -145,7 +169,11 @@ fn parse_tricot_add_header_tag(tag: &str) -> Option<(String, String)> { } } -fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> Vec<ProxyEntry> { +fn parse_consul_catalog( + catalog: &ConsulNodeCatalog, + same_node: bool, + same_site: bool, +) -> Vec<ProxyEntry> { trace!("Parsing node catalog: {:#?}", catalog); let mut entries = vec![]; @@ -174,7 +202,7 @@ fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> Vec<ProxyEntry> { } for tag in svc.tags.iter() { - if let Some(ent) = parse_tricot_tag(tag, addr, &add_headers[..]) { + if let Some(ent) = parse_tricot_tag(tag, addr, &add_headers[..], same_node, same_site) { entries.push(ent); } } @@ -206,12 +234,15 @@ pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfi let mut nodes = HashMap::new(); let mut watches = FuturesUnordered::<BoxFuture<'static, (String, Result<_>)>>::new(); + let mut node_site = HashMap::new(); + loop { match consul.list_nodes().await { Ok(consul_nodes) => { info!("Watched consul nodes: {:?}", consul_nodes); - for node in consul_nodes { - if !nodes.contains_key(&node) { + for consul_node in consul_nodes { + let node = &consul_node.node; + if !nodes.contains_key(node) { nodes.insert(node.clone(), NodeWatchState::default()); let node = node.to_string(); @@ -222,6 +253,9 @@ pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfi (node, res) })); } + if let Some(site) = consul_node.meta.get("site") { + node_site.insert(node.clone(), site.clone()); + } } } Err(e) => { @@ -277,9 +311,16 @@ pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfi } let mut entries = vec![]; - for (_, watch_state) in nodes.iter() { + for (node_name, watch_state) in nodes.iter() { if let Some(catalog) = &watch_state.last_catalog { - entries.extend(parse_consul_catalog(catalog)); + let same_node = *node_name == consul.local_node; + let same_site = + match (node_site.get(node_name), node_site.get(&consul.local_node)) { + (Some(s1), Some(s2)) => s1 == s2, + _ => false, + }; + + entries.extend(parse_consul_catalog(catalog, same_node, same_site)); } } let config = ProxyConfig { entries }; |