aboutsummaryrefslogtreecommitdiff
path: root/src/peering
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-13 12:33:14 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-13 12:33:14 +0200
commit8dede69dee20b812ad1dcab5b374c60232409f4f (patch)
tree0b9ae54f27b4fb7bd6cfd09e1d78a70facf01a33 /src/peering
parentd9bd1182f7b980df8e631ae8eeca444f5d997909 (diff)
downloadnetapp-8dede69dee20b812ad1dcab5b374c60232409f4f.tar.gz
netapp-8dede69dee20b812ad1dcab5b374c60232409f4f.zip
Fix netapp protocol & adapt basalt to new api
Diffstat (limited to 'src/peering')
-rw-r--r--src/peering/basalt.rs44
1 files changed, 26 insertions, 18 deletions
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs
index 3109e72..e0c8301 100644
--- a/src/peering/basalt.rs
+++ b/src/peering/basalt.rs
@@ -7,10 +7,11 @@ use log::{debug, info, trace, warn};
use lru::LruCache;
use rand::{thread_rng, Rng};
use serde::{Deserialize, Serialize};
+use async_trait::async_trait;
use sodiumoxide::crypto::hash;
-use crate::message::*;
+use crate::endpoint::*;
use crate::netapp::*;
use crate::proto::*;
use crate::NodeID;
@@ -21,7 +22,6 @@ use crate::NodeID;
struct PullMessage {}
impl Message for PullMessage {
- const KIND: MessageKind = 0x42001100;
type Response = PushMessage;
}
@@ -31,7 +31,6 @@ struct PushMessage {
}
impl Message for PushMessage {
- const KIND: MessageKind = 0x42001101;
type Response = ();
}
@@ -236,6 +235,8 @@ pub struct BasaltParams {
pub struct Basalt {
netapp: Arc<NetApp>,
+ pull_endpoint: Arc<Endpoint<PullMessage, Self>>,
+ push_endpoint: Arc<Endpoint<PushMessage, Self>>,
param: BasaltParams,
bootstrap_peers: Vec<Peer>,
@@ -264,6 +265,8 @@ impl Basalt {
let basalt = Arc::new(Self {
netapp: netapp.clone(),
+ pull_endpoint: netapp.endpoint("__netapp/peering/basalt.rs/Pull".into()),
+ push_endpoint: netapp.endpoint("__netapp/peering/basalt.rs/Push".into()),
param,
bootstrap_peers,
view: RwLock::new(view),
@@ -271,6 +274,9 @@ impl Basalt {
backlog: RwLock::new(backlog),
});
+ basalt.pull_endpoint.set_handler(basalt.clone());
+ basalt.push_endpoint.set_handler(basalt.clone());
+
let basalt2 = basalt.clone();
netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| {
basalt2.on_connected(id, addr, is_incoming);
@@ -281,18 +287,6 @@ impl Basalt {
basalt2.on_disconnected(id, is_incoming);
});
- let basalt2 = basalt.clone();
- netapp.add_msg_handler::<PullMessage, _, _>(move |_from: NodeID, _pullmsg: PullMessage| {
- let push_msg = basalt2.make_push_message();
- async move { push_msg }
- });
-
- let basalt2 = basalt.clone();
- netapp.add_msg_handler::<PushMessage, _, _>(move |_from: NodeID, push_msg: PushMessage| {
- basalt2.handle_peer_list(&push_msg.peers[..]);
- async move {}
- });
-
basalt
}
@@ -333,8 +327,8 @@ impl Basalt {
async fn do_pull(self: Arc<Self>, peer: NodeID) {
match self
- .netapp
- .request(&peer, PullMessage {}, PRIO_NORMAL)
+ .pull_endpoint
+ .call(&peer, PullMessage {}, PRIO_NORMAL)
.await
{
Ok(resp) => {
@@ -349,7 +343,7 @@ impl Basalt {
async fn do_push(self: Arc<Self>, peer: NodeID) {
let push_msg = self.make_push_message();
- match self.netapp.request(&peer, push_msg, PRIO_NORMAL).await {
+ match self.push_endpoint.call(&peer, push_msg, PRIO_NORMAL).await {
Ok(_) => {
trace!("KYEV PEXo {}", hex::encode(peer));
}
@@ -469,6 +463,20 @@ impl Basalt {
}
}
+#[async_trait]
+impl EndpointHandler<PullMessage> for Basalt {
+ async fn handle(self: &Arc<Self>, _pullmsg: PullMessage, _from: NodeID) -> PushMessage {
+ self.make_push_message()
+ }
+}
+
+#[async_trait]
+impl EndpointHandler<PushMessage> for Basalt {
+ async fn handle(self: &Arc<Self>, pushmsg: PushMessage, _from: NodeID) {
+ self.handle_peer_list(&pushmsg.peers[..]);
+ }
+}
+
fn rand_seed() -> Seed {
let mut seed = [0u8; 32];
sodiumoxide::randombytes::randombytes_into(&mut seed[..]);