aboutsummaryrefslogtreecommitdiff
path: root/src/recv.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/recv.rs')
-rw-r--r--src/recv.rs16
1 files changed, 9 insertions, 7 deletions
diff --git a/src/recv.rs b/src/recv.rs
index b5289fb..ac93c4b 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -54,14 +54,15 @@ impl Drop for Sender {
pub(crate) trait RecvLoop: Sync + 'static {
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream);
- async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error>
+ async fn recv_loop<R>(self: Arc<Self>, mut read: R, debug_name: String) -> Result<(), Error>
where
R: AsyncReadExt + Unpin + Send + Sync,
{
let mut streams: HashMap<RequestID, Sender> = HashMap::new();
loop {
- debug!(
- "Receiving: {:?}",
+ trace!(
+ "recv_loop({}): in_progress = {:?}",
+ debug_name,
streams.iter().map(|(id, _)| id).collect::<Vec<_>>()
);
@@ -87,11 +88,12 @@ pub(crate) trait RecvLoop: Sync + 'static {
let kind = u8_to_io_errorkind(next_slice[0]);
let msg =
std::str::from_utf8(&next_slice[1..]).unwrap_or("<invalid utf8 error message>");
- debug!("recv_loop: got id {}, error {:?}: {}", id, kind, msg);
+ debug!("recv_loop({}): got id {}, error {:?}: {}", debug_name, id, kind, msg);
Some(Err(std::io::Error::new(kind, msg.to_string())))
} else {
trace!(
- "recv_loop: got id {}, size {}, has_cont {}",
+ "recv_loop({}): got id {}, size {}, has_cont {}",
+ debug_name,
id,
size,
has_cont
@@ -107,7 +109,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
send
} else {
let (send, recv) = mpsc::unbounded_channel();
- trace!("recv_loop: id {} is new channel", id);
+ trace!("recv_loop({}): id {} is new channel", debug_name, id);
self.recv_handler(
id,
Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(recv)),
@@ -126,7 +128,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
assert!(!is_error);
streams.insert(id, sender);
} else {
- trace!("recv_loop: close channel id {}", id);
+ trace!("recv_loop({}): close channel id {}", debug_name, id);
sender.end();
}
}