diff options
author | Alex Auvolat <alex@adnab.me> | 2024-03-28 15:01:05 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-03-28 15:01:05 +0100 |
commit | 8bfc16ba7d5e0c2806aa32e0257fbdc21cb93860 (patch) | |
tree | 49d6c32376708147e90ba64ea32cea7835e751c1 /src/net | |
parent | 25c196f34d958f4f61d50c89a1c5d40b96d7cd24 (diff) | |
parent | ecf641d88c264f7278d13a6d988288feb24a5dfe (diff) | |
download | garage-8bfc16ba7d5e0c2806aa32e0257fbdc21cb93860.tar.gz garage-8bfc16ba7d5e0c2806aa32e0257fbdc21cb93860.zip |
Merge branch 'main' into next-0.10
Diffstat (limited to 'src/net')
-rw-r--r-- | src/net/message.rs | 18 | ||||
-rw-r--r-- | src/net/send.rs | 85 | ||||
-rw-r--r-- | src/net/server.rs | 2 |
3 files changed, 64 insertions, 41 deletions
diff --git a/src/net/message.rs b/src/net/message.rs index b0d255c6..af98ca12 100644 --- a/src/net/message.rs +++ b/src/net/message.rs @@ -28,12 +28,30 @@ use crate::util::*; /// The same priority value is given to a request and to its associated response. pub type RequestPriority = u8; +// Usage of priority levels in Garage: +// +// PRIO_HIGH +// for liveness check events such as pings and important +// reconfiguration events such as layout changes +// +// PRIO_NORMAL +// for standard interactive requests to exchange metadata +// +// PRIO_NORMAL | PRIO_SECONDARY +// for standard interactive requests to exchange block data +// +// PRIO_BACKGROUND +// for background resync requests to exchange metadata +// PRIO_BACKGROUND | PRIO_SECONDARY +// for background resync requests to exchange block data + /// Priority class: high pub const PRIO_HIGH: RequestPriority = 0x20; /// Priority class: normal pub const PRIO_NORMAL: RequestPriority = 0x40; /// Priority class: background pub const PRIO_BACKGROUND: RequestPriority = 0x80; + /// Priority: primary among given class pub const PRIO_PRIMARY: RequestPriority = 0x00; /// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`) diff --git a/src/net/send.rs b/src/net/send.rs index 0db0ba77..c60fc6b2 100644 --- a/src/net/send.rs +++ b/src/net/send.rs @@ -109,7 +109,7 @@ impl SendQueuePriority { let i = order_vec.iter().take_while(|o2| **o2 < order).count(); order_vec.insert(i, order); } - self.items.push_front(item); + self.items.push_back(item); } fn remove(&mut self, id: RequestID) { if let Some(i) = self.items.iter().position(|x| x.id == id) { @@ -128,51 +128,56 @@ impl SendQueuePriority { self.items.is_empty() } fn poll_next_ready(&mut self, ctx: &mut Context<'_>) -> Poll<(RequestID, DataFrame)> { - for (j, item) in self.items.iter_mut().enumerate() { - if let Some(OrderTag(stream, order)) = item.order_tag { - if order > *self.order.get(&stream).unwrap().front().unwrap() { + // in step 1: poll only streams that have sent 0 bytes, we want to send them in priority + // as they most likely represent small requests to be sent first + // in step 2: poll all streams + for step in 0..2 { + for (j, item) in self.items.iter_mut().enumerate() { + if let Some(OrderTag(stream, order)) = item.order_tag { + if order > *self.order.get(&stream).unwrap().front().unwrap() { + continue; + } + } + + if step == 0 && item.sent > 0 { continue; } - } - let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize); - if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) { - let id = item.id; - let eos = item.data.eos(); - - let packet = bytes_or_err.map_err(|e| match e { - ReadExactError::Stream(err) => err, - _ => unreachable!(), - }); - - let is_err = packet.is_err(); - let data_frame = DataFrame::from_packet(packet, !eos); - item.sent += data_frame.data().len(); - - if eos || is_err { - // If item had an order tag, remove it from the corresponding ordering list - if let Some(OrderTag(stream, order)) = item.order_tag { - let order_stream = self.order.get_mut(&stream).unwrap(); - assert_eq!(order_stream.pop_front(), Some(order)); - if order_stream.is_empty() { - self.order.remove(&stream); + let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize); + if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) { + let id = item.id; + let eos = item.data.eos(); + + let packet = bytes_or_err.map_err(|e| match e { + ReadExactError::Stream(err) => err, + _ => unreachable!(), + }); + + let is_err = packet.is_err(); + let data_frame = DataFrame::from_packet(packet, !eos); + item.sent += data_frame.data().len(); + + if eos || is_err { + // If item had an order tag, remove it from the corresponding ordering list + if let Some(OrderTag(stream, order)) = item.order_tag { + let order_stream = self.order.get_mut(&stream).unwrap(); + assert_eq!(order_stream.pop_front(), Some(order)); + if order_stream.is_empty() { + self.order.remove(&stream); + } } + // Remove item from sending queue + self.items.remove(j); + } else if step == 0 { + // Step 0 means that this stream had not sent any bytes yet. + // Now that it has, and it was not an EOS, we know that it is bigger + // than one chunk so move it at the end of the queue. + let item = self.items.remove(j).unwrap(); + self.items.push_back(item); } - // Remove item from sending queue - self.items.remove(j); - } else { - // Move item later in send queue to implement LAS scheduling - // (LAS = Least Attained Service) - for k in j..self.items.len() - 1 { - if self.items[k].sent >= self.items[k + 1].sent { - self.items.swap(k, k + 1); - } else { - break; - } - } - } - return Poll::Ready((id, data_frame)); + return Poll::Ready((id, data_frame)); + } } } diff --git a/src/net/server.rs b/src/net/server.rs index 55b9e678..36dccb2f 100644 --- a/src/net/server.rs +++ b/src/net/server.rs @@ -190,7 +190,7 @@ impl RecvLoop for ServerConn { let (prio, resp_enc_result) = match ReqEnc::decode(stream).await { Ok(req_enc) => (req_enc.prio, self2.recv_handler_aux(req_enc).await), - Err(e) => (PRIO_HIGH, Err(e)), + Err(e) => (PRIO_NORMAL, Err(e)), }; debug!("server: sending response to {}", id); |