diff options
-rw-r--r-- | src/conn.rs | 4 | ||||
-rw-r--r-- | src/proto.rs | 8 |
2 files changed, 8 insertions, 4 deletions
diff --git a/src/conn.rs b/src/conn.rs index 89bf654..7ba15f9 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -112,6 +112,8 @@ impl SendLoop for ServerConn {} #[async_trait] impl RecvLoop for ServerConn { async fn recv_handler(self: Arc<Self>, id: u16, bytes: Vec<u8>) { + debug!("ServerConn recv_handler {} ({} bytes)", id, bytes.len()); + let bytes: Bytes = bytes.into(); let prio = bytes[0]; @@ -265,6 +267,8 @@ impl SendLoop for ClientConn {} #[async_trait] impl RecvLoop for ClientConn { async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>) { + debug!("ClientConn recv_handler {} ({} bytes)", id, msg.len()); + let mut inflight = self.inflight.lock().unwrap(); if let Some(ch) = inflight.remove(&id) { if ch.send(msg).is_err() { diff --git a/src/proto.rs b/src/proto.rs index d90042f..3e9fe20 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -81,7 +81,7 @@ impl SendQueue { if !items_at_prio.is_empty() { self.items.insert(prio, items_at_prio); } - ret + ret.or_else(|| self.pop()) } } } @@ -139,7 +139,7 @@ pub(crate) trait SendLoop: Sync { write.write_all(&item.data[item.cursor..]).await?; } - write.flush().await.log_err("Could not flush in send_loop"); + write.flush().await?; } else { let sth = msg_recv .recv() @@ -182,14 +182,14 @@ pub(crate) trait RecvLoop: Sync + 'static { let mut header_size = [0u8; 2]; read.read_exact(&mut header_size[..]).await?; let size = RequestID::from_be_bytes(header_size); - trace!("recv_loop: got header size: {:04x}", id); + trace!("recv_loop: got header size: {:04x}", size); let has_cont = (size & 0x8000) != 0; let size = size & !0x8000; let mut next_slice = vec![0; size as usize]; read.read_exact(&mut next_slice[..]).await?; - trace!("recv_loop: read {} bytes", size); + trace!("recv_loop: read {} bytes", next_slice.len()); let mut msg_bytes = receiving.remove(&id).unwrap_or(vec![]); msg_bytes.extend_from_slice(&next_slice[..]); |