aboutsummaryrefslogtreecommitdiff
path: root/src/endpoint.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-22 12:45:38 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-22 12:45:38 +0200
commit0b71ca12f910c17eaf2291076438dff3b70dc9cd (patch)
tree28c4239938b1bd585052c9a1b8b6a752b9c3bbe0 /src/endpoint.rs
parentc358fe3c92da8a8454e461484737efe2a14dfd73 (diff)
downloadnetapp-0b71ca12f910c17eaf2291076438dff3b70dc9cd.tar.gz
netapp-0b71ca12f910c17eaf2291076438dff3b70dc9cd.zip
Clean up framing protocol
Diffstat (limited to 'src/endpoint.rs')
-rw-r--r--src/endpoint.rs30
1 files changed, 4 insertions, 26 deletions
diff --git a/src/endpoint.rs b/src/endpoint.rs
index ff626d8..d8dc6c4 100644
--- a/src/endpoint.rs
+++ b/src/endpoint.rs
@@ -158,12 +158,7 @@ pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>;
#[async_trait]
pub(crate) trait GenericEndpoint {
- async fn handle(
- &self,
- buf: &[u8],
- stream: ByteStream,
- from: NodeID,
- ) -> Result<(Vec<u8>, Option<ByteStream>), Error>;
+ async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error>;
fn drop_handler(&self);
fn clone_endpoint(&self) -> DynEndpoint;
}
@@ -180,30 +175,13 @@ where
M: Message + 'static,
H: StreamingEndpointHandler<M> + 'static,
{
- async fn handle(
- &self,
- buf: &[u8],
- stream: ByteStream,
- from: NodeID,
- ) -> Result<(Vec<u8>, Option<ByteStream>), Error> {
+ async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error> {
match self.0.handler.load_full() {
None => Err(Error::NoHandler),
Some(h) => {
- let req = rmp_serde::decode::from_read_ref(buf)?;
- let req = Req {
- _phantom: Default::default(),
- msg: Arc::new(req),
- msg_ser: None,
- body: BodyData::Stream(stream),
- };
+ let req = Req::from_enc(req_enc)?;
let res = h.handle(req, from).await;
- let Resp {
- msg,
- body,
- _phantom,
- } = res;
- let res_bytes = rmp_to_vec_all_named(&msg)?;
- Ok((res_bytes, body.into_stream()))
+ Ok(res.into_enc()?)
}
}
}