aboutsummaryrefslogtreecommitdiff
path: root/src/peering/basalt.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-12-02 18:10:07 +0100
committerAlex Auvolat <alex@adnab.me>2020-12-02 18:10:07 +0100
commit46fae5d138cb7c0a74e2a8c7837541f18400ccf4 (patch)
treef4456300e4ed12ffa6dd918236ad74d4c89b0249 /src/peering/basalt.rs
parent9ed776d16ad40a4d47900814b2b7f1ef1c02fa4e (diff)
downloadnetapp-46fae5d138cb7c0a74e2a8c7837541f18400ccf4.tar.gz
netapp-46fae5d138cb7c0a74e2a8c7837541f18400ccf4.zip
Better handle requests to ourself
Diffstat (limited to 'src/peering/basalt.rs')
-rw-r--r--src/peering/basalt.rs40
1 files changed, 18 insertions, 22 deletions
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs
index be807a8..27461ab 100644
--- a/src/peering/basalt.rs
+++ b/src/peering/basalt.rs
@@ -11,7 +11,6 @@ use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::hash;
use sodiumoxide::crypto::sign::ed25519;
-use crate::conn::*;
use crate::message::*;
use crate::netapp::*;
use crate::proto::*;
@@ -282,7 +281,7 @@ impl Basalt {
netapp.add_msg_handler::<PullMessage, _, _>(
move |_from: ed25519::PublicKey, _pullmsg: PullMessage| {
let push_msg = basalt2.make_push_message();
- async move { Ok(push_msg) }
+ async move { push_msg }
},
);
@@ -290,7 +289,7 @@ impl Basalt {
netapp.add_msg_handler::<PushMessage, _, _>(
move |_from: ed25519::PublicKey, push_msg: PushMessage| {
basalt2.handle_peer_list(&push_msg.peers[..]);
- async move { Ok(()) }
+ async move { () }
},
);
@@ -323,25 +322,18 @@ impl Basalt {
let peers = self.view.read().unwrap().sample(2);
if peers.len() == 2 {
- let (c1, c2) = {
- let client_conns = self.netapp.client_conns.read().unwrap();
- (
- client_conns.get(&peers[0].id).cloned(),
- client_conns.get(&peers[1].id).cloned(),
- )
- };
- if let Some(c) = c1 {
- tokio::spawn(self.clone().do_pull(c));
- }
- if let Some(c) = c2 {
- tokio::spawn(self.clone().do_push(c));
- }
+ tokio::spawn(self.clone().do_pull(peers[0].id));
+ tokio::spawn(self.clone().do_push(peers[1].id));
}
}
}
- async fn do_pull(self: Arc<Self>, peer: Arc<ClientConn>) {
- match peer.request(PullMessage {}, prio::NORMAL).await {
+ async fn do_pull(self: Arc<Self>, peer: ed25519::PublicKey) {
+ match self
+ .netapp
+ .request(&peer, PullMessage {}, prio::NORMAL)
+ .await
+ {
Ok(resp) => {
self.handle_peer_list(&resp.peers[..]);
}
@@ -351,9 +343,9 @@ impl Basalt {
};
}
- async fn do_push(self: Arc<Self>, peer: Arc<ClientConn>) {
+ async fn do_push(self: Arc<Self>, peer: ed25519::PublicKey) {
let push_msg = self.make_push_message();
- if let Err(e) = peer.request(push_msg, prio::NORMAL).await {
+ if let Err(e) = self.netapp.request(&peer, push_msg, prio::NORMAL).await {
warn!("Error during push exchange: {}", e);
}
}
@@ -427,7 +419,7 @@ impl Basalt {
fn on_connected(self: &Arc<Self>, pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool) {
if is_incoming {
- self.handle_peer_list(&[Peer{id: pk, addr}][..]);
+ self.handle_peer_list(&[Peer { id: pk, addr }][..]);
} else {
let peer = Peer { id: pk, addr };
@@ -460,7 +452,11 @@ impl Basalt {
for peer in prev_peers.iter() {
if !new_peers.contains(peer) {
if let Some(c) = client_conns.get(&peer.id) {
- debug!("Closing connection to {} ({})", hex::encode(peer.id), peer.addr);
+ debug!(
+ "Closing connection to {} ({})",
+ hex::encode(peer.id),
+ peer.addr
+ );
c.close();
}
}