aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-03-27 16:00:46 +0100
committerAlex Auvolat <alex@adnab.me>2024-03-27 16:22:40 +0100
commit85f580cbde4913fe8382316ff3c27b8443c61dd7 (patch)
treeb5bf432281b746706c38fd4e494240997b66e9d1 /src/block/manager.rs
parent0d3e285d133459fd53e28f879a86c0de1a0c36df (diff)
downloadgarage-85f580cbde4913fe8382316ff3c27b8443c61dd7.tar.gz
garage-85f580cbde4913fe8382316ff3c27b8443c61dd7.zip
[fix-buffering] change request sending strategy and fix prioritiesfix-buffering
remove LAS, priorize new requests but otherwise just do standard queuing
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs20
1 files changed, 15 insertions, 5 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 2c7c7aba..62829a24 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -238,10 +238,16 @@ impl BlockManager {
async fn rpc_get_raw_block_streaming(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
) -> Result<DataBlockStream, Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
- .await
+ self.rpc_get_raw_block_internal(
+ hash,
+ priority,
+ order_tag,
+ |stream| async move { Ok(stream) },
+ )
+ .await
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -249,9 +255,10 @@ impl BlockManager {
pub(crate) async fn rpc_get_raw_block(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
+ self.rpc_get_raw_block_internal(hash, priority, order_tag, |block_stream| async move {
let (header, stream) = block_stream.into_parts();
read_stream_to_end(stream)
.await
@@ -264,6 +271,7 @@ impl BlockManager {
async fn rpc_get_raw_block_internal<F, Fut, T>(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
f: F,
) -> Result<T, Error>
@@ -279,7 +287,7 @@ impl BlockManager {
let rpc = self.endpoint.call_streaming(
&node_id,
BlockRpc::GetBlock(*hash, order_tag),
- PRIO_NORMAL | PRIO_SECONDARY,
+ priority,
);
tokio::select! {
res = rpc => {
@@ -331,7 +339,9 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<ByteStream, Error> {
- let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
+ let block_stream = self
+ .rpc_get_raw_block_streaming(hash, PRIO_NORMAL | PRIO_SECONDARY, order_tag)
+ .await?;
let (header, stream) = block_stream.into_parts();
match header {
DataBlockHeader::Plain => Ok(stream),