aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-02-23 12:22:29 +0100
committerAlex Auvolat <alex@adnab.me>2024-02-23 12:22:29 +0100
commite9c42bca347e3a67f8d6bae953bdf0b53ce37d00 (patch)
treee553ba7d962767b9797aca1ad973de8e533f782a /src/block/manager.rs
parentcd1069c1d4219deb7d48758fb9d0a45cc4c1eddb (diff)
downloadgarage-e9c42bca347e3a67f8d6bae953bdf0b53ce37d00.tar.gz
garage-e9c42bca347e3a67f8d6bae953bdf0b53ce37d00.zip
[refactor-block] add DataBlockStream type
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs27
1 files changed, 13 insertions, 14 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 6303248a..6773dfd1 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -229,11 +229,9 @@ impl BlockManager {
&self,
hash: &Hash,
order_tag: Option<OrderTag>,
- ) -> Result<(DataBlockHeader, ByteStream), Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
- Ok((header, stream))
- })
- .await
+ ) -> Result<DataBlockStream, Error> {
+ self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
+ .await
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -243,7 +241,8 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
+ self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
+ let (header, stream) = block_stream.into_parts();
read_stream_to_end(stream)
.await
.err_context("error in block data stream")
@@ -259,7 +258,7 @@ impl BlockManager {
f: F,
) -> Result<T, Error>
where
- F: Fn(DataBlockHeader, ByteStream) -> Fut,
+ F: Fn(DataBlockStream) -> Fut,
Fut: futures::Future<Output = Result<T, Error>>,
{
let who = self.replication.read_nodes(hash);
@@ -281,8 +280,8 @@ impl BlockManager {
continue;
}
};
- let (header, stream) = match res.into_parts() {
- (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
+ let block_stream = match res.into_parts() {
+ (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => DataBlockStream::from_parts(header, stream),
(Ok(_), _) => {
debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
continue;
@@ -292,7 +291,7 @@ impl BlockManager {
continue;
}
};
- match f(header, stream).await {
+ match f(block_stream).await {
Ok(ret) => return Ok(ret),
Err(e) => {
debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e);
@@ -316,14 +315,14 @@ impl BlockManager {
// ---- Public interface ----
- /// Ask nodes that might have a block for it,
- /// return it as a stream
+ /// Ask nodes that might have a block for it, return it as a stream
pub async fn rpc_get_block_streaming(
&self,
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<ByteStream, Error> {
- let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
+ let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
+ let (header, stream) = block_stream.into_parts();
match header {
DataBlockHeader::Plain => Ok(stream),
DataBlockHeader::Compressed => {
@@ -336,7 +335,7 @@ impl BlockManager {
}
}
- /// Ask nodes that might have a block for it
+ /// Ask nodes that might have a block for it, return it as one big Bytes
pub async fn rpc_get_block(
&self,
hash: &Hash,