aboutsummaryrefslogtreecommitdiff
path: root/examples/basalt.rs
diff options
context:
space:
mode:
Diffstat (limited to 'examples/basalt.rs')
-rw-r--r--examples/basalt.rs86
1 files changed, 50 insertions, 36 deletions
diff --git a/examples/basalt.rs b/examples/basalt.rs
index 91f0982..7093e05 100644
--- a/examples/basalt.rs
+++ b/examples/basalt.rs
@@ -7,14 +7,15 @@ use log::{debug, info, warn};
use serde::{Deserialize, Serialize};
use structopt::StructOpt;
+use async_trait::async_trait;
use sodiumoxide::crypto::auth;
use sodiumoxide::crypto::sign::ed25519;
-use netapp::message::*;
+use netapp::endpoint::*;
use netapp::peering::basalt::*;
use netapp::proto::*;
-use netapp::NetApp;
+use netapp::{NetApp, NodeID};
#[derive(StructOpt, Debug)]
#[structopt(name = "netapp")]
@@ -50,6 +51,12 @@ pub struct Opt {
reset_count: usize,
}
+struct Example {
+ netapp: Arc<NetApp>,
+ basalt: Arc<Basalt>,
+ example_endpoint: Arc<Endpoint<ExampleMessage, Self>>,
+}
+
#[tokio::main]
async fn main() {
env_logger::Builder::new()
@@ -104,46 +111,54 @@ async fn main() {
reset_interval: Duration::from_secs(opt.reset_interval),
reset_count: opt.reset_count,
};
- let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params);
-
- netapp.add_msg_handler::<ExampleMessage, _, _>(
- |_from: ed25519::PublicKey, msg: ExampleMessage| {
- debug!("Got example message: {:?}, sending example response", msg);
- async {
- ExampleResponse {
- example_field: false,
- }
- }
- },
- );
+ let basalt = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params);
+
+ let example = Arc::new(Example {
+ netapp: netapp.clone(),
+ basalt,
+ example_endpoint: netapp.endpoint("__netapp/examples/basalt.rs/Example".into()),
+ });
+ example.example_endpoint.set_handler(example.clone());
let listen_addr = opt.listen_addr.parse().unwrap();
let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
tokio::join!(
- sampling_loop(netapp.clone(), peering.clone()),
- netapp.listen(listen_addr, public_addr),
- peering.run(),
+ example.clone().sampling_loop(),
+ example.netapp.clone().listen(listen_addr, public_addr),
+ example.basalt.clone().run(),
);
}
-async fn sampling_loop(netapp: Arc<NetApp>, basalt: Arc<Basalt>) {
- loop {
- tokio::time::sleep(Duration::from_secs(10)).await;
-
- let peers = basalt.sample(10);
- for p in peers {
- debug!("kyev S {}", hex::encode(p));
-
- let netapp2 = netapp.clone();
- tokio::spawn(async move {
- match netapp2
- .request(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL)
- .await
- {
- Ok(resp) => debug!("Got example response: {:?}", resp),
- Err(e) => warn!("Error with example request: {}", e),
- }
- });
+impl Example {
+ async fn sampling_loop(self: Arc<Self>) {
+ loop {
+ tokio::time::sleep(Duration::from_secs(10)).await;
+
+ let peers = self.basalt.sample(10);
+ for p in peers {
+ debug!("kyev S {}", hex::encode(p));
+
+ let self2 = self.clone();
+ tokio::spawn(async move {
+ match self2
+ .example_endpoint.call(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL)
+ .await
+ {
+ Ok(resp) => debug!("Got example response: {:?}", resp),
+ Err(e) => warn!("Error with example request: {}", e),
+ }
+ });
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl EndpointHandler<ExampleMessage> for Example {
+ async fn handle(self: &Arc<Self>, msg: ExampleMessage, _from: NodeID) -> ExampleResponse {
+ debug!("Got example message: {:?}, sending example response", msg);
+ ExampleResponse {
+ example_field: false,
}
}
}
@@ -159,6 +174,5 @@ struct ExampleResponse {
}
impl Message for ExampleMessage {
- const KIND: MessageKind = 0x99000001;
type Response = ExampleResponse;
}