From 93552b9275a6a83653353e27c0ee0c64c7a23d59 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 23 Feb 2024 11:33:38 +0100 Subject: [refactor-block] Remove redundant BlockStream type --- src/api/s3/get.rs | 12 ++++++------ src/block/manager.rs | 7 +------ 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 53f0a345..efb8d4ab 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -13,7 +13,7 @@ use http::header::{ use hyper::{body::Body, Request, Response, StatusCode}; use tokio::sync::mpsc; -use garage_block::manager::BlockStream; +use garage_net::stream::ByteStream; use garage_rpc::rpc_helper::OrderTag; use garage_table::EmptyKey; use garage_util::data::*; @@ -286,7 +286,7 @@ pub async fn handle_get( Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { - let (tx, rx) = mpsc::channel::(2); + let (tx, rx) = mpsc::channel::(2); let order_stream = OrderTag::stream(); let first_block_hash = *first_block_hash; @@ -494,7 +494,7 @@ fn body_from_blocks_range( } let order_stream = OrderTag::stream(); - let (tx, rx) = mpsc::channel::(2); + let (tx, rx) = mpsc::channel::(2); tokio::spawn(async move { match async { @@ -542,7 +542,7 @@ fn body_from_blocks_range( }) .filter_map(futures::future::ready); - let block_stream: BlockStream = Box::pin(block_stream); + let block_stream: ByteStream = Box::pin(block_stream); tx.send(Box::pin(block_stream)) .await .ok_or_message("channel closed")?; @@ -562,7 +562,7 @@ fn body_from_blocks_range( response_body_from_block_stream(rx) } -fn response_body_from_block_stream(rx: mpsc::Receiver) -> ResBody { +fn response_body_from_block_stream(rx: mpsc::Receiver) -> ResBody { let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) .flatten() .map(|x| { @@ -572,7 +572,7 @@ fn response_body_from_block_stream(rx: mpsc::Receiver) -> ResBody { ResBody::new(http_body_util::StreamBody::new(body_stream)) } -fn error_stream_item(e: E) -> BlockStream { +fn error_stream_item(e: E) -> ByteStream { let err = std::io::Error::new( std::io::ErrorKind::Other, format!("Error while getting object data: {}", e), diff --git a/src/block/manager.rs b/src/block/manager.rs index 848d9141..96ea2c96 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,5 +1,4 @@ use std::path::PathBuf; -use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -9,7 +8,6 @@ use bytes::Bytes; use rand::prelude::*; use serde::{Deserialize, Serialize}; -use futures::Stream; use futures_util::stream::StreamExt; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; @@ -53,9 +51,6 @@ pub const INLINE_THRESHOLD: usize = 3072; // to delete the block locally. pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); -pub type BlockStream = - Pin> + Send + Sync + 'static>>; - /// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] pub enum BlockRpc { @@ -327,7 +322,7 @@ impl BlockManager { &self, hash: &Hash, order_tag: Option, - ) -> Result { + ) -> Result { let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; match header { DataBlockHeader::Plain => Ok(stream), -- cgit v1.2.3