diff options
-rw-r--r-- | examples/basalt.rs | 8 | ||||
-rw-r--r-- | examples/fullmesh.rs | 8 | ||||
-rw-r--r-- | src/netapp.rs | 56 | ||||
-rw-r--r-- | src/peering/basalt.rs | 8 |
4 files changed, 50 insertions, 30 deletions
diff --git a/examples/basalt.rs b/examples/basalt.rs index e1ac2e4..4ea4f71 100644 --- a/examples/basalt.rs +++ b/examples/basalt.rs @@ -85,9 +85,7 @@ async fn main() { info!("KYEV SK {}", hex::encode(&privkey)); info!("KYEV PK {}", hex::encode(&privkey.public_key())); - let listen_addr = opt.listen_addr.parse().unwrap(); - let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); - let netapp = NetApp::new(listen_addr, public_addr, netid, privkey); + let netapp = NetApp::new(netid, privkey); let mut bootstrap_peers = vec![]; for peer in opt.bootstrap_peers.iter() { @@ -119,9 +117,11 @@ async fn main() { }, ); + let listen_addr = opt.listen_addr.parse().unwrap(); + let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); tokio::join!( sampling_loop(netapp.clone(), peering.clone()), - netapp.listen(), + netapp.listen(listen_addr, public_addr), peering.run(), ); } diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs index 88784c1..eb954b7 100644 --- a/examples/fullmesh.rs +++ b/examples/fullmesh.rs @@ -65,9 +65,7 @@ async fn main() { info!("Node private key: {}", hex::encode(&privkey)); info!("Node public key: {}", hex::encode(&privkey.public_key())); - let listen_addr = opt.listen_addr.parse().unwrap(); - let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); - let netapp = NetApp::new(listen_addr, public_addr, netid, privkey); + let netapp = NetApp::new(netid, privkey); let mut bootstrap_peers = vec![]; for peer in opt.bootstrap_peers.iter() { @@ -81,5 +79,7 @@ async fn main() { let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers); - tokio::join!(netapp.listen(), peering.run(),); + let listen_addr = opt.listen_addr.parse().unwrap(); + let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); + tokio::join!(netapp.listen(listen_addr, public_addr), peering.run(),); } diff --git a/src/netapp.rs b/src/netapp.rs index 8397be9..a34b0d6 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -47,8 +47,7 @@ pub(crate) struct Handler { /// It is generally not necessary to use NetApp stand-alone, as the provided full mesh /// and RPS peering strategies take care of the most common use cases. pub struct NetApp { - pub listen_addr: SocketAddr, - pub public_addr: Option<IpAddr>, + listen_params: ArcSwapOption<ListenParams>, pub netid: auth::Key, pub pubkey: ed25519::PublicKey, @@ -63,6 +62,11 @@ pub struct NetApp { on_disconnected_handler: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>, } +struct ListenParams { + listen_addr: SocketAddr, + public_addr: Option<IpAddr>, +} + async fn net_handler_aux<M, F, R>( handler: Arc<F>, remote: ed25519::PublicKey, @@ -110,17 +114,13 @@ where } impl NetApp { - /// Creates a new instance of NetApp. No background process is - pub fn new( - listen_addr: SocketAddr, - public_addr: Option<IpAddr>, - netid: auth::Key, - privkey: ed25519::SecretKey, - ) -> Arc<Self> { + /// Creates a new instance of NetApp, which can serve either as a full p2p node, + /// or just as a passive client. To upgrade to a full p2p node, spawn a listener + /// using `.listen()` + pub fn new(netid: auth::Key, privkey: ed25519::SecretKey) -> Arc<Self> { let pubkey = privkey.public_key(); let netapp = Arc::new(Self { - listen_addr, - public_addr, + listen_params: ArcSwapOption::new(None), netid, pubkey, privkey, @@ -200,9 +200,16 @@ impl NetApp { /// Main listening process for our app. This future runs during the whole /// run time of our application. - pub async fn listen(self: Arc<Self>) { - let mut listener = TcpListener::bind(self.listen_addr).await.unwrap(); - info!("Listening on {}", self.listen_addr); + /// If this is not called, the NetApp instance remains a passive client. + pub async fn listen(self: Arc<Self>, listen_addr: SocketAddr, public_addr: Option<IpAddr>) { + let listen_params = ListenParams { + listen_addr, + public_addr, + }; + self.listen_params.store(Some(Arc::new(listen_params))); + + let mut listener = TcpListener::bind(listen_addr).await.unwrap(); + info!("Listening on {}", listen_addr); loop { // The second item contains the IP and port of the new connection. @@ -315,8 +322,7 @@ impl NetApp { fn handle_hello_message(&self, id: ed25519::PublicKey, msg: HelloMessage) { if let Some(h) = self.on_connected_handler.load().as_ref() { if let Some(c) = self.server_conns.read().unwrap().get(&id) { - let remote_ip = msg.server_addr - .unwrap_or(c.remote_addr.ip()); + let remote_ip = msg.server_addr.unwrap_or(c.remote_addr.ip()); let remote_addr = SocketAddr::new(remote_ip, msg.server_port); h(id, remote_addr, true); } @@ -363,13 +369,21 @@ impl NetApp { h(conn.peer_pk, conn.remote_addr, false); } - let server_addr = self.public_addr; - let server_port = self.listen_addr.port(); - tokio::spawn(async move { - conn.request(HelloMessage { server_addr, server_port }, PRIO_NORMAL) + if let Some(lp) = self.listen_params.load_full() { + let server_addr = lp.public_addr; + let server_port = lp.listen_addr.port(); + tokio::spawn(async move { + conn.request( + HelloMessage { + server_addr, + server_port, + }, + PRIO_NORMAL, + ) .await .log_err("Sending hello message"); - }); + }); + } } // Called from conn.rs when an outgoinc connection is closed. diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index 1898dba..4d56319 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -150,7 +150,13 @@ impl BasaltView { for peer in peers.iter() { let peer_cost = peer.cost(&self.slots[i].seed); if self.slots[i].peer.is_none() || peer_cost < slot_cost { - trace!("Best match for slot {}: {}@{} (cost {})", i, hex::encode(peer.id), peer.addr, hex::encode(peer_cost)); + trace!( + "Best match for slot {}: {}@{} (cost {})", + i, + hex::encode(peer.id), + peer.addr, + hex::encode(peer_cost) + ); self.slots[i].peer = Some(*peer); slot_cost = peer_cost; } |