aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/block/manager.rs20
-rw-r--r--src/block/resync.rs6
-rw-r--r--src/net/message.rs18
-rw-r--r--src/net/send.rs85
-rw-r--r--src/net/server.rs2
5 files changed, 83 insertions, 48 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 2c7c7aba..62829a24 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -238,10 +238,16 @@ impl BlockManager {
async fn rpc_get_raw_block_streaming(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
) -> Result<DataBlockStream, Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
- .await
+ self.rpc_get_raw_block_internal(
+ hash,
+ priority,
+ order_tag,
+ |stream| async move { Ok(stream) },
+ )
+ .await
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -249,9 +255,10 @@ impl BlockManager {
pub(crate) async fn rpc_get_raw_block(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
+ self.rpc_get_raw_block_internal(hash, priority, order_tag, |block_stream| async move {
let (header, stream) = block_stream.into_parts();
read_stream_to_end(stream)
.await
@@ -264,6 +271,7 @@ impl BlockManager {
async fn rpc_get_raw_block_internal<F, Fut, T>(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
f: F,
) -> Result<T, Error>
@@ -279,7 +287,7 @@ impl BlockManager {
let rpc = self.endpoint.call_streaming(
&node_id,
BlockRpc::GetBlock(*hash, order_tag),
- PRIO_NORMAL | PRIO_SECONDARY,
+ priority,
);
tokio::select! {
res = rpc => {
@@ -331,7 +339,9 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<ByteStream, Error> {
- let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
+ let block_stream = self
+ .rpc_get_raw_block_streaming(hash, PRIO_NORMAL | PRIO_SECONDARY, order_tag)
+ .await?;
let (header, stream) = block_stream.into_parts();
match header {
DataBlockHeader::Plain => Ok(stream),
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 9c1da4a7..7221b093 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -436,7 +436,7 @@ impl BlockResyncManager {
&manager.endpoint,
&need_nodes[..],
put_block_message,
- RequestStrategy::with_priority(PRIO_BACKGROUND)
+ RequestStrategy::with_priority(PRIO_BACKGROUND | PRIO_SECONDARY)
.with_quorum(need_nodes.len()),
)
.await
@@ -460,7 +460,9 @@ impl BlockResyncManager {
hash
);
- let block_data = manager.rpc_get_raw_block(hash, None).await?;
+ let block_data = manager
+ .rpc_get_raw_block(hash, PRIO_BACKGROUND | PRIO_SECONDARY, None)
+ .await?;
manager.metrics.resync_recv_counter.add(1);
diff --git a/src/net/message.rs b/src/net/message.rs
index b0d255c6..af98ca12 100644
--- a/src/net/message.rs
+++ b/src/net/message.rs
@@ -28,12 +28,30 @@ use crate::util::*;
/// The same priority value is given to a request and to its associated response.
pub type RequestPriority = u8;
+// Usage of priority levels in Garage:
+//
+// PRIO_HIGH
+// for liveness check events such as pings and important
+// reconfiguration events such as layout changes
+//
+// PRIO_NORMAL
+// for standard interactive requests to exchange metadata
+//
+// PRIO_NORMAL | PRIO_SECONDARY
+// for standard interactive requests to exchange block data
+//
+// PRIO_BACKGROUND
+// for background resync requests to exchange metadata
+// PRIO_BACKGROUND | PRIO_SECONDARY
+// for background resync requests to exchange block data
+
/// Priority class: high
pub const PRIO_HIGH: RequestPriority = 0x20;
/// Priority class: normal
pub const PRIO_NORMAL: RequestPriority = 0x40;
/// Priority class: background
pub const PRIO_BACKGROUND: RequestPriority = 0x80;
+
/// Priority: primary among given class
pub const PRIO_PRIMARY: RequestPriority = 0x00;
/// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`)
diff --git a/src/net/send.rs b/src/net/send.rs
index 0db0ba77..c60fc6b2 100644
--- a/src/net/send.rs
+++ b/src/net/send.rs
@@ -109,7 +109,7 @@ impl SendQueuePriority {
let i = order_vec.iter().take_while(|o2| **o2 < order).count();
order_vec.insert(i, order);
}
- self.items.push_front(item);
+ self.items.push_back(item);
}
fn remove(&mut self, id: RequestID) {
if let Some(i) = self.items.iter().position(|x| x.id == id) {
@@ -128,51 +128,56 @@ impl SendQueuePriority {
self.items.is_empty()
}
fn poll_next_ready(&mut self, ctx: &mut Context<'_>) -> Poll<(RequestID, DataFrame)> {
- for (j, item) in self.items.iter_mut().enumerate() {
- if let Some(OrderTag(stream, order)) = item.order_tag {
- if order > *self.order.get(&stream).unwrap().front().unwrap() {
+ // in step 1: poll only streams that have sent 0 bytes, we want to send them in priority
+ // as they most likely represent small requests to be sent first
+ // in step 2: poll all streams
+ for step in 0..2 {
+ for (j, item) in self.items.iter_mut().enumerate() {
+ if let Some(OrderTag(stream, order)) = item.order_tag {
+ if order > *self.order.get(&stream).unwrap().front().unwrap() {
+ continue;
+ }
+ }
+
+ if step == 0 && item.sent > 0 {
continue;
}
- }
- let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
- if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
- let id = item.id;
- let eos = item.data.eos();
-
- let packet = bytes_or_err.map_err(|e| match e {
- ReadExactError::Stream(err) => err,
- _ => unreachable!(),
- });
-
- let is_err = packet.is_err();
- let data_frame = DataFrame::from_packet(packet, !eos);
- item.sent += data_frame.data().len();
-
- if eos || is_err {
- // If item had an order tag, remove it from the corresponding ordering list
- if let Some(OrderTag(stream, order)) = item.order_tag {
- let order_stream = self.order.get_mut(&stream).unwrap();
- assert_eq!(order_stream.pop_front(), Some(order));
- if order_stream.is_empty() {
- self.order.remove(&stream);
+ let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
+ if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
+ let id = item.id;
+ let eos = item.data.eos();
+
+ let packet = bytes_or_err.map_err(|e| match e {
+ ReadExactError::Stream(err) => err,
+ _ => unreachable!(),
+ });
+
+ let is_err = packet.is_err();
+ let data_frame = DataFrame::from_packet(packet, !eos);
+ item.sent += data_frame.data().len();
+
+ if eos || is_err {
+ // If item had an order tag, remove it from the corresponding ordering list
+ if let Some(OrderTag(stream, order)) = item.order_tag {
+ let order_stream = self.order.get_mut(&stream).unwrap();
+ assert_eq!(order_stream.pop_front(), Some(order));
+ if order_stream.is_empty() {
+ self.order.remove(&stream);
+ }
}
+ // Remove item from sending queue
+ self.items.remove(j);
+ } else if step == 0 {
+ // Step 0 means that this stream had not sent any bytes yet.
+ // Now that it has, and it was not an EOS, we know that it is bigger
+ // than one chunk so move it at the end of the queue.
+ let item = self.items.remove(j).unwrap();
+ self.items.push_back(item);
}
- // Remove item from sending queue
- self.items.remove(j);
- } else {
- // Move item later in send queue to implement LAS scheduling
- // (LAS = Least Attained Service)
- for k in j..self.items.len() - 1 {
- if self.items[k].sent >= self.items[k + 1].sent {
- self.items.swap(k, k + 1);
- } else {
- break;
- }
- }
- }
- return Poll::Ready((id, data_frame));
+ return Poll::Ready((id, data_frame));
+ }
}
}
diff --git a/src/net/server.rs b/src/net/server.rs
index 55b9e678..36dccb2f 100644
--- a/src/net/server.rs
+++ b/src/net/server.rs
@@ -190,7 +190,7 @@ impl RecvLoop for ServerConn {
let (prio, resp_enc_result) = match ReqEnc::decode(stream).await {
Ok(req_enc) => (req_enc.prio, self2.recv_handler_aux(req_enc).await),
- Err(e) => (PRIO_HIGH, Err(e)),
+ Err(e) => (PRIO_NORMAL, Err(e)),
};
debug!("server: sending response to {}", id);