aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/consul_actor.rs13
-rw-r--r--src/diplonat.rs16
-rw-r--r--src/fw.rs81
-rw-r--r--src/fw_actor.rs75
-rw-r--r--src/main.rs4
-rw-r--r--src/messages.rs12
6 files changed, 185 insertions, 16 deletions
diff --git a/src/consul_actor.rs b/src/consul_actor.rs
index 1cbb1b8..ba5d704 100644
--- a/src/consul_actor.rs
+++ b/src/consul_actor.rs
@@ -8,11 +8,12 @@ use serde::{Serialize, Deserialize};
use serde_lexpr::{from_str,error};
use crate::messages;
use crate::consul;
+use std::collections::HashSet;
#[derive(Serialize, Deserialize, Debug)]
pub enum DiplonatParameter {
- tcp_port(Vec<u16>),
- udp_port(Vec<u16>)
+ tcp_port(HashSet<u16>),
+ udp_port(HashSet<u16>)
}
#[derive(Serialize, Deserialize, Debug)]
@@ -53,8 +54,8 @@ fn to_parameters(catalog: &consul::CatalogNode) -> Vec<DiplonatConsul> {
fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts {
let mut op = messages::PublicExposedPorts {
- tcp_ports: Vec::new(),
- udp_ports: Vec::new()
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new()
};
for conf in params {
@@ -73,8 +74,8 @@ fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts {
impl ConsulActor {
pub fn new(url: &str, node: &str) -> Self {
let (tx, rx) = watch::channel(messages::PublicExposedPorts{
- tcp_ports: Vec::new(),
- udp_ports: Vec::new()
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new()
});
return Self {
diff --git a/src/diplonat.rs b/src/diplonat.rs
index 1be00f7..798b779 100644
--- a/src/diplonat.rs
+++ b/src/diplonat.rs
@@ -1,13 +1,14 @@
use anyhow::Result;
-use log::*;
use tokio::try_join;
use crate::consul_actor::ConsulActor;
use crate::igd_actor::IgdActor;
use crate::environment::Environment;
+use crate::fw_actor::FirewallActor;
pub struct Diplonat {
consul: ConsulActor,
- igd: IgdActor
+ igd: IgdActor,
+ firewall: FirewallActor
}
impl Diplonat {
@@ -21,9 +22,15 @@ impl Diplonat {
&ca.rx_open_ports
).await?;
+ let fw = FirewallActor::new(
+ env.refresh_time,
+ &ca.rx_open_ports
+ ).await?;
+
let ctx = Self {
consul: ca,
- igd: ia
+ igd: ia,
+ firewall: fw
};
return Ok(ctx);
@@ -32,7 +39,8 @@ impl Diplonat {
pub async fn listen(&mut self) -> Result<()> {
try_join!(
self.consul.listen(),
- self.igd.listen()
+ self.igd.listen(),
+ self.firewall.listen()
)?;
return Ok(());
diff --git a/src/fw.rs b/src/fw.rs
new file mode 100644
index 0000000..bc4d119
--- /dev/null
+++ b/src/fw.rs
@@ -0,0 +1,81 @@
+use iptables;
+use regex::Regex;
+use std::collections::HashSet;
+use crate::messages;
+use anyhow::{Result,Context};
+use log::*;
+
+pub fn setup(ipt: &iptables::IPTables) -> Result<()> {
+
+ // ensure we start from a clean state without any rule already set
+ cleanup(ipt)?;
+
+ ipt.new_chain("filter", "DIPLONAT").context("Failed to create new chain")?;
+ ipt.insert_unique("filter", "INPUT", "-j DIPLONAT", 1).context("Failed to insert jump rule")?;
+
+ Ok(())
+}
+
+pub fn open_ports(ipt: &iptables::IPTables, ports: messages::PublicExposedPorts) -> Result<()> {
+ for p in ports.tcp_ports {
+ ipt.append("filter", "DIPLONAT", &format!("-p tcp --dport {} -j ACCEPT", p)).context("Failed to insert port rule")?;
+ }
+
+ for p in ports.udp_ports {
+ ipt.append("filter", "DIPLONAT", &format!("-p udp --dport {} -j ACCEPT", p)).context("Failed to insert port rule")?;
+ }
+
+ Ok(())
+}
+
+pub fn get_opened_ports(ipt: &iptables::IPTables) -> Result<messages::PublicExposedPorts> {
+ let mut ports = messages::PublicExposedPorts {
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new()
+ };
+
+ let list = ipt.list("filter", "DIPLONAT")?;
+ let re = Regex::new(r"\-A.*? \-p (\w+).*\-\-dport (\d+).*?\-j ACCEPT").context("Regex matching open ports encountered an unexpected rule")?;
+ for i in list {
+ let caps = re.captures(&i);
+ match caps {
+ Some(c) => {
+
+ if let (Some(raw_proto), Some(raw_port)) = (c.get(1), c.get(2)) {
+
+ let proto = String::from(raw_proto.as_str());
+ let number = String::from(raw_port.as_str()).parse::<u16>()?;
+
+ if proto == "tcp" {
+ ports.tcp_ports.insert(number);
+ } else {
+ ports.udp_ports.insert(number);
+ }
+
+ } else {
+ error!("Unexpected rule found in DIPLONAT chain")
+ }
+
+ },
+ _ => {}
+ }
+ }
+
+ Ok(ports)
+}
+
+pub fn cleanup(ipt: &iptables::IPTables) -> Result<()> {
+
+ if ipt.chain_exists("filter", "DIPLONAT")? {
+ ipt.flush_chain("filter", "DIPLONAT").context("Failed to flush the DIPLONAT chain")?;
+
+ if ipt.exists("filter", "INPUT", "-j DIPLONAT")? {
+ ipt.delete("filter", "INPUT", "-j DIPLONAT").context("Failed to delete jump rule")?;
+ }
+
+ ipt.delete_chain("filter", "DIPLONAT").context("Failed to delete chain")?;
+ }
+
+ Ok(())
+}
+
diff --git a/src/fw_actor.rs b/src/fw_actor.rs
new file mode 100644
index 0000000..b5e4c7e
--- /dev/null
+++ b/src/fw_actor.rs
@@ -0,0 +1,75 @@
+use anyhow::Result;
+use tokio::{
+ select,
+ sync::watch,
+ time::{
+ self,
+ Duration
+}};
+use log::*;
+
+use iptables;
+use crate::messages;
+use crate::fw;
+use std::collections::HashSet;
+
+pub struct FirewallActor {
+ pub ipt: iptables::IPTables,
+ rx_ports: watch::Receiver<messages::PublicExposedPorts>,
+ last_ports: messages::PublicExposedPorts,
+ refresh: Duration
+}
+
+impl FirewallActor {
+ pub async fn new(_refresh: Duration, rxp: &watch::Receiver<messages::PublicExposedPorts>) -> Result<Self> {
+ let ctx = Self {
+ ipt: iptables::new(false)?,
+ rx_ports: rxp.clone(),
+ last_ports: messages::PublicExposedPorts::new(),
+ refresh: _refresh,
+ };
+
+ fw::setup(&ctx.ipt)?;
+
+ return Ok(ctx);
+ }
+
+ pub async fn listen(&mut self) -> Result<()> {
+ let mut interval = time::interval(self.refresh);
+ loop {
+ // 1. Wait for an event
+ let new_ports = select! {
+ Some(ports) = self.rx_ports.recv() => Some(ports),
+ _ = interval.tick() => None,
+ else => return Ok(()) // Sender dropped, terminate loop.
+ };
+
+ // 2. Update last ports if needed
+ if let Some(p) = new_ports { self.last_ports = p; }
+
+ // 3. Update firewall rules
+ match self.do_fw_update().await {
+ Ok(()) => debug!("Successfully updated firewall rules"),
+ Err(e) => error!("An error occured while updating firewall rules. {}", e),
+ }
+ }
+ }
+
+ pub async fn do_fw_update(&self) -> Result<()> {
+ let curr_opened_ports = fw::get_opened_ports(&self.ipt)?;
+
+ let diff_tcp = self.last_ports.tcp_ports.difference(&curr_opened_ports.tcp_ports).copied().collect::<HashSet<u16>>();
+ let diff_udp = self.last_ports.udp_ports.difference(&curr_opened_ports.udp_ports).copied().collect::<HashSet<u16>>();
+
+ let ports_to_open = messages::PublicExposedPorts {
+ tcp_ports: diff_tcp,
+ udp_ports: diff_udp
+ };
+
+ fw::open_ports(&self.ipt, ports_to_open)?;
+
+ return Ok(());
+ }
+
+}
+
diff --git a/src/main.rs b/src/main.rs
index a35916a..ca36c26 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,6 +4,8 @@ mod consul;
mod consul_actor;
mod igd_actor;
mod diplonat;
+mod fw;
+mod fw_actor;
use log::*;
use diplonat::Diplonat;
@@ -12,7 +14,7 @@ use diplonat::Diplonat;
async fn main() {
pretty_env_logger::init();
info!("Starting Diplonat");
-
+
let mut diplo = Diplonat::new().await.expect("Setup failed");
diplo.listen().await.expect("A runtime error occured");
}
diff --git a/src/messages.rs b/src/messages.rs
index 31ed48f..09a7c14 100644
--- a/src/messages.rs
+++ b/src/messages.rs
@@ -1,14 +1,16 @@
-#[derive(Debug, Clone)]
+use std::collections::HashSet;
+
+#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublicExposedPorts {
- pub tcp_ports: Vec<u16>,
- pub udp_ports: Vec<u16>
+ pub tcp_ports: HashSet<u16>,
+ pub udp_ports: HashSet<u16>
}
impl PublicExposedPorts {
pub fn new() -> Self {
return Self {
- tcp_ports: Vec::new(),
- udp_ports: Vec::new()
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new()
}
}
}