From 0b71ca12f910c17eaf2291076438dff3b70dc9cd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 12:45:38 +0200 Subject: Clean up framing protocol --- src/endpoint.rs | 30 ++++-------------------------- 1 file changed, 4 insertions(+), 26 deletions(-) (limited to 'src/endpoint.rs') diff --git a/src/endpoint.rs b/src/endpoint.rs index ff626d8..d8dc6c4 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -158,12 +158,7 @@ pub(crate) type DynEndpoint = Box; #[async_trait] pub(crate) trait GenericEndpoint { - async fn handle( - &self, - buf: &[u8], - stream: ByteStream, - from: NodeID, - ) -> Result<(Vec, Option), Error>; + async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result; fn drop_handler(&self); fn clone_endpoint(&self) -> DynEndpoint; } @@ -180,30 +175,13 @@ where M: Message + 'static, H: StreamingEndpointHandler + 'static, { - async fn handle( - &self, - buf: &[u8], - stream: ByteStream, - from: NodeID, - ) -> Result<(Vec, Option), Error> { + async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result { match self.0.handler.load_full() { None => Err(Error::NoHandler), Some(h) => { - let req = rmp_serde::decode::from_read_ref(buf)?; - let req = Req { - _phantom: Default::default(), - msg: Arc::new(req), - msg_ser: None, - body: BodyData::Stream(stream), - }; + let req = Req::from_enc(req_enc)?; let res = h.handle(req, from).await; - let Resp { - msg, - body, - _phantom, - } = res; - let res_bytes = rmp_to_vec_all_named(&msg)?; - Ok((res_bytes, body.into_stream())) + Ok(res.into_enc()?) } } } -- cgit v1.2.3