diff options
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/s3/copy.rs | 10 | ||||
-rw-r--r-- | src/api/s3/get.rs | 21 |
2 files changed, 23 insertions, 8 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index b54cbd23..10cf5935 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -9,6 +9,7 @@ use bytes::Bytes; use hyper::{Body, Request, Response}; use serde::Serialize; +use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::data::*; use garage_util::time::*; @@ -306,11 +307,16 @@ pub async fn handle_upload_part_copy( // if and only if the block returned is a block that already existed // in the Garage data store (thus we don't need to save it again). let garage2 = garage.clone(); + let order_stream = OrderTag::stream(); let source_blocks = stream::iter(blocks_to_copy) - .flat_map(|(block_hash, range_to_copy)| { + .enumerate() + .flat_map(|(i, (block_hash, range_to_copy))| { let garage3 = garage2.clone(); stream::once(async move { - let data = garage3.block_manager.rpc_get_block(&block_hash).await?; + let data = garage3 + .block_manager + .rpc_get_block(&block_hash, Some(order_stream.order(i as u64))) + .await?; match range_to_copy { Some(r) => Ok((data.slice(r), None)), None => Ok((data, Some(block_hash))), diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index c7621ade..dfc284fe 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -10,6 +10,7 @@ use http::header::{ use hyper::body::Bytes; use hyper::{Body, Request, Response, StatusCode}; +use garage_rpc::rpc_helper::OrderTag; use garage_table::EmptyKey; use garage_util::data::*; @@ -242,9 +243,11 @@ pub async fn handle_get( Ok(resp_builder.body(body)?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { + let order_stream = OrderTag::stream(); + let read_first_block = garage .block_manager - .rpc_get_block_streaming(first_block_hash); + .rpc_get_block_streaming(first_block_hash, Some(order_stream.order(0))); let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); let (first_block_stream, version) = @@ -260,7 +263,8 @@ pub async fn handle_get( blocks[0].1 = Some(first_block_stream); let body_stream = futures::stream::iter(blocks) - .map(move |(hash, stream_opt)| { + .enumerate() + .map(move |(i, (hash, stream_opt))| { let garage = garage.clone(); async move { if let Some(stream) = stream_opt { @@ -268,7 +272,7 @@ pub async fn handle_get( } else { garage .block_manager - .rpc_get_block_streaming(&hash) + .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64))) .await .unwrap_or_else(|_| { Box::pin(futures::stream::once(async move { @@ -281,7 +285,7 @@ pub async fn handle_get( } } }) - .buffered(3) + .buffered(2) .flatten(); let body = hyper::body::Body::wrap_stream(body_stream); @@ -445,11 +449,16 @@ fn body_from_blocks_range( true_offset += b.size; } + let order_stream = OrderTag::stream(); let body_stream = futures::stream::iter(blocks) - .map(move |(block, true_offset)| { + .enumerate() + .map(move |(i, (block, true_offset))| { let garage = garage.clone(); async move { - let data = garage.block_manager.rpc_get_block(&block.hash).await?; + let data = garage + .block_manager + .rpc_get_block(&block.hash, Some(order_stream.order(i as u64))) + .await?; let start_in_block = if true_offset > begin { 0 } else { |