aboutsummaryrefslogtreecommitdiff
path: root/src/igd_actor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/igd_actor.rs')
-rw-r--r--src/igd_actor.rs54
1 files changed, 49 insertions, 5 deletions
diff --git a/src/igd_actor.rs b/src/igd_actor.rs
index 4e2f4b6..1263902 100644
--- a/src/igd_actor.rs
+++ b/src/igd_actor.rs
@@ -1,16 +1,28 @@
use igd::aio::*;
+use igd::PortMappingProtocol;
+use std::net::SocketAddrV4;
use log::*;
use anyhow::{Result, Context};
-use tokio::sync::watch;
+use tokio::{
+ select,
+ sync::watch,
+ time::{
+ self,
+ Duration
+}};
use crate::messages;
pub struct IgdActor {
+ last_ports: messages::PublicExposedPorts,
rx_ports: watch::Receiver<messages::PublicExposedPorts>,
gateway: Gateway,
+ refresh: Duration,
+ expire: Duration,
+ private_ip: String
}
impl IgdActor {
- pub async fn new(rxp: &watch::Receiver<messages::PublicExposedPorts>) -> Result<Self> {
+ pub async fn new(priv_ip: &str, refresh: Duration, expire: Duration, rxp: &watch::Receiver<messages::PublicExposedPorts>) -> Result<Self> {
let gw = search_gateway(Default::default())
.await
.context("Failed to find gateway")?;
@@ -18,16 +30,48 @@ impl IgdActor {
let ctx = Self {
gateway: gw,
- rx_ports: rxp.clone()
+ rx_ports: rxp.clone(),
+ private_ip: priv_ip.to_string(),
+ refresh: refresh,
+ expire: expire
};
return Ok(ctx);
}
pub async fn listen(&mut self) -> Result<()> {
- while let Some(ports) = self.rx_ports.recv().await {
- println!("received = {:#?}", ports);
+ let mut interval = time::interval(self.refresh);
+ loop {
+ let res = select! {
+ msg = self.rx_ports.recv() => match msg {
+ Some(ports) => { self.last_ports = ports ; return self.do_igd().await; } ,
+ None => return Ok(()) // Sender dropped, terminate loop.
+ }
+ _ = interval.tick() => self.do_igd().await
+ };
+
+ match res {
+ Ok(()) => debug!("Successfully updated IGD"),
+ Err(e) => error!("An error occured while updating IGD. {}. {:#?}", e, self.last_ports),
+ }
+ }
+ }
+
+ pub async fn do_igd(&self) -> Result<()> {
+ let actions = [
+ (PortMappingProtocol::TCP, self.last_ports.tcp_ports),
+ (PortMappingProtocol::UDP, self.last_ports.udp_ports)
+ ];
+
+ for (proto, list) in &actions {
+ for port in list {
+ let service_str = format!("{}:{}", self.private_ip, port);
+ let service: SocketAddrV4 = service_str.parse()?.context("Invalid socket address");
+ self.gateway.add_port(*proto, *port, service, self.expire.as_secs() as u32, "diplonat").await?;
+ debug!("IGD request successful for {:#?} {}", proto, service);
+ }
}
+
return Ok(());
}
}