aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-12-07 16:00:12 +0100
committerAlex Auvolat <alex@adnab.me>2020-12-07 16:00:12 +0100
commit32a0fbcbd919ec45bb6380352190115f701f2c91 (patch)
treefa27de4b69598767e25f884ae8b27b7709db2a5e
parent5a9ae8615ee616b11460a046deaa6981b10d69ab (diff)
downloadnetapp-32a0fbcbd919ec45bb6380352190115f701f2c91.tar.gz
netapp-32a0fbcbd919ec45bb6380352190115f701f2c91.zip
Maybe fix something? Idk
-rw-r--r--src/conn.rs4
-rw-r--r--src/proto.rs8
2 files changed, 8 insertions, 4 deletions
diff --git a/src/conn.rs b/src/conn.rs
index 89bf654..7ba15f9 100644
--- a/src/conn.rs
+++ b/src/conn.rs
@@ -112,6 +112,8 @@ impl SendLoop for ServerConn {}
#[async_trait]
impl RecvLoop for ServerConn {
async fn recv_handler(self: Arc<Self>, id: u16, bytes: Vec<u8>) {
+ debug!("ServerConn recv_handler {} ({} bytes)", id, bytes.len());
+
let bytes: Bytes = bytes.into();
let prio = bytes[0];
@@ -265,6 +267,8 @@ impl SendLoop for ClientConn {}
#[async_trait]
impl RecvLoop for ClientConn {
async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>) {
+ debug!("ClientConn recv_handler {} ({} bytes)", id, msg.len());
+
let mut inflight = self.inflight.lock().unwrap();
if let Some(ch) = inflight.remove(&id) {
if ch.send(msg).is_err() {
diff --git a/src/proto.rs b/src/proto.rs
index d90042f..3e9fe20 100644
--- a/src/proto.rs
+++ b/src/proto.rs
@@ -81,7 +81,7 @@ impl SendQueue {
if !items_at_prio.is_empty() {
self.items.insert(prio, items_at_prio);
}
- ret
+ ret.or_else(|| self.pop())
}
}
}
@@ -139,7 +139,7 @@ pub(crate) trait SendLoop: Sync {
write.write_all(&item.data[item.cursor..]).await?;
}
- write.flush().await.log_err("Could not flush in send_loop");
+ write.flush().await?;
} else {
let sth = msg_recv
.recv()
@@ -182,14 +182,14 @@ pub(crate) trait RecvLoop: Sync + 'static {
let mut header_size = [0u8; 2];
read.read_exact(&mut header_size[..]).await?;
let size = RequestID::from_be_bytes(header_size);
- trace!("recv_loop: got header size: {:04x}", id);
+ trace!("recv_loop: got header size: {:04x}", size);
let has_cont = (size & 0x8000) != 0;
let size = size & !0x8000;
let mut next_slice = vec![0; size as usize];
read.read_exact(&mut next_slice[..]).await?;
- trace!("recv_loop: read {} bytes", size);
+ trace!("recv_loop: read {} bytes", next_slice.len());
let mut msg_bytes = receiving.remove(&id).unwrap_or(vec![]);
msg_bytes.extend_from_slice(&next_slice[..]);