aboutsummaryrefslogtreecommitdiff
path: root/src/consul_actor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/consul_actor.rs')
-rw-r--r--src/consul_actor.rs48
1 files changed, 40 insertions, 8 deletions
diff --git a/src/consul_actor.rs b/src/consul_actor.rs
index b26c2dd..163334d 100644
--- a/src/consul_actor.rs
+++ b/src/consul_actor.rs
@@ -1,16 +1,28 @@
-use crate::consul::Consul;
-use tokio::sync::watch;
-use tokio::time::delay_for;
-use crate::messages;
-use anyhow::Result;
use std::cmp;
use std::time::Duration;
use log::*;
+use tokio::sync::watch;
+use tokio::time::delay_for;
+use anyhow::Result;
+use serde::{Serialize, Deserialize};
+use serde_lexpr::{from_str,to_string,error};
+use crate::messages;
+use crate::consul;
+
+#[derive(Serialize, Deserialize, Debug)]
+pub enum DiplonatParameter {
+ port(Vec<u16>)
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub enum DiplonatConsul {
+ diplonat(Vec<DiplonatParameter>)
+}
pub struct ConsulActor {
pub rx_open_ports: watch::Receiver<messages::OpenPorts>,
- consul: Consul,
+ consul: consul::Consul,
node: String,
retries: u32,
tx_open_ports: watch::Sender<messages::OpenPorts>
@@ -22,11 +34,31 @@ fn retry_to_time(retries: u32, max_time: Duration) -> Duration {
return Duration::from_secs(cmp::max(max_time.as_secs(), 1.2f64.powf(retries as f64) as u64))
}
+fn from_catalog_to_open_ports(catalog: &consul::CatalogNode) -> messages::OpenPorts {
+ let mut op = messages::OpenPorts { ports: Vec::new() };
+ for (_, service_info) in &catalog.Services {
+ for tag in &service_info.Tags {
+ let diplo_conf: error::Result<DiplonatConsul> = from_str(tag);
+ match diplo_conf {
+ Ok(conf) => {
+ let DiplonatConsul::diplonat(c) = conf;
+ for parameter in &c {
+ let DiplonatParameter::port(p) = parameter;
+ op.ports.extend(p);
+ }
+ }
+ Err(e) => debug!("Failed to parse entry {}. {}", tag, e),
+ };
+ }
+ }
+ return op;
+}
+
impl ConsulActor {
pub fn new(url: &str, node: &str) -> Self {
let (tx, rx) = watch::channel(messages::OpenPorts{ports: Vec::new() });
return Self {
- consul: Consul::new(url),
+ consul: consul::Consul::new(url),
rx_open_ports: rx,
tx_open_ports: tx,
node: node.to_string(),
@@ -47,7 +79,7 @@ impl ConsulActor {
}
};
- info!("{:#?}", catalog);
+ info!("{:#?}", from_catalog_to_open_ports(&catalog));
}
}
}