aboutsummaryrefslogtreecommitdiff
path: root/src/net/send.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2024-03-28 12:40:27 +0000
committerAlex <alex@adnab.me>2024-03-28 12:40:27 +0000
commitecf641d88c264f7278d13a6d988288feb24a5dfe (patch)
tree5cd60dfa4f0d6d32a66d2e32d7912c9e289067c8 /src/net/send.rs
parent75cd14926d8dec8c36289197822df78391686c6a (diff)
parent85f580cbde4913fe8382316ff3c27b8443c61dd7 (diff)
downloadgarage-ecf641d88c264f7278d13a6d988288feb24a5dfe.tar.gz
garage-ecf641d88c264f7278d13a6d988288feb24a5dfe.zip
Merge pull request 'Fix unbounded buffering when one node has slower network' (#792) from fix-buffering into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/792
Diffstat (limited to 'src/net/send.rs')
-rw-r--r--src/net/send.rs85
1 files changed, 45 insertions, 40 deletions
diff --git a/src/net/send.rs b/src/net/send.rs
index 0db0ba77..c60fc6b2 100644
--- a/src/net/send.rs
+++ b/src/net/send.rs
@@ -109,7 +109,7 @@ impl SendQueuePriority {
let i = order_vec.iter().take_while(|o2| **o2 < order).count();
order_vec.insert(i, order);
}
- self.items.push_front(item);
+ self.items.push_back(item);
}
fn remove(&mut self, id: RequestID) {
if let Some(i) = self.items.iter().position(|x| x.id == id) {
@@ -128,51 +128,56 @@ impl SendQueuePriority {
self.items.is_empty()
}
fn poll_next_ready(&mut self, ctx: &mut Context<'_>) -> Poll<(RequestID, DataFrame)> {
- for (j, item) in self.items.iter_mut().enumerate() {
- if let Some(OrderTag(stream, order)) = item.order_tag {
- if order > *self.order.get(&stream).unwrap().front().unwrap() {
+ // in step 1: poll only streams that have sent 0 bytes, we want to send them in priority
+ // as they most likely represent small requests to be sent first
+ // in step 2: poll all streams
+ for step in 0..2 {
+ for (j, item) in self.items.iter_mut().enumerate() {
+ if let Some(OrderTag(stream, order)) = item.order_tag {
+ if order > *self.order.get(&stream).unwrap().front().unwrap() {
+ continue;
+ }
+ }
+
+ if step == 0 && item.sent > 0 {
continue;
}
- }
- let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
- if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
- let id = item.id;
- let eos = item.data.eos();
-
- let packet = bytes_or_err.map_err(|e| match e {
- ReadExactError::Stream(err) => err,
- _ => unreachable!(),
- });
-
- let is_err = packet.is_err();
- let data_frame = DataFrame::from_packet(packet, !eos);
- item.sent += data_frame.data().len();
-
- if eos || is_err {
- // If item had an order tag, remove it from the corresponding ordering list
- if let Some(OrderTag(stream, order)) = item.order_tag {
- let order_stream = self.order.get_mut(&stream).unwrap();
- assert_eq!(order_stream.pop_front(), Some(order));
- if order_stream.is_empty() {
- self.order.remove(&stream);
+ let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
+ if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
+ let id = item.id;
+ let eos = item.data.eos();
+
+ let packet = bytes_or_err.map_err(|e| match e {
+ ReadExactError::Stream(err) => err,
+ _ => unreachable!(),
+ });
+
+ let is_err = packet.is_err();
+ let data_frame = DataFrame::from_packet(packet, !eos);
+ item.sent += data_frame.data().len();
+
+ if eos || is_err {
+ // If item had an order tag, remove it from the corresponding ordering list
+ if let Some(OrderTag(stream, order)) = item.order_tag {
+ let order_stream = self.order.get_mut(&stream).unwrap();
+ assert_eq!(order_stream.pop_front(), Some(order));
+ if order_stream.is_empty() {
+ self.order.remove(&stream);
+ }
}
+ // Remove item from sending queue
+ self.items.remove(j);
+ } else if step == 0 {
+ // Step 0 means that this stream had not sent any bytes yet.
+ // Now that it has, and it was not an EOS, we know that it is bigger
+ // than one chunk so move it at the end of the queue.
+ let item = self.items.remove(j).unwrap();
+ self.items.push_back(item);
}
- // Remove item from sending queue
- self.items.remove(j);
- } else {
- // Move item later in send queue to implement LAS scheduling
- // (LAS = Least Attained Service)
- for k in j..self.items.len() - 1 {
- if self.items[k].sent >= self.items[k + 1].sent {
- self.items.swap(k, k + 1);
- } else {
- break;
- }
- }
- }
- return Poll::Ready((id, data_frame));
+ return Poll::Ready((id, data_frame));
+ }
}
}