From 453b1c684e2c92fad41ad50fa7a2c9a184f960c5 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 22 May 2020 14:17:48 +0200 Subject: Working parsing --- Cargo.lock | 34 ++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + README.md | 2 +- src/consul.rs | 4 ++-- src/consul_actor.rs | 48 ++++++++++++++++++++++++++++++++++++++++-------- 5 files changed, 78 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 11ae2f9..b51fc45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -131,6 +131,7 @@ dependencies = [ "pretty_env_logger", "reqwest", "serde", + "serde-lexpr", "serde_json", "tokio", ] @@ -522,6 +523,29 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lexpr" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ab101919962a12ffdaf7170f0943715a9d47f35a9753986028c305183bbf1a6" +dependencies = [ + "itoa", + "lexpr-macros", + "proc-macro-hack", + "ryu", +] + +[[package]] +name = "lexpr-macros" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd627fb38e19c00d8d068618259205f7a91c91aeade5c15bc35dbca037bb1c35" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", +] + [[package]] name = "libc" version = "0.2.66" @@ -974,6 +998,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-lexpr" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "143e5e658ac3a7374bdf285b9355cab74dd144293b86c9be27eab39452239d41" +dependencies = [ + "lexpr", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.107" diff --git a/Cargo.toml b/Cargo.toml index 14e6488..ce0fbc3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,4 +15,5 @@ tokio = "0.2.11" futures = "0.3.5" serde = { version = "1.0.107", features = ["derive"] } serde_json = "1.0.53" +serde-lexpr = "0.1.1" anyhow = "1.0.28" diff --git a/README.md b/README.md index 1911569..adddc2f 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,6 @@ To test the Consul Catalog part, you can do: ```bash consul agent -dev #in a separate terminal, if not already running -consul services register -name=example -port=1337 -tag="diplonat_pub_port=1337" +consul services register -name=example -port=1337 -tag="(diplonat ((port 1337) (port 1338)))" consul services -id=example ``` diff --git a/src/consul.rs b/src/consul.rs index b06f62d..01dff46 100644 --- a/src/consul.rs +++ b/src/consul.rs @@ -4,12 +4,12 @@ use anyhow::{Result, anyhow}; #[derive(Serialize, Deserialize, Debug)] pub struct ServiceEntry { - Tags: Vec + pub Tags: Vec } #[derive(Serialize, Deserialize, Debug)] pub struct CatalogNode { - Services: HashMap + pub Services: HashMap } pub struct Consul { diff --git a/src/consul_actor.rs b/src/consul_actor.rs index b26c2dd..163334d 100644 --- a/src/consul_actor.rs +++ b/src/consul_actor.rs @@ -1,16 +1,28 @@ -use crate::consul::Consul; -use tokio::sync::watch; -use tokio::time::delay_for; -use crate::messages; -use anyhow::Result; use std::cmp; use std::time::Duration; use log::*; +use tokio::sync::watch; +use tokio::time::delay_for; +use anyhow::Result; +use serde::{Serialize, Deserialize}; +use serde_lexpr::{from_str,to_string,error}; +use crate::messages; +use crate::consul; + +#[derive(Serialize, Deserialize, Debug)] +pub enum DiplonatParameter { + port(Vec) +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum DiplonatConsul { + diplonat(Vec) +} pub struct ConsulActor { pub rx_open_ports: watch::Receiver, - consul: Consul, + consul: consul::Consul, node: String, retries: u32, tx_open_ports: watch::Sender @@ -22,11 +34,31 @@ fn retry_to_time(retries: u32, max_time: Duration) -> Duration { return Duration::from_secs(cmp::max(max_time.as_secs(), 1.2f64.powf(retries as f64) as u64)) } +fn from_catalog_to_open_ports(catalog: &consul::CatalogNode) -> messages::OpenPorts { + let mut op = messages::OpenPorts { ports: Vec::new() }; + for (_, service_info) in &catalog.Services { + for tag in &service_info.Tags { + let diplo_conf: error::Result = from_str(tag); + match diplo_conf { + Ok(conf) => { + let DiplonatConsul::diplonat(c) = conf; + for parameter in &c { + let DiplonatParameter::port(p) = parameter; + op.ports.extend(p); + } + } + Err(e) => debug!("Failed to parse entry {}. {}", tag, e), + }; + } + } + return op; +} + impl ConsulActor { pub fn new(url: &str, node: &str) -> Self { let (tx, rx) = watch::channel(messages::OpenPorts{ports: Vec::new() }); return Self { - consul: Consul::new(url), + consul: consul::Consul::new(url), rx_open_ports: rx, tx_open_ports: tx, node: node.to_string(), @@ -47,7 +79,7 @@ impl ConsulActor { } }; - info!("{:#?}", catalog); + info!("{:#?}", from_catalog_to_open_ports(&catalog)); } } } -- cgit v1.2.3