aboutsummaryrefslogtreecommitdiff
path: root/src/peering
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-12-02 18:10:07 +0100
committerAlex Auvolat <alex@adnab.me>2020-12-02 18:10:07 +0100
commit46fae5d138cb7c0a74e2a8c7837541f18400ccf4 (patch)
treef4456300e4ed12ffa6dd918236ad74d4c89b0249 /src/peering
parent9ed776d16ad40a4d47900814b2b7f1ef1c02fa4e (diff)
downloadnetapp-46fae5d138cb7c0a74e2a8c7837541f18400ccf4.tar.gz
netapp-46fae5d138cb7c0a74e2a8c7837541f18400ccf4.zip
Better handle requests to ourself
Diffstat (limited to 'src/peering')
-rw-r--r--src/peering/basalt.rs40
-rw-r--r--src/peering/fullmesh.rs27
2 files changed, 25 insertions, 42 deletions
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs
index be807a8..27461ab 100644
--- a/src/peering/basalt.rs
+++ b/src/peering/basalt.rs
@@ -11,7 +11,6 @@ use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::hash;
use sodiumoxide::crypto::sign::ed25519;
-use crate::conn::*;
use crate::message::*;
use crate::netapp::*;
use crate::proto::*;
@@ -282,7 +281,7 @@ impl Basalt {
netapp.add_msg_handler::<PullMessage, _, _>(
move |_from: ed25519::PublicKey, _pullmsg: PullMessage| {
let push_msg = basalt2.make_push_message();
- async move { Ok(push_msg) }
+ async move { push_msg }
},
);
@@ -290,7 +289,7 @@ impl Basalt {
netapp.add_msg_handler::<PushMessage, _, _>(
move |_from: ed25519::PublicKey, push_msg: PushMessage| {
basalt2.handle_peer_list(&push_msg.peers[..]);
- async move { Ok(()) }
+ async move { () }
},
);
@@ -323,25 +322,18 @@ impl Basalt {
let peers = self.view.read().unwrap().sample(2);
if peers.len() == 2 {
- let (c1, c2) = {
- let client_conns = self.netapp.client_conns.read().unwrap();
- (
- client_conns.get(&peers[0].id).cloned(),
- client_conns.get(&peers[1].id).cloned(),
- )
- };
- if let Some(c) = c1 {
- tokio::spawn(self.clone().do_pull(c));
- }
- if let Some(c) = c2 {
- tokio::spawn(self.clone().do_push(c));
- }
+ tokio::spawn(self.clone().do_pull(peers[0].id));
+ tokio::spawn(self.clone().do_push(peers[1].id));
}
}
}
- async fn do_pull(self: Arc<Self>, peer: Arc<ClientConn>) {
- match peer.request(PullMessage {}, prio::NORMAL).await {
+ async fn do_pull(self: Arc<Self>, peer: ed25519::PublicKey) {
+ match self
+ .netapp
+ .request(&peer, PullMessage {}, prio::NORMAL)
+ .await
+ {
Ok(resp) => {
self.handle_peer_list(&resp.peers[..]);
}
@@ -351,9 +343,9 @@ impl Basalt {
};
}
- async fn do_push(self: Arc<Self>, peer: Arc<ClientConn>) {
+ async fn do_push(self: Arc<Self>, peer: ed25519::PublicKey) {
let push_msg = self.make_push_message();
- if let Err(e) = peer.request(push_msg, prio::NORMAL).await {
+ if let Err(e) = self.netapp.request(&peer, push_msg, prio::NORMAL).await {
warn!("Error during push exchange: {}", e);
}
}
@@ -427,7 +419,7 @@ impl Basalt {
fn on_connected(self: &Arc<Self>, pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool) {
if is_incoming {
- self.handle_peer_list(&[Peer{id: pk, addr}][..]);
+ self.handle_peer_list(&[Peer { id: pk, addr }][..]);
} else {
let peer = Peer { id: pk, addr };
@@ -460,7 +452,11 @@ impl Basalt {
for peer in prev_peers.iter() {
if !new_peers.contains(peer) {
if let Some(c) = client_conns.get(&peer.id) {
- debug!("Closing connection to {} ({})", hex::encode(peer.id), peer.addr);
+ debug!(
+ "Closing connection to {} ({})",
+ hex::encode(peer.id),
+ peer.addr
+ );
c.close();
}
}
diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs
index e04beb6..4e9a78d 100644
--- a/src/peering/fullmesh.rs
+++ b/src/peering/fullmesh.rs
@@ -10,7 +10,6 @@ use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::hash;
use sodiumoxide::crypto::sign::ed25519;
-use crate::conn::*;
use crate::message::*;
use crate::netapp::*;
use crate::proto::*;
@@ -162,10 +161,8 @@ impl FullMeshPeeringStrategy {
id: ping.id,
peer_list_hash: strat2.known_hosts.read().unwrap().hash,
};
- async move {
- debug!("Ping from {}", hex::encode(&from));
- Ok(ping_resp)
- }
+ debug!("Ping from {}", hex::encode(&from));
+ async move { ping_resp }
},
);
@@ -175,7 +172,7 @@ impl FullMeshPeeringStrategy {
strat2.handle_peer_list(&peer_list.list[..]);
let peer_list = KnownHosts::map_into_vec(&strat2.known_hosts.read().unwrap().list);
let resp = PeerListMessage { list: peer_list };
- async move { Ok(resp) }
+ async move { resp }
},
);
@@ -260,16 +257,6 @@ impl FullMeshPeeringStrategy {
}
async fn ping(self: Arc<Self>, id: ed25519::PublicKey) {
- let peer = {
- match self.netapp.client_conns.read().unwrap().get(&id) {
- None => {
- warn!("Should ping {}, but no connection", hex::encode(id));
- return;
- }
- Some(peer) => peer.clone(),
- }
- };
-
let peer_list_hash = self.known_hosts.read().unwrap().hash;
let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
let ping_time = Instant::now();
@@ -284,7 +271,7 @@ impl FullMeshPeeringStrategy {
hex::encode(id),
ping_time
);
- match peer.clone().request(ping_msg, prio::HIGH).await {
+ match self.netapp.request(&id, ping_msg, prio::HIGH).await {
Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e),
Ok(ping_resp) => {
let resp_time = Instant::now();
@@ -304,16 +291,16 @@ impl FullMeshPeeringStrategy {
}
}
if ping_resp.peer_list_hash != peer_list_hash {
- self.exchange_peers(peer).await;
+ self.exchange_peers(&id).await;
}
}
}
}
- async fn exchange_peers(self: Arc<Self>, peer: Arc<ClientConn>) {
+ async fn exchange_peers(self: Arc<Self>, id: &ed25519::PublicKey) {
let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list);
let pex_message = PeerListMessage { list: peer_list };
- match peer.request(pex_message, prio::BACKGROUND).await {
+ match self.netapp.request(id, pex_message, prio::BACKGROUND).await {
Err(e) => warn!("Error doing peer exchange: {}", e),
Ok(resp) => {
self.handle_peer_list(&resp.list[..]);