aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3_put.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3_put.rs')
-rw-r--r--src/api/s3_put.rs110
1 files changed, 87 insertions, 23 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index 37658172..4e85664b 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -1,8 +1,10 @@
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
-use futures::stream::*;
-use hyper::{Body, Request, Response};
+use chrono::{DateTime, NaiveDateTime, Utc};
+use futures::{prelude::*, TryFutureExt};
+use hyper::body::{Body, Bytes};
+use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
@@ -14,19 +16,23 @@ use garage_util::time::*;
use garage_model::block::INLINE_THRESHOLD;
use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
+use garage_model::key_table::Key;
use garage_model::object_table::*;
use garage_model::version_table::*;
use crate::error::*;
use crate::s3_xml;
-use crate::signature::verify_signed_content;
+use crate::signature::streaming::SignedPayloadStream;
+use crate::signature::LONG_DATETIME;
+use crate::signature::{compute_scope, verify_signed_content};
pub async fn handle_put(
garage: Arc<Garage>,
req: Request<Body>,
bucket_id: Uuid,
key: &str,
- content_sha256: Option<Hash>,
+ api_key: &Key,
+ mut content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
// Generate identity of new version
let version_uuid = gen_uuid();
@@ -40,11 +46,53 @@ pub async fn handle_put(
Some(x) => Some(x.to_str()?.to_string()),
None => None,
};
+ let payload_seed_signature = match req.headers().get("x-amz-content-sha256") {
+ Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
+ let content_sha256 = content_sha256
+ .take()
+ .ok_or_bad_request("No signature provided")?;
+ Some(content_sha256)
+ }
+ _ => None,
+ };
// Parse body of uploaded file
- let body = req.into_body();
+ let (head, body) = req.into_parts();
+ let body = body.map_err(Error::from);
+
+ let body = if let Some(signature) = payload_seed_signature {
+ let secret_key = &api_key
+ .state
+ .as_option()
+ .ok_or_internal_error("Deleted key state")?
+ .secret_key;
+
+ let date = head
+ .headers
+ .get("x-amz-date")
+ .ok_or_bad_request("Missing X-Amz-Date field")?
+ .to_str()?;
+ let date: NaiveDateTime =
+ NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
+ let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
+
+ let scope = compute_scope(&date, &garage.config.s3_api.s3_region);
+ let signing_hmac = crate::signature::signing_hmac(
+ &date,
+ secret_key,
+ &garage.config.s3_api.s3_region,
+ "s3",
+ )
+ .ok_or_internal_error("Unable to build signing HMAC")?;
- let mut chunker = BodyChunker::new(body, garage.config.block_size);
+ SignedPayloadStream::new(body, signing_hmac, date, &scope, signature)?
+ .map_err(Error::from)
+ .boxed()
+ } else {
+ body.boxed()
+ };
+
+ let mut chunker = StreamChunker::new(body, garage.config.block_size);
let first_block = chunker.next().await?.unwrap_or_default();
// If body is small enough, store it directly in the object table
@@ -178,13 +226,13 @@ fn ensure_checksum_matches(
Ok(())
}
-async fn read_and_put_blocks(
+async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: &Garage,
version: &Version,
part_number: u64,
first_block: Vec<u8>,
first_block_hash: Hash,
- chunker: &mut BodyChunker,
+ chunker: &mut StreamChunker<S>,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
let mut md5hasher = Md5::new();
let mut sha256hasher = Sha256::new();
@@ -205,8 +253,11 @@ async fn read_and_put_blocks(
.rpc_put_block(first_block_hash, first_block);
loop {
- let (_, _, next_block) =
- futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
+ let (_, _, next_block) = futures::try_join!(
+ put_curr_block.map_err(Error::from),
+ put_curr_version_block.map_err(Error::from),
+ chunker.next(),
+ )?;
if let Some(block) = next_block {
md5hasher.update(&block[..]);
sha256hasher.update(&block[..]);
@@ -266,32 +317,34 @@ async fn put_block_meta(
Ok(())
}
-struct BodyChunker {
- body: Body,
+struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
+ stream: S,
read_all: bool,
block_size: usize,
buf: VecDeque<u8>,
}
-impl BodyChunker {
- fn new(body: Body, block_size: usize) -> Self {
+impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
+ fn new(stream: S, block_size: usize) -> Self {
Self {
- body,
+ stream,
read_all: false,
block_size,
buf: VecDeque::with_capacity(2 * block_size),
}
}
- async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
+
+ async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
while !self.read_all && self.buf.len() < self.block_size {
- if let Some(block) = self.body.next().await {
+ if let Some(block) = self.stream.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
- self.buf.extend(&bytes[..]);
+ self.buf.extend(bytes);
} else {
self.read_all = true;
}
}
+
if self.buf.is_empty() {
Ok(None)
} else if self.buf.len() <= self.block_size {
@@ -368,12 +421,20 @@ pub async fn handle_put_part(
// Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string();
- let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size);
+
+ let body = req.into_body().map_err(Error::from);
+ let mut chunker = StreamChunker::new(body, garage.config.block_size);
let (object, version, first_block) = futures::try_join!(
- garage.object_table.get(&bucket_id, &key),
- garage.version_table.get(&version_uuid, &EmptyKey),
- chunker.next()
+ garage
+ .object_table
+ .get(&bucket_id, &key)
+ .map_err(Error::from),
+ garage
+ .version_table
+ .get(&version_uuid, &EmptyKey)
+ .map_err(Error::from),
+ chunker.next(),
)?;
// Check object is valid and multipart block can be accepted
@@ -444,7 +505,10 @@ pub async fn handle_complete_multipart_upload(
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?;
- verify_signed_content(content_sha256, &body[..])?;
+
+ if let Some(content_sha256) = content_sha256 {
+ verify_signed_content(content_sha256, &body[..])?;
+ }
let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
let body_list_of_parts = parse_complete_multpart_upload_body(&body_xml)