aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/consul_actor.rs13
-rw-r--r--src/diplonat.rs15
-rw-r--r--src/fw.rs37
-rw-r--r--src/fw_actor.rs80
-rw-r--r--src/main.rs4
-rw-r--r--src/messages.rs12
6 files changed, 129 insertions, 32 deletions
diff --git a/src/consul_actor.rs b/src/consul_actor.rs
index 1cbb1b8..ba5d704 100644
--- a/src/consul_actor.rs
+++ b/src/consul_actor.rs
@@ -8,11 +8,12 @@ use serde::{Serialize, Deserialize};
use serde_lexpr::{from_str,error};
use crate::messages;
use crate::consul;
+use std::collections::HashSet;
#[derive(Serialize, Deserialize, Debug)]
pub enum DiplonatParameter {
- tcp_port(Vec<u16>),
- udp_port(Vec<u16>)
+ tcp_port(HashSet<u16>),
+ udp_port(HashSet<u16>)
}
#[derive(Serialize, Deserialize, Debug)]
@@ -53,8 +54,8 @@ fn to_parameters(catalog: &consul::CatalogNode) -> Vec<DiplonatConsul> {
fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts {
let mut op = messages::PublicExposedPorts {
- tcp_ports: Vec::new(),
- udp_ports: Vec::new()
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new()
};
for conf in params {
@@ -73,8 +74,8 @@ fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts {
impl ConsulActor {
pub fn new(url: &str, node: &str) -> Self {
let (tx, rx) = watch::channel(messages::PublicExposedPorts{
- tcp_ports: Vec::new(),
- udp_ports: Vec::new()
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new()
});
return Self {
diff --git a/src/diplonat.rs b/src/diplonat.rs
index 1be00f7..7b7bbb8 100644
--- a/src/diplonat.rs
+++ b/src/diplonat.rs
@@ -4,10 +4,12 @@ use tokio::try_join;
use crate::consul_actor::ConsulActor;
use crate::igd_actor::IgdActor;
use crate::environment::Environment;
+use crate::fw_actor::FirewallActor;
pub struct Diplonat {
consul: ConsulActor,
- igd: IgdActor
+ igd: IgdActor,
+ firewall: FirewallActor
}
impl Diplonat {
@@ -21,9 +23,15 @@ impl Diplonat {
&ca.rx_open_ports
).await?;
+ let fw = FirewallActor::new(
+ env.refresh_time,
+ &ca.rx_open_ports
+ ).await?;
+
let ctx = Self {
consul: ca,
- igd: ia
+ igd: ia,
+ firewall: fw
};
return Ok(ctx);
@@ -32,7 +40,8 @@ impl Diplonat {
pub async fn listen(&mut self) -> Result<()> {
try_join!(
self.consul.listen(),
- self.igd.listen()
+ self.igd.listen(),
+ self.firewall.listen()
)?;
return Ok(());
diff --git a/src/fw.rs b/src/fw.rs
index 7650b3a..955425a 100644
--- a/src/fw.rs
+++ b/src/fw.rs
@@ -2,13 +2,7 @@ use iptables;
use regex::Regex;
use std::collections::HashSet;
use std::io;
-
-
-#[derive(PartialEq,Eq,Debug,Hash)]
-pub struct Port {
- proto: String,
- number: u16,
-}
+use crate::messages;
#[derive(Debug)]
pub struct FirewallError(String);
@@ -17,26 +11,34 @@ impl From<iptables::error::IPTError> for FirewallError {
fn from(error: iptables::error::IPTError) -> Self {
FirewallError(error.to_string())
}
-
}
pub fn setup(ipt: &iptables::IPTables) -> Result<(), FirewallError> {
+
ipt.new_chain("filter", "DIPLONAT")?;
ipt.insert("filter", "INPUT", "-j DIPLONAT", 1)?;
+
Ok(())
}
-pub fn open_ports(ipt: &iptables::IPTables, ports: Vec<Port>) -> Result<(), FirewallError> {
+pub fn open_ports(ipt: &iptables::IPTables, ports: messages::PublicExposedPorts) -> Result<(), FirewallError> {
+
+ for p in ports.tcp_ports {
+ ipt.append("filter", "DIPLONAT", &format!("-p tcp --dport {} -j ACCEPT", p))?;
+ }
- for p in ports {
- ipt.append("filter", "DIPLONAT", &format!("-p {} --dport {} -j ACCEPT", p.proto, p.number))?;
+ for p in ports.udp_ports {
+ ipt.append("filter", "DIPLONAT", &format!("-p udp --dport {} -j ACCEPT", p))?;
}
Ok(())
}
-pub fn get_opened_ports(ipt: &iptables::IPTables) -> Result<HashSet<Port>, FirewallError> {
- let mut opened_ports: HashSet<Port> = HashSet::new();
+pub fn get_opened_ports(ipt: &iptables::IPTables) -> Result<messages::PublicExposedPorts, FirewallError> {
+ let mut ports = messages::PublicExposedPorts {
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new()
+ };
let list = ipt.list("filter", "DIPLONAT")?;
let re = Regex::new(r"\-A.*? \-p (\w+).*\-\-dport (\d+).*?\-j ACCEPT").unwrap();
@@ -50,13 +52,18 @@ pub fn get_opened_ports(ipt: &iptables::IPTables) -> Result<HashSet<Port>, Firew
let proto = String::from(raw_proto.as_str());
let number = String::from(raw_port.as_str()).parse::<u16>().unwrap();
- opened_ports.insert( Port { proto, number } );
+ if proto == "tcp" {
+ ports.tcp_ports.insert(number);
+ } else {
+ ports.udp_ports.insert(number);
+ }
+
},
_ => {}
}
}
- Ok(opened_ports)
+ Ok(ports)
}
pub fn cleanup(ipt: &iptables::IPTables) -> Result<(), FirewallError> {
diff --git a/src/fw_actor.rs b/src/fw_actor.rs
new file mode 100644
index 0000000..9bc6610
--- /dev/null
+++ b/src/fw_actor.rs
@@ -0,0 +1,80 @@
+use igd::aio::*;
+use igd::PortMappingProtocol;
+use std::net::SocketAddrV4;
+use log::*;
+use anyhow::{Result, Context};
+use tokio::{
+ select,
+ sync::watch,
+ time::{
+ self,
+ Duration
+}};
+
+use iptables;
+use crate::messages;
+use crate::fw;
+use std::collections::HashSet;
+
+pub struct FirewallActor {
+ ipt: iptables::IPTables,
+ rx_ports: watch::Receiver<messages::PublicExposedPorts>,
+ last_ports: messages::PublicExposedPorts,
+ refresh: Duration
+}
+
+impl FirewallActor {
+ pub async fn new(_refresh: Duration, rxp: &watch::Receiver<messages::PublicExposedPorts>) -> Result<Self> {
+
+
+ let ctx = Self {
+ ipt: iptables::new(false).unwrap(),
+ rx_ports: rxp.clone(),
+ last_ports: messages::PublicExposedPorts::new(),
+ refresh: _refresh,
+ };
+
+ fw::setup(&ctx.ipt).expect("iptables setup failed");
+
+ return Ok(ctx);
+ }
+
+ pub async fn listen(&mut self) -> Result<()> {
+ let mut interval = time::interval(self.refresh);
+ loop {
+ // 1. Wait for an event
+ let new_ports = select! {
+ Some(ports) = self.rx_ports.recv() => Some(ports),
+ _ = interval.tick() => None,
+ else => return Ok(()) // Sender dropped, terminate loop.
+ };
+
+ // 2. Update last ports if needed
+ if let Some(p) = new_ports { self.last_ports = p; }
+
+ // 3. Update firewall rules
+ match self.do_fw_update().await {
+ Ok(()) => debug!("Successfully updated firewall rules"),
+ Err(e) => error!("An error occured while updating firewall rules. {}", e),
+ }
+ }
+ }
+
+ pub async fn do_fw_update(&self) -> Result<()> {
+
+ let curr_opened_ports = fw::get_opened_ports(&self.ipt).unwrap();
+
+ let diff_tcp = self.last_ports.tcp_ports.difference(&curr_opened_ports.tcp_ports).copied().collect::<HashSet<u16>>();
+ let diff_udp = self.last_ports.udp_ports.difference(&curr_opened_ports.udp_ports).copied().collect::<HashSet<u16>>();
+
+ let ports_to_open = messages::PublicExposedPorts {
+ tcp_ports: diff_tcp,
+ udp_ports: diff_udp
+ };
+
+ fw::open_ports(&self.ipt, ports_to_open).unwrap();
+
+ return Ok(());
+ }
+
+}
diff --git a/src/main.rs b/src/main.rs
index bf8248d..e845017 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,6 +5,7 @@ mod consul_actor;
mod igd_actor;
mod diplonat;
mod fw;
+mod fw_actor;
use iptables;
use log::*;
@@ -15,9 +16,6 @@ async fn main() {
pretty_env_logger::init();
info!("Starting Diplonat");
- let ipt = iptables::new(false).unwrap();
- fw::setup(&ipt).expect("iptables setup failed");
-
let mut diplo = Diplonat::new().await.expect("Setup failed");
diplo.listen().await.expect("A runtime error occured");
}
diff --git a/src/messages.rs b/src/messages.rs
index 31ed48f..09a7c14 100644
--- a/src/messages.rs
+++ b/src/messages.rs
@@ -1,14 +1,16 @@
-#[derive(Debug, Clone)]
+use std::collections::HashSet;
+
+#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublicExposedPorts {
- pub tcp_ports: Vec<u16>,
- pub udp_ports: Vec<u16>
+ pub tcp_ports: HashSet<u16>,
+ pub udp_ports: HashSet<u16>
}
impl PublicExposedPorts {
pub fn new() -> Self {
return Self {
- tcp_ports: Vec::new(),
- udp_ports: Vec::new()
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new()
}
}
}