diff options
Diffstat (limited to 'src/api/s3/put.rs')
-rw-r--r-- | src/api/s3/put.rs | 87 |
1 files changed, 65 insertions, 22 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 9ef37421..dc0530df 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -8,7 +8,13 @@ use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; +use opentelemetry::{ + trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, + Context, +}; + use garage_table::*; +use garage_util::async_hash::*; use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_util::time::*; @@ -130,7 +136,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data and verify checksum - let first_block_hash = blake2sum(&first_block[..]); + let first_block = Bytes::from(first_block); + let first_block_hash = async_blake2sum(first_block.clone()).await; let tx_result = (|| async { let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( @@ -273,14 +280,23 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage: &Garage, version: &Version, part_number: u64, - first_block: Vec<u8>, + first_block: Bytes, first_block_hash: Hash, chunker: &mut StreamChunker<S>, ) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> { - let mut md5hasher = Md5::new(); - let mut sha256hasher = Sha256::new(); - md5hasher.update(&first_block[..]); - sha256hasher.update(&first_block[..]); + let tracer = opentelemetry::global::tracer("garage"); + + let md5hasher = AsyncHasher::<Md5>::new(); + let sha256hasher = AsyncHasher::<Sha256>::new(); + + futures::future::join( + md5hasher.update(first_block.clone()), + sha256hasher.update(first_block.clone()), + ) + .with_context(Context::current_with_span( + tracer.start("Hash first block (md5, sha256)"), + )) + .await; let mut next_offset = first_block.len(); let mut put_curr_version_block = put_block_meta( @@ -302,9 +318,16 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( chunker.next(), )?; if let Some(block) = next_block { - md5hasher.update(&block[..]); - sha256hasher.update(&block[..]); - let block_hash = blake2sum(&block[..]); + let block = Bytes::from(block); + let (_, _, block_hash) = futures::future::join3( + md5hasher.update(block.clone()), + sha256hasher.update(block.clone()), + async_blake2sum(block.clone()), + ) + .with_context(Context::current_with_span( + tracer.start("Hash block (md5, sha256, blake2)"), + )) + .await; let block_len = block.len(); put_curr_version_block = put_block_meta( garage, @@ -322,9 +345,9 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( } let total_size = next_offset as u64; - let data_md5sum = md5hasher.finalize(); + let data_md5sum = md5hasher.finalize().await; - let data_sha256sum = sha256hasher.finalize(); + let data_sha256sum = sha256hasher.finalize().await; let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap(); Ok((total_size, data_md5sum, data_sha256sum)) @@ -364,7 +387,8 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> { stream: S, read_all: bool, block_size: usize, - buf: VecDeque<u8>, + buf: VecDeque<Bytes>, + buf_len: usize, } impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { @@ -373,29 +397,45 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { stream, read_all: false, block_size, - buf: VecDeque::with_capacity(2 * block_size), + buf: VecDeque::with_capacity(8), + buf_len: 0, } } async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> { - while !self.read_all && self.buf.len() < self.block_size { + while !self.read_all && self.buf_len < self.block_size { if let Some(block) = self.stream.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); - self.buf.extend(bytes); + self.buf_len += bytes.len(); + self.buf.push_back(bytes); } else { self.read_all = true; } } - if self.buf.is_empty() { + if self.buf_len == 0 { Ok(None) - } else if self.buf.len() <= self.block_size { - let block = self.buf.drain(..).collect::<Vec<u8>>(); - Ok(Some(block)) } else { - let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>(); - Ok(Some(block)) + let mut slices = Vec::with_capacity(self.buf.len()); + let mut taken = 0; + while self.buf_len > 0 && taken < self.block_size { + let front = self.buf.pop_front().unwrap(); + if taken + front.len() <= self.block_size { + taken += front.len(); + self.buf_len -= front.len(); + slices.push(front); + } else { + let front_take = self.block_size - taken; + slices.push(front.slice(..front_take)); + self.buf.push_front(front.slice(front_take..)); + self.buf_len -= front_take; + break; + } + } + Ok(Some( + slices.iter().map(|x| &x[..]).collect::<Vec<_>>().concat(), + )) } } } @@ -504,7 +544,10 @@ pub async fn handle_put_part( // Copy block to store let version = Version::new(version_uuid, bucket_id, key, false); - let first_block_hash = blake2sum(&first_block[..]); + + let first_block = Bytes::from(first_block); + let first_block_hash = async_blake2sum(first_block.clone()).await; + let (_, data_md5sum, data_sha256sum) = read_and_put_blocks( &garage, &version, |