aboutsummaryrefslogtreecommitdiff
path: root/src/peering
diff options
context:
space:
mode:
Diffstat (limited to 'src/peering')
-rw-r--r--src/peering/basalt.rs24
-rw-r--r--src/peering/fullmesh.rs12
2 files changed, 14 insertions, 22 deletions
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs
index 27461ab..5da65e0 100644
--- a/src/peering/basalt.rs
+++ b/src/peering/basalt.rs
@@ -264,18 +264,18 @@ impl Basalt {
});
let basalt2 = basalt.clone();
- netapp.on_connected.store(Some(Arc::new(Box::new(
+ netapp.on_connected(
move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| {
basalt2.on_connected(pk, addr, is_incoming);
- },
- ))));
+ }
+ );
let basalt2 = basalt.clone();
- netapp.on_disconnected.store(Some(Arc::new(Box::new(
+ netapp.on_disconnected(
move |pk: ed25519::PublicKey, is_incoming: bool| {
basalt2.on_disconnected(pk, is_incoming);
},
- ))));
+ );
let basalt2 = basalt.clone();
netapp.add_msg_handler::<PullMessage, _, _>(
@@ -331,7 +331,7 @@ impl Basalt {
async fn do_pull(self: Arc<Self>, peer: ed25519::PublicKey) {
match self
.netapp
- .request(&peer, PullMessage {}, prio::NORMAL)
+ .request(&peer, PullMessage {}, PRIO_NORMAL)
.await
{
Ok(resp) => {
@@ -345,7 +345,7 @@ impl Basalt {
async fn do_push(self: Arc<Self>, peer: ed25519::PublicKey) {
let push_msg = self.make_push_message();
- if let Err(e) = self.netapp.request(&peer, push_msg, prio::NORMAL).await {
+ if let Err(e) = self.netapp.request(&peer, push_msg, PRIO_NORMAL).await {
warn!("Error during push exchange: {}", e);
}
}
@@ -448,17 +448,9 @@ impl Basalt {
}
fn close_all_diff(&self, prev_peers: &HashSet<Peer>, new_peers: &HashSet<Peer>) {
- let client_conns = self.netapp.client_conns.read().unwrap();
for peer in prev_peers.iter() {
if !new_peers.contains(peer) {
- if let Some(c) = client_conns.get(&peer.id) {
- debug!(
- "Closing connection to {} ({})",
- hex::encode(peer.id),
- peer.addr
- );
- c.close();
- }
+ self.netapp.disconnect(&peer.id);
}
}
}
diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs
index 4e9a78d..1b26489 100644
--- a/src/peering/fullmesh.rs
+++ b/src/peering/fullmesh.rs
@@ -177,20 +177,20 @@ impl FullMeshPeeringStrategy {
);
let strat2 = strat.clone();
- netapp.on_connected.store(Some(Arc::new(Box::new(
+ netapp.on_connected(
move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| {
let strat2 = strat2.clone();
tokio::spawn(strat2.on_connected(pk, addr, is_incoming));
},
- ))));
+ );
let strat2 = strat.clone();
- netapp.on_disconnected.store(Some(Arc::new(Box::new(
+ netapp.on_disconnected(
move |pk: ed25519::PublicKey, is_incoming: bool| {
let strat2 = strat2.clone();
tokio::spawn(strat2.on_disconnected(pk, is_incoming));
},
- ))));
+ );
strat
}
@@ -271,7 +271,7 @@ impl FullMeshPeeringStrategy {
hex::encode(id),
ping_time
);
- match self.netapp.request(&id, ping_msg, prio::HIGH).await {
+ match self.netapp.request(&id, ping_msg, PRIO_HIGH).await {
Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e),
Ok(ping_resp) => {
let resp_time = Instant::now();
@@ -300,7 +300,7 @@ impl FullMeshPeeringStrategy {
async fn exchange_peers(self: Arc<Self>, id: &ed25519::PublicKey) {
let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list);
let pex_message = PeerListMessage { list: peer_list };
- match self.netapp.request(id, pex_message, prio::BACKGROUND).await {
+ match self.netapp.request(id, pex_message, PRIO_BACKGROUND).await {
Err(e) => warn!("Error doing peer exchange: {}", e),
Ok(resp) => {
self.handle_peer_list(&resp.list[..]);