aboutsummaryrefslogtreecommitdiff
path: root/src/client.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-22 13:01:52 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-22 13:01:52 +0200
commit9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc (patch)
tree855113a2db9414eabf0f1c2402942ad9bd09fea8 /src/client.rs
parent0b71ca12f910c17eaf2291076438dff3b70dc9cd (diff)
downloadnetapp-9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc.tar.gz
netapp-9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc.zip
Use bounded channels on receive side for backpressure
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs11
1 files changed, 6 insertions, 5 deletions
diff --git a/src/client.rs b/src/client.rs
index 42eeaa3..d51236b 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -8,7 +8,6 @@ use async_trait::async_trait;
use bytes::Bytes;
use log::{debug, error, trace};
-use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::io::AsyncReadExt;
use kuska_handshake::async_std::{handshake_client, BoxStream};
use tokio::net::TcpStream;
@@ -39,7 +38,7 @@ pub(crate) struct ClientConn {
query_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, ByteStream)>>,
next_query_number: AtomicU32,
- inflight: Mutex<HashMap<RequestID, oneshot::Sender<UnboundedReceiver<Packet>>>>,
+ inflight: Mutex<HashMap<RequestID, oneshot::Sender<ByteStream>>>,
}
impl ClientConn {
@@ -175,7 +174,9 @@ impl ClientConn {
error!(
"Too many inflight requests! RequestID collision. Interrupting previous request."
);
- let _ = old_ch.send(unbounded().1);
+ let _ = old_ch.send(Box::pin(futures::stream::once(async move {
+ Err(Error::IdCollision.code())
+ })));
}
trace!(
@@ -199,7 +200,7 @@ impl ClientConn {
}
}
- let resp_enc = RespEnc::decode(Box::pin(stream)).await?;
+ let resp_enc = RespEnc::decode(stream).await?;
trace!("request response {}", id);
Resp::from_enc(resp_enc)
}
@@ -209,7 +210,7 @@ impl SendLoop for ClientConn {}
#[async_trait]
impl RecvLoop for ClientConn {
- fn recv_handler(self: &Arc<Self>, id: RequestID, stream: UnboundedReceiver<Packet>) {
+ fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) {
trace!("ClientConn recv_handler {}", id);
let mut inflight = self.inflight.lock().unwrap();