diff options
Diffstat (limited to 'src/message.rs')
-rw-r--r-- | src/message.rs | 83 |
1 files changed, 40 insertions, 43 deletions
diff --git a/src/message.rs b/src/message.rs index 8e4bc2f..2ed5c98 100644 --- a/src/message.rs +++ b/src/message.rs @@ -49,30 +49,27 @@ pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync { // ---- /// The Req<M> is a helper object used to create requests and attach them -/// a streaming body. If the body is a fixed Bytes and not a ByteStream, +/// a stream of data. If the stream is a fixed Bytes and not a ByteStream, /// Req<M> is cheaply clonable to allow the request to be sent to different -/// peers (Clone will panic if the body is a ByteStream). -/// -/// Internally, this is also used to encode and decode requests -/// from/to byte streams to be sent over the network. +/// peers (Clone will panic if the stream is a ByteStream). pub struct Req<M: Message> { pub(crate) _phantom: PhantomData<M>, pub(crate) msg: Arc<M>, pub(crate) msg_ser: Option<Bytes>, - pub(crate) body: BodyData, + pub(crate) stream: AttachedStream, } impl<M: Message> Req<M> { - pub fn with_fixed_body(self, b: Bytes) -> Self { + pub fn with_stream_from_buffer(self, b: Bytes) -> Self { Self { - body: BodyData::Fixed(b), + stream: AttachedStream::Fixed(b), ..self } } - pub fn with_streaming_body(self, b: ByteStream) -> Self { + pub fn with_stream(self, b: ByteStream) -> Self { Self { - body: BodyData::Stream(b), + stream: AttachedStream::Stream(b), ..self } } @@ -82,7 +79,7 @@ impl<M: Message> Req<M> { } pub fn take_stream(&mut self) -> Option<ByteStream> { - std::mem::replace(&mut self.body, BodyData::None).into_stream() + std::mem::replace(&mut self.stream, AttachedStream::None).into_stream() } pub(crate) fn into_enc( @@ -96,7 +93,7 @@ impl<M: Message> Req<M> { path, telemetry_id, msg: self.msg_ser.unwrap(), - stream: self.body.into_stream(), + stream: self.stream.into_stream(), } } @@ -106,7 +103,7 @@ impl<M: Message> Req<M> { _phantom: Default::default(), msg: Arc::new(msg), msg_ser: Some(enc.msg), - body: enc.stream.map(BodyData::Stream).unwrap_or(BodyData::None), + stream: enc.stream.map(AttachedStream::Stream).unwrap_or(AttachedStream::None), }) } } @@ -123,7 +120,7 @@ impl<M: Message> IntoReq<M> for M { _phantom: Default::default(), msg: Arc::new(self), msg_ser: Some(Bytes::from(msg_ser)), - body: BodyData::None, + stream: AttachedStream::None, }) } fn into_req_local(self) -> Req<M> { @@ -131,7 +128,7 @@ impl<M: Message> IntoReq<M> for M { _phantom: Default::default(), msg: Arc::new(self), msg_ser: None, - body: BodyData::None, + stream: AttachedStream::None, } } } @@ -147,16 +144,16 @@ impl<M: Message> IntoReq<M> for Req<M> { impl<M: Message> Clone for Req<M> { fn clone(&self) -> Self { - let body = match &self.body { - BodyData::None => BodyData::None, - BodyData::Fixed(b) => BodyData::Fixed(b.clone()), - BodyData::Stream(_) => panic!("Cannot clone a Req<_> with a stream body"), + let stream = match &self.stream { + AttachedStream::None => AttachedStream::None, + AttachedStream::Fixed(b) => AttachedStream::Fixed(b.clone()), + AttachedStream::Stream(_) => panic!("Cannot clone a Req<_> with a non-buffer attached stream"), }; Self { _phantom: Default::default(), msg: self.msg.clone(), msg_ser: self.msg_ser.clone(), - body, + stream, } } } @@ -167,10 +164,10 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { write!(f, "Req[{:?}", self.msg)?; - match &self.body { - BodyData::None => write!(f, "]"), - BodyData::Fixed(b) => write!(f, "; body={}]", b.len()), - BodyData::Stream(_) => write!(f, "; body=stream]"), + match &self.stream { + AttachedStream::None => write!(f, "]"), + AttachedStream::Fixed(b) => write!(f, "; stream=buf:{}]", b.len()), + AttachedStream::Stream(_) => write!(f, "; stream]"), } } } @@ -178,11 +175,11 @@ where // ---- /// The Resp<M> represents a full response from a RPC that may have -/// an attached body stream. +/// an attached stream. pub struct Resp<M: Message> { pub(crate) _phantom: PhantomData<M>, pub(crate) msg: M::Response, - pub(crate) body: BodyData, + pub(crate) stream: AttachedStream, } impl<M: Message> Resp<M> { @@ -190,20 +187,20 @@ impl<M: Message> Resp<M> { Resp { _phantom: Default::default(), msg: v, - body: BodyData::None, + stream: AttachedStream::None, } } - pub fn with_fixed_body(self, b: Bytes) -> Self { + pub fn with_stream_from_buffer(self, b: Bytes) -> Self { Self { - body: BodyData::Fixed(b), + stream: AttachedStream::Fixed(b), ..self } } - pub fn with_streaming_body(self, b: ByteStream) -> Self { + pub fn with_stream(self, b: ByteStream) -> Self { Self { - body: BodyData::Stream(b), + stream: AttachedStream::Stream(b), ..self } } @@ -217,13 +214,13 @@ impl<M: Message> Resp<M> { } pub fn into_parts(self) -> (M::Response, Option<ByteStream>) { - (self.msg, self.body.into_stream()) + (self.msg, self.stream.into_stream()) } pub(crate) fn into_enc(self) -> Result<RespEnc, rmp_serde::encode::Error> { Ok(RespEnc::Success { msg: rmp_to_vec_all_named(&self.msg)?.into(), - stream: self.body.into_stream(), + stream: self.stream.into_stream(), }) } @@ -234,7 +231,7 @@ impl<M: Message> Resp<M> { Ok(Self { _phantom: Default::default(), msg, - body: stream.map(BodyData::Stream).unwrap_or(BodyData::None), + stream: stream.map(AttachedStream::Stream).unwrap_or(AttachedStream::None), }) } RespEnc::Error { code, message } => Err(Error::Remote(code, message)), @@ -249,28 +246,28 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { write!(f, "Resp[{:?}", self.msg)?; - match &self.body { - BodyData::None => write!(f, "]"), - BodyData::Fixed(b) => write!(f, "; body={}]", b.len()), - BodyData::Stream(_) => write!(f, "; body=stream]"), + match &self.stream { + AttachedStream::None => write!(f, "]"), + AttachedStream::Fixed(b) => write!(f, "; stream=buf:{}]", b.len()), + AttachedStream::Stream(_) => write!(f, "; stream]"), } } } // ---- -pub(crate) enum BodyData { +pub(crate) enum AttachedStream { None, Fixed(Bytes), Stream(ByteStream), } -impl BodyData { +impl AttachedStream { pub fn into_stream(self) -> Option<ByteStream> { match self { - BodyData::None => None, - BodyData::Fixed(b) => Some(Box::pin(futures::stream::once(async move { Ok(b) }))), - BodyData::Stream(s) => Some(s), + AttachedStream::None => None, + AttachedStream::Fixed(b) => Some(Box::pin(futures::stream::once(async move { Ok(b) }))), + AttachedStream::Stream(s) => Some(s), } } } |