aboutsummaryrefslogtreecommitdiff
path: root/src/endpoint.rs
diff options
context:
space:
mode:
authortrinity-1686a <trinity@deuxfleurs.fr>2022-06-08 00:30:56 +0200
committertrinity-1686a <trinity@deuxfleurs.fr>2022-06-08 09:54:38 +0200
commit4745e7c4ba5665d3303ae567087781778cec9c34 (patch)
tree667effc91ab5674360ef0211ab32b533a35f36af /src/endpoint.rs
parentfb5462ecdb6b5731a63a902519d3ec9b1061b8dd (diff)
downloadnetapp-4745e7c4ba5665d3303ae567087781778cec9c34.tar.gz
netapp-4745e7c4ba5665d3303ae567087781778cec9c34.zip
further work on streams
most changes still required are related to error handling
Diffstat (limited to 'src/endpoint.rs')
-rw-r--r--src/endpoint.rs80
1 files changed, 49 insertions, 31 deletions
diff --git a/src/endpoint.rs b/src/endpoint.rs
index 81ed036..c25365a 100644
--- a/src/endpoint.rs
+++ b/src/endpoint.rs
@@ -5,8 +5,7 @@ use std::sync::Arc;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
-use serde::de::Error as DeError;
-use serde::{Deserialize, Deserializer, Serialize, Serializer};
+use serde::{Deserialize, Serialize};
use crate::error::Error;
use crate::netapp::*;
@@ -22,42 +21,61 @@ pub trait Message: SerializeMessage + Send + Sync {
/// A trait for de/serializing messages, with possible associated stream.
#[async_trait]
pub trait SerializeMessage: Sized {
- fn serialize_msg<S: Serializer>(
- &self,
- serializer: S,
- ) -> Result<(S::Ok, Option<AssociatedStream>), S::Error>;
+ type SerializableSelf: Serialize + for<'de> Deserialize<'de> + Send;
- async fn deserialize_msg<'de, D: Deserializer<'de> + Send>(
- deserializer: D,
- stream: AssociatedStream,
- ) -> Result<Self, D::Error>;
+ // TODO should return Result
+ fn serialize_msg(&self) -> (Self::SerializableSelf, Option<AssociatedStream>);
+
+ // TODO should return Result
+ async fn deserialize_msg(ser_self: Self::SerializableSelf, stream: AssociatedStream) -> Self;
}
+pub trait AutoSerialize: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync {}
+
#[async_trait]
impl<T> SerializeMessage for T
where
- T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
+ T: AutoSerialize,
{
- fn serialize_msg<S: Serializer>(
- &self,
- serializer: S,
- ) -> Result<(S::Ok, Option<AssociatedStream>), S::Error> {
- self.serialize(serializer).map(|r| (r, None))
+ type SerializableSelf = Self;
+ fn serialize_msg(&self) -> (Self::SerializableSelf, Option<AssociatedStream>) {
+ (self.clone(), None)
+ }
+
+ async fn deserialize_msg(ser_self: Self::SerializableSelf, stream: AssociatedStream) -> Self {
+ // TODO verify no stream
+ ser_self
+ }
+}
+
+impl AutoSerialize for () {}
+
+#[async_trait]
+impl<T, E> SerializeMessage for Result<T, E>
+where
+ T: SerializeMessage + Send,
+ E: SerializeMessage + Send,
+{
+ type SerializableSelf = Result<T::SerializableSelf, E::SerializableSelf>;
+
+ fn serialize_msg(&self) -> (Self::SerializableSelf, Option<AssociatedStream>) {
+ match self {
+ Ok(ok) => {
+ let (msg, stream) = ok.serialize_msg();
+ (Ok(msg), stream)
+ }
+ Err(err) => {
+ let (msg, stream) = err.serialize_msg();
+ (Err(msg), stream)
+ }
+ }
}
- async fn deserialize_msg<'de, D: Deserializer<'de> + Send>(
- deserializer: D,
- mut stream: AssociatedStream,
- ) -> Result<Self, D::Error> {
- use futures::StreamExt;
-
- let res = Self::deserialize(deserializer)?;
- if stream.next().await.is_some() {
- return Err(D::Error::custom(
- "failed to deserialize: found associated stream when none expected",
- ));
+ async fn deserialize_msg(ser_self: Self::SerializableSelf, stream: AssociatedStream) -> Self {
+ match ser_self {
+ Ok(ok) => Ok(T::deserialize_msg(ok, stream).await),
+ Err(err) => Err(E::deserialize_msg(err, stream).await),
}
- Ok(res)
}
}
@@ -139,7 +157,7 @@ where
prio: RequestPriority,
) -> Result<<M as Message>::Response, Error>
where
- B: Borrow<M>,
+ B: Borrow<M> + Send + Sync,
{
if *target == self.netapp.id {
match self.handler.load_full() {
@@ -202,8 +220,8 @@ where
match self.0.handler.load_full() {
None => Err(Error::NoHandler),
Some(h) => {
- let mut deser = rmp_serde::decode::Deserializer::from_read_ref(buf);
- let req = M::deserialize_msg(&mut deser, stream).await?;
+ let req = rmp_serde::decode::from_read_ref(buf)?;
+ let req = M::deserialize_msg(req, stream).await;
let res = h.handle(&req, from).await;
let res_bytes = rmp_to_vec_all_named(&res)?;
Ok(res_bytes)