diff options
author | Alex Auvolat <alex@adnab.me> | 2021-10-13 12:33:14 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-10-13 12:33:14 +0200 |
commit | 8dede69dee20b812ad1dcab5b374c60232409f4f (patch) | |
tree | 0b9ae54f27b4fb7bd6cfd09e1d78a70facf01a33 /src/peering | |
parent | d9bd1182f7b980df8e631ae8eeca444f5d997909 (diff) | |
download | netapp-8dede69dee20b812ad1dcab5b374c60232409f4f.tar.gz netapp-8dede69dee20b812ad1dcab5b374c60232409f4f.zip |
Fix netapp protocol & adapt basalt to new api
Diffstat (limited to 'src/peering')
-rw-r--r-- | src/peering/basalt.rs | 44 |
1 files changed, 26 insertions, 18 deletions
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index 3109e72..e0c8301 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -7,10 +7,11 @@ use log::{debug, info, trace, warn}; use lru::LruCache; use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; +use async_trait::async_trait; use sodiumoxide::crypto::hash; -use crate::message::*; +use crate::endpoint::*; use crate::netapp::*; use crate::proto::*; use crate::NodeID; @@ -21,7 +22,6 @@ use crate::NodeID; struct PullMessage {} impl Message for PullMessage { - const KIND: MessageKind = 0x42001100; type Response = PushMessage; } @@ -31,7 +31,6 @@ struct PushMessage { } impl Message for PushMessage { - const KIND: MessageKind = 0x42001101; type Response = (); } @@ -236,6 +235,8 @@ pub struct BasaltParams { pub struct Basalt { netapp: Arc<NetApp>, + pull_endpoint: Arc<Endpoint<PullMessage, Self>>, + push_endpoint: Arc<Endpoint<PushMessage, Self>>, param: BasaltParams, bootstrap_peers: Vec<Peer>, @@ -264,6 +265,8 @@ impl Basalt { let basalt = Arc::new(Self { netapp: netapp.clone(), + pull_endpoint: netapp.endpoint("__netapp/peering/basalt.rs/Pull".into()), + push_endpoint: netapp.endpoint("__netapp/peering/basalt.rs/Push".into()), param, bootstrap_peers, view: RwLock::new(view), @@ -271,6 +274,9 @@ impl Basalt { backlog: RwLock::new(backlog), }); + basalt.pull_endpoint.set_handler(basalt.clone()); + basalt.push_endpoint.set_handler(basalt.clone()); + let basalt2 = basalt.clone(); netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| { basalt2.on_connected(id, addr, is_incoming); @@ -281,18 +287,6 @@ impl Basalt { basalt2.on_disconnected(id, is_incoming); }); - let basalt2 = basalt.clone(); - netapp.add_msg_handler::<PullMessage, _, _>(move |_from: NodeID, _pullmsg: PullMessage| { - let push_msg = basalt2.make_push_message(); - async move { push_msg } - }); - - let basalt2 = basalt.clone(); - netapp.add_msg_handler::<PushMessage, _, _>(move |_from: NodeID, push_msg: PushMessage| { - basalt2.handle_peer_list(&push_msg.peers[..]); - async move {} - }); - basalt } @@ -333,8 +327,8 @@ impl Basalt { async fn do_pull(self: Arc<Self>, peer: NodeID) { match self - .netapp - .request(&peer, PullMessage {}, PRIO_NORMAL) + .pull_endpoint + .call(&peer, PullMessage {}, PRIO_NORMAL) .await { Ok(resp) => { @@ -349,7 +343,7 @@ impl Basalt { async fn do_push(self: Arc<Self>, peer: NodeID) { let push_msg = self.make_push_message(); - match self.netapp.request(&peer, push_msg, PRIO_NORMAL).await { + match self.push_endpoint.call(&peer, push_msg, PRIO_NORMAL).await { Ok(_) => { trace!("KYEV PEXo {}", hex::encode(peer)); } @@ -469,6 +463,20 @@ impl Basalt { } } +#[async_trait] +impl EndpointHandler<PullMessage> for Basalt { + async fn handle(self: &Arc<Self>, _pullmsg: PullMessage, _from: NodeID) -> PushMessage { + self.make_push_message() + } +} + +#[async_trait] +impl EndpointHandler<PushMessage> for Basalt { + async fn handle(self: &Arc<Self>, pushmsg: PushMessage, _from: NodeID) { + self.handle_peer_list(&pushmsg.peers[..]); + } +} + fn rand_seed() -> Seed { let mut seed = [0u8; 32]; sodiumoxide::randombytes::randombytes_into(&mut seed[..]); |