diff options
Diffstat (limited to 'src/api/s3/copy.rs')
-rw-r--r-- | src/api/s3/copy.rs | 29 |
1 files changed, 17 insertions, 12 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 4415a037..a1a8c9a4 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -5,9 +5,12 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use futures::{stream, stream::Stream, StreamExt, TryFutureExt}; use md5::{Digest as Md5Digest, Md5}; +use bytes::Bytes; use hyper::{Body, Request, Response}; use serde::Serialize; +use garage_rpc::netapp::bytes_buf::BytesBuf; +use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::data::*; use garage_util::time::*; @@ -305,13 +308,18 @@ pub async fn handle_upload_part_copy( // if and only if the block returned is a block that already existed // in the Garage data store (thus we don't need to save it again). let garage2 = garage.clone(); + let order_stream = OrderTag::stream(); let source_blocks = stream::iter(blocks_to_copy) - .flat_map(|(block_hash, range_to_copy)| { + .enumerate() + .flat_map(|(i, (block_hash, range_to_copy))| { let garage3 = garage2.clone(); stream::once(async move { - let data = garage3.block_manager.rpc_get_block(&block_hash).await?; + let data = garage3 + .block_manager + .rpc_get_block(&block_hash, Some(order_stream.order(i as u64))) + .await?; match range_to_copy { - Some(r) => Ok((data[r].to_vec(), None)), + Some(r) => Ok((data.slice(r), None)), None => Ok((data, Some(block_hash))), } }) @@ -365,10 +373,7 @@ pub async fn handle_upload_part_copy( // we need to insert that data as a new block. async move { if must_upload { - garage2 - .block_manager - .rpc_put_block(final_hash, data.into()) - .await + garage2.block_manager.rpc_put_block(final_hash, data).await } else { Ok(()) } @@ -556,13 +561,13 @@ impl CopyPreconditionHeaders { } } -type BlockStreamItemOk = (Vec<u8>, Option<Hash>); +type BlockStreamItemOk = (Bytes, Option<Hash>); type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>; struct Defragmenter<S: Stream<Item = BlockStreamItem>> { block_size: usize, block_stream: Pin<Box<stream::Peekable<S>>>, - buffer: Vec<u8>, + buffer: BytesBuf, hash: Option<Hash>, } @@ -571,7 +576,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> { Self { block_size, block_stream, - buffer: vec![], + buffer: BytesBuf::new(), hash: None, } } @@ -589,7 +594,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> { if self.buffer.is_empty() { let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?; - self.buffer = next_block; + self.buffer.extend(next_block); self.hash = next_block_hash; } else if self.buffer.len() + peeked_next_block.len() > self.block_size { break; @@ -600,7 +605,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> { } } - Ok((std::mem::take(&mut self.buffer), self.hash.take())) + Ok((self.buffer.take_all(), self.hash.take())) } } |