aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-02-05 20:26:33 +0100
committerAlex Auvolat <alex@adnab.me>2024-02-05 20:26:33 +0100
commit22332e6c3536159656e773a85d656352882ffc32 (patch)
tree583d9d7b3ae3d053ca66367bcb5cc52d97f68839
parent81ccd4586ebdf707dfd37d4802e0cf475e11e004 (diff)
downloadgarage-22332e6c3536159656e773a85d656352882ffc32.tar.gz
garage-22332e6c3536159656e773a85d656352882ffc32.zip
[dep-upgrade-202402] simplify/refactor GetObject
-rw-r--r--src/api/s3/get.rs152
-rw-r--r--src/block/manager.rs8
2 files changed, 78 insertions, 82 deletions
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 71f0b158..f70dad7d 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -11,7 +11,8 @@ use http::header::{
use hyper::{body::Body, Request, Response, StatusCode};
use tokio::sync::mpsc;
-use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag};
+use garage_block::manager::BlockStream;
+use garage_rpc::rpc_helper::OrderTag;
use garage_table::EmptyKey;
use garage_util::data::*;
use garage_util::error::OkOrMessage;
@@ -245,7 +246,7 @@ pub async fn handle_get(
Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
- let (tx, rx) = mpsc::channel(2);
+ let (tx, rx) = mpsc::channel::<BlockStream>(2);
let order_stream = OrderTag::stream();
let first_block_hash = *first_block_hash;
@@ -283,25 +284,13 @@ pub async fn handle_get(
{
Ok(()) => (),
Err(e) => {
- let err = std::io::Error::new(
- std::io::ErrorKind::Other,
- format!("Error while getting object data: {}", e),
- );
- let _ = tx
- .send(Box::pin(stream::once(future::ready(Err(err)))))
- .await;
+ let _ = tx.send(error_stream_item(e)).await;
}
}
});
- let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
- .flatten()
- .map(|x| {
- x.map(hyper::body::Frame::data)
- .map_err(|e| Error::from(garage_util::error::Error::from(e)))
- });
- let body = http_body_util::StreamBody::new(body_stream);
- Ok(resp_builder.body(ResBody::new(body))?)
+ let body = response_body_from_block_stream(rx);
+ Ok(resp_builder.body(body)?)
}
}
}
@@ -461,67 +450,75 @@ fn body_from_blocks_range(
}
let order_stream = OrderTag::stream();
- let mut body_stream =
- futures::stream::iter(blocks)
- .enumerate()
- .map(move |(i, (block, block_offset))| {
- let garage = garage.clone();
- async move {
- garage
- .block_manager
- .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
- .await
- .unwrap_or_else(|e| error_stream(i, e))
- .scan(block_offset, move |chunk_offset, chunk| {
- let r = match chunk {
- Ok(chunk_bytes) => {
- let chunk_len = chunk_bytes.len() as u64;
- let r = if *chunk_offset >= end {
- // The current chunk is after the part we want to read.
- // Returning None here will stop the scan, the rest of the
- // stream will be ignored
- None
- } else if *chunk_offset + chunk_len <= begin {
- // The current chunk is before the part we want to read.
- // We return a None that will be removed by the filter_map
- // below.
- Some(None)
+ let (tx, rx) = mpsc::channel::<BlockStream>(2);
+
+ tokio::spawn(async move {
+ match async {
+ let garage = garage.clone();
+ for (i, (block, block_offset)) in blocks.iter().enumerate() {
+ let block_stream = garage
+ .block_manager
+ .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
+ .await?
+ .scan(*block_offset, move |chunk_offset, chunk| {
+ let r = match chunk {
+ Ok(chunk_bytes) => {
+ let chunk_len = chunk_bytes.len() as u64;
+ let r = if *chunk_offset >= end {
+ // The current chunk is after the part we want to read.
+ // Returning None here will stop the scan, the rest of the
+ // stream will be ignored
+ None
+ } else if *chunk_offset + chunk_len <= begin {
+ // The current chunk is before the part we want to read.
+ // We return a None that will be removed by the filter_map
+ // below.
+ Some(None)
+ } else {
+ // The chunk has an intersection with the requested range
+ let start_in_chunk = if *chunk_offset > begin {
+ 0
} else {
- // The chunk has an intersection with the requested range
- let start_in_chunk = if *chunk_offset > begin {
- 0
- } else {
- begin - *chunk_offset
- };
- let end_in_chunk = if *chunk_offset + chunk_len < end {
- chunk_len
- } else {
- end - *chunk_offset
- };
- Some(Some(Ok(chunk_bytes.slice(
- start_in_chunk as usize..end_in_chunk as usize,
- ))))
+ begin - *chunk_offset
};
- *chunk_offset += chunk_bytes.len() as u64;
- r
- }
- Err(e) => Some(Some(Err(e))),
- };
- futures::future::ready(r)
- })
- .filter_map(futures::future::ready)
- }
- });
+ let end_in_chunk = if *chunk_offset + chunk_len < end {
+ chunk_len
+ } else {
+ end - *chunk_offset
+ };
+ Some(Some(Ok(chunk_bytes
+ .slice(start_in_chunk as usize..end_in_chunk as usize))))
+ };
+ *chunk_offset += chunk_bytes.len() as u64;
+ r
+ }
+ Err(e) => Some(Some(Err(e))),
+ };
+ futures::future::ready(r)
+ })
+ .filter_map(futures::future::ready);
+
+ let block_stream: BlockStream = Box::pin(block_stream);
+ tx.send(Box::pin(block_stream))
+ .await
+ .ok_or_message("channel closed")?;
+ }
- let (tx, rx) = mpsc::channel(2);
- tokio::spawn(async move {
- while let Some(item) = body_stream.next().await {
- if tx.send(item.await).await.is_err() {
- break; // connection closed by client
+ Ok::<(), Error>(())
+ }
+ .await
+ {
+ Ok(()) => (),
+ Err(e) => {
+ let _ = tx.send(error_stream_item(e)).await;
}
}
});
+ response_body_from_block_stream(rx)
+}
+
+fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody {
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
.flatten()
.map(|x| {
@@ -531,11 +528,10 @@ fn body_from_blocks_range(
ResBody::new(http_body_util::StreamBody::new(body_stream))
}
-fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream {
- Box::pin(futures::stream::once(async move {
- Err(std::io::Error::new(
- std::io::ErrorKind::Other,
- format!("Could not get block {}: {}", i, e),
- ))
- }))
+fn error_stream_item<E: std::fmt::Display>(e: E) -> BlockStream {
+ let err = std::io::Error::new(
+ std::io::ErrorKind::Other,
+ format!("Error while getting object data: {}", e),
+ );
+ Box::pin(stream::once(future::ready(Err(err))))
}
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 2d1b5c67..5388f69d 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -53,6 +53,9 @@ pub const INLINE_THRESHOLD: usize = 3072;
// to delete the block locally.
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
+pub type BlockStream =
+ Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>;
+
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
pub enum BlockRpc {
@@ -324,10 +327,7 @@ impl BlockManager {
&self,
hash: &Hash,
order_tag: Option<OrderTag>,
- ) -> Result<
- Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>,
- Error,
- > {
+ ) -> Result<BlockStream, Error> {
let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
match header {
DataBlockHeader::Plain => Ok(stream),