aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-04-04 18:48:52 +0200
committerAlex Auvolat <alex@adnab.me>2023-04-04 18:48:52 +0200
commitb3f76f272abab8695b4406c6f0addcb58253c89d (patch)
treed7a2dd93548d7c35bab47865c2508b252683f9d3 /src
parent2d39adcabb898686cb63c10713e9d9b63efc5601 (diff)
downloaddiplonat-b3f76f272abab8695b4406c6f0addcb58253c89d.tar.gz
diplonat-b3f76f272abab8695b4406c6f0addcb58253c89d.zip
Remove .rustfmt.toml and move to standard rustfmt format (4 spaces)
Diffstat (limited to 'src')
-rw-r--r--src/config/mod.rs4
-rw-r--r--src/config/options.rs109
-rw-r--r--src/config/options_test.rs182
-rw-r--r--src/config/runtime.rs249
-rw-r--r--src/consul.rs122
-rw-r--r--src/consul_actor.rs160
-rw-r--r--src/diplonat.rs88
-rw-r--r--src/fw.rs149
-rw-r--r--src/fw_actor.rs130
-rw-r--r--src/igd_actor.rs203
-rw-r--r--src/main.rs16
-rw-r--r--src/messages.rs16
-rw-r--r--src/stun_actor.rs153
13 files changed, 787 insertions, 794 deletions
diff --git a/src/config/mod.rs b/src/config/mod.rs
index 45ea975..00577c6 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -5,8 +5,8 @@ mod runtime;
pub use options::{ConfigOpts, ConfigOptsAcme, ConfigOptsBase, ConfigOptsConsul};
pub use runtime::{
- RuntimeConfig, RuntimeConfigAcme, RuntimeConfigConsul, RuntimeConfigFirewall, RuntimeConfigIgd,
- RuntimeConfigStun,
+ RuntimeConfig, RuntimeConfigAcme, RuntimeConfigConsul, RuntimeConfigFirewall, RuntimeConfigIgd,
+ RuntimeConfigStun,
};
pub const EXPIRATION_TIME: u16 = 300;
diff --git a/src/config/options.rs b/src/config/options.rs
index 08cdd15..100c23c 100644
--- a/src/config/options.rs
+++ b/src/config/options.rs
@@ -11,79 +11,80 @@ use crate::config::RuntimeConfig;
/// Base configuration options
#[derive(Clone, Default, Deserialize)]
pub struct ConfigOptsBase {
- /// This node's private IP address [default: None]
- pub private_ip: Option<String>,
- /// Expiration time for IGD rules [default: 60]
- pub expiration_time: Option<u16>,
- /// Refresh time for IGD and Firewall rules [default: 300]
- pub refresh_time: Option<u16>,
- /// STUN server [default: stun.nextcloud.com:443]
- pub stun_server: Option<String>,
+ /// This node's private IP address [default: None]
+ pub private_ip: Option<String>,
+ /// Expiration time for IGD rules [default: 60]
+ pub expiration_time: Option<u16>,
+ /// Refresh time for IGD and Firewall rules [default: 300]
+ pub refresh_time: Option<u16>,
+ /// STUN server [default: stun.nextcloud.com:443]
+ pub stun_server: Option<String>,
}
/// ACME configuration options
#[derive(Clone, Default, Deserialize)]
pub struct ConfigOptsAcme {
- /// Whether ACME is enabled [default: false]
- #[serde(default)]
- pub enable: bool,
+ /// Whether ACME is enabled [default: false]
+ #[serde(default)]
+ pub enable: bool,
- /// The default domain holder's e-mail [default: None]
- pub email: Option<String>,
+ /// The default domain holder's e-mail [default: None]
+ pub email: Option<String>,
}
/// Consul configuration options
#[derive(Clone, Default, Deserialize)]
pub struct ConfigOptsConsul {
- /// Consul's node name [default: None]
- pub node_name: Option<String>,
- /// Consul's REST URL [default: "http://127.0.0.1:8500"]
- pub url: Option<String>,
- /// Consul's CA certificate [default: None]
- pub ca_cert: Option<String>,
- /// Skip TLS verification for Consul server [default: false]
- #[serde(default)]
- pub tls_skip_verify: bool,
- /// Consul's client certificate [default: None]
- pub client_cert: Option<String>,
- /// Consul's client key [default: None]
- pub client_key: Option<String>,
+ /// Consul's node name [default: None]
+ pub node_name: Option<String>,
+ /// Consul's REST URL [default: "http://127.0.0.1:8500"]
+ pub url: Option<String>,
+ /// Consul's CA certificate [default: None]
+ pub ca_cert: Option<String>,
+ /// Skip TLS verification for Consul server [default: false]
+ #[serde(default)]
+ pub tls_skip_verify: bool,
+ /// Consul's client certificate [default: None]
+ pub client_cert: Option<String>,
+ /// Consul's client key [default: None]
+ pub client_key: Option<String>,
}
/// Model of all potential configuration options
pub struct ConfigOpts {
- pub base: ConfigOptsBase,
- pub acme: ConfigOptsAcme,
- pub consul: ConfigOptsConsul,
+ pub base: ConfigOptsBase,
+ pub acme: ConfigOptsAcme,
+ pub consul: ConfigOptsConsul,
}
impl ConfigOpts {
- pub fn from_env() -> Result<RuntimeConfig> {
- let base: ConfigOptsBase = envy::prefixed("DIPLONAT_").from_env()?;
- let consul: ConfigOptsConsul = envy::prefixed("DIPLONAT_CONSUL_").from_env()?;
- let acme: ConfigOptsAcme = envy::prefixed("DIPLONAT_ACME_").from_env()?;
+ pub fn from_env() -> Result<RuntimeConfig> {
+ let base: ConfigOptsBase = envy::prefixed("DIPLONAT_").from_env()?;
+ let consul: ConfigOptsConsul = envy::prefixed("DIPLONAT_CONSUL_").from_env()?;
+ let acme: ConfigOptsAcme = envy::prefixed("DIPLONAT_ACME_").from_env()?;
- RuntimeConfig::new(Self {
- base: base,
- consul: consul,
- acme: acme,
- })
- }
+ RuntimeConfig::new(Self {
+ base: base,
+ consul: consul,
+ acme: acme,
+ })
+ }
- // Currently only used in tests
- #[cfg(test)]
- pub fn from_iter<Iter: Clone>(iter: Iter) -> Result<RuntimeConfig>
- where
- Iter: IntoIterator<Item = (String, String)>,
- {
- let base: ConfigOptsBase = envy::prefixed("DIPLONAT_").from_iter(iter.clone())?;
- let consul: ConfigOptsConsul = envy::prefixed("DIPLONAT_CONSUL_").from_iter(iter.clone())?;
- let acme: ConfigOptsAcme = envy::prefixed("DIPLONAT_ACME_").from_iter(iter.clone())?;
+ // Currently only used in tests
+ #[cfg(test)]
+ pub fn from_iter<Iter: Clone>(iter: Iter) -> Result<RuntimeConfig>
+ where
+ Iter: IntoIterator<Item = (String, String)>,
+ {
+ let base: ConfigOptsBase = envy::prefixed("DIPLONAT_").from_iter(iter.clone())?;
+ let consul: ConfigOptsConsul =
+ envy::prefixed("DIPLONAT_CONSUL_").from_iter(iter.clone())?;
+ let acme: ConfigOptsAcme = envy::prefixed("DIPLONAT_ACME_").from_iter(iter.clone())?;
- RuntimeConfig::new(Self {
- base: base,
- consul: consul,
- acme: acme,
- })
- }
+ RuntimeConfig::new(Self {
+ base: base,
+ consul: consul,
+ acme: acme,
+ })
+ }
}
diff --git a/src/config/options_test.rs b/src/config/options_test.rs
index 427b70e..655fb01 100644
--- a/src/config/options_test.rs
+++ b/src/config/options_test.rs
@@ -9,120 +9,118 @@ use crate::config::*;
// This is why we only test ConfigOpts::from_iter(iter).
fn minimal_valid_options() -> HashMap<String, String> {
- let mut opts = HashMap::new();
- opts.insert(
- "DIPLONAT_CONSUL_NODE_NAME".to_string(),
- "consul_node".to_string(),
- );
- opts
+ let mut opts = HashMap::new();
+ opts.insert(
+ "DIPLONAT_CONSUL_NODE_NAME".to_string(),
+ "consul_node".to_string(),
+ );
+ opts
}
fn all_valid_options() -> HashMap<String, String> {
- let mut opts = minimal_valid_options();
- opts.insert("DIPLONAT_EXPIRATION_TIME".to_string(), "30".to_string());
- opts.insert(
- "DIPLONAT_STUN_SERVER".to_string(),
- "stun.nextcloud.com:443".to_string(),
- );
- opts.insert(
- "DIPLONAT_PRIVATE_IP".to_string(),
- "172.123.43.555".to_string(),
- );
- opts.insert("DIPLONAT_REFRESH_TIME".to_string(), "10".to_string());
- opts.insert(
- "DIPLONAT_CONSUL_URL".to_string(),
- "http://127.0.0.1:9999".to_string(),
- );
- opts.insert("DIPLONAT_ACME_ENABLE".to_string(), "true".to_string());
- opts.insert(
- "DIPLONAT_ACME_EMAIL".to_string(),
- "bozo@bozo.net".to_string(),
- );
- opts
+ let mut opts = minimal_valid_options();
+ opts.insert("DIPLONAT_EXPIRATION_TIME".to_string(), "30".to_string());
+ opts.insert(
+ "DIPLONAT_STUN_SERVER".to_string(),
+ "stun.nextcloud.com:443".to_string(),
+ );
+ opts.insert(
+ "DIPLONAT_PRIVATE_IP".to_string(),
+ "172.123.43.555".to_string(),
+ );
+ opts.insert("DIPLONAT_REFRESH_TIME".to_string(), "10".to_string());
+ opts.insert(
+ "DIPLONAT_CONSUL_URL".to_string(),
+ "http://127.0.0.1:9999".to_string(),
+ );
+ opts.insert("DIPLONAT_ACME_ENABLE".to_string(), "true".to_string());
+ opts.insert(
+ "DIPLONAT_ACME_EMAIL".to_string(),
+ "bozo@bozo.net".to_string(),
+ );
+ opts
}
#[test]
#[should_panic]
fn err_empty_env() {
- std::env::remove_var("DIPLONAT_CONSUL_NODE_NAME");
- ConfigOpts::from_env().unwrap();
+ std::env::remove_var("DIPLONAT_CONSUL_NODE_NAME");
+ ConfigOpts::from_env().unwrap();
}
#[test]
fn ok_from_iter_minimal_valid_options() {
- let opts = minimal_valid_options();
- let rt_config = ConfigOpts::from_iter(opts.clone()).unwrap();
+ let opts = minimal_valid_options();
+ let rt_config = ConfigOpts::from_iter(opts.clone()).unwrap();
- assert!(rt_config.acme.is_none());
- assert_eq!(
- &rt_config.consul.node_name,
- opts.get(&"DIPLONAT_CONSUL_NODE_NAME".to_string()).unwrap()
- );
- assert_eq!(rt_config.consul.url, CONSUL_URL.to_string());
- assert_eq!(
- rt_config.firewall.refresh_time,
- Duration::from_secs(REFRESH_TIME.into())
- );
- assert!(rt_config.igd.private_ip.is_none());
- assert_eq!(
- rt_config.igd.expiration_time,
- Duration::from_secs(EXPIRATION_TIME.into())
- );
- assert_eq!(
- rt_config.igd.refresh_time,
- Duration::from_secs(REFRESH_TIME.into())
- );
+ assert!(rt_config.acme.is_none());
+ assert_eq!(
+ &rt_config.consul.node_name,
+ opts.get(&"DIPLONAT_CONSUL_NODE_NAME".to_string()).unwrap()
+ );
+ assert_eq!(rt_config.consul.url, CONSUL_URL.to_string());
+ assert_eq!(
+ rt_config.firewall.refresh_time,
+ Duration::from_secs(REFRESH_TIME.into())
+ );
+ assert!(rt_config.igd.private_ip.is_none());
+ assert_eq!(
+ rt_config.igd.expiration_time,
+ Duration::from_secs(EXPIRATION_TIME.into())
+ );
+ assert_eq!(
+ rt_config.igd.refresh_time,
+ Duration::from_secs(REFRESH_TIME.into())
+ );
}
#[test]
#[should_panic]
fn err_from_iter_invalid_refresh_time() {
- let mut opts = minimal_valid_options();
- opts.insert("DIPLONAT_EXPIRATION_TIME".to_string(), "60".to_string());
- opts.insert("DIPLONAT_REFRESH_TIME".to_string(), "60".to_string());
- ConfigOpts::from_iter(opts).unwrap();
+ let mut opts = minimal_valid_options();
+ opts.insert("DIPLONAT_EXPIRATION_TIME".to_string(), "60".to_string());
+ opts.insert("DIPLONAT_REFRESH_TIME".to_string(), "60".to_string());
+ ConfigOpts::from_iter(opts).unwrap();
}
#[test]
fn ok_from_iter_all_valid_options() {
- let opts = all_valid_options();
- let rt_config = ConfigOpts::from_iter(opts.clone()).unwrap();
+ let opts = all_valid_options();
+ let rt_config = ConfigOpts::from_iter(opts.clone()).unwrap();
- let expiration_time = Duration::from_secs(
- opts
- .get(&"DIPLONAT_EXPIRATION_TIME".to_string())
- .unwrap()
- .parse::<u64>()
- .unwrap()
- .into(),
- );
- let refresh_time = Duration::from_secs(
- opts
- .get(&"DIPLONAT_REFRESH_TIME".to_string())
- .unwrap()
- .parse::<u64>()
- .unwrap()
- .into(),
- );
+ let expiration_time = Duration::from_secs(
+ opts.get(&"DIPLONAT_EXPIRATION_TIME".to_string())
+ .unwrap()
+ .parse::<u64>()
+ .unwrap()
+ .into(),
+ );
+ let refresh_time = Duration::from_secs(
+ opts.get(&"DIPLONAT_REFRESH_TIME".to_string())
+ .unwrap()
+ .parse::<u64>()
+ .unwrap()
+ .into(),
+ );
- assert!(rt_config.acme.is_some());
- assert_eq!(
- &rt_config.acme.unwrap().email,
- opts.get(&"DIPLONAT_ACME_EMAIL".to_string()).unwrap()
- );
- assert_eq!(
- &rt_config.consul.node_name,
- opts.get(&"DIPLONAT_CONSUL_NODE_NAME".to_string()).unwrap()
- );
- assert_eq!(
- &rt_config.consul.url,
- opts.get(&"DIPLONAT_CONSUL_URL".to_string()).unwrap()
- );
- assert_eq!(rt_config.firewall.refresh_time, refresh_time);
- assert_eq!(
- &rt_config.igd.private_ip.unwrap(),
- opts.get(&"DIPLONAT_PRIVATE_IP".to_string()).unwrap()
- );
- assert_eq!(rt_config.igd.expiration_time, expiration_time);
- assert_eq!(rt_config.igd.refresh_time, refresh_time);
+ assert!(rt_config.acme.is_some());
+ assert_eq!(
+ &rt_config.acme.unwrap().email,
+ opts.get(&"DIPLONAT_ACME_EMAIL".to_string()).unwrap()
+ );
+ assert_eq!(
+ &rt_config.consul.node_name,
+ opts.get(&"DIPLONAT_CONSUL_NODE_NAME".to_string()).unwrap()
+ );
+ assert_eq!(
+ &rt_config.consul.url,
+ opts.get(&"DIPLONAT_CONSUL_URL".to_string()).unwrap()
+ );
+ assert_eq!(rt_config.firewall.refresh_time, refresh_time);
+ assert_eq!(
+ &rt_config.igd.private_ip.unwrap(),
+ opts.get(&"DIPLONAT_PRIVATE_IP".to_string()).unwrap()
+ );
+ assert_eq!(rt_config.igd.expiration_time, expiration_time);
+ assert_eq!(rt_config.igd.refresh_time, refresh_time);
}
diff --git a/src/config/runtime.rs b/src/config/runtime.rs
index f2d2a12..bffea52 100644
--- a/src/config/runtime.rs
+++ b/src/config/runtime.rs
@@ -14,179 +14,182 @@ use crate::config::{ConfigOpts, ConfigOptsAcme, ConfigOptsBase, ConfigOptsConsul
#[derive(Debug)]
pub struct RuntimeConfigAcme {
- pub email: String,
+ pub email: String,
}
#[derive(Debug)]
pub struct RuntimeConfigConsul {
- pub node_name: String,
- pub url: String,
- pub tls: Option<(Option<reqwest::Certificate>, bool, reqwest::Identity)>,
+ pub node_name: String,
+ pub url: String,
+ pub tls: Option<(Option<reqwest::Certificate>, bool, reqwest::Identity)>,
}
#[derive(Debug)]
pub struct RuntimeConfigFirewall {
- pub refresh_time: Duration,
+ pub refresh_time: Duration,
}
#[derive(Debug)]
pub struct RuntimeConfigIgd {
- pub private_ip: Option<String>,
- pub expiration_time: Duration,
- pub refresh_time: Duration,
+ pub private_ip: Option<String>,
+ pub expiration_time: Duration,
+ pub refresh_time: Duration,
}
#[derive(Debug)]
pub struct RuntimeConfigStun {
- pub stun_server_v4: SocketAddr,
- pub stun_server_v6: SocketAddr,
- pub refresh_time: Duration,
+ pub stun_server_v4: SocketAddr,
+ pub stun_server_v6: SocketAddr,
+ pub refresh_time: Duration,
}
#[derive(Debug)]
pub struct RuntimeConfig {
- pub acme: Option<RuntimeConfigAcme>,
- pub consul: RuntimeConfigConsul,
- pub firewall: RuntimeConfigFirewall,
- pub igd: RuntimeConfigIgd,
- pub stun: RuntimeConfigStun,
+ pub acme: Option<RuntimeConfigAcme>,
+ pub consul: RuntimeConfigConsul,
+ pub firewall: RuntimeConfigFirewall,
+ pub igd: RuntimeConfigIgd,
+ pub stun: RuntimeConfigStun,
}
impl RuntimeConfig {
- pub fn new(opts: ConfigOpts) -> Result<Self> {
- let acme = RuntimeConfigAcme::new(opts.acme)?;
- let consul = RuntimeConfigConsul::new(opts.consul)?;
- let firewall = RuntimeConfigFirewall::new(&opts.base)?;
- let igd = RuntimeConfigIgd::new(&opts.base)?;
- let stun = RuntimeConfigStun::new(&opts.base)?;
-
- Ok(Self {
- acme,
- consul,
- firewall,
- igd,
- stun,
- })
- }
+ pub fn new(opts: ConfigOpts) -> Result<Self> {
+ let acme = RuntimeConfigAcme::new(opts.acme)?;
+ let consul = RuntimeConfigConsul::new(opts.consul)?;
+ let firewall = RuntimeConfigFirewall::new(&opts.base)?;
+ let igd = RuntimeConfigIgd::new(&opts.base)?;
+ let stun = RuntimeConfigStun::new(&opts.base)?;
+
+ Ok(Self {
+ acme,
+ consul,
+ firewall,
+ igd,
+ stun,
+ })
+ }
}
impl RuntimeConfigAcme {
- pub fn new(opts: ConfigOptsAcme) -> Result<Option<Self>> {
- if !opts.enable {
- return Ok(None);
- }
+ pub fn new(opts: ConfigOptsAcme) -> Result<Option<Self>> {
+ if !opts.enable {
+ return Ok(None);
+ }
- let email = opts.email.expect(
+ let email = opts.email.expect(
"'DIPLONAT_ACME_EMAIL' environment variable is required if 'DIPLONAT_ACME_ENABLE' == 'true'",
);
- Ok(Some(Self { email }))
- }
+ Ok(Some(Self { email }))
+ }
}
impl RuntimeConfigConsul {
- pub(super) fn new(opts: ConfigOptsConsul) -> Result<Self> {
- let node_name = opts
- .node_name
- .expect("'DIPLONAT_CONSUL_NODE_NAME' environment variable is required");
- let url = opts.url.unwrap_or(super::CONSUL_URL.to_string());
-
- let tls = match (&opts.client_cert, &opts.client_key) {
- (Some(client_cert), Some(client_key)) => {
- let cert = match &opts.ca_cert {
- Some(ca_cert) => {
- let mut ca_cert_buf = vec![];
- File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
- Some(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
- }
- None => None,
+ pub(super) fn new(opts: ConfigOptsConsul) -> Result<Self> {
+ let node_name = opts
+ .node_name
+ .expect("'DIPLONAT_CONSUL_NODE_NAME' environment variable is required");
+ let url = opts.url.unwrap_or(super::CONSUL_URL.to_string());
+
+ let tls = match (&opts.client_cert, &opts.client_key) {
+ (Some(client_cert), Some(client_key)) => {
+ let cert = match &opts.ca_cert {
+ Some(ca_cert) => {
+ let mut ca_cert_buf = vec![];
+ File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
+ Some(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
+ }
+ None => None,
+ };
+
+ let mut client_cert_buf = vec![];
+ File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
+
+ let mut client_key_buf = vec![];
+ File::open(client_key)?.read_to_end(&mut client_key_buf)?;
+
+ let ident = reqwest::Identity::from_pem(
+ &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
+ )?;
+
+ Some((cert, opts.tls_skip_verify, ident))
+ }
+ (None, None) => None,
+ _ => bail!("Incomplete TLS configuration parameters"),
};
- let mut client_cert_buf = vec![];
- File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
-
- let mut client_key_buf = vec![];
- File::open(client_key)?.read_to_end(&mut client_key_buf)?;
-
- let ident =
- reqwest::Identity::from_pem(&[&client_cert_buf[..], &client_key_buf[..]].concat()[..])?;
-
- Some((cert, opts.tls_skip_verify, ident))
- }
- (None, None) => None,
- _ => bail!("Incomplete TLS configuration parameters"),
- };
-
- Ok(Self {
- node_name,
- url,
- tls,
- })
- }
+ Ok(Self {
+ node_name,
+ url,
+ tls,
+ })
+ }
}
impl RuntimeConfigFirewall {
- pub(super) fn new(opts: &ConfigOptsBase) -> Result<Self> {
- let refresh_time = Duration::from_secs(opts.refresh_time.unwrap_or(super::REFRESH_TIME).into());
+ pub(super) fn new(opts: &ConfigOptsBase) -> Result<Self> {
+ let refresh_time =
+ Duration::from_secs(opts.refresh_time.unwrap_or(super::REFRESH_TIME).into());
- Ok(Self { refresh_time })
- }
+ Ok(Self { refresh_time })
+ }
}
impl RuntimeConfigIgd {
- pub(super) fn new(opts: &ConfigOptsBase) -> Result<Self> {
- let private_ip = opts.private_ip.clone();
- let expiration_time = Duration::from_secs(
- opts
- .expiration_time
- .unwrap_or(super::EXPIRATION_TIME)
- .into(),
- );
- let refresh_time = Duration::from_secs(opts.refresh_time.unwrap_or(super::REFRESH_TIME).into());
-
- if refresh_time.as_secs() * 2 > expiration_time.as_secs() {
- return Err(anyhow!(
+ pub(super) fn new(opts: &ConfigOptsBase) -> Result<Self> {
+ let private_ip = opts.private_ip.clone();
+ let expiration_time = Duration::from_secs(
+ opts.expiration_time
+ .unwrap_or(super::EXPIRATION_TIME)
+ .into(),
+ );
+ let refresh_time =
+ Duration::from_secs(opts.refresh_time.unwrap_or(super::REFRESH_TIME).into());
+
+ if refresh_time.as_secs() * 2 > expiration_time.as_secs() {
+ return Err(anyhow!(
"IGD expiration time (currently: {}s) must be at least twice bigger than refresh time \
(currently: {}s)",
expiration_time.as_secs(),
refresh_time.as_secs()
));
- }
+ }
- Ok(Self {
- private_ip,
- expiration_time,
- refresh_time,
- })
- }
+ Ok(Self {
+ private_ip,
+ expiration_time,
+ refresh_time,
+ })
+ }
}
impl RuntimeConfigStun {
- pub(super) fn new(opts: &ConfigOptsBase) -> Result<Self> {
- let mut stun_server_v4 = None;
- let mut stun_server_v6 = None;
- for addr in opts
- .stun_server
- .as_deref()
- .unwrap_or(super::STUN_SERVER)
- .to_socket_addrs()?
- {
- if addr.is_ipv4() {
- stun_server_v4 = Some(addr);
- }
- if addr.is_ipv6() {
- stun_server_v6 = Some(addr);
- }
+ pub(super) fn new(opts: &ConfigOptsBase) -> Result<Self> {
+ let mut stun_server_v4 = None;
+ let mut stun_server_v6 = None;
+ for addr in opts
+ .stun_server
+ .as_deref()
+ .unwrap_or(super::STUN_SERVER)
+ .to_socket_addrs()?
+ {
+ if addr.is_ipv4() {
+ stun_server_v4 = Some(addr);
+ }
+ if addr.is_ipv6() {
+ stun_server_v6 = Some(addr);
+ }
+ }
+
+ let refresh_time =
+ Duration::from_secs(opts.refresh_time.unwrap_or(super::REFRESH_TIME).into());
+
+ Ok(Self {
+ stun_server_v4: stun_server_v4
+ .ok_or(anyhow!("Unable to resolve STUN server's IPv4 address"))?,
+ stun_server_v6: stun_server_v6
+ .ok_or(anyhow!("Unable to resolve STUN server's IPv6 address"))?,
+ refresh_time,
+ })
}
-
- let refresh_time = Duration::from_secs(opts.refresh_time.unwrap_or(super::REFRESH_TIME).into());
-
- Ok(Self {
- stun_server_v4: stun_server_v4
- .ok_or(anyhow!("Unable to resolve STUN server's IPv4 address"))?,
- stun_server_v6: stun_server_v6
- .ok_or(anyhow!("Unable to resolve STUN server's IPv6 address"))?,
- refresh_time,
- })
- }
}
diff --git a/src/consul.rs b/src/consul.rs
index e31033c..7f695b2 100644
--- a/src/consul.rs
+++ b/src/consul.rs
@@ -7,80 +7,80 @@ use crate::config::RuntimeConfigConsul;
#[derive(Serialize, Deserialize, Debug)]
pub struct ServiceEntry {
- #[serde(rename = "Tags")]
- pub tags: Vec<String>,
+ #[serde(rename = "Tags")]
+ pub tags: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct CatalogNode {
- #[serde(rename = "Services")]
- pub services: HashMap<String, ServiceEntry>,
+ #[serde(rename = "Services")]
+ pub services: HashMap<String, ServiceEntry>,
}
pub struct Consul {
- client: reqwest::Client,
- url: String,
- idx: Option<u64>,
+ client: reqwest::Client,
+ url: String,
+ idx: Option<u64>,
}
impl Consul {
- pub fn new(config: &RuntimeConfigConsul) -> Self {
- let client = if let Some((ca, skip_verify, ident)) = config.tls.clone() {
- if skip_verify {
- reqwest::Client::builder()
- .use_rustls_tls()
- .danger_accept_invalid_certs(true)
- .identity(ident)
- .build()
- .expect("Unable to build reqwest client")
- } else if let Some(ca) = ca {
- reqwest::Client::builder()
- .use_rustls_tls()
- .add_root_certificate(ca)
- .identity(ident)
- .build()
- .expect("Unable to build reqwest client")
- } else {
- reqwest::Client::builder()
- .use_rustls_tls()
- .identity(ident)
- .build()
- .expect("Unable to build reqwest client")
- }
- } else {
- reqwest::Client::new()
- };
- return Self {
- client,
- url: config.url.clone(),
- idx: None,
- };
- }
+ pub fn new(config: &RuntimeConfigConsul) -> Self {
+ let client = if let Some((ca, skip_verify, ident)) = config.tls.clone() {
+ if skip_verify {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .danger_accept_invalid_certs(true)
+ .identity(ident)
+ .build()
+ .expect("Unable to build reqwest client")
+ } else if let Some(ca) = ca {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .add_root_certificate(ca)
+ .identity(ident)
+ .build()
+ .expect("Unable to build reqwest client")
+ } else {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .identity(ident)
+ .build()
+ .expect("Unable to build reqwest client")
+ }
+ } else {
+ reqwest::Client::new()
+ };
+ return Self {
+ client,
+ url: config.url.clone(),
+ idx: None,
+ };
+ }
- pub fn watch_node_reset(&mut self) -> () {
- self.idx = None;
- }
+ pub fn watch_node_reset(&mut self) -> () {
+ self.idx = None;
+ }
- pub async fn watch_node(&mut self, host: &str) -> Result<CatalogNode> {
- let url = match self.idx {
- Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i),
- None => format!("{}/v1/catalog/node/{}", self.url, host),
- };
+ pub async fn watch_node(&mut self, host: &str) -> Result<CatalogNode> {
+ let url = match self.idx {
+ Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i),
+ None => format!("{}/v1/catalog/node/{}", self.url, host),
+ };
- let http = self.client.get(&url).send().await?;
- self.idx = match http.headers().get("X-Consul-Index") {
- Some(v) => Some(v.to_str()?.parse::<u64>()?),
- None => return Err(anyhow!("X-Consul-Index header not found")),
- };
+ let http = self.client.get(&url).send().await?;
+ self.idx = match http.headers().get("X-Consul-Index") {
+ Some(v) => Some(v.to_str()?.parse::<u64>()?),
+ None => return Err(anyhow!("X-Consul-Index header not found")),
+ };
- let resp: Option<CatalogNode> = http.json().await?;
- return Ok(resp.unwrap_or_default());
- }
+ let resp: Option<CatalogNode> = http.json().await?;
+ return Ok(resp.unwrap_or_default());
+ }
- pub async fn kv_put(&self, key: &str, bytes: Vec<u8>) -> Result<()> {
- let url = format!("{}/v1/kv/{}", self.url, key);
- let http = self.client.put(&url).body(bytes).send().await?;
- http.error_for_status()?;
- Ok(())
- }
+ pub async fn kv_put(&self, key: &str, bytes: Vec<u8>) -> Result<()> {
+ let url = format!("{}/v1/kv/{}", self.url, key);
+ let http = self.client.put(&url).body(bytes).send().await?;
+ http.error_for_status()?;
+ Ok(())
+ }
}
diff --git a/src/consul_actor.rs b/src/consul_actor.rs
index e4296e7..c099215 100644
--- a/src/consul_actor.rs
+++ b/src/consul_actor.rs
@@ -11,110 +11,110 @@ use crate::{consul, messages};
#[derive(Serialize, Deserialize, Debug)]
pub enum DiplonatParameter {
- #[serde(rename = "tcp_port")]
- TcpPort(HashSet<u16>),
- #[serde(rename = "udp_port")]
- UdpPort(HashSet<u16>),
+ #[serde(rename = "tcp_port")]
+ TcpPort(HashSet<u16>),
+ #[serde(rename = "udp_port")]
+ UdpPort(HashSet<u16>),
}
#[derive(Serialize, Deserialize, Debug)]
pub enum DiplonatConsul {
- #[serde(rename = "diplonat")]
- Diplonat(Vec<DiplonatParameter>),
+ #[serde(rename = "diplonat")]
+ Diplonat(Vec<DiplonatParameter>),
}
pub struct ConsulActor {
- pub rx_open_ports: watch::Receiver<messages::PublicExposedPorts>,
+ pub rx_open_ports: watch::Receiver<messages::PublicExposedPorts>,
- consul: consul::Consul,
- node: String,
- retries: u32,
- tx_open_ports: watch::Sender<messages::PublicExposedPorts>,
+ consul: consul::Consul,
+ node: String,
+ retries: u32,
+ tx_open_ports: watch::Sender<messages::PublicExposedPorts>,
}
fn retry_to_time(retries: u32, max_time: Duration) -> Duration {
- // 1.2^x seems to be a good value to exponentially increase time at a good pace
- // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5
- // minutes
- return Duration::from_secs(cmp::min(
- max_time.as_secs(),
- 1.2f64.powf(retries as f64) as u64,
- ));
+ // 1.2^x seems to be a good value to exponentially increase time at a good pace
+ // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5
+ // minutes
+ return Duration::from_secs(cmp::min(
+ max_time.as_secs(),
+ 1.2f64.powf(retries as f64) as u64,
+ ));
}
fn to_parameters(catalog: &consul::CatalogNode) -> Vec<DiplonatConsul> {
- let mut r = Vec::new();
-
- for (_, service_info) in &catalog.services {
- for tag in &service_info.tags {
- let diplo_conf: error::Result<DiplonatConsul> = from_str(tag);
- match diplo_conf {
- Ok(conf) => r.push(conf),
- Err(e) => debug!("Failed to parse entry {}. {}", tag, e),
- };
+ let mut r = Vec::new();
+
+ for (_, service_info) in &catalog.services {
+ for tag in &service_info.tags {
+ let diplo_conf: error::Result<DiplonatConsul> = from_str(tag);
+ match diplo_conf {
+ Ok(conf) => r.push(conf),
+ Err(e) => debug!("Failed to parse entry {}. {}", tag, e),
+ };
+ }
}
- }
- return r;
+ return r;
}
fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts {
- let mut op = messages::PublicExposedPorts {
- tcp_ports: HashSet::new(),
- udp_ports: HashSet::new(),
- };
-
- for conf in params {
- let DiplonatConsul::Diplonat(c) = conf;
- for parameter in c {
- match parameter {
- DiplonatParameter::TcpPort(p) => op.tcp_ports.extend(p),
- DiplonatParameter::UdpPort(p) => op.udp_ports.extend(p),
- };
+ let mut op = messages::PublicExposedPorts {
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new(),
+ };
+
+ for conf in params {
+ let DiplonatConsul::Diplonat(c) = conf;
+ for parameter in c {
+ match parameter {
+ DiplonatParameter::TcpPort(p) => op.tcp_ports.extend(p),
+ DiplonatParameter::UdpPort(p) => op.udp_ports.extend(p),
+ };
+ }
}
- }
- return op;
+ return op;
}
impl ConsulActor {
- pub fn new(config: &RuntimeConfigConsul, node: &str) -> Self {
- let (tx, rx) = watch::channel(messages::PublicExposedPorts {
- tcp_ports: HashSet::new(),
- udp_ports: HashSet::new(),
- });
-
- return Self {
- consul: consul::Consul::new(config),
- rx_open_ports: rx,
- tx_open_ports: tx,
- node: node.to_string(),
- retries: 0,
- };
- }
-
- pub async fn listen(&mut self) -> Result<()> {
- loop {
- let catalog = match self.consul.watch_node(&self.node).await {
- Ok(c) => c,
- Err(e) => {
- self.consul.watch_node_reset();
- self.retries = cmp::min(std::u32::MAX - 1, self.retries) + 1;
- let will_retry_in = retry_to_time(self.retries, Duration::from_secs(600));
- error!(
- "Failed to query consul. Will retry in {}s. {}",
- will_retry_in.as_secs(),
- e
- );
- sleep(will_retry_in).await;
- continue;
- }
- };
- self.retries = 0;
- let msg = to_open_ports(&to_parameters(&catalog));
- debug!("Extracted configuration: {:#?}", msg);
+ pub fn new(config: &RuntimeConfigConsul, node: &str) -> Self {
+ let (tx, rx) = watch::channel(messages::PublicExposedPorts {
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new(),
+ });
+
+ return Self {
+ consul: consul::Consul::new(config),
+ rx_open_ports: rx,
+ tx_open_ports: tx,
+ node: node.to_string(),
+ retries: 0,
+ };
+ }
- self.tx_open_ports.send(msg)?;
+ pub async fn listen(&mut self) -> Result<()> {
+ loop {
+ let catalog = match self.consul.watch_node(&self.node).await {
+ Ok(c) => c,
+ Err(e) => {
+ self.consul.watch_node_reset();
+ self.retries = cmp::min(std::u32::MAX - 1, self.retries) + 1;
+ let will_retry_in = retry_to_time(self.retries, Duration::from_secs(600));
+ error!(
+ "Failed to query consul. Will retry in {}s. {}",
+ will_retry_in.as_secs(),
+ e
+ );
+ sleep(will_retry_in).await;
+ continue;
+ }
+ };
+ self.retries = 0;
+ let msg = to_open_ports(&to_parameters(&catalog));
+ debug!("Extracted configuration: {:#?}", msg);
+
+ self.tx_open_ports.send(msg)?;
+ }
}
- }
}
diff --git a/src/diplonat.rs b/src/diplonat.rs
index 4252f43..604227c 100644
--- a/src/diplonat.rs
+++ b/src/diplonat.rs
@@ -2,54 +2,54 @@ use anyhow::Result;
use tokio::try_join;
use crate::{
- config::ConfigOpts, consul_actor::ConsulActor, fw_actor::FirewallActor, igd_actor::IgdActor,
- stun_actor::StunActor,
+ config::ConfigOpts, consul_actor::ConsulActor, fw_actor::FirewallActor, igd_actor::IgdActor,
+ stun_actor::StunActor,
};
pub struct Diplonat {
- consul: ConsulActor,
- firewall: FirewallActor,
- igd: IgdActor,
- stun: StunActor,
+ consul: ConsulActor,
+ firewall: FirewallActor,
+ igd: IgdActor,
+ stun: StunActor,
}
impl Diplonat {
- pub async fn new() -> Result<Self> {
- let rt_cfg = ConfigOpts::from_env()?;
- println!("{:#?}", rt_cfg);
-
- let ca = ConsulActor::new(&rt_cfg.consul, &rt_cfg.consul.node_name);
-
- let fw = FirewallActor::new(rt_cfg.firewall.refresh_time, &ca.rx_open_ports).await?;
-
- let ia = IgdActor::new(
- rt_cfg.igd.private_ip.as_ref().map(String::as_str),
- rt_cfg.igd.refresh_time,
- rt_cfg.igd.expiration_time,
- &ca.rx_open_ports,
- )
- .await?;
-
- let sa = StunActor::new(&rt_cfg.consul, &rt_cfg.stun, &rt_cfg.consul.node_name);
-
- let ctx = Self {
- consul: ca,
- igd: ia,
- firewall: fw,
- stun: sa,
- };
-
- Ok(ctx)
- }
-
- pub async fn listen(&mut self) -> Result<()> {
- try_join!(
- self.consul.listen(),
- self.igd.listen(),
- self.firewall.listen(),
- self.stun.listen(),
- )?;
-
- Ok(())
- }
+ pub async fn new() -> Result<Self> {
+ let rt_cfg = ConfigOpts::from_env()?;
+ println!("{:#?}", rt_cfg);
+
+ let ca = ConsulActor::new(&rt_cfg.consul, &rt_cfg.consul.node_name);
+
+ let fw = FirewallActor::new(rt_cfg.firewall.refresh_time, &ca.rx_open_ports).await?;
+
+ let ia = IgdActor::new(
+ rt_cfg.igd.private_ip.as_ref().map(String::as_str),
+ rt_cfg.igd.refresh_time,
+ rt_cfg.igd.expiration_time,
+ &ca.rx_open_ports,
+ )
+ .await?;
+
+ let sa = StunActor::new(&rt_cfg.consul, &rt_cfg.stun, &rt_cfg.consul.node_name);
+
+ let ctx = Self {
+ consul: ca,
+ igd: ia,
+ firewall: fw,
+ stun: sa,
+ };
+
+ Ok(ctx)
+ }
+
+ pub async fn listen(&mut self) -> Result<()> {
+ try_join!(
+ self.consul.listen(),
+ self.igd.listen(),
+ self.firewall.listen(),
+ self.stun.listen(),
+ )?;
+
+ Ok(())
+ }
}
diff --git a/src/fw.rs b/src/fw.rs
index f416e6a..6745b3f 100644
--- a/src/fw.rs
+++ b/src/fw.rs
@@ -8,96 +8,89 @@ use regex::Regex;
use crate::messages;
pub fn setup(ipt: &iptables::IPTables) -> Result<()> {
- // ensure we start from a clean state without any rule already set
- cleanup(ipt)?;
-
- info!("{}: creating DIPLONAT chain using", ipt.cmd);
- ipt
- .new_chain("filter", "DIPLONAT")
- .context("Failed to create new chain")?;
- ipt
- .insert_unique("filter", "INPUT", "-j DIPLONAT", 1)
- .context("Failed to insert jump rule")?;
-
- Ok(())
+ // ensure we start from a clean state without any rule already set
+ cleanup(ipt)?;
+
+ info!("{}: creating DIPLONAT chain using", ipt.cmd);
+ ipt.new_chain("filter", "DIPLONAT")
+ .context("Failed to create new chain")?;
+ ipt.insert_unique("filter", "INPUT", "-j DIPLONAT", 1)
+ .context("Failed to insert jump rule")?;
+
+ Ok(())
}
pub fn open_ports(ipt: &iptables::IPTables, ports: messages::PublicExposedPorts) -> Result<()> {
- for p in ports.tcp_ports {
- info!("{}: opening TCP port {}", ipt.cmd, p);
- ipt
- .append(
- "filter",
- "DIPLONAT",
- &format!("-p tcp --dport {} -j ACCEPT", p),
- )
- .context("Failed to insert port rule")?;
- }
-
- for p in ports.udp_ports {
- info!("{}: opening UDP port {}", ipt.cmd, p);
- ipt
- .append(
- "filter",
- "DIPLONAT",
- &format!("-p udp --dport {} -j ACCEPT", p),
- )
- .context("Failed to insert port rule")?;
- }
-
- Ok(())
+ for p in ports.tcp_ports {
+ info!("{}: opening TCP port {}", ipt.cmd, p);
+ ipt.append(
+ "filter",
+ "DIPLONAT",
+ &format!("-p tcp --dport {} -j ACCEPT", p),
+ )
+ .context("Failed to insert port rule")?;
+ }
+
+ for p in ports.udp_ports {
+ info!("{}: opening UDP port {}", ipt.cmd, p);
+ ipt.append(
+ "filter",
+ "DIPLONAT",
+ &format!("-p udp --dport {} -j ACCEPT", p),
+ )
+ .context("Failed to insert port rule")?;
+ }
+
+ Ok(())
}
pub fn get_opened_ports(ipt: &iptables::IPTables) -> Result<messages::PublicExposedPorts> {
- let mut ports = messages::PublicExposedPorts {
- tcp_ports: HashSet::new(),
- udp_ports: HashSet::new(),
- };
-
- let list = ipt.list("filter", "DIPLONAT")?;
- let re = Regex::new(r"\-A.*? \-p (\w+).*\-\-dport (\d+).*?\-j ACCEPT")
- .context("Regex matching open ports encountered an unexpected rule")?;
- for i in list {
- let caps = re.captures(&i);
- match caps {
- Some(c) => {
- if let (Some(raw_proto), Some(raw_port)) = (c.get(1), c.get(2)) {
- let proto = String::from(raw_proto.as_str());
- let number = String::from(raw_port.as_str()).parse::<u16>()?;
-
- if proto == "tcp" {
- ports.tcp_ports.insert(number);
- } else {
- ports.udp_ports.insert(number);
- }
- } else {
- error!("Unexpected rule found in DIPLONAT chain")
+ let mut ports = messages::PublicExposedPorts {
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new(),
+ };
+
+ let list = ipt.list("filter", "DIPLONAT")?;
+ let re = Regex::new(r"\-A.*? \-p (\w+).*\-\-dport (\d+).*?\-j ACCEPT")
+ .context("Regex matching open ports encountered an unexpected rule")?;
+ for i in list {
+ let caps = re.captures(&i);
+ match caps {
+ Some(c) => {
+ if let (Some(raw_proto), Some(raw_port)) = (c.get(1), c.get(2)) {
+ let proto = String::from(raw_proto.as_str());
+ let number = String::from(raw_port.as_str()).parse::<u16>()?;
+
+ if proto == "tcp" {
+ ports.tcp_ports.insert(number);
+ } else {
+ ports.udp_ports.insert(number);
+ }
+ } else {
+ error!("Unexpected rule found in DIPLONAT chain")
+ }
+ }
+ _ => {}
}
- }
- _ => {}
}
- }
- Ok(ports)
+ Ok(ports)
}
pub fn cleanup(ipt: &iptables::IPTables) -> Result<()> {
- if ipt.chain_exists("filter", "DIPLONAT")? {
- info!("{}: removing old DIPLONAT chain", ipt.cmd);
- ipt
- .flush_chain("filter", "DIPLONAT")
- .context("Failed to flush the DIPLONAT chain")?;
-
- if ipt.exists("filter", "INPUT", "-j DIPLONAT")? {
- ipt
- .delete("filter", "INPUT", "-j DIPLONAT")
- .context("Failed to delete jump rule")?;
- }
+ if ipt.chain_exists("filter", "DIPLONAT")? {
+ info!("{}: removing old DIPLONAT chain", ipt.cmd);
+ ipt.flush_chain("filter", "DIPLONAT")
+ .context("Failed to flush the DIPLONAT chain")?;
+
+ if ipt.exists("filter", "INPUT", "-j DIPLONAT")? {
+ ipt.delete("filter", "INPUT", "-j DIPLONAT")
+ .context("Failed to delete jump rule")?;
+ }
- ipt
- .delete_chain("filter", "DIPLONAT")
- .context("Failed to delete chain")?;
- }
+ ipt.delete_chain("filter", "DIPLONAT")
+ .context("Failed to delete chain")?;
+ }
- Ok(())
+ Ok(())
}
diff --git a/src/fw_actor.rs b/src/fw_actor.rs
index 518c1b8..fe68381 100644
--- a/src/fw_actor.rs
+++ b/src/fw_actor.rs
@@ -4,88 +4,88 @@ use anyhow::Result;
use iptables;
use log::*;
use tokio::{
- select,
- sync::watch,
- time::{self, Duration},
+ select,
+ sync::watch,
+ time::{self, Duration},
};
use crate::{fw, messages};
pub struct FirewallActor {
- pub ipt_v4: iptables::IPTables,
- pub ipt_v6: iptables::IPTables,
- rx_ports: watch::Receiver<messages::PublicExposedPorts>,
- last_ports: messages::PublicExposedPorts,
- refresh: Duration,
+ pub ipt_v4: iptables::IPTables,
+ pub ipt_v6: iptables::IPTables,
+ rx_ports: watch::Receiver<messages::PublicExposedPorts>,
+ last_ports: messages::PublicExposedPorts,
+ refresh: Duration,
}
impl FirewallActor {
- pub async fn new(
- refresh: Duration,
- rxp: &watch::Receiver<messages::PublicExposedPorts>,
- ) -> Result<Self> {
- let ctx = Self {
- ipt_v4: iptables::new(false)?,
- ipt_v6: iptables::new(true)?,
- rx_ports: rxp.clone(),
- last_ports: messages::PublicExposedPorts::new(),
- refresh,
- };
+ pub async fn new(
+ refresh: Duration,
+ rxp: &watch::Receiver<messages::PublicExposedPorts>,
+ ) -> Result<Self> {
+ let ctx = Self {
+ ipt_v4: iptables::new(false)?,
+ ipt_v6: iptables::new(true)?,
+ rx_ports: rxp.clone(),
+ last_ports: messages::PublicExposedPorts::new(),
+ refresh,
+ };
- fw::setup(&ctx.ipt_v4)?;
- fw::setup(&ctx.ipt_v6)?;
+ fw::setup(&ctx.ipt_v4)?;
+ fw::setup(&ctx.ipt_v6)?;
- return Ok(ctx);
- }
+ return Ok(ctx);
+ }
- pub async fn listen(&mut self) -> Result<()> {
- let mut interval = time::interval(self.refresh);
- loop {
- // 1. Wait for an event
- let new_ports = select! {
- _ = self.rx_ports.changed() => Some(self.rx_ports.borrow().clone()),
- _ = interval.tick() => None,
- else => return Ok(()) // Sender dropped, terminate loop.
- };
+ pub async fn listen(&mut self) -> Result<()> {
+ let mut interval = time::interval(self.refresh);
+ loop {
+ // 1. Wait for an event
+ let new_ports = select! {
+ _ = self.rx_ports.changed() => Some(self.rx_ports.borrow().clone()),
+ _ = interval.tick() => None,
+ else => return Ok(()) // Sender dropped, terminate loop.
+ };
- // 2. Update last ports if needed
- if let Some(p) = new_ports {
- self.last_ports = p;
- }
+ // 2. Update last ports if needed
+ if let Some(p) = new_ports {
+ self.last_ports = p;
+ }
- // 3. Update firewall rules
- match self.do_fw_update().await {
- Ok(()) => debug!("Successfully updated firewall rules"),
- Err(e) => error!("An error occured while updating firewall rules. {}", e),
- }
+ // 3. Update firewall rules
+ match self.do_fw_update().await {
+ Ok(()) => debug!("Successfully updated firewall rules"),
+ Err(e) => error!("An error occured while updating firewall rules. {}", e),
+ }
+ }
}
- }
- pub async fn do_fw_update(&self) -> Result<()> {
- for ipt in [&self.ipt_v4, &self.ipt_v6] {
- let curr_opened_ports = fw::get_opened_ports(ipt)?;
+ pub async fn do_fw_update(&self) -> Result<()> {
+ for ipt in [&self.ipt_v4, &self.ipt_v6] {
+ let curr_opened_ports = fw::get_opened_ports(ipt)?;
- let diff_tcp = self
- .last_ports
- .tcp_ports
- .difference(&curr_opened_ports.tcp_ports)
- .copied()
- .collect::<HashSet<u16>>();
- let diff_udp = self
- .last_ports
- .udp_ports
- .difference(&curr_opened_ports.udp_ports)
- .copied()
- .collect::<HashSet<u16>>();
+ let diff_tcp = self
+ .last_ports
+ .tcp_ports
+ .difference(&curr_opened_ports.tcp_ports)
+ .copied()
+ .collect::<HashSet<u16>>();
+ let diff_udp = self
+ .last_ports
+ .udp_ports
+ .difference(&curr_opened_ports.udp_ports)
+ .copied()
+ .collect::<HashSet<u16>>();
- let ports_to_open = messages::PublicExposedPorts {
- tcp_ports: diff_tcp,
- udp_ports: diff_udp,
- };
+ let ports_to_open = messages::PublicExposedPorts {
+ tcp_ports: diff_tcp,
+ udp_ports: diff_udp,
+ };
- fw::open_ports(ipt, ports_to_open)?;
- }
+ fw::open_ports(ipt, ports_to_open)?;
+ }
- return Ok(());
- }
+ return Ok(());
+ }
}
diff --git a/src/igd_actor.rs b/src/igd_actor.rs
index 68f4217..a75395d 100644
--- a/src/igd_actor.rs
+++ b/src/igd_actor.rs
@@ -4,124 +4,123 @@ use anyhow::{Context, Result};
use igd::{aio::*, PortMappingProtocol};
use log::*;
use tokio::{
- select,
- sync::watch,
- time::{self, Duration},
+ select,
+ sync::watch,
+ time::{self, Duration},
};
use crate::messages;
pub struct IgdActor {
- last_ports: messages::PublicExposedPorts,
- rx_ports: watch::Receiver<messages::PublicExposedPorts>,
- gateway: Gateway,
- refresh: Duration,
- expire: Duration,
- private_ip: String,
+ last_ports: messages::PublicExposedPorts,
+ rx_ports: watch::Receiver<messages::PublicExposedPorts>,
+ gateway: Gateway,
+ refresh: Duration,
+ expire: Duration,
+ private_ip: String,
}
impl IgdActor {
- pub async fn new(
- priv_ip: Option<&str>,
- refresh: Duration,
- expire: Duration,
- rxp: &watch::Receiver<messages::PublicExposedPorts>,
- ) -> Result<Self> {
- let gw = search_gateway(Default::default())
- .await
- .context("Failed to find IGD gateway")?;
- info!("IGD gateway: {}", gw);
+ pub async fn new(
+ priv_ip: Option<&str>,
+ refresh: Duration,
+ expire: Duration,
+ rxp: &watch::Receiver<messages::PublicExposedPorts>,
+ ) -> Result<Self> {
+ let gw = search_gateway(Default::default())
+ .await
+ .context("Failed to find IGD gateway")?;
+ info!("IGD gateway: {}", gw);
- let private_ip = if let Some(ip) = priv_ip {
- info!("Using private IP from config: {}", ip);
- ip.to_string()
- } else {
- info!("Trying to automatically detect private IP");
- let gwa = gw.addr.ip().octets();
- let cmplen = match gwa {
- [192, 168, _, _] => 3,
- [10, _, _, _] => 2,
- _ => panic!(
- "Gateway IP does not appear to be in a local network ({})",
- gw.addr.ip()
- ),
- };
- #[allow(unused_parens)]
- let public_ip = get_if_addrs::get_if_addrs()?
- .into_iter()
- .map(|i| i.addr.ip())
- .filter(|a| match a {
- std::net::IpAddr::V4(a4) => (a4.octets()[..cmplen] == gwa[..cmplen]),
- _ => false,
- })
- .next()
- .expect("No interface has an IP on same subnet as gateway")
- .to_string();
- info!("Found private IP: {}", public_ip);
- public_ip
- };
+ let private_ip = if let Some(ip) = priv_ip {
+ info!("Using private IP from config: {}", ip);
+ ip.to_string()
+ } else {
+ info!("Trying to automatically detect private IP");
+ let gwa = gw.addr.ip().octets();
+ let cmplen = match gwa {
+ [192, 168, _, _] => 3,
+ [10, _, _, _] => 2,
+ _ => panic!(
+ "Gateway IP does not appear to be in a local network ({})",
+ gw.addr.ip()
+ ),
+ };
+ #[allow(unused_parens)]
+ let public_ip = get_if_addrs::get_if_addrs()?
+ .into_iter()
+ .map(|i| i.addr.ip())
+ .filter(|a| match a {
+ std::net::IpAddr::V4(a4) => (a4.octets()[..cmplen] == gwa[..cmplen]),
+ _ => false,
+ })
+ .next()
+ .expect("No interface has an IP on same subnet as gateway")
+ .to_string();
+ info!("Found private IP: {}", public_ip);
+ public_ip
+ };
- let ctx = Self {
- gateway: gw,
- rx_ports: rxp.clone(),
- private_ip,
- refresh: refresh,
- expire: expire,
- last_ports: messages::PublicExposedPorts::new(),
- };
+ let ctx = Self {
+ gateway: gw,
+ rx_ports: rxp.clone(),
+ private_ip,
+ refresh: refresh,
+ expire: expire,
+ last_ports: messages::PublicExposedPorts::new(),
+ };
- return Ok(ctx);
- }
+ return Ok(ctx);
+ }
- pub async fn listen(&mut self) -> Result<()> {
- let mut interval = time::interval(self.refresh);
- loop {
- // 1. Wait for an event
- let new_ports = select! {
- _ = self.rx_ports.changed() => Some(self.rx_ports.borrow().clone()),
- _ = interval.tick() => None,
- else => return Ok(()) // Sender dropped, terminate loop.
- };
+ pub async fn listen(&mut self) -> Result<()> {
+ let mut interval = time::interval(self.refresh);
+ loop {
+ // 1. Wait for an event
+ let new_ports = select! {
+ _ = self.rx_ports.changed() => Some(self.rx_ports.borrow().clone()),
+ _ = interval.tick() => None,
+ else => return Ok(()) // Sender dropped, terminate loop.
+ };
- // 2. Update last ports if needed
- if let Some(p) = new_ports {
- self.last_ports = p;
- }
+ // 2. Update last ports if needed
+ if let Some(p) = new_ports {
+ self.last_ports = p;
+ }
- // 3. Flush IGD requests
- match self.do_igd().await {
- Ok(()) => debug!("Successfully updated IGD"),
- Err(e) => error!("An error occured while updating IGD. {}", e),
- }
+ // 3. Flush IGD requests
+ match self.do_igd().await {
+ Ok(()) => debug!("Successfully updated IGD"),
+ Err(e) => error!("An error occured while updating IGD. {}", e),
+ }
+ }
}
- }
- pub async fn do_igd(&self) -> Result<()> {
- let actions = [
- (PortMappingProtocol::TCP, &self.last_ports.tcp_ports),
- (PortMappingProtocol::UDP, &self.last_ports.udp_ports),
- ];
+ pub async fn do_igd(&self) -> Result<()> {
+ let actions = [
+ (PortMappingProtocol::TCP, &self.last_ports.tcp_ports),
+ (PortMappingProtocol::UDP, &self.last_ports.udp_ports),
+ ];
- for (proto, list) in actions.iter() {
- for port in *list {
- let service_str = format!("{}:{}", self.private_ip, port);
- let service = service_str
- .parse::<SocketAddrV4>()
- .context("Invalid socket address")?;
- self
- .gateway
- .add_port(
- *proto,
- *port,
- service,
- self.expire.as_secs() as u32,
- "diplonat",
- )
- .await?;
- debug!("IGD request successful for {:#?} {}", proto, service);
- }
- }
+ for (proto, list) in actions.iter() {
+ for port in *list {
+ let service_str = format!("{}:{}", self.private_ip, port);
+ let service = service_str
+ .parse::<SocketAddrV4>()
+ .context("Invalid socket address")?;
+ self.gateway
+ .add_port(
+ *proto,
+ *port,
+ service,
+ self.expire.as_secs() as u32,
+ "diplonat",
+ )
+ .await?;
+ debug!("IGD request successful for {:#?} {}", proto, service);
+ }
+ }
- return Ok(());
- }
+ return Ok(());
+ }
}
diff --git a/src/main.rs b/src/main.rs
index 4cd57c5..204520d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -13,13 +13,13 @@ use log::*;
#[tokio::main]
async fn main() {
- pretty_env_logger::init();
- info!("Starting Diplonat");
+ pretty_env_logger::init();
+ info!("Starting Diplonat");
- Diplonat::new()
- .await
- .expect("Setup failed")
- .listen()
- .await
- .expect("A runtime error occured");
+ Diplonat::new()
+ .await
+ .expect("Setup failed")
+ .listen()
+ .await
+ .expect("A runtime error occured");
}
diff --git a/src/messages.rs b/src/messages.rs
index 63f16b0..e5322f1 100644
--- a/src/messages.rs
+++ b/src/messages.rs
@@ -2,15 +2,15 @@ use std::collections::HashSet;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublicExposedPorts {
- pub tcp_ports: HashSet<u16>,
- pub udp_ports: HashSet<u16>,
+ pub tcp_ports: HashSet<u16>,
+ pub udp_ports: HashSet<u16>,
}
impl PublicExposedPorts {
- pub fn new() -> Self {
- return Self {
- tcp_ports: HashSet::new(),
- udp_ports: HashSet::new(),
- };
- }
+ pub fn new() -> Self {
+ return Self {
+ tcp_ports: HashSet::new(),
+ udp_ports: HashSet::new(),
+ };
+ }
}
diff --git a/src/stun_actor.rs b/src/stun_actor.rs
index c1e10ef..684d3d8 100644
--- a/src/stun_actor.rs
+++ b/src/stun_actor.rs
@@ -9,96 +9,95 @@ use crate::config::{RuntimeConfigConsul, RuntimeConfigStun};
use crate::consul;
pub struct StunActor {
- node: String,
- consul: consul::Consul,
- stun_server_v4: SocketAddr,
- stun_server_v6: SocketAddr,
- refresh_time: Duration,
+ node: String,
+ consul: consul::Consul,
+ stun_server_v4: SocketAddr,
+ stun_server_v6: SocketAddr,
+ refresh_time: Duration,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct AutodiscoverResult {
- pub timestamp: u64,
- pub address: IpAddr,
+ pub timestamp: u64,
+ pub address: IpAddr,
}
impl StunActor {
- pub fn new(
- consul_config: &RuntimeConfigConsul,
- stun_config: &RuntimeConfigStun,
- node: &str,
- ) -> Self {
- assert!(stun_config.stun_server_v4.is_ipv4());
- assert!(stun_config.stun_server_v6.is_ipv6());
-
- Self {
- consul: consul::Consul::new(consul_config),
- node: node.to_string(),
- stun_server_v4: stun_config.stun_server_v4,
- stun_server_v6: stun_config.stun_server_v6,
- refresh_time: stun_config.refresh_time,
+ pub fn new(
+ consul_config: &RuntimeConfigConsul,
+ stun_config: &RuntimeConfigStun,
+ node: &str,
+ ) -> Self {
+ assert!(stun_config.stun_server_v4.is_ipv4());
+ assert!(stun_config.stun_server_v6.is_ipv6());
+
+ Self {
+ consul: consul::Consul::new(consul_config),
+ node: node.to_string(),
+ stun_server_v4: stun_config.stun_server_v4,
+ stun_server_v6: stun_config.stun_server_v6,
+ refresh_time: stun_config.refresh_time,
+ }
}
- }
-
- pub async fn listen(&mut self) -> Result<()> {
- loop {
- if let Err(e) = self.autodiscover_ip(self.stun_server_v4).await {
- error!("Unable to autodiscover IPv4 address: {}", e);
- }
- if let Err(e) = self.autodiscover_ip(self.stun_server_v6).await {
- error!("Unable to autodiscover IPv6 address: {}", e);
- }
- tokio::time::sleep(self.refresh_time).await;
+
+ pub async fn listen(&mut self) -> Result<()> {
+ loop {
+ if let Err(e) = self.autodiscover_ip(self.stun_server_v4).await {
+ error!("Unable to autodiscover IPv4 address: {}", e);
+ }
+ if let Err(e) = self.autodiscover_ip(self.stun_server_v6).await {
+ error!("Unable to autodiscover IPv6 address: {}", e);
+ }
+ tokio::time::sleep(self.refresh_time).await;
+ }
+ }
+
+ async fn autodiscover_ip(&self, stun_server: SocketAddr) -> Result<()> {
+ let binding_addr = match stun_server.is_ipv4() {
+ true => "0.0.0.0:34791".parse().unwrap(),
+ false => "[::]:34792".parse().unwrap(),
+ };
+
+ let discovered_addr = get_mapped_addr(stun_server, binding_addr).await?.ip();
+
+ let consul_key = match stun_server.is_ipv4() {
+ true => {
+ debug!("Autodiscovered IPv4: {}", discovered_addr);
+ format!("diplonat/autodiscovery/ipv4/{}", self.node)
+ }
+ false => {
+ debug!("Autodiscovered IPv6: {}", discovered_addr);
+ format!("diplonat/autodiscovery/ipv6/{}", self.node)
+ }
+ };
+
+ self.consul
+ .kv_put(
+ &consul_key,
+ serde_json::to_vec(&AutodiscoverResult {
+ timestamp: SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)?
+ .as_secs(),
+ address: discovered_addr,
+ })?,
+ )
+ .await?;
+
+ Ok(())
}
- }
-
- async fn autodiscover_ip(&self, stun_server: SocketAddr) -> Result<()> {
- let binding_addr = match stun_server.is_ipv4() {
- true => "0.0.0.0:34791".parse().unwrap(),
- false => "[::]:34792".parse().unwrap(),
- };
-
- let discovered_addr = get_mapped_addr(stun_server, binding_addr).await?.ip();
-
- let consul_key = match stun_server.is_ipv4() {
- true => {
- debug!("Autodiscovered IPv4: {}", discovered_addr);
- format!("diplonat/autodiscovery/ipv4/{}", self.node)
- }
- false => {
- debug!("Autodiscovered IPv6: {}", discovered_addr);
- format!("diplonat/autodiscovery/ipv6/{}", self.node)
- }
- };
-
- self
- .consul
- .kv_put(
- &consul_key,
- serde_json::to_vec(&AutodiscoverResult {
- timestamp: SystemTime::now()
- .duration_since(SystemTime::UNIX_EPOCH)?
- .as_secs(),
- address: discovered_addr,
- })?,
- )
- .await?;
-
- Ok(())
- }
}
async fn get_mapped_addr(stun_server: SocketAddr, binding_addr: SocketAddr) -> Result<SocketAddr> {
- use stun_client::*;
+ use stun_client::*;
- let mut client = Client::new(binding_addr, None).await.unwrap();
- let res = client.binding_request(stun_server, None).await.unwrap();
+ let mut client = Client::new(binding_addr, None).await.unwrap();
+ let res = client.binding_request(stun_server, None).await.unwrap();
- if res.get_class() != Class::SuccessResponse {
- bail!("STUN server did not responde with a success response");
- }
+ if res.get_class() != Class::SuccessResponse {
+ bail!("STUN server did not responde with a success response");
+ }
- let xor_mapped_addr = Attribute::get_xor_mapped_address(&res)
- .ok_or(anyhow!("no XorMappedAddress found in STUN response"))?;
- Ok(xor_mapped_addr)
+ let xor_mapped_addr = Attribute::get_xor_mapped_address(&res)
+ .ok_or(anyhow!("no XorMappedAddress found in STUN response"))?;
+ Ok(xor_mapped_addr)
}