From 3999723308da10e564c4634997c6ecf63f2839d4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Jun 2023 11:40:13 +0200 Subject: update df-consul to fix some issues --- src/cert_store.rs | 4 ++-- src/main.rs | 2 +- src/proxy_config.rs | 25 ++++++++++++++----------- 3 files changed, 17 insertions(+), 14 deletions(-) (limited to 'src') diff --git a/src/cert_store.rs b/src/cert_store.rs index 2d3fb90..abbc83d 100644 --- a/src/cert_store.rs +++ b/src/cert_store.rs @@ -15,7 +15,7 @@ use acme_micro::{Directory, DirectoryUrl}; use rustls::sign::CertifiedKey; use crate::cert::{Cert, CertSer}; -use crate::consul::*; +use crate::consul::{self, Consul}; use crate::proxy_config::*; pub struct CertStore { @@ -196,7 +196,7 @@ impl CertStore { let lock_name = format!("tricot/renew:{}@{}", domain, self.node_name); let session = self .consul - .create_session(&ConsulSessionRequest { + .create_session(&consul::locking::SessionRequest { name: lock_name.clone(), node: None, lock_delay: Some("30m".into()), diff --git a/src/main.rs b/src/main.rs index c97bddf..c148a91 100644 --- a/src/main.rs +++ b/src/main.rs @@ -131,7 +131,7 @@ async fn main() { let metrics_server = metrics::MetricsServer::init(opt.metrics_bind_addr); - let consul_config = consul::ConsulConfig { + let consul_config = consul::Config { addr: opt.consul_addr.clone(), ca_cert: opt.consul_ca_cert.clone(), tls_skip_verify: opt.consul_tls_skip_verify, diff --git a/src/proxy_config.rs b/src/proxy_config.rs index 3bf6903..b4fab90 100644 --- a/src/proxy_config.rs +++ b/src/proxy_config.rs @@ -12,7 +12,7 @@ use futures::stream::{FuturesUnordered, StreamExt}; use log::*; use tokio::{select, sync::watch, time::sleep}; -use crate::consul::*; +use crate::consul; // ---- Extract proxy config from Consul catalog ---- @@ -196,7 +196,7 @@ fn parse_tricot_add_header_tag(tag: &str) -> Option<(String, String)> { } fn parse_consul_catalog( - catalog: &ConsulNodeCatalog, + catalog: &consul::catalog::CatalogNode, same_node: bool, same_site: bool, ) -> Vec { @@ -262,12 +262,12 @@ fn parse_consul_catalog( #[derive(Default)] struct NodeWatchState { last_idx: Option, - last_catalog: Option, + last_catalog: Option, retries: u32, } pub fn spawn_proxy_config_task( - consul: Consul, + consul: consul::Consul, local_node: String, mut must_exit: watch::Receiver, ) -> watch::Receiver> { @@ -286,14 +286,14 @@ pub fn spawn_proxy_config_task( while !*must_exit.borrow() { let list_nodes = select! { - ln = consul.list_nodes() => ln, + ln = consul.catalog_node_list(None) => ln, _ = must_exit.changed() => continue, }; match list_nodes { Ok(consul_nodes) => { info!("Watched consul nodes: {:?}", consul_nodes); - for consul_node in consul_nodes { + for consul_node in consul_nodes.into_inner() { let node = &consul_node.node; if !nodes.contains_key(node) { nodes.insert(node.clone(), NodeWatchState::default()); @@ -302,7 +302,7 @@ pub fn spawn_proxy_config_task( let consul = consul.clone(); watches.push(Box::pin(async move { - let res = consul.watch_node(&node, None).await; + let res = consul.catalog_node(&node, None).await; (node, res) })); } @@ -331,16 +331,19 @@ pub fn spawn_proxy_config_task( }; match res { - Ok((catalog, new_idx)) => { + Ok(res) => { + let new_idx = res.index(); + let catalog = res.into_inner(); + let mut watch_state = nodes.get_mut(&node).unwrap(); watch_state.last_idx = Some(new_idx); - watch_state.last_catalog = Some(catalog); + watch_state.last_catalog = catalog; watch_state.retries = 0; let idx = watch_state.last_idx; let consul = consul.clone(); watches.push(Box::pin(async move { - let res = consul.watch_node(&node, idx).await; + let res = consul.catalog_node(&node, idx).await; (node, res) })); } @@ -361,7 +364,7 @@ pub fn spawn_proxy_config_task( let consul = consul.clone(); watches.push(Box::pin(async move { sleep(will_retry_in).await; - let res = consul.watch_node(&node, None).await; + let res = consul.catalog_node(&node, None).await; (node, res) })); continue; -- cgit v1.2.3