aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-09-13 15:13:07 +0200
committerAlex Auvolat <alex@adnab.me>2022-09-13 15:13:07 +0200
commitff30891999b5be5421b80b89da1037e943179d2d (patch)
treef1f1b3afa06f7034f4b0faa43b27535c12349c55
parent28a4af73ca8c0f82314157939fc98c46f338e84a (diff)
downloadgarage-netapp-stream-body.tar.gz
garage-netapp-stream-body.zip
Use streaming block API for get with Range requestsnetapp-stream-body
-rw-r--r--src/api/s3/get.rs93
1 files changed, 60 insertions, 33 deletions
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index dd95f6e7..ae4c287d 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -7,10 +7,9 @@ use http::header::{
ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
IF_NONE_MATCH, LAST_MODIFIED, RANGE,
};
-use hyper::body::Bytes;
use hyper::{Body, Request, Response, StatusCode};
-use garage_rpc::rpc_helper::OrderTag;
+use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag};
use garage_table::EmptyKey;
use garage_util::data::*;
@@ -274,14 +273,7 @@ pub async fn handle_get(
.block_manager
.rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64)))
.await
- .unwrap_or_else(|e| {
- Box::pin(futures::stream::once(async move {
- Err(std::io::Error::new(
- std::io::ErrorKind::Other,
- format!("Could not get block {}: {}", i, e),
- ))
- }))
- })
+ .unwrap_or_else(|e| error_stream(i, e))
}
}
})
@@ -437,44 +429,79 @@ fn body_from_blocks_range(
all_blocks.len(),
4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize,
));
- let mut true_offset = 0;
+ let mut block_offset: u64 = 0;
for (_, b) in all_blocks.iter() {
- if true_offset >= end {
+ if block_offset >= end {
break;
}
// Keep only blocks that have an intersection with the requested range
- if true_offset < end && true_offset + b.size > begin {
- blocks.push((*b, true_offset));
+ if block_offset < end && block_offset + b.size > begin {
+ blocks.push((*b, block_offset));
}
- true_offset += b.size;
+ block_offset += b.size as u64;
}
let order_stream = OrderTag::stream();
let body_stream = futures::stream::iter(blocks)
.enumerate()
- .map(move |(i, (block, true_offset))| {
+ .map(move |(i, (block, block_offset))| {
let garage = garage.clone();
async move {
- let data = garage
+ garage
.block_manager
- .rpc_get_block(&block.hash, Some(order_stream.order(i as u64)))
- .await?;
- let start_in_block = if true_offset > begin {
- 0
- } else {
- begin - true_offset
- };
- let end_in_block = if true_offset + block.size < end {
- block.size
- } else {
- end - true_offset
- };
- Result::<Bytes, Error>::Ok(
- data.slice(start_in_block as usize..end_in_block as usize),
- )
+ .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
+ .await
+ .unwrap_or_else(|e| error_stream(i, e))
+ .scan(block_offset, move |chunk_offset, chunk| {
+ let r = match chunk {
+ Ok(chunk_bytes) => {
+ let chunk_len = chunk_bytes.len() as u64;
+ let r = if *chunk_offset >= end {
+ // The current chunk is after the part we want to read.
+ // Returning None here will stop the scan, the rest of the
+ // stream will be ignored
+ None
+ } else if *chunk_offset + chunk_len <= begin {
+ // The current chunk is before the part we want to read.
+ // We return a None that will be removed by the filter_map
+ // below.
+ Some(None)
+ } else {
+ // The chunk has an intersection with the requested range
+ let start_in_chunk = if *chunk_offset > begin {
+ 0
+ } else {
+ begin - *chunk_offset
+ };
+ let end_in_chunk = if *chunk_offset + chunk_len < end {
+ chunk_len
+ } else {
+ end - *chunk_offset
+ };
+ Some(Some(Ok(chunk_bytes
+ .slice(start_in_chunk as usize..end_in_chunk as usize))))
+ };
+ *chunk_offset += chunk_bytes.len() as u64;
+ r
+ }
+ Err(e) => Some(Some(Err(e))),
+ };
+ futures::future::ready(r)
+ })
+ .filter_map(futures::future::ready)
}
})
- .buffered(2);
+ .buffered(2)
+ .flatten();
hyper::body::Body::wrap_stream(body_stream)
}
+
+fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream {
+ Box::pin(futures::stream::once(async move {
+ Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ format!("Could not get block {}: {}", i, e),
+ ))
+ }))
+}