diff options
Diffstat (limited to 'src/peering/basalt.rs')
-rw-r--r-- | src/peering/basalt.rs | 40 |
1 files changed, 18 insertions, 22 deletions
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index be807a8..27461ab 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -11,7 +11,6 @@ use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::hash; use sodiumoxide::crypto::sign::ed25519; -use crate::conn::*; use crate::message::*; use crate::netapp::*; use crate::proto::*; @@ -282,7 +281,7 @@ impl Basalt { netapp.add_msg_handler::<PullMessage, _, _>( move |_from: ed25519::PublicKey, _pullmsg: PullMessage| { let push_msg = basalt2.make_push_message(); - async move { Ok(push_msg) } + async move { push_msg } }, ); @@ -290,7 +289,7 @@ impl Basalt { netapp.add_msg_handler::<PushMessage, _, _>( move |_from: ed25519::PublicKey, push_msg: PushMessage| { basalt2.handle_peer_list(&push_msg.peers[..]); - async move { Ok(()) } + async move { () } }, ); @@ -323,25 +322,18 @@ impl Basalt { let peers = self.view.read().unwrap().sample(2); if peers.len() == 2 { - let (c1, c2) = { - let client_conns = self.netapp.client_conns.read().unwrap(); - ( - client_conns.get(&peers[0].id).cloned(), - client_conns.get(&peers[1].id).cloned(), - ) - }; - if let Some(c) = c1 { - tokio::spawn(self.clone().do_pull(c)); - } - if let Some(c) = c2 { - tokio::spawn(self.clone().do_push(c)); - } + tokio::spawn(self.clone().do_pull(peers[0].id)); + tokio::spawn(self.clone().do_push(peers[1].id)); } } } - async fn do_pull(self: Arc<Self>, peer: Arc<ClientConn>) { - match peer.request(PullMessage {}, prio::NORMAL).await { + async fn do_pull(self: Arc<Self>, peer: ed25519::PublicKey) { + match self + .netapp + .request(&peer, PullMessage {}, prio::NORMAL) + .await + { Ok(resp) => { self.handle_peer_list(&resp.peers[..]); } @@ -351,9 +343,9 @@ impl Basalt { }; } - async fn do_push(self: Arc<Self>, peer: Arc<ClientConn>) { + async fn do_push(self: Arc<Self>, peer: ed25519::PublicKey) { let push_msg = self.make_push_message(); - if let Err(e) = peer.request(push_msg, prio::NORMAL).await { + if let Err(e) = self.netapp.request(&peer, push_msg, prio::NORMAL).await { warn!("Error during push exchange: {}", e); } } @@ -427,7 +419,7 @@ impl Basalt { fn on_connected(self: &Arc<Self>, pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool) { if is_incoming { - self.handle_peer_list(&[Peer{id: pk, addr}][..]); + self.handle_peer_list(&[Peer { id: pk, addr }][..]); } else { let peer = Peer { id: pk, addr }; @@ -460,7 +452,11 @@ impl Basalt { for peer in prev_peers.iter() { if !new_peers.contains(peer) { if let Some(c) = client_conns.get(&peer.id) { - debug!("Closing connection to {} ({})", hex::encode(peer.id), peer.addr); + debug!( + "Closing connection to {} ({})", + hex::encode(peer.id), + peer.addr + ); c.close(); } } |