diff options
author | Alex Auvolat <alex@adnab.me> | 2023-04-04 18:46:14 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-04-04 18:46:14 +0200 |
commit | 615f926618471998f85ee184b378b1128340367b (patch) | |
tree | be9b5d5da3e844460a533bea9fefd452892a7f32 /src | |
parent | e64be9e8816b9bd5d3d787d1d5d57d460ae37569 (diff) | |
download | diplonat-615f926618471998f85ee184b378b1128340367b.tar.gz diplonat-615f926618471998f85ee184b378b1128340367b.zip |
Add STUN actor that saves autodiscovered IPv4/IPv6 to Consul
Diffstat (limited to 'src')
-rw-r--r-- | src/config/mod.rs | 3 | ||||
-rw-r--r-- | src/config/options.rs | 4 | ||||
-rw-r--r-- | src/config/options_test.rs | 4 | ||||
-rw-r--r-- | src/config/runtime.rs | 48 | ||||
-rw-r--r-- | src/consul.rs | 19 | ||||
-rw-r--r-- | src/consul_actor.rs | 19 | ||||
-rw-r--r-- | src/diplonat.rs | 16 | ||||
-rw-r--r-- | src/fw.rs | 4 | ||||
-rw-r--r-- | src/igd_actor.rs | 1 | ||||
-rw-r--r-- | src/main.rs | 9 | ||||
-rw-r--r-- | src/stun_actor.rs | 95 |
11 files changed, 195 insertions, 27 deletions
diff --git a/src/config/mod.rs b/src/config/mod.rs index 2bf8f66..a9cbd13 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -5,9 +5,10 @@ mod runtime; pub use options::{ConfigOpts, ConfigOptsAcme, ConfigOptsBase, ConfigOptsConsul}; pub use runtime::{ - RuntimeConfig, RuntimeConfigAcme, RuntimeConfigConsul, RuntimeConfigFirewall, RuntimeConfigIgd, + RuntimeConfig, RuntimeConfigAcme, RuntimeConfigConsul, RuntimeConfigFirewall, RuntimeConfigIgd, RuntimeConfigStun }; pub const EXPIRATION_TIME: u16 = 300; pub const REFRESH_TIME: u16 = 60; pub const CONSUL_URL: &str = "http://127.0.0.1:8500"; +pub const STUN_SERVER: &str = "stun.nextcloud.com:443"; diff --git a/src/config/options.rs b/src/config/options.rs index 793838a..08cdd15 100644 --- a/src/config/options.rs +++ b/src/config/options.rs @@ -17,6 +17,8 @@ pub struct ConfigOptsBase { pub expiration_time: Option<u16>, /// Refresh time for IGD and Firewall rules [default: 300] pub refresh_time: Option<u16>, + /// STUN server [default: stun.nextcloud.com:443] + pub stun_server: Option<String>, } /// ACME configuration options @@ -69,7 +71,7 @@ impl ConfigOpts { } // Currently only used in tests - #[allow(dead_code)] + #[cfg(test)] pub fn from_iter<Iter: Clone>(iter: Iter) -> Result<RuntimeConfig> where Iter: IntoIterator<Item = (String, String)>, diff --git a/src/config/options_test.rs b/src/config/options_test.rs index 6b91235..427b70e 100644 --- a/src/config/options_test.rs +++ b/src/config/options_test.rs @@ -21,6 +21,10 @@ fn all_valid_options() -> HashMap<String, String> { let mut opts = minimal_valid_options(); opts.insert("DIPLONAT_EXPIRATION_TIME".to_string(), "30".to_string()); opts.insert( + "DIPLONAT_STUN_SERVER".to_string(), + "stun.nextcloud.com:443".to_string(), + ); + opts.insert( "DIPLONAT_PRIVATE_IP".to_string(), "172.123.43.555".to_string(), ); diff --git a/src/config/runtime.rs b/src/config/runtime.rs index 2e7b573..50624de 100644 --- a/src/config/runtime.rs +++ b/src/config/runtime.rs @@ -1,6 +1,7 @@ use std::fs::File; use std::io::Read; use std::time::Duration; +use std::net::{SocketAddr, ToSocketAddrs}; use anyhow::{anyhow, bail, Result}; @@ -36,25 +37,35 @@ pub struct RuntimeConfigIgd { } #[derive(Debug)] +pub struct RuntimeConfigStun { + pub stun_server_v4: SocketAddr, + pub stun_server_v6: SocketAddr, + pub refresh_time: Duration, +} + +#[derive(Debug)] pub struct RuntimeConfig { pub acme: Option<RuntimeConfigAcme>, pub consul: RuntimeConfigConsul, pub firewall: RuntimeConfigFirewall, pub igd: RuntimeConfigIgd, + pub stun: RuntimeConfigStun, } impl RuntimeConfig { pub fn new(opts: ConfigOpts) -> Result<Self> { - let acme = RuntimeConfigAcme::new(opts.acme.clone())?; - let consul = RuntimeConfigConsul::new(opts.consul.clone())?; - let firewall = RuntimeConfigFirewall::new(opts.base.clone())?; - let igd = RuntimeConfigIgd::new(opts.base.clone())?; + let acme = RuntimeConfigAcme::new(opts.acme)?; + let consul = RuntimeConfigConsul::new(opts.consul)?; + let firewall = RuntimeConfigFirewall::new(&opts.base)?; + let igd = RuntimeConfigIgd::new(&opts.base)?; + let stun = RuntimeConfigStun::new(&opts.base)?; Ok(Self { acme, consul, firewall, igd, + stun, }) } } @@ -115,7 +126,7 @@ impl RuntimeConfigConsul { } impl RuntimeConfigFirewall { - pub(super) fn new(opts: ConfigOptsBase) -> Result<Self> { + pub(super) fn new(opts: &ConfigOptsBase) -> Result<Self> { let refresh_time = Duration::from_secs(opts.refresh_time.unwrap_or(super::REFRESH_TIME).into()); Ok(Self { refresh_time }) @@ -123,8 +134,8 @@ impl RuntimeConfigFirewall { } impl RuntimeConfigIgd { - pub(super) fn new(opts: ConfigOptsBase) -> Result<Self> { - let private_ip = opts.private_ip; + pub(super) fn new(opts: &ConfigOptsBase) -> Result<Self> { + let private_ip = opts.private_ip.clone(); let expiration_time = Duration::from_secs( opts .expiration_time @@ -149,3 +160,26 @@ impl RuntimeConfigIgd { }) } } + +impl RuntimeConfigStun { + pub(super) fn new(opts: &ConfigOptsBase) -> Result<Self> { + let mut stun_server_v4 = None; + let mut stun_server_v6 = None; + for addr in opts.stun_server.as_deref().unwrap_or(super::STUN_SERVER).to_socket_addrs()? { + if addr.is_ipv4() { + stun_server_v4 = Some(addr); + } + if addr.is_ipv6() { + stun_server_v6 = Some(addr); + } + } + + let refresh_time = Duration::from_secs(opts.refresh_time.unwrap_or(super::REFRESH_TIME).into()); + + Ok(Self { + stun_server_v4: stun_server_v4.ok_or(anyhow!("Unable to resolve STUN server's IPv4 address"))?, + stun_server_v6: stun_server_v6.ok_or(anyhow!("Unable to resolve STUN server's IPv6 address"))?, + refresh_time, + }) + } +} diff --git a/src/consul.rs b/src/consul.rs index c7ac2b6..e31033c 100644 --- a/src/consul.rs +++ b/src/consul.rs @@ -7,12 +7,14 @@ use crate::config::RuntimeConfigConsul; #[derive(Serialize, Deserialize, Debug)] pub struct ServiceEntry { - pub Tags: Vec<String>, + #[serde(rename = "Tags")] + pub tags: Vec<String>, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Default)] pub struct CatalogNode { - pub Services: HashMap<String, ServiceEntry>, + #[serde(rename = "Services")] + pub services: HashMap<String, ServiceEntry>, } pub struct Consul { @@ -71,7 +73,14 @@ impl Consul { None => return Err(anyhow!("X-Consul-Index header not found")), }; - let resp: CatalogNode = http.json().await?; - return Ok(resp); + let resp: Option<CatalogNode> = http.json().await?; + return Ok(resp.unwrap_or_default()); + } + + pub async fn kv_put(&self, key: &str, bytes: Vec<u8>) -> Result<()> { + let url = format!("{}/v1/kv/{}", self.url, key); + let http = self.client.put(&url).body(bytes).send().await?; + http.error_for_status()?; + Ok(()) } } diff --git a/src/consul_actor.rs b/src/consul_actor.rs index 8b722b6..e4296e7 100644 --- a/src/consul_actor.rs +++ b/src/consul_actor.rs @@ -11,13 +11,16 @@ use crate::{consul, messages}; #[derive(Serialize, Deserialize, Debug)] pub enum DiplonatParameter { - tcp_port(HashSet<u16>), - udp_port(HashSet<u16>), + #[serde(rename = "tcp_port")] + TcpPort(HashSet<u16>), + #[serde(rename = "udp_port")] + UdpPort(HashSet<u16>), } #[derive(Serialize, Deserialize, Debug)] pub enum DiplonatConsul { - diplonat(Vec<DiplonatParameter>), + #[serde(rename = "diplonat")] + Diplonat(Vec<DiplonatParameter>), } pub struct ConsulActor { @@ -42,8 +45,8 @@ fn retry_to_time(retries: u32, max_time: Duration) -> Duration { fn to_parameters(catalog: &consul::CatalogNode) -> Vec<DiplonatConsul> { let mut r = Vec::new(); - for (_, service_info) in &catalog.Services { - for tag in &service_info.Tags { + for (_, service_info) in &catalog.services { + for tag in &service_info.tags { let diplo_conf: error::Result<DiplonatConsul> = from_str(tag); match diplo_conf { Ok(conf) => r.push(conf), @@ -62,11 +65,11 @@ fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts { }; for conf in params { - let DiplonatConsul::diplonat(c) = conf; + let DiplonatConsul::Diplonat(c) = conf; for parameter in c { match parameter { - DiplonatParameter::tcp_port(p) => op.tcp_ports.extend(p), - DiplonatParameter::udp_port(p) => op.udp_ports.extend(p), + DiplonatParameter::TcpPort(p) => op.tcp_ports.extend(p), + DiplonatParameter::UdpPort(p) => op.udp_ports.extend(p), }; } } diff --git a/src/diplonat.rs b/src/diplonat.rs index 22ebd6e..6b282eb 100644 --- a/src/diplonat.rs +++ b/src/diplonat.rs @@ -3,12 +3,14 @@ use tokio::try_join; use crate::{ config::ConfigOpts, consul_actor::ConsulActor, fw_actor::FirewallActor, igd_actor::IgdActor, + stun_actor::StunActor, }; pub struct Diplonat { consul: ConsulActor, firewall: FirewallActor, igd: IgdActor, + stun: StunActor, } impl Diplonat { @@ -28,22 +30,30 @@ impl Diplonat { ) .await?; + let sa = StunActor::new( + &rt_cfg.consul, + &rt_cfg.stun, + &rt_cfg.consul.node_name, + ); + let ctx = Self { consul: ca, igd: ia, firewall: fw, + stun: sa, }; - return Ok(ctx); + Ok(ctx) } pub async fn listen(&mut self) -> Result<()> { try_join!( self.consul.listen(), self.igd.listen(), - self.firewall.listen() + self.firewall.listen(), + self.stun.listen(), )?; - return Ok(()); + Ok(()) } } @@ -11,6 +11,7 @@ pub fn setup(ipt: &iptables::IPTables) -> Result<()> { // ensure we start from a clean state without any rule already set cleanup(ipt)?; + info!("{}: creating DIPLONAT chain using", ipt.cmd); ipt .new_chain("filter", "DIPLONAT") .context("Failed to create new chain")?; @@ -23,6 +24,7 @@ pub fn setup(ipt: &iptables::IPTables) -> Result<()> { pub fn open_ports(ipt: &iptables::IPTables, ports: messages::PublicExposedPorts) -> Result<()> { for p in ports.tcp_ports { + info!("{}: opening TCP port {}", ipt.cmd, p); ipt .append( "filter", @@ -33,6 +35,7 @@ pub fn open_ports(ipt: &iptables::IPTables, ports: messages::PublicExposedPorts) } for p in ports.udp_ports { + info!("{}: opening UDP port {}", ipt.cmd, p); ipt .append( "filter", @@ -80,6 +83,7 @@ pub fn get_opened_ports(ipt: &iptables::IPTables) -> Result<messages::PublicExpo pub fn cleanup(ipt: &iptables::IPTables) -> Result<()> { if ipt.chain_exists("filter", "DIPLONAT")? { + info!("{}: removing old DIPLONAT chain", ipt.cmd); ipt .flush_chain("filter", "DIPLONAT") .context("Failed to flush the DIPLONAT chain")?; diff --git a/src/igd_actor.rs b/src/igd_actor.rs index d32f5dc..68f4217 100644 --- a/src/igd_actor.rs +++ b/src/igd_actor.rs @@ -46,6 +46,7 @@ impl IgdActor { gw.addr.ip() ), }; + #[allow(unused_parens)] let public_ip = get_if_addrs::get_if_addrs()? .into_iter() .map(|i| i.addr.ip()) diff --git a/src/main.rs b/src/main.rs index 99d38f5..4cd57c5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ mod fw; mod fw_actor; mod igd_actor; mod messages; +mod stun_actor; use diplonat::Diplonat; use log::*; @@ -15,6 +16,10 @@ async fn main() { pretty_env_logger::init(); info!("Starting Diplonat"); - let mut diplo = Diplonat::new().await.expect("Setup failed"); - diplo.listen().await.expect("A runtime error occured"); + Diplonat::new() + .await + .expect("Setup failed") + .listen() + .await + .expect("A runtime error occured"); } diff --git a/src/stun_actor.rs b/src/stun_actor.rs new file mode 100644 index 0000000..dee60f0 --- /dev/null +++ b/src/stun_actor.rs @@ -0,0 +1,95 @@ +use std::net::{SocketAddr, IpAddr}; +use std::time::{Duration, SystemTime}; + +use anyhow::{Result, bail, anyhow}; +use log::*; +use serde::{Serialize, Deserialize}; + +use crate::config::{RuntimeConfigConsul, RuntimeConfigStun}; +use crate::consul; + +pub struct StunActor { + node: String, + consul: consul::Consul, + stun_server_v4: SocketAddr, + stun_server_v6: SocketAddr, + refresh_time: Duration, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct AutodiscoverResult { + pub timestamp: u64, + pub address: IpAddr, +} + +impl StunActor { + pub fn new(consul_config: &RuntimeConfigConsul, stun_config: &RuntimeConfigStun, node: &str) -> Self { + assert!(stun_config.stun_server_v4.is_ipv4()); + assert!(stun_config.stun_server_v6.is_ipv6()); + + Self { + consul: consul::Consul::new(consul_config), + node: node.to_string(), + stun_server_v4: stun_config.stun_server_v4, + stun_server_v6: stun_config.stun_server_v6, + refresh_time: stun_config.refresh_time, + } + } + + pub async fn listen(&mut self) -> Result<()> { + loop { + if let Err(e) = self.autodiscover_ip(self.stun_server_v4).await { + error!("Unable to autodiscover IPv4 address: {}", e); + } + if let Err(e) = self.autodiscover_ip(self.stun_server_v6).await { + error!("Unable to autodiscover IPv6 address: {}", e); + } + tokio::time::sleep(self.refresh_time).await; + } + } + + async fn autodiscover_ip(&self, stun_server: SocketAddr) -> Result<()> { + let binding_addr = match stun_server.is_ipv4() { + true => "0.0.0.0:34791".parse().unwrap(), + false => "[::]:34792".parse().unwrap(), + }; + + let discovered_addr = get_mapped_addr(stun_server, binding_addr).await?.ip(); + + let consul_key = match stun_server.is_ipv4() { + true => { + debug!("Autodiscovered IPv4: {}", discovered_addr); + format!("diplonat/autodiscovery/ipv4/{}", self.node) + } + false => { + debug!("Autodiscovered IPv6: {}", discovered_addr); + format!("diplonat/autodiscovery/ipv6/{}", self.node) + } + }; + + self.consul.kv_put(&consul_key, serde_json::to_vec(&AutodiscoverResult{ + timestamp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_secs(), + address: discovered_addr, + })?).await?; + + Ok(()) + } +} + +async fn get_mapped_addr(stun_server: SocketAddr, binding_addr: SocketAddr) -> Result<SocketAddr> { + use stun_client::*; + + let mut client = Client::new(binding_addr, None).await.unwrap(); + let res = client + .binding_request(stun_server, None) + .await + .unwrap(); + + if res.get_class() != Class::SuccessResponse { + bail!("STUN server did not responde with a success response"); + } + + let xor_mapped_addr = Attribute::get_xor_mapped_address(&res) + .ok_or(anyhow!("no XorMappedAddress found in STUN response"))?; + Ok(xor_mapped_addr) +} |