diff options
author | Alex <alex@adnab.me> | 2024-02-26 17:52:45 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2024-02-26 17:52:45 +0000 |
commit | 911a83ea7d06143c5a9621f88020ab6c0850ba54 (patch) | |
tree | 26caedf5ef22e6b299d1e2921d7ef854d5d132d6 /src/api/s3/put.rs | |
parent | 17b55205aa666e78b73d93754574aca83a12193a (diff) | |
parent | b76c0c102ee07758ecf8ae4dfeef0a9a095c4136 (diff) | |
download | garage-911a83ea7d06143c5a9621f88020ab6c0850ba54.tar.gz garage-911a83ea7d06143c5a9621f88020ab6c0850ba54.zip |
Merge pull request 'rewrite read_and_put_block as a series of steps with channels' (#734) from refactor-put into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/734
Diffstat (limited to 'src/api/s3/put.rs')
-rw-r--r-- | src/api/s3/put.rs | 223 |
1 files changed, 153 insertions, 70 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index fdfa567d..489f1136 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -3,10 +3,13 @@ use std::sync::Arc; use base64::prelude::*; use futures::prelude::*; +use futures::stream::FuturesOrdered; use futures::try_join; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; +use tokio::sync::mpsc; + use hyper::body::Bytes; use hyper::header::{HeaderMap, HeaderValue}; use hyper::{Request, Response}; @@ -17,6 +20,7 @@ use opentelemetry::{ }; use garage_net::bytes_buf::BytesBuf; +use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::async_hash::*; use garage_util::data::*; @@ -35,6 +39,8 @@ use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; +const PUT_BLOCKS_MAX_PARALLEL: usize = 3; + pub async fn handle_put( garage: Arc<Garage>, req: Request<ReqBody>, @@ -168,17 +174,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data and verify checksum - let first_block_hash = async_blake2sum(first_block.clone()).await; - - let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( - &garage, - &version, - 1, - first_block, - first_block_hash, - &mut chunker, - ) - .await?; + let (total_size, data_md5sum, data_sha256sum, first_block_hash) = + read_and_put_blocks(&garage, &version, 1, first_block, &mut chunker).await?; ensure_checksum_matches( data_md5sum.as_slice(), @@ -299,84 +296,164 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + version: &Version, part_number: u64, first_block: Bytes, - first_block_hash: Hash, chunker: &mut StreamChunker<S>, -) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> { +) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash, Hash), Error> { let tracer = opentelemetry::global::tracer("garage"); - let md5hasher = AsyncHasher::<Md5>::new(); - let sha256hasher = AsyncHasher::<Sha256>::new(); + let (block_tx, mut block_rx) = mpsc::channel::<Result<Bytes, Error>>(2); + let read_blocks = async { + block_tx.send(Ok(first_block)).await?; + loop { + let res = chunker + .next() + .with_context(Context::current_with_span( + tracer.start("Read block from client"), + )) + .await; + match res { + Ok(Some(block)) => block_tx.send(Ok(block)).await?, + Ok(None) => break, + Err(e) => { + block_tx.send(Err(e)).await?; + break; + } + } + } + drop(block_tx); + Ok::<_, mpsc::error::SendError<_>>(()) + }; - futures::future::join( - md5hasher.update(first_block.clone()), - sha256hasher.update(first_block.clone()), - ) - .with_context(Context::current_with_span( - tracer.start("Hash first block (md5, sha256)"), - )) - .await; + let (block_tx2, mut block_rx2) = mpsc::channel::<Result<Bytes, Error>>(1); + let hash_stream = async { + let md5hasher = AsyncHasher::<Md5>::new(); + let sha256hasher = AsyncHasher::<Sha256>::new(); + while let Some(next) = block_rx.recv().await { + match next { + Ok(block) => { + block_tx2.send(Ok(block.clone())).await?; + futures::future::join( + md5hasher.update(block.clone()), + sha256hasher.update(block.clone()), + ) + .with_context(Context::current_with_span( + tracer.start("Hash block (md5, sha256)"), + )) + .await; + } + Err(e) => { + block_tx2.send(Err(e)).await?; + break; + } + } + } + drop(block_tx2); + Ok::<_, mpsc::error::SendError<_>>(futures::join!( + md5hasher.finalize(), + sha256hasher.finalize() + )) + }; - let mut next_offset = first_block.len(); - let mut put_curr_version_block = put_block_meta( - garage, - version, - part_number, - 0, - first_block_hash, - first_block.len() as u64, - ); - let mut put_curr_block = garage - .block_manager - .rpc_put_block(first_block_hash, first_block); - - loop { - let (_, _, next_block) = futures::try_join!( - put_curr_block.map_err(Error::from), - put_curr_version_block.map_err(Error::from), - chunker.next(), - )?; - if let Some(block) = next_block { - let (_, _, block_hash) = futures::future::join3( - md5hasher.update(block.clone()), - sha256hasher.update(block.clone()), - async_blake2sum(block.clone()), - ) - .with_context(Context::current_with_span( - tracer.start("Hash block (md5, sha256, blake2)"), - )) - .await; - let block_len = block.len(); - put_curr_version_block = put_block_meta( + let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, Hash), Error>>(1); + let hash_blocks = async { + let mut first_block_hash = None; + while let Some(next) = block_rx2.recv().await { + match next { + Ok(block) => { + let hash = async_blake2sum(block.clone()) + .with_context(Context::current_with_span( + tracer.start("Hash block (blake2)"), + )) + .await; + if first_block_hash.is_none() { + first_block_hash = Some(hash); + } + block_tx3.send(Ok((block, hash))).await?; + } + Err(e) => { + block_tx3.send(Err(e)).await?; + break; + } + } + } + drop(block_tx3); + Ok::<_, mpsc::error::SendError<_>>(first_block_hash.unwrap()) + }; + + let put_blocks = async { + // Structure for handling several concurrent writes to storage nodes + let order_stream = OrderTag::stream(); + let mut write_futs = FuturesOrdered::new(); + let mut written_bytes = 0u64; + loop { + // Simultaneously write blocks to storage nodes & await for next block to be written + let currently_running = write_futs.len(); + let write_futs_next = async { + if write_futs.is_empty() { + futures::future::pending().await + } else { + write_futs.next().await.unwrap() + } + }; + let recv_next = async { + // If more than a maximum number of writes are in progress, don't add more for now + if currently_running >= PUT_BLOCKS_MAX_PARALLEL { + futures::future::pending().await + } else { + block_rx3.recv().await + } + }; + let (block, hash) = tokio::select! { + result = write_futs_next => { + result?; + continue; + }, + recv = recv_next => match recv { + Some(next) => next?, + None => break, + }, + }; + + // For next block to be written: count its size and spawn future to write it + let offset = written_bytes; + written_bytes += block.len() as u64; + write_futs.push_back(put_block_and_meta( garage, version, part_number, - next_offset as u64, - block_hash, - block_len as u64, - ); - put_curr_block = garage.block_manager.rpc_put_block(block_hash, block); - next_offset += block_len; - } else { - break; + offset, + hash, + block, + order_stream.order(written_bytes), + )); } - } + while let Some(res) = write_futs.next().await { + res?; + } + Ok::<_, Error>(written_bytes) + }; + + let (_, stream_hash_result, block_hash_result, final_result) = + futures::join!(read_blocks, hash_stream, hash_blocks, put_blocks); - let total_size = next_offset as u64; - let data_md5sum = md5hasher.finalize().await; + let total_size = final_result?; + // unwrap here is ok, because if hasher failed, it is because something failed + // later in the pipeline which already caused a return at the ? on previous line + let (data_md5sum, data_sha256sum) = stream_hash_result.unwrap(); + let first_block_hash = block_hash_result.unwrap(); - let data_sha256sum = sha256hasher.finalize().await; let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap(); - Ok((total_size, data_md5sum, data_sha256sum)) + Ok((total_size, data_md5sum, data_sha256sum, first_block_hash)) } -async fn put_block_meta( +async fn put_block_and_meta( garage: &Garage, version: &Version, part_number: u64, offset: u64, hash: Hash, - size: u64, + block: Bytes, + order_tag: OrderTag, ) -> Result<(), GarageError> { let mut version = version.clone(); version.blocks.put( @@ -384,7 +461,10 @@ async fn put_block_meta( part_number, offset, }, - VersionBlock { hash, size }, + VersionBlock { + hash, + size: block.len() as u64, + }, ); let block_ref = BlockRef { @@ -394,6 +474,9 @@ async fn put_block_meta( }; futures::try_join!( + garage + .block_manager + .rpc_put_block(hash, block, Some(order_tag)), garage.version_table.insert(&version), garage.block_ref_table.insert(&block_ref), )?; |