aboutsummaryrefslogtreecommitdiff
path: root/src/api
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-02-26 18:21:17 +0100
committerAlex Auvolat <alex@adnab.me>2024-02-26 18:22:37 +0100
commitbabccd2ad39c0a626d82521b2d4128ec6f194814 (patch)
tree851b1481525d2941492cef98adce455653370355 /src/api
parent3fe94cc14fa6e2ec2f749afc3b2c4f4f0fa621fc (diff)
downloadgarage-babccd2ad39c0a626d82521b2d4128ec6f194814.tar.gz
garage-babccd2ad39c0a626d82521b2d4128ec6f194814.zip
[refactor-put] send several blocks in parallel to storage nodes
Diffstat (limited to 'src/api')
-rw-r--r--src/api/s3/put.rs49
1 files changed, 46 insertions, 3 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 557d1e5f..10a018e4 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
use base64::prelude::*;
use futures::prelude::*;
+use futures::stream::FuturesOrdered;
use futures::try_join;
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
@@ -37,6 +38,8 @@ use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
+const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
+
pub async fn handle_put(
garage: Arc<Garage>,
req: Request<ReqBody>,
@@ -376,12 +379,52 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
};
let put_blocks = async {
+ // Structure for handling several concurrent writes to storage nodes
+ let mut write_futs = FuturesOrdered::new();
let mut written_bytes = 0u64;
- while let Some(next) = block_rx3.recv().await {
- let (block, hash) = next?;
+ loop {
+ // Simultaneously write blocks to storage nodes & await for next block to be written
+ let currently_running = write_futs.len();
+ let write_futs_next = async {
+ if write_futs.is_empty() {
+ futures::future::pending().await
+ } else {
+ write_futs.next().await.unwrap()
+ }
+ };
+ let recv_next = async {
+ // If more than a maximum number of writes are in progress, don't add more for now
+ if currently_running >= PUT_BLOCKS_MAX_PARALLEL {
+ futures::future::pending().await
+ } else {
+ block_rx3.recv().await
+ }
+ };
+ let (block, hash) = tokio::select! {
+ result = write_futs_next => {
+ result?;
+ continue;
+ },
+ recv = recv_next => match recv {
+ Some(next) => next?,
+ None => break,
+ },
+ };
+
+ // For next block to be written: count its size and spawn future to write it
let offset = written_bytes;
written_bytes += block.len() as u64;
- put_block_and_meta(garage, version, part_number, offset, hash, block).await?;
+ write_futs.push_back(put_block_and_meta(
+ garage,
+ version,
+ part_number,
+ offset,
+ hash,
+ block,
+ ));
+ }
+ while let Some(res) = write_futs.next().await {
+ res?;
}
Ok::<_, Error>(written_bytes)
};