aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-25 15:04:52 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-25 15:04:52 +0200
commit74e57016f63b6052cf6d539812859c3a46138eee (patch)
tree9e0dd9a7176d8f977b0dac55242c2870966ee90a /src
parent7499721a10d1d9e977024224b9d80d91ce93628b (diff)
downloadnetapp-74e57016f63b6052cf6d539812859c3a46138eee.tar.gz
netapp-74e57016f63b6052cf6d539812859c3a46138eee.zip
Add some debugging
Diffstat (limited to 'src')
-rw-r--r--src/client.rs9
-rw-r--r--src/recv.rs19
-rw-r--r--src/send.rs46
-rw-r--r--src/server.rs7
-rw-r--r--src/stream.rs8
5 files changed, 63 insertions, 26 deletions
diff --git a/src/client.rs b/src/client.rs
index d51236b..2fccdb8 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -179,10 +179,9 @@ impl ClientConn {
})));
}
- trace!(
- "request: query_send {} (serialized message: {} bytes)",
- id,
- req_msg_len
+ debug!(
+ "request: query_send {}, path {}, prio {} (serialized message: {} bytes)",
+ id, path, prio, req_msg_len
);
#[cfg(feature = "telemetry")]
@@ -201,7 +200,7 @@ impl ClientConn {
}
let resp_enc = RespEnc::decode(stream).await?;
- trace!("request response {}", id);
+ debug!("client: got response to request {} (path {})", id, path);
Resp::from_enc(resp_enc)
}
}
diff --git a/src/recv.rs b/src/recv.rs
index e748f18..cba42cb 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -59,7 +59,6 @@ pub(crate) trait RecvLoop: Sync + 'static {
{
let mut streams: HashMap<RequestID, Sender> = HashMap::new();
loop {
- trace!("recv_loop: reading packet");
let mut header_id = [0u8; RequestID::BITS as usize / 8];
match read.read_exact(&mut header_id[..]).await {
Ok(_) => (),
@@ -67,22 +66,31 @@ pub(crate) trait RecvLoop: Sync + 'static {
Err(e) => return Err(e.into()),
};
let id = RequestID::from_be_bytes(header_id);
- trace!("recv_loop: got header id: {:04x}", id);
let mut header_size = [0u8; ChunkLength::BITS as usize / 8];
read.read_exact(&mut header_size[..]).await?;
let size = ChunkLength::from_be_bytes(header_size);
- trace!("recv_loop: got header size: {:04x}", size);
let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0;
let is_error = (size & ERROR_MARKER) != 0;
let packet = if is_error {
+ trace!(
+ "recv_loop: got id {}, header_size {:04x}, error {}",
+ id,
+ size,
+ size & !ERROR_MARKER
+ );
Err((size & !ERROR_MARKER) as u8)
} else {
let size = size & !CHUNK_HAS_CONTINUATION;
let mut next_slice = vec![0; size as usize];
read.read_exact(&mut next_slice[..]).await?;
- trace!("recv_loop: read {} bytes", next_slice.len());
+ trace!(
+ "recv_loop: got id {}, header_size {:04x}, {} bytes",
+ id,
+ size,
+ next_slice.len()
+ );
Ok(Bytes::from(next_slice))
};
@@ -90,6 +98,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
send
} else {
let (send, recv) = mpsc::channel(4);
+ trace!("recv_loop: id {} is new channel", id);
self.recv_handler(
id,
Box::pin(tokio_stream::wrappers::ReceiverStream::new(recv)),
@@ -102,8 +111,10 @@ pub(crate) trait RecvLoop: Sync + 'static {
let _ = sender.send(packet).await;
if has_cont {
+ assert!(!is_error);
streams.insert(id, sender);
} else {
+ trace!("recv_loop: close channel id {}", id);
sender.end();
}
}
diff --git a/src/send.rs b/src/send.rs
index 46c4383..256fe4c 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -74,36 +74,54 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> {
type Output = (RequestID, DataFrame);
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
- for i in 0..self.queue.items.len() {
- let (_prio, items_at_prio) = &mut self.queue.items[i];
-
+ for (i, (_prio, items_at_prio)) in self.queue.items.iter_mut().enumerate() {
let mut ready_item = None;
for (j, item) in items_at_prio.iter_mut().enumerate() {
let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
match Pin::new(&mut item_reader).poll(ctx) {
Poll::Pending => (),
Poll::Ready(ready_v) => {
- ready_item = Some((j, ready_v, item.data.eos()));
+ ready_item = Some((j, ready_v));
break;
}
}
}
- if let Some((j, bytes_or_err, eos)) = ready_item {
+ if let Some((j, bytes_or_err)) = ready_item {
+ let item = items_at_prio.remove(j).unwrap();
+ let id = item.id;
+ let eos = item.data.eos();
+
let data_frame = match bytes_or_err {
- Ok(bytes) => DataFrame::Data(bytes, !eos),
+ Ok(bytes) => {
+ trace!(
+ "send queue poll next ready: id {} eos {:?} bytes {}",
+ id,
+ eos,
+ bytes.len()
+ );
+ DataFrame::Data(bytes, !eos)
+ }
Err(e) => DataFrame::Error(match e {
- ReadExactError::Stream(code) => code,
+ ReadExactError::Stream(code) => {
+ trace!(
+ "send queue poll next ready: id {} eos {:?} ERROR {}",
+ id,
+ eos,
+ code
+ );
+ code
+ }
_ => unreachable!(),
}),
};
- let item = items_at_prio.remove(j).unwrap();
- let id = item.id;
- if !eos {
+
+ if !eos && !matches!(data_frame, DataFrame::Error(_)) {
items_at_prio.push_back(item);
} else if items_at_prio.is_empty() {
self.queue.items.remove(i);
}
+
return Poll::Ready((id, data_frame));
}
}
@@ -173,6 +191,7 @@ pub(crate) trait SendLoop: Sync {
match futures::future::select(recv_fut, send_fut).await {
Either::Left((sth, _send_fut)) => {
if let Some((id, prio, data)) = sth {
+ trace!("send_loop: add stream {} to send", id);
sending.push(SendQueueItem {
id,
prio,
@@ -183,7 +202,12 @@ pub(crate) trait SendLoop: Sync {
};
}
Either::Right(((id, data), _recv_fut)) => {
- trace!("send_loop: sending bytes for {}", id);
+ trace!(
+ "send_loop: id {}, send {} bytes, header_size {}",
+ id,
+ data.data().len(),
+ hex::encode(data.header())
+ );
let header_id = RequestID::to_be_bytes(id);
write.write_all(&header_id[..]).await?;
diff --git a/src/server.rs b/src/server.rs
index 4b232af..57062d8 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
-use log::{debug, trace};
+use log::*;
use futures::io::{AsyncReadExt, AsyncWriteExt};
use kuska_handshake::async_std::{handshake_server, BoxStream};
@@ -175,7 +175,8 @@ impl RecvLoop for ServerConn {
let self2 = self.clone();
tokio::spawn(async move {
- trace!("ServerConn recv_handler {}", id);
+ debug!("server: recv_handler got {}", id);
+
let (prio, resp_enc) = match ReqEnc::decode(stream).await {
Ok(req_enc) => {
let prio = req_enc.prio;
@@ -192,7 +193,7 @@ impl RecvLoop for ServerConn {
Err(e) => (PRIO_NORMAL, RespEnc::from_err(e)),
};
- trace!("ServerConn sending response to {}: ", id);
+ debug!("server: sending response to {}", id);
resp_send
.send((id, prio, resp_enc.encode()))
diff --git a/src/stream.rs b/src/stream.rs
index beb6b9c..5ba2ed4 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -93,7 +93,7 @@ impl ByteStreamReader {
}
pub fn eos(&self) -> bool {
- self.buf.is_empty() && self.eos
+ self.buf_len == 0 && self.eos
}
fn try_get(&mut self, read_len: usize) -> Option<Bytes> {
@@ -164,8 +164,10 @@ impl<'a> Future for ByteStreamReadExact<'a> {
match futures::ready!(this.reader.stream.as_mut().poll_next(cx)) {
Some(Ok(slice)) => {
- this.reader.buf_len += slice.len();
- this.reader.buf.push_back(slice);
+ if !slice.is_empty() {
+ this.reader.buf_len += slice.len();
+ this.reader.buf.push_back(slice);
+ }
}
Some(Err(e)) => {
this.reader.err = Some(e);