aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/client.rs7
-rw-r--r--src/endpoint.rs1
-rw-r--r--src/netapp.rs10
-rw-r--r--src/peering/basalt.rs44
-rw-r--r--src/server.rs9
5 files changed, 43 insertions, 28 deletions
diff --git a/src/client.rs b/src/client.rs
index a436d53..127ff46 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -145,12 +145,13 @@ impl ClientConn {
return Err(Error::Message("Response is 0 bytes, either a collision or a protocol error".into()));
}
+ trace!("request response {}: ", id);
+
let code = resp[0];
if code == 0 {
- rmp_serde::decode::from_read_ref::<_, Result<<T as Message>::Response, String>>(
+ Ok(rmp_serde::decode::from_read_ref::<_, <T as Message>::Response>(
&resp[1..],
- )?
- .map_err(Error::Remote)
+ )?)
} else {
Err(Error::Remote(format!("Remote error code {}", code)))
}
diff --git a/src/endpoint.rs b/src/endpoint.rs
index 0e1f5c8..83957e2 100644
--- a/src/endpoint.rs
+++ b/src/endpoint.rs
@@ -123,3 +123,4 @@ where
Box::new(Self(self.0.clone()))
}
}
+
diff --git a/src/netapp.rs b/src/netapp.rs
index afdd3c9..b6994ea 100644
--- a/src/netapp.rs
+++ b/src/netapp.rs
@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, RwLock};
-use log::{debug, info};
+use log::{debug, info, error};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
@@ -34,10 +34,6 @@ type OnDisconnectHandler = Box<dyn Fn(NodeID, bool) + Send + Sync>;
/// NetApp is the main class that handles incoming and outgoing connections.
///
-/// The `request()` method can be used to send a message to any peer to which we have
-/// an outgoing connection, or to ourself. On the server side, these messages are
-/// processed by the handlers that have been defined using `add_msg_handler()`.
-///
/// NetApp can be used in a stand-alone fashion or together with a peering strategy.
/// If using it alone, you will want to set `on_connect` and `on_disconnect` events
/// in order to manage information about the current peer list.
@@ -151,7 +147,9 @@ impl NetApp {
listen_addr,
public_addr,
};
- self.listen_params.store(Some(Arc::new(listen_params)));
+ if self.listen_params.swap(Some(Arc::new(listen_params))).is_some() {
+ error!("Trying to listen on NetApp but we're already listening!");
+ }
let listener = TcpListener::bind(listen_addr).await.unwrap();
info!("Listening on {}", listen_addr);
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs
index 3109e72..e0c8301 100644
--- a/src/peering/basalt.rs
+++ b/src/peering/basalt.rs
@@ -7,10 +7,11 @@ use log::{debug, info, trace, warn};
use lru::LruCache;
use rand::{thread_rng, Rng};
use serde::{Deserialize, Serialize};
+use async_trait::async_trait;
use sodiumoxide::crypto::hash;
-use crate::message::*;
+use crate::endpoint::*;
use crate::netapp::*;
use crate::proto::*;
use crate::NodeID;
@@ -21,7 +22,6 @@ use crate::NodeID;
struct PullMessage {}
impl Message for PullMessage {
- const KIND: MessageKind = 0x42001100;
type Response = PushMessage;
}
@@ -31,7 +31,6 @@ struct PushMessage {
}
impl Message for PushMessage {
- const KIND: MessageKind = 0x42001101;
type Response = ();
}
@@ -236,6 +235,8 @@ pub struct BasaltParams {
pub struct Basalt {
netapp: Arc<NetApp>,
+ pull_endpoint: Arc<Endpoint<PullMessage, Self>>,
+ push_endpoint: Arc<Endpoint<PushMessage, Self>>,
param: BasaltParams,
bootstrap_peers: Vec<Peer>,
@@ -264,6 +265,8 @@ impl Basalt {
let basalt = Arc::new(Self {
netapp: netapp.clone(),
+ pull_endpoint: netapp.endpoint("__netapp/peering/basalt.rs/Pull".into()),
+ push_endpoint: netapp.endpoint("__netapp/peering/basalt.rs/Push".into()),
param,
bootstrap_peers,
view: RwLock::new(view),
@@ -271,6 +274,9 @@ impl Basalt {
backlog: RwLock::new(backlog),
});
+ basalt.pull_endpoint.set_handler(basalt.clone());
+ basalt.push_endpoint.set_handler(basalt.clone());
+
let basalt2 = basalt.clone();
netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| {
basalt2.on_connected(id, addr, is_incoming);
@@ -281,18 +287,6 @@ impl Basalt {
basalt2.on_disconnected(id, is_incoming);
});
- let basalt2 = basalt.clone();
- netapp.add_msg_handler::<PullMessage, _, _>(move |_from: NodeID, _pullmsg: PullMessage| {
- let push_msg = basalt2.make_push_message();
- async move { push_msg }
- });
-
- let basalt2 = basalt.clone();
- netapp.add_msg_handler::<PushMessage, _, _>(move |_from: NodeID, push_msg: PushMessage| {
- basalt2.handle_peer_list(&push_msg.peers[..]);
- async move {}
- });
-
basalt
}
@@ -333,8 +327,8 @@ impl Basalt {
async fn do_pull(self: Arc<Self>, peer: NodeID) {
match self
- .netapp
- .request(&peer, PullMessage {}, PRIO_NORMAL)
+ .pull_endpoint
+ .call(&peer, PullMessage {}, PRIO_NORMAL)
.await
{
Ok(resp) => {
@@ -349,7 +343,7 @@ impl Basalt {
async fn do_push(self: Arc<Self>, peer: NodeID) {
let push_msg = self.make_push_message();
- match self.netapp.request(&peer, push_msg, PRIO_NORMAL).await {
+ match self.push_endpoint.call(&peer, push_msg, PRIO_NORMAL).await {
Ok(_) => {
trace!("KYEV PEXo {}", hex::encode(peer));
}
@@ -469,6 +463,20 @@ impl Basalt {
}
}
+#[async_trait]
+impl EndpointHandler<PullMessage> for Basalt {
+ async fn handle(self: &Arc<Self>, _pullmsg: PullMessage, _from: NodeID) -> PushMessage {
+ self.make_push_message()
+ }
+}
+
+#[async_trait]
+impl EndpointHandler<PushMessage> for Basalt {
+ async fn handle(self: &Arc<Self>, pushmsg: PushMessage, _from: NodeID) {
+ self.handle_peer_list(&pushmsg.peers[..]);
+ }
+}
+
fn rand_seed() -> Seed {
let mut seed = [0u8; 32];
sodiumoxide::randombytes::randombytes_into(&mut seed[..]);
diff --git a/src/server.rs b/src/server.rs
index 73ae267..c7d99b5 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -150,8 +150,12 @@ impl RecvLoop for ServerConn {
trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len());
let bytes: Bytes = bytes.into();
+ let prio = if !bytes.is_empty() {
+ bytes[0]
+ } else {
+ 0u8
+ };
let resp = self.recv_handler_aux(&bytes[..]).await;
- let prio = bytes[0];
let mut resp_bytes = vec![];
match resp {
@@ -164,8 +168,11 @@ impl RecvLoop for ServerConn {
}
}
+ trace!("ServerConn sending response to {}: ", id);
+
self.resp_send
.send(Some((id, prio, resp_bytes)))
.log_err("ServerConn recv_handler send resp");
}
}
+