aboutsummaryrefslogtreecommitdiff
path: root/src/proxy_config.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/proxy_config.rs')
-rw-r--r--src/proxy_config.rs25
1 files changed, 14 insertions, 11 deletions
diff --git a/src/proxy_config.rs b/src/proxy_config.rs
index 3bf6903..b4fab90 100644
--- a/src/proxy_config.rs
+++ b/src/proxy_config.rs
@@ -12,7 +12,7 @@ use futures::stream::{FuturesUnordered, StreamExt};
use log::*;
use tokio::{select, sync::watch, time::sleep};
-use crate::consul::*;
+use crate::consul;
// ---- Extract proxy config from Consul catalog ----
@@ -196,7 +196,7 @@ fn parse_tricot_add_header_tag(tag: &str) -> Option<(String, String)> {
}
fn parse_consul_catalog(
- catalog: &ConsulNodeCatalog,
+ catalog: &consul::catalog::CatalogNode,
same_node: bool,
same_site: bool,
) -> Vec<ProxyEntry> {
@@ -262,12 +262,12 @@ fn parse_consul_catalog(
#[derive(Default)]
struct NodeWatchState {
last_idx: Option<usize>,
- last_catalog: Option<ConsulNodeCatalog>,
+ last_catalog: Option<consul::catalog::CatalogNode>,
retries: u32,
}
pub fn spawn_proxy_config_task(
- consul: Consul,
+ consul: consul::Consul,
local_node: String,
mut must_exit: watch::Receiver<bool>,
) -> watch::Receiver<Arc<ProxyConfig>> {
@@ -286,14 +286,14 @@ pub fn spawn_proxy_config_task(
while !*must_exit.borrow() {
let list_nodes = select! {
- ln = consul.list_nodes() => ln,
+ ln = consul.catalog_node_list(None) => ln,
_ = must_exit.changed() => continue,
};
match list_nodes {
Ok(consul_nodes) => {
info!("Watched consul nodes: {:?}", consul_nodes);
- for consul_node in consul_nodes {
+ for consul_node in consul_nodes.into_inner() {
let node = &consul_node.node;
if !nodes.contains_key(node) {
nodes.insert(node.clone(), NodeWatchState::default());
@@ -302,7 +302,7 @@ pub fn spawn_proxy_config_task(
let consul = consul.clone();
watches.push(Box::pin(async move {
- let res = consul.watch_node(&node, None).await;
+ let res = consul.catalog_node(&node, None).await;
(node, res)
}));
}
@@ -331,16 +331,19 @@ pub fn spawn_proxy_config_task(
};
match res {
- Ok((catalog, new_idx)) => {
+ Ok(res) => {
+ let new_idx = res.index();
+ let catalog = res.into_inner();
+
let mut watch_state = nodes.get_mut(&node).unwrap();
watch_state.last_idx = Some(new_idx);
- watch_state.last_catalog = Some(catalog);
+ watch_state.last_catalog = catalog;
watch_state.retries = 0;
let idx = watch_state.last_idx;
let consul = consul.clone();
watches.push(Box::pin(async move {
- let res = consul.watch_node(&node, idx).await;
+ let res = consul.catalog_node(&node, idx).await;
(node, res)
}));
}
@@ -361,7 +364,7 @@ pub fn spawn_proxy_config_task(
let consul = consul.clone();
watches.push(Box::pin(async move {
sleep(will_retry_in).await;
- let res = consul.watch_node(&node, None).await;
+ let res = consul.catalog_node(&node, None).await;
(node, res)
}));
continue;