aboutsummaryrefslogtreecommitdiff
path: root/examples/fullmesh.rs
diff options
context:
space:
mode:
Diffstat (limited to 'examples/fullmesh.rs')
-rw-r--r--examples/fullmesh.rs135
1 files changed, 130 insertions, 5 deletions
diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs
index 4ab8a8a..82e45c3 100644
--- a/examples/fullmesh.rs
+++ b/examples/fullmesh.rs
@@ -1,17 +1,24 @@
use std::io::Write;
use std::net::SocketAddr;
-
-use log::info;
-
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::{stream, StreamExt};
+use log::*;
+use serde::{Deserialize, Serialize};
use structopt::StructOpt;
+use tokio::sync::watch;
use sodiumoxide::crypto::auth;
use sodiumoxide::crypto::sign::ed25519;
+use netapp::endpoint::*;
+use netapp::message::*;
use netapp::peering::fullmesh::*;
use netapp::util::*;
-
-use netapp::NetApp;
+use netapp::{NetApp, NodeID};
#[derive(StructOpt, Debug)]
#[structopt(name = "netapp")]
@@ -92,8 +99,126 @@ async fn main() {
let watch_cancel = netapp::util::watch_ctrl_c();
+ let example = Arc::new(Example {
+ netapp: netapp.clone(),
+ fullmesh: peering.clone(),
+ example_endpoint: netapp.endpoint("__netapp/examples/fullmesh.rs/Example".into()),
+ });
+ example.example_endpoint.set_handler(example.clone());
+
tokio::join!(
+ example.exchange_loop(watch_cancel.clone()),
netapp.listen(listen_addr, public_addr, watch_cancel.clone()),
peering.run(watch_cancel),
);
}
+
+// ----
+
+struct Example {
+ netapp: Arc<NetApp>,
+ fullmesh: Arc<FullMeshPeeringStrategy>,
+ example_endpoint: Arc<Endpoint<ExampleMessage, Self>>,
+}
+
+impl Example {
+ async fn exchange_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
+ let mut i = 12000;
+ while !*must_exit.borrow() {
+ tokio::time::sleep(Duration::from_secs(7)).await;
+
+ let peers = self.fullmesh.get_peer_list();
+ for p in peers.iter() {
+ let id = p.id;
+ if id == self.netapp.id {
+ continue;
+ }
+ i += 1;
+ let example_field = i;
+ let self2 = self.clone();
+ tokio::spawn(async move {
+ info!(
+ "Send example query {} to {}",
+ example_field,
+ hex::encode(id)
+ );
+ let stream =
+ Box::pin(stream::iter([100, 200, 300, 400]).then(|x| async move {
+ tokio::time::sleep(Duration::from_millis(100)).await;
+ Ok(Bytes::from(vec![(x % 256) as u8; 133 * x]))
+ }));
+ match self2
+ .example_endpoint
+ .call_streaming(
+ &id,
+ Req::new(ExampleMessage { example_field })
+ .unwrap()
+ .with_stream(stream),
+ PRIO_NORMAL,
+ )
+ .await
+ {
+ Ok(resp) => {
+ let (resp, stream) = resp.into_parts();
+ info!(
+ "Got example response to {} from {}: {:?}",
+ example_field,
+ hex::encode(id),
+ resp
+ );
+ let mut stream = stream.unwrap();
+ while let Some(x) = stream.next().await {
+ info!("Response: stream got bytes {:?}", x.map(|b| b.len()));
+ }
+ }
+ Err(e) => warn!("Error with example request: {}", e),
+ }
+ });
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl StreamingEndpointHandler<ExampleMessage> for Example {
+ async fn handle(
+ self: &Arc<Self>,
+ mut msg: Req<ExampleMessage>,
+ _from: NodeID,
+ ) -> Resp<ExampleMessage> {
+ info!(
+ "Got example message: {:?}, sending example response",
+ msg.msg()
+ );
+ let source_stream = msg.take_stream().unwrap();
+ let new_stream = Box::pin(source_stream.then(|x| async move {
+ info!(
+ "Handler: stream got bytes {:?}",
+ x.as_ref().map(|b| b.len())
+ );
+ tokio::time::sleep(Duration::from_millis(100)).await;
+ Ok(Bytes::from(vec![
+ 10u8;
+ x.map(|b| b.len()).unwrap_or(1422) * 2
+ ]))
+ }));
+ Resp::new(ExampleResponse {
+ example_field: false,
+ })
+ .with_stream(new_stream)
+ }
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct ExampleMessage {
+ example_field: usize,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct ExampleResponse {
+ example_field: bool,
+}
+
+impl Message for ExampleMessage {
+ type Response = ExampleResponse;
+}