From 8dede69dee20b812ad1dcab5b374c60232409f4f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 13 Oct 2021 12:33:14 +0200 Subject: Fix netapp protocol & adapt basalt to new api --- Cargo.lock | 23 ++++++++++++++ Cargo.toml | 1 + Makefile | 6 ++-- examples/basalt.rs | 86 ++++++++++++++++++++++++++++++--------------------- examples/fullmesh.rs | 8 ++++- src/client.rs | 7 +++-- src/endpoint.rs | 1 + src/netapp.rs | 10 +++--- src/peering/basalt.rs | 44 +++++++++++++++----------- src/server.rs | 9 +++++- 10 files changed, 128 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d00d37f..bce6ea2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,6 +28,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6df5aef5c5830360ce5218cecb8f018af3438af5686ae945094affc86fdec63" +[[package]] +name = "arrayvec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" + [[package]] name = "async-trait" version = "0.1.51" @@ -302,12 +308,28 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hexdump" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e40283dadb02f3af778878be1d717b17b4e4ab92e1d935ab03a730b0542905f2" +dependencies = [ + "arrayvec", + "itertools", +] + [[package]] name = "humantime" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "itertools" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a9b56eb56058f43dc66e58f40a214b2ccbc9f3df51861b63d51dec7b65bc3f" + [[package]] name = "kuska-handshake" version = "0.2.0" @@ -414,6 +436,7 @@ dependencies = [ "err-derive", "futures", "hex", + "hexdump", "kuska-handshake", "kuska-sodiumoxide", "log", diff --git a/Cargo.toml b/Cargo.toml index e007e5e..64e3401 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ serde = { version = "1.0", default-features = false, features = ["derive"] } rmp-serde = "0.14.3" hex = "0.4.2" base64 = "0.12.1" +hexdump = "0.1.1" rand = { version = "0.5.5", optional = true } diff --git a/Makefile b/Makefile index 0ef8cef..0f680f3 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,7 @@ all: - cargo build + cargo build --all-features cargo build --example fullmesh - RUST_LOG=netapp=debug cargo run --example fullmesh -- -n 3242ce79e05e8b6a0e43441fbd140a906e13f335f298ae3a52f29784abbab500 -p 6c304114a0e1018bbe60502a34d33f4f439f370856c3333dda2726da01eb93a4894b7ef7249a71f11d342b69702f1beb7c93ec95fbcf122ad1eca583bb0629e7 + cargo build --all-features --example basalt + RUST_LOG=netapp=debug,fullmesh=debug cargo run --example fullmesh -- -n 3242ce79e05e8b6a0e43441fbd140a906e13f335f298ae3a52f29784abbab500 -p 6c304114a0e1018bbe60502a34d33f4f439f370856c3333dda2726da01eb93a4894b7ef7249a71f11d342b69702f1beb7c93ec95fbcf122ad1eca583bb0629e7 + #RUST_LOG=netapp=debug,fullmesh=debug cargo run --example fullmesh diff --git a/examples/basalt.rs b/examples/basalt.rs index 91f0982..7093e05 100644 --- a/examples/basalt.rs +++ b/examples/basalt.rs @@ -7,14 +7,15 @@ use log::{debug, info, warn}; use serde::{Deserialize, Serialize}; use structopt::StructOpt; +use async_trait::async_trait; use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; -use netapp::message::*; +use netapp::endpoint::*; use netapp::peering::basalt::*; use netapp::proto::*; -use netapp::NetApp; +use netapp::{NetApp, NodeID}; #[derive(StructOpt, Debug)] #[structopt(name = "netapp")] @@ -50,6 +51,12 @@ pub struct Opt { reset_count: usize, } +struct Example { + netapp: Arc, + basalt: Arc, + example_endpoint: Arc>, +} + #[tokio::main] async fn main() { env_logger::Builder::new() @@ -104,46 +111,54 @@ async fn main() { reset_interval: Duration::from_secs(opt.reset_interval), reset_count: opt.reset_count, }; - let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params); - - netapp.add_msg_handler::( - |_from: ed25519::PublicKey, msg: ExampleMessage| { - debug!("Got example message: {:?}, sending example response", msg); - async { - ExampleResponse { - example_field: false, - } - } - }, - ); + let basalt = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params); + + let example = Arc::new(Example { + netapp: netapp.clone(), + basalt, + example_endpoint: netapp.endpoint("__netapp/examples/basalt.rs/Example".into()), + }); + example.example_endpoint.set_handler(example.clone()); let listen_addr = opt.listen_addr.parse().unwrap(); let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); tokio::join!( - sampling_loop(netapp.clone(), peering.clone()), - netapp.listen(listen_addr, public_addr), - peering.run(), + example.clone().sampling_loop(), + example.netapp.clone().listen(listen_addr, public_addr), + example.basalt.clone().run(), ); } -async fn sampling_loop(netapp: Arc, basalt: Arc) { - loop { - tokio::time::sleep(Duration::from_secs(10)).await; - - let peers = basalt.sample(10); - for p in peers { - debug!("kyev S {}", hex::encode(p)); - - let netapp2 = netapp.clone(); - tokio::spawn(async move { - match netapp2 - .request(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL) - .await - { - Ok(resp) => debug!("Got example response: {:?}", resp), - Err(e) => warn!("Error with example request: {}", e), - } - }); +impl Example { + async fn sampling_loop(self: Arc) { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + + let peers = self.basalt.sample(10); + for p in peers { + debug!("kyev S {}", hex::encode(p)); + + let self2 = self.clone(); + tokio::spawn(async move { + match self2 + .example_endpoint.call(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL) + .await + { + Ok(resp) => debug!("Got example response: {:?}", resp), + Err(e) => warn!("Error with example request: {}", e), + } + }); + } + } + } +} + +#[async_trait] +impl EndpointHandler for Example { + async fn handle(self: &Arc, msg: ExampleMessage, _from: NodeID) -> ExampleResponse { + debug!("Got example message: {:?}, sending example response", msg); + ExampleResponse { + example_field: false, } } } @@ -159,6 +174,5 @@ struct ExampleResponse { } impl Message for ExampleMessage { - const KIND: MessageKind = 0x99000001; type Response = ExampleResponse; } diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs index acc0a7b..f40591a 100644 --- a/examples/fullmesh.rs +++ b/examples/fullmesh.rs @@ -66,7 +66,7 @@ async fn main() { info!("Node private key: {}", hex::encode(&privkey)); info!("Node public key: {}", hex::encode(&privkey.public_key())); - let netapp = NetApp::new(netid, privkey); + let netapp = NetApp::new(netid.clone(), privkey.clone()); let mut bootstrap_peers = vec![]; for peer in opt.bootstrap_peers.iter() { @@ -81,6 +81,12 @@ async fn main() { let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers); let listen_addr = opt.listen_addr.parse().unwrap(); + + info!("Add more peers to this mesh by running: fullmesh -n {} -l 127.0.0.1:$((1000 + $RANDOM)) -b {}@{}", + hex::encode(&netid), + hex::encode(&privkey.public_key()), + listen_addr); + let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); tokio::join!(netapp.listen(listen_addr, public_addr), peering.run(),); } diff --git a/src/client.rs b/src/client.rs index a436d53..127ff46 100644 --- a/src/client.rs +++ b/src/client.rs @@ -145,12 +145,13 @@ impl ClientConn { return Err(Error::Message("Response is 0 bytes, either a collision or a protocol error".into())); } + trace!("request response {}: ", id); + let code = resp[0]; if code == 0 { - rmp_serde::decode::from_read_ref::<_, Result<::Response, String>>( + Ok(rmp_serde::decode::from_read_ref::<_, ::Response>( &resp[1..], - )? - .map_err(Error::Remote) + )?) } else { Err(Error::Remote(format!("Remote error code {}", code))) } diff --git a/src/endpoint.rs b/src/endpoint.rs index 0e1f5c8..83957e2 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -123,3 +123,4 @@ where Box::new(Self(self.0.clone())) } } + diff --git a/src/netapp.rs b/src/netapp.rs index afdd3c9..b6994ea 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::{Arc, RwLock}; -use log::{debug, info}; +use log::{debug, info, error}; use arc_swap::ArcSwapOption; use async_trait::async_trait; @@ -34,10 +34,6 @@ type OnDisconnectHandler = Box; /// NetApp is the main class that handles incoming and outgoing connections. /// -/// The `request()` method can be used to send a message to any peer to which we have -/// an outgoing connection, or to ourself. On the server side, these messages are -/// processed by the handlers that have been defined using `add_msg_handler()`. -/// /// NetApp can be used in a stand-alone fashion or together with a peering strategy. /// If using it alone, you will want to set `on_connect` and `on_disconnect` events /// in order to manage information about the current peer list. @@ -151,7 +147,9 @@ impl NetApp { listen_addr, public_addr, }; - self.listen_params.store(Some(Arc::new(listen_params))); + if self.listen_params.swap(Some(Arc::new(listen_params))).is_some() { + error!("Trying to listen on NetApp but we're already listening!"); + } let listener = TcpListener::bind(listen_addr).await.unwrap(); info!("Listening on {}", listen_addr); diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index 3109e72..e0c8301 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -7,10 +7,11 @@ use log::{debug, info, trace, warn}; use lru::LruCache; use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; +use async_trait::async_trait; use sodiumoxide::crypto::hash; -use crate::message::*; +use crate::endpoint::*; use crate::netapp::*; use crate::proto::*; use crate::NodeID; @@ -21,7 +22,6 @@ use crate::NodeID; struct PullMessage {} impl Message for PullMessage { - const KIND: MessageKind = 0x42001100; type Response = PushMessage; } @@ -31,7 +31,6 @@ struct PushMessage { } impl Message for PushMessage { - const KIND: MessageKind = 0x42001101; type Response = (); } @@ -236,6 +235,8 @@ pub struct BasaltParams { pub struct Basalt { netapp: Arc, + pull_endpoint: Arc>, + push_endpoint: Arc>, param: BasaltParams, bootstrap_peers: Vec, @@ -264,6 +265,8 @@ impl Basalt { let basalt = Arc::new(Self { netapp: netapp.clone(), + pull_endpoint: netapp.endpoint("__netapp/peering/basalt.rs/Pull".into()), + push_endpoint: netapp.endpoint("__netapp/peering/basalt.rs/Push".into()), param, bootstrap_peers, view: RwLock::new(view), @@ -271,6 +274,9 @@ impl Basalt { backlog: RwLock::new(backlog), }); + basalt.pull_endpoint.set_handler(basalt.clone()); + basalt.push_endpoint.set_handler(basalt.clone()); + let basalt2 = basalt.clone(); netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| { basalt2.on_connected(id, addr, is_incoming); @@ -281,18 +287,6 @@ impl Basalt { basalt2.on_disconnected(id, is_incoming); }); - let basalt2 = basalt.clone(); - netapp.add_msg_handler::(move |_from: NodeID, _pullmsg: PullMessage| { - let push_msg = basalt2.make_push_message(); - async move { push_msg } - }); - - let basalt2 = basalt.clone(); - netapp.add_msg_handler::(move |_from: NodeID, push_msg: PushMessage| { - basalt2.handle_peer_list(&push_msg.peers[..]); - async move {} - }); - basalt } @@ -333,8 +327,8 @@ impl Basalt { async fn do_pull(self: Arc, peer: NodeID) { match self - .netapp - .request(&peer, PullMessage {}, PRIO_NORMAL) + .pull_endpoint + .call(&peer, PullMessage {}, PRIO_NORMAL) .await { Ok(resp) => { @@ -349,7 +343,7 @@ impl Basalt { async fn do_push(self: Arc, peer: NodeID) { let push_msg = self.make_push_message(); - match self.netapp.request(&peer, push_msg, PRIO_NORMAL).await { + match self.push_endpoint.call(&peer, push_msg, PRIO_NORMAL).await { Ok(_) => { trace!("KYEV PEXo {}", hex::encode(peer)); } @@ -469,6 +463,20 @@ impl Basalt { } } +#[async_trait] +impl EndpointHandler for Basalt { + async fn handle(self: &Arc, _pullmsg: PullMessage, _from: NodeID) -> PushMessage { + self.make_push_message() + } +} + +#[async_trait] +impl EndpointHandler for Basalt { + async fn handle(self: &Arc, pushmsg: PushMessage, _from: NodeID) { + self.handle_peer_list(&pushmsg.peers[..]); + } +} + fn rand_seed() -> Seed { let mut seed = [0u8; 32]; sodiumoxide::randombytes::randombytes_into(&mut seed[..]); diff --git a/src/server.rs b/src/server.rs index 73ae267..c7d99b5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -150,8 +150,12 @@ impl RecvLoop for ServerConn { trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len()); let bytes: Bytes = bytes.into(); + let prio = if !bytes.is_empty() { + bytes[0] + } else { + 0u8 + }; let resp = self.recv_handler_aux(&bytes[..]).await; - let prio = bytes[0]; let mut resp_bytes = vec![]; match resp { @@ -164,8 +168,11 @@ impl RecvLoop for ServerConn { } } + trace!("ServerConn sending response to {}: ", id); + self.resp_send .send(Some((id, prio, resp_bytes))) .log_err("ServerConn recv_handler send resp"); } } + -- cgit v1.2.3