diff options
author | Alex Auvolat <alex@adnab.me> | 2022-07-22 13:01:52 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-07-22 13:01:52 +0200 |
commit | 9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc (patch) | |
tree | 855113a2db9414eabf0f1c2402942ad9bd09fea8 /src/client.rs | |
parent | 0b71ca12f910c17eaf2291076438dff3b70dc9cd (diff) | |
download | netapp-9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc.tar.gz netapp-9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc.zip |
Use bounded channels on receive side for backpressure
Diffstat (limited to 'src/client.rs')
-rw-r--r-- | src/client.rs | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/src/client.rs b/src/client.rs index 42eeaa3..d51236b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -8,7 +8,6 @@ use async_trait::async_trait; use bytes::Bytes; use log::{debug, error, trace}; -use futures::channel::mpsc::{unbounded, UnboundedReceiver}; use futures::io::AsyncReadExt; use kuska_handshake::async_std::{handshake_client, BoxStream}; use tokio::net::TcpStream; @@ -39,7 +38,7 @@ pub(crate) struct ClientConn { query_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, ByteStream)>>, next_query_number: AtomicU32, - inflight: Mutex<HashMap<RequestID, oneshot::Sender<UnboundedReceiver<Packet>>>>, + inflight: Mutex<HashMap<RequestID, oneshot::Sender<ByteStream>>>, } impl ClientConn { @@ -175,7 +174,9 @@ impl ClientConn { error!( "Too many inflight requests! RequestID collision. Interrupting previous request." ); - let _ = old_ch.send(unbounded().1); + let _ = old_ch.send(Box::pin(futures::stream::once(async move { + Err(Error::IdCollision.code()) + }))); } trace!( @@ -199,7 +200,7 @@ impl ClientConn { } } - let resp_enc = RespEnc::decode(Box::pin(stream)).await?; + let resp_enc = RespEnc::decode(stream).await?; trace!("request response {}", id); Resp::from_enc(resp_enc) } @@ -209,7 +210,7 @@ impl SendLoop for ClientConn {} #[async_trait] impl RecvLoop for ClientConn { - fn recv_handler(self: &Arc<Self>, id: RequestID, stream: UnboundedReceiver<Packet>) { + fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) { trace!("ClientConn recv_handler {}", id); let mut inflight = self.inflight.lock().unwrap(); |