aboutsummaryrefslogtreecommitdiff
path: root/src/endpoint.rs
diff options
context:
space:
mode:
authortrinity-1686a <trinity@deuxfleurs.fr>2022-06-05 15:33:43 +0200
committertrinity-1686a <trinity@deuxfleurs.fr>2022-06-05 15:33:43 +0200
commit368ba908794901bc793c6a087c02241be046bdf2 (patch)
tree389910f1e1476c9531a01d2e53060e1056cca266 /src/endpoint.rs
parent648e015e3a73b96973343e0a1f861c9ea41cc24d (diff)
downloadnetapp-368ba908794901bc793c6a087c02241be046bdf2.tar.gz
netapp-368ba908794901bc793c6a087c02241be046bdf2.zip
initial work on associated stream
still require testing, and fixing a few kinks: - sending packets > 16k truncate them - send one more packet than it could at eos - probably update documentation /!\ contains breaking changes
Diffstat (limited to 'src/endpoint.rs')
-rw-r--r--src/endpoint.rs66
1 files changed, 60 insertions, 6 deletions
diff --git a/src/endpoint.rs b/src/endpoint.rs
index 42e9a98..81ed036 100644
--- a/src/endpoint.rs
+++ b/src/endpoint.rs
@@ -5,7 +5,8 @@ use std::sync::Arc;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
+use serde::de::Error as DeError;
+use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::error::Error;
use crate::netapp::*;
@@ -14,8 +15,50 @@ use crate::util::*;
/// This trait should be implemented by all messages your application
/// wants to handle
-pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync {
- type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync;
+pub trait Message: SerializeMessage + Send + Sync {
+ type Response: SerializeMessage + Send + Sync;
+}
+
+/// A trait for de/serializing messages, with possible associated stream.
+#[async_trait]
+pub trait SerializeMessage: Sized {
+ fn serialize_msg<S: Serializer>(
+ &self,
+ serializer: S,
+ ) -> Result<(S::Ok, Option<AssociatedStream>), S::Error>;
+
+ async fn deserialize_msg<'de, D: Deserializer<'de> + Send>(
+ deserializer: D,
+ stream: AssociatedStream,
+ ) -> Result<Self, D::Error>;
+}
+
+#[async_trait]
+impl<T> SerializeMessage for T
+where
+ T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
+{
+ fn serialize_msg<S: Serializer>(
+ &self,
+ serializer: S,
+ ) -> Result<(S::Ok, Option<AssociatedStream>), S::Error> {
+ self.serialize(serializer).map(|r| (r, None))
+ }
+
+ async fn deserialize_msg<'de, D: Deserializer<'de> + Send>(
+ deserializer: D,
+ mut stream: AssociatedStream,
+ ) -> Result<Self, D::Error> {
+ use futures::StreamExt;
+
+ let res = Self::deserialize(deserializer)?;
+ if stream.next().await.is_some() {
+ return Err(D::Error::custom(
+ "failed to deserialize: found associated stream when none expected",
+ ));
+ }
+ Ok(res)
+ }
}
/// This trait should be implemented by an object of your application
@@ -128,7 +171,12 @@ pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>;
#[async_trait]
pub(crate) trait GenericEndpoint {
- async fn handle(&self, buf: &[u8], from: NodeID) -> Result<Vec<u8>, Error>;
+ async fn handle(
+ &self,
+ buf: &[u8],
+ stream: AssociatedStream,
+ from: NodeID,
+ ) -> Result<(Vec<u8>, Option<AssociatedStream>), Error>;
fn drop_handler(&self);
fn clone_endpoint(&self) -> DynEndpoint;
}
@@ -145,11 +193,17 @@ where
M: Message + 'static,
H: EndpointHandler<M> + 'static,
{
- async fn handle(&self, buf: &[u8], from: NodeID) -> Result<Vec<u8>, Error> {
+ async fn handle(
+ &self,
+ buf: &[u8],
+ stream: AssociatedStream,
+ from: NodeID,
+ ) -> Result<(Vec<u8>, Option<AssociatedStream>), Error> {
match self.0.handler.load_full() {
None => Err(Error::NoHandler),
Some(h) => {
- let req = rmp_serde::decode::from_read_ref::<_, M>(buf)?;
+ let mut deser = rmp_serde::decode::Deserializer::from_read_ref(buf);
+ let req = M::deserialize_msg(&mut deser, stream).await?;
let res = h.handle(&req, from).await;
let res_bytes = rmp_to_vec_all_named(&res)?;
Ok(res_bytes)