diff options
author | Alex Auvolat <alex@adnab.me> | 2022-07-22 12:45:38 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-07-22 12:45:38 +0200 |
commit | 0b71ca12f910c17eaf2291076438dff3b70dc9cd (patch) | |
tree | 28c4239938b1bd585052c9a1b8b6a752b9c3bbe0 /src/endpoint.rs | |
parent | c358fe3c92da8a8454e461484737efe2a14dfd73 (diff) | |
download | netapp-0b71ca12f910c17eaf2291076438dff3b70dc9cd.tar.gz netapp-0b71ca12f910c17eaf2291076438dff3b70dc9cd.zip |
Clean up framing protocol
Diffstat (limited to 'src/endpoint.rs')
-rw-r--r-- | src/endpoint.rs | 30 |
1 files changed, 4 insertions, 26 deletions
diff --git a/src/endpoint.rs b/src/endpoint.rs index ff626d8..d8dc6c4 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -158,12 +158,7 @@ pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>; #[async_trait] pub(crate) trait GenericEndpoint { - async fn handle( - &self, - buf: &[u8], - stream: ByteStream, - from: NodeID, - ) -> Result<(Vec<u8>, Option<ByteStream>), Error>; + async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error>; fn drop_handler(&self); fn clone_endpoint(&self) -> DynEndpoint; } @@ -180,30 +175,13 @@ where M: Message + 'static, H: StreamingEndpointHandler<M> + 'static, { - async fn handle( - &self, - buf: &[u8], - stream: ByteStream, - from: NodeID, - ) -> Result<(Vec<u8>, Option<ByteStream>), Error> { + async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error> { match self.0.handler.load_full() { None => Err(Error::NoHandler), Some(h) => { - let req = rmp_serde::decode::from_read_ref(buf)?; - let req = Req { - _phantom: Default::default(), - msg: Arc::new(req), - msg_ser: None, - body: BodyData::Stream(stream), - }; + let req = Req::from_enc(req_enc)?; let res = h.handle(req, from).await; - let Resp { - msg, - body, - _phantom, - } = res; - let res_bytes = rmp_to_vec_all_named(&msg)?; - Ok((res_bytes, body.into_stream())) + Ok(res.into_enc()?) } } } |