From 3fe94cc14fa6e2ec2f749afc3b2c4f4f0fa621fc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 26 Feb 2024 17:22:16 +0100 Subject: [refactor-put] rewrite read_and_put_block as a series of steps with channels --- src/api/s3/multipart.rs | 14 +--- src/api/s3/put.rs | 180 ++++++++++++++++++++++++++++-------------------- 2 files changed, 109 insertions(+), 85 deletions(-) (limited to 'src/api') diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index b9d15b21..5959bcd6 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -6,7 +6,6 @@ use hyper::{Request, Response}; use md5::{Digest as Md5Digest, Md5}; use garage_table::*; -use garage_util::async_hash::*; use garage_util::data::*; use garage_model::bucket_table::Bucket; @@ -135,17 +134,8 @@ pub async fn handle_put_part( garage.version_table.insert(&version).await?; // Copy data to version - let first_block_hash = async_blake2sum(first_block.clone()).await; - - let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( - &garage, - &version, - part_number, - first_block, - first_block_hash, - &mut chunker, - ) - .await?; + let (total_size, data_md5sum, data_sha256sum, _) = + read_and_put_blocks(&garage, &version, part_number, first_block, &mut chunker).await?; // Verify that checksums map ensure_checksum_matches( diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index fdfa567d..557d1e5f 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -7,6 +7,8 @@ use futures::try_join; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; +use tokio::sync::mpsc; + use hyper::body::Bytes; use hyper::header::{HeaderMap, HeaderValue}; use hyper::{Request, Response}; @@ -168,17 +170,8 @@ pub(crate) async fn save_stream> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data and verify checksum - let first_block_hash = async_blake2sum(first_block.clone()).await; - - let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( - &garage, - &version, - 1, - first_block, - first_block_hash, - &mut chunker, - ) - .await?; + let (total_size, data_md5sum, data_sha256sum, first_block_hash) = + read_and_put_blocks(&garage, &version, 1, first_block, &mut chunker).await?; ensure_checksum_matches( data_md5sum.as_slice(), @@ -299,84 +292,121 @@ pub(crate) async fn read_and_put_blocks> + version: &Version, part_number: u64, first_block: Bytes, - first_block_hash: Hash, chunker: &mut StreamChunker, -) -> Result<(u64, GenericArray, Hash), Error> { +) -> Result<(u64, GenericArray, Hash, Hash), Error> { let tracer = opentelemetry::global::tracer("garage"); - let md5hasher = AsyncHasher::::new(); - let sha256hasher = AsyncHasher::::new(); + let (block_tx, mut block_rx) = mpsc::channel::>(2); + let read_blocks = async { + block_tx.send(Ok(first_block)).await?; + loop { + let res = chunker + .next() + .with_context(Context::current_with_span( + tracer.start("Read block from client"), + )) + .await; + match res { + Ok(Some(block)) => block_tx.send(Ok(block)).await?, + Ok(None) => break, + Err(e) => { + block_tx.send(Err(e)).await?; + break; + } + } + } + drop(block_tx); + Ok::<_, mpsc::error::SendError<_>>(()) + }; - futures::future::join( - md5hasher.update(first_block.clone()), - sha256hasher.update(first_block.clone()), - ) - .with_context(Context::current_with_span( - tracer.start("Hash first block (md5, sha256)"), - )) - .await; + let (block_tx2, mut block_rx2) = mpsc::channel::>(1); + let hash_stream = async { + let md5hasher = AsyncHasher::::new(); + let sha256hasher = AsyncHasher::::new(); + while let Some(next) = block_rx.recv().await { + match next { + Ok(block) => { + block_tx2.send(Ok(block.clone())).await?; + futures::future::join( + md5hasher.update(block.clone()), + sha256hasher.update(block.clone()), + ) + .with_context(Context::current_with_span( + tracer.start("Hash block (md5, sha256)"), + )) + .await; + } + Err(e) => { + block_tx2.send(Err(e)).await?; + break; + } + } + } + drop(block_tx2); + Ok::<_, mpsc::error::SendError<_>>(futures::join!( + md5hasher.finalize(), + sha256hasher.finalize() + )) + }; - let mut next_offset = first_block.len(); - let mut put_curr_version_block = put_block_meta( - garage, - version, - part_number, - 0, - first_block_hash, - first_block.len() as u64, - ); - let mut put_curr_block = garage - .block_manager - .rpc_put_block(first_block_hash, first_block); - - loop { - let (_, _, next_block) = futures::try_join!( - put_curr_block.map_err(Error::from), - put_curr_version_block.map_err(Error::from), - chunker.next(), - )?; - if let Some(block) = next_block { - let (_, _, block_hash) = futures::future::join3( - md5hasher.update(block.clone()), - sha256hasher.update(block.clone()), - async_blake2sum(block.clone()), - ) - .with_context(Context::current_with_span( - tracer.start("Hash block (md5, sha256, blake2)"), - )) - .await; - let block_len = block.len(); - put_curr_version_block = put_block_meta( - garage, - version, - part_number, - next_offset as u64, - block_hash, - block_len as u64, - ); - put_curr_block = garage.block_manager.rpc_put_block(block_hash, block); - next_offset += block_len; - } else { - break; + let (block_tx3, mut block_rx3) = mpsc::channel::>(1); + let hash_blocks = async { + let mut first_block_hash = None; + while let Some(next) = block_rx2.recv().await { + match next { + Ok(block) => { + let hash = async_blake2sum(block.clone()) + .with_context(Context::current_with_span( + tracer.start("Hash block (blake2)"), + )) + .await; + if first_block_hash.is_none() { + first_block_hash = Some(hash); + } + block_tx3.send(Ok((block, hash))).await?; + } + Err(e) => { + block_tx3.send(Err(e)).await?; + break; + } + } } - } + drop(block_tx3); + Ok::<_, mpsc::error::SendError<_>>(first_block_hash.unwrap()) + }; + + let put_blocks = async { + let mut written_bytes = 0u64; + while let Some(next) = block_rx3.recv().await { + let (block, hash) = next?; + let offset = written_bytes; + written_bytes += block.len() as u64; + put_block_and_meta(garage, version, part_number, offset, hash, block).await?; + } + Ok::<_, Error>(written_bytes) + }; + + let (_, stream_hash_result, block_hash_result, final_result) = + futures::join!(read_blocks, hash_stream, hash_blocks, put_blocks); - let total_size = next_offset as u64; - let data_md5sum = md5hasher.finalize().await; + let total_size = final_result?; + // unwrap here is ok, because if hasher failed, it is because something failed + // later in the pipeline which already caused a return at the ? on previous line + let (data_md5sum, data_sha256sum) = stream_hash_result.unwrap(); + let first_block_hash = block_hash_result.unwrap(); - let data_sha256sum = sha256hasher.finalize().await; let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap(); - Ok((total_size, data_md5sum, data_sha256sum)) + Ok((total_size, data_md5sum, data_sha256sum, first_block_hash)) } -async fn put_block_meta( +async fn put_block_and_meta( garage: &Garage, version: &Version, part_number: u64, offset: u64, hash: Hash, - size: u64, + block: Bytes, ) -> Result<(), GarageError> { let mut version = version.clone(); version.blocks.put( @@ -384,7 +414,10 @@ async fn put_block_meta( part_number, offset, }, - VersionBlock { hash, size }, + VersionBlock { + hash, + size: block.len() as u64, + }, ); let block_ref = BlockRef { @@ -394,6 +427,7 @@ async fn put_block_meta( }; futures::try_join!( + garage.block_manager.rpc_put_block(hash, block), garage.version_table.insert(&version), garage.block_ref_table.insert(&block_ref), )?; -- cgit v1.2.3