aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock34
-rw-r--r--Cargo.toml1
-rw-r--r--README.md2
-rw-r--r--src/consul.rs4
-rw-r--r--src/consul_actor.rs48
5 files changed, 78 insertions, 11 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 11ae2f9..b51fc45 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -131,6 +131,7 @@ dependencies = [
"pretty_env_logger",
"reqwest",
"serde",
+ "serde-lexpr",
"serde_json",
"tokio",
]
@@ -523,6 +524,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
+name = "lexpr"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2ab101919962a12ffdaf7170f0943715a9d47f35a9753986028c305183bbf1a6"
+dependencies = [
+ "itoa",
+ "lexpr-macros",
+ "proc-macro-hack",
+ "ryu",
+]
+
+[[package]]
+name = "lexpr-macros"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd627fb38e19c00d8d068618259205f7a91c91aeade5c15bc35dbca037bb1c35"
+dependencies = [
+ "proc-macro-hack",
+ "proc-macro2",
+ "quote",
+]
+
+[[package]]
name = "libc"
version = "0.2.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -975,6 +999,16 @@ dependencies = [
]
[[package]]
+name = "serde-lexpr"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "143e5e658ac3a7374bdf285b9355cab74dd144293b86c9be27eab39452239d41"
+dependencies = [
+ "lexpr",
+ "serde",
+]
+
+[[package]]
name = "serde_derive"
version = "1.0.107"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 14e6488..ce0fbc3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,4 +15,5 @@ tokio = "0.2.11"
futures = "0.3.5"
serde = { version = "1.0.107", features = ["derive"] }
serde_json = "1.0.53"
+serde-lexpr = "0.1.1"
anyhow = "1.0.28"
diff --git a/README.md b/README.md
index 1911569..adddc2f 100644
--- a/README.md
+++ b/README.md
@@ -59,6 +59,6 @@ To test the Consul Catalog part, you can do:
```bash
consul agent -dev #in a separate terminal, if not already running
-consul services register -name=example -port=1337 -tag="diplonat_pub_port=1337"
+consul services register -name=example -port=1337 -tag="(diplonat ((port 1337) (port 1338)))"
consul services -id=example
```
diff --git a/src/consul.rs b/src/consul.rs
index b06f62d..01dff46 100644
--- a/src/consul.rs
+++ b/src/consul.rs
@@ -4,12 +4,12 @@ use anyhow::{Result, anyhow};
#[derive(Serialize, Deserialize, Debug)]
pub struct ServiceEntry {
- Tags: Vec<String>
+ pub Tags: Vec<String>
}
#[derive(Serialize, Deserialize, Debug)]
pub struct CatalogNode {
- Services: HashMap<String, ServiceEntry>
+ pub Services: HashMap<String, ServiceEntry>
}
pub struct Consul {
diff --git a/src/consul_actor.rs b/src/consul_actor.rs
index b26c2dd..163334d 100644
--- a/src/consul_actor.rs
+++ b/src/consul_actor.rs
@@ -1,16 +1,28 @@
-use crate::consul::Consul;
-use tokio::sync::watch;
-use tokio::time::delay_for;
-use crate::messages;
-use anyhow::Result;
use std::cmp;
use std::time::Duration;
use log::*;
+use tokio::sync::watch;
+use tokio::time::delay_for;
+use anyhow::Result;
+use serde::{Serialize, Deserialize};
+use serde_lexpr::{from_str,to_string,error};
+use crate::messages;
+use crate::consul;
+
+#[derive(Serialize, Deserialize, Debug)]
+pub enum DiplonatParameter {
+ port(Vec<u16>)
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub enum DiplonatConsul {
+ diplonat(Vec<DiplonatParameter>)
+}
pub struct ConsulActor {
pub rx_open_ports: watch::Receiver<messages::OpenPorts>,
- consul: Consul,
+ consul: consul::Consul,
node: String,
retries: u32,
tx_open_ports: watch::Sender<messages::OpenPorts>
@@ -22,11 +34,31 @@ fn retry_to_time(retries: u32, max_time: Duration) -> Duration {
return Duration::from_secs(cmp::max(max_time.as_secs(), 1.2f64.powf(retries as f64) as u64))
}
+fn from_catalog_to_open_ports(catalog: &consul::CatalogNode) -> messages::OpenPorts {
+ let mut op = messages::OpenPorts { ports: Vec::new() };
+ for (_, service_info) in &catalog.Services {
+ for tag in &service_info.Tags {
+ let diplo_conf: error::Result<DiplonatConsul> = from_str(tag);
+ match diplo_conf {
+ Ok(conf) => {
+ let DiplonatConsul::diplonat(c) = conf;
+ for parameter in &c {
+ let DiplonatParameter::port(p) = parameter;
+ op.ports.extend(p);
+ }
+ }
+ Err(e) => debug!("Failed to parse entry {}. {}", tag, e),
+ };
+ }
+ }
+ return op;
+}
+
impl ConsulActor {
pub fn new(url: &str, node: &str) -> Self {
let (tx, rx) = watch::channel(messages::OpenPorts{ports: Vec::new() });
return Self {
- consul: Consul::new(url),
+ consul: consul::Consul::new(url),
rx_open_ports: rx,
tx_open_ports: tx,
node: node.to_string(),
@@ -47,7 +79,7 @@ impl ConsulActor {
}
};
- info!("{:#?}", catalog);
+ info!("{:#?}", from_catalog_to_open_ports(&catalog));
}
}
}