aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3_put.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3_put.rs')
-rw-r--r--src/api/s3_put.rs54
1 files changed, 38 insertions, 16 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index a6863cd3..5735fd10 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use chrono::{DateTime, NaiveDateTime, Utc};
use futures::{prelude::*, TryFutureExt};
use hyper::body::{Body, Bytes};
+use hyper::header::{HeaderMap, HeaderValue};
use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
@@ -34,12 +35,8 @@ pub async fn handle_put(
api_key: &Key,
mut content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
- // Generate identity of new version
- let version_uuid = gen_uuid();
- let version_timestamp = now_msec();
-
// Retrieve interesting headers from request
- let headers = get_headers(&req)?;
+ let headers = get_headers(req.headers())?;
debug!("Object headers: {:?}", headers);
let content_md5 = match req.headers().get("content-md5") {
@@ -92,6 +89,32 @@ pub async fn handle_put(
body.boxed()
};
+ save_stream(
+ garage,
+ headers,
+ body,
+ bucket_id,
+ key,
+ content_md5,
+ content_sha256,
+ )
+ .await
+ .map(|(uuid, md5)| put_response(uuid, md5))
+}
+
+pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
+ garage: Arc<Garage>,
+ headers: ObjectVersionHeaders,
+ body: S,
+ bucket_id: Uuid,
+ key: &str,
+ content_md5: Option<String>,
+ content_sha256: Option<FixedBytes32>,
+) -> Result<(Uuid, String), Error> {
+ // Generate identity of new version
+ let version_uuid = gen_uuid();
+ let version_timestamp = now_msec();
+
let mut chunker = StreamChunker::new(body, garage.config.block_size);
let first_block = chunker.next().await?.unwrap_or_default();
@@ -128,7 +151,7 @@ pub async fn handle_put(
let object = Object::new(bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
- return Ok(put_response(version_uuid, data_md5sum_hex));
+ return Ok((version_uuid, data_md5sum_hex));
}
// Write version identifier in object table so that we have a trace
@@ -194,7 +217,7 @@ pub async fn handle_put(
let object = Object::new(bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
- Ok(put_response(version_uuid, md5sum_hex))
+ Ok((version_uuid, md5sum_hex))
}
/// Validate MD5 sum against content-md5 header
@@ -373,7 +396,7 @@ pub async fn handle_create_multipart_upload(
key: &str,
) -> Result<Response<Body>, Error> {
let version_uuid = gen_uuid();
- let headers = get_headers(req)?;
+ let headers = get_headers(req.headers())?;
// Create object in object table
let object_version = ObjectVersion {
@@ -490,7 +513,7 @@ pub async fn handle_put_part(
let response = Response::builder()
.header("ETag", format!("\"{}\"", data_md5sum_hex))
- .body(Body::from(vec![]))
+ .body(Body::empty())
.unwrap();
Ok(response)
}
@@ -672,17 +695,16 @@ pub async fn handle_abort_multipart_upload(
Ok(Response::new(Body::from(vec![])))
}
-fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
- Ok(req
- .headers()
+fn get_mime_type(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> {
+ Ok(headers
.get(hyper::header::CONTENT_TYPE)
.map(|x| x.to_str())
.unwrap_or(Ok("blob"))?
.to_string())
}
-pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
- let content_type = get_mime_type(req)?;
+pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVersionHeaders, Error> {
+ let content_type = get_mime_type(headers)?;
let mut other = BTreeMap::new();
// Preserve standard headers
@@ -694,7 +716,7 @@ pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, E
hyper::header::EXPIRES,
];
for h in standard_header.iter() {
- if let Some(v) = req.headers().get(h) {
+ if let Some(v) = headers.get(h) {
match v.to_str() {
Ok(v_str) => {
other.insert(h.to_string(), v_str.to_string());
@@ -707,7 +729,7 @@ pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, E
}
// Preserve x-amz-meta- headers
- for (k, v) in req.headers().iter() {
+ for (k, v) in headers.iter() {
if k.as_str().starts_with("x-amz-meta-") {
match v.to_str() {
Ok(v_str) => {