diff options
author | Alex Auvolat <alex@adnab.me> | 2022-07-22 13:01:52 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-07-22 13:01:52 +0200 |
commit | 9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc (patch) | |
tree | 855113a2db9414eabf0f1c2402942ad9bd09fea8 /src/recv.rs | |
parent | 0b71ca12f910c17eaf2291076438dff3b70dc9cd (diff) | |
download | netapp-9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc.tar.gz netapp-9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc.zip |
Use bounded channels on receive side for backpressure
Diffstat (limited to 'src/recv.rs')
-rw-r--r-- | src/recv.rs | 38 |
1 files changed, 18 insertions, 20 deletions
diff --git a/src/recv.rs b/src/recv.rs index 19288f2..b2f5530 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -5,8 +5,8 @@ use async_trait::async_trait; use bytes::Bytes; use log::trace; -use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::AsyncReadExt; +use tokio::sync::mpsc; use crate::error::*; use crate::send::*; @@ -15,33 +15,28 @@ use crate::stream::*; /// Structure to warn when the sender is dropped before end of stream was reached, like when /// connection to some remote drops while transmitting data struct Sender { - inner: UnboundedSender<Packet>, - closed: bool, + inner: Option<mpsc::Sender<Packet>>, } impl Sender { - fn new(inner: UnboundedSender<Packet>) -> Self { - Sender { - inner, - closed: false, - } + fn new(inner: mpsc::Sender<Packet>) -> Self { + Sender { inner: Some(inner) } } - fn send(&self, packet: Packet) { - let _ = self.inner.unbounded_send(packet); + async fn send(&self, packet: Packet) { + let _ = self.inner.as_ref().unwrap().send(packet).await; } fn end(&mut self) { - self.closed = true; + self.inner = None; } } impl Drop for Sender { fn drop(&mut self) { - if !self.closed { - self.send(Err(255)); + if let Some(inner) = self.inner.take() { + let _ = inner.blocking_send(Err(255)); } - self.inner.close_channel(); } } @@ -54,7 +49,7 @@ impl Drop for Sender { /// the full message is passed to the receive handler. #[async_trait] pub(crate) trait RecvLoop: Sync + 'static { - fn recv_handler(self: &Arc<Self>, id: RequestID, stream: UnboundedReceiver<Packet>); + fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream); async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error> where @@ -92,14 +87,17 @@ pub(crate) trait RecvLoop: Sync + 'static { let mut sender = if let Some(send) = streams.remove(&(id)) { send } else { - let (send, recv) = unbounded(); - self.recv_handler(id, recv); + let (send, recv) = mpsc::channel(4); + self.recv_handler( + id, + Box::pin(tokio_stream::wrappers::ReceiverStream::new(recv)), + ); Sender::new(send) }; - // if we get an error, the receiving end is disconnected. We still need to - // reach eos before dropping this sender - sender.send(packet); + // If we get an error, the receiving end is disconnected. + // We still need to reach eos before dropping this sender + let _ = sender.send(packet).await; if has_cont { streams.insert(id, sender); |