aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-26 12:11:48 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-26 12:11:48 +0200
commitb55f61c38b01da01314d99ced543aba713dbd2a9 (patch)
tree401e95bbb9db8e0454dc31f1af01ef41139c87c8
parentbdf7d4731dcd2e9b523758272fdc41b374044a9f (diff)
downloadnetapp-b55f61c38b01da01314d99ced543aba713dbd2a9.tar.gz
netapp-b55f61c38b01da01314d99ced543aba713dbd2a9.zip
Fix things going wrong when sending chan is closed
-rw-r--r--examples/fullmesh.rs6
-rw-r--r--src/recv.rs7
-rw-r--r--src/send.rs36
3 files changed, 34 insertions, 15 deletions
diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs
index 82e45c3..972bec0 100644
--- a/examples/fullmesh.rs
+++ b/examples/fullmesh.rs
@@ -125,7 +125,7 @@ impl Example {
async fn exchange_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
let mut i = 12000;
while !*must_exit.borrow() {
- tokio::time::sleep(Duration::from_secs(7)).await;
+ tokio::time::sleep(Duration::from_secs(2)).await;
let peers = self.fullmesh.get_peer_list();
for p in peers.iter() {
@@ -144,7 +144,7 @@ impl Example {
);
let stream =
Box::pin(stream::iter([100, 200, 300, 400]).then(|x| async move {
- tokio::time::sleep(Duration::from_millis(100)).await;
+ tokio::time::sleep(Duration::from_millis(500)).await;
Ok(Bytes::from(vec![(x % 256) as u8; 133 * x]))
}));
match self2
@@ -196,7 +196,7 @@ impl StreamingEndpointHandler<ExampleMessage> for Example {
"Handler: stream got bytes {:?}",
x.as_ref().map(|b| b.len())
);
- tokio::time::sleep(Duration::from_millis(100)).await;
+ tokio::time::sleep(Duration::from_millis(300)).await;
Ok(Bytes::from(vec![
10u8;
x.map(|b| b.len()).unwrap_or(1422) * 2
diff --git a/src/recv.rs b/src/recv.rs
index cba42cb..4d1047b 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
-use log::trace;
+use log::*;
use futures::AsyncReadExt;
use tokio::sync::mpsc;
@@ -59,6 +59,11 @@ pub(crate) trait RecvLoop: Sync + 'static {
{
let mut streams: HashMap<RequestID, Sender> = HashMap::new();
loop {
+ debug!(
+ "Receiving: {:?}",
+ streams.iter().map(|(id, _)| id).collect::<Vec<_>>()
+ );
+
let mut header_id = [0u8; RequestID::BITS as usize / 8];
match read.read_exact(&mut header_id[..]).await {
Ok(_) => (),
diff --git a/src/send.rs b/src/send.rs
index 256fe4c..fd415c6 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -5,7 +5,7 @@ use std::task::{Context, Poll};
use async_trait::async_trait;
use bytes::Bytes;
-use log::trace;
+use log::*;
use futures::AsyncWriteExt;
use kuska_handshake::async_std::BoxStreamWrite;
@@ -172,24 +172,38 @@ impl DataFrame {
pub(crate) trait SendLoop: Sync {
async fn send_loop<W>(
self: Arc<Self>,
- mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, ByteStream)>,
+ msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, ByteStream)>,
mut write: BoxStreamWrite<W>,
) -> Result<(), Error>
where
W: AsyncWriteExt + Unpin + Send + Sync,
{
let mut sending = SendQueue::new();
- let mut should_exit = false;
- while !should_exit || !sending.is_empty() {
- let recv_fut = msg_recv.recv();
- futures::pin_mut!(recv_fut);
+ let mut msg_recv = Some(msg_recv);
+ while msg_recv.is_some() || !sending.is_empty() {
+ debug!(
+ "Sending: {:?}",
+ sending
+ .items
+ .iter()
+ .map(|(_, i)| i.iter().map(|x| x.id))
+ .flatten()
+ .collect::<Vec<_>>()
+ );
+
+ let recv_fut = async {
+ if let Some(chan) = &mut msg_recv {
+ chan.recv().await
+ } else {
+ futures::future::pending().await
+ }
+ };
let send_fut = sending.next_ready();
// recv_fut is cancellation-safe according to tokio doc,
// send_fut is cancellation-safe as implemented above?
- use futures::future::Either;
- match futures::future::select(recv_fut, send_fut).await {
- Either::Left((sth, _send_fut)) => {
+ tokio::select! {
+ sth = recv_fut => {
if let Some((id, prio, data)) = sth {
trace!("send_loop: add stream {} to send", id);
sending.push(SendQueueItem {
@@ -198,10 +212,10 @@ pub(crate) trait SendLoop: Sync {
data: ByteStreamReader::new(data),
});
} else {
- should_exit = true;
+ msg_recv = None;
};
}
- Either::Right(((id, data), _recv_fut)) => {
+ (id, data) = send_fut => {
trace!(
"send_loop: id {}, send {} bytes, header_size {}",
id,