aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-13 12:33:14 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-13 12:33:14 +0200
commit8dede69dee20b812ad1dcab5b374c60232409f4f (patch)
tree0b9ae54f27b4fb7bd6cfd09e1d78a70facf01a33
parentd9bd1182f7b980df8e631ae8eeca444f5d997909 (diff)
downloadnetapp-8dede69dee20b812ad1dcab5b374c60232409f4f.tar.gz
netapp-8dede69dee20b812ad1dcab5b374c60232409f4f.zip
Fix netapp protocol & adapt basalt to new api
-rw-r--r--Cargo.lock23
-rw-r--r--Cargo.toml1
-rw-r--r--Makefile6
-rw-r--r--examples/basalt.rs86
-rw-r--r--examples/fullmesh.rs8
-rw-r--r--src/client.rs7
-rw-r--r--src/endpoint.rs1
-rw-r--r--src/netapp.rs10
-rw-r--r--src/peering/basalt.rs44
-rw-r--r--src/server.rs9
10 files changed, 128 insertions, 67 deletions
diff --git a/Cargo.lock b/Cargo.lock
index d00d37f..bce6ea2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -29,6 +29,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6df5aef5c5830360ce5218cecb8f018af3438af5686ae945094affc86fdec63"
[[package]]
+name = "arrayvec"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
+
+[[package]]
name = "async-trait"
version = "0.1.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -303,12 +309,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
+name = "hexdump"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e40283dadb02f3af778878be1d717b17b4e4ab92e1d935ab03a730b0542905f2"
+dependencies = [
+ "arrayvec",
+ "itertools",
+]
+
+[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
+name = "itertools"
+version = "0.4.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c4a9b56eb56058f43dc66e58f40a214b2ccbc9f3df51861b63d51dec7b65bc3f"
+
+[[package]]
name = "kuska-handshake"
version = "0.2.0"
source = "git+https://github.com/Alexis211/handshake?branch=tokio1.0#a99e5a9c8591c41c99ce0bdfe18d596e3933bc4e"
@@ -414,6 +436,7 @@ dependencies = [
"err-derive",
"futures",
"hex",
+ "hexdump",
"kuska-handshake",
"kuska-sodiumoxide",
"log",
diff --git a/Cargo.toml b/Cargo.toml
index e007e5e..64e3401 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,6 +27,7 @@ serde = { version = "1.0", default-features = false, features = ["derive"] }
rmp-serde = "0.14.3"
hex = "0.4.2"
base64 = "0.12.1"
+hexdump = "0.1.1"
rand = { version = "0.5.5", optional = true }
diff --git a/Makefile b/Makefile
index 0ef8cef..0f680f3 100644
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,7 @@
all:
- cargo build
+ cargo build --all-features
cargo build --example fullmesh
- RUST_LOG=netapp=debug cargo run --example fullmesh -- -n 3242ce79e05e8b6a0e43441fbd140a906e13f335f298ae3a52f29784abbab500 -p 6c304114a0e1018bbe60502a34d33f4f439f370856c3333dda2726da01eb93a4894b7ef7249a71f11d342b69702f1beb7c93ec95fbcf122ad1eca583bb0629e7
+ cargo build --all-features --example basalt
+ RUST_LOG=netapp=debug,fullmesh=debug cargo run --example fullmesh -- -n 3242ce79e05e8b6a0e43441fbd140a906e13f335f298ae3a52f29784abbab500 -p 6c304114a0e1018bbe60502a34d33f4f439f370856c3333dda2726da01eb93a4894b7ef7249a71f11d342b69702f1beb7c93ec95fbcf122ad1eca583bb0629e7
+ #RUST_LOG=netapp=debug,fullmesh=debug cargo run --example fullmesh
diff --git a/examples/basalt.rs b/examples/basalt.rs
index 91f0982..7093e05 100644
--- a/examples/basalt.rs
+++ b/examples/basalt.rs
@@ -7,14 +7,15 @@ use log::{debug, info, warn};
use serde::{Deserialize, Serialize};
use structopt::StructOpt;
+use async_trait::async_trait;
use sodiumoxide::crypto::auth;
use sodiumoxide::crypto::sign::ed25519;
-use netapp::message::*;
+use netapp::endpoint::*;
use netapp::peering::basalt::*;
use netapp::proto::*;
-use netapp::NetApp;
+use netapp::{NetApp, NodeID};
#[derive(StructOpt, Debug)]
#[structopt(name = "netapp")]
@@ -50,6 +51,12 @@ pub struct Opt {
reset_count: usize,
}
+struct Example {
+ netapp: Arc<NetApp>,
+ basalt: Arc<Basalt>,
+ example_endpoint: Arc<Endpoint<ExampleMessage, Self>>,
+}
+
#[tokio::main]
async fn main() {
env_logger::Builder::new()
@@ -104,46 +111,54 @@ async fn main() {
reset_interval: Duration::from_secs(opt.reset_interval),
reset_count: opt.reset_count,
};
- let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params);
-
- netapp.add_msg_handler::<ExampleMessage, _, _>(
- |_from: ed25519::PublicKey, msg: ExampleMessage| {
- debug!("Got example message: {:?}, sending example response", msg);
- async {
- ExampleResponse {
- example_field: false,
- }
- }
- },
- );
+ let basalt = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params);
+
+ let example = Arc::new(Example {
+ netapp: netapp.clone(),
+ basalt,
+ example_endpoint: netapp.endpoint("__netapp/examples/basalt.rs/Example".into()),
+ });
+ example.example_endpoint.set_handler(example.clone());
let listen_addr = opt.listen_addr.parse().unwrap();
let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
tokio::join!(
- sampling_loop(netapp.clone(), peering.clone()),
- netapp.listen(listen_addr, public_addr),
- peering.run(),
+ example.clone().sampling_loop(),
+ example.netapp.clone().listen(listen_addr, public_addr),
+ example.basalt.clone().run(),
);
}
-async fn sampling_loop(netapp: Arc<NetApp>, basalt: Arc<Basalt>) {
- loop {
- tokio::time::sleep(Duration::from_secs(10)).await;
-
- let peers = basalt.sample(10);
- for p in peers {
- debug!("kyev S {}", hex::encode(p));
-
- let netapp2 = netapp.clone();
- tokio::spawn(async move {
- match netapp2
- .request(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL)
- .await
- {
- Ok(resp) => debug!("Got example response: {:?}", resp),
- Err(e) => warn!("Error with example request: {}", e),
- }
- });
+impl Example {
+ async fn sampling_loop(self: Arc<Self>) {
+ loop {
+ tokio::time::sleep(Duration::from_secs(10)).await;
+
+ let peers = self.basalt.sample(10);
+ for p in peers {
+ debug!("kyev S {}", hex::encode(p));
+
+ let self2 = self.clone();
+ tokio::spawn(async move {
+ match self2
+ .example_endpoint.call(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL)
+ .await
+ {
+ Ok(resp) => debug!("Got example response: {:?}", resp),
+ Err(e) => warn!("Error with example request: {}", e),
+ }
+ });
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl EndpointHandler<ExampleMessage> for Example {
+ async fn handle(self: &Arc<Self>, msg: ExampleMessage, _from: NodeID) -> ExampleResponse {
+ debug!("Got example message: {:?}, sending example response", msg);
+ ExampleResponse {
+ example_field: false,
}
}
}
@@ -159,6 +174,5 @@ struct ExampleResponse {
}
impl Message for ExampleMessage {
- const KIND: MessageKind = 0x99000001;
type Response = ExampleResponse;
}
diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs
index acc0a7b..f40591a 100644
--- a/examples/fullmesh.rs
+++ b/examples/fullmesh.rs
@@ -66,7 +66,7 @@ async fn main() {
info!("Node private key: {}", hex::encode(&privkey));
info!("Node public key: {}", hex::encode(&privkey.public_key()));
- let netapp = NetApp::new(netid, privkey);
+ let netapp = NetApp::new(netid.clone(), privkey.clone());
let mut bootstrap_peers = vec![];
for peer in opt.bootstrap_peers.iter() {
@@ -81,6 +81,12 @@ async fn main() {
let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers);
let listen_addr = opt.listen_addr.parse().unwrap();
+
+ info!("Add more peers to this mesh by running: fullmesh -n {} -l 127.0.0.1:$((1000 + $RANDOM)) -b {}@{}",
+ hex::encode(&netid),
+ hex::encode(&privkey.public_key()),
+ listen_addr);
+
let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
tokio::join!(netapp.listen(listen_addr, public_addr), peering.run(),);
}
diff --git a/src/client.rs b/src/client.rs
index a436d53..127ff46 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -145,12 +145,13 @@ impl ClientConn {
return Err(Error::Message("Response is 0 bytes, either a collision or a protocol error".into()));
}
+ trace!("request response {}: ", id);
+
let code = resp[0];
if code == 0 {
- rmp_serde::decode::from_read_ref::<_, Result<<T as Message>::Response, String>>(
+ Ok(rmp_serde::decode::from_read_ref::<_, <T as Message>::Response>(
&resp[1..],
- )?
- .map_err(Error::Remote)
+ )?)
} else {
Err(Error::Remote(format!("Remote error code {}", code)))
}
diff --git a/src/endpoint.rs b/src/endpoint.rs
index 0e1f5c8..83957e2 100644
--- a/src/endpoint.rs
+++ b/src/endpoint.rs
@@ -123,3 +123,4 @@ where
Box::new(Self(self.0.clone()))
}
}
+
diff --git a/src/netapp.rs b/src/netapp.rs
index afdd3c9..b6994ea 100644
--- a/src/netapp.rs
+++ b/src/netapp.rs
@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, RwLock};
-use log::{debug, info};
+use log::{debug, info, error};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
@@ -34,10 +34,6 @@ type OnDisconnectHandler = Box<dyn Fn(NodeID, bool) + Send + Sync>;
/// NetApp is the main class that handles incoming and outgoing connections.
///
-/// The `request()` method can be used to send a message to any peer to which we have
-/// an outgoing connection, or to ourself. On the server side, these messages are
-/// processed by the handlers that have been defined using `add_msg_handler()`.
-///
/// NetApp can be used in a stand-alone fashion or together with a peering strategy.
/// If using it alone, you will want to set `on_connect` and `on_disconnect` events
/// in order to manage information about the current peer list.
@@ -151,7 +147,9 @@ impl NetApp {
listen_addr,
public_addr,
};
- self.listen_params.store(Some(Arc::new(listen_params)));
+ if self.listen_params.swap(Some(Arc::new(listen_params))).is_some() {
+ error!("Trying to listen on NetApp but we're already listening!");
+ }
let listener = TcpListener::bind(listen_addr).await.unwrap();
info!("Listening on {}", listen_addr);
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs
index 3109e72..e0c8301 100644
--- a/src/peering/basalt.rs
+++ b/src/peering/basalt.rs
@@ -7,10 +7,11 @@ use log::{debug, info, trace, warn};
use lru::LruCache;
use rand::{thread_rng, Rng};
use serde::{Deserialize, Serialize};
+use async_trait::async_trait;
use sodiumoxide::crypto::hash;
-use crate::message::*;
+use crate::endpoint::*;
use crate::netapp::*;
use crate::proto::*;
use crate::NodeID;
@@ -21,7 +22,6 @@ use crate::NodeID;
struct PullMessage {}
impl Message for PullMessage {
- const KIND: MessageKind = 0x42001100;
type Response = PushMessage;
}
@@ -31,7 +31,6 @@ struct PushMessage {
}
impl Message for PushMessage {
- const KIND: MessageKind = 0x42001101;
type Response = ();
}
@@ -236,6 +235,8 @@ pub struct BasaltParams {
pub struct Basalt {
netapp: Arc<NetApp>,
+ pull_endpoint: Arc<Endpoint<PullMessage, Self>>,
+ push_endpoint: Arc<Endpoint<PushMessage, Self>>,
param: BasaltParams,
bootstrap_peers: Vec<Peer>,
@@ -264,6 +265,8 @@ impl Basalt {
let basalt = Arc::new(Self {
netapp: netapp.clone(),
+ pull_endpoint: netapp.endpoint("__netapp/peering/basalt.rs/Pull".into()),
+ push_endpoint: netapp.endpoint("__netapp/peering/basalt.rs/Push".into()),
param,
bootstrap_peers,
view: RwLock::new(view),
@@ -271,6 +274,9 @@ impl Basalt {
backlog: RwLock::new(backlog),
});
+ basalt.pull_endpoint.set_handler(basalt.clone());
+ basalt.push_endpoint.set_handler(basalt.clone());
+
let basalt2 = basalt.clone();
netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| {
basalt2.on_connected(id, addr, is_incoming);
@@ -281,18 +287,6 @@ impl Basalt {
basalt2.on_disconnected(id, is_incoming);
});
- let basalt2 = basalt.clone();
- netapp.add_msg_handler::<PullMessage, _, _>(move |_from: NodeID, _pullmsg: PullMessage| {
- let push_msg = basalt2.make_push_message();
- async move { push_msg }
- });
-
- let basalt2 = basalt.clone();
- netapp.add_msg_handler::<PushMessage, _, _>(move |_from: NodeID, push_msg: PushMessage| {
- basalt2.handle_peer_list(&push_msg.peers[..]);
- async move {}
- });
-
basalt
}
@@ -333,8 +327,8 @@ impl Basalt {
async fn do_pull(self: Arc<Self>, peer: NodeID) {
match self
- .netapp
- .request(&peer, PullMessage {}, PRIO_NORMAL)
+ .pull_endpoint
+ .call(&peer, PullMessage {}, PRIO_NORMAL)
.await
{
Ok(resp) => {
@@ -349,7 +343,7 @@ impl Basalt {
async fn do_push(self: Arc<Self>, peer: NodeID) {
let push_msg = self.make_push_message();
- match self.netapp.request(&peer, push_msg, PRIO_NORMAL).await {
+ match self.push_endpoint.call(&peer, push_msg, PRIO_NORMAL).await {
Ok(_) => {
trace!("KYEV PEXo {}", hex::encode(peer));
}
@@ -469,6 +463,20 @@ impl Basalt {
}
}
+#[async_trait]
+impl EndpointHandler<PullMessage> for Basalt {
+ async fn handle(self: &Arc<Self>, _pullmsg: PullMessage, _from: NodeID) -> PushMessage {
+ self.make_push_message()
+ }
+}
+
+#[async_trait]
+impl EndpointHandler<PushMessage> for Basalt {
+ async fn handle(self: &Arc<Self>, pushmsg: PushMessage, _from: NodeID) {
+ self.handle_peer_list(&pushmsg.peers[..]);
+ }
+}
+
fn rand_seed() -> Seed {
let mut seed = [0u8; 32];
sodiumoxide::randombytes::randombytes_into(&mut seed[..]);
diff --git a/src/server.rs b/src/server.rs
index 73ae267..c7d99b5 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -150,8 +150,12 @@ impl RecvLoop for ServerConn {
trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len());
let bytes: Bytes = bytes.into();
+ let prio = if !bytes.is_empty() {
+ bytes[0]
+ } else {
+ 0u8
+ };
let resp = self.recv_handler_aux(&bytes[..]).await;
- let prio = bytes[0];
let mut resp_bytes = vec![];
match resp {
@@ -164,8 +168,11 @@ impl RecvLoop for ServerConn {
}
}
+ trace!("ServerConn sending response to {}: ", id);
+
self.resp_send
.send(Some((id, prio, resp_bytes)))
.log_err("ServerConn recv_handler send resp");
}
}
+