diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/consul.rs | 4 | ||||
-rw-r--r-- | src/consul_actor.rs | 48 |
2 files changed, 42 insertions, 10 deletions
diff --git a/src/consul.rs b/src/consul.rs index b06f62d..01dff46 100644 --- a/src/consul.rs +++ b/src/consul.rs @@ -4,12 +4,12 @@ use anyhow::{Result, anyhow}; #[derive(Serialize, Deserialize, Debug)] pub struct ServiceEntry { - Tags: Vec<String> + pub Tags: Vec<String> } #[derive(Serialize, Deserialize, Debug)] pub struct CatalogNode { - Services: HashMap<String, ServiceEntry> + pub Services: HashMap<String, ServiceEntry> } pub struct Consul { diff --git a/src/consul_actor.rs b/src/consul_actor.rs index b26c2dd..163334d 100644 --- a/src/consul_actor.rs +++ b/src/consul_actor.rs @@ -1,16 +1,28 @@ -use crate::consul::Consul; -use tokio::sync::watch; -use tokio::time::delay_for; -use crate::messages; -use anyhow::Result; use std::cmp; use std::time::Duration; use log::*; +use tokio::sync::watch; +use tokio::time::delay_for; +use anyhow::Result; +use serde::{Serialize, Deserialize}; +use serde_lexpr::{from_str,to_string,error}; +use crate::messages; +use crate::consul; + +#[derive(Serialize, Deserialize, Debug)] +pub enum DiplonatParameter { + port(Vec<u16>) +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum DiplonatConsul { + diplonat(Vec<DiplonatParameter>) +} pub struct ConsulActor { pub rx_open_ports: watch::Receiver<messages::OpenPorts>, - consul: Consul, + consul: consul::Consul, node: String, retries: u32, tx_open_ports: watch::Sender<messages::OpenPorts> @@ -22,11 +34,31 @@ fn retry_to_time(retries: u32, max_time: Duration) -> Duration { return Duration::from_secs(cmp::max(max_time.as_secs(), 1.2f64.powf(retries as f64) as u64)) } +fn from_catalog_to_open_ports(catalog: &consul::CatalogNode) -> messages::OpenPorts { + let mut op = messages::OpenPorts { ports: Vec::new() }; + for (_, service_info) in &catalog.Services { + for tag in &service_info.Tags { + let diplo_conf: error::Result<DiplonatConsul> = from_str(tag); + match diplo_conf { + Ok(conf) => { + let DiplonatConsul::diplonat(c) = conf; + for parameter in &c { + let DiplonatParameter::port(p) = parameter; + op.ports.extend(p); + } + } + Err(e) => debug!("Failed to parse entry {}. {}", tag, e), + }; + } + } + return op; +} + impl ConsulActor { pub fn new(url: &str, node: &str) -> Self { let (tx, rx) = watch::channel(messages::OpenPorts{ports: Vec::new() }); return Self { - consul: Consul::new(url), + consul: consul::Consul::new(url), rx_open_ports: rx, tx_open_ports: tx, node: node.to_string(), @@ -47,7 +79,7 @@ impl ConsulActor { } }; - info!("{:#?}", catalog); + info!("{:#?}", from_catalog_to_open_ports(&catalog)); } } } |