diff options
author | Alex Auvolat <alex@adnab.me> | 2021-12-07 17:56:15 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-12-07 17:56:15 +0100 |
commit | 0682c74e9d5083b43b3f83f8bb1ca747658d1455 (patch) | |
tree | 6532eba8bf707c9d32bdbc6535ccc082e2229096 /src/proxy_config.rs | |
parent | bb77e7c459a624bb2b4ea043145fd6ea75771105 (diff) | |
download | tricot-0682c74e9d5083b43b3f83f8bb1ca747658d1455.tar.gz tricot-0682c74e9d5083b43b3f83f8bb1ca747658d1455.zip |
Watch multiple consul nodes
Diffstat (limited to 'src/proxy_config.rs')
-rw-r--r-- | src/proxy_config.rs | 99 |
1 files changed, 85 insertions, 14 deletions
diff --git a/src/proxy_config.rs b/src/proxy_config.rs index ba58484..31a2659 100644 --- a/src/proxy_config.rs +++ b/src/proxy_config.rs @@ -1,7 +1,13 @@ use std::net::SocketAddr; use std::sync::{atomic, Arc}; +use std::collections::HashMap; use std::{cmp, time::Duration}; +use anyhow::Result; + +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::future::{BoxFuture}; + use log::*; use tokio::{sync::watch, time::sleep}; @@ -63,7 +69,7 @@ fn parse_tricot_tag(target_addr: SocketAddr, tag: &str) -> Option<ProxyEntry> { }) } -fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> ProxyConfig { +fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> Vec<ProxyEntry> { let mut entries = vec![]; for (_, svc) in catalog.services.iter() { @@ -79,37 +85,102 @@ fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> ProxyConfig { } } - ProxyConfig { entries } + entries } -pub fn spawn_proxy_config_task(mut consul: Consul) -> watch::Receiver<Arc<ProxyConfig>> { +#[derive(Default)] +struct NodeWatchState { + last_idx: Option<usize>, + last_catalog: Option<ConsulNodeCatalog>, + retries: u32, +} + +pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfig>> { let (tx, rx) = watch::channel(Arc::new(ProxyConfig { entries: Vec::new(), })); + + let consul = Arc::new(consul); tokio::spawn(async move { - let mut retries = 0; - let node = consul.local_node.clone(); + let mut nodes = HashMap::new(); + let mut watches = FuturesUnordered::<BoxFuture<'static, (String, Result<_>)>>::new(); loop { - let catalog = match consul.watch_node(&node).await { - Ok(c) => c, + match consul.list_nodes().await { + Ok(consul_nodes) => { + info!("Watched consul nodes: {:?}", consul_nodes); + for node in consul_nodes { + if !nodes.contains_key(&node) { + nodes.insert(node.clone(), NodeWatchState::default()); + + let node = node.to_string(); + let consul = consul.clone(); + + watches.push(Box::pin(async move { + let res = consul.watch_node(&node, None).await; + (node, res) + })); + } + } + } + Err(e) => { + warn!("Could not get Consul node list: {}", e); + } + } + + let (node, res): (String, Result<_>) = match watches.next().await { + Some(v) => v, + None => { + warn!("No nodes currently watched in proxy_config.rs"); + sleep(Duration::from_secs(10)).await; + continue; + } + }; + + match res { + Ok((catalog, new_idx)) => { + let mut watch_state = nodes.get_mut(&node).unwrap(); + watch_state.last_idx = Some(new_idx); + watch_state.last_catalog = Some(catalog); + watch_state.retries = 0; + + let idx = watch_state.last_idx; + let consul = consul.clone(); + watches.push(Box::pin(async move { + let res = consul.watch_node(&node, idx).await; + (node, res) + })); + } Err(e) => { - consul.watch_node_reset(); - retries = cmp::min(std::u32::MAX - 1, retries) + 1; - let will_retry_in = retry_to_time(retries, Duration::from_secs(600)); + let mut watch_state = nodes.get_mut(&node).unwrap(); + watch_state.retries += 1; + watch_state.last_idx = None; + + let will_retry_in = retry_to_time(watch_state.retries, Duration::from_secs(600)); error!( "Failed to query consul. Will retry in {}s. {}", will_retry_in.as_secs(), e ); - sleep(will_retry_in).await; + + let consul = consul.clone(); + watches.push(Box::pin(async move { + sleep(will_retry_in).await; + let res = consul.watch_node(&node, None).await; + (node, res) + })); continue; } - }; - retries = 0; + } - let config = parse_consul_catalog(&catalog); + let mut entries = vec![]; + for (_, watch_state) in nodes.iter() { + if let Some(catalog) = &watch_state.last_catalog { + entries.extend(parse_consul_catalog(catalog)); + } + } + let config = ProxyConfig { entries }; debug!("Extracted configuration: {:#?}", config); tx.send(Arc::new(config)).expect("Internal error"); |