aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3_put.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3_put.rs')
-rw-r--r--src/api/s3_put.rs78
1 files changed, 63 insertions, 15 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index fdc188c9..07ab5a03 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -2,9 +2,10 @@ use std::collections::{BTreeMap, VecDeque};
use std::fmt::Write;
use std::sync::Arc;
-use md5::{Md5, Digest};
use futures::stream::*;
use hyper::{Body, Request, Response};
+use md5::{Digest as Md5Digest, Md5};
+use sha2::{Digest as Sha256Digest, Sha256};
use garage_table::*;
use garage_util::data::*;
@@ -23,9 +24,14 @@ pub async fn handle_put(
req: Request<Body>,
bucket: &str,
key: &str,
+ content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
let version_uuid = gen_uuid();
let headers = get_headers(&req)?;
+ let content_md5 = match req.headers().get("content-md5") {
+ Some(x) => Some(x.to_str()?.to_string()),
+ None => None,
+ };
let body = req.into_body();
@@ -42,9 +48,9 @@ pub async fn handle_put(
};
if first_block.len() < INLINE_THRESHOLD {
- let mut md5sum = Md5::new();
- md5sum.update(&first_block[..]);
- let etag = hex::encode(md5sum.finalize());
+ let mut md5sum = Md5::new();
+ md5sum.update(&first_block[..]);
+ let etag = hex::encode(md5sum.finalize());
object_version.state = ObjectVersionState::Complete(ObjectVersionData::Inline(
ObjectVersionMeta {
@@ -66,7 +72,7 @@ pub async fn handle_put(
let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
- let (total_size, etag) = read_and_put_blocks(
+ let (total_size, md5sum, sha256sum) = read_and_put_blocks(
&garage,
version,
1,
@@ -76,13 +82,26 @@ pub async fn handle_put(
)
.await?;
+ if let Some(expected_sha256) = content_sha256 {
+ if expected_sha256 != sha256sum {
+ return Err(Error::Message(format!(
+ "Unable to validate x-amz-content-sha256"
+ )));
+ }
+ }
+ if let Some(expected_md5) = content_md5 {
+ if expected_md5.trim_matches('"') != md5sum {
+ return Err(Error::Message(format!("Unable to validate content-md5")));
+ }
+ }
+
// TODO: if at any step we have an error, we should undo everything we did
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
headers,
size: total_size,
- etag: etag.clone(),
+ etag: md5sum.clone(),
},
first_block_hash,
));
@@ -90,7 +109,7 @@ pub async fn handle_put(
let object = Object::new(bucket.into(), key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
- Ok(put_response(version_uuid, etag))
+ Ok(put_response(version_uuid, md5sum))
}
async fn read_and_put_blocks(
@@ -100,9 +119,11 @@ async fn read_and_put_blocks(
first_block: Vec<u8>,
first_block_hash: Hash,
chunker: &mut BodyChunker,
-) -> Result<(u64, String), Error> {
- let mut md5sum = Md5::new();
- md5sum.update(&first_block[..]);
+) -> Result<(u64, String, Hash), Error> {
+ let mut md5sum = Md5::new();
+ let mut sha256sum = Sha256::new();
+ md5sum.update(&first_block[..]);
+ sha256sum.input(&first_block[..]);
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(
@@ -121,7 +142,8 @@ async fn read_and_put_blocks(
let (_, _, next_block) =
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
if let Some(block) = next_block {
- md5sum.update(&block[..]);
+ md5sum.update(&block[..]);
+ sha256sum.input(&block[..]);
let block_hash = hash(&block[..]);
let block_len = block.len();
put_curr_version_block = put_block_meta(
@@ -139,9 +161,15 @@ async fn read_and_put_blocks(
}
}
- let total_size = next_offset as u64;
- let md5sum = hex::encode(md5sum.finalize());
- Ok((total_size, md5sum))
+ let total_size = next_offset as u64;
+ let md5sum = hex::encode(md5sum.finalize());
+
+ let sha256sum = sha256sum.result();
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&sha256sum[..]);
+ let sha256sum = Hash::from(hash);
+
+ Ok((total_size, md5sum, sha256sum))
}
async fn put_block_meta(
@@ -267,6 +295,7 @@ pub async fn handle_put_part(
key: &str,
part_number_str: &str,
upload_id: &str,
+ content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
// Check parameters
let part_number = part_number_str
@@ -276,6 +305,11 @@ pub async fn handle_put_part(
let version_uuid =
uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
+ let content_md5 = match req.headers().get("content-md5") {
+ Some(x) => Some(x.to_str()?.to_string()),
+ None => None,
+ };
+
// Read first chuck, and at the same time try to get object to see if it exists
let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size);
@@ -307,7 +341,7 @@ pub async fn handle_put_part(
// Copy block to store
let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]);
let first_block_hash = hash(&first_block[..]);
- read_and_put_blocks(
+ let (_, md5sum, sha256sum) = read_and_put_blocks(
&garage,
version,
part_number,
@@ -317,6 +351,20 @@ pub async fn handle_put_part(
)
.await?;
+ if let Some(expected_md5) = content_md5 {
+ if expected_md5.trim_matches('"') != md5sum {
+ return Err(Error::Message(format!("Unable to validate content-md5")));
+ }
+ }
+
+ if let Some(expected_sha256) = content_sha256 {
+ if expected_sha256 != sha256sum {
+ return Err(Error::Message(format!(
+ "Unable to validate x-amz-content-sha256"
+ )));
+ }
+ }
+
Ok(Response::new(Body::from(vec![])))
}