diff options
Diffstat (limited to 'src/endpoint.rs')
-rw-r--r-- | src/endpoint.rs | 80 |
1 files changed, 49 insertions, 31 deletions
diff --git a/src/endpoint.rs b/src/endpoint.rs index 81ed036..c25365a 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -5,8 +5,7 @@ use std::sync::Arc; use arc_swap::ArcSwapOption; use async_trait::async_trait; -use serde::de::Error as DeError; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use serde::{Deserialize, Serialize}; use crate::error::Error; use crate::netapp::*; @@ -22,42 +21,61 @@ pub trait Message: SerializeMessage + Send + Sync { /// A trait for de/serializing messages, with possible associated stream. #[async_trait] pub trait SerializeMessage: Sized { - fn serialize_msg<S: Serializer>( - &self, - serializer: S, - ) -> Result<(S::Ok, Option<AssociatedStream>), S::Error>; + type SerializableSelf: Serialize + for<'de> Deserialize<'de> + Send; - async fn deserialize_msg<'de, D: Deserializer<'de> + Send>( - deserializer: D, - stream: AssociatedStream, - ) -> Result<Self, D::Error>; + // TODO should return Result + fn serialize_msg(&self) -> (Self::SerializableSelf, Option<AssociatedStream>); + + // TODO should return Result + async fn deserialize_msg(ser_self: Self::SerializableSelf, stream: AssociatedStream) -> Self; } +pub trait AutoSerialize: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync {} + #[async_trait] impl<T> SerializeMessage for T where - T: Serialize + for<'de> Deserialize<'de> + Send + Sync, + T: AutoSerialize, { - fn serialize_msg<S: Serializer>( - &self, - serializer: S, - ) -> Result<(S::Ok, Option<AssociatedStream>), S::Error> { - self.serialize(serializer).map(|r| (r, None)) + type SerializableSelf = Self; + fn serialize_msg(&self) -> (Self::SerializableSelf, Option<AssociatedStream>) { + (self.clone(), None) + } + + async fn deserialize_msg(ser_self: Self::SerializableSelf, stream: AssociatedStream) -> Self { + // TODO verify no stream + ser_self + } +} + +impl AutoSerialize for () {} + +#[async_trait] +impl<T, E> SerializeMessage for Result<T, E> +where + T: SerializeMessage + Send, + E: SerializeMessage + Send, +{ + type SerializableSelf = Result<T::SerializableSelf, E::SerializableSelf>; + + fn serialize_msg(&self) -> (Self::SerializableSelf, Option<AssociatedStream>) { + match self { + Ok(ok) => { + let (msg, stream) = ok.serialize_msg(); + (Ok(msg), stream) + } + Err(err) => { + let (msg, stream) = err.serialize_msg(); + (Err(msg), stream) + } + } } - async fn deserialize_msg<'de, D: Deserializer<'de> + Send>( - deserializer: D, - mut stream: AssociatedStream, - ) -> Result<Self, D::Error> { - use futures::StreamExt; - - let res = Self::deserialize(deserializer)?; - if stream.next().await.is_some() { - return Err(D::Error::custom( - "failed to deserialize: found associated stream when none expected", - )); + async fn deserialize_msg(ser_self: Self::SerializableSelf, stream: AssociatedStream) -> Self { + match ser_self { + Ok(ok) => Ok(T::deserialize_msg(ok, stream).await), + Err(err) => Err(E::deserialize_msg(err, stream).await), } - Ok(res) } } @@ -139,7 +157,7 @@ where prio: RequestPriority, ) -> Result<<M as Message>::Response, Error> where - B: Borrow<M>, + B: Borrow<M> + Send + Sync, { if *target == self.netapp.id { match self.handler.load_full() { @@ -202,8 +220,8 @@ where match self.0.handler.load_full() { None => Err(Error::NoHandler), Some(h) => { - let mut deser = rmp_serde::decode::Deserializer::from_read_ref(buf); - let req = M::deserialize_msg(&mut deser, stream).await?; + let req = rmp_serde::decode::from_read_ref(buf)?; + let req = M::deserialize_msg(req, stream).await; let res = h.handle(&req, from).await; let res_bytes = rmp_to_vec_all_named(&res)?; Ok(res_bytes) |