aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3')
-rw-r--r--src/api/s3/copy.rs17
-rw-r--r--src/api/s3/get.rs30
2 files changed, 29 insertions, 18 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 4415a037..b54cbd23 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -5,6 +5,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::{stream, stream::Stream, StreamExt, TryFutureExt};
use md5::{Digest as Md5Digest, Md5};
+use bytes::Bytes;
use hyper::{Body, Request, Response};
use serde::Serialize;
@@ -311,7 +312,7 @@ pub async fn handle_upload_part_copy(
stream::once(async move {
let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
match range_to_copy {
- Some(r) => Ok((data[r].to_vec(), None)),
+ Some(r) => Ok((data.slice(r), None)),
None => Ok((data, Some(block_hash))),
}
})
@@ -365,10 +366,7 @@ pub async fn handle_upload_part_copy(
// we need to insert that data as a new block.
async move {
if must_upload {
- garage2
- .block_manager
- .rpc_put_block(final_hash, data.into())
- .await
+ garage2.block_manager.rpc_put_block(final_hash, data).await
} else {
Ok(())
}
@@ -556,7 +554,7 @@ impl CopyPreconditionHeaders {
}
}
-type BlockStreamItemOk = (Vec<u8>, Option<Hash>);
+type BlockStreamItemOk = (Bytes, Option<Hash>);
type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
@@ -589,7 +587,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
if self.buffer.is_empty() {
let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
- self.buffer = next_block;
+ self.buffer = next_block.to_vec(); // TODO TOO MUCH COPY
self.hash = next_block_hash;
} else if self.buffer.len() + peeked_next_block.len() > self.block_size {
break;
@@ -600,7 +598,10 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
}
}
- Ok((std::mem::take(&mut self.buffer), self.hash.take()))
+ Ok((
+ Bytes::from(std::mem::take(&mut self.buffer)),
+ self.hash.take(),
+ ))
}
}
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 7fa1a177..c7621ade 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -242,10 +242,13 @@ pub async fn handle_get(
Ok(resp_builder.body(body)?)
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
- let read_first_block = garage.block_manager.rpc_get_block(first_block_hash);
+ let read_first_block = garage
+ .block_manager
+ .rpc_get_block_streaming(first_block_hash);
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
- let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?;
+ let (first_block_stream, version) =
+ futures::try_join!(read_first_block, get_next_blocks)?;
let version = version.ok_or(Error::NoSuchKey)?;
let mut blocks = version
@@ -254,24 +257,32 @@ pub async fn handle_get(
.iter()
.map(|(_, vb)| (vb.hash, None))
.collect::<Vec<_>>();
- blocks[0].1 = Some(first_block);
+ blocks[0].1 = Some(first_block_stream);
let body_stream = futures::stream::iter(blocks)
- .map(move |(hash, data_opt)| {
+ .map(move |(hash, stream_opt)| {
let garage = garage.clone();
async move {
- if let Some(data) = data_opt {
- Ok(Bytes::from(data))
+ if let Some(stream) = stream_opt {
+ stream
} else {
garage
.block_manager
- .rpc_get_block(&hash)
+ .rpc_get_block_streaming(&hash)
.await
- .map(Bytes::from)
+ .unwrap_or_else(|_| {
+ Box::pin(futures::stream::once(async move {
+ Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "Could not get next block",
+ ))
+ }))
+ })
}
}
})
- .buffered(2);
+ .buffered(3)
+ .flatten();
let body = hyper::body::Body::wrap_stream(body_stream);
Ok(resp_builder.body(body)?)
@@ -439,7 +450,6 @@ fn body_from_blocks_range(
let garage = garage.clone();
async move {
let data = garage.block_manager.rpc_get_block(&block.hash).await?;
- let data = Bytes::from(data);
let start_in_block = if true_offset > begin {
0
} else {