aboutsummaryrefslogtreecommitdiff
path: root/src/server.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-22 13:01:52 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-22 13:01:52 +0200
commit9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc (patch)
tree855113a2db9414eabf0f1c2402942ad9bd09fea8 /src/server.rs
parent0b71ca12f910c17eaf2291076438dff3b70dc9cd (diff)
downloadnetapp-9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc.tar.gz
netapp-9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc.zip
Use bounded channels on receive side for backpressure
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs16
1 files changed, 9 insertions, 7 deletions
diff --git a/src/server.rs b/src/server.rs
index ae1196c..4b232af 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -5,7 +5,6 @@ use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use log::{debug, trace};
-use futures::channel::mpsc::UnboundedReceiver;
use futures::io::{AsyncReadExt, AsyncWriteExt};
use kuska_handshake::async_std::{handshake_server, BoxStream};
use tokio::net::TcpStream;
@@ -171,21 +170,24 @@ impl SendLoop for ServerConn {}
#[async_trait]
impl RecvLoop for ServerConn {
- fn recv_handler(self: &Arc<Self>, id: RequestID, stream: UnboundedReceiver<Packet>) {
+ fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) {
let resp_send = self.resp_send.load_full().unwrap();
let self2 = self.clone();
tokio::spawn(async move {
trace!("ServerConn recv_handler {}", id);
- let (prio, resp_enc) = match ReqEnc::decode(Box::pin(stream)).await {
+ let (prio, resp_enc) = match ReqEnc::decode(stream).await {
Ok(req_enc) => {
let prio = req_enc.prio;
let resp = self2.recv_handler_aux(req_enc).await;
- (prio, match resp {
- Ok(resp_enc) => resp_enc,
- Err(e) => RespEnc::from_err(e),
- })
+ (
+ prio,
+ match resp {
+ Ok(resp_enc) => resp_enc,
+ Err(e) => RespEnc::from_err(e),
+ },
+ )
}
Err(e) => (PRIO_NORMAL, RespEnc::from_err(e)),
};