diff options
Diffstat (limited to 'src/send.rs')
-rw-r--r-- | src/send.rs | 36 |
1 files changed, 25 insertions, 11 deletions
diff --git a/src/send.rs b/src/send.rs index 256fe4c..fd415c6 100644 --- a/src/send.rs +++ b/src/send.rs @@ -5,7 +5,7 @@ use std::task::{Context, Poll}; use async_trait::async_trait; use bytes::Bytes; -use log::trace; +use log::*; use futures::AsyncWriteExt; use kuska_handshake::async_std::BoxStreamWrite; @@ -172,24 +172,38 @@ impl DataFrame { pub(crate) trait SendLoop: Sync { async fn send_loop<W>( self: Arc<Self>, - mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, ByteStream)>, + msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, ByteStream)>, mut write: BoxStreamWrite<W>, ) -> Result<(), Error> where W: AsyncWriteExt + Unpin + Send + Sync, { let mut sending = SendQueue::new(); - let mut should_exit = false; - while !should_exit || !sending.is_empty() { - let recv_fut = msg_recv.recv(); - futures::pin_mut!(recv_fut); + let mut msg_recv = Some(msg_recv); + while msg_recv.is_some() || !sending.is_empty() { + debug!( + "Sending: {:?}", + sending + .items + .iter() + .map(|(_, i)| i.iter().map(|x| x.id)) + .flatten() + .collect::<Vec<_>>() + ); + + let recv_fut = async { + if let Some(chan) = &mut msg_recv { + chan.recv().await + } else { + futures::future::pending().await + } + }; let send_fut = sending.next_ready(); // recv_fut is cancellation-safe according to tokio doc, // send_fut is cancellation-safe as implemented above? - use futures::future::Either; - match futures::future::select(recv_fut, send_fut).await { - Either::Left((sth, _send_fut)) => { + tokio::select! { + sth = recv_fut => { if let Some((id, prio, data)) = sth { trace!("send_loop: add stream {} to send", id); sending.push(SendQueueItem { @@ -198,10 +212,10 @@ pub(crate) trait SendLoop: Sync { data: ByteStreamReader::new(data), }); } else { - should_exit = true; + msg_recv = None; }; } - Either::Right(((id, data), _recv_fut)) => { + (id, data) = send_fut => { trace!( "send_loop: id {}, send {} bytes, header_size {}", id, |