aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-09-06 22:25:23 +0200
committerAlex Auvolat <alex@adnab.me>2022-09-06 22:25:23 +0200
commit907054775dc71a10a92ab96112889db9113130ab (patch)
treeebe24031883a8293d9add6ca6b49b80cb15d1d15
parent6b958979bd898a576ee9c1021cc908b3ec75ffc7 (diff)
downloadgarage-907054775dc71a10a92ab96112889db9113130ab.tar.gz
garage-907054775dc71a10a92ab96112889db9113130ab.zip
Faster copy, better get error message
-rw-r--r--src/api/s3/copy.rs12
-rw-r--r--src/api/s3/get.rs4
2 files changed, 7 insertions, 9 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 10cf5935..a1a8c9a4 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -9,6 +9,7 @@ use bytes::Bytes;
use hyper::{Body, Request, Response};
use serde::Serialize;
+use garage_rpc::netapp::bytes_buf::BytesBuf;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
use garage_util::data::*;
@@ -566,7 +567,7 @@ type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
block_size: usize,
block_stream: Pin<Box<stream::Peekable<S>>>,
- buffer: Vec<u8>,
+ buffer: BytesBuf,
hash: Option<Hash>,
}
@@ -575,7 +576,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
Self {
block_size,
block_stream,
- buffer: vec![],
+ buffer: BytesBuf::new(),
hash: None,
}
}
@@ -593,7 +594,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
if self.buffer.is_empty() {
let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
- self.buffer = next_block.to_vec(); // TODO TOO MUCH COPY
+ self.buffer.extend(next_block);
self.hash = next_block_hash;
} else if self.buffer.len() + peeked_next_block.len() > self.block_size {
break;
@@ -604,10 +605,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
}
}
- Ok((
- Bytes::from(std::mem::take(&mut self.buffer)),
- self.hash.take(),
- ))
+ Ok((self.buffer.take_all(), self.hash.take()))
}
}
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index dfc284fe..dd95f6e7 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -274,11 +274,11 @@ pub async fn handle_get(
.block_manager
.rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64)))
.await
- .unwrap_or_else(|_| {
+ .unwrap_or_else(|e| {
Box::pin(futures::stream::once(async move {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
- "Could not get next block",
+ format!("Could not get block {}: {}", i, e),
))
}))
})