aboutsummaryrefslogtreecommitdiff
path: root/src/send.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-25 15:04:52 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-25 15:04:52 +0200
commit74e57016f63b6052cf6d539812859c3a46138eee (patch)
tree9e0dd9a7176d8f977b0dac55242c2870966ee90a /src/send.rs
parent7499721a10d1d9e977024224b9d80d91ce93628b (diff)
downloadnetapp-74e57016f63b6052cf6d539812859c3a46138eee.tar.gz
netapp-74e57016f63b6052cf6d539812859c3a46138eee.zip
Add some debugging
Diffstat (limited to 'src/send.rs')
-rw-r--r--src/send.rs46
1 files changed, 35 insertions, 11 deletions
diff --git a/src/send.rs b/src/send.rs
index 46c4383..256fe4c 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -74,36 +74,54 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> {
type Output = (RequestID, DataFrame);
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
- for i in 0..self.queue.items.len() {
- let (_prio, items_at_prio) = &mut self.queue.items[i];
-
+ for (i, (_prio, items_at_prio)) in self.queue.items.iter_mut().enumerate() {
let mut ready_item = None;
for (j, item) in items_at_prio.iter_mut().enumerate() {
let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
match Pin::new(&mut item_reader).poll(ctx) {
Poll::Pending => (),
Poll::Ready(ready_v) => {
- ready_item = Some((j, ready_v, item.data.eos()));
+ ready_item = Some((j, ready_v));
break;
}
}
}
- if let Some((j, bytes_or_err, eos)) = ready_item {
+ if let Some((j, bytes_or_err)) = ready_item {
+ let item = items_at_prio.remove(j).unwrap();
+ let id = item.id;
+ let eos = item.data.eos();
+
let data_frame = match bytes_or_err {
- Ok(bytes) => DataFrame::Data(bytes, !eos),
+ Ok(bytes) => {
+ trace!(
+ "send queue poll next ready: id {} eos {:?} bytes {}",
+ id,
+ eos,
+ bytes.len()
+ );
+ DataFrame::Data(bytes, !eos)
+ }
Err(e) => DataFrame::Error(match e {
- ReadExactError::Stream(code) => code,
+ ReadExactError::Stream(code) => {
+ trace!(
+ "send queue poll next ready: id {} eos {:?} ERROR {}",
+ id,
+ eos,
+ code
+ );
+ code
+ }
_ => unreachable!(),
}),
};
- let item = items_at_prio.remove(j).unwrap();
- let id = item.id;
- if !eos {
+
+ if !eos && !matches!(data_frame, DataFrame::Error(_)) {
items_at_prio.push_back(item);
} else if items_at_prio.is_empty() {
self.queue.items.remove(i);
}
+
return Poll::Ready((id, data_frame));
}
}
@@ -173,6 +191,7 @@ pub(crate) trait SendLoop: Sync {
match futures::future::select(recv_fut, send_fut).await {
Either::Left((sth, _send_fut)) => {
if let Some((id, prio, data)) = sth {
+ trace!("send_loop: add stream {} to send", id);
sending.push(SendQueueItem {
id,
prio,
@@ -183,7 +202,12 @@ pub(crate) trait SendLoop: Sync {
};
}
Either::Right(((id, data), _recv_fut)) => {
- trace!("send_loop: sending bytes for {}", id);
+ trace!(
+ "send_loop: id {}, send {} bytes, header_size {}",
+ id,
+ data.data().len(),
+ hex::encode(data.header())
+ );
let header_id = RequestID::to_be_bytes(id);
write.write_all(&header_id[..]).await?;