aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-25 11:59:55 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-29 12:24:49 +0200
commitad35b18bb146fcbf5e817c10837c6e835b1af5b7 (patch)
treebab8424417fdb96310c7126101b5dc2ce98c3f44 /src
parent49154a78d88b54cec8b23bc5e54dfabff88af780 (diff)
downloadgarage-ad35b18bb146fcbf5e817c10837c6e835b1af5b7.tar.gz
garage-ad35b18bb146fcbf5e817c10837c6e835b1af5b7.zip
Faster chunker
Diffstat (limited to 'src')
-rw-r--r--src/api/s3/put.rs42
1 files changed, 32 insertions, 10 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 2c51909f..e6698bfa 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -387,7 +387,8 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
stream: S,
read_all: bool,
block_size: usize,
- buf: VecDeque<u8>,
+ buf: VecDeque<Bytes>,
+ buf_len: usize,
}
impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
@@ -396,29 +397,50 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
stream,
read_all: false,
block_size,
- buf: VecDeque::with_capacity(2 * block_size),
+ buf: VecDeque::with_capacity(8),
+ buf_len: 0,
}
}
async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
- while !self.read_all && self.buf.len() < self.block_size {
+ while !self.read_all && self.buf_len < self.block_size {
if let Some(block) = self.stream.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
- self.buf.extend(bytes);
+ self.buf_len += bytes.len();
+ self.buf.push_back(bytes);
} else {
self.read_all = true;
}
}
- if self.buf.is_empty() {
+ if self.buf_len == 0 {
Ok(None)
- } else if self.buf.len() <= self.block_size {
- let block = self.buf.drain(..).collect::<Vec<u8>>();
- Ok(Some(block))
} else {
- let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>();
- Ok(Some(block))
+ let mut slices = Vec::with_capacity(self.buf.len());
+ let mut taken = 0;
+ while self.buf_len > 0 && taken < self.block_size {
+ let front = self.buf.pop_front().unwrap();
+ if taken + front.len() <= self.block_size {
+ taken += front.len();
+ self.buf_len -= front.len();
+ slices.push(front);
+ } else {
+ let front_take = self.block_size - taken;
+ slices.push(front.slice(..front_take));
+ self.buf.push_front(front.slice(front_take..));
+ self.buf_len -= front_take;
+ break;
+ }
+ }
+ Ok(Some(
+ slices
+ .iter()
+ .map(|x| &x[..])
+ .collect::<Vec<_>>()
+ .concat()
+ .into(),
+ ))
}
}
}