aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--src/web/Cargo.toml2
-rw-r--r--src/web/web_server.rs111
3 files changed, 109 insertions, 5 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 9e0a1bb0..a36cdf83 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -660,6 +660,7 @@ dependencies = [
"garage_util 0.1.0",
"hex",
"http",
+ "httpdate",
"hyper",
"idna",
"log",
diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml
index 252ee58d..819b51c1 100644
--- a/src/web/Cargo.toml
+++ b/src/web/Cargo.toml
@@ -42,3 +42,5 @@ webpki = "0.21"
roxmltree = "0.11"
idna = "0.2"
+
+httpdate = "0.3"
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index 7172f222..8a222738 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -2,17 +2,23 @@ use std::borrow::Cow;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
+use std::time::{Duration, UNIX_EPOCH};
use futures::future::Future;
+use futures::stream::*;
-use hyper::header::HOST;
-use hyper::server::conn::AddrStream;
-use hyper::service::{make_service_fn, service_fn};
-use hyper::{Body, Request, Response, Server};
+use hyper::{
+ header::HOST,
+ body::Bytes,
+ server::conn::AddrStream,
+ service::{make_service_fn, service_fn},
+ Body, Request, Response, Server, StatusCode};
use idna::domain_to_unicode;
use garage_model::garage::Garage;
+use garage_model::object_table::*;
+use garage_table::EmptyKey;
use garage_util::error::Error as GarageError;
use crate::error::*;
@@ -90,7 +96,102 @@ async fn serve_file(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<
.await?
.ok_or(Error::NotFound)?;
- Ok(Response::new(Body::from("hello world\n")))
+ // Get last complete version descriptor
+ let last_v = object
+ .versions()
+ .iter()
+ .rev()
+ .filter(|v| v.is_complete())
+ .next()
+ .ok_or(Error::NotFound)?;
+
+ // Unwrap version
+ let last_v_data = match &last_v.state {
+ ObjectVersionState::Complete(x) => x,
+ _ => unreachable!(),
+ };
+
+ // Get metadata from version
+ let last_v_meta = match last_v_data {
+ ObjectVersionData::DeleteMarker => return Err(Error::NotFound),
+ ObjectVersionData::Inline(meta, _) => meta,
+ ObjectVersionData::FirstBlock(meta, _) => meta,
+ };
+
+ // @FIXME Support range
+
+
+ // Set headers
+ let resp_builder = object_headers(&last_v, last_v_meta).status(StatusCode::OK);
+
+
+ // Stream body
+ match &last_v_data {
+ ObjectVersionData::DeleteMarker => unreachable!(),
+ ObjectVersionData::Inline(_, bytes) => {
+ let body: Body = Body::from(bytes.to_vec());
+ Ok(resp_builder.body(body)?)
+ }
+ ObjectVersionData::FirstBlock(_, first_block_hash) => {
+ let read_first_block = garage.block_manager.rpc_get_block(&first_block_hash);
+ let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
+
+ let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?;
+ let version = version.ok_or(Error::NotFound)?;
+
+ let mut blocks = version
+ .blocks()
+ .iter()
+ .map(|vb| (vb.hash, None))
+ .collect::<Vec<_>>();
+ blocks[0].1 = Some(first_block);
+
+ let body_stream = futures::stream::iter(blocks)
+ .map(move |(hash, data_opt)| {
+ let garage = garage.clone();
+ async move {
+ if let Some(data) = data_opt {
+ Ok(Bytes::from(data))
+ } else {
+ garage
+ .block_manager
+ .rpc_get_block(&hash)
+ .await
+ .map(Bytes::from)
+ }
+ }
+ })
+ .buffered(2);
+ //let body: Body = Box::new(StreamBody::new(Box::pin(body_stream)));
+ let body = hyper::body::Body::wrap_stream(body_stream);
+ Ok(resp_builder.body(body)?)
+ }
+ }
+}
+
+// Copied from api/s3_get.rs
+fn object_headers(
+ version: &ObjectVersion,
+ version_meta: &ObjectVersionMeta,
+) -> http::response::Builder {
+ let date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
+ let date_str = httpdate::fmt_http_date(date);
+
+ let mut resp = Response::builder()
+ .header(
+ "Content-Type",
+ version_meta.headers.content_type.to_string(),
+ )
+ .header("Content-Length", format!("{}", version_meta.size))
+ .header("ETag", version_meta.etag.to_string())
+ .header("Last-Modified", date_str)
+ .header("Accept-Ranges", format!("bytes"));
+
+ for (k, v) in version_meta.headers.other.iter() {
+ resp = resp.header(k, v.to_string());
+ }
+
+ resp
}
/// Extract host from the authority section given by the HTTP host header