aboutsummaryrefslogtreecommitdiff
path: root/src/api
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-18 19:27:02 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-18 19:27:02 +0100
commit4348bde180887f5185ca6da6024476e8e8fb2fe6 (patch)
tree8a35f0e4229d15665af7673eebcaa1b3d75a73cb /src/api
parent5b659b28ce6bef15072d2fc93f777aa8ff73b2d8 (diff)
parent4eb16e886388f35d2bdee52b16922421004cf132 (diff)
downloadgarage-4348bde180887f5185ca6da6024476e8e8fb2fe6.tar.gz
garage-4348bde180887f5185ca6da6024476e8e8fb2fe6.zip
Merge branch 'dev-0.2'
Diffstat (limited to 'src/api')
-rw-r--r--src/api/Cargo.toml21
-rw-r--r--src/api/api_server.rs5
-rw-r--r--src/api/error.rs8
-rw-r--r--src/api/s3_copy.rs106
-rw-r--r--src/api/s3_delete.rs11
-rw-r--r--src/api/s3_get.rs12
-rw-r--r--src/api/s3_list.rs8
-rw-r--r--src/api/s3_put.rs207
-rw-r--r--src/api/signature.rs36
9 files changed, 257 insertions, 157 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 45388eff..bce9946e 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -17,26 +17,25 @@ garage_util = { version = "0.1.1", path = "../util" }
garage_table = { version = "0.1.1", path = "../table" }
garage_model = { version = "0.1.1", path = "../model" }
-err-derive = "0.2.3"
-bytes = "0.4"
-hex = "0.3"
+err-derive = "0.3"
+bytes = "1.0"
+hex = "0.4"
base64 = "0.13"
log = "0.4"
chrono = "0.4"
-md-5 = "0.9.1"
-sha2 = "0.8"
-hmac = "0.7"
-crypto-mac = "0.7"
-rand = "0.7"
+md-5 = "0.9"
+sha2 = "0.9"
+hmac = "0.10"
+crypto-mac = "0.10"
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
http = "0.2"
-hyper = "^0.13.6"
+hyper = "0.14"
url = "2.1"
httpdate = "0.3"
percent-encoding = "2.1.0"
-roxmltree = "0.11"
+roxmltree = "0.14"
http-range = "0.1"
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index c6b1d483..bc98686d 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -137,7 +137,10 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
)));
}
let source_key = source_key.ok_or_bad_request("No source key specified")?;
- Ok(handle_copy(garage, &bucket, &key, &source_bucket, &source_key).await?)
+ Ok(
+ handle_copy(garage, &req, &bucket, &key, &source_bucket, &source_key)
+ .await?,
+ )
} else {
// PutObject query
Ok(handle_put(garage, req, &bucket, &key, content_sha256).await?)
diff --git a/src/api/error.rs b/src/api/error.rs
index a1681fc3..42a7ab10 100644
--- a/src/api/error.rs
+++ b/src/api/error.rs
@@ -33,7 +33,7 @@ pub enum Error {
InvalidBase64(#[error(source)] base64::DecodeError),
#[error(display = "Invalid XML: {}", _0)]
- InvalidXML(#[error(source)] roxmltree::Error),
+ InvalidXML(String),
#[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError),
@@ -45,6 +45,12 @@ pub enum Error {
BadRequest(String),
}
+impl From<roxmltree::Error> for Error {
+ fn from(err: roxmltree::Error) -> Self {
+ Self::InvalidXML(format!("{}", err))
+ }
+}
+
impl Error {
pub fn http_status_code(&self) -> StatusCode {
match self {
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs
index b6ec48b0..187fe347 100644
--- a/src/api/s3_copy.rs
+++ b/src/api/s3_copy.rs
@@ -1,11 +1,11 @@
use std::fmt::Write;
use std::sync::Arc;
-use chrono::{SecondsFormat, Utc};
-use hyper::{Body, Response};
+use hyper::{Body, Request, Response};
use garage_table::*;
use garage_util::data::*;
+use garage_util::time::*;
use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
@@ -13,9 +13,11 @@ use garage_model::object_table::*;
use garage_model::version_table::*;
use crate::error::*;
+use crate::s3_put::get_headers;
pub async fn handle_copy(
garage: Arc<Garage>,
+ req: &Request<Body>,
dest_bucket: &str,
dest_key: &str,
source_bucket: &str,
@@ -41,17 +43,37 @@ pub async fn handle_copy(
};
let new_uuid = gen_uuid();
- let dest_object_version = ObjectVersion {
- uuid: new_uuid,
- timestamp: now_msec(),
- state: ObjectVersionState::Complete(source_last_state.clone()),
- };
+ let new_timestamp = now_msec();
- match &source_last_state {
+ // Implement x-amz-metadata-directive: REPLACE
+ let old_meta = match source_last_state {
ObjectVersionData::DeleteMarker => {
return Err(Error::NotFound);
}
- ObjectVersionData::Inline(_meta, _bytes) => {
+ ObjectVersionData::Inline(meta, _bytes) => meta,
+ ObjectVersionData::FirstBlock(meta, _fbh) => meta,
+ };
+ let new_meta = match req.headers().get("x-amz-metadata-directive") {
+ Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
+ headers: get_headers(req)?,
+ size: old_meta.size,
+ etag: old_meta.etag.clone(),
+ },
+ _ => old_meta.clone(),
+ };
+
+ // Save object copy
+ match source_last_state {
+ ObjectVersionData::DeleteMarker => unreachable!(),
+ ObjectVersionData::Inline(_meta, bytes) => {
+ let dest_object_version = ObjectVersion {
+ uuid: new_uuid,
+ timestamp: new_timestamp,
+ state: ObjectVersionState::Complete(ObjectVersionData::Inline(
+ new_meta,
+ bytes.clone(),
+ )),
+ };
let dest_object = Object::new(
dest_bucket.to_string(),
dest_key.to_string(),
@@ -59,44 +81,84 @@ pub async fn handle_copy(
);
garage.object_table.insert(&dest_object).await?;
}
- ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
+ ObjectVersionData::FirstBlock(_meta, first_block_hash) => {
+ // Get block list from source version
let source_version = garage
.version_table
.get(&source_last_v.uuid, &EmptyKey)
.await?;
let source_version = source_version.ok_or(Error::NotFound)?;
- let dest_version = Version::new(
- new_uuid,
+ // Write an "uploading" marker in Object table
+ // This holds a reference to the object in the Version table
+ // so that it won't be deleted, e.g. by repair_versions.
+ let tmp_dest_object_version = ObjectVersion {
+ uuid: new_uuid,
+ timestamp: new_timestamp,
+ state: ObjectVersionState::Uploading(new_meta.headers.clone()),
+ };
+ let tmp_dest_object = Object::new(
dest_bucket.to_string(),
dest_key.to_string(),
- false,
- source_version.blocks().to_vec(),
+ vec![tmp_dest_object_version],
);
- let dest_object = Object::new(
+ garage.object_table.insert(&tmp_dest_object).await?;
+
+ // Write version in the version table. Even with empty block list,
+ // this means that the BlockRef entries linked to this version cannot be
+ // marked as deleted (they are marked as deleted only if the Version
+ // doesn't exist or is marked as deleted).
+ let mut dest_version = Version::new(
+ new_uuid,
dest_bucket.to_string(),
dest_key.to_string(),
- vec![dest_object_version],
+ false,
);
+ garage.version_table.insert(&dest_version).await?;
+
+ // Fill in block list for version and insert block refs
+ for (bk, bv) in source_version.blocks.items().iter() {
+ dest_version.blocks.put(*bk, *bv);
+ }
let dest_block_refs = dest_version
- .blocks()
+ .blocks
+ .items()
.iter()
.map(|b| BlockRef {
- block: b.hash,
+ block: b.1.hash,
version: new_uuid,
- deleted: false,
+ deleted: false.into(),
})
.collect::<Vec<_>>();
futures::try_join!(
- garage.object_table.insert(&dest_object),
garage.version_table.insert(&dest_version),
garage.block_ref_table.insert_many(&dest_block_refs[..]),
)?;
+
+ // Insert final object
+ // We do this last because otherwise there is a race condition in the case where
+ // the copy call has the same source and destination (this happens, rclone does
+ // it to update the modification timestamp for instance). If we did this concurrently
+ // with the stuff before, the block's reference counts could be decremented before
+ // they are incremented again for the new version, leading to data being deleted.
+ let dest_object_version = ObjectVersion {
+ uuid: new_uuid,
+ timestamp: new_timestamp,
+ state: ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
+ new_meta,
+ *first_block_hash,
+ )),
+ };
+ let dest_object = Object::new(
+ dest_bucket.to_string(),
+ dest_key.to_string(),
+ vec![dest_object_version],
+ );
+ garage.object_table.insert(&dest_object).await?;
}
}
- let now = Utc::now();
- let last_modified = now.to_rfc3339_opts(SecondsFormat::Secs, true);
+ let last_modified = msec_to_rfc3339(new_timestamp);
let mut xml = String::new();
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
writeln!(&mut xml, r#"<CopyObjectResult>"#).unwrap();
diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs
index 4b6a2b18..6abbfc48 100644
--- a/src/api/s3_delete.rs
+++ b/src/api/s3_delete.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use hyper::{Body, Request, Response};
use garage_util::data::*;
+use garage_util::time::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;
@@ -29,16 +30,16 @@ async fn handle_delete_internal(
_ => true,
});
- let mut must_delete = None;
+ let mut version_to_delete = None;
let mut timestamp = now_msec();
for v in interesting_versions {
- if v.timestamp + 1 > timestamp || must_delete.is_none() {
- must_delete = Some(v.uuid);
+ if v.timestamp + 1 > timestamp || version_to_delete.is_none() {
+ version_to_delete = Some(v.uuid);
}
timestamp = std::cmp::max(timestamp, v.timestamp + 1);
}
- let deleted_version = must_delete.ok_or(Error::NotFound)?;
+ let deleted_version = version_to_delete.ok_or(Error::NotFound)?;
let version_uuid = gen_uuid();
@@ -47,7 +48,7 @@ async fn handle_delete_internal(
key.into(),
vec![ObjectVersion {
uuid: version_uuid,
- timestamp: now_msec(),
+ timestamp,
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
}],
);
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs
index 68e7c66a..22a55b55 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3_get.rs
@@ -146,9 +146,10 @@ pub async fn handle_get(
let version = version.ok_or(Error::NotFound)?;
let mut blocks = version
- .blocks()
+ .blocks
+ .items()
.iter()
- .map(|vb| (vb.hash, None))
+ .map(|(_, vb)| (vb.hash, None))
.collect::<Vec<_>>();
blocks[0].1 = Some(first_block);
@@ -219,11 +220,12 @@ pub async fn handle_get_range(
// file (whereas block.offset designates the offset of the block WITHIN THE PART
// block.part_number, which is not the same in the case of a multipart upload)
let mut blocks = Vec::with_capacity(std::cmp::min(
- version.blocks().len(),
- 4 + ((end - begin) / std::cmp::max(version.blocks()[0].size as u64, 1024)) as usize,
+ version.blocks.len(),
+ 4 + ((end - begin) / std::cmp::max(version.blocks.items()[0].1.size as u64, 1024))
+ as usize,
));
let mut true_offset = 0;
- for b in version.blocks().iter() {
+ for (_, b) in version.blocks.items().iter() {
if true_offset >= end {
break;
}
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
index 98d774db..4d6c32bc 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3_list.rs
@@ -2,10 +2,10 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt::Write;
use std::sync::Arc;
-use chrono::{DateTime, NaiveDateTime, SecondsFormat, Utc};
use hyper::{Body, Response};
use garage_util::error::Error as GarageError;
+use garage_util::time::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;
@@ -42,7 +42,7 @@ pub fn parse_list_objects_query(
Ok(ListObjectsQuery {
is_v2: params.get("list-type").map(|x| x == "2").unwrap_or(false),
bucket: bucket.to_string(),
- delimiter: params.get("delimiter").cloned(),
+ delimiter: params.get("delimiter").filter(|x| !x.is_empty()).cloned(),
max_keys: params
.get("max-keys")
.map(|x| {
@@ -247,9 +247,7 @@ pub async fn handle_list(
}
for (key, info) in result_keys.iter() {
- let last_modif = NaiveDateTime::from_timestamp(info.last_modified as i64 / 1000, 0);
- let last_modif = DateTime::<Utc>::from_utc(last_modif, Utc);
- let last_modif = last_modif.to_rfc3339_opts(SecondsFormat::Millis, true);
+ let last_modif = msec_to_rfc3339(info.last_modified);
writeln!(&mut xml, "\t<Contents>").unwrap();
writeln!(
&mut xml,
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index ec599a05..c4e3b818 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -5,11 +5,12 @@ use std::sync::Arc;
use futures::stream::*;
use hyper::{Body, Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
-use sha2::{Digest as Sha256Digest, Sha256};
+use sha2::Sha256;
use garage_table::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
+use garage_util::time::*;
use garage_model::block::INLINE_THRESHOLD;
use garage_model::block_ref_table::*;
@@ -52,14 +53,14 @@ pub async fn handle_put(
if first_block.len() < INLINE_THRESHOLD {
let mut md5sum = Md5::new();
md5sum.update(&first_block[..]);
- let md5sum_arr = md5sum.finalize();
- let md5sum_hex = hex::encode(md5sum_arr);
+ let data_md5sum = md5sum.finalize();
+ let data_md5sum_hex = hex::encode(data_md5sum);
- let sha256sum_hash = sha256sum(&first_block[..]);
+ let data_sha256sum = sha256sum(&first_block[..]);
ensure_checksum_matches(
- md5sum_arr.as_slice(),
- sha256sum_hash,
+ data_md5sum.as_slice(),
+ data_sha256sum,
content_md5.as_deref(),
content_sha256,
)?;
@@ -71,7 +72,7 @@ pub async fn handle_put(
ObjectVersionMeta {
headers,
size: first_block.len() as u64,
- etag: md5sum_hex.clone(),
+ etag: data_md5sum_hex.clone(),
},
first_block,
)),
@@ -80,41 +81,45 @@ pub async fn handle_put(
let object = Object::new(bucket.into(), key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
- return Ok(put_response(version_uuid, md5sum_hex));
+ return Ok(put_response(version_uuid, data_md5sum_hex));
}
// Write version identifier in object table so that we have a trace
// that we are uploading something
let mut object_version = ObjectVersion {
uuid: version_uuid,
- timestamp: now_msec(),
+ timestamp: version_timestamp,
state: ObjectVersionState::Uploading(headers.clone()),
};
let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
// Initialize corresponding entry in version table
- let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]);
- let first_block_hash = sha256sum(&first_block[..]);
+ // Write this entry now, even with empty block list,
+ // to prevent block_ref entries from being deleted (they can be deleted
+ // if the reference a version that isn't found in the version table)
+ let version = Version::new(version_uuid, bucket.into(), key.into(), false);
+ garage.version_table.insert(&version).await?;
// Transfer data and verify checksum
+ let first_block_hash = blake2sum(&first_block[..]);
let tx_result = read_and_put_blocks(
&garage,
- version,
+ &version,
1,
first_block,
first_block_hash,
&mut chunker,
)
.await
- .and_then(|(total_size, md5sum_arr, sha256sum)| {
+ .and_then(|(total_size, data_md5sum, data_sha256sum)| {
ensure_checksum_matches(
- md5sum_arr.as_slice(),
- sha256sum,
+ data_md5sum.as_slice(),
+ data_sha256sum,
content_md5.as_deref(),
content_sha256,
)
- .map(|()| (total_size, md5sum_arr))
+ .map(|()| (total_size, data_md5sum))
});
// If something went wrong, clean up
@@ -148,13 +153,13 @@ pub async fn handle_put(
/// Validate MD5 sum against content-md5 header
/// and sha256sum against signed content-sha256
fn ensure_checksum_matches(
- md5sum: &[u8],
- sha256sum: garage_util::data::FixedBytes32,
+ data_md5sum: &[u8],
+ data_sha256sum: garage_util::data::FixedBytes32,
content_md5: Option<&str>,
content_sha256: Option<garage_util::data::FixedBytes32>,
) -> Result<(), Error> {
if let Some(expected_sha256) = content_sha256 {
- if expected_sha256 != sha256sum {
+ if expected_sha256 != data_sha256sum {
return Err(Error::BadRequest(format!(
"Unable to validate x-amz-content-sha256"
)));
@@ -163,7 +168,7 @@ fn ensure_checksum_matches(
}
}
if let Some(expected_md5) = content_md5 {
- if expected_md5.trim_matches('"') != base64::encode(md5sum) {
+ if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
return Err(Error::BadRequest(format!("Unable to validate content-md5")));
} else {
trace!("Successfully validated content-md5");
@@ -173,8 +178,8 @@ fn ensure_checksum_matches(
}
async fn read_and_put_blocks(
- garage: &Arc<Garage>,
- version: Version,
+ garage: &Garage,
+ version: &Version,
part_number: u64,
first_block: Vec<u8>,
first_block_hash: Hash,
@@ -183,11 +188,11 @@ async fn read_and_put_blocks(
let mut md5hasher = Md5::new();
let mut sha256hasher = Sha256::new();
md5hasher.update(&first_block[..]);
- sha256hasher.input(&first_block[..]);
+ sha256hasher.update(&first_block[..]);
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(
- garage.clone(),
+ &garage,
&version,
part_number,
0,
@@ -203,11 +208,11 @@ async fn read_and_put_blocks(
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
if let Some(block) = next_block {
md5hasher.update(&block[..]);
- sha256hasher.input(&block[..]);
- let block_hash = sha256sum(&block[..]);
+ sha256hasher.update(&block[..]);
+ let block_hash = blake2sum(&block[..]);
let block_len = block.len();
put_curr_version_block = put_block_meta(
- garage.clone(),
+ &garage,
&version,
part_number,
next_offset as u64,
@@ -222,39 +227,35 @@ async fn read_and_put_blocks(
}
let total_size = next_offset as u64;
- let md5sum_arr = md5hasher.finalize();
+ let data_md5sum = md5hasher.finalize();
- let sha256sum_arr = sha256hasher.result();
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&sha256sum_arr[..]);
- let sha256sum_arr = Hash::from(hash);
+ let data_sha256sum = sha256hasher.finalize();
+ let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
- Ok((total_size, md5sum_arr, sha256sum_arr))
+ Ok((total_size, data_md5sum, data_sha256sum))
}
async fn put_block_meta(
- garage: Arc<Garage>,
+ garage: &Garage,
version: &Version,
part_number: u64,
offset: u64,
hash: Hash,
size: u64,
) -> Result<(), GarageError> {
- // TODO: don't clone, restart from empty block list ??
let mut version = version.clone();
- version
- .add_block(VersionBlock {
+ version.blocks.put(
+ VersionBlockKey {
part_number,
offset,
- hash,
- size,
- })
- .unwrap();
+ },
+ VersionBlock { hash, size },
+ );
let block_ref = BlockRef {
block: hash,
version: version.uuid,
- deleted: false,
+ deleted: false.into(),
};
futures::try_join!(
@@ -319,6 +320,7 @@ pub async fn handle_create_multipart_upload(
let version_uuid = gen_uuid();
let headers = get_headers(req)?;
+ // Create object in object table
let object_version = ObjectVersion {
uuid: version_uuid,
timestamp: now_msec(),
@@ -327,6 +329,14 @@ pub async fn handle_create_multipart_upload(
let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]);
garage.object_table.insert(&object).await?;
+ // Insert empty version so that block_ref entries refer to something
+ // (they are inserted concurrently with blocks in the version table, so
+ // there is the possibility that they are inserted before the version table
+ // is created, in which case it is allowed to delete them, e.g. in repair_*)
+ let version = Version::new(version_uuid, bucket.into(), key.into(), false);
+ garage.version_table.insert(&version).await?;
+
+ // Send success response
let mut xml = String::new();
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
writeln!(
@@ -389,11 +399,11 @@ pub async fn handle_put_part(
}
// Copy block to store
- let version = Version::new(version_uuid, bucket, key, false, vec![]);
- let first_block_hash = sha256sum(&first_block[..]);
- let (_, md5sum_arr, sha256sum) = read_and_put_blocks(
+ let version = Version::new(version_uuid, bucket, key, false);
+ let first_block_hash = blake2sum(&first_block[..]);
+ let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
&garage,
- version,
+ &version,
part_number,
first_block,
first_block_hash,
@@ -401,15 +411,24 @@ pub async fn handle_put_part(
)
.await?;
+ // Verify that checksums map
ensure_checksum_matches(
- md5sum_arr.as_slice(),
- sha256sum,
+ data_md5sum.as_slice(),
+ data_sha256sum,
content_md5.as_deref(),
content_sha256,
)?;
+ // Store part etag in version
+ let data_md5sum_hex = hex::encode(data_md5sum);
+ let mut version = version;
+ version
+ .parts_etags
+ .put(part_number, data_md5sum_hex.clone());
+ garage.version_table.insert(&version).await?;
+
let response = Response::builder()
- .header("ETag", format!("\"{}\"", hex::encode(md5sum_arr)))
+ .header("ETag", format!("\"{}\"", data_md5sum_hex))
.body(Body::from(vec![]))
.unwrap();
Ok(response)
@@ -444,17 +463,15 @@ pub async fn handle_complete_multipart_upload(
)?;
let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
- let object_version = object
+ let mut object_version = object
.versions()
.iter()
- .find(|v| v.uuid == version_uuid && v.is_uploading());
- let mut object_version = match object_version {
- None => return Err(Error::NotFound),
- Some(x) => x.clone(),
- };
+ .find(|v| v.uuid == version_uuid && v.is_uploading())
+ .cloned()
+ .ok_or(Error::BadRequest(format!("Version not found")))?;
let version = version.ok_or(Error::BadRequest(format!("Version not found")))?;
- if version.blocks().len() == 0 {
+ if version.blocks.len() == 0 {
return Err(Error::BadRequest(format!("No data was uploaded")));
}
@@ -464,53 +481,50 @@ pub async fn handle_complete_multipart_upload(
};
// Check that the list of parts they gave us corresponds to the parts we have here
- // TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere...
- let mut parts = version
- .blocks()
+ debug!("Expected parts from request: {:?}", body_list_of_parts);
+ debug!("Parts stored in version: {:?}", version.parts_etags.items());
+ let parts = version
+ .parts_etags
+ .items()
.iter()
- .map(|x| x.part_number)
- .collect::<Vec<_>>();
- parts.dedup();
+ .map(|pair| (&pair.0, &pair.1));
let same_parts = body_list_of_parts
.iter()
- .map(|x| &x.part_number)
- .eq(parts.iter());
+ .map(|x| (&x.part_number, &x.etag))
+ .eq(parts);
if !same_parts {
return Err(Error::BadRequest(format!("We don't have the same parts")));
}
- // ETag calculation: we produce ETags that have the same form as
- // those of S3 multipart uploads, but we don't use their actual
- // calculation for the first part (we use random bytes). This
- // shouldn't impact compatibility as the S3 docs specify that
- // the ETag is an opaque value in case of a multipart upload.
- // See also: https://teppen.io/2018/06/23/aws_s3_etags/
- let num_parts = version.blocks().last().unwrap().part_number
- - version.blocks().first().unwrap().part_number
+ // Calculate etag of final object
+ // To understand how etags are calculated, read more here:
+ // https://teppen.io/2018/06/23/aws_s3_etags/
+ let num_parts = version.blocks.items().last().unwrap().0.part_number
+ - version.blocks.items().first().unwrap().0.part_number
+ 1;
- let etag = format!(
- "{}-{}",
- hex::encode(&rand::random::<[u8; 16]>()[..]),
- num_parts
- );
+ let mut etag_md5_hasher = Md5::new();
+ for (_, etag) in version.parts_etags.items().iter() {
+ etag_md5_hasher.update(etag.as_bytes());
+ }
+ let etag = format!("{}-{}", hex::encode(etag_md5_hasher.finalize()), num_parts);
- let total_size = version
- .blocks()
- .iter()
- .map(|x| x.size)
- .fold(0, |x, y| x + y);
+ // Calculate total size of final object
+ let total_size = version.blocks.items().iter().map(|x| x.1.size).sum();
+
+ // Write final object version
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
headers,
size: total_size,
- etag: etag,
+ etag,
},
- version.blocks()[0].hash,
+ version.blocks.items()[0].1.hash,
));
let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
+ // Send response saying ok we're done
let mut xml = String::new();
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
writeln!(
@@ -570,17 +584,19 @@ fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
.to_string())
}
-fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
+pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
let content_type = get_mime_type(req)?;
- let other_headers = vec![
+ let mut other = BTreeMap::new();
+
+ // Preserve standard headers
+ let standard_header = vec![
hyper::header::CACHE_CONTROL,
hyper::header::CONTENT_DISPOSITION,
hyper::header::CONTENT_ENCODING,
hyper::header::CONTENT_LANGUAGE,
hyper::header::EXPIRES,
];
- let mut other = BTreeMap::new();
- for h in other_headers.iter() {
+ for h in standard_header.iter() {
if let Some(v) = req.headers().get(h) {
match v.to_str() {
Ok(v_str) => {
@@ -592,6 +608,21 @@ fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
}
}
}
+
+ // Preserve x-amz-meta- headers
+ for (k, v) in req.headers().iter() {
+ if k.as_str().starts_with("x-amz-meta-") {
+ match v.to_str() {
+ Ok(v_str) => {
+ other.insert(k.to_string(), v_str.to_string());
+ }
+ Err(e) => {
+ warn!("Discarding header {}, error in .to_str(): {}", k, e);
+ }
+ }
+ }
+ }
+
Ok(ObjectVersionHeaders {
content_type,
other,
diff --git a/src/api/signature.rs b/src/api/signature.rs
index d7fbd3f7..6dc69afa 100644
--- a/src/api/signature.rs
+++ b/src/api/signature.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
-use hmac::{Hmac, Mac};
+use hmac::{Hmac, Mac, NewMac};
use hyper::{Body, Method, Request};
use sha2::{Digest, Sha256};
@@ -91,8 +91,8 @@ pub async fn check_signature(
"s3",
)
.ok_or_internal_error("Unable to build signing HMAC")?;
- hmac.input(string_to_sign.as_bytes());
- let signature = hex::encode(hmac.result().code());
+ hmac.update(string_to_sign.as_bytes());
+ let signature = hex::encode(hmac.finalize().into_bytes());
if authorization.signature != signature {
trace!("Canonical request: ``{}``", canonical_request);
@@ -106,12 +106,10 @@ pub async fn check_signature(
} else {
let bytes = hex::decode(authorization.content_sha256)
.ok_or_bad_request("Invalid content sha256 hash")?;
- let mut hash = [0u8; 32];
- if bytes.len() != 32 {
- return Err(Error::BadRequest(format!("Invalid content sha256 hash")));
- }
- hash.copy_from_slice(&bytes[..]);
- Some(Hash::from(hash))
+ Some(
+ Hash::try_from(&bytes[..])
+ .ok_or(Error::BadRequest(format!("Invalid content sha256 hash")))?,
+ )
};
Ok((key, content_sha256))
@@ -220,12 +218,12 @@ fn parse_credential(cred: &str) -> Result<(String, String), Error> {
fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_req: &str) -> String {
let mut hasher = Sha256::default();
- hasher.input(canonical_req.as_bytes());
+ hasher.update(canonical_req.as_bytes());
[
"AWS4-HMAC-SHA256",
&datetime.format(LONG_DATETIME).to_string(),
scope_string,
- &hex::encode(hasher.result().as_slice()),
+ &hex::encode(hasher.finalize().as_slice()),
]
.join("\n")
}
@@ -238,14 +236,14 @@ fn signing_hmac(
) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> {
let secret = String::from("AWS4") + secret_key;
let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?;
- date_hmac.input(datetime.format(SHORT_DATE).to_string().as_bytes());
- let mut region_hmac = HmacSha256::new_varkey(&date_hmac.result().code())?;
- region_hmac.input(region.as_bytes());
- let mut service_hmac = HmacSha256::new_varkey(&region_hmac.result().code())?;
- service_hmac.input(service.as_bytes());
- let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.result().code())?;
- signing_hmac.input(b"aws4_request");
- let hmac = HmacSha256::new_varkey(&signing_hmac.result().code())?;
+ date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes());
+ let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?;
+ region_hmac.update(region.as_bytes());
+ let mut service_hmac = HmacSha256::new_varkey(&region_hmac.finalize().into_bytes())?;
+ service_hmac.update(service.as_bytes());
+ let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?;
+ signing_hmac.update(b"aws4_request");
+ let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?;
Ok(hmac)
}