aboutsummaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-03-09 13:31:31 +0100
committerAlex Auvolat <alex@adnab.me>2023-03-09 13:31:31 +0100
commitb7a1b4d4546aed80de38c54eb83ad940e21b0d84 (patch)
tree670c60c36e50ac8808ca09248111c2a5dc8ebd37 /src/main.rs
parent67280bd8613d96b6edd82c9c685ae29b4d27896a (diff)
downloadwgautomesh-b7a1b4d4546aed80de38c54eb83ad940e21b0d84.tar.gz
wgautomesh-b7a1b4d4546aed80de38c54eb83ad940e21b0d84.zip
automatic discovery of nodes on the same LAN (fix #2)
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs135
1 files changed, 98 insertions, 37 deletions
diff --git a/src/main.rs b/src/main.rs
index 6eb5171..de2e9c8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -21,6 +21,8 @@ const TIMEOUT: Duration = Duration::from_secs(300);
/// Interval at which to gossip last_seen info
const GOSSIP_INTERVAL: Duration = Duration::from_secs(300);
+const LAN_BROADCAST_INTERVAL: Duration = Duration::from_secs(60);
+
const IGD_INTERVAL: Duration = Duration::from_secs(60);
const IGD_LEASE_DURATION: Duration = Duration::from_secs(300);
@@ -30,10 +32,16 @@ type Pubkey = String;
struct Config {
/// The Wireguard interface name
interface: Pubkey,
- /// Forward an external port to Wiregard using UPnP IGD
- upnp_forward_external_port: Option<u16>,
/// The port to use for gossip inside the Wireguard mesh (must be the same on all nodes)
gossip_port: u16,
+
+ /// Enable LAN discovery
+ #[serde(default)]
+ lan_discovery: bool,
+
+ /// Forward an external port to Wiregard using UPnP IGD
+ upnp_forward_external_port: Option<u16>,
+
/// The list of peers we try to connect to
#[serde(default)]
peers: Vec<Peer>,
@@ -134,6 +142,7 @@ struct Daemon {
struct PeerInfo {
endpoint: Option<SocketAddr>,
+ lan_endpoint: Option<(SocketAddr, u64)>,
last_seen: u64,
gossip_ip: IpAddr,
gossip_prio: u64,
@@ -146,6 +155,10 @@ enum Gossip {
endpoints: Vec<(SocketAddr, u64)>,
},
Request,
+ LanBroadcast {
+ pubkey: Pubkey,
+ listen_port: u16,
+ }
}
impl Daemon {
@@ -180,6 +193,7 @@ impl Daemon {
thread::scope(|s| {
s.spawn(|| self.wg_loop());
s.spawn(|| self.recv_loop());
+ s.spawn(|| self.lan_broadcast_loop());
s.spawn(|| self.igd_loop());
});
unreachable!()
@@ -197,34 +211,10 @@ impl Daemon {
}
fn wg_loop_iter(&self, i: usize) -> Result<()> {
- let (_, _, wg_peers) = wg_dump(&self.config)?;
let mut state = self.state.lock().unwrap();
// 1. Update local peers info of peers
- for (pk, endpoint, last_seen) in wg_peers {
- match state.peers.get_mut(&pk) {
- Some(i) => {
- i.endpoint = endpoint;
- i.last_seen = last_seen;
- }
- None => {
- let gossip_ip = match self.config.peers.iter().find(|x| x.pubkey == pk) {
- Some(x) => x.address,
- None => continue,
- };
- let gossip_prio = fasthash(format!("{}-{}", self.our_pubkey, pk).as_bytes());
- state.peers.insert(
- pk,
- PeerInfo {
- endpoint,
- gossip_prio,
- gossip_ip,
- last_seen,
- },
- );
- }
- }
- }
+ state.read_wg_peers(self)?;
// 2. Send gossip for peers where there is a big update
let announces = state
@@ -247,7 +237,7 @@ impl Daemon {
}
// 3. Try new address for disconnected peers
- state.setup_wg_peers(&self, i)?;
+ state.setup_wg_peers(self, i)?;
Ok(())
}
@@ -277,6 +267,13 @@ impl Daemon {
self.socket.send_to(&packet, from)?;
}
}
+ Gossip::LanBroadcast{ pubkey, listen_port } => {
+ if self.config.lan_discovery {
+ if let Some(peer) = state.peers.get_mut(&pubkey) {
+ peer.lan_endpoint = Some((SocketAddr::new(from.ip(), listen_port), time()));
+ }
+ }
+ }
}
Ok(())
}
@@ -292,6 +289,27 @@ impl Daemon {
Ok((src, gossip))
}
+ fn lan_broadcast_loop(&self) {
+ if self.config.lan_discovery {
+ loop {
+ if let Err(e) = self.lan_broadcast_iter() {
+ error!("LAN broadcast loop error: {}", e);
+ }
+ std::thread::sleep(LAN_BROADCAST_INTERVAL);
+ }
+ }
+ }
+
+ fn lan_broadcast_iter(&self) -> Result<()> {
+ let packet = bincode::serialize(&Gossip::LanBroadcast {
+ pubkey: self.our_pubkey.clone(),
+ listen_port: self.listen_port,
+ })?;
+ let addr = SocketAddr::new("255.255.255.255".parse().unwrap(), self.config.gossip_port);
+ self.socket.send_to(&packet, addr)?;
+ Ok(())
+ }
+
fn igd_loop(&self) {
if let Some(external_port) = self.config.upnp_forward_external_port {
loop {
@@ -401,6 +419,7 @@ impl State {
}
}
None => {
+ endpoints.sort_by_key(|(_, t)| -(*t as i64));
endpoints.truncate(KEEP_MAX_ADDRESSES);
self.gossip.insert(pubkey.clone(), endpoints.clone());
Some(Gossip::Announce { pubkey, endpoints })
@@ -414,6 +433,37 @@ impl State {
Ok(())
}
+ fn read_wg_peers(&mut self, daemon: &Daemon) -> Result<()> {
+ let (_, _, wg_peers) = wg_dump(&daemon.config)?;
+ for (pk, endpoint, last_seen) in wg_peers {
+ match self.peers.get_mut(&pk) {
+ Some(i) => {
+ i.endpoint = endpoint;
+ i.last_seen = last_seen;
+ }
+ None => {
+ let gossip_ip = match daemon.config.peers.iter().find(|x| x.pubkey == pk) {
+ Some(x) => x.address,
+ None => continue,
+ };
+ let gossip_prio = fasthash(format!("{}-{}", daemon.our_pubkey, pk).as_bytes());
+ self.peers.insert(
+ pk,
+ PeerInfo {
+ endpoint,
+ lan_endpoint: None,
+ gossip_prio,
+ gossip_ip,
+ last_seen,
+ },
+ );
+ }
+ }
+ }
+
+ Ok(())
+ }
+
fn setup_wg_peers(&self, daemon: &Daemon, i: usize) -> Result<()> {
let now = time();
for peer in daemon.config.peers.iter() {
@@ -442,18 +492,28 @@ impl State {
}
// For disconnected peers, cycle through the IP addresses that we know of
- let mut endpoints = self.gossip.get(&peer.pubkey).cloned().unwrap_or_default();
- if let Some(endpoint) = &peer.endpoint {
- match endpoint.to_socket_addrs() {
- Err(e) => error!("Could not resolve DNS for {}: {}", endpoint, e),
- Ok(iter) => {
- for addr in iter {
- endpoints.push((addr, 0));
+ let lan_endpoint = self.peers.get(&peer.pubkey)
+ .and_then(|peer| peer.lan_endpoint)
+ .filter(|(_, t)| time() < t + TIMEOUT.as_secs());
+
+ let endpoints = match lan_endpoint {
+ Some(endpoint) => vec![endpoint],
+ None => {
+ let mut endpoints = self.gossip.get(&peer.pubkey).cloned().unwrap_or_default();
+ if let Some(endpoint) = &peer.endpoint {
+ match endpoint.to_socket_addrs() {
+ Err(e) => error!("Could not resolve DNS for {}: {}", endpoint, e),
+ Ok(iter) => {
+ for addr in iter {
+ endpoints.push((addr, 0));
+ }
+ }
}
}
+ endpoints.sort();
+ endpoints
}
- }
- endpoints.sort();
+ };
if !endpoints.is_empty() {
let endpoint = endpoints[i % endpoints.len()];
@@ -486,6 +546,7 @@ impl State {
.output()?;
}
}
+
Ok(())
}
}