diff options
author | Alex Auvolat <alex@adnab.me> | 2022-09-19 12:16:38 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-09-19 12:19:59 +0200 |
commit | 5d4b6f2173344d59d59c7f6336c5d21799f8b37d (patch) | |
tree | 35821ba7b033b32679f84c7c67b891596126fa2b /src/api | |
parent | 4fba06d62e94eb572894ba4df45d6b50a64a662c (diff) | |
download | garage-faster-get.tar.gz garage-faster-get.zip |
Faster GetObject workflow for getting entire objectsfaster-get
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/Cargo.toml | 1 | ||||
-rw-r--r-- | src/api/s3/get.rs | 86 |
2 files changed, 52 insertions, 35 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index cdfabcb8..7c3ed43b 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -38,6 +38,7 @@ futures = "0.3" futures-util = "0.3" pin-project = "1.0" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } +tokio-stream = "0.1" form_urlencoded = "1.0.0" http = "0.2" diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index ae4c287d..2a99551a 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -2,16 +2,19 @@ use std::sync::Arc; use std::time::{Duration, UNIX_EPOCH}; -use futures::stream::*; +use futures::future; +use futures::stream::{self, StreamExt}; use http::header::{ ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, RANGE, }; use hyper::{Body, Request, Response, StatusCode}; +use tokio::sync::mpsc; use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag}; use garage_table::EmptyKey; use garage_util::data::*; +use garage_util::error::OkOrMessage; use garage_model::garage::Garage; use garage_model::s3::object_table::*; @@ -242,43 +245,56 @@ pub async fn handle_get( Ok(resp_builder.body(body)?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { - let order_stream = OrderTag::stream(); - - let read_first_block = garage - .block_manager - .rpc_get_block_streaming(first_block_hash, Some(order_stream.order(0))); - let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); + let (tx, rx) = mpsc::channel(2); - let (first_block_stream, version) = - futures::try_join!(read_first_block, get_next_blocks)?; - let version = version.ok_or(Error::NoSuchKey)?; + let order_stream = OrderTag::stream(); + let first_block_hash = *first_block_hash; + let version_uuid = last_v.uuid; + + tokio::spawn(async move { + match async { + let garage2 = garage.clone(); + let version_fut = tokio::spawn(async move { + garage2.version_table.get(&version_uuid, &EmptyKey).await + }); + + let stream_block_0 = garage + .block_manager + .rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0))) + .await?; + tx.send(stream_block_0) + .await + .ok_or_message("channel closed")?; + + let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?; + for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) { + let stream_block_i = garage + .block_manager + .rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64))) + .await?; + tx.send(stream_block_i) + .await + .ok_or_message("channel closed")?; + } - let mut blocks = version - .blocks - .items() - .iter() - .map(|(_, vb)| (vb.hash, None)) - .collect::<Vec<_>>(); - blocks[0].1 = Some(first_block_stream); - - let body_stream = futures::stream::iter(blocks) - .enumerate() - .map(move |(i, (hash, stream_opt))| { - let garage = garage.clone(); - async move { - if let Some(stream) = stream_opt { - stream - } else { - garage - .block_manager - .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64))) - .await - .unwrap_or_else(|e| error_stream(i, e)) - } + Ok::<(), Error>(()) + } + .await + { + Ok(()) => (), + Err(e) => { + let err = std::io::Error::new( + std::io::ErrorKind::Other, + format!("Error while getting object data: {}", e), + ); + let _ = tx + .send(Box::pin(stream::once(future::ready(Err(err))))) + .await; } - }) - .buffered(2) - .flatten(); + } + }); + + let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten(); let body = hyper::body::Body::wrap_stream(body_stream); Ok(resp_builder.body(body)?) |