diff options
Diffstat (limited to 'examples/basalt.rs')
-rw-r--r-- | examples/basalt.rs | 86 |
1 files changed, 50 insertions, 36 deletions
diff --git a/examples/basalt.rs b/examples/basalt.rs index 91f0982..7093e05 100644 --- a/examples/basalt.rs +++ b/examples/basalt.rs @@ -7,14 +7,15 @@ use log::{debug, info, warn}; use serde::{Deserialize, Serialize}; use structopt::StructOpt; +use async_trait::async_trait; use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; -use netapp::message::*; +use netapp::endpoint::*; use netapp::peering::basalt::*; use netapp::proto::*; -use netapp::NetApp; +use netapp::{NetApp, NodeID}; #[derive(StructOpt, Debug)] #[structopt(name = "netapp")] @@ -50,6 +51,12 @@ pub struct Opt { reset_count: usize, } +struct Example { + netapp: Arc<NetApp>, + basalt: Arc<Basalt>, + example_endpoint: Arc<Endpoint<ExampleMessage, Self>>, +} + #[tokio::main] async fn main() { env_logger::Builder::new() @@ -104,46 +111,54 @@ async fn main() { reset_interval: Duration::from_secs(opt.reset_interval), reset_count: opt.reset_count, }; - let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params); - - netapp.add_msg_handler::<ExampleMessage, _, _>( - |_from: ed25519::PublicKey, msg: ExampleMessage| { - debug!("Got example message: {:?}, sending example response", msg); - async { - ExampleResponse { - example_field: false, - } - } - }, - ); + let basalt = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params); + + let example = Arc::new(Example { + netapp: netapp.clone(), + basalt, + example_endpoint: netapp.endpoint("__netapp/examples/basalt.rs/Example".into()), + }); + example.example_endpoint.set_handler(example.clone()); let listen_addr = opt.listen_addr.parse().unwrap(); let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); tokio::join!( - sampling_loop(netapp.clone(), peering.clone()), - netapp.listen(listen_addr, public_addr), - peering.run(), + example.clone().sampling_loop(), + example.netapp.clone().listen(listen_addr, public_addr), + example.basalt.clone().run(), ); } -async fn sampling_loop(netapp: Arc<NetApp>, basalt: Arc<Basalt>) { - loop { - tokio::time::sleep(Duration::from_secs(10)).await; - - let peers = basalt.sample(10); - for p in peers { - debug!("kyev S {}", hex::encode(p)); - - let netapp2 = netapp.clone(); - tokio::spawn(async move { - match netapp2 - .request(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL) - .await - { - Ok(resp) => debug!("Got example response: {:?}", resp), - Err(e) => warn!("Error with example request: {}", e), - } - }); +impl Example { + async fn sampling_loop(self: Arc<Self>) { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + + let peers = self.basalt.sample(10); + for p in peers { + debug!("kyev S {}", hex::encode(p)); + + let self2 = self.clone(); + tokio::spawn(async move { + match self2 + .example_endpoint.call(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL) + .await + { + Ok(resp) => debug!("Got example response: {:?}", resp), + Err(e) => warn!("Error with example request: {}", e), + } + }); + } + } + } +} + +#[async_trait] +impl EndpointHandler<ExampleMessage> for Example { + async fn handle(self: &Arc<Self>, msg: ExampleMessage, _from: NodeID) -> ExampleResponse { + debug!("Got example message: {:?}, sending example response", msg); + ExampleResponse { + example_field: false, } } } @@ -159,6 +174,5 @@ struct ExampleResponse { } impl Message for ExampleMessage { - const KIND: MessageKind = 0x99000001; type Response = ExampleResponse; } |