aboutsummaryrefslogtreecommitdiff
path: root/src/recv.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-22 13:01:52 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-22 13:01:52 +0200
commit9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc (patch)
tree855113a2db9414eabf0f1c2402942ad9bd09fea8 /src/recv.rs
parent0b71ca12f910c17eaf2291076438dff3b70dc9cd (diff)
downloadnetapp-9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc.tar.gz
netapp-9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc.zip
Use bounded channels on receive side for backpressure
Diffstat (limited to 'src/recv.rs')
-rw-r--r--src/recv.rs38
1 files changed, 18 insertions, 20 deletions
diff --git a/src/recv.rs b/src/recv.rs
index 19288f2..b2f5530 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -5,8 +5,8 @@ use async_trait::async_trait;
use bytes::Bytes;
use log::trace;
-use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::AsyncReadExt;
+use tokio::sync::mpsc;
use crate::error::*;
use crate::send::*;
@@ -15,33 +15,28 @@ use crate::stream::*;
/// Structure to warn when the sender is dropped before end of stream was reached, like when
/// connection to some remote drops while transmitting data
struct Sender {
- inner: UnboundedSender<Packet>,
- closed: bool,
+ inner: Option<mpsc::Sender<Packet>>,
}
impl Sender {
- fn new(inner: UnboundedSender<Packet>) -> Self {
- Sender {
- inner,
- closed: false,
- }
+ fn new(inner: mpsc::Sender<Packet>) -> Self {
+ Sender { inner: Some(inner) }
}
- fn send(&self, packet: Packet) {
- let _ = self.inner.unbounded_send(packet);
+ async fn send(&self, packet: Packet) {
+ let _ = self.inner.as_ref().unwrap().send(packet).await;
}
fn end(&mut self) {
- self.closed = true;
+ self.inner = None;
}
}
impl Drop for Sender {
fn drop(&mut self) {
- if !self.closed {
- self.send(Err(255));
+ if let Some(inner) = self.inner.take() {
+ let _ = inner.blocking_send(Err(255));
}
- self.inner.close_channel();
}
}
@@ -54,7 +49,7 @@ impl Drop for Sender {
/// the full message is passed to the receive handler.
#[async_trait]
pub(crate) trait RecvLoop: Sync + 'static {
- fn recv_handler(self: &Arc<Self>, id: RequestID, stream: UnboundedReceiver<Packet>);
+ fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream);
async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error>
where
@@ -92,14 +87,17 @@ pub(crate) trait RecvLoop: Sync + 'static {
let mut sender = if let Some(send) = streams.remove(&(id)) {
send
} else {
- let (send, recv) = unbounded();
- self.recv_handler(id, recv);
+ let (send, recv) = mpsc::channel(4);
+ self.recv_handler(
+ id,
+ Box::pin(tokio_stream::wrappers::ReceiverStream::new(recv)),
+ );
Sender::new(send)
};
- // if we get an error, the receiving end is disconnected. We still need to
- // reach eos before dropping this sender
- sender.send(packet);
+ // If we get an error, the receiving end is disconnected.
+ // We still need to reach eos before dropping this sender
+ let _ = sender.send(packet).await;
if has_cont {
streams.insert(id, sender);