aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-12-12 19:27:18 +0100
committerAlex Auvolat <alex@adnab.me>2020-12-12 19:27:18 +0100
commit50401989724ed70f031055463e21caad76589742 (patch)
tree41dcb6481344f7cda3bd1fada9a76fe4bfea2c52
parent54c7c50bb57450f4fbb62eb5e6258bc34f37a99d (diff)
downloadnetapp-50401989724ed70f031055463e21caad76589742.tar.gz
netapp-50401989724ed70f031055463e21caad76589742.zip
Make it passive-client-friendly
-rw-r--r--examples/basalt.rs8
-rw-r--r--examples/fullmesh.rs8
-rw-r--r--src/netapp.rs56
-rw-r--r--src/peering/basalt.rs8
4 files changed, 50 insertions, 30 deletions
diff --git a/examples/basalt.rs b/examples/basalt.rs
index e1ac2e4..4ea4f71 100644
--- a/examples/basalt.rs
+++ b/examples/basalt.rs
@@ -85,9 +85,7 @@ async fn main() {
info!("KYEV SK {}", hex::encode(&privkey));
info!("KYEV PK {}", hex::encode(&privkey.public_key()));
- let listen_addr = opt.listen_addr.parse().unwrap();
- let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
- let netapp = NetApp::new(listen_addr, public_addr, netid, privkey);
+ let netapp = NetApp::new(netid, privkey);
let mut bootstrap_peers = vec![];
for peer in opt.bootstrap_peers.iter() {
@@ -119,9 +117,11 @@ async fn main() {
},
);
+ let listen_addr = opt.listen_addr.parse().unwrap();
+ let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
tokio::join!(
sampling_loop(netapp.clone(), peering.clone()),
- netapp.listen(),
+ netapp.listen(listen_addr, public_addr),
peering.run(),
);
}
diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs
index 88784c1..eb954b7 100644
--- a/examples/fullmesh.rs
+++ b/examples/fullmesh.rs
@@ -65,9 +65,7 @@ async fn main() {
info!("Node private key: {}", hex::encode(&privkey));
info!("Node public key: {}", hex::encode(&privkey.public_key()));
- let listen_addr = opt.listen_addr.parse().unwrap();
- let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
- let netapp = NetApp::new(listen_addr, public_addr, netid, privkey);
+ let netapp = NetApp::new(netid, privkey);
let mut bootstrap_peers = vec![];
for peer in opt.bootstrap_peers.iter() {
@@ -81,5 +79,7 @@ async fn main() {
let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers);
- tokio::join!(netapp.listen(), peering.run(),);
+ let listen_addr = opt.listen_addr.parse().unwrap();
+ let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
+ tokio::join!(netapp.listen(listen_addr, public_addr), peering.run(),);
}
diff --git a/src/netapp.rs b/src/netapp.rs
index 8397be9..a34b0d6 100644
--- a/src/netapp.rs
+++ b/src/netapp.rs
@@ -47,8 +47,7 @@ pub(crate) struct Handler {
/// It is generally not necessary to use NetApp stand-alone, as the provided full mesh
/// and RPS peering strategies take care of the most common use cases.
pub struct NetApp {
- pub listen_addr: SocketAddr,
- pub public_addr: Option<IpAddr>,
+ listen_params: ArcSwapOption<ListenParams>,
pub netid: auth::Key,
pub pubkey: ed25519::PublicKey,
@@ -63,6 +62,11 @@ pub struct NetApp {
on_disconnected_handler: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>,
}
+struct ListenParams {
+ listen_addr: SocketAddr,
+ public_addr: Option<IpAddr>,
+}
+
async fn net_handler_aux<M, F, R>(
handler: Arc<F>,
remote: ed25519::PublicKey,
@@ -110,17 +114,13 @@ where
}
impl NetApp {
- /// Creates a new instance of NetApp. No background process is
- pub fn new(
- listen_addr: SocketAddr,
- public_addr: Option<IpAddr>,
- netid: auth::Key,
- privkey: ed25519::SecretKey,
- ) -> Arc<Self> {
+ /// Creates a new instance of NetApp, which can serve either as a full p2p node,
+ /// or just as a passive client. To upgrade to a full p2p node, spawn a listener
+ /// using `.listen()`
+ pub fn new(netid: auth::Key, privkey: ed25519::SecretKey) -> Arc<Self> {
let pubkey = privkey.public_key();
let netapp = Arc::new(Self {
- listen_addr,
- public_addr,
+ listen_params: ArcSwapOption::new(None),
netid,
pubkey,
privkey,
@@ -200,9 +200,16 @@ impl NetApp {
/// Main listening process for our app. This future runs during the whole
/// run time of our application.
- pub async fn listen(self: Arc<Self>) {
- let mut listener = TcpListener::bind(self.listen_addr).await.unwrap();
- info!("Listening on {}", self.listen_addr);
+ /// If this is not called, the NetApp instance remains a passive client.
+ pub async fn listen(self: Arc<Self>, listen_addr: SocketAddr, public_addr: Option<IpAddr>) {
+ let listen_params = ListenParams {
+ listen_addr,
+ public_addr,
+ };
+ self.listen_params.store(Some(Arc::new(listen_params)));
+
+ let mut listener = TcpListener::bind(listen_addr).await.unwrap();
+ info!("Listening on {}", listen_addr);
loop {
// The second item contains the IP and port of the new connection.
@@ -315,8 +322,7 @@ impl NetApp {
fn handle_hello_message(&self, id: ed25519::PublicKey, msg: HelloMessage) {
if let Some(h) = self.on_connected_handler.load().as_ref() {
if let Some(c) = self.server_conns.read().unwrap().get(&id) {
- let remote_ip = msg.server_addr
- .unwrap_or(c.remote_addr.ip());
+ let remote_ip = msg.server_addr.unwrap_or(c.remote_addr.ip());
let remote_addr = SocketAddr::new(remote_ip, msg.server_port);
h(id, remote_addr, true);
}
@@ -363,13 +369,21 @@ impl NetApp {
h(conn.peer_pk, conn.remote_addr, false);
}
- let server_addr = self.public_addr;
- let server_port = self.listen_addr.port();
- tokio::spawn(async move {
- conn.request(HelloMessage { server_addr, server_port }, PRIO_NORMAL)
+ if let Some(lp) = self.listen_params.load_full() {
+ let server_addr = lp.public_addr;
+ let server_port = lp.listen_addr.port();
+ tokio::spawn(async move {
+ conn.request(
+ HelloMessage {
+ server_addr,
+ server_port,
+ },
+ PRIO_NORMAL,
+ )
.await
.log_err("Sending hello message");
- });
+ });
+ }
}
// Called from conn.rs when an outgoinc connection is closed.
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs
index 1898dba..4d56319 100644
--- a/src/peering/basalt.rs
+++ b/src/peering/basalt.rs
@@ -150,7 +150,13 @@ impl BasaltView {
for peer in peers.iter() {
let peer_cost = peer.cost(&self.slots[i].seed);
if self.slots[i].peer.is_none() || peer_cost < slot_cost {
- trace!("Best match for slot {}: {}@{} (cost {})", i, hex::encode(peer.id), peer.addr, hex::encode(peer_cost));
+ trace!(
+ "Best match for slot {}: {}@{} (cost {})",
+ i,
+ hex::encode(peer.id),
+ peer.addr,
+ hex::encode(peer_cost)
+ );
self.slots[i].peer = Some(*peer);
slot_cost = peer_cost;
}