diff options
author | Alex Auvolat <alex@adnab.me> | 2020-12-02 18:10:07 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-12-02 18:10:07 +0100 |
commit | 46fae5d138cb7c0a74e2a8c7837541f18400ccf4 (patch) | |
tree | f4456300e4ed12ffa6dd918236ad74d4c89b0249 /src/peering | |
parent | 9ed776d16ad40a4d47900814b2b7f1ef1c02fa4e (diff) | |
download | netapp-46fae5d138cb7c0a74e2a8c7837541f18400ccf4.tar.gz netapp-46fae5d138cb7c0a74e2a8c7837541f18400ccf4.zip |
Better handle requests to ourself
Diffstat (limited to 'src/peering')
-rw-r--r-- | src/peering/basalt.rs | 40 | ||||
-rw-r--r-- | src/peering/fullmesh.rs | 27 |
2 files changed, 25 insertions, 42 deletions
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index be807a8..27461ab 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -11,7 +11,6 @@ use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::hash; use sodiumoxide::crypto::sign::ed25519; -use crate::conn::*; use crate::message::*; use crate::netapp::*; use crate::proto::*; @@ -282,7 +281,7 @@ impl Basalt { netapp.add_msg_handler::<PullMessage, _, _>( move |_from: ed25519::PublicKey, _pullmsg: PullMessage| { let push_msg = basalt2.make_push_message(); - async move { Ok(push_msg) } + async move { push_msg } }, ); @@ -290,7 +289,7 @@ impl Basalt { netapp.add_msg_handler::<PushMessage, _, _>( move |_from: ed25519::PublicKey, push_msg: PushMessage| { basalt2.handle_peer_list(&push_msg.peers[..]); - async move { Ok(()) } + async move { () } }, ); @@ -323,25 +322,18 @@ impl Basalt { let peers = self.view.read().unwrap().sample(2); if peers.len() == 2 { - let (c1, c2) = { - let client_conns = self.netapp.client_conns.read().unwrap(); - ( - client_conns.get(&peers[0].id).cloned(), - client_conns.get(&peers[1].id).cloned(), - ) - }; - if let Some(c) = c1 { - tokio::spawn(self.clone().do_pull(c)); - } - if let Some(c) = c2 { - tokio::spawn(self.clone().do_push(c)); - } + tokio::spawn(self.clone().do_pull(peers[0].id)); + tokio::spawn(self.clone().do_push(peers[1].id)); } } } - async fn do_pull(self: Arc<Self>, peer: Arc<ClientConn>) { - match peer.request(PullMessage {}, prio::NORMAL).await { + async fn do_pull(self: Arc<Self>, peer: ed25519::PublicKey) { + match self + .netapp + .request(&peer, PullMessage {}, prio::NORMAL) + .await + { Ok(resp) => { self.handle_peer_list(&resp.peers[..]); } @@ -351,9 +343,9 @@ impl Basalt { }; } - async fn do_push(self: Arc<Self>, peer: Arc<ClientConn>) { + async fn do_push(self: Arc<Self>, peer: ed25519::PublicKey) { let push_msg = self.make_push_message(); - if let Err(e) = peer.request(push_msg, prio::NORMAL).await { + if let Err(e) = self.netapp.request(&peer, push_msg, prio::NORMAL).await { warn!("Error during push exchange: {}", e); } } @@ -427,7 +419,7 @@ impl Basalt { fn on_connected(self: &Arc<Self>, pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool) { if is_incoming { - self.handle_peer_list(&[Peer{id: pk, addr}][..]); + self.handle_peer_list(&[Peer { id: pk, addr }][..]); } else { let peer = Peer { id: pk, addr }; @@ -460,7 +452,11 @@ impl Basalt { for peer in prev_peers.iter() { if !new_peers.contains(peer) { if let Some(c) = client_conns.get(&peer.id) { - debug!("Closing connection to {} ({})", hex::encode(peer.id), peer.addr); + debug!( + "Closing connection to {} ({})", + hex::encode(peer.id), + peer.addr + ); c.close(); } } diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index e04beb6..4e9a78d 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -10,7 +10,6 @@ use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::hash; use sodiumoxide::crypto::sign::ed25519; -use crate::conn::*; use crate::message::*; use crate::netapp::*; use crate::proto::*; @@ -162,10 +161,8 @@ impl FullMeshPeeringStrategy { id: ping.id, peer_list_hash: strat2.known_hosts.read().unwrap().hash, }; - async move { - debug!("Ping from {}", hex::encode(&from)); - Ok(ping_resp) - } + debug!("Ping from {}", hex::encode(&from)); + async move { ping_resp } }, ); @@ -175,7 +172,7 @@ impl FullMeshPeeringStrategy { strat2.handle_peer_list(&peer_list.list[..]); let peer_list = KnownHosts::map_into_vec(&strat2.known_hosts.read().unwrap().list); let resp = PeerListMessage { list: peer_list }; - async move { Ok(resp) } + async move { resp } }, ); @@ -260,16 +257,6 @@ impl FullMeshPeeringStrategy { } async fn ping(self: Arc<Self>, id: ed25519::PublicKey) { - let peer = { - match self.netapp.client_conns.read().unwrap().get(&id) { - None => { - warn!("Should ping {}, but no connection", hex::encode(id)); - return; - } - Some(peer) => peer.clone(), - } - }; - let peer_list_hash = self.known_hosts.read().unwrap().hash; let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed); let ping_time = Instant::now(); @@ -284,7 +271,7 @@ impl FullMeshPeeringStrategy { hex::encode(id), ping_time ); - match peer.clone().request(ping_msg, prio::HIGH).await { + match self.netapp.request(&id, ping_msg, prio::HIGH).await { Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e), Ok(ping_resp) => { let resp_time = Instant::now(); @@ -304,16 +291,16 @@ impl FullMeshPeeringStrategy { } } if ping_resp.peer_list_hash != peer_list_hash { - self.exchange_peers(peer).await; + self.exchange_peers(&id).await; } } } } - async fn exchange_peers(self: Arc<Self>, peer: Arc<ClientConn>) { + async fn exchange_peers(self: Arc<Self>, id: &ed25519::PublicKey) { let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); let pex_message = PeerListMessage { list: peer_list }; - match peer.request(pex_message, prio::BACKGROUND).await { + match self.netapp.request(id, pex_message, prio::BACKGROUND).await { Err(e) => warn!("Error doing peer exchange: {}", e), Ok(resp) => { self.handle_peer_list(&resp.list[..]); |