aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/put.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-09-02 13:46:42 +0200
committerAlex Auvolat <alex@adnab.me>2022-09-02 13:46:42 +0200
commit13b5f28c7e8dec12b1db61735931b3830a3c893f (patch)
tree76132f146e6af94afa92b739057c1bd1359adee7 /src/api/s3/put.rs
parent1ef87ac4cb676113e86fc16a9eb27546d9a737bd (diff)
downloadgarage-13b5f28c7e8dec12b1db61735931b3830a3c893f.tar.gz
garage-13b5f28c7e8dec12b1db61735931b3830a3c893f.zip
Make use of BytesBuf from new Netapp
Diffstat (limited to 'src/api/s3/put.rs')
-rw-r--r--src/api/s3/put.rs43
1 files changed, 10 insertions, 33 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index dc0530df..97b8e4e3 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
+use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
use futures::prelude::*;
@@ -13,6 +13,7 @@ use opentelemetry::{
Context,
};
+use garage_rpc::netapp::bytes_buf::BytesBuf;
use garage_table::*;
use garage_util::async_hash::*;
use garage_util::data::*;
@@ -108,7 +109,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
size,
etag: data_md5sum_hex.clone(),
},
- first_block,
+ first_block.to_vec(),
)),
};
@@ -136,7 +137,6 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage.version_table.insert(&version).await?;
// Transfer data and verify checksum
- let first_block = Bytes::from(first_block);
let first_block_hash = async_blake2sum(first_block.clone()).await;
let tx_result = (|| async {
@@ -318,7 +318,6 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
chunker.next(),
)?;
if let Some(block) = next_block {
- let block = Bytes::from(block);
let (_, _, block_hash) = futures::future::join3(
md5hasher.update(block.clone()),
sha256hasher.update(block.clone()),
@@ -387,8 +386,7 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
stream: S,
read_all: bool,
block_size: usize,
- buf: VecDeque<Bytes>,
- buf_len: usize,
+ buf: BytesBuf,
}
impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
@@ -397,45 +395,25 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
stream,
read_all: false,
block_size,
- buf: VecDeque::with_capacity(8),
- buf_len: 0,
+ buf: BytesBuf::new(),
}
}
- async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
- while !self.read_all && self.buf_len < self.block_size {
+ async fn next(&mut self) -> Result<Option<Bytes>, Error> {
+ while !self.read_all && self.buf.len() < self.block_size {
if let Some(block) = self.stream.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
- self.buf_len += bytes.len();
- self.buf.push_back(bytes);
+ self.buf.extend(bytes);
} else {
self.read_all = true;
}
}
- if self.buf_len == 0 {
+ if self.buf.is_empty() {
Ok(None)
} else {
- let mut slices = Vec::with_capacity(self.buf.len());
- let mut taken = 0;
- while self.buf_len > 0 && taken < self.block_size {
- let front = self.buf.pop_front().unwrap();
- if taken + front.len() <= self.block_size {
- taken += front.len();
- self.buf_len -= front.len();
- slices.push(front);
- } else {
- let front_take = self.block_size - taken;
- slices.push(front.slice(..front_take));
- self.buf.push_front(front.slice(front_take..));
- self.buf_len -= front_take;
- break;
- }
- }
- Ok(Some(
- slices.iter().map(|x| &x[..]).collect::<Vec<_>>().concat(),
- ))
+ Ok(Some(self.buf.take_max(self.block_size)))
}
}
}
@@ -545,7 +523,6 @@ pub async fn handle_put_part(
// Copy block to store
let version = Version::new(version_uuid, bucket_id, key, false);
- let first_block = Bytes::from(first_block);
let first_block_hash = async_blake2sum(first_block.clone()).await;
let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(