From f35fa7d18d9e0f51bed311355ec1310b1d311ab3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 17:34:53 +0200 Subject: Move things around --- examples/basalt.rs | 3 ++- examples/fullmesh.rs | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) (limited to 'examples') diff --git a/examples/basalt.rs b/examples/basalt.rs index 318e37c..52fab4b 100644 --- a/examples/basalt.rs +++ b/examples/basalt.rs @@ -14,8 +14,9 @@ use sodiumoxide::crypto::sign::ed25519; use tokio::sync::watch; use netapp::endpoint::*; +use netapp::message::*; use netapp::peering::basalt::*; -use netapp::proto::*; +use netapp::send::*; use netapp::util::parse_peer_addr; use netapp::{NetApp, NodeID}; diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs index b068410..4ab8a8a 100644 --- a/examples/fullmesh.rs +++ b/examples/fullmesh.rs @@ -10,6 +10,7 @@ use sodiumoxide::crypto::sign::ed25519; use netapp::peering::fullmesh::*; use netapp::util::*; + use netapp::NetApp; #[derive(StructOpt, Debug)] -- cgit v1.2.3 From 7d148c7e764d563efa3bccc0f14f50867db38ef1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 19:25:07 +0200 Subject: One possibility, but I don't like it --- examples/basalt.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'examples') diff --git a/examples/basalt.rs b/examples/basalt.rs index 52fab4b..dd56cd7 100644 --- a/examples/basalt.rs +++ b/examples/basalt.rs @@ -16,7 +16,6 @@ use tokio::sync::watch; use netapp::endpoint::*; use netapp::message::*; use netapp::peering::basalt::*; -use netapp::send::*; use netapp::util::parse_peer_addr; use netapp::{NetApp, NodeID}; @@ -146,7 +145,7 @@ impl Example { tokio::spawn(async move { match self2 .example_endpoint - .call(&p, &ExampleMessage { example_field: 42 }, PRIO_NORMAL) + .call(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL) .await { Ok(resp) => debug!("Got example response: {:?}", resp), @@ -160,7 +159,7 @@ impl Example { #[async_trait] impl EndpointHandler for Example { - async fn handle(self: &Arc, msg: &ExampleMessage, _from: NodeID) -> ExampleResponse { + async fn handle(self: &Arc, msg: ExampleMessage, _from: NodeID) -> ExampleResponse { debug!("Got example message: {:?}, sending example response", msg); ExampleResponse { example_field: false, -- cgit v1.2.3 From 4934ed726d51913afd97ca937d0ece39ef8b7371 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 20:22:56 +0200 Subject: Propose alternative API --- examples/basalt.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'examples') diff --git a/examples/basalt.rs b/examples/basalt.rs index dd56cd7..3841786 100644 --- a/examples/basalt.rs +++ b/examples/basalt.rs @@ -159,11 +159,15 @@ impl Example { #[async_trait] impl EndpointHandler for Example { - async fn handle(self: &Arc, msg: ExampleMessage, _from: NodeID) -> ExampleResponse { + async fn handle( + self: &Arc, + msg: Req, + _from: NodeID, + ) -> Resp { debug!("Got example message: {:?}, sending example response", msg); - ExampleResponse { + Resp::new(ExampleResponse { example_field: false, - } + }) } } -- cgit v1.2.3 From c358fe3c92da8a8454e461484737efe2a14dfd73 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 10:55:37 +0200 Subject: Hide streaming versions as much as possible --- examples/basalt.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) (limited to 'examples') diff --git a/examples/basalt.rs b/examples/basalt.rs index 3841786..a5a25c3 100644 --- a/examples/basalt.rs +++ b/examples/basalt.rs @@ -159,15 +159,11 @@ impl Example { #[async_trait] impl EndpointHandler for Example { - async fn handle( - self: &Arc, - msg: Req, - _from: NodeID, - ) -> Resp { + async fn handle(self: &Arc, msg: &ExampleMessage, _from: NodeID) -> ExampleResponse { debug!("Got example message: {:?}, sending example response", msg); - Resp::new(ExampleResponse { + ExampleResponse { example_field: false, - }) + } } } -- cgit v1.2.3 From bdf7d4731dcd2e9b523758272fdc41b374044a9f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 26 Jul 2022 12:01:13 +0200 Subject: Add stream example to fullmesh example --- examples/fullmesh.rs | 135 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 130 insertions(+), 5 deletions(-) (limited to 'examples') diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs index 4ab8a8a..82e45c3 100644 --- a/examples/fullmesh.rs +++ b/examples/fullmesh.rs @@ -1,17 +1,24 @@ use std::io::Write; use std::net::SocketAddr; - -use log::info; - +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::{stream, StreamExt}; +use log::*; +use serde::{Deserialize, Serialize}; use structopt::StructOpt; +use tokio::sync::watch; use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; +use netapp::endpoint::*; +use netapp::message::*; use netapp::peering::fullmesh::*; use netapp::util::*; - -use netapp::NetApp; +use netapp::{NetApp, NodeID}; #[derive(StructOpt, Debug)] #[structopt(name = "netapp")] @@ -92,8 +99,126 @@ async fn main() { let watch_cancel = netapp::util::watch_ctrl_c(); + let example = Arc::new(Example { + netapp: netapp.clone(), + fullmesh: peering.clone(), + example_endpoint: netapp.endpoint("__netapp/examples/fullmesh.rs/Example".into()), + }); + example.example_endpoint.set_handler(example.clone()); + tokio::join!( + example.exchange_loop(watch_cancel.clone()), netapp.listen(listen_addr, public_addr, watch_cancel.clone()), peering.run(watch_cancel), ); } + +// ---- + +struct Example { + netapp: Arc, + fullmesh: Arc, + example_endpoint: Arc>, +} + +impl Example { + async fn exchange_loop(self: Arc, must_exit: watch::Receiver) { + let mut i = 12000; + while !*must_exit.borrow() { + tokio::time::sleep(Duration::from_secs(7)).await; + + let peers = self.fullmesh.get_peer_list(); + for p in peers.iter() { + let id = p.id; + if id == self.netapp.id { + continue; + } + i += 1; + let example_field = i; + let self2 = self.clone(); + tokio::spawn(async move { + info!( + "Send example query {} to {}", + example_field, + hex::encode(id) + ); + let stream = + Box::pin(stream::iter([100, 200, 300, 400]).then(|x| async move { + tokio::time::sleep(Duration::from_millis(100)).await; + Ok(Bytes::from(vec![(x % 256) as u8; 133 * x])) + })); + match self2 + .example_endpoint + .call_streaming( + &id, + Req::new(ExampleMessage { example_field }) + .unwrap() + .with_stream(stream), + PRIO_NORMAL, + ) + .await + { + Ok(resp) => { + let (resp, stream) = resp.into_parts(); + info!( + "Got example response to {} from {}: {:?}", + example_field, + hex::encode(id), + resp + ); + let mut stream = stream.unwrap(); + while let Some(x) = stream.next().await { + info!("Response: stream got bytes {:?}", x.map(|b| b.len())); + } + } + Err(e) => warn!("Error with example request: {}", e), + } + }); + } + } + } +} + +#[async_trait] +impl StreamingEndpointHandler for Example { + async fn handle( + self: &Arc, + mut msg: Req, + _from: NodeID, + ) -> Resp { + info!( + "Got example message: {:?}, sending example response", + msg.msg() + ); + let source_stream = msg.take_stream().unwrap(); + let new_stream = Box::pin(source_stream.then(|x| async move { + info!( + "Handler: stream got bytes {:?}", + x.as_ref().map(|b| b.len()) + ); + tokio::time::sleep(Duration::from_millis(100)).await; + Ok(Bytes::from(vec![ + 10u8; + x.map(|b| b.len()).unwrap_or(1422) * 2 + ])) + })); + Resp::new(ExampleResponse { + example_field: false, + }) + .with_stream(new_stream) + } +} + +#[derive(Serialize, Deserialize, Debug)] +struct ExampleMessage { + example_field: usize, +} + +#[derive(Serialize, Deserialize, Debug)] +struct ExampleResponse { + example_field: bool, +} + +impl Message for ExampleMessage { + type Response = ExampleResponse; +} -- cgit v1.2.3 From b55f61c38b01da01314d99ced543aba713dbd2a9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 26 Jul 2022 12:11:48 +0200 Subject: Fix things going wrong when sending chan is closed --- examples/fullmesh.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'examples') diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs index 82e45c3..972bec0 100644 --- a/examples/fullmesh.rs +++ b/examples/fullmesh.rs @@ -125,7 +125,7 @@ impl Example { async fn exchange_loop(self: Arc, must_exit: watch::Receiver) { let mut i = 12000; while !*must_exit.borrow() { - tokio::time::sleep(Duration::from_secs(7)).await; + tokio::time::sleep(Duration::from_secs(2)).await; let peers = self.fullmesh.get_peer_list(); for p in peers.iter() { @@ -144,7 +144,7 @@ impl Example { ); let stream = Box::pin(stream::iter([100, 200, 300, 400]).then(|x| async move { - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(500)).await; Ok(Bytes::from(vec![(x % 256) as u8; 133 * x])) })); match self2 @@ -196,7 +196,7 @@ impl StreamingEndpointHandler for Example { "Handler: stream got bytes {:?}", x.as_ref().map(|b| b.len()) ); - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(300)).await; Ok(Bytes::from(vec![ 10u8; x.map(|b| b.len()).unwrap_or(1422) * 2 -- cgit v1.2.3 From f0326607eebcded55fd01c41895a5b6e23bac55f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 12 Sep 2022 17:19:26 +0200 Subject: slightly change example --- examples/fullmesh.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) (limited to 'examples') diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs index 972bec0..d0190ef 100644 --- a/examples/fullmesh.rs +++ b/examples/fullmesh.rs @@ -142,6 +142,7 @@ impl Example { example_field, hex::encode(id) ); + // Fake data stream with some delays in item production let stream = Box::pin(stream::iter([100, 200, 300, 400]).then(|x| async move { tokio::time::sleep(Duration::from_millis(500)).await; @@ -191,16 +192,10 @@ impl StreamingEndpointHandler for Example { msg.msg() ); let source_stream = msg.take_stream().unwrap(); + // Return same stream with 300ms delay let new_stream = Box::pin(source_stream.then(|x| async move { - info!( - "Handler: stream got bytes {:?}", - x.as_ref().map(|b| b.len()) - ); tokio::time::sleep(Duration::from_millis(300)).await; - Ok(Bytes::from(vec![ - 10u8; - x.map(|b| b.len()).unwrap_or(1422) * 2 - ])) + x })); Resp::new(ExampleResponse { example_field: false, -- cgit v1.2.3