From e9c42bca347e3a67f8d6bae953bdf0b53ce37d00 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 23 Feb 2024 12:22:29 +0100 Subject: [refactor-block] add DataBlockStream type --- src/block/block.rs | 5 +++++ src/block/manager.rs | 27 +++++++++++++-------------- 2 files changed, 18 insertions(+), 14 deletions(-) (limited to 'src/block') diff --git a/src/block/block.rs b/src/block/block.rs index 0b14bad4..3f5c4f94 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -7,6 +7,8 @@ use zstd::stream::Encoder; use garage_util::data::*; use garage_util::error::*; +use garage_net::stream::ByteStream; + #[derive(Debug, Serialize, Deserialize, Copy, Clone)] pub enum DataBlockHeader { Plain, @@ -25,6 +27,9 @@ pub type DataBlock = DataBlockElem; /// A path to a possibly compressed block of data pub type DataBlockPath = DataBlockElem; +/// A stream of possibly compressed block data +pub type DataBlockStream = DataBlockElem; + impl DataBlockHeader { pub fn is_compressed(&self) -> bool { matches!(self, DataBlockHeader::Compressed) diff --git a/src/block/manager.rs b/src/block/manager.rs index 6303248a..6773dfd1 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -229,11 +229,9 @@ impl BlockManager { &self, hash: &Hash, order_tag: Option, - ) -> Result<(DataBlockHeader, ByteStream), Error> { - self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { - Ok((header, stream)) - }) - .await + ) -> Result { + self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) }) + .await } /// Ask nodes that might have a (possibly compressed) block for it @@ -243,7 +241,8 @@ impl BlockManager { hash: &Hash, order_tag: Option, ) -> Result { - self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { + self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move { + let (header, stream) = block_stream.into_parts(); read_stream_to_end(stream) .await .err_context("error in block data stream") @@ -259,7 +258,7 @@ impl BlockManager { f: F, ) -> Result where - F: Fn(DataBlockHeader, ByteStream) -> Fut, + F: Fn(DataBlockStream) -> Fut, Fut: futures::Future>, { let who = self.replication.read_nodes(hash); @@ -281,8 +280,8 @@ impl BlockManager { continue; } }; - let (header, stream) = match res.into_parts() { - (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), + let block_stream = match res.into_parts() { + (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => DataBlockStream::from_parts(header, stream), (Ok(_), _) => { debug!("Get block {:?}: node {:?} returned a malformed response", hash, node); continue; @@ -292,7 +291,7 @@ impl BlockManager { continue; } }; - match f(header, stream).await { + match f(block_stream).await { Ok(ret) => return Ok(ret), Err(e) => { debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e); @@ -316,14 +315,14 @@ impl BlockManager { // ---- Public interface ---- - /// Ask nodes that might have a block for it, - /// return it as a stream + /// Ask nodes that might have a block for it, return it as a stream pub async fn rpc_get_block_streaming( &self, hash: &Hash, order_tag: Option, ) -> Result { - let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; + let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?; + let (header, stream) = block_stream.into_parts(); match header { DataBlockHeader::Plain => Ok(stream), DataBlockHeader::Compressed => { @@ -336,7 +335,7 @@ impl BlockManager { } } - /// Ask nodes that might have a block for it + /// Ask nodes that might have a block for it, return it as one big Bytes pub async fn rpc_get_block( &self, hash: &Hash, -- cgit v1.2.3