diff options
author | Alex Auvolat <alex@adnab.me> | 2022-09-01 14:23:10 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-09-01 14:23:10 +0200 |
commit | 22d96929d5416750e1f5889ee6cc16b382293104 (patch) | |
tree | 7b9407132ffa02fa5c7e9f6040f90f0e071b4bf3 /src/recv.rs | |
parent | 4a59b73d7bfd0f136f654e874afb5d2a9bf4df2e (diff) | |
parent | d75146fb8157dd03c156e5f7ce4834fa1d72b581 (diff) | |
download | netapp-22d96929d5416750e1f5889ee6cc16b382293104.tar.gz netapp-22d96929d5416750e1f5889ee6cc16b382293104.zip |
Merge branch 'fix-ping' into stream-body
Diffstat (limited to 'src/recv.rs')
-rw-r--r-- | src/recv.rs | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/src/recv.rs b/src/recv.rs index b5289fb..ac93c4b 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -54,14 +54,15 @@ impl Drop for Sender { pub(crate) trait RecvLoop: Sync + 'static { fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream); - async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error> + async fn recv_loop<R>(self: Arc<Self>, mut read: R, debug_name: String) -> Result<(), Error> where R: AsyncReadExt + Unpin + Send + Sync, { let mut streams: HashMap<RequestID, Sender> = HashMap::new(); loop { - debug!( - "Receiving: {:?}", + trace!( + "recv_loop({}): in_progress = {:?}", + debug_name, streams.iter().map(|(id, _)| id).collect::<Vec<_>>() ); @@ -87,11 +88,12 @@ pub(crate) trait RecvLoop: Sync + 'static { let kind = u8_to_io_errorkind(next_slice[0]); let msg = std::str::from_utf8(&next_slice[1..]).unwrap_or("<invalid utf8 error message>"); - debug!("recv_loop: got id {}, error {:?}: {}", id, kind, msg); + debug!("recv_loop({}): got id {}, error {:?}: {}", debug_name, id, kind, msg); Some(Err(std::io::Error::new(kind, msg.to_string()))) } else { trace!( - "recv_loop: got id {}, size {}, has_cont {}", + "recv_loop({}): got id {}, size {}, has_cont {}", + debug_name, id, size, has_cont @@ -107,7 +109,7 @@ pub(crate) trait RecvLoop: Sync + 'static { send } else { let (send, recv) = mpsc::unbounded_channel(); - trace!("recv_loop: id {} is new channel", id); + trace!("recv_loop({}): id {} is new channel", debug_name, id); self.recv_handler( id, Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(recv)), @@ -126,7 +128,7 @@ pub(crate) trait RecvLoop: Sync + 'static { assert!(!is_error); streams.insert(id, sender); } else { - trace!("recv_loop: close channel id {}", id); + trace!("recv_loop({}): close channel id {}", debug_name, id); sender.end(); } } |