diff options
author | Alex Auvolat <alex@adnab.me> | 2020-12-02 20:12:24 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-12-02 20:12:24 +0100 |
commit | 14d34e76f4007e50af89bd47f6ad36f45494c50a (patch) | |
tree | d1a79d397b26f9500917ef5e4eaa6ae70b2d1429 /src/peering | |
parent | 46fae5d138cb7c0a74e2a8c7837541f18400ccf4 (diff) | |
download | netapp-14d34e76f4007e50af89bd47f6ad36f45494c50a.tar.gz netapp-14d34e76f4007e50af89bd47f6ad36f45494c50a.zip |
Documentate
Diffstat (limited to 'src/peering')
-rw-r--r-- | src/peering/basalt.rs | 24 | ||||
-rw-r--r-- | src/peering/fullmesh.rs | 12 |
2 files changed, 14 insertions, 22 deletions
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index 27461ab..5da65e0 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -264,18 +264,18 @@ impl Basalt { }); let basalt2 = basalt.clone(); - netapp.on_connected.store(Some(Arc::new(Box::new( + netapp.on_connected( move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| { basalt2.on_connected(pk, addr, is_incoming); - }, - )))); + } + ); let basalt2 = basalt.clone(); - netapp.on_disconnected.store(Some(Arc::new(Box::new( + netapp.on_disconnected( move |pk: ed25519::PublicKey, is_incoming: bool| { basalt2.on_disconnected(pk, is_incoming); }, - )))); + ); let basalt2 = basalt.clone(); netapp.add_msg_handler::<PullMessage, _, _>( @@ -331,7 +331,7 @@ impl Basalt { async fn do_pull(self: Arc<Self>, peer: ed25519::PublicKey) { match self .netapp - .request(&peer, PullMessage {}, prio::NORMAL) + .request(&peer, PullMessage {}, PRIO_NORMAL) .await { Ok(resp) => { @@ -345,7 +345,7 @@ impl Basalt { async fn do_push(self: Arc<Self>, peer: ed25519::PublicKey) { let push_msg = self.make_push_message(); - if let Err(e) = self.netapp.request(&peer, push_msg, prio::NORMAL).await { + if let Err(e) = self.netapp.request(&peer, push_msg, PRIO_NORMAL).await { warn!("Error during push exchange: {}", e); } } @@ -448,17 +448,9 @@ impl Basalt { } fn close_all_diff(&self, prev_peers: &HashSet<Peer>, new_peers: &HashSet<Peer>) { - let client_conns = self.netapp.client_conns.read().unwrap(); for peer in prev_peers.iter() { if !new_peers.contains(peer) { - if let Some(c) = client_conns.get(&peer.id) { - debug!( - "Closing connection to {} ({})", - hex::encode(peer.id), - peer.addr - ); - c.close(); - } + self.netapp.disconnect(&peer.id); } } } diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 4e9a78d..1b26489 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -177,20 +177,20 @@ impl FullMeshPeeringStrategy { ); let strat2 = strat.clone(); - netapp.on_connected.store(Some(Arc::new(Box::new( + netapp.on_connected( move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| { let strat2 = strat2.clone(); tokio::spawn(strat2.on_connected(pk, addr, is_incoming)); }, - )))); + ); let strat2 = strat.clone(); - netapp.on_disconnected.store(Some(Arc::new(Box::new( + netapp.on_disconnected( move |pk: ed25519::PublicKey, is_incoming: bool| { let strat2 = strat2.clone(); tokio::spawn(strat2.on_disconnected(pk, is_incoming)); }, - )))); + ); strat } @@ -271,7 +271,7 @@ impl FullMeshPeeringStrategy { hex::encode(id), ping_time ); - match self.netapp.request(&id, ping_msg, prio::HIGH).await { + match self.netapp.request(&id, ping_msg, PRIO_HIGH).await { Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e), Ok(ping_resp) => { let resp_time = Instant::now(); @@ -300,7 +300,7 @@ impl FullMeshPeeringStrategy { async fn exchange_peers(self: Arc<Self>, id: &ed25519::PublicKey) { let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); let pex_message = PeerListMessage { list: peer_list }; - match self.netapp.request(id, pex_message, prio::BACKGROUND).await { + match self.netapp.request(id, pex_message, PRIO_BACKGROUND).await { Err(e) => warn!("Error doing peer exchange: {}", e), Ok(resp) => { self.handle_peer_list(&resp.list[..]); |