aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/copy.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3/copy.rs')
-rw-r--r--src/api/s3/copy.rs27
1 files changed, 17 insertions, 10 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 4415a037..10cf5935 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -5,9 +5,11 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::{stream, stream::Stream, StreamExt, TryFutureExt};
use md5::{Digest as Md5Digest, Md5};
+use bytes::Bytes;
use hyper::{Body, Request, Response};
use serde::Serialize;
+use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
use garage_util::data::*;
use garage_util::time::*;
@@ -305,13 +307,18 @@ pub async fn handle_upload_part_copy(
// if and only if the block returned is a block that already existed
// in the Garage data store (thus we don't need to save it again).
let garage2 = garage.clone();
+ let order_stream = OrderTag::stream();
let source_blocks = stream::iter(blocks_to_copy)
- .flat_map(|(block_hash, range_to_copy)| {
+ .enumerate()
+ .flat_map(|(i, (block_hash, range_to_copy))| {
let garage3 = garage2.clone();
stream::once(async move {
- let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
+ let data = garage3
+ .block_manager
+ .rpc_get_block(&block_hash, Some(order_stream.order(i as u64)))
+ .await?;
match range_to_copy {
- Some(r) => Ok((data[r].to_vec(), None)),
+ Some(r) => Ok((data.slice(r), None)),
None => Ok((data, Some(block_hash))),
}
})
@@ -365,10 +372,7 @@ pub async fn handle_upload_part_copy(
// we need to insert that data as a new block.
async move {
if must_upload {
- garage2
- .block_manager
- .rpc_put_block(final_hash, data.into())
- .await
+ garage2.block_manager.rpc_put_block(final_hash, data).await
} else {
Ok(())
}
@@ -556,7 +560,7 @@ impl CopyPreconditionHeaders {
}
}
-type BlockStreamItemOk = (Vec<u8>, Option<Hash>);
+type BlockStreamItemOk = (Bytes, Option<Hash>);
type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
@@ -589,7 +593,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
if self.buffer.is_empty() {
let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
- self.buffer = next_block;
+ self.buffer = next_block.to_vec(); // TODO TOO MUCH COPY
self.hash = next_block_hash;
} else if self.buffer.len() + peeked_next_block.len() > self.block_size {
break;
@@ -600,7 +604,10 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
}
}
- Ok((std::mem::take(&mut self.buffer), self.hash.take()))
+ Ok((
+ Bytes::from(std::mem::take(&mut self.buffer)),
+ self.hash.take(),
+ ))
}
}