diff options
author | Alex Auvolat <alex@adnab.me> | 2022-07-22 13:01:52 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-07-22 13:01:52 +0200 |
commit | 9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc (patch) | |
tree | 855113a2db9414eabf0f1c2402942ad9bd09fea8 /src/server.rs | |
parent | 0b71ca12f910c17eaf2291076438dff3b70dc9cd (diff) | |
download | netapp-9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc.tar.gz netapp-9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc.zip |
Use bounded channels on receive side for backpressure
Diffstat (limited to 'src/server.rs')
-rw-r--r-- | src/server.rs | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/src/server.rs b/src/server.rs index ae1196c..4b232af 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,7 +5,6 @@ use arc_swap::ArcSwapOption; use async_trait::async_trait; use log::{debug, trace}; -use futures::channel::mpsc::UnboundedReceiver; use futures::io::{AsyncReadExt, AsyncWriteExt}; use kuska_handshake::async_std::{handshake_server, BoxStream}; use tokio::net::TcpStream; @@ -171,21 +170,24 @@ impl SendLoop for ServerConn {} #[async_trait] impl RecvLoop for ServerConn { - fn recv_handler(self: &Arc<Self>, id: RequestID, stream: UnboundedReceiver<Packet>) { + fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) { let resp_send = self.resp_send.load_full().unwrap(); let self2 = self.clone(); tokio::spawn(async move { trace!("ServerConn recv_handler {}", id); - let (prio, resp_enc) = match ReqEnc::decode(Box::pin(stream)).await { + let (prio, resp_enc) = match ReqEnc::decode(stream).await { Ok(req_enc) => { let prio = req_enc.prio; let resp = self2.recv_handler_aux(req_enc).await; - (prio, match resp { - Ok(resp_enc) => resp_enc, - Err(e) => RespEnc::from_err(e), - }) + ( + prio, + match resp { + Ok(resp_enc) => resp_enc, + Err(e) => RespEnc::from_err(e), + }, + ) } Err(e) => (PRIO_NORMAL, RespEnc::from_err(e)), }; |