aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/config/mod.rs5
-rw-r--r--src/config/options.rs106
-rw-r--r--src/config/options_test.rs171
-rw-r--r--src/config/runtime.rs246
-rw-r--r--src/consul.rs117
-rw-r--r--src/consul_actor.rs157
-rw-r--r--src/diplonat.rs107
-rw-r--r--src/fw.rs148
-rw-r--r--src/fw_actor.rs142
-rw-r--r--src/igd_actor.rs200
-rw-r--r--src/main.rs13
-rw-r--r--src/messages.rs16
-rw-r--r--src/stun_actor.rs147
13 files changed, 903 insertions, 672 deletions
diff --git a/src/config/mod.rs b/src/config/mod.rs
index 2bf8f66..9281f44 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -3,11 +3,12 @@ mod options;
mod options_test;
mod runtime;
-pub use options::{ConfigOpts, ConfigOptsAcme, ConfigOptsBase, ConfigOptsConsul};
+pub use options::{ConfigOpts, ConfigOptsBase, ConfigOptsConsul};
pub use runtime::{
- RuntimeConfig, RuntimeConfigAcme, RuntimeConfigConsul, RuntimeConfigFirewall, RuntimeConfigIgd,
+ RuntimeConfig, RuntimeConfigConsul, RuntimeConfigFirewall, RuntimeConfigIgd, RuntimeConfigStun,
};
pub const EXPIRATION_TIME: u16 = 300;
pub const REFRESH_TIME: u16 = 60;
pub const CONSUL_URL: &str = "http://127.0.0.1:8500";
+pub const STUN_SERVER: &str = "stun.nextcloud.com:443";
diff --git a/src/config/options.rs b/src/config/options.rs
index 793838a..c31484f 100644
--- a/src/config/options.rs
+++ b/src/config/options.rs
@@ -11,77 +11,67 @@ use crate::config::RuntimeConfig;
/// Base configuration options
#[derive(Clone, Default, Deserialize)]
pub struct ConfigOptsBase {
- /// This node's private IP address [default: None]
- pub private_ip: Option<String>,
- /// Expiration time for IGD rules [default: 60]
- pub expiration_time: Option<u16>,
- /// Refresh time for IGD and Firewall rules [default: 300]
- pub refresh_time: Option<u16>,
-}
-
-/// ACME configuration options
-#[derive(Clone, Default, Deserialize)]
-pub struct ConfigOptsAcme {
- /// Whether ACME is enabled [default: false]
- #[serde(default)]
- pub enable: bool,
-
- /// The default domain holder's e-mail [default: None]
- pub email: Option<String>,
+ /// This node's private IP address [default: None]
+ pub private_ip: Option<String>,
+ /// Expiration time for IGD rules [default: 60]
+ pub expiration_time: Option<u16>,
+ /// Refresh time for IGD and Firewall rules [default: 300]
+ pub refresh_time: Option<u16>,
+ /// STUN server [default: stun.nextcloud.com:443]
+ pub stun_server: Option<String>,
+ /// IPv6-only mode (disables IGD, IPv4 firewall and IPv4 address autodiscovery) [default: false]
+ #[serde(default)]
+ pub ipv6_only: bool,
}
/// Consul configuration options
#[derive(Clone, Default, Deserialize)]
pub struct ConfigOptsConsul {
- /// Consul's node name [default: None]
- pub node_name: Option<String>,
- /// Consul's REST URL [default: "http://127.0.0.1:8500"]
- pub url: Option<String>,
- /// Consul's CA certificate [default: None]
- pub ca_cert: Option<String>,
- /// Skip TLS verification for Consul server [default: false]
- #[serde(default)]
- pub tls_skip_verify: bool,
- /// Consul's client certificate [default: None]
- pub client_cert: Option<String>,
- /// Consul's client key [default: None]
- pub client_key: Option<String>,
+ /// Consul's node name [default: None]
+ pub node_name: Option<String>,
+ /// Consul's REST URL [default: "http://127.0.0.1:8500"]
+ pub url: Option<String>,
+ /// Consul's CA certificate [default: None]
+ pub ca_cert: Option<String>,
+ /// Skip TLS verification for Consul server [default: false]
+ #[serde(default)]
+ pub tls_skip_verify: bool,
+ /// Consul's client certificate [default: None]
+ pub client_cert: Option<String>,
+ /// Consul's client key [default: None]
+ pub client_key: Option<String>,
}
/// Model of all potential configuration options
pub struct ConfigOpts {
- pub base: ConfigOptsBase,
- pub acme: ConfigOptsAcme,
- pub consul: ConfigOptsConsul,
+ pub base: ConfigOptsBase,
+ pub consul: ConfigOptsConsul,
}
impl ConfigOpts {
- pub fn from_env() -> Result<RuntimeConfig> {
- let base: ConfigOptsBase = envy::prefixed("DIPLONAT_").from_env()?;
- let consul: ConfigOptsConsul = envy::prefixed("DIPLONAT_CONSUL_").from_env()?;
- let acme: ConfigOptsAcme = envy::prefixed("DIPLONAT_ACME_").from_env()?;
+ pub fn from_env() -> Result<RuntimeConfig> {
+ let base: ConfigOptsBase = envy::prefixed("DIPLONAT_").from_env()?;
+ let consul: ConfigOptsConsul = envy::prefixed("DIPLONAT_CONSUL_").from_env()?;
- RuntimeConfig::new(Self {
- base: base,
- consul: consul,
- acme: acme,
- })
- }
+ RuntimeConfig::new(Self {
+ base: base,
+ consul: consul,
+ })
+ }
- // Currently only used in tests
- #[allow(dead_code)]
- pub fn from_iter<Iter: Clone>(iter: Iter) -> Result<RuntimeConfig>
- where
- Iter: IntoIterator<Item = (String, String)>,
- {
- let base: ConfigOptsBase = envy::prefixed("DIPLONAT_").from_iter(iter.clone())?;
- let consul: ConfigOptsConsul = envy::prefixed("DIPLONAT_CONSUL_").from_iter(iter.clone())?;
- let acme: ConfigOptsAcme = envy::prefixed("DIPLONAT_ACME_").from_iter(iter.clone())?;
+ // Currently only used in tests
+ #[cfg(test)]
+ pub fn from_iter<Iter: Clone>(iter: Iter) -> Result<RuntimeConfig>
+ where
+ Iter: IntoIterator<Item = (String, String)>,
+ {
+ let base: ConfigOptsBase = envy::prefixed("DIPLONAT_").from_iter(iter.clone())?;
+ let consul: ConfigOptsConsul =
+ envy::prefixed("DIPLONAT_CONSUL_").from_iter(iter.clone())?;
- RuntimeConfig::new(Self {
- base: base,
- consul: consul,
- acme: acme,
- })
- }
+ RuntimeConfig::new(Self {
+ base: base,
+ consul: consul,
+ })
+ }
}
diff --git a/src/config/options_test.rs b/src/config/options_test.rs
index 6b91235..a028a4e 100644
--- a/src/config/options_test.rs
+++ b/src/config/options_test.rs
@@ -9,116 +9,111 @@ use crate::config::*;
// This is why we only test ConfigOpts::from_iter(iter).
fn minimal_valid_options() -> HashMap<String, String> {
- let mut opts = HashMap::new();
- opts.insert(
- "DIPLONAT_CONSUL_NODE_NAME".to_string(),
- "consul_node".to_string(),
- );
- opts
+ let mut opts = HashMap::new();
+ opts.insert(
+ "DIPLONAT_CONSUL_NODE_NAME".to_string(),
+ "consul_node".to_string(),
+ );
+ opts
}
fn all_valid_options() -> HashMap<String, String> {
- let mut opts = minimal_valid_options();
- opts.insert("DIPLONAT_EXPIRATION_TIME".to_string(), "30".to_string());
- opts.insert(
- "DIPLONAT_PRIVATE_IP".to_string(),
- "172.123.43.555".to_string(),
- );
- opts.insert("DIPLONAT_REFRESH_TIME".to_string(), "10".to_string());
- opts.insert(
- "DIPLONAT_CONSUL_URL".to_string(),
- "http://127.0.0.1:9999".to_string(),
- );
- opts.insert("DIPLONAT_ACME_ENABLE".to_string(), "true".to_string());
- opts.insert(
- "DIPLONAT_ACME_EMAIL".to_string(),
- "bozo@bozo.net".to_string(),
- );
- opts
+ let mut opts = minimal_valid_options();
+ opts.insert("DIPLONAT_EXPIRATION_TIME".to_string(), "30".to_string());
+ opts.insert(
+ "DIPLONAT_STUN_SERVER".to_string(),
+ "stun.nextcloud.com:443".to_string(),
+ );
+ opts.insert(
+ "DIPLONAT_PRIVATE_IP".to_string(),
+ "172.123.43.55".to_string(),
+ );
+ opts.insert("DIPLONAT_REFRESH_TIME".to_string(), "10".to_string());
+ opts.insert(
+ "DIPLONAT_CONSUL_URL".to_string(),
+ "http://127.0.0.1:9999".to_string(),
+ );
+ opts.insert("DIPLONAT_ACME_ENABLE".to_string(), "true".to_string());
+ opts.insert(
+ "DIPLONAT_ACME_EMAIL".to_string(),
+ "bozo@bozo.net".to_string(),
+ );
+ opts
}
#[test]
#[should_panic]
fn err_empty_env() {
- std::env::remove_var("DIPLONAT_CONSUL_NODE_NAME");
- ConfigOpts::from_env().unwrap();
+ std::env::remove_var("DIPLONAT_CONSUL_NODE_NAME");
+ ConfigOpts::from_env().unwrap();
}
#[test]
fn ok_from_iter_minimal_valid_options() {
- let opts = minimal_valid_options();
- let rt_config = ConfigOpts::from_iter(opts.clone()).unwrap();
+ let opts = minimal_valid_options();
+ let rt_config = ConfigOpts::from_iter(opts.clone()).unwrap();
- assert!(rt_config.acme.is_none());
- assert_eq!(
- &rt_config.consul.node_name,
- opts.get(&"DIPLONAT_CONSUL_NODE_NAME".to_string()).unwrap()
- );
- assert_eq!(rt_config.consul.url, CONSUL_URL.to_string());
- assert_eq!(
- rt_config.firewall.refresh_time,
- Duration::from_secs(REFRESH_TIME.into())
- );
- assert!(rt_config.igd.private_ip.is_none());
- assert_eq!(
- rt_config.igd.expiration_time,
- Duration::from_secs(EXPIRATION_TIME.into())
- );
- assert_eq!(
- rt_config.igd.refresh_time,
- Duration::from_secs(REFRESH_TIME.into())
- );
+ assert_eq!(
+ &rt_config.consul.node_name,
+ opts.get(&"DIPLONAT_CONSUL_NODE_NAME".to_string()).unwrap()
+ );
+ assert_eq!(rt_config.consul.url, CONSUL_URL.to_string());
+ assert_eq!(
+ rt_config.firewall.refresh_time,
+ Duration::from_secs(REFRESH_TIME.into())
+ );
+ let igd = rt_config.igd.unwrap();
+ assert!(igd.private_ip.is_none());
+ assert_eq!(
+ igd.expiration_time,
+ Duration::from_secs(EXPIRATION_TIME.into())
+ );
+ assert_eq!(igd.refresh_time, Duration::from_secs(REFRESH_TIME.into()));
}
#[test]
#[should_panic]
fn err_from_iter_invalid_refresh_time() {
- let mut opts = minimal_valid_options();
- opts.insert("DIPLONAT_EXPIRATION_TIME".to_string(), "60".to_string());
- opts.insert("DIPLONAT_REFRESH_TIME".to_string(), "60".to_string());
- ConfigOpts::from_iter(opts).unwrap();
+ let mut opts = minimal_valid_options();
+ opts.insert("DIPLONAT_EXPIRATION_TIME".to_string(), "60".to_string());
+ opts.insert("DIPLONAT_REFRESH_TIME".to_string(), "60".to_string());
+ ConfigOpts::from_iter(opts).unwrap();
}
#[test]
fn ok_from_iter_all_valid_options() {
- let opts = all_valid_options();
- let rt_config = ConfigOpts::from_iter(opts.clone()).unwrap();
+ let opts = all_valid_options();
+ let rt_config = ConfigOpts::from_iter(opts.clone()).unwrap();
- let expiration_time = Duration::from_secs(
- opts
- .get(&"DIPLONAT_EXPIRATION_TIME".to_string())
- .unwrap()
- .parse::<u64>()
- .unwrap()
- .into(),
- );
- let refresh_time = Duration::from_secs(
- opts
- .get(&"DIPLONAT_REFRESH_TIME".to_string())
- .unwrap()
- .parse::<u64>()
- .unwrap()
- .into(),
- );
+ let expiration_time = Duration::from_secs(
+ opts.get(&"DIPLONAT_EXPIRATION_TIME".to_string())
+ .unwrap()
+ .parse::<u64>()
+ .unwrap()
+ .into(),
+ );
+ let refresh_time = Duration::from_secs(
+ opts.get(&"DIPLONAT_REFRESH_TIME".to_string())
+ .unwrap()
+ .parse::<u64>()
+ .unwrap()
+ .into(),
+ );
- assert!(rt_config.acme.is_some());
- assert_eq!(
- &rt_config.acme.unwrap().email,
- opts.get(&"DIPLONAT_ACME_EMAIL".to_string()).unwrap()
- );
- assert_eq!(
- &rt_config.consul.node_name,
- opts.get(&"DIPLONAT_CONSUL_NODE_NAME".to_string()).unwrap()
- );
- assert_eq!(
- &rt_config.consul.url,
- opts.get(&"DIPLONAT_CONSUL_URL".to_string()).unwrap()
- );
- assert_eq!(rt_config.firewall.refresh_time, refresh_time);
- assert_eq!(
- &rt_config.igd.private_ip.unwrap(),
- opts.get(&"DIPLONAT_PRIVATE_IP".to_string()).unwrap()
- );
- assert_eq!(rt_config.igd.expiration_time, expiration_time);
- assert_eq!(rt_config.igd.refresh_time, refresh_time);
+ assert_eq!(
+ &rt_config.consul.node_name,
+ opts.get(&"DIPLONAT_CONSUL_NODE_NAME".to_string()).unwrap()
+ );
+ assert_eq!(
+ &rt_config.consul.url,
+ opts.get(&"DIPLONAT_CONSUL_URL".to_string()).unwrap()
+ );
+ assert_eq!(rt_config.firewall.refresh_time, refresh_time);
+ let igd = rt_config.igd.unwrap();
+ assert_eq!(
+ &igd.private_ip.unwrap().to_string(),
+ opts.get(&"DIPLONAT_PRIVATE_IP".to_string()).unwrap()
+ );
+ assert_eq!(igd.expiration_time, expiration_time);
+ assert_eq!(igd.refresh_time, refresh_time);
}
diff --git a/src/config/runtime.rs b/src/config/runtime.rs
index 2e7b573..8084439 100644
--- a/src/config/runtime.rs
+++ b/src/config/runtime.rs
@@ -1,10 +1,11 @@
use std::fs::File;
use std::io::Read;
+use std::net::{Ipv4Addr, SocketAddr, ToSocketAddrs};
use std::time::Duration;
-use anyhow::{anyhow, bail, Result};
+use anyhow::{anyhow, bail, Context, Result};
-use crate::config::{ConfigOpts, ConfigOptsAcme, ConfigOptsBase, ConfigOptsConsul};
+use crate::config::{ConfigOpts, ConfigOptsBase, ConfigOptsConsul};
// This code is inspired by the Trunk crate (https://github.com/thedodd/trunk)
@@ -12,140 +13,179 @@ use crate::config::{ConfigOpts, ConfigOptsAcme, ConfigOptsBase, ConfigOptsConsul
// RuntimeConfig. We apply default values and business logic.
#[derive(Debug)]
-pub struct RuntimeConfigAcme {
- pub email: String,
-}
-
-#[derive(Debug)]
pub struct RuntimeConfigConsul {
- pub node_name: String,
- pub url: String,
- pub tls: Option<(Option<reqwest::Certificate>, bool, reqwest::Identity)>,
+ pub node_name: String,
+ pub url: String,
+ pub tls: Option<(Option<reqwest::Certificate>, bool, reqwest::Identity)>,
}
#[derive(Debug)]
pub struct RuntimeConfigFirewall {
- pub refresh_time: Duration,
+ pub ipv6_only: bool,
+ pub refresh_time: Duration,
}
#[derive(Debug)]
pub struct RuntimeConfigIgd {
- pub private_ip: Option<String>,
- pub expiration_time: Duration,
- pub refresh_time: Duration,
+ pub private_ip: Option<Ipv4Addr>,
+ pub expiration_time: Duration,
+ pub refresh_time: Duration,
}
#[derive(Debug)]
-pub struct RuntimeConfig {
- pub acme: Option<RuntimeConfigAcme>,
- pub consul: RuntimeConfigConsul,
- pub firewall: RuntimeConfigFirewall,
- pub igd: RuntimeConfigIgd,
+pub struct RuntimeConfigStun {
+ pub stun_server_v4: Option<SocketAddr>,
+ pub stun_server_v6: SocketAddr,
+ pub refresh_time: Duration,
}
-impl RuntimeConfig {
- pub fn new(opts: ConfigOpts) -> Result<Self> {
- let acme = RuntimeConfigAcme::new(opts.acme.clone())?;
- let consul = RuntimeConfigConsul::new(opts.consul.clone())?;
- let firewall = RuntimeConfigFirewall::new(opts.base.clone())?;
- let igd = RuntimeConfigIgd::new(opts.base.clone())?;
-
- Ok(Self {
- acme,
- consul,
- firewall,
- igd,
- })
- }
+#[derive(Debug)]
+pub struct RuntimeConfig {
+ pub consul: RuntimeConfigConsul,
+ pub firewall: RuntimeConfigFirewall,
+ pub igd: Option<RuntimeConfigIgd>,
+ pub stun: RuntimeConfigStun,
}
-impl RuntimeConfigAcme {
- pub fn new(opts: ConfigOptsAcme) -> Result<Option<Self>> {
- if !opts.enable {
- return Ok(None);
+impl RuntimeConfig {
+ pub fn new(opts: ConfigOpts) -> Result<Self> {
+ let consul = RuntimeConfigConsul::new(opts.consul)?;
+ let firewall = RuntimeConfigFirewall::new(&opts.base)?;
+ let igd = match opts.base.ipv6_only {
+ false => Some(RuntimeConfigIgd::new(&opts.base)?),
+ true => None,
+ };
+ let stun = RuntimeConfigStun::new(&opts.base)?;
+
+ Ok(Self {
+ consul,
+ firewall,
+ igd,
+ stun,
+ })
}
-
- let email = opts.email.expect(
- "'DIPLONAT_ACME_EMAIL' environment variable is required if 'DIPLONAT_ACME_ENABLE' == 'true'",
- );
-
- Ok(Some(Self { email }))
- }
}
impl RuntimeConfigConsul {
- pub(super) fn new(opts: ConfigOptsConsul) -> Result<Self> {
- let node_name = opts
- .node_name
- .expect("'DIPLONAT_CONSUL_NODE_NAME' environment variable is required");
- let url = opts.url.unwrap_or(super::CONSUL_URL.to_string());
-
- let tls = match (&opts.client_cert, &opts.client_key) {
- (Some(client_cert), Some(client_key)) => {
- let cert = match &opts.ca_cert {
- Some(ca_cert) => {
- let mut ca_cert_buf = vec![];
- File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
- Some(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
- }
- None => None,
+ pub(super) fn new(opts: ConfigOptsConsul) -> Result<Self> {
+ let node_name = opts
+ .node_name
+ .expect("'DIPLONAT_CONSUL_NODE_NAME' environment variable is required");
+ let url = opts.url.unwrap_or(super::CONSUL_URL.to_string());
+
+ let tls = match (&opts.client_cert, &opts.client_key) {
+ (Some(client_cert), Some(client_key)) => {
+ let cert = match &opts.ca_cert {
+ Some(ca_cert) => {
+ let mut ca_cert_buf = vec![];
+ File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
+ Some(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
+ }
+ None => None,
+ };
+
+ let mut client_cert_buf = vec![];
+ File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
+
+ let mut client_key_buf = vec![];
+ File::open(client_key)?.read_to_end(&mut client_key_buf)?;
+
+ let ident = reqwest::Identity::from_pem(
+ &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
+ )?;
+
+ Some((cert, opts.tls_skip_verify, ident))
+ }
+ (None, None) => None,
+ _ => bail!("Incomplete TLS configuration parameters"),
};
- let mut client_cert_buf = vec![];
- File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
-
- let mut client_key_buf = vec![];
- File::open(client_key)?.read_to_end(&mut client_key_buf)?;
-
- let ident =
- reqwest::Identity::from_pem(&[&client_cert_buf[..], &client_key_buf[..]].concat()[..])?;
-
- Some((cert, opts.tls_skip_verify, ident))
- }
- (None, None) => None,
- _ => bail!("Incomplete TLS configuration parameters"),
- };
-
- Ok(Self {
- node_name,
- url,
- tls,
- })
- }
+ Ok(Self {
+ node_name,
+ url,
+ tls,
+ })
+ }
}
impl RuntimeConfigFirewall {
- pub(super) fn new(opts: ConfigOptsBase) -> Result<Self> {
- let refresh_time = Duration::from_secs(opts.refresh_time.unwrap_or(super::REFRESH_TIME).into());
-
- Ok(Self { refresh_time })
- }
+ pub(super) fn new(opts: &ConfigOptsBase) -> Result<Self> {
+ let refresh_time =
+ Duration::from_secs(opts.refresh_time.unwrap_or(super::REFRESH_TIME).into());
+
+ Ok(Self {
+ refresh_time,
+ ipv6_only: opts.ipv6_only,
+ })
+ }
}
impl RuntimeConfigIgd {
- pub(super) fn new(opts: ConfigOptsBase) -> Result<Self> {
- let private_ip = opts.private_ip;
- let expiration_time = Duration::from_secs(
- opts
- .expiration_time
- .unwrap_or(super::EXPIRATION_TIME)
- .into(),
- );
- let refresh_time = Duration::from_secs(opts.refresh_time.unwrap_or(super::REFRESH_TIME).into());
-
- if refresh_time.as_secs() * 2 > expiration_time.as_secs() {
- return Err(anyhow!(
+ pub(super) fn new(opts: &ConfigOptsBase) -> Result<Self> {
+ let private_ip = opts
+ .private_ip
+ .as_ref()
+ .map(|x| x.parse())
+ .transpose()
+ .context("parse private_ip")?;
+ let expiration_time = Duration::from_secs(
+ opts.expiration_time
+ .unwrap_or(super::EXPIRATION_TIME)
+ .into(),
+ );
+ let refresh_time =
+ Duration::from_secs(opts.refresh_time.unwrap_or(super::REFRESH_TIME).into());
+
+ if refresh_time.as_secs() * 2 > expiration_time.as_secs() {
+ return Err(anyhow!(
"IGD expiration time (currently: {}s) must be at least twice bigger than refresh time \
(currently: {}s)",
expiration_time.as_secs(),
refresh_time.as_secs()
));
+ }
+
+ Ok(Self {
+ private_ip,
+ expiration_time,
+ refresh_time,
+ })
}
+}
- Ok(Self {
- private_ip,
- expiration_time,
- refresh_time,
- })
- }
+impl RuntimeConfigStun {
+ pub(super) fn new(opts: &ConfigOptsBase) -> Result<Self> {
+ let mut stun_server_v4 = None;
+ let mut stun_server_v6 = None;
+ for addr in opts
+ .stun_server
+ .as_deref()
+ .unwrap_or(super::STUN_SERVER)
+ .to_socket_addrs()?
+ {
+ if addr.is_ipv4() {
+ stun_server_v4 = Some(addr);
+ }
+ if addr.is_ipv6() {
+ stun_server_v6 = Some(addr);
+ }
+ }
+
+ let refresh_time =
+ Duration::from_secs(opts.refresh_time.unwrap_or(super::REFRESH_TIME).into());
+
+ let stun_server_v4 = match opts.ipv6_only {
+ false => Some(
+ stun_server_v4.ok_or(anyhow!("Unable to resolve STUN server's IPv4 address"))?,
+ ),
+ true => None,
+ };
+
+ Ok(Self {
+ stun_server_v4,
+ stun_server_v6: stun_server_v6
+ .ok_or(anyhow!("Unable to resolve STUN server's IPv6 address"))?,
+ refresh_time,
+ })
+ }
}
diff --git a/src/consul.rs b/src/consul.rs
index c7ac2b6..7f695b2 100644
--- a/src/consul.rs
+++ b/src/consul.rs
@@ -7,71 +7,80 @@ use crate::config::RuntimeConfigConsul;
#[derive(Serialize, Deserialize, Debug)]
pub struct ServiceEntry {
- pub Tags: Vec<String>,
+ #[serde(rename = "Tags")]
+ pub tags: Vec<String>,
}
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Default)]
pub struct CatalogNode {
- pub Services: HashMap<String, ServiceEntry>,
+ #[serde(rename = "Services")]
+ pub services: HashMap<String, ServiceEntry>,
}
pub struct Consul {
- client: reqwest::Client,
- url: String,
- idx: Option<u64>,
+ client: reqwest::Client,
+ url: String,
+ idx: Option<u64>,
}
impl Consul {
- pub fn new(config: &RuntimeConfigConsul) -> Self {
- let client = if let Some((ca, skip_verify, ident)) = config.tls.clone() {
- if skip_verify {
- reqwest::Client::builder()
- .use_rustls_tls()
- .danger_accept_invalid_certs(true)
- .identity(ident)
- .build()
- .expect("Unable to build reqwest client")
- } else if let Some(ca) = ca {
- reqwest::Client::builder()
- .use_rustls_tls()
- .add_root_certificate(ca)
- .identity(ident)
- .build()
- .expect("Unable to build reqwest client")
- } else {
- reqwest::Client::builder()
- .use_rustls_tls()
- .identity(ident)
- .build()
- .expect("Unable to build reqwest client")
- }
- } else {
- reqwest::Client::new()
- };
- return Self {
- client,
- url: config.url.clone(),
- idx: None,
- };
- }
+ pub fn new(config: &RuntimeConfigConsul) -> Self {
+ let client = if let Some((ca, skip_verify, ident)) = config.tls.clone() {
+ if skip_verify {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .danger_accept_invalid_certs(true)
+ .identity(ident)
+ .build()
+ .expect("Unable to build reqwest client")
+ } else if let Some(ca) = ca {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .add_root_certificate(ca)
+ .identity(ident)
+ .build()
+ .expect("Unable to build reqwest client")
+ } else {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .identity(ident)
+ .build()
+ .expect("Unable to build reqwest client")
+ }
+ } else {
+ reqwest::Client::new()
+ };
+ return Self {
+ client,
+ url: config.url.clone(),
+ idx: None,
+ };
+ }
+
+ pub fn watch_node_reset(&mut self) -> () {
+ self.idx = None;
+ }
- pub fn watch_node_reset(&mut self) -> () {
- self.idx = None;
- }
+ pub async fn watch_node(&mut self, host: &str) -> Result<CatalogNode> {
+ let url = match self.idx {
+ Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i),
+ None => format!("{}/v1/catalog/node/{}", self.url, host),
+ };
- pub async fn watch_node(&mut self, host: &str) -> Result<CatalogNode> {
- let url = match self.idx {
- Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i),
- None => format!("{}/v1/catalog/node/{}", self.url, host),
- };
+ let http = self.client.get(&url).send().await?;
+ self.idx = match http.headers().get("X-Consul-Index") {
+ Some(v) => Some(v.to_str()?.parse::<u64>()?),
+ None => return Err(anyhow!("X-Consul-Index header not found")),
+ };
- let http = self.client.get(&url).send().await?;
- self.idx = match http.headers().get("X-Consul-Index") {
- Some(v) => Some(v.to_str()?.parse::<u64>()?),
- None => return Err(anyhow!("X-Consul-Index header not found")),
- };
+ let resp: Option<CatalogNode> = http.json().await?;
+ return Ok(resp.unwrap_or_default());
+ }
- let resp: CatalogNode = http.json().await?;
- return Ok(resp);
- }
+ pub async fn kv_put(&self, key: &str, bytes: Vec<u8>) -> Result<()> {
+ let url = format!("{}/v1/kv/{}", self.url, key);
+ let http = self.client.put(&url).body(bytes).send().await?;
+ http.error_for_status()?;
+ Ok(())
+ }
}
diff --git a/src/consul_actor.rs b/src/consul_actor.rs
index 8b722b6..c099215 100644
--- a/src/consul_actor.rs
+++ b/src/consul_actor.rs
@@ -11,107 +11,110 @@ use crate::{consul, messages};
#[derive(Serialize, Deserialize, Debug)]
pub enum DiplonatParameter {
- tcp_port(HashSet<u16>),
- udp_port(HashSet<u16>),
+ #[serde(rename = "tcp_port")]
+ TcpPort(HashSet<u16>),
+ #[serde(rename = "udp_port")]
+ UdpPort(HashSet<u16>),
}
#[derive(Serialize, Deserialize, Debug)]
pub enum DiplonatConsul {
- diplonat(Vec<DiplonatParameter>),
+ #[serde(rename = "diplonat")]
+ Diplonat(Vec<DiplonatParameter>),
}
pub struct ConsulActor {
- pub rx_open_ports: watch::Receiver<messages::PublicExposedPorts>,
+ pub rx_open_ports: watch::Receiver<messages::PublicExposedPorts>,
- consul: consul::Consul,
- node: String,
- retries: u32,
- tx_open_ports: watch::Sender<messages::PublicExposedPorts>,
+ consul: consul::Consul,
+ node: String,
+ retries: u32,
+ tx_open_ports: watch::Sender<messages::PublicExposedPorts>,
}
fn retry_to_time(retries: u32, max_time: Duration) -> Duration {
- // 1.2^x seems to be a good value to exponentially increase time at a good pace
- // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5
- // minutes
- return Duration::from_secs(cmp::min(
- max_time.as_secs(),
- 1.2f64.powf(retries as f64) as u64,
- ));
+ // 1.2^x seems to be a good value to exponentially increase time at a good pace
+ // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5
+ // minutes
+ return Duration::from_secs(cmp::min(
+ max_time.as_secs(),
+ 1.2f64.powf(retries as f64) as u64,
+ ));
}
fn to_parameters(catalog: &consul::CatalogNode) -> Vec<DiplonatConsul> {
- let mut r = Vec::new();
-
- for (_, service_info) in &catalog.Services {
- for tag in &service_info.Tags {
- let diplo_conf: error::Result<DiplonatConsul> = from_str(tag);
- match diplo_conf {
- Ok(conf) => r.push(conf),
- Err(e) => debug!("Failed to parse entry {}. {}", tag, e),
- };
+ let mut r = Vec::new();
+
+ for (_, service_info) in &catalog.services {
+ for tag in &service_info.tags {
+ let diplo_conf: error::Result<DiplonatConsul> = from_str(tag);
+ match diplo_conf {
+ Ok(conf) => r.push(conf),
+ Err(e) => debug!("Failed to parse entry {}. {}", tag, e),
+ };
+ }
}
- }
- return r;
+ return r;
}
fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts {
- let mut op = messages::PublicExposedPorts {
- tcp_ports: HashSet::new(),
- udp_ports: HashSet::new(),
- };
-
- for conf in params {
- let DiplonatConsul::diplonat(c) = conf;
- for parameter in c {
- match parameter {
- DiplonatParameter::tcp_port(p) => op.tcp_ports.extend(p),
- DiplonatParameter::udp_port(p) => op.udp_ports.extend(p),
- };
+ let mut op = messages::PublicExposedPorts {
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new(),
+ };
+
+ for conf in params {
+ let DiplonatConsul::Diplonat(c) = conf;
+ for parameter in c {
+ match parameter {
+ DiplonatParameter::TcpPort(p) => op.tcp_ports.extend(p),
+ DiplonatParameter::UdpPort(p) => op.udp_ports.extend(p),
+ };
+ }
}
- }
- return op;
+ return op;
}
impl ConsulActor {
- pub fn new(config: &RuntimeConfigConsul, node: &str) -> Self {
- let (tx, rx) = watch::channel(messages::PublicExposedPorts {
- tcp_ports: HashSet::new(),
- udp_ports: HashSet::new(),
- });
-
- return Self {
- consul: consul::Consul::new(config),
- rx_open_ports: rx,
- tx_open_ports: tx,
- node: node.to_string(),
- retries: 0,
- };
- }
-
- pub async fn listen(&mut self) -> Result<()> {
- loop {
- let catalog = match self.consul.watch_node(&self.node).await {
- Ok(c) => c,
- Err(e) => {
- self.consul.watch_node_reset();
- self.retries = cmp::min(std::u32::MAX - 1, self.retries) + 1;
- let will_retry_in = retry_to_time(self.retries, Duration::from_secs(600));
- error!(
- "Failed to query consul. Will retry in {}s. {}",
- will_retry_in.as_secs(),
- e
- );
- sleep(will_retry_in).await;
- continue;
- }
- };
- self.retries = 0;
- let msg = to_open_ports(&to_parameters(&catalog));
- debug!("Extracted configuration: {:#?}", msg);
+ pub fn new(config: &RuntimeConfigConsul, node: &str) -> Self {
+ let (tx, rx) = watch::channel(messages::PublicExposedPorts {
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new(),
+ });
+
+ return Self {
+ consul: consul::Consul::new(config),
+ rx_open_ports: rx,
+ tx_open_ports: tx,
+ node: node.to_string(),
+ retries: 0,
+ };
+ }
- self.tx_open_ports.send(msg)?;
+ pub async fn listen(&mut self) -> Result<()> {
+ loop {
+ let catalog = match self.consul.watch_node(&self.node).await {
+ Ok(c) => c,
+ Err(e) => {
+ self.consul.watch_node_reset();
+ self.retries = cmp::min(std::u32::MAX - 1, self.retries) + 1;
+ let will_retry_in = retry_to_time(self.retries, Duration::from_secs(600));
+ error!(
+ "Failed to query consul. Will retry in {}s. {}",
+ will_retry_in.as_secs(),
+ e
+ );
+ sleep(will_retry_in).await;
+ continue;
+ }
+ };
+ self.retries = 0;
+ let msg = to_open_ports(&to_parameters(&catalog));
+ debug!("Extracted configuration: {:#?}", msg);
+
+ self.tx_open_ports.send(msg)?;
+ }
}
- }
}
diff --git a/src/diplonat.rs b/src/diplonat.rs
index 22ebd6e..96bac3b 100644
--- a/src/diplonat.rs
+++ b/src/diplonat.rs
@@ -1,49 +1,78 @@
-use anyhow::Result;
+use anyhow::{Context, Result};
+use futures::future::FutureExt;
use tokio::try_join;
use crate::{
- config::ConfigOpts, consul_actor::ConsulActor, fw_actor::FirewallActor, igd_actor::IgdActor,
+ config::ConfigOpts, consul_actor::ConsulActor, fw_actor::FirewallActor, igd_actor::IgdActor,
+ stun_actor::StunActor,
};
pub struct Diplonat {
- consul: ConsulActor,
- firewall: FirewallActor,
- igd: IgdActor,
+ consul: ConsulActor,
+ firewall: FirewallActor,
+ igd: Option<IgdActor>,
+ stun: StunActor,
}
impl Diplonat {
- pub async fn new() -> Result<Self> {
- let rt_cfg = ConfigOpts::from_env()?;
- println!("{:#?}", rt_cfg);
-
- let ca = ConsulActor::new(&rt_cfg.consul, &rt_cfg.consul.node_name);
-
- let fw = FirewallActor::new(rt_cfg.firewall.refresh_time, &ca.rx_open_ports).await?;
-
- let ia = IgdActor::new(
- rt_cfg.igd.private_ip.as_ref().map(String::as_str),
- rt_cfg.igd.refresh_time,
- rt_cfg.igd.expiration_time,
- &ca.rx_open_ports,
- )
- .await?;
-
- let ctx = Self {
- consul: ca,
- igd: ia,
- firewall: fw,
- };
-
- return Ok(ctx);
- }
-
- pub async fn listen(&mut self) -> Result<()> {
- try_join!(
- self.consul.listen(),
- self.igd.listen(),
- self.firewall.listen()
- )?;
-
- return Ok(());
- }
+ pub async fn new() -> Result<Self> {
+ let rt_cfg = ConfigOpts::from_env().context("Parse configuration")?;
+ println!("{:#?}", rt_cfg);
+
+ let ca = ConsulActor::new(&rt_cfg.consul, &rt_cfg.consul.node_name);
+
+ let fw = FirewallActor::new(
+ rt_cfg.firewall.ipv6_only,
+ rt_cfg.firewall.refresh_time,
+ &ca.rx_open_ports,
+ )
+ .await
+ .context("Setup fireall actor")?;
+
+ let ia = match rt_cfg.igd {
+ Some(igdc) => Some(
+ IgdActor::new(
+ igdc.private_ip,
+ igdc.refresh_time,
+ igdc.expiration_time,
+ &ca.rx_open_ports,
+ )
+ .await
+ .context("Setup IGD actor")?,
+ ),
+ None => None,
+ };
+
+ let sa = StunActor::new(&rt_cfg.consul, &rt_cfg.stun, &rt_cfg.consul.node_name);
+
+ let ctx = Self {
+ consul: ca,
+ igd: ia,
+ firewall: fw,
+ stun: sa,
+ };
+
+ Ok(ctx)
+ }
+
+ pub async fn listen(&mut self) -> Result<()> {
+ let igd_opt = &mut self.igd;
+
+ try_join!(
+ self.consul.listen().map(|x| x.context("Run consul actor")),
+ async {
+ if let Some(igd) = igd_opt {
+ igd.listen().await.context("Run IGD actor")
+ } else {
+ Ok(())
+ }
+ },
+ self.firewall
+ .listen()
+ .map(|x| x.context("Run firewall actor")),
+ self.stun.listen().map(|x| x.context("Run STUN actor")),
+ )?;
+
+ Ok(())
+ }
}
diff --git a/src/fw.rs b/src/fw.rs
index e18a301..a3e6dec 100644
--- a/src/fw.rs
+++ b/src/fw.rs
@@ -8,92 +8,96 @@ use regex::Regex;
use crate::messages;
pub fn setup(ipt: &iptables::IPTables) -> Result<()> {
- // ensure we start from a clean state without any rule already set
- cleanup(ipt)?;
+ // ensure we start from a clean state without any rule already set
+ cleanup(ipt)?;
- ipt
- .new_chain("filter", "DIPLONAT")
- .context("Failed to create new chain")?;
- ipt
- .insert_unique("filter", "INPUT", "-j DIPLONAT", 1)
- .context("Failed to insert jump rule")?;
+ info!("{}: creating DIPLONAT chain", ipt.cmd);
+ ipt.new_chain("filter", "DIPLONAT")
+ .context("Failed to create new chain")?;
+ ipt.insert_unique("filter", "INPUT", "-j DIPLONAT", 1)
+ .context("Failed to insert jump rule")?;
- Ok(())
+ Ok(())
}
pub fn open_ports(ipt: &iptables::IPTables, ports: messages::PublicExposedPorts) -> Result<()> {
- for p in ports.tcp_ports {
- ipt
- .append(
- "filter",
- "DIPLONAT",
- &format!("-p tcp --dport {} -j ACCEPT", p),
- )
- .context("Failed to insert port rule")?;
- }
-
- for p in ports.udp_ports {
- ipt
- .append(
- "filter",
- "DIPLONAT",
- &format!("-p udp --dport {} -j ACCEPT", p),
- )
- .context("Failed to insert port rule")?;
- }
-
- Ok(())
+ for p in ports.tcp_ports {
+ info!("{}: opening TCP port {}", ipt.cmd, p);
+ ipt.append(
+ "filter",
+ "DIPLONAT",
+ &format!("-p tcp --dport {} -j ACCEPT", p),
+ )
+ .context("Failed to insert port rule")?;
+ }
+
+ for p in ports.udp_ports {
+ info!("{}: opening UDP port {}", ipt.cmd, p);
+ ipt.append(
+ "filter",
+ "DIPLONAT",
+ &format!("-p udp --dport {} -j ACCEPT", p),
+ )
+ .context("Failed to insert port rule")?;
+ }
+
+ Ok(())
}
pub fn get_opened_ports(ipt: &iptables::IPTables) -> Result<messages::PublicExposedPorts> {
- let mut ports = messages::PublicExposedPorts {
- tcp_ports: HashSet::new(),
- udp_ports: HashSet::new(),
- };
-
- let list = ipt.list("filter", "DIPLONAT")?;
- let re = Regex::new(r"\-A.*? \-p (\w+).*\-\-dport (\d+).*?\-j ACCEPT")
- .context("Regex matching open ports encountered an unexpected rule")?;
- for i in list {
- let caps = re.captures(&i);
- match caps {
- Some(c) => {
- if let (Some(raw_proto), Some(raw_port)) = (c.get(1), c.get(2)) {
- let proto = String::from(raw_proto.as_str());
- let number = String::from(raw_port.as_str()).parse::<u16>()?;
-
- if proto == "tcp" {
- ports.tcp_ports.insert(number);
- } else {
- ports.udp_ports.insert(number);
- }
- } else {
- error!("Unexpected rule found in DIPLONAT chain")
+ let mut ports = messages::PublicExposedPorts {
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new(),
+ };
+
+ let list = ipt.list("filter", "DIPLONAT")?;
+ let re = Regex::new(r"\-A.*? \-p (\w+).*\-\-dport (\d+).*?\-j ACCEPT")
+ .context("Regex matching open ports encountered an unexpected rule")?;
+ for i in list {
+ debug!("{} list DIPLONAT: got {}", ipt.cmd, i);
+ let caps = re.captures(&i);
+ match caps {
+ Some(c) => {
+ if let (Some(raw_proto), Some(raw_port)) = (c.get(1), c.get(2)) {
+ let proto = String::from(raw_proto.as_str());
+ let number = String::from(raw_port.as_str()).parse::<u16>()?;
+
+ if proto == "tcp" || proto == "6" {
+ ports.tcp_ports.insert(number);
+ } else if proto == "udp" || proto == "17" {
+ ports.udp_ports.insert(number);
+ } else {
+ error!("Unexpected protocol in iptables rule: {}", proto);
+ }
+ } else {
+ error!("Unexpected rule found in DIPLONAT chain")
+ }
+ }
+ _ => {
+ debug!("{} rule not parsed: {}", ipt.cmd, i);
+ }
}
- }
- _ => {}
}
- }
- Ok(ports)
+ debug!("{} ports already openned: {:?}", ipt.cmd, ports);
+
+ Ok(ports)
}
pub fn cleanup(ipt: &iptables::IPTables) -> Result<()> {
- if ipt.chain_exists("filter", "DIPLONAT")? {
- ipt
- .flush_chain("filter", "DIPLONAT")
- .context("Failed to flush the DIPLONAT chain")?;
-
- if ipt.exists("filter", "INPUT", "-j DIPLONAT")? {
- ipt
- .delete("filter", "INPUT", "-j DIPLONAT")
- .context("Failed to delete jump rule")?;
- }
+ if ipt.chain_exists("filter", "DIPLONAT")? {
+ info!("{}: removing old DIPLONAT chain", ipt.cmd);
+ ipt.flush_chain("filter", "DIPLONAT")
+ .context("Failed to flush the DIPLONAT chain")?;
+
+ if ipt.exists("filter", "INPUT", "-j DIPLONAT")? {
+ ipt.delete("filter", "INPUT", "-j DIPLONAT")
+ .context("Failed to delete jump rule")?;
+ }
- ipt
- .delete_chain("filter", "DIPLONAT")
- .context("Failed to delete chain")?;
- }
+ ipt.delete_chain("filter", "DIPLONAT")
+ .context("Failed to delete chain")?;
+ }
- Ok(())
+ Ok(())
}
diff --git a/src/fw_actor.rs b/src/fw_actor.rs
index 518c1b8..02d8bcb 100644
--- a/src/fw_actor.rs
+++ b/src/fw_actor.rs
@@ -4,88 +4,100 @@ use anyhow::Result;
use iptables;
use log::*;
use tokio::{
- select,
- sync::watch,
- time::{self, Duration},
+ select,
+ sync::watch,
+ time::{self, Duration},
};
use crate::{fw, messages};
pub struct FirewallActor {
- pub ipt_v4: iptables::IPTables,
- pub ipt_v6: iptables::IPTables,
- rx_ports: watch::Receiver<messages::PublicExposedPorts>,
- last_ports: messages::PublicExposedPorts,
- refresh: Duration,
+ pub ipt_v4: Option<iptables::IPTables>,
+ pub ipt_v6: iptables::IPTables,
+ rx_ports: watch::Receiver<messages::PublicExposedPorts>,
+ last_ports: messages::PublicExposedPorts,
+ refresh: Duration,
}
impl FirewallActor {
- pub async fn new(
- refresh: Duration,
- rxp: &watch::Receiver<messages::PublicExposedPorts>,
- ) -> Result<Self> {
- let ctx = Self {
- ipt_v4: iptables::new(false)?,
- ipt_v6: iptables::new(true)?,
- rx_ports: rxp.clone(),
- last_ports: messages::PublicExposedPorts::new(),
- refresh,
- };
+ pub async fn new(
+ ipv6_only: bool,
+ refresh: Duration,
+ rxp: &watch::Receiver<messages::PublicExposedPorts>,
+ ) -> Result<Self> {
+ let ctx = Self {
+ ipt_v4: match ipv6_only {
+ false => Some(iptables::new(false)?),
+ true => None,
+ },
+ ipt_v6: iptables::new(true)?,
+ rx_ports: rxp.clone(),
+ last_ports: messages::PublicExposedPorts::new(),
+ refresh,
+ };
- fw::setup(&ctx.ipt_v4)?;
- fw::setup(&ctx.ipt_v6)?;
+ if let Some(ipt_v4) = &ctx.ipt_v4 {
+ fw::setup(ipt_v4)?;
+ }
+ fw::setup(&ctx.ipt_v6)?;
- return Ok(ctx);
- }
+ return Ok(ctx);
+ }
- pub async fn listen(&mut self) -> Result<()> {
- let mut interval = time::interval(self.refresh);
- loop {
- // 1. Wait for an event
- let new_ports = select! {
- _ = self.rx_ports.changed() => Some(self.rx_ports.borrow().clone()),
- _ = interval.tick() => None,
- else => return Ok(()) // Sender dropped, terminate loop.
- };
+ pub async fn listen(&mut self) -> Result<()> {
+ let mut interval = time::interval(self.refresh);
+ loop {
+ // 1. Wait for an event
+ let new_ports = select! {
+ _ = self.rx_ports.changed() => Some(self.rx_ports.borrow().clone()),
+ _ = interval.tick() => None,
+ else => return Ok(()) // Sender dropped, terminate loop.
+ };
- // 2. Update last ports if needed
- if let Some(p) = new_ports {
- self.last_ports = p;
- }
+ // 2. Update last ports if needed
+ if let Some(p) = new_ports {
+ self.last_ports = p;
+ }
- // 3. Update firewall rules
- match self.do_fw_update().await {
- Ok(()) => debug!("Successfully updated firewall rules"),
- Err(e) => error!("An error occured while updating firewall rules. {}", e),
- }
+ // 3. Update firewall rules
+ match self.do_fw_update().await {
+ Ok(()) => debug!("Successfully updated firewall rules"),
+ Err(e) => error!("An error occured while updating firewall rules. {}", e),
+ }
+ }
}
- }
- pub async fn do_fw_update(&self) -> Result<()> {
- for ipt in [&self.ipt_v4, &self.ipt_v6] {
- let curr_opened_ports = fw::get_opened_ports(ipt)?;
+ pub async fn do_fw_update(&self) -> Result<()> {
+ if let Some(ipt_v4) = &self.ipt_v4 {
+ self.do_fw_update_on(ipt_v4).await?;
+ }
+ self.do_fw_update_on(&self.ipt_v6).await?;
+ Ok(())
+ }
- let diff_tcp = self
- .last_ports
- .tcp_ports
- .difference(&curr_opened_ports.tcp_ports)
- .copied()
- .collect::<HashSet<u16>>();
- let diff_udp = self
- .last_ports
- .udp_ports
- .difference(&curr_opened_ports.udp_ports)
- .copied()
- .collect::<HashSet<u16>>();
+ pub async fn do_fw_update_on(&self, ipt: &iptables::IPTables) -> Result<()> {
+ let curr_opened_ports = fw::get_opened_ports(ipt)?;
- let ports_to_open = messages::PublicExposedPorts {
- tcp_ports: diff_tcp,
- udp_ports: diff_udp,
- };
+ let diff_tcp = self
+ .last_ports
+ .tcp_ports
+ .difference(&curr_opened_ports.tcp_ports)
+ .copied()
+ .collect::<HashSet<u16>>();
+ let diff_udp = self
+ .last_ports
+ .udp_ports
+ .difference(&curr_opened_ports.udp_ports)
+ .copied()
+ .collect::<HashSet<u16>>();
- fw::open_ports(ipt, ports_to_open)?;
- }
+ let ports_to_open = messages::PublicExposedPorts {
+ tcp_ports: diff_tcp,
+ udp_ports: diff_udp,
+ };
+
+ fw::open_ports(ipt, ports_to_open)?;
- return Ok(());
- }
+ return Ok(());
+ }
}
diff --git a/src/igd_actor.rs b/src/igd_actor.rs
index d32f5dc..f0dd391 100644
--- a/src/igd_actor.rs
+++ b/src/igd_actor.rs
@@ -1,126 +1,122 @@
-use std::net::SocketAddrV4;
+use std::net::{Ipv4Addr, SocketAddrV4};
use anyhow::{Context, Result};
use igd::{aio::*, PortMappingProtocol};
use log::*;
use tokio::{
- select,
- sync::watch,
- time::{self, Duration},
+ select,
+ sync::watch,
+ time::{self, Duration},
};
use crate::messages;
pub struct IgdActor {
- last_ports: messages::PublicExposedPorts,
- rx_ports: watch::Receiver<messages::PublicExposedPorts>,
- gateway: Gateway,
- refresh: Duration,
- expire: Duration,
- private_ip: String,
+ last_ports: messages::PublicExposedPorts,
+ rx_ports: watch::Receiver<messages::PublicExposedPorts>,
+ gateway: Gateway,
+ refresh: Duration,
+ expire: Duration,
+ private_ip: Ipv4Addr,
}
impl IgdActor {
- pub async fn new(
- priv_ip: Option<&str>,
- refresh: Duration,
- expire: Duration,
- rxp: &watch::Receiver<messages::PublicExposedPorts>,
- ) -> Result<Self> {
- let gw = search_gateway(Default::default())
- .await
- .context("Failed to find IGD gateway")?;
- info!("IGD gateway: {}", gw);
+ pub async fn new(
+ priv_ip: Option<Ipv4Addr>,
+ refresh: Duration,
+ expire: Duration,
+ rxp: &watch::Receiver<messages::PublicExposedPorts>,
+ ) -> Result<Self> {
+ let gw = search_gateway(Default::default())
+ .await
+ .context("Failed to find IGD gateway")?;
+ info!("IGD gateway: {}", gw);
- let private_ip = if let Some(ip) = priv_ip {
- info!("Using private IP from config: {}", ip);
- ip.to_string()
- } else {
- info!("Trying to automatically detect private IP");
- let gwa = gw.addr.ip().octets();
- let cmplen = match gwa {
- [192, 168, _, _] => 3,
- [10, _, _, _] => 2,
- _ => panic!(
- "Gateway IP does not appear to be in a local network ({})",
- gw.addr.ip()
- ),
- };
- let public_ip = get_if_addrs::get_if_addrs()?
- .into_iter()
- .map(|i| i.addr.ip())
- .filter(|a| match a {
- std::net::IpAddr::V4(a4) => (a4.octets()[..cmplen] == gwa[..cmplen]),
- _ => false,
- })
- .next()
- .expect("No interface has an IP on same subnet as gateway")
- .to_string();
- info!("Found private IP: {}", public_ip);
- public_ip
- };
+ let private_ip = if let Some(ip) = priv_ip {
+ info!("Using private IP from config: {}", ip);
+ ip
+ } else {
+ info!("Trying to automatically detect private IP");
+ let gwa = gw.addr.ip().octets();
+ let cmplen = match gwa {
+ [192, 168, _, _] => 3,
+ [10, _, _, _] => 2,
+ _ => panic!(
+ "Gateway IP does not appear to be in a local network ({})",
+ gw.addr.ip()
+ ),
+ };
+ #[allow(unused_parens)]
+ let private_ip = get_if_addrs::get_if_addrs()?
+ .into_iter()
+ .map(|i| i.addr.ip())
+ .filter_map(|a| match a {
+ std::net::IpAddr::V4(a4) if a4.octets()[..cmplen] == gwa[..cmplen] => Some(a4),
+ _ => None,
+ })
+ .next()
+ .expect("No interface has an IP on same subnet as gateway");
+ info!("Autodetected private IP: {}", private_ip);
+ private_ip
+ };
- let ctx = Self {
- gateway: gw,
- rx_ports: rxp.clone(),
- private_ip,
- refresh: refresh,
- expire: expire,
- last_ports: messages::PublicExposedPorts::new(),
- };
+ let ctx = Self {
+ gateway: gw,
+ rx_ports: rxp.clone(),
+ private_ip,
+ refresh: refresh,
+ expire: expire,
+ last_ports: messages::PublicExposedPorts::new(),
+ };
- return Ok(ctx);
- }
+ return Ok(ctx);
+ }
- pub async fn listen(&mut self) -> Result<()> {
- let mut interval = time::interval(self.refresh);
- loop {
- // 1. Wait for an event
- let new_ports = select! {
- _ = self.rx_ports.changed() => Some(self.rx_ports.borrow().clone()),
- _ = interval.tick() => None,
- else => return Ok(()) // Sender dropped, terminate loop.
- };
+ pub async fn listen(&mut self) -> Result<()> {
+ let mut interval = time::interval(self.refresh);
+ loop {
+ // 1. Wait for an event
+ let new_ports = select! {
+ _ = self.rx_ports.changed() => Some(self.rx_ports.borrow().clone()),
+ _ = interval.tick() => None,
+ else => return Ok(()) // Sender dropped, terminate loop.
+ };
- // 2. Update last ports if needed
- if let Some(p) = new_ports {
- self.last_ports = p;
- }
+ // 2. Update last ports if needed
+ if let Some(p) = new_ports {
+ self.last_ports = p;
+ }
- // 3. Flush IGD requests
- match self.do_igd().await {
- Ok(()) => debug!("Successfully updated IGD"),
- Err(e) => error!("An error occured while updating IGD. {}", e),
- }
+ // 3. Flush IGD requests
+ match self.do_igd().await {
+ Ok(()) => debug!("Successfully updated IGD"),
+ Err(e) => error!("An error occured while updating IGD. {}", e),
+ }
+ }
}
- }
- pub async fn do_igd(&self) -> Result<()> {
- let actions = [
- (PortMappingProtocol::TCP, &self.last_ports.tcp_ports),
- (PortMappingProtocol::UDP, &self.last_ports.udp_ports),
- ];
+ pub async fn do_igd(&self) -> Result<()> {
+ let actions = [
+ (PortMappingProtocol::TCP, &self.last_ports.tcp_ports),
+ (PortMappingProtocol::UDP, &self.last_ports.udp_ports),
+ ];
- for (proto, list) in actions.iter() {
- for port in *list {
- let service_str = format!("{}:{}", self.private_ip, port);
- let service = service_str
- .parse::<SocketAddrV4>()
- .context("Invalid socket address")?;
- self
- .gateway
- .add_port(
- *proto,
- *port,
- service,
- self.expire.as_secs() as u32,
- "diplonat",
- )
- .await?;
- debug!("IGD request successful for {:#?} {}", proto, service);
- }
- }
+ for (proto, list) in actions.iter() {
+ for port in *list {
+ let service = SocketAddrV4::new(self.private_ip, *port);
+ self.gateway
+ .add_port(
+ *proto,
+ *port,
+ service,
+ self.expire.as_secs() as u32,
+ "diplonat",
+ )
+ .await?;
+ debug!("IGD request successful for {:#?} {}", proto, service);
+ }
+ }
- return Ok(());
- }
+ return Ok(());
+ }
}
diff --git a/src/main.rs b/src/main.rs
index 99d38f5..204520d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -6,15 +6,20 @@ mod fw;
mod fw_actor;
mod igd_actor;
mod messages;
+mod stun_actor;
use diplonat::Diplonat;
use log::*;
#[tokio::main]
async fn main() {
- pretty_env_logger::init();
- info!("Starting Diplonat");
+ pretty_env_logger::init();
+ info!("Starting Diplonat");
- let mut diplo = Diplonat::new().await.expect("Setup failed");
- diplo.listen().await.expect("A runtime error occured");
+ Diplonat::new()
+ .await
+ .expect("Setup failed")
+ .listen()
+ .await
+ .expect("A runtime error occured");
}
diff --git a/src/messages.rs b/src/messages.rs
index 63f16b0..e5322f1 100644
--- a/src/messages.rs
+++ b/src/messages.rs
@@ -2,15 +2,15 @@ use std::collections::HashSet;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublicExposedPorts {
- pub tcp_ports: HashSet<u16>,
- pub udp_ports: HashSet<u16>,
+ pub tcp_ports: HashSet<u16>,
+ pub udp_ports: HashSet<u16>,
}
impl PublicExposedPorts {
- pub fn new() -> Self {
- return Self {
- tcp_ports: HashSet::new(),
- udp_ports: HashSet::new(),
- };
- }
+ pub fn new() -> Self {
+ return Self {
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new(),
+ };
+ }
}
diff --git a/src/stun_actor.rs b/src/stun_actor.rs
new file mode 100644
index 0000000..6740c83
--- /dev/null
+++ b/src/stun_actor.rs
@@ -0,0 +1,147 @@
+use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
+use std::time::{Duration, SystemTime};
+
+use anyhow::{anyhow, bail, Result};
+use log::*;
+use serde::{Deserialize, Serialize};
+
+use crate::config::{RuntimeConfigConsul, RuntimeConfigStun};
+use crate::consul;
+
+pub struct StunActor {
+ node: String,
+ consul: consul::Consul,
+ stun_server_v4: Option<SocketAddr>,
+ stun_server_v6: SocketAddr,
+ refresh_time: Duration,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct AutodiscoverResult {
+ pub timestamp: u64,
+ pub address: Option<IpAddr>,
+}
+
+impl StunActor {
+ pub fn new(
+ consul_config: &RuntimeConfigConsul,
+ stun_config: &RuntimeConfigStun,
+ node: &str,
+ ) -> Self {
+ assert!(stun_config
+ .stun_server_v4
+ .map(|x| x.is_ipv4())
+ .unwrap_or(true));
+ assert!(stun_config.stun_server_v6.is_ipv6());
+
+ Self {
+ consul: consul::Consul::new(consul_config),
+ node: node.to_string(),
+ stun_server_v4: stun_config.stun_server_v4,
+ stun_server_v6: stun_config.stun_server_v6,
+ refresh_time: stun_config.refresh_time,
+ }
+ }
+
+ pub async fn listen(&mut self) -> Result<()> {
+ loop {
+ let ipv4_result = match self.stun_server_v4 {
+ Some(stun_server_v4) => self.autodiscover_ip(stun_server_v4).await,
+ None => self.autodiscover_none_ipv4().await,
+ };
+ if let Err(e) = ipv4_result {
+ error!("Unable to autodiscover IPv4 address: {}", e);
+ }
+ if let Err(e) = self.autodiscover_ip(self.stun_server_v6).await {
+ error!("Unable to autodiscover IPv6 address: {}", e);
+ }
+ tokio::time::sleep(self.refresh_time).await;
+ }
+ }
+
+ async fn autodiscover_ip(&self, stun_server: SocketAddr) -> Result<()> {
+ let binding_ip = match stun_server.is_ipv4() {
+ true => IpAddr::V4(Ipv4Addr::UNSPECIFIED), // 0.0.0.0
+ false => IpAddr::V6(Ipv6Addr::UNSPECIFIED), // [::]
+ };
+ let binding_addr = SocketAddr::new(binding_ip, 0);
+
+ let discovered_addr = get_mapped_addr(stun_server, binding_addr)
+ .await?
+ .map(|x| x.ip());
+
+ let consul_key = match stun_server.is_ipv4() {
+ true => {
+ debug!("Autodiscovered IPv4: {:?}", discovered_addr);
+ format!("diplonat/autodiscovery/ipv4/{}", self.node)
+ }
+ false => {
+ debug!("Autodiscovered IPv6: {:?}", discovered_addr);
+ format!("diplonat/autodiscovery/ipv6/{}", self.node)
+ }
+ };
+
+ self.consul
+ .kv_put(
+ &consul_key,
+ serde_json::to_vec(&AutodiscoverResult {
+ timestamp: timestamp(),
+ address: discovered_addr,
+ })?,
+ )
+ .await?;
+
+ Ok(())
+ }
+
+ async fn autodiscover_none_ipv4(&self) -> Result<()> {
+ let consul_key = format!("diplonat/autodiscovery/ipv4/{}", self.node);
+
+ self.consul
+ .kv_put(
+ &consul_key,
+ serde_json::to_vec(&AutodiscoverResult {
+ timestamp: timestamp(),
+ address: None,
+ })?,
+ )
+ .await?;
+
+ Ok(())
+ }
+}
+
+async fn get_mapped_addr(
+ stun_server: SocketAddr,
+ binding_addr: SocketAddr,
+) -> Result<Option<SocketAddr>> {
+ use stun_client::*;
+
+ let mut client = Client::new(binding_addr, None).await?;
+ let res = match client.binding_request(stun_server, None).await {
+ Err(e) => {
+ info!(
+ "STUN binding request to {} failed, assuming no address (error: {})",
+ binding_addr, e
+ );
+ return Ok(None);
+ }
+ Ok(r) => r,
+ };
+
+ if res.get_class() != Class::SuccessResponse {
+ bail!("STUN server did not responde with a success response");
+ }
+
+ let xor_mapped_addr = Attribute::get_xor_mapped_address(&res)
+ .ok_or(anyhow!("no XorMappedAddress found in STUN response"))?;
+
+ Ok(Some(xor_mapped_addr))
+}
+
+fn timestamp() -> u64 {
+ SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .expect("clock error")
+ .as_secs()
+}