diff options
Diffstat (limited to 'src/send.rs')
-rw-r--r-- | src/send.rs | 22 |
1 files changed, 17 insertions, 5 deletions
diff --git a/src/send.rs b/src/send.rs index 780bbcf..2606434 100644 --- a/src/send.rs +++ b/src/send.rs @@ -59,6 +59,7 @@ struct SendQueueItem { prio: RequestPriority, order_tag: Option<OrderTag>, data: ByteStreamReader, + sent: usize, } impl SendQueue { @@ -106,7 +107,7 @@ impl SendQueuePriority { let i = order_vec.iter().take_while(|o2| **o2 < order).count(); order_vec.insert(i, order); } - self.items.push_back(item); + self.items.push_front(item); } fn remove(&mut self, id: RequestID) { if let Some(i) = self.items.iter().position(|x| x.id == id) { @@ -139,7 +140,11 @@ impl SendQueuePriority { _ => unreachable!(), }); - if eos || packet.is_err() { + let is_err = packet.is_err(); + let data_frame = DataFrame::from_packet(packet, !eos); + item.sent += data_frame.data().len(); + + if eos || is_err { if let Some(OrderTag(stream, order)) = item.order_tag { assert_eq!( self.order.get_mut(&stream).unwrap().pop_front(), @@ -147,10 +152,16 @@ impl SendQueuePriority { ) } self.items.remove(j); + } else { + for k in j..self.items.len() - 1 { + if self.items[k].sent >= self.items[k + 1].sent { + self.items.swap(k, k + 1); + } else { + break; + } + } } - let data_frame = DataFrame::from_packet(packet, !eos); - return Poll::Ready((id, data_frame)); } } @@ -160,7 +171,7 @@ impl SendQueuePriority { fn dump(&self, prio: u8) -> String { self.items .iter() - .map(|i| format!("[{} {} {:?}]", prio, i.id, i.order_tag)) + .map(|i| format!("[{} {} {:?} @{}]", prio, i.id, i.order_tag, i.sent)) .collect::<Vec<_>>() .join(" ") } @@ -294,6 +305,7 @@ pub(crate) trait SendLoop: Sync { prio, order_tag, data: ByteStreamReader::new(data), + sent: 0, }) } Some(SendItem::Cancel(id)) => { |