diff options
author | Alex Auvolat <alex@adnab.me> | 2024-03-27 16:00:46 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-03-27 16:22:40 +0100 |
commit | 85f580cbde4913fe8382316ff3c27b8443c61dd7 (patch) | |
tree | b5bf432281b746706c38fd4e494240997b66e9d1 /src/block/manager.rs | |
parent | 0d3e285d133459fd53e28f879a86c0de1a0c36df (diff) | |
download | garage-85f580cbde4913fe8382316ff3c27b8443c61dd7.tar.gz garage-85f580cbde4913fe8382316ff3c27b8443c61dd7.zip |
[fix-buffering] change request sending strategy and fix prioritiesfix-buffering
remove LAS, priorize new requests but otherwise just do standard queuing
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r-- | src/block/manager.rs | 20 |
1 files changed, 15 insertions, 5 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 2c7c7aba..62829a24 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -238,10 +238,16 @@ impl BlockManager { async fn rpc_get_raw_block_streaming( &self, hash: &Hash, + priority: RequestPriority, order_tag: Option<OrderTag>, ) -> Result<DataBlockStream, Error> { - self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) }) - .await + self.rpc_get_raw_block_internal( + hash, + priority, + order_tag, + |stream| async move { Ok(stream) }, + ) + .await } /// Ask nodes that might have a (possibly compressed) block for it @@ -249,9 +255,10 @@ impl BlockManager { pub(crate) async fn rpc_get_raw_block( &self, hash: &Hash, + priority: RequestPriority, order_tag: Option<OrderTag>, ) -> Result<DataBlock, Error> { - self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move { + self.rpc_get_raw_block_internal(hash, priority, order_tag, |block_stream| async move { let (header, stream) = block_stream.into_parts(); read_stream_to_end(stream) .await @@ -264,6 +271,7 @@ impl BlockManager { async fn rpc_get_raw_block_internal<F, Fut, T>( &self, hash: &Hash, + priority: RequestPriority, order_tag: Option<OrderTag>, f: F, ) -> Result<T, Error> @@ -279,7 +287,7 @@ impl BlockManager { let rpc = self.endpoint.call_streaming( &node_id, BlockRpc::GetBlock(*hash, order_tag), - PRIO_NORMAL | PRIO_SECONDARY, + priority, ); tokio::select! { res = rpc => { @@ -331,7 +339,9 @@ impl BlockManager { hash: &Hash, order_tag: Option<OrderTag>, ) -> Result<ByteStream, Error> { - let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?; + let block_stream = self + .rpc_get_raw_block_streaming(hash, PRIO_NORMAL | PRIO_SECONDARY, order_tag) + .await?; let (header, stream) = block_stream.into_parts(); match header { DataBlockHeader::Plain => Ok(stream), |