aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/consul_actor.rs7
-rw-r--r--src/diplonat.rs46
-rw-r--r--src/environment.rs56
-rw-r--r--src/environment_adapter.rs50
-rw-r--r--src/igd_actor.rs (renamed from src/igd_adapter.rs)0
-rw-r--r--src/main.rs14
-rw-r--r--src/node_state.rs25
7 files changed, 82 insertions, 116 deletions
diff --git a/src/consul_actor.rs b/src/consul_actor.rs
index de5e216..877ea10 100644
--- a/src/consul_actor.rs
+++ b/src/consul_actor.rs
@@ -5,7 +5,7 @@ use tokio::sync::watch;
use tokio::time::delay_for;
use anyhow::Result;
use serde::{Serialize, Deserialize};
-use serde_lexpr::{from_str,to_string,error};
+use serde_lexpr::{from_str,error};
use crate::messages;
use crate::consul;
@@ -100,7 +100,10 @@ impl ConsulActor {
}
};
self.retries = 0;
- info!("{:#?}", to_open_ports(&to_parameters(&catalog)));
+ let msg = to_open_ports(&to_parameters(&catalog));
+ debug!("Extracted configuration: {:#?}", msg);
+
+ self.tx_open_ports.broadcast(msg)?;
}
}
}
diff --git a/src/diplonat.rs b/src/diplonat.rs
index 01bd994..a4cb787 100644
--- a/src/diplonat.rs
+++ b/src/diplonat.rs
@@ -1,45 +1,29 @@
-use anyhow::{Result, Context};
-use tokio::sync::broadcast;
+use anyhow::Result;
use futures::future::try_join_all;
use log::*;
-use std::cell::Cell;
+use crate::consul_actor::ConsulActor;
+use crate::environment::Environment;
-use crate::environment_adapter::*;
-use crate::igd_adapter::*;
-use crate::node_state::*;
-
-pub struct Diplonat<'a> {
- pub notif: broadcast::Sender<()>,
- pub state: Cell<NodeState>,
-
- env: EnvironmentAdapter,
- igd: IgdAdapter<'a>,
+pub struct Diplonat {
+ consul: ConsulActor
}
-impl<'a> Diplonat<'a> {
- pub async fn new() -> Result<Diplonat<'a>> {
- let (tx, _) = broadcast::channel(1);
- let ns = Cell::new(NodeState::new());
-
- // we deliberately choose to init one after another
- let ctx = Diplonat {
- notif: tx,
- state: ns,
+impl Diplonat {
+ pub async fn new() -> Result<Self> {
+ let env = Environment::new()?;
- env: EnvironmentAdapter::new(&ns, &tx).await?,
- igd: IgdAdapter::new(&ns, &tx).await?
+ let ctx = Self {
+ consul: ConsulActor::new(&env.consul_url, &env.consul_node_name)
};
- info!("Consul URL: {:#?}", ns.consul_url);
- info!("Consul node name: {:#?}", ns.consul_node_name);
- info!("Private IP address: {:#?}", ns.private_ip);
- info!("Refresh time: {:#?} seconds", ns.refresh_time);
- info!("Expiration time: {:#?} seconds", ns.expiration_time);
-
return Ok(ctx);
}
- pub async fn listen(&self) -> Result<()> {
+ pub async fn listen(&mut self) -> Result<()> {
+ try_join_all(vec![
+ self.consul.listen()
+ ]).await?;
+
return Ok(());
}
}
diff --git a/src/environment.rs b/src/environment.rs
new file mode 100644
index 0000000..335fa37
--- /dev/null
+++ b/src/environment.rs
@@ -0,0 +1,56 @@
+use std::env;
+use anyhow::{Result, Context, anyhow};
+use std::time::Duration;
+use log::*;
+
+const epi: &'static str = "DIPLONAT_PRIVATE_IP";
+const ert: &'static str = "DIPLONAT_REFRESH_TIME";
+const eet: &'static str = "DIPLONAT_EXPIRATION_TIME";
+const ecnd: &'static str = "DIPLONAT_CONSUL_NODE_NAME";
+const ecu: &'static str = "DIPLONAT_CONSUL_URL";
+
+pub struct Environment {
+ pub consul_node_name: String,
+ pub consul_url: String,
+
+ pub refresh_time: Duration,
+ pub expiration_time: Duration,
+
+ pub private_ip: String,
+}
+
+/* @FIXME: Rewrite with Serde Envi */
+impl Environment {
+ pub fn new() -> Result<Self> {
+ let ctx = Self {
+ consul_url: match env::var(ecu) { Ok(e) => e, Err(_) => "http://127.0.0.1:8500".to_string() },
+ consul_node_name: env::var(ecnd).with_context(|| format!("{} env var must be defined", ecnd))?,
+ private_ip: env::var(epi).with_context(|| format!("{} env var must be defined, eg: 192.168.0.18", epi))?,
+ refresh_time: Duration::from_secs(env::var(ert)
+ .with_context(|| format!("{} env var must be defined, eg: 60", ert))?
+ .parse()
+ .with_context(|| format!("{} env var must be an integer, eg: 60", ert))?),
+ expiration_time: Duration::from_secs(env::var(eet)
+ .with_context(|| format!("{} env var must be defined, eg: 300", eet))?
+ .parse()
+ .with_context(|| format!("{} env var must be an integer, eg: 300", eet))?),
+ };
+
+ if ctx.refresh_time.as_secs() * 2 > ctx.expiration_time.as_secs() {
+ return Err(anyhow!(
+ "Expiration time (currently: {}s) must be twice bigger than refresh time (currently: {}s)",
+ ctx.refresh_time.as_secs(),
+ ctx.expiration_time.as_secs()));
+ }
+
+ info!("Consul URL: {:#?}", ctx.consul_url);
+ info!("Consul node name: {:#?}", ctx.consul_node_name);
+ info!("Private IP address: {:#?}", ctx.private_ip);
+ info!("Refresh time: {:#?} seconds", ctx.refresh_time.as_secs());
+ info!("Expiration time: {:#?} seconds", ctx.expiration_time.as_secs());
+
+ return Ok(ctx);
+ }
+}
+
+
diff --git a/src/environment_adapter.rs b/src/environment_adapter.rs
deleted file mode 100644
index e4fad70..0000000
--- a/src/environment_adapter.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-use std::env;
-use tokio::sync::broadcast;
-use anyhow::{Result, Context, anyhow};
-use log::*;
-use crate::diplonat::*;
-use crate::node_state::*;
-use std::cell::Cell;
-
-const epi: &'static str = "DIPLONAT_PRIVATE_IP";
-const ert: &'static str = "DIPLONAT_REFRESH_TIME";
-const eet: &'static str = "DIPLONAT_EXPIRATION_TIME";
-const ecnd: &'static str = "DIPLONAT_CONSUL_NODE_NAME";
-const ecu: &'static str = "DIPLONAT_CONSUL_URL";
-
-pub struct EnvironmentAdapter {}
-
-impl EnvironmentAdapter {
- pub async fn new(ns: &Cell<NodeState>, _: &broadcast::Sender<()>) -> Result<Self> {
- ns.consul_node_name = Some(match env::var(ecu) {
- Ok(e) => e,
- Err(_) => "http://127.0.0.1:8500".to_string()
- });
-
- ns.private_ip = Some(env::var(epi)
- .with_context(|| format!("{} env var must be defined, eg: 192.168.0.18", epi))?);
-
- ns.refresh_time = Some(env::var(ert)
- .with_context(|| format!("{} env var must be defined, eg: 60", ert))?
- .parse()
- .with_context(|| format!("{} env var must be an integer, eg: 60", ert))?);
-
- ns.expiration_time = Some(env::var(eet)
- .with_context(|| format!("{} env var must be defined, eg: 300", eet))?
- .parse()
- .with_context(|| format!("{} env var must be an integer, eg: 300", eet))?);
-
- ns.consul_node_name = Some(env::var(ecnd)
- .with_context(|| format!("{} env var must be defined", ecnd))?);
-
- match (ns.refresh_time, ns.expiration_time) {
- (Some(rt), Some(et)) if rt * 2 <= et => debug!("Checked refresh time is lower than expiration time"),
- (Some(rt), Some(et)) => return Err(anyhow!("Expiration time (currently: {}s) must be twice bigger than refresh time (currently: {}s)", rt, et)),
- _ => return Err(anyhow!("Please define refresh time and expiration time"))
- }
-
- return Ok(Self{});
- }
-}
-
-
diff --git a/src/igd_adapter.rs b/src/igd_actor.rs
index 6624ab3..6624ab3 100644
--- a/src/igd_adapter.rs
+++ b/src/igd_actor.rs
diff --git a/src/main.rs b/src/main.rs
index fe4c767..7f985b1 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,24 +1,22 @@
mod messages;
+mod environment;
mod consul;
mod consul_actor;
+mod diplonat;
//use std::net::SocketAddrV4;
//use std::collections::HashMap;
//use igd::PortMappingProtocol;
use log::*;
-use consul_actor::*;
+use diplonat::Diplonat;
#[tokio::main]
async fn main() {
pretty_env_logger::init();
info!("Starting Diplonat");
-/*
- let diplo = Diplonat::new().await.expect("Setup failed");
- diplo.listen().await.expect("A runtime error occured");
-*/
-
- let mut ca = ConsulActor::new("http://127.0.0.1:8500", "lheureduthe");
- ca.listen().await.expect("Oooooops");
+
+ let mut diplo = Diplonat::new().await.expect("Setup failed");
+ diplo.listen().await.expect("A runtime error occured");
/*
let gateway = match search_gateway(Default::default()).await {
diff --git a/src/node_state.rs b/src/node_state.rs
deleted file mode 100644
index ecf7484..0000000
--- a/src/node_state.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-pub struct NodeState {
- pub consul_node_name: Option<String>,
- pub consul_url: Option<String>,
-
- pub refresh_time: Option<u32>,
- pub expiration_time: Option<u32>,
-
- pub public_ip: Option<String>,
- pub private_ip: Option<String>,
- pub public_ports: Vec<u16>,
-}
-
-impl NodeState {
- pub fn new() -> Self {
- return Self {
- consul_node_name: None,
- consul_url: None,
- refresh_time: None,
- expiration_time: None,
- public_ip: None,
- private_ip: None,
- public_ports: Vec::new()
- };
- }
-}