aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3')
-rw-r--r--src/api/s3/copy.rs10
-rw-r--r--src/api/s3/get.rs21
2 files changed, 23 insertions, 8 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index b54cbd23..10cf5935 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -9,6 +9,7 @@ use bytes::Bytes;
use hyper::{Body, Request, Response};
use serde::Serialize;
+use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
use garage_util::data::*;
use garage_util::time::*;
@@ -306,11 +307,16 @@ pub async fn handle_upload_part_copy(
// if and only if the block returned is a block that already existed
// in the Garage data store (thus we don't need to save it again).
let garage2 = garage.clone();
+ let order_stream = OrderTag::stream();
let source_blocks = stream::iter(blocks_to_copy)
- .flat_map(|(block_hash, range_to_copy)| {
+ .enumerate()
+ .flat_map(|(i, (block_hash, range_to_copy))| {
let garage3 = garage2.clone();
stream::once(async move {
- let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
+ let data = garage3
+ .block_manager
+ .rpc_get_block(&block_hash, Some(order_stream.order(i as u64)))
+ .await?;
match range_to_copy {
Some(r) => Ok((data.slice(r), None)),
None => Ok((data, Some(block_hash))),
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index c7621ade..dfc284fe 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -10,6 +10,7 @@ use http::header::{
use hyper::body::Bytes;
use hyper::{Body, Request, Response, StatusCode};
+use garage_rpc::rpc_helper::OrderTag;
use garage_table::EmptyKey;
use garage_util::data::*;
@@ -242,9 +243,11 @@ pub async fn handle_get(
Ok(resp_builder.body(body)?)
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
+ let order_stream = OrderTag::stream();
+
let read_first_block = garage
.block_manager
- .rpc_get_block_streaming(first_block_hash);
+ .rpc_get_block_streaming(first_block_hash, Some(order_stream.order(0)));
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
let (first_block_stream, version) =
@@ -260,7 +263,8 @@ pub async fn handle_get(
blocks[0].1 = Some(first_block_stream);
let body_stream = futures::stream::iter(blocks)
- .map(move |(hash, stream_opt)| {
+ .enumerate()
+ .map(move |(i, (hash, stream_opt))| {
let garage = garage.clone();
async move {
if let Some(stream) = stream_opt {
@@ -268,7 +272,7 @@ pub async fn handle_get(
} else {
garage
.block_manager
- .rpc_get_block_streaming(&hash)
+ .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64)))
.await
.unwrap_or_else(|_| {
Box::pin(futures::stream::once(async move {
@@ -281,7 +285,7 @@ pub async fn handle_get(
}
}
})
- .buffered(3)
+ .buffered(2)
.flatten();
let body = hyper::body::Body::wrap_stream(body_stream);
@@ -445,11 +449,16 @@ fn body_from_blocks_range(
true_offset += b.size;
}
+ let order_stream = OrderTag::stream();
let body_stream = futures::stream::iter(blocks)
- .map(move |(block, true_offset)| {
+ .enumerate()
+ .map(move |(i, (block, true_offset))| {
let garage = garage.clone();
async move {
- let data = garage.block_manager.rpc_get_block(&block.hash).await?;
+ let data = garage
+ .block_manager
+ .rpc_get_block(&block.hash, Some(order_stream.order(i as u64)))
+ .await?;
let start_in_block = if true_offset > begin {
0
} else {