diff options
-rw-r--r-- | src/api/s3/get.rs | 12 | ||||
-rw-r--r-- | src/block/manager.rs | 7 |
2 files changed, 7 insertions, 12 deletions
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 53f0a345..efb8d4ab 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -13,7 +13,7 @@ use http::header::{ use hyper::{body::Body, Request, Response, StatusCode}; use tokio::sync::mpsc; -use garage_block::manager::BlockStream; +use garage_net::stream::ByteStream; use garage_rpc::rpc_helper::OrderTag; use garage_table::EmptyKey; use garage_util::data::*; @@ -286,7 +286,7 @@ pub async fn handle_get( Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { - let (tx, rx) = mpsc::channel::<BlockStream>(2); + let (tx, rx) = mpsc::channel::<ByteStream>(2); let order_stream = OrderTag::stream(); let first_block_hash = *first_block_hash; @@ -494,7 +494,7 @@ fn body_from_blocks_range( } let order_stream = OrderTag::stream(); - let (tx, rx) = mpsc::channel::<BlockStream>(2); + let (tx, rx) = mpsc::channel::<ByteStream>(2); tokio::spawn(async move { match async { @@ -542,7 +542,7 @@ fn body_from_blocks_range( }) .filter_map(futures::future::ready); - let block_stream: BlockStream = Box::pin(block_stream); + let block_stream: ByteStream = Box::pin(block_stream); tx.send(Box::pin(block_stream)) .await .ok_or_message("channel closed")?; @@ -562,7 +562,7 @@ fn body_from_blocks_range( response_body_from_block_stream(rx) } -fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody { +fn response_body_from_block_stream(rx: mpsc::Receiver<ByteStream>) -> ResBody { let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) .flatten() .map(|x| { @@ -572,7 +572,7 @@ fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody { ResBody::new(http_body_util::StreamBody::new(body_stream)) } -fn error_stream_item<E: std::fmt::Display>(e: E) -> BlockStream { +fn error_stream_item<E: std::fmt::Display>(e: E) -> ByteStream { let err = std::io::Error::new( std::io::ErrorKind::Other, format!("Error while getting object data: {}", e), diff --git a/src/block/manager.rs b/src/block/manager.rs index 848d9141..96ea2c96 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,5 +1,4 @@ use std::path::PathBuf; -use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -9,7 +8,6 @@ use bytes::Bytes; use rand::prelude::*; use serde::{Deserialize, Serialize}; -use futures::Stream; use futures_util::stream::StreamExt; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; @@ -53,9 +51,6 @@ pub const INLINE_THRESHOLD: usize = 3072; // to delete the block locally. pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); -pub type BlockStream = - Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>; - /// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] pub enum BlockRpc { @@ -327,7 +322,7 @@ impl BlockManager { &self, hash: &Hash, order_tag: Option<OrderTag>, - ) -> Result<BlockStream, Error> { + ) -> Result<ByteStream, Error> { let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; match header { DataBlockHeader::Plain => Ok(stream), |