aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/config/options.rs4
-rw-r--r--src/config/runtime.rs10
-rw-r--r--src/consul.rs8
-rw-r--r--src/consul_actor.rs16
-rw-r--r--src/diplonat.rs6
-rw-r--r--src/fw_actor.rs6
-rw-r--r--src/igd_actor.rs38
-rw-r--r--src/messages.rs2
8 files changed, 60 insertions, 30 deletions
diff --git a/src/config/options.rs b/src/config/options.rs
index b8dccf7..f62d14c 100644
--- a/src/config/options.rs
+++ b/src/config/options.rs
@@ -62,7 +62,9 @@ impl ConfigOpts {
// Currently only used in tests
#[allow(dead_code)]
pub fn from_iter<Iter: Clone>(iter: Iter) -> Result<RuntimeConfig>
- where Iter: IntoIterator<Item = (String, String)> {
+ where
+ Iter: IntoIterator<Item = (String, String)>,
+ {
let base: ConfigOptsBase = envy::prefixed("DIPLONAT_").from_iter(iter.clone())?;
let consul: ConfigOptsConsul = envy::prefixed("DIPLONAT_CONSUL_").from_iter(iter.clone())?;
let acme: ConfigOptsAcme = envy::prefixed("DIPLONAT_ACME_").from_iter(iter.clone())?;
diff --git a/src/config/runtime.rs b/src/config/runtime.rs
index eeb34f6..a1582e4 100644
--- a/src/config/runtime.rs
+++ b/src/config/runtime.rs
@@ -27,7 +27,7 @@ pub struct RuntimeConfigFirewall {
#[derive(Debug)]
pub struct RuntimeConfigIgd {
- pub private_ip: String,
+ pub private_ip: Option<String>,
pub expiration_time: Duration,
pub refresh_time: Duration,
}
@@ -59,7 +59,7 @@ impl RuntimeConfig {
impl RuntimeConfigAcme {
pub fn new(opts: ConfigOptsAcme) -> Result<Option<Self>> {
if !opts.enable {
- return Ok(None)
+ return Ok(None);
}
let email = opts.email.expect(
@@ -91,9 +91,7 @@ impl RuntimeConfigFirewall {
impl RuntimeConfigIgd {
pub(super) fn new(opts: ConfigOptsBase) -> Result<Self> {
- let private_ip = opts
- .private_ip
- .expect("'DIPLONAT_PRIVATE_IP' environment variable is required");
+ let private_ip = opts.private_ip;
let expiration_time = Duration::from_secs(
opts
.expiration_time
@@ -108,7 +106,7 @@ impl RuntimeConfigIgd {
(currently: {}s)",
expiration_time.as_secs(),
refresh_time.as_secs()
- ))
+ ));
}
Ok(Self {
diff --git a/src/consul.rs b/src/consul.rs
index 4e6fe5d..4e4f79c 100644
--- a/src/consul.rs
+++ b/src/consul.rs
@@ -25,10 +25,12 @@ impl Consul {
client: reqwest::Client::new(),
url: url.to_string(),
idx: None,
- }
+ };
}
- pub fn watch_node_reset(&mut self) -> () { self.idx = None; }
+ pub fn watch_node_reset(&mut self) -> () {
+ self.idx = None;
+ }
pub async fn watch_node(&mut self, host: &str) -> Result<CatalogNode> {
let url = match self.idx {
@@ -43,6 +45,6 @@ impl Consul {
};
let resp: CatalogNode = http.json().await?;
- return Ok(resp)
+ return Ok(resp);
}
}
diff --git a/src/consul_actor.rs b/src/consul_actor.rs
index b111f09..61789ca 100644
--- a/src/consul_actor.rs
+++ b/src/consul_actor.rs
@@ -4,7 +4,7 @@ use anyhow::Result;
use log::*;
use serde::{Deserialize, Serialize};
use serde_lexpr::{error, from_str};
-use tokio::{sync::watch, time::delay_for};
+use tokio::{sync::watch, time::sleep};
use crate::{consul, messages};
@@ -35,7 +35,7 @@ fn retry_to_time(retries: u32, max_time: Duration) -> Duration {
return Duration::from_secs(cmp::min(
max_time.as_secs(),
1.2f64.powf(retries as f64) as u64,
- ))
+ ));
}
fn to_parameters(catalog: &consul::CatalogNode) -> Vec<DiplonatConsul> {
@@ -51,7 +51,7 @@ fn to_parameters(catalog: &consul::CatalogNode) -> Vec<DiplonatConsul> {
}
}
- return r
+ return r;
}
fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts {
@@ -70,7 +70,7 @@ fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts {
}
}
- return op
+ return op;
}
impl ConsulActor {
@@ -86,7 +86,7 @@ impl ConsulActor {
tx_open_ports: tx,
node: node.to_string(),
retries: 0,
- }
+ };
}
pub async fn listen(&mut self) -> Result<()> {
@@ -102,15 +102,15 @@ impl ConsulActor {
will_retry_in.as_secs(),
e
);
- delay_for(will_retry_in).await;
- continue
+ sleep(will_retry_in).await;
+ continue;
}
};
self.retries = 0;
let msg = to_open_ports(&to_parameters(&catalog));
debug!("Extracted configuration: {:#?}", msg);
- self.tx_open_ports.broadcast(msg)?;
+ self.tx_open_ports.send(msg)?;
}
}
}
diff --git a/src/diplonat.rs b/src/diplonat.rs
index 496f0ab..397ba82 100644
--- a/src/diplonat.rs
+++ b/src/diplonat.rs
@@ -21,7 +21,7 @@ impl Diplonat {
let fw = FirewallActor::new(rt_cfg.firewall.refresh_time, &ca.rx_open_ports).await?;
let ia = IgdActor::new(
- &rt_cfg.igd.private_ip,
+ rt_cfg.igd.private_ip.as_ref().map(String::as_str),
rt_cfg.igd.refresh_time,
rt_cfg.igd.expiration_time,
&ca.rx_open_ports,
@@ -34,7 +34,7 @@ impl Diplonat {
firewall: fw,
};
- return Ok(ctx)
+ return Ok(ctx);
}
pub async fn listen(&mut self) -> Result<()> {
@@ -44,6 +44,6 @@ impl Diplonat {
self.firewall.listen()
)?;
- return Ok(())
+ return Ok(());
}
}
diff --git a/src/fw_actor.rs b/src/fw_actor.rs
index 0fa6e64..ac553b1 100644
--- a/src/fw_actor.rs
+++ b/src/fw_actor.rs
@@ -32,7 +32,7 @@ impl FirewallActor {
fw::setup(&ctx.ipt)?;
- return Ok(ctx)
+ return Ok(ctx);
}
pub async fn listen(&mut self) -> Result<()> {
@@ -40,7 +40,7 @@ impl FirewallActor {
loop {
// 1. Wait for an event
let new_ports = select! {
- Some(ports) = self.rx_ports.recv() => Some(ports),
+ _ = self.rx_ports.changed() => Some(self.rx_ports.borrow().clone()),
_ = interval.tick() => None,
else => return Ok(()) // Sender dropped, terminate loop.
};
@@ -81,6 +81,6 @@ impl FirewallActor {
fw::open_ports(&self.ipt, ports_to_open)?;
- return Ok(())
+ return Ok(());
}
}
diff --git a/src/igd_actor.rs b/src/igd_actor.rs
index bb9099e..d32f5dc 100644
--- a/src/igd_actor.rs
+++ b/src/igd_actor.rs
@@ -22,7 +22,7 @@ pub struct IgdActor {
impl IgdActor {
pub async fn new(
- priv_ip: &str,
+ priv_ip: Option<&str>,
refresh: Duration,
expire: Duration,
rxp: &watch::Receiver<messages::PublicExposedPorts>,
@@ -32,16 +32,44 @@ impl IgdActor {
.context("Failed to find IGD gateway")?;
info!("IGD gateway: {}", gw);
+ let private_ip = if let Some(ip) = priv_ip {
+ info!("Using private IP from config: {}", ip);
+ ip.to_string()
+ } else {
+ info!("Trying to automatically detect private IP");
+ let gwa = gw.addr.ip().octets();
+ let cmplen = match gwa {
+ [192, 168, _, _] => 3,
+ [10, _, _, _] => 2,
+ _ => panic!(
+ "Gateway IP does not appear to be in a local network ({})",
+ gw.addr.ip()
+ ),
+ };
+ let public_ip = get_if_addrs::get_if_addrs()?
+ .into_iter()
+ .map(|i| i.addr.ip())
+ .filter(|a| match a {
+ std::net::IpAddr::V4(a4) => (a4.octets()[..cmplen] == gwa[..cmplen]),
+ _ => false,
+ })
+ .next()
+ .expect("No interface has an IP on same subnet as gateway")
+ .to_string();
+ info!("Found private IP: {}", public_ip);
+ public_ip
+ };
+
let ctx = Self {
gateway: gw,
rx_ports: rxp.clone(),
- private_ip: priv_ip.to_string(),
+ private_ip,
refresh: refresh,
expire: expire,
last_ports: messages::PublicExposedPorts::new(),
};
- return Ok(ctx)
+ return Ok(ctx);
}
pub async fn listen(&mut self) -> Result<()> {
@@ -49,7 +77,7 @@ impl IgdActor {
loop {
// 1. Wait for an event
let new_ports = select! {
- Some(ports) = self.rx_ports.recv() => Some(ports),
+ _ = self.rx_ports.changed() => Some(self.rx_ports.borrow().clone()),
_ = interval.tick() => None,
else => return Ok(()) // Sender dropped, terminate loop.
};
@@ -93,6 +121,6 @@ impl IgdActor {
}
}
- return Ok(())
+ return Ok(());
}
}
diff --git a/src/messages.rs b/src/messages.rs
index b622be1..63f16b0 100644
--- a/src/messages.rs
+++ b/src/messages.rs
@@ -11,6 +11,6 @@ impl PublicExposedPorts {
return Self {
tcp_ports: HashSet::new(),
udp_ports: HashSet::new(),
- }
+ };
}
}