diff options
author | Alex Auvolat <alex@adnab.me> | 2022-09-01 09:45:24 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-09-01 09:45:24 +0200 |
commit | 3fd30c6e280fba41377c8b563352d756e8bc1caf (patch) | |
tree | 2a544ae196e8b321e8cc032fee54355f9d873ac0 /src/recv.rs | |
parent | 2c9d595da03ae7a95e962cea78e68afff7410cc5 (diff) | |
download | netapp-3fd30c6e280fba41377c8b563352d756e8bc1caf.tar.gz netapp-3fd30c6e280fba41377c8b563352d756e8bc1caf.zip |
recv side: use unbounded channel to remove deadlock
Diffstat (limited to 'src/recv.rs')
-rw-r--r-- | src/recv.rs | 18 |
1 files changed, 8 insertions, 10 deletions
diff --git a/src/recv.rs b/src/recv.rs index 4d1047b..3bea709 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -15,16 +15,16 @@ use crate::stream::*; /// Structure to warn when the sender is dropped before end of stream was reached, like when /// connection to some remote drops while transmitting data struct Sender { - inner: Option<mpsc::Sender<Packet>>, + inner: Option<mpsc::UnboundedSender<Packet>>, } impl Sender { - fn new(inner: mpsc::Sender<Packet>) -> Self { + fn new(inner: mpsc::UnboundedSender<Packet>) -> Self { Sender { inner: Some(inner) } } - async fn send(&self, packet: Packet) { - let _ = self.inner.as_ref().unwrap().send(packet).await; + fn send(&self, packet: Packet) { + let _ = self.inner.as_ref().unwrap().send(packet); } fn end(&mut self) { @@ -35,9 +35,7 @@ impl Sender { impl Drop for Sender { fn drop(&mut self) { if let Some(inner) = self.inner.take() { - tokio::spawn(async move { - let _ = inner.send(Err(255)).await; - }); + let _ = inner.send(Err(255)); } } } @@ -102,18 +100,18 @@ pub(crate) trait RecvLoop: Sync + 'static { let mut sender = if let Some(send) = streams.remove(&(id)) { send } else { - let (send, recv) = mpsc::channel(4); + let (send, recv) = mpsc::unbounded_channel(); trace!("recv_loop: id {} is new channel", id); self.recv_handler( id, - Box::pin(tokio_stream::wrappers::ReceiverStream::new(recv)), + Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(recv)), ); Sender::new(send) }; // If we get an error, the receiving end is disconnected. // We still need to reach eos before dropping this sender - let _ = sender.send(packet).await; + let _ = sender.send(packet); if has_cont { assert!(!is_error); |