diff options
Diffstat (limited to 'src/api/s3_put.rs')
-rw-r--r-- | src/api/s3_put.rs | 54 |
1 files changed, 38 insertions, 16 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index a6863cd3..5735fd10 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use chrono::{DateTime, NaiveDateTime, Utc}; use futures::{prelude::*, TryFutureExt}; use hyper::body::{Body, Bytes}; +use hyper::header::{HeaderMap, HeaderValue}; use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; @@ -34,12 +35,8 @@ pub async fn handle_put( api_key: &Key, mut content_sha256: Option<Hash>, ) -> Result<Response<Body>, Error> { - // Generate identity of new version - let version_uuid = gen_uuid(); - let version_timestamp = now_msec(); - // Retrieve interesting headers from request - let headers = get_headers(&req)?; + let headers = get_headers(req.headers())?; debug!("Object headers: {:?}", headers); let content_md5 = match req.headers().get("content-md5") { @@ -92,6 +89,32 @@ pub async fn handle_put( body.boxed() }; + save_stream( + garage, + headers, + body, + bucket_id, + key, + content_md5, + content_sha256, + ) + .await + .map(|(uuid, md5)| put_response(uuid, md5)) +} + +pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( + garage: Arc<Garage>, + headers: ObjectVersionHeaders, + body: S, + bucket_id: Uuid, + key: &str, + content_md5: Option<String>, + content_sha256: Option<FixedBytes32>, +) -> Result<(Uuid, String), Error> { + // Generate identity of new version + let version_uuid = gen_uuid(); + let version_timestamp = now_msec(); + let mut chunker = StreamChunker::new(body, garage.config.block_size); let first_block = chunker.next().await?.unwrap_or_default(); @@ -128,7 +151,7 @@ pub async fn handle_put( let object = Object::new(bucket_id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; - return Ok(put_response(version_uuid, data_md5sum_hex)); + return Ok((version_uuid, data_md5sum_hex)); } // Write version identifier in object table so that we have a trace @@ -194,7 +217,7 @@ pub async fn handle_put( let object = Object::new(bucket_id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; - Ok(put_response(version_uuid, md5sum_hex)) + Ok((version_uuid, md5sum_hex)) } /// Validate MD5 sum against content-md5 header @@ -373,7 +396,7 @@ pub async fn handle_create_multipart_upload( key: &str, ) -> Result<Response<Body>, Error> { let version_uuid = gen_uuid(); - let headers = get_headers(req)?; + let headers = get_headers(req.headers())?; // Create object in object table let object_version = ObjectVersion { @@ -490,7 +513,7 @@ pub async fn handle_put_part( let response = Response::builder() .header("ETag", format!("\"{}\"", data_md5sum_hex)) - .body(Body::from(vec![])) + .body(Body::empty()) .unwrap(); Ok(response) } @@ -672,17 +695,16 @@ pub async fn handle_abort_multipart_upload( Ok(Response::new(Body::from(vec![]))) } -fn get_mime_type(req: &Request<Body>) -> Result<String, Error> { - Ok(req - .headers() +fn get_mime_type(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> { + Ok(headers .get(hyper::header::CONTENT_TYPE) .map(|x| x.to_str()) .unwrap_or(Ok("blob"))? .to_string()) } -pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> { - let content_type = get_mime_type(req)?; +pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVersionHeaders, Error> { + let content_type = get_mime_type(headers)?; let mut other = BTreeMap::new(); // Preserve standard headers @@ -694,7 +716,7 @@ pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, E hyper::header::EXPIRES, ]; for h in standard_header.iter() { - if let Some(v) = req.headers().get(h) { + if let Some(v) = headers.get(h) { match v.to_str() { Ok(v_str) => { other.insert(h.to_string(), v_str.to_string()); @@ -707,7 +729,7 @@ pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, E } // Preserve x-amz-meta- headers - for (k, v) in req.headers().iter() { + for (k, v) in headers.iter() { if k.as_str().starts_with("x-amz-meta-") { match v.to_str() { Ok(v_str) => { |