diff options
author | Alex Auvolat <alex@adnab.me> | 2022-09-02 13:46:42 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-09-02 13:46:42 +0200 |
commit | 13b5f28c7e8dec12b1db61735931b3830a3c893f (patch) | |
tree | 76132f146e6af94afa92b739057c1bd1359adee7 | |
parent | 1ef87ac4cb676113e86fc16a9eb27546d9a737bd (diff) | |
download | garage-13b5f28c7e8dec12b1db61735931b3830a3c893f.tar.gz garage-13b5f28c7e8dec12b1db61735931b3830a3c893f.zip |
Make use of BytesBuf from new Netapp
-rw-r--r-- | src/api/s3/put.rs | 43 |
1 files changed, 10 insertions, 33 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index dc0530df..97b8e4e3 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; use futures::prelude::*; @@ -13,6 +13,7 @@ use opentelemetry::{ Context, }; +use garage_rpc::netapp::bytes_buf::BytesBuf; use garage_table::*; use garage_util::async_hash::*; use garage_util::data::*; @@ -108,7 +109,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( size, etag: data_md5sum_hex.clone(), }, - first_block, + first_block.to_vec(), )), }; @@ -136,7 +137,6 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data and verify checksum - let first_block = Bytes::from(first_block); let first_block_hash = async_blake2sum(first_block.clone()).await; let tx_result = (|| async { @@ -318,7 +318,6 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( chunker.next(), )?; if let Some(block) = next_block { - let block = Bytes::from(block); let (_, _, block_hash) = futures::future::join3( md5hasher.update(block.clone()), sha256hasher.update(block.clone()), @@ -387,8 +386,7 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> { stream: S, read_all: bool, block_size: usize, - buf: VecDeque<Bytes>, - buf_len: usize, + buf: BytesBuf, } impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { @@ -397,45 +395,25 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { stream, read_all: false, block_size, - buf: VecDeque::with_capacity(8), - buf_len: 0, + buf: BytesBuf::new(), } } - async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> { - while !self.read_all && self.buf_len < self.block_size { + async fn next(&mut self) -> Result<Option<Bytes>, Error> { + while !self.read_all && self.buf.len() < self.block_size { if let Some(block) = self.stream.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); - self.buf_len += bytes.len(); - self.buf.push_back(bytes); + self.buf.extend(bytes); } else { self.read_all = true; } } - if self.buf_len == 0 { + if self.buf.is_empty() { Ok(None) } else { - let mut slices = Vec::with_capacity(self.buf.len()); - let mut taken = 0; - while self.buf_len > 0 && taken < self.block_size { - let front = self.buf.pop_front().unwrap(); - if taken + front.len() <= self.block_size { - taken += front.len(); - self.buf_len -= front.len(); - slices.push(front); - } else { - let front_take = self.block_size - taken; - slices.push(front.slice(..front_take)); - self.buf.push_front(front.slice(front_take..)); - self.buf_len -= front_take; - break; - } - } - Ok(Some( - slices.iter().map(|x| &x[..]).collect::<Vec<_>>().concat(), - )) + Ok(Some(self.buf.take_max(self.block_size))) } } } @@ -545,7 +523,6 @@ pub async fn handle_put_part( // Copy block to store let version = Version::new(version_uuid, bucket_id, key, false); - let first_block = Bytes::from(first_block); let first_block_hash = async_blake2sum(first_block.clone()).await; let (_, data_md5sum, data_sha256sum) = read_and_put_blocks( |