From 5d4b6f2173344d59d59c7f6336c5d21799f8b37d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 19 Sep 2022 12:16:38 +0200 Subject: Faster GetObject workflow for getting entire objects --- src/api/s3/get.rs | 86 +++++++++++++++++++++++++++++++++---------------------- 1 file changed, 51 insertions(+), 35 deletions(-) (limited to 'src/api/s3/get.rs') diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index ae4c287d..2a99551a 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -2,16 +2,19 @@ use std::sync::Arc; use std::time::{Duration, UNIX_EPOCH}; -use futures::stream::*; +use futures::future; +use futures::stream::{self, StreamExt}; use http::header::{ ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, RANGE, }; use hyper::{Body, Request, Response, StatusCode}; +use tokio::sync::mpsc; use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag}; use garage_table::EmptyKey; use garage_util::data::*; +use garage_util::error::OkOrMessage; use garage_model::garage::Garage; use garage_model::s3::object_table::*; @@ -242,43 +245,56 @@ pub async fn handle_get( Ok(resp_builder.body(body)?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { - let order_stream = OrderTag::stream(); - - let read_first_block = garage - .block_manager - .rpc_get_block_streaming(first_block_hash, Some(order_stream.order(0))); - let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); + let (tx, rx) = mpsc::channel(2); - let (first_block_stream, version) = - futures::try_join!(read_first_block, get_next_blocks)?; - let version = version.ok_or(Error::NoSuchKey)?; + let order_stream = OrderTag::stream(); + let first_block_hash = *first_block_hash; + let version_uuid = last_v.uuid; + + tokio::spawn(async move { + match async { + let garage2 = garage.clone(); + let version_fut = tokio::spawn(async move { + garage2.version_table.get(&version_uuid, &EmptyKey).await + }); + + let stream_block_0 = garage + .block_manager + .rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0))) + .await?; + tx.send(stream_block_0) + .await + .ok_or_message("channel closed")?; + + let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?; + for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) { + let stream_block_i = garage + .block_manager + .rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64))) + .await?; + tx.send(stream_block_i) + .await + .ok_or_message("channel closed")?; + } - let mut blocks = version - .blocks - .items() - .iter() - .map(|(_, vb)| (vb.hash, None)) - .collect::>(); - blocks[0].1 = Some(first_block_stream); - - let body_stream = futures::stream::iter(blocks) - .enumerate() - .map(move |(i, (hash, stream_opt))| { - let garage = garage.clone(); - async move { - if let Some(stream) = stream_opt { - stream - } else { - garage - .block_manager - .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64))) - .await - .unwrap_or_else(|e| error_stream(i, e)) - } + Ok::<(), Error>(()) + } + .await + { + Ok(()) => (), + Err(e) => { + let err = std::io::Error::new( + std::io::ErrorKind::Other, + format!("Error while getting object data: {}", e), + ); + let _ = tx + .send(Box::pin(stream::once(future::ready(Err(err))))) + .await; } - }) - .buffered(2) - .flatten(); + } + }); + + let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten(); let body = hyper::body::Body::wrap_stream(body_stream); Ok(resp_builder.body(body)?) -- cgit v1.2.3