diff options
Diffstat (limited to 'src/api/s3_put.rs')
-rw-r--r-- | src/api/s3_put.rs | 110 |
1 files changed, 87 insertions, 23 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 37658172..4e85664b 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -1,8 +1,10 @@ use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; -use futures::stream::*; -use hyper::{Body, Request, Response}; +use chrono::{DateTime, NaiveDateTime, Utc}; +use futures::{prelude::*, TryFutureExt}; +use hyper::body::{Body, Bytes}; +use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; @@ -14,19 +16,23 @@ use garage_util::time::*; use garage_model::block::INLINE_THRESHOLD; use garage_model::block_ref_table::*; use garage_model::garage::Garage; +use garage_model::key_table::Key; use garage_model::object_table::*; use garage_model::version_table::*; use crate::error::*; use crate::s3_xml; -use crate::signature::verify_signed_content; +use crate::signature::streaming::SignedPayloadStream; +use crate::signature::LONG_DATETIME; +use crate::signature::{compute_scope, verify_signed_content}; pub async fn handle_put( garage: Arc<Garage>, req: Request<Body>, bucket_id: Uuid, key: &str, - content_sha256: Option<Hash>, + api_key: &Key, + mut content_sha256: Option<Hash>, ) -> Result<Response<Body>, Error> { // Generate identity of new version let version_uuid = gen_uuid(); @@ -40,11 +46,53 @@ pub async fn handle_put( Some(x) => Some(x.to_str()?.to_string()), None => None, }; + let payload_seed_signature = match req.headers().get("x-amz-content-sha256") { + Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { + let content_sha256 = content_sha256 + .take() + .ok_or_bad_request("No signature provided")?; + Some(content_sha256) + } + _ => None, + }; // Parse body of uploaded file - let body = req.into_body(); + let (head, body) = req.into_parts(); + let body = body.map_err(Error::from); + + let body = if let Some(signature) = payload_seed_signature { + let secret_key = &api_key + .state + .as_option() + .ok_or_internal_error("Deleted key state")? + .secret_key; + + let date = head + .headers + .get("x-amz-date") + .ok_or_bad_request("Missing X-Amz-Date field")? + .to_str()?; + let date: NaiveDateTime = + NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?; + let date: DateTime<Utc> = DateTime::from_utc(date, Utc); + + let scope = compute_scope(&date, &garage.config.s3_api.s3_region); + let signing_hmac = crate::signature::signing_hmac( + &date, + secret_key, + &garage.config.s3_api.s3_region, + "s3", + ) + .ok_or_internal_error("Unable to build signing HMAC")?; - let mut chunker = BodyChunker::new(body, garage.config.block_size); + SignedPayloadStream::new(body, signing_hmac, date, &scope, signature)? + .map_err(Error::from) + .boxed() + } else { + body.boxed() + }; + + let mut chunker = StreamChunker::new(body, garage.config.block_size); let first_block = chunker.next().await?.unwrap_or_default(); // If body is small enough, store it directly in the object table @@ -178,13 +226,13 @@ fn ensure_checksum_matches( Ok(()) } -async fn read_and_put_blocks( +async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage: &Garage, version: &Version, part_number: u64, first_block: Vec<u8>, first_block_hash: Hash, - chunker: &mut BodyChunker, + chunker: &mut StreamChunker<S>, ) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> { let mut md5hasher = Md5::new(); let mut sha256hasher = Sha256::new(); @@ -205,8 +253,11 @@ async fn read_and_put_blocks( .rpc_put_block(first_block_hash, first_block); loop { - let (_, _, next_block) = - futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; + let (_, _, next_block) = futures::try_join!( + put_curr_block.map_err(Error::from), + put_curr_version_block.map_err(Error::from), + chunker.next(), + )?; if let Some(block) = next_block { md5hasher.update(&block[..]); sha256hasher.update(&block[..]); @@ -266,32 +317,34 @@ async fn put_block_meta( Ok(()) } -struct BodyChunker { - body: Body, +struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> { + stream: S, read_all: bool, block_size: usize, buf: VecDeque<u8>, } -impl BodyChunker { - fn new(body: Body, block_size: usize) -> Self { +impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { + fn new(stream: S, block_size: usize) -> Self { Self { - body, + stream, read_all: false, block_size, buf: VecDeque::with_capacity(2 * block_size), } } - async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> { + + async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> { while !self.read_all && self.buf.len() < self.block_size { - if let Some(block) = self.body.next().await { + if let Some(block) = self.stream.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); - self.buf.extend(&bytes[..]); + self.buf.extend(bytes); } else { self.read_all = true; } } + if self.buf.is_empty() { Ok(None) } else if self.buf.len() <= self.block_size { @@ -368,12 +421,20 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size); + + let body = req.into_body().map_err(Error::from); + let mut chunker = StreamChunker::new(body, garage.config.block_size); let (object, version, first_block) = futures::try_join!( - garage.object_table.get(&bucket_id, &key), - garage.version_table.get(&version_uuid, &EmptyKey), - chunker.next() + garage + .object_table + .get(&bucket_id, &key) + .map_err(Error::from), + garage + .version_table + .get(&version_uuid, &EmptyKey) + .map_err(Error::from), + chunker.next(), )?; // Check object is valid and multipart block can be accepted @@ -444,7 +505,10 @@ pub async fn handle_complete_multipart_upload( content_sha256: Option<Hash>, ) -> Result<Response<Body>, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let body_list_of_parts = parse_complete_multpart_upload_body(&body_xml) |