From 3999723308da10e564c4634997c6ecf63f2839d4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Jun 2023 11:40:13 +0200 Subject: update df-consul to fix some issues --- src/proxy_config.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) (limited to 'src/proxy_config.rs') diff --git a/src/proxy_config.rs b/src/proxy_config.rs index 3bf6903..b4fab90 100644 --- a/src/proxy_config.rs +++ b/src/proxy_config.rs @@ -12,7 +12,7 @@ use futures::stream::{FuturesUnordered, StreamExt}; use log::*; use tokio::{select, sync::watch, time::sleep}; -use crate::consul::*; +use crate::consul; // ---- Extract proxy config from Consul catalog ---- @@ -196,7 +196,7 @@ fn parse_tricot_add_header_tag(tag: &str) -> Option<(String, String)> { } fn parse_consul_catalog( - catalog: &ConsulNodeCatalog, + catalog: &consul::catalog::CatalogNode, same_node: bool, same_site: bool, ) -> Vec { @@ -262,12 +262,12 @@ fn parse_consul_catalog( #[derive(Default)] struct NodeWatchState { last_idx: Option, - last_catalog: Option, + last_catalog: Option, retries: u32, } pub fn spawn_proxy_config_task( - consul: Consul, + consul: consul::Consul, local_node: String, mut must_exit: watch::Receiver, ) -> watch::Receiver> { @@ -286,14 +286,14 @@ pub fn spawn_proxy_config_task( while !*must_exit.borrow() { let list_nodes = select! { - ln = consul.list_nodes() => ln, + ln = consul.catalog_node_list(None) => ln, _ = must_exit.changed() => continue, }; match list_nodes { Ok(consul_nodes) => { info!("Watched consul nodes: {:?}", consul_nodes); - for consul_node in consul_nodes { + for consul_node in consul_nodes.into_inner() { let node = &consul_node.node; if !nodes.contains_key(node) { nodes.insert(node.clone(), NodeWatchState::default()); @@ -302,7 +302,7 @@ pub fn spawn_proxy_config_task( let consul = consul.clone(); watches.push(Box::pin(async move { - let res = consul.watch_node(&node, None).await; + let res = consul.catalog_node(&node, None).await; (node, res) })); } @@ -331,16 +331,19 @@ pub fn spawn_proxy_config_task( }; match res { - Ok((catalog, new_idx)) => { + Ok(res) => { + let new_idx = res.index(); + let catalog = res.into_inner(); + let mut watch_state = nodes.get_mut(&node).unwrap(); watch_state.last_idx = Some(new_idx); - watch_state.last_catalog = Some(catalog); + watch_state.last_catalog = catalog; watch_state.retries = 0; let idx = watch_state.last_idx; let consul = consul.clone(); watches.push(Box::pin(async move { - let res = consul.watch_node(&node, idx).await; + let res = consul.catalog_node(&node, idx).await; (node, res) })); } @@ -361,7 +364,7 @@ pub fn spawn_proxy_config_task( let consul = consul.clone(); watches.push(Box::pin(async move { sleep(will_retry_in).await; - let res = consul.watch_node(&node, None).await; + let res = consul.catalog_node(&node, None).await; (node, res) })); continue; -- cgit v1.2.3