diff options
author | Alex <alex@adnab.me> | 2024-02-23 15:49:43 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2024-02-23 15:49:43 +0000 |
commit | 61758ce0f91b930542bd2ee3c72735000cc12e75 (patch) | |
tree | 0a8c9c8d57abfa084188873dab2987ecfe93c0d7 /src/api | |
parent | 74d0c47f21ae2f9998a7dcbca3a27e3cc51e70b6 (diff) | |
parent | 6ee691e65f2c6f7b337a62cbfacaddb9ba9cd61a (diff) | |
download | garage-61758ce0f91b930542bd2ee3c72735000cc12e75.tar.gz garage-61758ce0f91b930542bd2ee3c72735000cc12e75.zip |
Merge pull request 'some refactoring on data read/write path' (#729) from refactor-block into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/729
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/s3/get.rs | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 53f0a345..efb8d4ab 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -13,7 +13,7 @@ use http::header::{ use hyper::{body::Body, Request, Response, StatusCode}; use tokio::sync::mpsc; -use garage_block::manager::BlockStream; +use garage_net::stream::ByteStream; use garage_rpc::rpc_helper::OrderTag; use garage_table::EmptyKey; use garage_util::data::*; @@ -286,7 +286,7 @@ pub async fn handle_get( Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { - let (tx, rx) = mpsc::channel::<BlockStream>(2); + let (tx, rx) = mpsc::channel::<ByteStream>(2); let order_stream = OrderTag::stream(); let first_block_hash = *first_block_hash; @@ -494,7 +494,7 @@ fn body_from_blocks_range( } let order_stream = OrderTag::stream(); - let (tx, rx) = mpsc::channel::<BlockStream>(2); + let (tx, rx) = mpsc::channel::<ByteStream>(2); tokio::spawn(async move { match async { @@ -542,7 +542,7 @@ fn body_from_blocks_range( }) .filter_map(futures::future::ready); - let block_stream: BlockStream = Box::pin(block_stream); + let block_stream: ByteStream = Box::pin(block_stream); tx.send(Box::pin(block_stream)) .await .ok_or_message("channel closed")?; @@ -562,7 +562,7 @@ fn body_from_blocks_range( response_body_from_block_stream(rx) } -fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody { +fn response_body_from_block_stream(rx: mpsc::Receiver<ByteStream>) -> ResBody { let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) .flatten() .map(|x| { @@ -572,7 +572,7 @@ fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody { ResBody::new(http_body_util::StreamBody::new(body_stream)) } -fn error_stream_item<E: std::fmt::Display>(e: E) -> BlockStream { +fn error_stream_item<E: std::fmt::Display>(e: E) -> ByteStream { let err = std::io::Error::new( std::io::ErrorKind::Other, format!("Error while getting object data: {}", e), |