From 8dede69dee20b812ad1dcab5b374c60232409f4f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 13 Oct 2021 12:33:14 +0200 Subject: Fix netapp protocol & adapt basalt to new api --- src/peering/basalt.rs | 44 ++++++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 18 deletions(-) (limited to 'src/peering') diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index 3109e72..e0c8301 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -7,10 +7,11 @@ use log::{debug, info, trace, warn}; use lru::LruCache; use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; +use async_trait::async_trait; use sodiumoxide::crypto::hash; -use crate::message::*; +use crate::endpoint::*; use crate::netapp::*; use crate::proto::*; use crate::NodeID; @@ -21,7 +22,6 @@ use crate::NodeID; struct PullMessage {} impl Message for PullMessage { - const KIND: MessageKind = 0x42001100; type Response = PushMessage; } @@ -31,7 +31,6 @@ struct PushMessage { } impl Message for PushMessage { - const KIND: MessageKind = 0x42001101; type Response = (); } @@ -236,6 +235,8 @@ pub struct BasaltParams { pub struct Basalt { netapp: Arc, + pull_endpoint: Arc>, + push_endpoint: Arc>, param: BasaltParams, bootstrap_peers: Vec, @@ -264,6 +265,8 @@ impl Basalt { let basalt = Arc::new(Self { netapp: netapp.clone(), + pull_endpoint: netapp.endpoint("__netapp/peering/basalt.rs/Pull".into()), + push_endpoint: netapp.endpoint("__netapp/peering/basalt.rs/Push".into()), param, bootstrap_peers, view: RwLock::new(view), @@ -271,6 +274,9 @@ impl Basalt { backlog: RwLock::new(backlog), }); + basalt.pull_endpoint.set_handler(basalt.clone()); + basalt.push_endpoint.set_handler(basalt.clone()); + let basalt2 = basalt.clone(); netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| { basalt2.on_connected(id, addr, is_incoming); @@ -281,18 +287,6 @@ impl Basalt { basalt2.on_disconnected(id, is_incoming); }); - let basalt2 = basalt.clone(); - netapp.add_msg_handler::(move |_from: NodeID, _pullmsg: PullMessage| { - let push_msg = basalt2.make_push_message(); - async move { push_msg } - }); - - let basalt2 = basalt.clone(); - netapp.add_msg_handler::(move |_from: NodeID, push_msg: PushMessage| { - basalt2.handle_peer_list(&push_msg.peers[..]); - async move {} - }); - basalt } @@ -333,8 +327,8 @@ impl Basalt { async fn do_pull(self: Arc, peer: NodeID) { match self - .netapp - .request(&peer, PullMessage {}, PRIO_NORMAL) + .pull_endpoint + .call(&peer, PullMessage {}, PRIO_NORMAL) .await { Ok(resp) => { @@ -349,7 +343,7 @@ impl Basalt { async fn do_push(self: Arc, peer: NodeID) { let push_msg = self.make_push_message(); - match self.netapp.request(&peer, push_msg, PRIO_NORMAL).await { + match self.push_endpoint.call(&peer, push_msg, PRIO_NORMAL).await { Ok(_) => { trace!("KYEV PEXo {}", hex::encode(peer)); } @@ -469,6 +463,20 @@ impl Basalt { } } +#[async_trait] +impl EndpointHandler for Basalt { + async fn handle(self: &Arc, _pullmsg: PullMessage, _from: NodeID) -> PushMessage { + self.make_push_message() + } +} + +#[async_trait] +impl EndpointHandler for Basalt { + async fn handle(self: &Arc, pushmsg: PushMessage, _from: NodeID) { + self.handle_peer_list(&pushmsg.peers[..]); + } +} + fn rand_seed() -> Seed { let mut seed = [0u8; 32]; sodiumoxide::randombytes::randombytes_into(&mut seed[..]); -- cgit v1.2.3