From 8dede69dee20b812ad1dcab5b374c60232409f4f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 13 Oct 2021 12:33:14 +0200 Subject: Fix netapp protocol & adapt basalt to new api --- src/client.rs | 7 ++++--- src/endpoint.rs | 1 + src/netapp.rs | 10 ++++------ src/peering/basalt.rs | 44 ++++++++++++++++++++++++++------------------ src/server.rs | 9 ++++++++- 5 files changed, 43 insertions(+), 28 deletions(-) (limited to 'src') diff --git a/src/client.rs b/src/client.rs index a436d53..127ff46 100644 --- a/src/client.rs +++ b/src/client.rs @@ -145,12 +145,13 @@ impl ClientConn { return Err(Error::Message("Response is 0 bytes, either a collision or a protocol error".into())); } + trace!("request response {}: ", id); + let code = resp[0]; if code == 0 { - rmp_serde::decode::from_read_ref::<_, Result<::Response, String>>( + Ok(rmp_serde::decode::from_read_ref::<_, ::Response>( &resp[1..], - )? - .map_err(Error::Remote) + )?) } else { Err(Error::Remote(format!("Remote error code {}", code))) } diff --git a/src/endpoint.rs b/src/endpoint.rs index 0e1f5c8..83957e2 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -123,3 +123,4 @@ where Box::new(Self(self.0.clone())) } } + diff --git a/src/netapp.rs b/src/netapp.rs index afdd3c9..b6994ea 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::{Arc, RwLock}; -use log::{debug, info}; +use log::{debug, info, error}; use arc_swap::ArcSwapOption; use async_trait::async_trait; @@ -34,10 +34,6 @@ type OnDisconnectHandler = Box; /// NetApp is the main class that handles incoming and outgoing connections. /// -/// The `request()` method can be used to send a message to any peer to which we have -/// an outgoing connection, or to ourself. On the server side, these messages are -/// processed by the handlers that have been defined using `add_msg_handler()`. -/// /// NetApp can be used in a stand-alone fashion or together with a peering strategy. /// If using it alone, you will want to set `on_connect` and `on_disconnect` events /// in order to manage information about the current peer list. @@ -151,7 +147,9 @@ impl NetApp { listen_addr, public_addr, }; - self.listen_params.store(Some(Arc::new(listen_params))); + if self.listen_params.swap(Some(Arc::new(listen_params))).is_some() { + error!("Trying to listen on NetApp but we're already listening!"); + } let listener = TcpListener::bind(listen_addr).await.unwrap(); info!("Listening on {}", listen_addr); diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index 3109e72..e0c8301 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -7,10 +7,11 @@ use log::{debug, info, trace, warn}; use lru::LruCache; use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; +use async_trait::async_trait; use sodiumoxide::crypto::hash; -use crate::message::*; +use crate::endpoint::*; use crate::netapp::*; use crate::proto::*; use crate::NodeID; @@ -21,7 +22,6 @@ use crate::NodeID; struct PullMessage {} impl Message for PullMessage { - const KIND: MessageKind = 0x42001100; type Response = PushMessage; } @@ -31,7 +31,6 @@ struct PushMessage { } impl Message for PushMessage { - const KIND: MessageKind = 0x42001101; type Response = (); } @@ -236,6 +235,8 @@ pub struct BasaltParams { pub struct Basalt { netapp: Arc, + pull_endpoint: Arc>, + push_endpoint: Arc>, param: BasaltParams, bootstrap_peers: Vec, @@ -264,6 +265,8 @@ impl Basalt { let basalt = Arc::new(Self { netapp: netapp.clone(), + pull_endpoint: netapp.endpoint("__netapp/peering/basalt.rs/Pull".into()), + push_endpoint: netapp.endpoint("__netapp/peering/basalt.rs/Push".into()), param, bootstrap_peers, view: RwLock::new(view), @@ -271,6 +274,9 @@ impl Basalt { backlog: RwLock::new(backlog), }); + basalt.pull_endpoint.set_handler(basalt.clone()); + basalt.push_endpoint.set_handler(basalt.clone()); + let basalt2 = basalt.clone(); netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| { basalt2.on_connected(id, addr, is_incoming); @@ -281,18 +287,6 @@ impl Basalt { basalt2.on_disconnected(id, is_incoming); }); - let basalt2 = basalt.clone(); - netapp.add_msg_handler::(move |_from: NodeID, _pullmsg: PullMessage| { - let push_msg = basalt2.make_push_message(); - async move { push_msg } - }); - - let basalt2 = basalt.clone(); - netapp.add_msg_handler::(move |_from: NodeID, push_msg: PushMessage| { - basalt2.handle_peer_list(&push_msg.peers[..]); - async move {} - }); - basalt } @@ -333,8 +327,8 @@ impl Basalt { async fn do_pull(self: Arc, peer: NodeID) { match self - .netapp - .request(&peer, PullMessage {}, PRIO_NORMAL) + .pull_endpoint + .call(&peer, PullMessage {}, PRIO_NORMAL) .await { Ok(resp) => { @@ -349,7 +343,7 @@ impl Basalt { async fn do_push(self: Arc, peer: NodeID) { let push_msg = self.make_push_message(); - match self.netapp.request(&peer, push_msg, PRIO_NORMAL).await { + match self.push_endpoint.call(&peer, push_msg, PRIO_NORMAL).await { Ok(_) => { trace!("KYEV PEXo {}", hex::encode(peer)); } @@ -469,6 +463,20 @@ impl Basalt { } } +#[async_trait] +impl EndpointHandler for Basalt { + async fn handle(self: &Arc, _pullmsg: PullMessage, _from: NodeID) -> PushMessage { + self.make_push_message() + } +} + +#[async_trait] +impl EndpointHandler for Basalt { + async fn handle(self: &Arc, pushmsg: PushMessage, _from: NodeID) { + self.handle_peer_list(&pushmsg.peers[..]); + } +} + fn rand_seed() -> Seed { let mut seed = [0u8; 32]; sodiumoxide::randombytes::randombytes_into(&mut seed[..]); diff --git a/src/server.rs b/src/server.rs index 73ae267..c7d99b5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -150,8 +150,12 @@ impl RecvLoop for ServerConn { trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len()); let bytes: Bytes = bytes.into(); + let prio = if !bytes.is_empty() { + bytes[0] + } else { + 0u8 + }; let resp = self.recv_handler_aux(&bytes[..]).await; - let prio = bytes[0]; let mut resp_bytes = vec![]; match resp { @@ -164,8 +168,11 @@ impl RecvLoop for ServerConn { } } + trace!("ServerConn sending response to {}: ", id); + self.resp_send .send(Some((id, prio, resp_bytes))) .log_err("ServerConn recv_handler send resp"); } } + -- cgit v1.2.3