diff options
author | trinity-1686a <trinity@deuxfleurs.fr> | 2022-06-05 15:33:43 +0200 |
---|---|---|
committer | trinity-1686a <trinity@deuxfleurs.fr> | 2022-06-05 15:33:43 +0200 |
commit | 368ba908794901bc793c6a087c02241be046bdf2 (patch) | |
tree | 389910f1e1476c9531a01d2e53060e1056cca266 /src/endpoint.rs | |
parent | 648e015e3a73b96973343e0a1f861c9ea41cc24d (diff) | |
download | netapp-368ba908794901bc793c6a087c02241be046bdf2.tar.gz netapp-368ba908794901bc793c6a087c02241be046bdf2.zip |
initial work on associated stream
still require testing, and fixing a few kinks:
- sending packets > 16k truncate them
- send one more packet than it could at eos
- probably update documentation
/!\ contains breaking changes
Diffstat (limited to 'src/endpoint.rs')
-rw-r--r-- | src/endpoint.rs | 66 |
1 files changed, 60 insertions, 6 deletions
diff --git a/src/endpoint.rs b/src/endpoint.rs index 42e9a98..81ed036 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -5,7 +5,8 @@ use std::sync::Arc; use arc_swap::ArcSwapOption; use async_trait::async_trait; -use serde::{Deserialize, Serialize}; +use serde::de::Error as DeError; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::error::Error; use crate::netapp::*; @@ -14,8 +15,50 @@ use crate::util::*; /// This trait should be implemented by all messages your application /// wants to handle -pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync { - type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync; +pub trait Message: SerializeMessage + Send + Sync { + type Response: SerializeMessage + Send + Sync; +} + +/// A trait for de/serializing messages, with possible associated stream. +#[async_trait] +pub trait SerializeMessage: Sized { + fn serialize_msg<S: Serializer>( + &self, + serializer: S, + ) -> Result<(S::Ok, Option<AssociatedStream>), S::Error>; + + async fn deserialize_msg<'de, D: Deserializer<'de> + Send>( + deserializer: D, + stream: AssociatedStream, + ) -> Result<Self, D::Error>; +} + +#[async_trait] +impl<T> SerializeMessage for T +where + T: Serialize + for<'de> Deserialize<'de> + Send + Sync, +{ + fn serialize_msg<S: Serializer>( + &self, + serializer: S, + ) -> Result<(S::Ok, Option<AssociatedStream>), S::Error> { + self.serialize(serializer).map(|r| (r, None)) + } + + async fn deserialize_msg<'de, D: Deserializer<'de> + Send>( + deserializer: D, + mut stream: AssociatedStream, + ) -> Result<Self, D::Error> { + use futures::StreamExt; + + let res = Self::deserialize(deserializer)?; + if stream.next().await.is_some() { + return Err(D::Error::custom( + "failed to deserialize: found associated stream when none expected", + )); + } + Ok(res) + } } /// This trait should be implemented by an object of your application @@ -128,7 +171,12 @@ pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>; #[async_trait] pub(crate) trait GenericEndpoint { - async fn handle(&self, buf: &[u8], from: NodeID) -> Result<Vec<u8>, Error>; + async fn handle( + &self, + buf: &[u8], + stream: AssociatedStream, + from: NodeID, + ) -> Result<(Vec<u8>, Option<AssociatedStream>), Error>; fn drop_handler(&self); fn clone_endpoint(&self) -> DynEndpoint; } @@ -145,11 +193,17 @@ where M: Message + 'static, H: EndpointHandler<M> + 'static, { - async fn handle(&self, buf: &[u8], from: NodeID) -> Result<Vec<u8>, Error> { + async fn handle( + &self, + buf: &[u8], + stream: AssociatedStream, + from: NodeID, + ) -> Result<(Vec<u8>, Option<AssociatedStream>), Error> { match self.0.handler.load_full() { None => Err(Error::NoHandler), Some(h) => { - let req = rmp_serde::decode::from_read_ref::<_, M>(buf)?; + let mut deser = rmp_serde::decode::Deserializer::from_read_ref(buf); + let req = M::deserialize_msg(&mut deser, stream).await?; let res = h.handle(&req, from).await; let res_bytes = rmp_to_vec_all_named(&res)?; Ok(res_bytes) |