diff options
author | Alex <alex@adnab.me> | 2022-09-13 12:56:53 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-09-13 12:56:53 +0200 |
commit | 8ac109e3a84bd34550d66baf65fe59b86b63bca2 (patch) | |
tree | a49a199a1049d18afaa60f47f46e04cb798aa4b2 /examples | |
parent | a82700c5a27612002e6ee029ae77915b8114182f (diff) | |
parent | 298e956a199711b65ce3820931ca943108b78225 (diff) | |
download | netapp-8ac109e3a84bd34550d66baf65fe59b86b63bca2.tar.gz netapp-8ac109e3a84bd34550d66baf65fe59b86b63bca2.zip |
Merge pull request 'add streaming body to requests and responses' (#3) from stream-body into mainv0.5.0
Reviewed-on: https://git.deuxfleurs.fr/lx/netapp/pulls/3
Diffstat (limited to 'examples')
-rw-r--r-- | examples/basalt.rs | 4 | ||||
-rw-r--r-- | examples/fullmesh.rs | 129 |
2 files changed, 127 insertions, 6 deletions
diff --git a/examples/basalt.rs b/examples/basalt.rs index 318e37c..a5a25c3 100644 --- a/examples/basalt.rs +++ b/examples/basalt.rs @@ -14,8 +14,8 @@ use sodiumoxide::crypto::sign::ed25519; use tokio::sync::watch; use netapp::endpoint::*; +use netapp::message::*; use netapp::peering::basalt::*; -use netapp::proto::*; use netapp::util::parse_peer_addr; use netapp::{NetApp, NodeID}; @@ -145,7 +145,7 @@ impl Example { tokio::spawn(async move { match self2 .example_endpoint - .call(&p, &ExampleMessage { example_field: 42 }, PRIO_NORMAL) + .call(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL) .await { Ok(resp) => debug!("Got example response: {:?}", resp), diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs index b068410..d0190ef 100644 --- a/examples/fullmesh.rs +++ b/examples/fullmesh.rs @@ -1,16 +1,24 @@ use std::io::Write; use std::net::SocketAddr; - -use log::info; - +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::{stream, StreamExt}; +use log::*; +use serde::{Deserialize, Serialize}; use structopt::StructOpt; +use tokio::sync::watch; use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; +use netapp::endpoint::*; +use netapp::message::*; use netapp::peering::fullmesh::*; use netapp::util::*; -use netapp::NetApp; +use netapp::{NetApp, NodeID}; #[derive(StructOpt, Debug)] #[structopt(name = "netapp")] @@ -91,8 +99,121 @@ async fn main() { let watch_cancel = netapp::util::watch_ctrl_c(); + let example = Arc::new(Example { + netapp: netapp.clone(), + fullmesh: peering.clone(), + example_endpoint: netapp.endpoint("__netapp/examples/fullmesh.rs/Example".into()), + }); + example.example_endpoint.set_handler(example.clone()); + tokio::join!( + example.exchange_loop(watch_cancel.clone()), netapp.listen(listen_addr, public_addr, watch_cancel.clone()), peering.run(watch_cancel), ); } + +// ---- + +struct Example { + netapp: Arc<NetApp>, + fullmesh: Arc<FullMeshPeeringStrategy>, + example_endpoint: Arc<Endpoint<ExampleMessage, Self>>, +} + +impl Example { + async fn exchange_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) { + let mut i = 12000; + while !*must_exit.borrow() { + tokio::time::sleep(Duration::from_secs(2)).await; + + let peers = self.fullmesh.get_peer_list(); + for p in peers.iter() { + let id = p.id; + if id == self.netapp.id { + continue; + } + i += 1; + let example_field = i; + let self2 = self.clone(); + tokio::spawn(async move { + info!( + "Send example query {} to {}", + example_field, + hex::encode(id) + ); + // Fake data stream with some delays in item production + let stream = + Box::pin(stream::iter([100, 200, 300, 400]).then(|x| async move { + tokio::time::sleep(Duration::from_millis(500)).await; + Ok(Bytes::from(vec![(x % 256) as u8; 133 * x])) + })); + match self2 + .example_endpoint + .call_streaming( + &id, + Req::new(ExampleMessage { example_field }) + .unwrap() + .with_stream(stream), + PRIO_NORMAL, + ) + .await + { + Ok(resp) => { + let (resp, stream) = resp.into_parts(); + info!( + "Got example response to {} from {}: {:?}", + example_field, + hex::encode(id), + resp + ); + let mut stream = stream.unwrap(); + while let Some(x) = stream.next().await { + info!("Response: stream got bytes {:?}", x.map(|b| b.len())); + } + } + Err(e) => warn!("Error with example request: {}", e), + } + }); + } + } + } +} + +#[async_trait] +impl StreamingEndpointHandler<ExampleMessage> for Example { + async fn handle( + self: &Arc<Self>, + mut msg: Req<ExampleMessage>, + _from: NodeID, + ) -> Resp<ExampleMessage> { + info!( + "Got example message: {:?}, sending example response", + msg.msg() + ); + let source_stream = msg.take_stream().unwrap(); + // Return same stream with 300ms delay + let new_stream = Box::pin(source_stream.then(|x| async move { + tokio::time::sleep(Duration::from_millis(300)).await; + x + })); + Resp::new(ExampleResponse { + example_field: false, + }) + .with_stream(new_stream) + } +} + +#[derive(Serialize, Deserialize, Debug)] +struct ExampleMessage { + example_field: usize, +} + +#[derive(Serialize, Deserialize, Debug)] +struct ExampleResponse { + example_field: bool, +} + +impl Message for ExampleMessage { + type Response = ExampleResponse; +} |