aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/copy.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-22 18:20:27 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-29 12:25:02 +0200
commit605a630333c8ee60c55fe011a375c01277bba173 (patch)
tree1ae21efe1070806ddc9af7b85cda718e64e105c8 /src/api/s3/copy.rs
parenta35d4da721db3550a2833d8576d4283bc999e8df (diff)
downloadgarage-605a630333c8ee60c55fe011a375c01277bba173.tar.gz
garage-605a630333c8ee60c55fe011a375c01277bba173.zip
Use streaming in block manager
Diffstat (limited to 'src/api/s3/copy.rs')
-rw-r--r--src/api/s3/copy.rs12
1 files changed, 8 insertions, 4 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 4415a037..54a565e0 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -5,6 +5,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::{stream, stream::Stream, StreamExt, TryFutureExt};
use md5::{Digest as Md5Digest, Md5};
+use bytes::Bytes;
use hyper::{Body, Request, Response};
use serde::Serialize;
@@ -311,7 +312,7 @@ pub async fn handle_upload_part_copy(
stream::once(async move {
let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
match range_to_copy {
- Some(r) => Ok((data[r].to_vec(), None)),
+ Some(r) => Ok((data.slice(r), None)),
None => Ok((data, Some(block_hash))),
}
})
@@ -556,7 +557,7 @@ impl CopyPreconditionHeaders {
}
}
-type BlockStreamItemOk = (Vec<u8>, Option<Hash>);
+type BlockStreamItemOk = (Bytes, Option<Hash>);
type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
@@ -589,7 +590,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
if self.buffer.is_empty() {
let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
- self.buffer = next_block;
+ self.buffer = next_block.to_vec(); // TODO TOO MUCH COPY
self.hash = next_block_hash;
} else if self.buffer.len() + peeked_next_block.len() > self.block_size {
break;
@@ -600,7 +601,10 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
}
}
- Ok((std::mem::take(&mut self.buffer), self.hash.take()))
+ Ok((
+ Bytes::from(std::mem::take(&mut self.buffer)),
+ self.hash.take(),
+ ))
}
}