aboutsummaryrefslogtreecommitdiff
path: root/src/api
diff options
context:
space:
mode:
Diffstat (limited to 'src/api')
-rw-r--r--src/api/Cargo.toml1
-rw-r--r--src/api/admin/api_server.rs7
-rw-r--r--src/api/generic_server.rs6
-rw-r--r--src/api/s3/get.rs86
4 files changed, 62 insertions, 38 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index cdfabcb8..7c3ed43b 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -38,6 +38,7 @@ futures = "0.3"
futures-util = "0.3"
pin-project = "1.0"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+tokio-stream = "0.1"
form_urlencoded = "1.0.0"
http = "0.2"
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index fb0078cc..0816bda1 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -34,7 +34,10 @@ pub struct AdminApiServer {
}
impl AdminApiServer {
- pub fn new(garage: Arc<Garage>) -> Self {
+ pub fn new(
+ garage: Arc<Garage>,
+ #[cfg(feature = "metrics")] exporter: PrometheusExporter,
+ ) -> Self {
let cfg = &garage.config.admin;
let metrics_token = cfg
.metrics_token
@@ -47,7 +50,7 @@ impl AdminApiServer {
Self {
garage,
#[cfg(feature = "metrics")]
- exporter: opentelemetry_prometheus::exporter().init(),
+ exporter,
metrics_token,
admin_token,
}
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs
index a48be1bc..62fe4e5a 100644
--- a/src/api/generic_server.rs
+++ b/src/api/generic_server.rs
@@ -174,7 +174,11 @@ impl<A: ApiHandler> ApiServer<A> {
let current_context = Context::current();
let current_span = current_context.span();
- current_span.update_name::<String>(format!("S3 API {}", endpoint.name()));
+ current_span.update_name::<String>(format!(
+ "{} API {}",
+ A::API_NAME_DISPLAY,
+ endpoint.name()
+ ));
current_span.set_attribute(KeyValue::new("endpoint", endpoint.name()));
endpoint.add_span_attributes(current_span);
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index ae4c287d..2a99551a 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -2,16 +2,19 @@
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
-use futures::stream::*;
+use futures::future;
+use futures::stream::{self, StreamExt};
use http::header::{
ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
IF_NONE_MATCH, LAST_MODIFIED, RANGE,
};
use hyper::{Body, Request, Response, StatusCode};
+use tokio::sync::mpsc;
use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag};
use garage_table::EmptyKey;
use garage_util::data::*;
+use garage_util::error::OkOrMessage;
use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
@@ -242,43 +245,56 @@ pub async fn handle_get(
Ok(resp_builder.body(body)?)
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
- let order_stream = OrderTag::stream();
-
- let read_first_block = garage
- .block_manager
- .rpc_get_block_streaming(first_block_hash, Some(order_stream.order(0)));
- let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
+ let (tx, rx) = mpsc::channel(2);
- let (first_block_stream, version) =
- futures::try_join!(read_first_block, get_next_blocks)?;
- let version = version.ok_or(Error::NoSuchKey)?;
+ let order_stream = OrderTag::stream();
+ let first_block_hash = *first_block_hash;
+ let version_uuid = last_v.uuid;
+
+ tokio::spawn(async move {
+ match async {
+ let garage2 = garage.clone();
+ let version_fut = tokio::spawn(async move {
+ garage2.version_table.get(&version_uuid, &EmptyKey).await
+ });
+
+ let stream_block_0 = garage
+ .block_manager
+ .rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0)))
+ .await?;
+ tx.send(stream_block_0)
+ .await
+ .ok_or_message("channel closed")?;
+
+ let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?;
+ for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) {
+ let stream_block_i = garage
+ .block_manager
+ .rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64)))
+ .await?;
+ tx.send(stream_block_i)
+ .await
+ .ok_or_message("channel closed")?;
+ }
- let mut blocks = version
- .blocks
- .items()
- .iter()
- .map(|(_, vb)| (vb.hash, None))
- .collect::<Vec<_>>();
- blocks[0].1 = Some(first_block_stream);
-
- let body_stream = futures::stream::iter(blocks)
- .enumerate()
- .map(move |(i, (hash, stream_opt))| {
- let garage = garage.clone();
- async move {
- if let Some(stream) = stream_opt {
- stream
- } else {
- garage
- .block_manager
- .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64)))
- .await
- .unwrap_or_else(|e| error_stream(i, e))
- }
+ Ok::<(), Error>(())
+ }
+ .await
+ {
+ Ok(()) => (),
+ Err(e) => {
+ let err = std::io::Error::new(
+ std::io::ErrorKind::Other,
+ format!("Error while getting object data: {}", e),
+ );
+ let _ = tx
+ .send(Box::pin(stream::once(future::ready(Err(err)))))
+ .await;
}
- })
- .buffered(2)
- .flatten();
+ }
+ });
+
+ let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten();
let body = hyper::body::Body::wrap_stream(body_stream);
Ok(resp_builder.body(body)?)