aboutsummaryrefslogtreecommitdiff
path: root/src/recv.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-09-01 09:45:24 +0200
committerAlex Auvolat <alex@adnab.me>2022-09-01 09:45:24 +0200
commit3fd30c6e280fba41377c8b563352d756e8bc1caf (patch)
tree2a544ae196e8b321e8cc032fee54355f9d873ac0 /src/recv.rs
parent2c9d595da03ae7a95e962cea78e68afff7410cc5 (diff)
downloadnetapp-3fd30c6e280fba41377c8b563352d756e8bc1caf.tar.gz
netapp-3fd30c6e280fba41377c8b563352d756e8bc1caf.zip
recv side: use unbounded channel to remove deadlock
Diffstat (limited to 'src/recv.rs')
-rw-r--r--src/recv.rs18
1 files changed, 8 insertions, 10 deletions
diff --git a/src/recv.rs b/src/recv.rs
index 4d1047b..3bea709 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -15,16 +15,16 @@ use crate::stream::*;
/// Structure to warn when the sender is dropped before end of stream was reached, like when
/// connection to some remote drops while transmitting data
struct Sender {
- inner: Option<mpsc::Sender<Packet>>,
+ inner: Option<mpsc::UnboundedSender<Packet>>,
}
impl Sender {
- fn new(inner: mpsc::Sender<Packet>) -> Self {
+ fn new(inner: mpsc::UnboundedSender<Packet>) -> Self {
Sender { inner: Some(inner) }
}
- async fn send(&self, packet: Packet) {
- let _ = self.inner.as_ref().unwrap().send(packet).await;
+ fn send(&self, packet: Packet) {
+ let _ = self.inner.as_ref().unwrap().send(packet);
}
fn end(&mut self) {
@@ -35,9 +35,7 @@ impl Sender {
impl Drop for Sender {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
- tokio::spawn(async move {
- let _ = inner.send(Err(255)).await;
- });
+ let _ = inner.send(Err(255));
}
}
}
@@ -102,18 +100,18 @@ pub(crate) trait RecvLoop: Sync + 'static {
let mut sender = if let Some(send) = streams.remove(&(id)) {
send
} else {
- let (send, recv) = mpsc::channel(4);
+ let (send, recv) = mpsc::unbounded_channel();
trace!("recv_loop: id {} is new channel", id);
self.recv_handler(
id,
- Box::pin(tokio_stream::wrappers::ReceiverStream::new(recv)),
+ Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(recv)),
);
Sender::new(send)
};
// If we get an error, the receiving end is disconnected.
// We still need to reach eos before dropping this sender
- let _ = sender.send(packet).await;
+ let _ = sender.send(packet);
if has_cont {
assert!(!is_error);