diff options
Diffstat (limited to 'src/send.rs')
-rw-r--r-- | src/send.rs | 46 |
1 files changed, 35 insertions, 11 deletions
diff --git a/src/send.rs b/src/send.rs index 46c4383..256fe4c 100644 --- a/src/send.rs +++ b/src/send.rs @@ -74,36 +74,54 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> { type Output = (RequestID, DataFrame); fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> { - for i in 0..self.queue.items.len() { - let (_prio, items_at_prio) = &mut self.queue.items[i]; - + for (i, (_prio, items_at_prio)) in self.queue.items.iter_mut().enumerate() { let mut ready_item = None; for (j, item) in items_at_prio.iter_mut().enumerate() { let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize); match Pin::new(&mut item_reader).poll(ctx) { Poll::Pending => (), Poll::Ready(ready_v) => { - ready_item = Some((j, ready_v, item.data.eos())); + ready_item = Some((j, ready_v)); break; } } } - if let Some((j, bytes_or_err, eos)) = ready_item { + if let Some((j, bytes_or_err)) = ready_item { + let item = items_at_prio.remove(j).unwrap(); + let id = item.id; + let eos = item.data.eos(); + let data_frame = match bytes_or_err { - Ok(bytes) => DataFrame::Data(bytes, !eos), + Ok(bytes) => { + trace!( + "send queue poll next ready: id {} eos {:?} bytes {}", + id, + eos, + bytes.len() + ); + DataFrame::Data(bytes, !eos) + } Err(e) => DataFrame::Error(match e { - ReadExactError::Stream(code) => code, + ReadExactError::Stream(code) => { + trace!( + "send queue poll next ready: id {} eos {:?} ERROR {}", + id, + eos, + code + ); + code + } _ => unreachable!(), }), }; - let item = items_at_prio.remove(j).unwrap(); - let id = item.id; - if !eos { + + if !eos && !matches!(data_frame, DataFrame::Error(_)) { items_at_prio.push_back(item); } else if items_at_prio.is_empty() { self.queue.items.remove(i); } + return Poll::Ready((id, data_frame)); } } @@ -173,6 +191,7 @@ pub(crate) trait SendLoop: Sync { match futures::future::select(recv_fut, send_fut).await { Either::Left((sth, _send_fut)) => { if let Some((id, prio, data)) = sth { + trace!("send_loop: add stream {} to send", id); sending.push(SendQueueItem { id, prio, @@ -183,7 +202,12 @@ pub(crate) trait SendLoop: Sync { }; } Either::Right(((id, data), _recv_fut)) => { - trace!("send_loop: sending bytes for {}", id); + trace!( + "send_loop: id {}, send {} bytes, header_size {}", + id, + data.data().len(), + hex::encode(data.header()) + ); let header_id = RequestID::to_be_bytes(id); write.write_all(&header_id[..]).await?; |