aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/send.rs22
1 files changed, 17 insertions, 5 deletions
diff --git a/src/send.rs b/src/send.rs
index 780bbcf..2606434 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -59,6 +59,7 @@ struct SendQueueItem {
prio: RequestPriority,
order_tag: Option<OrderTag>,
data: ByteStreamReader,
+ sent: usize,
}
impl SendQueue {
@@ -106,7 +107,7 @@ impl SendQueuePriority {
let i = order_vec.iter().take_while(|o2| **o2 < order).count();
order_vec.insert(i, order);
}
- self.items.push_back(item);
+ self.items.push_front(item);
}
fn remove(&mut self, id: RequestID) {
if let Some(i) = self.items.iter().position(|x| x.id == id) {
@@ -139,7 +140,11 @@ impl SendQueuePriority {
_ => unreachable!(),
});
- if eos || packet.is_err() {
+ let is_err = packet.is_err();
+ let data_frame = DataFrame::from_packet(packet, !eos);
+ item.sent += data_frame.data().len();
+
+ if eos || is_err {
if let Some(OrderTag(stream, order)) = item.order_tag {
assert_eq!(
self.order.get_mut(&stream).unwrap().pop_front(),
@@ -147,10 +152,16 @@ impl SendQueuePriority {
)
}
self.items.remove(j);
+ } else {
+ for k in j..self.items.len() - 1 {
+ if self.items[k].sent >= self.items[k + 1].sent {
+ self.items.swap(k, k + 1);
+ } else {
+ break;
+ }
+ }
}
- let data_frame = DataFrame::from_packet(packet, !eos);
-
return Poll::Ready((id, data_frame));
}
}
@@ -160,7 +171,7 @@ impl SendQueuePriority {
fn dump(&self, prio: u8) -> String {
self.items
.iter()
- .map(|i| format!("[{} {} {:?}]", prio, i.id, i.order_tag))
+ .map(|i| format!("[{} {} {:?} @{}]", prio, i.id, i.order_tag, i.sent))
.collect::<Vec<_>>()
.join(" ")
}
@@ -294,6 +305,7 @@ pub(crate) trait SendLoop: Sync {
prio,
order_tag,
data: ByteStreamReader::new(data),
+ sent: 0,
})
}
Some(SendItem::Cancel(id)) => {