aboutsummaryrefslogtreecommitdiff
path: root/src/message.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-22 13:44:48 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-22 13:44:48 +0200
commitaa1b29d41a680f9ae266ed4bdecec89db58226c1 (patch)
tree58e2fc9eae1b7c32d337a347c964c279af127795 /src/message.rs
parent67ea3a48fa1c9c462f1c4912c9658ad002d3336d (diff)
downloadnetapp-aa1b29d41a680f9ae266ed4bdecec89db58226c1.tar.gz
netapp-aa1b29d41a680f9ae266ed4bdecec89db58226c1.zip
Terminology: don't use the word "body" anymore, talk of "attached stream"
Diffstat (limited to 'src/message.rs')
-rw-r--r--src/message.rs83
1 files changed, 40 insertions, 43 deletions
diff --git a/src/message.rs b/src/message.rs
index 8e4bc2f..2ed5c98 100644
--- a/src/message.rs
+++ b/src/message.rs
@@ -49,30 +49,27 @@ pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync {
// ----
/// The Req<M> is a helper object used to create requests and attach them
-/// a streaming body. If the body is a fixed Bytes and not a ByteStream,
+/// a stream of data. If the stream is a fixed Bytes and not a ByteStream,
/// Req<M> is cheaply clonable to allow the request to be sent to different
-/// peers (Clone will panic if the body is a ByteStream).
-///
-/// Internally, this is also used to encode and decode requests
-/// from/to byte streams to be sent over the network.
+/// peers (Clone will panic if the stream is a ByteStream).
pub struct Req<M: Message> {
pub(crate) _phantom: PhantomData<M>,
pub(crate) msg: Arc<M>,
pub(crate) msg_ser: Option<Bytes>,
- pub(crate) body: BodyData,
+ pub(crate) stream: AttachedStream,
}
impl<M: Message> Req<M> {
- pub fn with_fixed_body(self, b: Bytes) -> Self {
+ pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
Self {
- body: BodyData::Fixed(b),
+ stream: AttachedStream::Fixed(b),
..self
}
}
- pub fn with_streaming_body(self, b: ByteStream) -> Self {
+ pub fn with_stream(self, b: ByteStream) -> Self {
Self {
- body: BodyData::Stream(b),
+ stream: AttachedStream::Stream(b),
..self
}
}
@@ -82,7 +79,7 @@ impl<M: Message> Req<M> {
}
pub fn take_stream(&mut self) -> Option<ByteStream> {
- std::mem::replace(&mut self.body, BodyData::None).into_stream()
+ std::mem::replace(&mut self.stream, AttachedStream::None).into_stream()
}
pub(crate) fn into_enc(
@@ -96,7 +93,7 @@ impl<M: Message> Req<M> {
path,
telemetry_id,
msg: self.msg_ser.unwrap(),
- stream: self.body.into_stream(),
+ stream: self.stream.into_stream(),
}
}
@@ -106,7 +103,7 @@ impl<M: Message> Req<M> {
_phantom: Default::default(),
msg: Arc::new(msg),
msg_ser: Some(enc.msg),
- body: enc.stream.map(BodyData::Stream).unwrap_or(BodyData::None),
+ stream: enc.stream.map(AttachedStream::Stream).unwrap_or(AttachedStream::None),
})
}
}
@@ -123,7 +120,7 @@ impl<M: Message> IntoReq<M> for M {
_phantom: Default::default(),
msg: Arc::new(self),
msg_ser: Some(Bytes::from(msg_ser)),
- body: BodyData::None,
+ stream: AttachedStream::None,
})
}
fn into_req_local(self) -> Req<M> {
@@ -131,7 +128,7 @@ impl<M: Message> IntoReq<M> for M {
_phantom: Default::default(),
msg: Arc::new(self),
msg_ser: None,
- body: BodyData::None,
+ stream: AttachedStream::None,
}
}
}
@@ -147,16 +144,16 @@ impl<M: Message> IntoReq<M> for Req<M> {
impl<M: Message> Clone for Req<M> {
fn clone(&self) -> Self {
- let body = match &self.body {
- BodyData::None => BodyData::None,
- BodyData::Fixed(b) => BodyData::Fixed(b.clone()),
- BodyData::Stream(_) => panic!("Cannot clone a Req<_> with a stream body"),
+ let stream = match &self.stream {
+ AttachedStream::None => AttachedStream::None,
+ AttachedStream::Fixed(b) => AttachedStream::Fixed(b.clone()),
+ AttachedStream::Stream(_) => panic!("Cannot clone a Req<_> with a non-buffer attached stream"),
};
Self {
_phantom: Default::default(),
msg: self.msg.clone(),
msg_ser: self.msg_ser.clone(),
- body,
+ stream,
}
}
}
@@ -167,10 +164,10 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "Req[{:?}", self.msg)?;
- match &self.body {
- BodyData::None => write!(f, "]"),
- BodyData::Fixed(b) => write!(f, "; body={}]", b.len()),
- BodyData::Stream(_) => write!(f, "; body=stream]"),
+ match &self.stream {
+ AttachedStream::None => write!(f, "]"),
+ AttachedStream::Fixed(b) => write!(f, "; stream=buf:{}]", b.len()),
+ AttachedStream::Stream(_) => write!(f, "; stream]"),
}
}
}
@@ -178,11 +175,11 @@ where
// ----
/// The Resp<M> represents a full response from a RPC that may have
-/// an attached body stream.
+/// an attached stream.
pub struct Resp<M: Message> {
pub(crate) _phantom: PhantomData<M>,
pub(crate) msg: M::Response,
- pub(crate) body: BodyData,
+ pub(crate) stream: AttachedStream,
}
impl<M: Message> Resp<M> {
@@ -190,20 +187,20 @@ impl<M: Message> Resp<M> {
Resp {
_phantom: Default::default(),
msg: v,
- body: BodyData::None,
+ stream: AttachedStream::None,
}
}
- pub fn with_fixed_body(self, b: Bytes) -> Self {
+ pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
Self {
- body: BodyData::Fixed(b),
+ stream: AttachedStream::Fixed(b),
..self
}
}
- pub fn with_streaming_body(self, b: ByteStream) -> Self {
+ pub fn with_stream(self, b: ByteStream) -> Self {
Self {
- body: BodyData::Stream(b),
+ stream: AttachedStream::Stream(b),
..self
}
}
@@ -217,13 +214,13 @@ impl<M: Message> Resp<M> {
}
pub fn into_parts(self) -> (M::Response, Option<ByteStream>) {
- (self.msg, self.body.into_stream())
+ (self.msg, self.stream.into_stream())
}
pub(crate) fn into_enc(self) -> Result<RespEnc, rmp_serde::encode::Error> {
Ok(RespEnc::Success {
msg: rmp_to_vec_all_named(&self.msg)?.into(),
- stream: self.body.into_stream(),
+ stream: self.stream.into_stream(),
})
}
@@ -234,7 +231,7 @@ impl<M: Message> Resp<M> {
Ok(Self {
_phantom: Default::default(),
msg,
- body: stream.map(BodyData::Stream).unwrap_or(BodyData::None),
+ stream: stream.map(AttachedStream::Stream).unwrap_or(AttachedStream::None),
})
}
RespEnc::Error { code, message } => Err(Error::Remote(code, message)),
@@ -249,28 +246,28 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "Resp[{:?}", self.msg)?;
- match &self.body {
- BodyData::None => write!(f, "]"),
- BodyData::Fixed(b) => write!(f, "; body={}]", b.len()),
- BodyData::Stream(_) => write!(f, "; body=stream]"),
+ match &self.stream {
+ AttachedStream::None => write!(f, "]"),
+ AttachedStream::Fixed(b) => write!(f, "; stream=buf:{}]", b.len()),
+ AttachedStream::Stream(_) => write!(f, "; stream]"),
}
}
}
// ----
-pub(crate) enum BodyData {
+pub(crate) enum AttachedStream {
None,
Fixed(Bytes),
Stream(ByteStream),
}
-impl BodyData {
+impl AttachedStream {
pub fn into_stream(self) -> Option<ByteStream> {
match self {
- BodyData::None => None,
- BodyData::Fixed(b) => Some(Box::pin(futures::stream::once(async move { Ok(b) }))),
- BodyData::Stream(s) => Some(s),
+ AttachedStream::None => None,
+ AttachedStream::Fixed(b) => Some(Box::pin(futures::stream::once(async move { Ok(b) }))),
+ AttachedStream::Stream(s) => Some(s),
}
}
}