aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/get.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3/get.rs')
-rw-r--r--src/api/s3/get.rs86
1 files changed, 51 insertions, 35 deletions
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index ae4c287d..2a99551a 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -2,16 +2,19 @@
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
-use futures::stream::*;
+use futures::future;
+use futures::stream::{self, StreamExt};
use http::header::{
ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
IF_NONE_MATCH, LAST_MODIFIED, RANGE,
};
use hyper::{Body, Request, Response, StatusCode};
+use tokio::sync::mpsc;
use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag};
use garage_table::EmptyKey;
use garage_util::data::*;
+use garage_util::error::OkOrMessage;
use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
@@ -242,43 +245,56 @@ pub async fn handle_get(
Ok(resp_builder.body(body)?)
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
- let order_stream = OrderTag::stream();
-
- let read_first_block = garage
- .block_manager
- .rpc_get_block_streaming(first_block_hash, Some(order_stream.order(0)));
- let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
+ let (tx, rx) = mpsc::channel(2);
- let (first_block_stream, version) =
- futures::try_join!(read_first_block, get_next_blocks)?;
- let version = version.ok_or(Error::NoSuchKey)?;
+ let order_stream = OrderTag::stream();
+ let first_block_hash = *first_block_hash;
+ let version_uuid = last_v.uuid;
+
+ tokio::spawn(async move {
+ match async {
+ let garage2 = garage.clone();
+ let version_fut = tokio::spawn(async move {
+ garage2.version_table.get(&version_uuid, &EmptyKey).await
+ });
+
+ let stream_block_0 = garage
+ .block_manager
+ .rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0)))
+ .await?;
+ tx.send(stream_block_0)
+ .await
+ .ok_or_message("channel closed")?;
+
+ let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?;
+ for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) {
+ let stream_block_i = garage
+ .block_manager
+ .rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64)))
+ .await?;
+ tx.send(stream_block_i)
+ .await
+ .ok_or_message("channel closed")?;
+ }
- let mut blocks = version
- .blocks
- .items()
- .iter()
- .map(|(_, vb)| (vb.hash, None))
- .collect::<Vec<_>>();
- blocks[0].1 = Some(first_block_stream);
-
- let body_stream = futures::stream::iter(blocks)
- .enumerate()
- .map(move |(i, (hash, stream_opt))| {
- let garage = garage.clone();
- async move {
- if let Some(stream) = stream_opt {
- stream
- } else {
- garage
- .block_manager
- .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64)))
- .await
- .unwrap_or_else(|e| error_stream(i, e))
- }
+ Ok::<(), Error>(())
+ }
+ .await
+ {
+ Ok(()) => (),
+ Err(e) => {
+ let err = std::io::Error::new(
+ std::io::ErrorKind::Other,
+ format!("Error while getting object data: {}", e),
+ );
+ let _ = tx
+ .send(Box::pin(stream::once(future::ready(Err(err)))))
+ .await;
}
- })
- .buffered(2)
- .flatten();
+ }
+ });
+
+ let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten();
let body = hyper::body::Body::wrap_stream(body_stream);
Ok(resp_builder.body(body)?)