diff options
author | Alex <alex@adnab.me> | 2022-09-13 15:26:08 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-09-13 15:26:08 +0200 |
commit | 11bdc971e2aaa1ef90358b7d9c1bd6a8e9743bbf (patch) | |
tree | ca460324ea6848c61c6c698a51456ff27485ea7b /src/api/s3/get.rs | |
parent | 309d7aef3f05657e2b969ab72442b2f2c350da03 (diff) | |
parent | ff30891999b5be5421b80b89da1037e943179d2d (diff) | |
download | garage-11bdc971e2aaa1ef90358b7d9c1bd6a8e9743bbf.tar.gz garage-11bdc971e2aaa1ef90358b7d9c1bd6a8e9743bbf.zip |
Merge pull request 'use netapp streaming body' (#343) from netapp-stream-body into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/343
Diffstat (limited to 'src/api/s3/get.rs')
-rw-r--r-- | src/api/s3/get.rs | 110 |
1 files changed, 78 insertions, 32 deletions
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 7fa1a177..ae4c287d 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -7,9 +7,9 @@ use http::header::{ ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, RANGE, }; -use hyper::body::Bytes; use hyper::{Body, Request, Response, StatusCode}; +use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag}; use garage_table::EmptyKey; use garage_util::data::*; @@ -242,10 +242,15 @@ pub async fn handle_get( Ok(resp_builder.body(body)?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { - let read_first_block = garage.block_manager.rpc_get_block(first_block_hash); + let order_stream = OrderTag::stream(); + + let read_first_block = garage + .block_manager + .rpc_get_block_streaming(first_block_hash, Some(order_stream.order(0))); let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); - let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; + let (first_block_stream, version) = + futures::try_join!(read_first_block, get_next_blocks)?; let version = version.ok_or(Error::NoSuchKey)?; let mut blocks = version @@ -254,24 +259,26 @@ pub async fn handle_get( .iter() .map(|(_, vb)| (vb.hash, None)) .collect::<Vec<_>>(); - blocks[0].1 = Some(first_block); + blocks[0].1 = Some(first_block_stream); let body_stream = futures::stream::iter(blocks) - .map(move |(hash, data_opt)| { + .enumerate() + .map(move |(i, (hash, stream_opt))| { let garage = garage.clone(); async move { - if let Some(data) = data_opt { - Ok(Bytes::from(data)) + if let Some(stream) = stream_opt { + stream } else { garage .block_manager - .rpc_get_block(&hash) + .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64))) .await - .map(Bytes::from) + .unwrap_or_else(|e| error_stream(i, e)) } } }) - .buffered(2); + .buffered(2) + .flatten(); let body = hyper::body::Body::wrap_stream(body_stream); Ok(resp_builder.body(body)?) @@ -422,40 +429,79 @@ fn body_from_blocks_range( all_blocks.len(), 4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize, )); - let mut true_offset = 0; + let mut block_offset: u64 = 0; for (_, b) in all_blocks.iter() { - if true_offset >= end { + if block_offset >= end { break; } // Keep only blocks that have an intersection with the requested range - if true_offset < end && true_offset + b.size > begin { - blocks.push((*b, true_offset)); + if block_offset < end && block_offset + b.size > begin { + blocks.push((*b, block_offset)); } - true_offset += b.size; + block_offset += b.size as u64; } + let order_stream = OrderTag::stream(); let body_stream = futures::stream::iter(blocks) - .map(move |(block, true_offset)| { + .enumerate() + .map(move |(i, (block, block_offset))| { let garage = garage.clone(); async move { - let data = garage.block_manager.rpc_get_block(&block.hash).await?; - let data = Bytes::from(data); - let start_in_block = if true_offset > begin { - 0 - } else { - begin - true_offset - }; - let end_in_block = if true_offset + block.size < end { - block.size - } else { - end - true_offset - }; - Result::<Bytes, Error>::Ok( - data.slice(start_in_block as usize..end_in_block as usize), - ) + garage + .block_manager + .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) + .await + .unwrap_or_else(|e| error_stream(i, e)) + .scan(block_offset, move |chunk_offset, chunk| { + let r = match chunk { + Ok(chunk_bytes) => { + let chunk_len = chunk_bytes.len() as u64; + let r = if *chunk_offset >= end { + // The current chunk is after the part we want to read. + // Returning None here will stop the scan, the rest of the + // stream will be ignored + None + } else if *chunk_offset + chunk_len <= begin { + // The current chunk is before the part we want to read. + // We return a None that will be removed by the filter_map + // below. + Some(None) + } else { + // The chunk has an intersection with the requested range + let start_in_chunk = if *chunk_offset > begin { + 0 + } else { + begin - *chunk_offset + }; + let end_in_chunk = if *chunk_offset + chunk_len < end { + chunk_len + } else { + end - *chunk_offset + }; + Some(Some(Ok(chunk_bytes + .slice(start_in_chunk as usize..end_in_chunk as usize)))) + }; + *chunk_offset += chunk_bytes.len() as u64; + r + } + Err(e) => Some(Some(Err(e))), + }; + futures::future::ready(r) + }) + .filter_map(futures::future::ready) } }) - .buffered(2); + .buffered(2) + .flatten(); hyper::body::Body::wrap_stream(body_stream) } + +fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream { + Box::pin(futures::stream::once(async move { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Could not get block {}: {}", i, e), + )) + })) +} |