From 8dede69dee20b812ad1dcab5b374c60232409f4f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 13 Oct 2021 12:33:14 +0200 Subject: Fix netapp protocol & adapt basalt to new api --- examples/basalt.rs | 86 ++++++++++++++++++++++++++++++---------------------- examples/fullmesh.rs | 8 ++++- 2 files changed, 57 insertions(+), 37 deletions(-) (limited to 'examples') diff --git a/examples/basalt.rs b/examples/basalt.rs index 91f0982..7093e05 100644 --- a/examples/basalt.rs +++ b/examples/basalt.rs @@ -7,14 +7,15 @@ use log::{debug, info, warn}; use serde::{Deserialize, Serialize}; use structopt::StructOpt; +use async_trait::async_trait; use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; -use netapp::message::*; +use netapp::endpoint::*; use netapp::peering::basalt::*; use netapp::proto::*; -use netapp::NetApp; +use netapp::{NetApp, NodeID}; #[derive(StructOpt, Debug)] #[structopt(name = "netapp")] @@ -50,6 +51,12 @@ pub struct Opt { reset_count: usize, } +struct Example { + netapp: Arc, + basalt: Arc, + example_endpoint: Arc>, +} + #[tokio::main] async fn main() { env_logger::Builder::new() @@ -104,46 +111,54 @@ async fn main() { reset_interval: Duration::from_secs(opt.reset_interval), reset_count: opt.reset_count, }; - let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params); - - netapp.add_msg_handler::( - |_from: ed25519::PublicKey, msg: ExampleMessage| { - debug!("Got example message: {:?}, sending example response", msg); - async { - ExampleResponse { - example_field: false, - } - } - }, - ); + let basalt = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params); + + let example = Arc::new(Example { + netapp: netapp.clone(), + basalt, + example_endpoint: netapp.endpoint("__netapp/examples/basalt.rs/Example".into()), + }); + example.example_endpoint.set_handler(example.clone()); let listen_addr = opt.listen_addr.parse().unwrap(); let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); tokio::join!( - sampling_loop(netapp.clone(), peering.clone()), - netapp.listen(listen_addr, public_addr), - peering.run(), + example.clone().sampling_loop(), + example.netapp.clone().listen(listen_addr, public_addr), + example.basalt.clone().run(), ); } -async fn sampling_loop(netapp: Arc, basalt: Arc) { - loop { - tokio::time::sleep(Duration::from_secs(10)).await; - - let peers = basalt.sample(10); - for p in peers { - debug!("kyev S {}", hex::encode(p)); - - let netapp2 = netapp.clone(); - tokio::spawn(async move { - match netapp2 - .request(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL) - .await - { - Ok(resp) => debug!("Got example response: {:?}", resp), - Err(e) => warn!("Error with example request: {}", e), - } - }); +impl Example { + async fn sampling_loop(self: Arc) { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + + let peers = self.basalt.sample(10); + for p in peers { + debug!("kyev S {}", hex::encode(p)); + + let self2 = self.clone(); + tokio::spawn(async move { + match self2 + .example_endpoint.call(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL) + .await + { + Ok(resp) => debug!("Got example response: {:?}", resp), + Err(e) => warn!("Error with example request: {}", e), + } + }); + } + } + } +} + +#[async_trait] +impl EndpointHandler for Example { + async fn handle(self: &Arc, msg: ExampleMessage, _from: NodeID) -> ExampleResponse { + debug!("Got example message: {:?}, sending example response", msg); + ExampleResponse { + example_field: false, } } } @@ -159,6 +174,5 @@ struct ExampleResponse { } impl Message for ExampleMessage { - const KIND: MessageKind = 0x99000001; type Response = ExampleResponse; } diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs index acc0a7b..f40591a 100644 --- a/examples/fullmesh.rs +++ b/examples/fullmesh.rs @@ -66,7 +66,7 @@ async fn main() { info!("Node private key: {}", hex::encode(&privkey)); info!("Node public key: {}", hex::encode(&privkey.public_key())); - let netapp = NetApp::new(netid, privkey); + let netapp = NetApp::new(netid.clone(), privkey.clone()); let mut bootstrap_peers = vec![]; for peer in opt.bootstrap_peers.iter() { @@ -81,6 +81,12 @@ async fn main() { let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers); let listen_addr = opt.listen_addr.parse().unwrap(); + + info!("Add more peers to this mesh by running: fullmesh -n {} -l 127.0.0.1:$((1000 + $RANDOM)) -b {}@{}", + hex::encode(&netid), + hex::encode(&privkey.public_key()), + listen_addr); + let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); tokio::join!(netapp.listen(listen_addr, public_addr), peering.run(),); } -- cgit v1.2.3