diff options
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/s3/put.rs | 49 |
1 files changed, 46 insertions, 3 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 557d1e5f..10a018e4 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use base64::prelude::*; use futures::prelude::*; +use futures::stream::FuturesOrdered; use futures::try_join; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; @@ -37,6 +38,8 @@ use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; +const PUT_BLOCKS_MAX_PARALLEL: usize = 3; + pub async fn handle_put( garage: Arc<Garage>, req: Request<ReqBody>, @@ -376,12 +379,52 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + }; let put_blocks = async { + // Structure for handling several concurrent writes to storage nodes + let mut write_futs = FuturesOrdered::new(); let mut written_bytes = 0u64; - while let Some(next) = block_rx3.recv().await { - let (block, hash) = next?; + loop { + // Simultaneously write blocks to storage nodes & await for next block to be written + let currently_running = write_futs.len(); + let write_futs_next = async { + if write_futs.is_empty() { + futures::future::pending().await + } else { + write_futs.next().await.unwrap() + } + }; + let recv_next = async { + // If more than a maximum number of writes are in progress, don't add more for now + if currently_running >= PUT_BLOCKS_MAX_PARALLEL { + futures::future::pending().await + } else { + block_rx3.recv().await + } + }; + let (block, hash) = tokio::select! { + result = write_futs_next => { + result?; + continue; + }, + recv = recv_next => match recv { + Some(next) => next?, + None => break, + }, + }; + + // For next block to be written: count its size and spawn future to write it let offset = written_bytes; written_bytes += block.len() as u64; - put_block_and_meta(garage, version, part_number, offset, hash, block).await?; + write_futs.push_back(put_block_and_meta( + garage, + version, + part_number, + offset, + hash, + block, + )); + } + while let Some(res) = write_futs.next().await { + res?; } Ok::<_, Error>(written_bytes) }; |