From 4934ed726d51913afd97ca937d0ece39ef8b7371 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 20:22:56 +0200 Subject: Propose alternative API --- src/message.rs | 205 +++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 142 insertions(+), 63 deletions(-) (limited to 'src/message.rs') diff --git a/src/message.rs b/src/message.rs index 22cae6a..d918c29 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,3 +1,7 @@ +use std::fmt; +use std::marker::PhantomData; +use std::sync::Arc; + use bytes::Bytes; use serde::{Deserialize, Serialize}; @@ -37,94 +41,169 @@ pub const PRIO_SECONDARY: RequestPriority = 0x01; /// This trait should be implemented by all messages your application /// wants to handle -pub trait Message: SerializeMessage + Send + Sync { - type Response: SerializeMessage + Send + Sync; +pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync { + type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync; +} + +pub struct Req { + pub(crate) _phantom: PhantomData, + pub(crate) msg: Arc, + pub(crate) msg_ser: Option, + pub(crate) body: BodyData, } -/// A trait for de/serializing messages, with possible associated stream. -/// This is default-implemented by anything that can already be serialized -/// and deserialized. Adapters are provided that implement this for -/// adding a body, either from a fixed Bytes buffer (which allows the thing -/// to be Clone), or from a streaming byte stream. -pub trait SerializeMessage: Sized { - type SerializableSelf: Serialize + for<'de> Deserialize<'de> + Send; +pub struct Resp { + pub(crate) _phantom: PhantomData, + pub(crate) msg: M::Response, + pub(crate) body: BodyData, +} - fn into_parts(self) -> (Self::SerializableSelf, Option); +pub(crate) enum BodyData { + None, + Fixed(Bytes), + Stream(ByteStream), +} - fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self; +impl BodyData { + pub fn into_stream(self) -> Option { + match self { + BodyData::None => None, + BodyData::Fixed(b) => Some(Box::pin(futures::stream::once(async move { Ok(b) }))), + BodyData::Stream(s) => Some(s), + } + } } // ---- -impl SerializeMessage for T -where - T: Serialize + for<'de> Deserialize<'de> + Send, -{ - type SerializableSelf = Self; - fn into_parts(self) -> (Self::SerializableSelf, Option) { - (self, None) +impl Req { + pub fn msg(&self) -> &M { + &self.msg } - fn from_parts(ser_self: Self::SerializableSelf, _stream: ByteStream) -> Self { - // TODO verify no stream - ser_self + pub fn with_fixed_body(self, b: Bytes) -> Self { + Self { + body: BodyData::Fixed(b), + ..self + } + } + + pub fn with_streaming_body(self, b: ByteStream) -> Self { + Self { + body: BodyData::Stream(b), + ..self + } } } -// ---- +pub trait IntoReq { + fn into_req(self) -> Result, rmp_serde::encode::Error>; + fn into_req_local(self) -> Req; +} -/// An adapter that adds a body from a fixed Bytes buffer to a serializable message, -/// implementing the SerializeMessage trait. This allows for the SerializeMessage object -/// to be cloned, which is usefull for requests that must be sent to multiple servers. -/// Note that cloning the body is cheap thanks to Bytes; make sure that your serializable -/// part is also easily clonable (e.g. by wrapping it in an Arc). -/// Note that this CANNOT be used for a response type, as it cannot be reconstructed -/// from a remote stream. -#[derive(Clone)] -pub struct WithFixedBody Deserialize<'de> + Clone + Send + 'static>( - pub T, - pub Bytes, -); - -impl SerializeMessage for WithFixedBody -where - T: Serialize + for<'de> Deserialize<'de> + Clone + Send + 'static, -{ - type SerializableSelf = T; - - fn into_parts(self) -> (Self::SerializableSelf, Option) { - let body = self.1; - ( - self.0, - Some(Box::pin(futures::stream::once(async move { Ok(body) }))), - ) +impl IntoReq for M { + fn into_req(self) -> Result, rmp_serde::encode::Error> { + let msg_ser = rmp_to_vec_all_named(&self)?; + Ok(Req { + _phantom: Default::default(), + msg: Arc::new(self), + msg_ser: Some(Bytes::from(msg_ser)), + body: BodyData::None, + }) } + fn into_req_local(self) -> Req { + Req { + _phantom: Default::default(), + msg: Arc::new(self), + msg_ser: None, + body: BodyData::None, + } + } +} - fn from_parts(_ser_self: Self::SerializableSelf, _stream: ByteStream) -> Self { - panic!("Cannot use a WithFixedBody as a response type"); +impl IntoReq for Req { + fn into_req(self) -> Result, rmp_serde::encode::Error> { + Ok(self) + } + fn into_req_local(self) -> Req { + self } } -/// An adapter that adds a body from a ByteStream. This is usefull for receiving -/// responses to requests that contain attached byte streams. This type is -/// not clonable. -pub struct WithStreamingBody Deserialize<'de> + Send>( - pub T, - pub ByteStream, -); +impl Clone for Req { + fn clone(&self) -> Self { + let body = match &self.body { + BodyData::None => BodyData::None, + BodyData::Fixed(b) => BodyData::Fixed(b.clone()), + BodyData::Stream(_) => panic!("Cannot clone a Req<_> with a stream body"), + }; + Self { + _phantom: Default::default(), + msg: self.msg.clone(), + msg_ser: self.msg_ser.clone(), + body, + } + } +} -impl SerializeMessage for WithStreamingBody +impl fmt::Debug for Req where - T: Serialize + for<'de> Deserialize<'de> + Send, + M: Message + fmt::Debug, { - type SerializableSelf = T; + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + write!(f, "Req[{:?}", self.msg)?; + match &self.body { + BodyData::None => write!(f, "]"), + BodyData::Fixed(b) => write!(f, "; body={}]", b.len()), + BodyData::Stream(_) => write!(f, "; body=stream]"), + } + } +} + +impl fmt::Debug for Resp +where + M: Message, + ::Response: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + write!(f, "Resp[{:?}", self.msg)?; + match &self.body { + BodyData::None => write!(f, "]"), + BodyData::Fixed(b) => write!(f, "; body={}]", b.len()), + BodyData::Stream(_) => write!(f, "; body=stream]"), + } + } +} + +impl Resp { + pub fn new(v: M::Response) -> Self { + Resp { + _phantom: Default::default(), + msg: v, + body: BodyData::None, + } + } + + pub fn with_fixed_body(self, b: Bytes) -> Self { + Self { + body: BodyData::Fixed(b), + ..self + } + } + + pub fn with_streaming_body(self, b: ByteStream) -> Self { + Self { + body: BodyData::Stream(b), + ..self + } + } - fn into_parts(self) -> (Self::SerializableSelf, Option) { - (self.0, Some(self.1)) + pub fn msg(&self) -> &M::Response { + &self.msg } - fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self { - WithStreamingBody(ser_self, stream) + pub fn into_msg(self) -> M::Response { + self.msg } } -- cgit v1.2.3