Diffstat (limited to 'src/api')
-rw-r--r--  src/api/Cargo.toml     |  37
-rw-r--r--  src/api/api_server.rs  |   7
-rw-r--r--  src/api/error.rs       |   8
-rw-r--r--  src/api/s3_copy.rs     | 106
-rw-r--r--  src/api/s3_delete.rs   |  11
-rw-r--r--  src/api/s3_get.rs      |  62
-rw-r--r--  src/api/s3_list.rs     |   8
-rw-r--r--  src/api/s3_put.rs      | 207
-rw-r--r--  src/api/signature.rs   |  36
9 files changed, 315 insertions, 167 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 45388eff..0b824ca3 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -1,9 +1,9 @@
 [package]
 name = "garage_api"
-version = "0.1.1"
+version = "0.2.1"
 authors = ["Alex Auvolat <alex@adnab.me>"]
 edition = "2018"
-license = "GPL-3.0"
+license = "AGPL-3.0"
 description = "S3 API server crate for the Garage object store"
 repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
 
@@ -13,30 +13,29 @@ path = "lib.rs"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-garage_util = { version = "0.1.1", path = "../util" }
-garage_table = { version = "0.1.1", path = "../table" }
-garage_model = { version = "0.1.1", path = "../model" }
+garage_model = { version = "0.2.1", path = "../model" }
+garage_table = { version = "0.2.1", path = "../table" }
+garage_util = { version = "0.2.1", path = "../util" }
 
-err-derive = "0.2.3"
-bytes = "0.4"
-hex = "0.3"
 base64 = "0.13"
-log = "0.4"
+bytes = "1.0"
 chrono = "0.4"
-md-5 = "0.9.1"
-sha2 = "0.8"
-hmac = "0.7"
-crypto-mac = "0.7"
-rand = "0.7"
+crypto-mac = "0.10"
+err-derive = "0.3"
+hex = "0.4"
+hmac = "0.10"
+log = "0.4"
+md-5 = "0.9"
+sha2 = "0.9"
 
 futures = "0.3"
 futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
 
 http = "0.2"
-hyper = "^0.13.6"
-url = "2.1"
 httpdate = "0.3"
-percent-encoding = "2.1.0"
-roxmltree = "0.11"
 http-range = "0.1"
+hyper = "0.14"
+percent-encoding = "2.1.0"
+roxmltree = "0.14"
+url = "2.1"
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index c6b1d483..2feb0e3a 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -101,7 +101,7 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
     match req.method() {
         &Method::HEAD => {
             // HeadObject query
-            Ok(handle_head(garage, &bucket, &key).await?)
+            Ok(handle_head(garage, &req, &bucket, &key).await?)
         }
         &Method::GET => {
             // GetObject query
@@ -137,7 +137,10 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
                 )));
             }
             let source_key = source_key.ok_or_bad_request("No source key specified")?;
-            Ok(handle_copy(garage, &bucket, &key, &source_bucket, &source_key).await?)
+            Ok(
+                handle_copy(garage, &req, &bucket, &key, &source_bucket, &source_key)
+                    .await?,
+            )
         } else {
             // PutObject query
             Ok(handle_put(garage, req, &bucket, &key, content_sha256).await?)
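Editor's note: the Cargo.toml hunk above tracks the tokio 0.2 → 1.0 rename of the runtime features ("rt-core"/"rt-threaded" became "rt"/"rt-multi-thread"). As a minimal sketch, not part of the patch, the selected feature set corresponds to building a runtime explicitly like this (Garage may instead rely on the #[tokio::main] macro):

```rust
use std::error::Error;

fn main() -> Result<(), Box<dyn Error>> {
    // "rt-multi-thread" feature provides new_multi_thread();
    // enable_all() turns on the IO and time drivers ("net", "time" features).
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;
    rt.block_on(async {
        // server startup would go here
    });
    Ok(())
}
```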
diff --git a/src/api/error.rs b/src/api/error.rs
index a1681fc3..42a7ab10 100644
--- a/src/api/error.rs
+++ b/src/api/error.rs
@@ -33,7 +33,7 @@ pub enum Error {
     InvalidBase64(#[error(source)] base64::DecodeError),
 
     #[error(display = "Invalid XML: {}", _0)]
-    InvalidXML(#[error(source)] roxmltree::Error),
+    InvalidXML(String),
 
     #[error(display = "Invalid header value: {}", _0)]
     InvalidHeader(#[error(source)] hyper::header::ToStrError),
@@ -45,6 +45,12 @@ pub enum Error {
     BadRequest(String),
 }
 
+impl From<roxmltree::Error> for Error {
+    fn from(err: roxmltree::Error) -> Self {
+        Self::InvalidXML(format!("{}", err))
+    }
+}
+
 impl Error {
     pub fn http_status_code(&self) -> StatusCode {
         match self {
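The hunk above trades the derived #[error(source)] conversion for a hand-written From impl, so `?` still converts roxmltree::Error while the enum only stores a String, keeping the foreign type out of the public error API. A minimal standalone sketch of the pattern, with a hypothetical ParseError standing in for roxmltree::Error:

```rust
// Hypothetical stand-in for a foreign error type such as roxmltree::Error.
#[derive(Debug)]
struct ParseError;

impl std::fmt::Display for ParseError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "parse failed")
    }
}

#[derive(Debug)]
enum Error {
    InvalidXML(String),
}

impl From<ParseError> for Error {
    fn from(err: ParseError) -> Self {
        // format!() captures the message; the foreign type is dropped here.
        Self::InvalidXML(format!("{}", err))
    }
}

fn parse(doc: &str) -> Result<(), Error> {
    let res: Result<(), ParseError> = if doc.is_empty() { Err(ParseError) } else { Ok(()) };
    res?; // `?` converts ParseError into Error via the From impl
    Ok(())
}
```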
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs
index b6ec48b0..187fe347 100644
--- a/src/api/s3_copy.rs
+++ b/src/api/s3_copy.rs
@@ -1,11 +1,11 @@
 use std::fmt::Write;
 use std::sync::Arc;
 
-use chrono::{SecondsFormat, Utc};
-use hyper::{Body, Response};
+use hyper::{Body, Request, Response};
 
 use garage_table::*;
 use garage_util::data::*;
+use garage_util::time::*;
 
 use garage_model::block_ref_table::*;
 use garage_model::garage::Garage;
@@ -13,9 +13,11 @@ use garage_model::object_table::*;
 use garage_model::version_table::*;
 
 use crate::error::*;
+use crate::s3_put::get_headers;
 
 pub async fn handle_copy(
     garage: Arc<Garage>,
+    req: &Request<Body>,
     dest_bucket: &str,
     dest_key: &str,
     source_bucket: &str,
@@ -41,17 +43,37 @@ pub async fn handle_copy(
     };
 
     let new_uuid = gen_uuid();
-    let dest_object_version = ObjectVersion {
-        uuid: new_uuid,
-        timestamp: now_msec(),
-        state: ObjectVersionState::Complete(source_last_state.clone()),
-    };
+    let new_timestamp = now_msec();
 
-    match &source_last_state {
+    // Implement x-amz-metadata-directive: REPLACE
+    let old_meta = match source_last_state {
         ObjectVersionData::DeleteMarker => {
             return Err(Error::NotFound);
         }
-        ObjectVersionData::Inline(_meta, _bytes) => {
+        ObjectVersionData::Inline(meta, _bytes) => meta,
+        ObjectVersionData::FirstBlock(meta, _fbh) => meta,
+    };
+    let new_meta = match req.headers().get("x-amz-metadata-directive") {
+        Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
+            headers: get_headers(req)?,
+            size: old_meta.size,
+            etag: old_meta.etag.clone(),
+        },
+        _ => old_meta.clone(),
+    };
+
+    // Save object copy
+    match source_last_state {
+        ObjectVersionData::DeleteMarker => unreachable!(),
+        ObjectVersionData::Inline(_meta, bytes) => {
+            let dest_object_version = ObjectVersion {
+                uuid: new_uuid,
+                timestamp: new_timestamp,
+                state: ObjectVersionState::Complete(ObjectVersionData::Inline(
+                    new_meta,
+                    bytes.clone(),
+                )),
+            };
             let dest_object = Object::new(
                 dest_bucket.to_string(),
                 dest_key.to_string(),
@@ -59,44 +81,84 @@ pub async fn handle_copy(
             );
             garage.object_table.insert(&dest_object).await?;
         }
-        ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
+        ObjectVersionData::FirstBlock(_meta, first_block_hash) => {
+            // Get block list from source version
             let source_version = garage
                 .version_table
                 .get(&source_last_v.uuid, &EmptyKey)
                 .await?;
             let source_version = source_version.ok_or(Error::NotFound)?;
 
-            let dest_version = Version::new(
-                new_uuid,
+            // Write an "uploading" marker in Object table
+            // This holds a reference to the object in the Version table
+            // so that it won't be deleted, e.g. by repair_versions.
+            let tmp_dest_object_version = ObjectVersion {
+                uuid: new_uuid,
+                timestamp: new_timestamp,
+                state: ObjectVersionState::Uploading(new_meta.headers.clone()),
+            };
+            let tmp_dest_object = Object::new(
                 dest_bucket.to_string(),
                 dest_key.to_string(),
-                false,
-                source_version.blocks().to_vec(),
+                vec![tmp_dest_object_version],
             );
-            let dest_object = Object::new(
+            garage.object_table.insert(&tmp_dest_object).await?;
+
+            // Write version in the version table. Even with empty block list,
+            // this means that the BlockRef entries linked to this version cannot be
+            // marked as deleted (they are marked as deleted only if the Version
+            // doesn't exist or is marked as deleted).
+            let mut dest_version = Version::new(
+                new_uuid,
                 dest_bucket.to_string(),
                 dest_key.to_string(),
-                vec![dest_object_version],
+                false,
             );
+            garage.version_table.insert(&dest_version).await?;
+
+            // Fill in block list for version and insert block refs
+            for (bk, bv) in source_version.blocks.items().iter() {
+                dest_version.blocks.put(*bk, *bv);
+            }
             let dest_block_refs = dest_version
-                .blocks()
+                .blocks
+                .items()
                 .iter()
                 .map(|b| BlockRef {
-                    block: b.hash,
+                    block: b.1.hash,
                     version: new_uuid,
-                    deleted: false,
+                    deleted: false.into(),
                 })
                 .collect::<Vec<_>>();
             futures::try_join!(
-                garage.object_table.insert(&dest_object),
                 garage.version_table.insert(&dest_version),
                 garage.block_ref_table.insert_many(&dest_block_refs[..]),
             )?;
+
+            // Insert final object
+            // We do this last because otherwise there is a race condition in the case where
+            // the copy call has the same source and destination (this happens, rclone does
+            // it to update the modification timestamp for instance). If we did this concurrently
+            // with the stuff before, the block's reference counts could be decremented before
+            // they are incremented again for the new version, leading to data being deleted.
+            let dest_object_version = ObjectVersion {
+                uuid: new_uuid,
+                timestamp: new_timestamp,
+                state: ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
+                    new_meta,
+                    *first_block_hash,
+                )),
+            };
+            let dest_object = Object::new(
+                dest_bucket.to_string(),
+                dest_key.to_string(),
+                vec![dest_object_version],
+            );
+            garage.object_table.insert(&dest_object).await?;
         }
     }
 
-    let now = Utc::now();
-    let last_modified = now.to_rfc3339_opts(SecondsFormat::Secs, true);
+    let last_modified = msec_to_rfc3339(new_timestamp);
     let mut xml = String::new();
     writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
     writeln!(&mut xml, r#"<CopyObjectResult>"#).unwrap();
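For reference, a hypothetical client request that would exercise the new x-amz-metadata-directive: REPLACE branch of handle_copy. The endpoint, bucket and key names are made up, and a real request would also carry AWS SigV4 signature headers:

```rust
use hyper::{Body, Client, Method, Request};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let req = Request::builder()
        .method(Method::PUT)
        .uri("http://127.0.0.1:3900/dst-bucket/dst-key") // hypothetical endpoint
        .header("x-amz-copy-source", "/src-bucket/src-key")
        // Matched case-sensitively against "REPLACE" by handle_copy
        .header("x-amz-metadata-directive", "REPLACE")
        // With REPLACE, these become the new object's metadata via get_headers()
        .header("Content-Type", "text/plain")
        .header("x-amz-meta-greeting", "hello")
        .body(Body::empty())?;
    let resp = Client::new().request(req).await?;
    println!("status: {}", resp.status());
    Ok(())
}
```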
diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs
index 4b6a2b18..6abbfc48 100644
--- a/src/api/s3_delete.rs
+++ b/src/api/s3_delete.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
 use hyper::{Body, Request, Response};
 
 use garage_util::data::*;
+use garage_util::time::*;
 
 use garage_model::garage::Garage;
 use garage_model::object_table::*;
@@ -29,16 +30,16 @@ async fn handle_delete_internal(
         _ => true,
     });
 
-    let mut must_delete = None;
+    let mut version_to_delete = None;
     let mut timestamp = now_msec();
     for v in interesting_versions {
-        if v.timestamp + 1 > timestamp || must_delete.is_none() {
-            must_delete = Some(v.uuid);
+        if v.timestamp + 1 > timestamp || version_to_delete.is_none() {
+            version_to_delete = Some(v.uuid);
        }
         timestamp = std::cmp::max(timestamp, v.timestamp + 1);
     }
 
-    let deleted_version = must_delete.ok_or(Error::NotFound)?;
+    let deleted_version = version_to_delete.ok_or(Error::NotFound)?;
 
     let version_uuid = gen_uuid();
 
@@ -47,7 +48,7 @@ async fn handle_delete_internal(
         key.into(),
         vec![ObjectVersion {
             uuid: version_uuid,
-            timestamp: now_msec(),
+            timestamp,
             state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
         }],
     );
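The loop above picks the newest version to delete and stamps the delete marker with max(now, newest version timestamp + 1), so the tombstone sorts strictly after every existing version even when the local clock lags. A sketch of just that rule, as a hypothetical helper that is not part of the patch:

```rust
// The tombstone must sort strictly after every existing version,
// even if the local clock is behind the newest version's timestamp.
fn tombstone_timestamp(now_msec: u64, version_timestamps: &[u64]) -> u64 {
    version_timestamps
        .iter()
        .map(|t| t + 1)
        .fold(now_msec, std::cmp::max)
}

fn main() {
    // Clock is behind the newest version (5000): the marker still wins.
    assert_eq!(tombstone_timestamp(4000, &[3000, 5000]), 5001);
    // Clock is ahead: wall-clock time is used.
    assert_eq!(tombstone_timestamp(9000, &[3000, 5000]), 9000);
}
```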
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs
index 68e7c66a..2590c9bd 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3_get.rs
@@ -40,8 +40,48 @@ fn object_headers(
     resp
 }
 
+fn try_answer_cached(
+    version: &ObjectVersion,
+    version_meta: &ObjectVersionMeta,
+    req: &Request<Body>,
+) -> Option<Response<Body>> {
+    // <trinity> It is possible, and is even usually the case, [that both If-None-Match and
+    // If-Modified-Since] are present in a request. In this situation If-None-Match takes
+    // precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The
+    // rationale being that etag based matching is more accurate, it has no issue with
+    // sub-second precision for instance (in case of very fast updates)
+    let cached = if let Some(none_match) = req.headers().get(http::header::IF_NONE_MATCH) {
+        let none_match = none_match.to_str().ok()?;
+        let expected = format!("\"{}\"", version_meta.etag);
+        let found = none_match
+            .split(',')
+            .map(str::trim)
+            .any(|etag| etag == expected || etag == "\"*\"");
+        found
+    } else if let Some(modified_since) = req.headers().get(http::header::IF_MODIFIED_SINCE) {
+        let modified_since = modified_since.to_str().ok()?;
+        let client_date = httpdate::parse_http_date(modified_since).ok()?;
+        let server_date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
+        client_date >= server_date
+    } else {
+        false
+    };
+
+    if cached {
+        Some(
+            Response::builder()
+                .status(StatusCode::NOT_MODIFIED)
+                .body(Body::empty())
+                .unwrap(),
+        )
+    } else {
+        None
+    }
+}
+
 pub async fn handle_head(
     garage: Arc<Garage>,
+    req: &Request<Body>,
     bucket: &str,
     key: &str,
 ) -> Result<Response<Body>, Error> {
@@ -65,7 +105,11 @@ pub async fn handle_head(
         _ => unreachable!(),
     };
 
-    let body: Body = Body::from(vec![]);
+    if let Some(cached) = try_answer_cached(&version, version_meta, req) {
+        return Ok(cached);
+    }
+
+    let body: Body = Body::empty();
     let response = object_headers(&version, version_meta)
         .header("Content-Length", format!("{}", version_meta.size))
         .status(StatusCode::OK)
@@ -104,6 +148,10 @@ pub async fn handle_get(
         ObjectVersionData::FirstBlock(meta, _) => meta,
     };
 
+    if let Some(cached) = try_answer_cached(&last_v, last_v_meta, req) {
+        return Ok(cached);
+    }
+
     let range = match req.headers().get("range") {
         Some(range) => {
             let range_str = range.to_str()?;
@@ -146,9 +194,10 @@ pub async fn handle_get(
             let version = version.ok_or(Error::NotFound)?;
 
             let mut blocks = version
-                .blocks()
+                .blocks
+                .items()
                 .iter()
-                .map(|vb| (vb.hash, None))
+                .map(|(_, vb)| (vb.hash, None))
                 .collect::<Vec<_>>();
             blocks[0].1 = Some(first_block);
 
@@ -219,11 +268,12 @@ pub async fn handle_get_range(
     // file (whereas block.offset designates the offset of the block WITHIN THE PART
     // block.part_number, which is not the same in the case of a multipart upload)
     let mut blocks = Vec::with_capacity(std::cmp::min(
-        version.blocks().len(),
-        4 + ((end - begin) / std::cmp::max(version.blocks()[0].size as u64, 1024)) as usize,
+        version.blocks.len(),
+        4 + ((end - begin) / std::cmp::max(version.blocks.items()[0].1.size as u64, 1024))
+            as usize,
    ));
     let mut true_offset = 0;
-    for b in version.blocks().iter() {
+    for (_, b) in version.blocks.items().iter() {
         if true_offset >= end {
             break;
         }
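The If-None-Match comparison in try_answer_cached, isolated as a sketch. Note that the handler matches each comma-separated entry against the quoted form of the stored ETag, and against a quoted asterisk:

```rust
// Returns true when an If-None-Match header value matches the stored ETag.
// Mirrors the comparison above: entries are compared against "\"<etag>\""
// and against the quoted wildcard "\"*\"".
fn if_none_match_hits(header_value: &str, etag: &str) -> bool {
    let expected = format!("\"{}\"", etag);
    header_value
        .split(',')
        .map(str::trim)
        .any(|e| e == expected || e == "\"*\"")
}

fn main() {
    assert!(if_none_match_hits("\"abc\", \"def\"", "def"));
    assert!(!if_none_match_hits("\"abc\"", "def"));
}
```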
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
index 98d774db..4d6c32bc 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3_list.rs
@@ -2,10 +2,10 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
 use std::fmt::Write;
 use std::sync::Arc;
 
-use chrono::{DateTime, NaiveDateTime, SecondsFormat, Utc};
 use hyper::{Body, Response};
 
 use garage_util::error::Error as GarageError;
+use garage_util::time::*;
 
 use garage_model::garage::Garage;
 use garage_model::object_table::*;
@@ -42,7 +42,7 @@ pub fn parse_list_objects_query(
     Ok(ListObjectsQuery {
         is_v2: params.get("list-type").map(|x| x == "2").unwrap_or(false),
         bucket: bucket.to_string(),
-        delimiter: params.get("delimiter").cloned(),
+        delimiter: params.get("delimiter").filter(|x| !x.is_empty()).cloned(),
         max_keys: params
             .get("max-keys")
             .map(|x| {
@@ -247,9 +247,7 @@ pub async fn handle_list(
     }
 
     for (key, info) in result_keys.iter() {
-        let last_modif = NaiveDateTime::from_timestamp(info.last_modified as i64 / 1000, 0);
-        let last_modif = DateTime::<Utc>::from_utc(last_modif, Utc);
-        let last_modif = last_modif.to_rfc3339_opts(SecondsFormat::Millis, true);
+        let last_modif = msec_to_rfc3339(info.last_modified);
         writeln!(&mut xml, "\t<Contents>").unwrap();
         writeln!(
             &mut xml,
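msec_to_rfc3339 replaces the inline chrono conversion deleted above. One plausible implementation, reconstructed from the removed code; the actual helper in garage_util::time may differ, e.g. in sub-second handling:

```rust
use chrono::{DateTime, NaiveDateTime, SecondsFormat, Utc};

// Millisecond UNIX timestamp -> RFC 3339 string with millisecond precision.
fn msec_to_rfc3339(msecs: u64) -> String {
    let secs = (msecs / 1000) as i64;
    let nanos = ((msecs % 1000) * 1_000_000) as u32;
    let naive = NaiveDateTime::from_timestamp(secs, nanos);
    DateTime::<Utc>::from_utc(naive, Utc).to_rfc3339_opts(SecondsFormat::Millis, true)
}

fn main() {
    assert_eq!(msec_to_rfc3339(0), "1970-01-01T00:00:00.000Z");
}
```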
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index ec599a05..c4e3b818 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -5,11 +5,12 @@ use std::sync::Arc;
 use futures::stream::*;
 use hyper::{Body, Request, Response};
 use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
-use sha2::{Digest as Sha256Digest, Sha256};
+use sha2::Sha256;
 
 use garage_table::*;
 use garage_util::data::*;
 use garage_util::error::Error as GarageError;
+use garage_util::time::*;
 
 use garage_model::block::INLINE_THRESHOLD;
 use garage_model::block_ref_table::*;
@@ -52,14 +53,14 @@ pub async fn handle_put(
     if first_block.len() < INLINE_THRESHOLD {
         let mut md5sum = Md5::new();
         md5sum.update(&first_block[..]);
-        let md5sum_arr = md5sum.finalize();
-        let md5sum_hex = hex::encode(md5sum_arr);
+        let data_md5sum = md5sum.finalize();
+        let data_md5sum_hex = hex::encode(data_md5sum);
 
-        let sha256sum_hash = sha256sum(&first_block[..]);
+        let data_sha256sum = sha256sum(&first_block[..]);
 
         ensure_checksum_matches(
-            md5sum_arr.as_slice(),
-            sha256sum_hash,
+            data_md5sum.as_slice(),
+            data_sha256sum,
             content_md5.as_deref(),
             content_sha256,
         )?;
@@ -71,7 +72,7 @@ pub async fn handle_put(
                 ObjectVersionMeta {
                     headers,
                     size: first_block.len() as u64,
-                    etag: md5sum_hex.clone(),
+                    etag: data_md5sum_hex.clone(),
                 },
                 first_block,
             )),
@@ -80,41 +81,45 @@ pub async fn handle_put(
         let object = Object::new(bucket.into(), key.into(), vec![object_version]);
         garage.object_table.insert(&object).await?;
 
-        return Ok(put_response(version_uuid, md5sum_hex));
+        return Ok(put_response(version_uuid, data_md5sum_hex));
     }
 
     // Write version identifier in object table so that we have a trace
     // that we are uploading something
     let mut object_version = ObjectVersion {
         uuid: version_uuid,
-        timestamp: now_msec(),
+        timestamp: version_timestamp,
         state: ObjectVersionState::Uploading(headers.clone()),
     };
     let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
     garage.object_table.insert(&object).await?;
 
     // Initialize corresponding entry in version table
-    let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]);
-    let first_block_hash = sha256sum(&first_block[..]);
+    // Write this entry now, even with empty block list,
+    // to prevent block_ref entries from being deleted (they can be deleted
+    // if they reference a version that isn't found in the version table)
+    let version = Version::new(version_uuid, bucket.into(), key.into(), false);
+    garage.version_table.insert(&version).await?;
 
     // Transfer data and verify checksum
+    let first_block_hash = blake2sum(&first_block[..]);
     let tx_result = read_and_put_blocks(
         &garage,
-        version,
+        &version,
         1,
         first_block,
         first_block_hash,
         &mut chunker,
     )
     .await
-    .and_then(|(total_size, md5sum_arr, sha256sum)| {
+    .and_then(|(total_size, data_md5sum, data_sha256sum)| {
         ensure_checksum_matches(
-            md5sum_arr.as_slice(),
-            sha256sum,
+            data_md5sum.as_slice(),
+            data_sha256sum,
             content_md5.as_deref(),
             content_sha256,
         )
-        .map(|()| (total_size, md5sum_arr))
+        .map(|()| (total_size, data_md5sum))
     });
 
     // If something went wrong, clean up
@@ -148,13 +153,13 @@
 /// Validate MD5 sum against content-md5 header
 /// and sha256sum against signed content-sha256
 fn ensure_checksum_matches(
-    md5sum: &[u8],
-    sha256sum: garage_util::data::FixedBytes32,
+    data_md5sum: &[u8],
+    data_sha256sum: garage_util::data::FixedBytes32,
     content_md5: Option<&str>,
     content_sha256: Option<garage_util::data::FixedBytes32>,
 ) -> Result<(), Error> {
     if let Some(expected_sha256) = content_sha256 {
-        if expected_sha256 != sha256sum {
+        if expected_sha256 != data_sha256sum {
             return Err(Error::BadRequest(format!(
                 "Unable to validate x-amz-content-sha256"
             )));
@@ -163,7 +168,7 @@ fn ensure_checksum_matches(
         }
     }
     if let Some(expected_md5) = content_md5 {
-        if expected_md5.trim_matches('"') != base64::encode(md5sum) {
+        if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
             return Err(Error::BadRequest(format!("Unable to validate content-md5")));
         } else {
             trace!("Successfully validated content-md5");
@@ -173,8 +178,8 @@
 }
 
 async fn read_and_put_blocks(
-    garage: &Arc<Garage>,
-    version: Version,
+    garage: &Garage,
+    version: &Version,
     part_number: u64,
     first_block: Vec<u8>,
     first_block_hash: Hash,
@@ -183,11 +188,11 @@ async fn read_and_put_blocks(
     let mut md5hasher = Md5::new();
     let mut sha256hasher = Sha256::new();
     md5hasher.update(&first_block[..]);
-    sha256hasher.input(&first_block[..]);
+    sha256hasher.update(&first_block[..]);
 
     let mut next_offset = first_block.len();
     let mut put_curr_version_block = put_block_meta(
-        garage.clone(),
+        &garage,
         &version,
         part_number,
         0,
@@ -203,11 +208,11 @@ async fn read_and_put_blocks(
             futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
         if let Some(block) = next_block {
             md5hasher.update(&block[..]);
-            sha256hasher.input(&block[..]);
-            let block_hash = sha256sum(&block[..]);
+            sha256hasher.update(&block[..]);
+            let block_hash = blake2sum(&block[..]);
             let block_len = block.len();
             put_curr_version_block = put_block_meta(
-                garage.clone(),
+                &garage,
                 &version,
                 part_number,
                 next_offset as u64,
@@ -222,39 +227,35 @@
     }
 
     let total_size = next_offset as u64;
-    let md5sum_arr = md5hasher.finalize();
+    let data_md5sum = md5hasher.finalize();
 
-    let sha256sum_arr = sha256hasher.result();
-    let mut hash = [0u8; 32];
-    hash.copy_from_slice(&sha256sum_arr[..]);
-    let sha256sum_arr = Hash::from(hash);
+    let data_sha256sum = sha256hasher.finalize();
+    let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
 
-    Ok((total_size, md5sum_arr, sha256sum_arr))
+    Ok((total_size, data_md5sum, data_sha256sum))
 }
 
 async fn put_block_meta(
-    garage: Arc<Garage>,
+    garage: &Garage,
     version: &Version,
     part_number: u64,
     offset: u64,
     hash: Hash,
     size: u64,
 ) -> Result<(), GarageError> {
-    // TODO: don't clone, restart from empty block list ??
     let mut version = version.clone();
-    version
-        .add_block(VersionBlock {
+    version.blocks.put(
+        VersionBlockKey {
             part_number,
             offset,
-            hash,
-            size,
-        })
-        .unwrap();
+        },
+        VersionBlock { hash, size },
+    );
 
     let block_ref = BlockRef {
         block: hash,
         version: version.uuid,
-        deleted: false,
+        deleted: false.into(),
     };
 
     futures::try_join!(
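read_and_put_blocks feeds every chunk through both whole-object hashers as it streams, so the MD5 and SHA-256 used for checksum validation come out without buffering the object in memory; the per-block blake2sum introduced above only fingerprints individual blocks for storage. A sketch of the streaming-digest half of that, with the md-5/sha2 0.9 API the patch migrates to:

```rust
use md5::{Digest, Md5};
use sha2::Sha256;

// Feed each chunk to both hashers as it passes through, then finalize once.
fn whole_object_checksums(chunks: &[&[u8]]) -> (String, String) {
    let mut md5hasher = Md5::new();
    let mut sha256hasher = Sha256::new();
    for chunk in chunks {
        md5hasher.update(chunk);
        sha256hasher.update(chunk);
    }
    (
        hex::encode(md5hasher.finalize()),
        hex::encode(sha256hasher.finalize()),
    )
}

fn main() {
    let (md5, sha256) = whole_object_checksums(&[b"hello ", b"world"]);
    println!("md5={} sha256={}", md5, sha256);
}
```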
@@ -319,6 +320,7 @@ pub async fn handle_create_multipart_upload(
     let version_uuid = gen_uuid();
     let headers = get_headers(req)?;
 
+    // Create object in object table
     let object_version = ObjectVersion {
         uuid: version_uuid,
         timestamp: now_msec(),
@@ -327,6 +329,14 @@
     let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]);
     garage.object_table.insert(&object).await?;
 
+    // Insert empty version so that block_ref entries refer to something
+    // (they are inserted concurrently with blocks in the version table, so
+    // there is the possibility that they are inserted before the version table
+    // is created, in which case it is allowed to delete them, e.g. in repair_*)
+    let version = Version::new(version_uuid, bucket.into(), key.into(), false);
+    garage.version_table.insert(&version).await?;
+
+    // Send success response
     let mut xml = String::new();
     writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
     writeln!(
@@ -389,11 +399,11 @@ pub async fn handle_put_part(
     }
 
     // Copy block to store
-    let version = Version::new(version_uuid, bucket, key, false, vec![]);
-    let first_block_hash = sha256sum(&first_block[..]);
-    let (_, md5sum_arr, sha256sum) = read_and_put_blocks(
+    let version = Version::new(version_uuid, bucket, key, false);
+    let first_block_hash = blake2sum(&first_block[..]);
+    let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
         &garage,
-        version,
+        &version,
         part_number,
         first_block,
         first_block_hash,
@@ -401,15 +411,24 @@
     )
     .await?;
 
+    // Verify that checksums match
     ensure_checksum_matches(
-        md5sum_arr.as_slice(),
-        sha256sum,
+        data_md5sum.as_slice(),
+        data_sha256sum,
         content_md5.as_deref(),
         content_sha256,
     )?;
 
+    // Store part etag in version
+    let data_md5sum_hex = hex::encode(data_md5sum);
+    let mut version = version;
+    version
+        .parts_etags
+        .put(part_number, data_md5sum_hex.clone());
+    garage.version_table.insert(&version).await?;
+
     let response = Response::builder()
-        .header("ETag", format!("\"{}\"", hex::encode(md5sum_arr)))
+        .header("ETag", format!("\"{}\"", data_md5sum_hex))
         .body(Body::from(vec![]))
         .unwrap();
     Ok(response)
@@ -444,17 +463,15 @@ pub async fn handle_complete_multipart_upload(
     )?;
     let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
 
-    let object_version = object
+    let mut object_version = object
         .versions()
         .iter()
-        .find(|v| v.uuid == version_uuid && v.is_uploading());
-    let mut object_version = match object_version {
-        None => return Err(Error::NotFound),
-        Some(x) => x.clone(),
-    };
+        .find(|v| v.uuid == version_uuid && v.is_uploading())
+        .cloned()
+        .ok_or(Error::BadRequest(format!("Version not found")))?;
 
     let version = version.ok_or(Error::BadRequest(format!("Version not found")))?;
-    if version.blocks().len() == 0 {
+    if version.blocks.len() == 0 {
         return Err(Error::BadRequest(format!("No data was uploaded")));
     }
 
@@ -464,53 +481,50 @@
     };
 
     // Check that the list of parts they gave us corresponds to the parts we have here
-    // TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere...
-    let mut parts = version
-        .blocks()
+    debug!("Expected parts from request: {:?}", body_list_of_parts);
+    debug!("Parts stored in version: {:?}", version.parts_etags.items());
+    let parts = version
+        .parts_etags
+        .items()
         .iter()
-        .map(|x| x.part_number)
-        .collect::<Vec<_>>();
-    parts.dedup();
+        .map(|pair| (&pair.0, &pair.1));
     let same_parts = body_list_of_parts
         .iter()
-        .map(|x| &x.part_number)
-        .eq(parts.iter());
+        .map(|x| (&x.part_number, &x.etag))
+        .eq(parts);
     if !same_parts {
         return Err(Error::BadRequest(format!("We don't have the same parts")));
     }
 
-    // ETag calculation: we produce ETags that have the same form as
-    // those of S3 multipart uploads, but we don't use their actual
-    // calculation for the first part (we use random bytes). This
-    // shouldn't impact compatibility as the S3 docs specify that
-    // the ETag is an opaque value in case of a multipart upload.
-    // See also: https://teppen.io/2018/06/23/aws_s3_etags/
-    let num_parts = version.blocks().last().unwrap().part_number
-        - version.blocks().first().unwrap().part_number
+    // Calculate etag of final object
+    // To understand how etags are calculated, read more here:
+    // https://teppen.io/2018/06/23/aws_s3_etags/
+    let num_parts = version.blocks.items().last().unwrap().0.part_number
+        - version.blocks.items().first().unwrap().0.part_number
         + 1;
-    let etag = format!(
-        "{}-{}",
-        hex::encode(&rand::random::<[u8; 16]>()[..]),
-        num_parts
-    );
+    let mut etag_md5_hasher = Md5::new();
+    for (_, etag) in version.parts_etags.items().iter() {
+        etag_md5_hasher.update(etag.as_bytes());
+    }
+    let etag = format!("{}-{}", hex::encode(etag_md5_hasher.finalize()), num_parts);
 
-    let total_size = version
-        .blocks()
-        .iter()
-        .map(|x| x.size)
-        .fold(0, |x, y| x + y);
+    // Calculate total size of final object
+    let total_size = version.blocks.items().iter().map(|x| x.1.size).sum();
+
+    // Write final object version
     object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
         ObjectVersionMeta {
             headers,
             size: total_size,
-            etag: etag,
+            etag,
         },
-        version.blocks()[0].hash,
+        version.blocks.items()[0].1.hash,
     ));
     let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]);
     garage.object_table.insert(&final_object).await?;
 
+    // Send response saying ok we're done
     let mut xml = String::new();
     writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
     writeln!(
@@ -570,17 +584,19 @@ fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
         .to_string())
 }
 
-fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
+pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
     let content_type = get_mime_type(req)?;
-    let other_headers = vec![
+    let mut other = BTreeMap::new();
+
+    // Preserve standard headers
+    let standard_header = vec![
         hyper::header::CACHE_CONTROL,
         hyper::header::CONTENT_DISPOSITION,
         hyper::header::CONTENT_ENCODING,
         hyper::header::CONTENT_LANGUAGE,
         hyper::header::EXPIRES,
     ];
-    let mut other = BTreeMap::new();
-    for h in other_headers.iter() {
+    for h in standard_header.iter() {
         if let Some(v) = req.headers().get(h) {
             match v.to_str() {
                 Ok(v_str) => {
@@ -592,6 +608,21 @@ fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
             }
         }
     }
+
+    // Preserve x-amz-meta- headers
+    for (k, v) in req.headers().iter() {
+        if k.as_str().starts_with("x-amz-meta-") {
+            match v.to_str() {
+                Ok(v_str) => {
+                    other.insert(k.to_string(), v_str.to_string());
+                }
+                Err(e) => {
+                    warn!("Discarding header {}, error in .to_str(): {}", k, e);
+                }
+            }
+        }
+    }
+
     Ok(ObjectVersionHeaders {
         content_type,
         other,
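The new ETag rule in handle_complete_multipart_upload, as a standalone sketch: MD5 over the concatenated per-part ETag strings, then "-<number of parts>". AWS itself hashes the raw 16-byte part digests rather than their hex strings (see the teppen.io link above), but since S3 documents multipart ETags as opaque values, only the shape matters:

```rust
use md5::{Digest, Md5};

// MD5 over the concatenated per-part ETag strings, as stored in parts_etags,
// followed by "-<number of parts>".
fn multipart_etag(part_etags: &[&str], num_parts: u64) -> String {
    let mut hasher = Md5::new();
    for etag in part_etags {
        hasher.update(etag.as_bytes());
    }
    format!("{}-{}", hex::encode(hasher.finalize()), num_parts)
}

fn main() {
    let etag = multipart_etag(&["9e107d9d372bb6826bd81d3542a419d6"; 2], 2);
    assert!(etag.ends_with("-2"));
}
```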
diff --git a/src/api/signature.rs b/src/api/signature.rs
index d7fbd3f7..6dc69afa 100644
--- a/src/api/signature.rs
+++ b/src/api/signature.rs
@@ -1,7 +1,7 @@
 use std::collections::HashMap;
 
 use chrono::{DateTime, Duration, NaiveDateTime, Utc};
-use hmac::{Hmac, Mac};
+use hmac::{Hmac, Mac, NewMac};
 use hyper::{Body, Method, Request};
 use sha2::{Digest, Sha256};
 
@@ -91,8 +91,8 @@ pub async fn check_signature(
         "s3",
     )
     .ok_or_internal_error("Unable to build signing HMAC")?;
-    hmac.input(string_to_sign.as_bytes());
-    let signature = hex::encode(hmac.result().code());
+    hmac.update(string_to_sign.as_bytes());
+    let signature = hex::encode(hmac.finalize().into_bytes());
 
     if authorization.signature != signature {
         trace!("Canonical request: ``{}``", canonical_request);
@@ -106,12 +106,10 @@ pub async fn check_signature(
     } else {
         let bytes = hex::decode(authorization.content_sha256)
             .ok_or_bad_request("Invalid content sha256 hash")?;
-        let mut hash = [0u8; 32];
-        if bytes.len() != 32 {
-            return Err(Error::BadRequest(format!("Invalid content sha256 hash")));
-        }
-        hash.copy_from_slice(&bytes[..]);
-        Some(Hash::from(hash))
+        Some(
+            Hash::try_from(&bytes[..])
+                .ok_or(Error::BadRequest(format!("Invalid content sha256 hash")))?,
+        )
     };
 
     Ok((key, content_sha256))
@@ -220,12 +218,12 @@ fn parse_credential(cred: &str) -> Result<(String, String), Error> {
 
 fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_req: &str) -> String {
     let mut hasher = Sha256::default();
-    hasher.input(canonical_req.as_bytes());
+    hasher.update(canonical_req.as_bytes());
     [
         "AWS4-HMAC-SHA256",
         &datetime.format(LONG_DATETIME).to_string(),
         scope_string,
-        &hex::encode(hasher.result().as_slice()),
+        &hex::encode(hasher.finalize().as_slice()),
     ]
     .join("\n")
 }
@@ -238,14 +236,14 @@ fn signing_hmac(
 ) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> {
     let secret = String::from("AWS4") + secret_key;
     let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?;
-    date_hmac.input(datetime.format(SHORT_DATE).to_string().as_bytes());
-    let mut region_hmac = HmacSha256::new_varkey(&date_hmac.result().code())?;
-    region_hmac.input(region.as_bytes());
-    let mut service_hmac = HmacSha256::new_varkey(&region_hmac.result().code())?;
-    service_hmac.input(service.as_bytes());
-    let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.result().code())?;
-    signing_hmac.input(b"aws4_request");
-    let hmac = HmacSha256::new_varkey(&signing_hmac.result().code())?;
+    date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes());
+    let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?;
+    region_hmac.update(region.as_bytes());
+    let mut service_hmac = HmacSha256::new_varkey(&region_hmac.finalize().into_bytes())?;
+    service_hmac.update(service.as_bytes());
+    let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?;
+    signing_hmac.update(b"aws4_request");
+    let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?;
     Ok(hmac)
 }
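signing_hmac above implements the standard AWS SigV4 key-derivation chain with the hmac 0.10 API the patch migrates to (new_varkey / update / finalize().into_bytes()). A self-contained sketch of the same chain, using AWS's published example secret key and illustrative date/region/service strings:

```rust
use hmac::{Hmac, Mac, NewMac};
use sha2::Sha256;

type HmacSha256 = Hmac<Sha256>;

fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
    let mut mac = HmacSha256::new_varkey(key).expect("HMAC-SHA256 accepts any key length");
    mac.update(data);
    mac.finalize().into_bytes().to_vec()
}

fn main() {
    // AWS's documented example secret key; date/region/service are illustrative.
    let secret_key = "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY";
    // kSigning = HMAC(HMAC(HMAC(HMAC("AWS4" + secret, date), region), service), "aws4_request")
    let k_date = hmac_sha256(format!("AWS4{}", secret_key).as_bytes(), b"20210315");
    let k_region = hmac_sha256(&k_date, b"garage");
    let k_service = hmac_sha256(&k_region, b"s3");
    let k_signing = hmac_sha256(&k_service, b"aws4_request");
    println!("signing key: {}", hex::encode(k_signing));
}
```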