aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
Diffstat (limited to 'examples')
-rw-r--r--examples/basalt.rs86
-rw-r--r--examples/fullmesh.rs8
2 files changed, 57 insertions, 37 deletions
diff --git a/examples/basalt.rs b/examples/basalt.rs
index 91f0982..7093e05 100644
--- a/examples/basalt.rs
+++ b/examples/basalt.rs
@@ -7,14 +7,15 @@ use log::{debug, info, warn};
use serde::{Deserialize, Serialize};
use structopt::StructOpt;
+use async_trait::async_trait;
use sodiumoxide::crypto::auth;
use sodiumoxide::crypto::sign::ed25519;
-use netapp::message::*;
+use netapp::endpoint::*;
use netapp::peering::basalt::*;
use netapp::proto::*;
-use netapp::NetApp;
+use netapp::{NetApp, NodeID};
#[derive(StructOpt, Debug)]
#[structopt(name = "netapp")]
@@ -50,6 +51,12 @@ pub struct Opt {
reset_count: usize,
}
+struct Example {
+ netapp: Arc<NetApp>,
+ basalt: Arc<Basalt>,
+ example_endpoint: Arc<Endpoint<ExampleMessage, Self>>,
+}
+
#[tokio::main]
async fn main() {
env_logger::Builder::new()
@@ -104,46 +111,54 @@ async fn main() {
reset_interval: Duration::from_secs(opt.reset_interval),
reset_count: opt.reset_count,
};
- let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params);
-
- netapp.add_msg_handler::<ExampleMessage, _, _>(
- |_from: ed25519::PublicKey, msg: ExampleMessage| {
- debug!("Got example message: {:?}, sending example response", msg);
- async {
- ExampleResponse {
- example_field: false,
- }
- }
- },
- );
+ let basalt = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params);
+
+ let example = Arc::new(Example {
+ netapp: netapp.clone(),
+ basalt,
+ example_endpoint: netapp.endpoint("__netapp/examples/basalt.rs/Example".into()),
+ });
+ example.example_endpoint.set_handler(example.clone());
let listen_addr = opt.listen_addr.parse().unwrap();
let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
tokio::join!(
- sampling_loop(netapp.clone(), peering.clone()),
- netapp.listen(listen_addr, public_addr),
- peering.run(),
+ example.clone().sampling_loop(),
+ example.netapp.clone().listen(listen_addr, public_addr),
+ example.basalt.clone().run(),
);
}
-async fn sampling_loop(netapp: Arc<NetApp>, basalt: Arc<Basalt>) {
- loop {
- tokio::time::sleep(Duration::from_secs(10)).await;
-
- let peers = basalt.sample(10);
- for p in peers {
- debug!("kyev S {}", hex::encode(p));
-
- let netapp2 = netapp.clone();
- tokio::spawn(async move {
- match netapp2
- .request(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL)
- .await
- {
- Ok(resp) => debug!("Got example response: {:?}", resp),
- Err(e) => warn!("Error with example request: {}", e),
- }
- });
+impl Example {
+ async fn sampling_loop(self: Arc<Self>) {
+ loop {
+ tokio::time::sleep(Duration::from_secs(10)).await;
+
+ let peers = self.basalt.sample(10);
+ for p in peers {
+ debug!("kyev S {}", hex::encode(p));
+
+ let self2 = self.clone();
+ tokio::spawn(async move {
+ match self2
+ .example_endpoint.call(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL)
+ .await
+ {
+ Ok(resp) => debug!("Got example response: {:?}", resp),
+ Err(e) => warn!("Error with example request: {}", e),
+ }
+ });
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl EndpointHandler<ExampleMessage> for Example {
+ async fn handle(self: &Arc<Self>, msg: ExampleMessage, _from: NodeID) -> ExampleResponse {
+ debug!("Got example message: {:?}, sending example response", msg);
+ ExampleResponse {
+ example_field: false,
}
}
}
@@ -159,6 +174,5 @@ struct ExampleResponse {
}
impl Message for ExampleMessage {
- const KIND: MessageKind = 0x99000001;
type Response = ExampleResponse;
}
diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs
index acc0a7b..f40591a 100644
--- a/examples/fullmesh.rs
+++ b/examples/fullmesh.rs
@@ -66,7 +66,7 @@ async fn main() {
info!("Node private key: {}", hex::encode(&privkey));
info!("Node public key: {}", hex::encode(&privkey.public_key()));
- let netapp = NetApp::new(netid, privkey);
+ let netapp = NetApp::new(netid.clone(), privkey.clone());
let mut bootstrap_peers = vec![];
for peer in opt.bootstrap_peers.iter() {
@@ -81,6 +81,12 @@ async fn main() {
let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers);
let listen_addr = opt.listen_addr.parse().unwrap();
+
+ info!("Add more peers to this mesh by running: fullmesh -n {} -l 127.0.0.1:$((1000 + $RANDOM)) -b {}@{}",
+ hex::encode(&netid),
+ hex::encode(&privkey.public_key()),
+ listen_addr);
+
let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
tokio::join!(netapp.listen(listen_addr, public_addr), peering.run(),);
}