From b45dcc1925d76f3f7dcae7deea0391953ba548e5 Mon Sep 17 00:00:00 2001 From: Jill Date: Mon, 17 Jan 2022 10:55:31 +0100 Subject: Support STREAMING-AWS4-HMAC-SHA256-PAYLOAD (#64) (#156) Closes #64. Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/156 Co-authored-by: Jill Co-committed-by: Jill --- src/api/Cargo.toml | 2 + src/api/api_server.rs | 6 +- src/api/s3_bucket.rs | 13 +- src/api/s3_delete.rs | 5 +- src/api/s3_put.rs | 110 +++++++++++--- src/api/s3_website.rs | 5 +- src/api/signature.rs | 301 -------------------------------------- src/api/signature/mod.rs | 47 ++++++ src/api/signature/payload.rs | 269 ++++++++++++++++++++++++++++++++++ src/api/signature/streaming.rs | 319 +++++++++++++++++++++++++++++++++++++++++ 10 files changed, 743 insertions(+), 334 deletions(-) delete mode 100644 src/api/signature.rs create mode 100644 src/api/signature/mod.rs create mode 100644 src/api/signature/payload.rs create mode 100644 src/api/signature/streaming.rs (limited to 'src/api') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index ca4950a1..e93e5ec5 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -28,10 +28,12 @@ hmac = "0.10" idna = "0.2" log = "0.4" md-5 = "0.9" +nom = "7.1" sha2 = "0.9" futures = "0.3" futures-util = "0.3" +pin-project = "1.0" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } http = "0.2" diff --git a/src/api/api_server.rs b/src/api/api_server.rs index ea1990d9..c4606226 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -15,7 +15,7 @@ use garage_model::garage::Garage; use garage_model::key_table::Key; use crate::error::*; -use crate::signature::check_signature; +use crate::signature::payload::check_payload_signature; use crate::helpers::*; use crate::s3_bucket::*; @@ -90,7 +90,7 @@ async fn handler( } async fn handler_inner(garage: Arc, req: Request) -> Result, Error> { - let (api_key, content_sha256) = check_signature(&garage, &req).await?; + let (api_key, content_sha256) = check_payload_signature(&garage, &req).await?; let authority = req .headers() @@ -176,7 +176,7 @@ async fn handler_inner(garage: Arc, req: Request) -> Result { - handle_put(garage, req, bucket_id, &key, content_sha256).await + handle_put(garage, req, bucket_id, &key, &api_key, content_sha256).await } Endpoint::AbortMultipartUpload { key, upload_id, .. } => { handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await diff --git a/src/api/s3_bucket.rs b/src/api/s3_bucket.rs index 494224c8..8a5407d3 100644 --- a/src/api/s3_bucket.rs +++ b/src/api/s3_bucket.rs @@ -120,7 +120,10 @@ pub async fn handle_create_bucket( bucket_name: String, ) -> Result, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let cmd = parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?; @@ -320,7 +323,7 @@ mod tests { assert_eq!( parse_create_bucket_xml( br#" - + "# ), @@ -329,8 +332,8 @@ mod tests { assert_eq!( parse_create_bucket_xml( br#" - - Europe + + Europe "# ), @@ -339,7 +342,7 @@ mod tests { assert_eq!( parse_create_bucket_xml( br#" - + "# ), diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs index 9e267490..b243d982 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3_delete.rs @@ -81,7 +81,10 @@ pub async fn handle_delete_objects( content_sha256: Option, ) -> Result, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?; diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 37658172..4e85664b 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -1,8 +1,10 @@ use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; -use futures::stream::*; -use hyper::{Body, Request, Response}; +use chrono::{DateTime, NaiveDateTime, Utc}; +use futures::{prelude::*, TryFutureExt}; +use hyper::body::{Body, Bytes}; +use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; @@ -14,19 +16,23 @@ use garage_util::time::*; use garage_model::block::INLINE_THRESHOLD; use garage_model::block_ref_table::*; use garage_model::garage::Garage; +use garage_model::key_table::Key; use garage_model::object_table::*; use garage_model::version_table::*; use crate::error::*; use crate::s3_xml; -use crate::signature::verify_signed_content; +use crate::signature::streaming::SignedPayloadStream; +use crate::signature::LONG_DATETIME; +use crate::signature::{compute_scope, verify_signed_content}; pub async fn handle_put( garage: Arc, req: Request, bucket_id: Uuid, key: &str, - content_sha256: Option, + api_key: &Key, + mut content_sha256: Option, ) -> Result, Error> { // Generate identity of new version let version_uuid = gen_uuid(); @@ -40,11 +46,53 @@ pub async fn handle_put( Some(x) => Some(x.to_str()?.to_string()), None => None, }; + let payload_seed_signature = match req.headers().get("x-amz-content-sha256") { + Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { + let content_sha256 = content_sha256 + .take() + .ok_or_bad_request("No signature provided")?; + Some(content_sha256) + } + _ => None, + }; // Parse body of uploaded file - let body = req.into_body(); + let (head, body) = req.into_parts(); + let body = body.map_err(Error::from); + + let body = if let Some(signature) = payload_seed_signature { + let secret_key = &api_key + .state + .as_option() + .ok_or_internal_error("Deleted key state")? + .secret_key; + + let date = head + .headers + .get("x-amz-date") + .ok_or_bad_request("Missing X-Amz-Date field")? + .to_str()?; + let date: NaiveDateTime = + NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?; + let date: DateTime = DateTime::from_utc(date, Utc); + + let scope = compute_scope(&date, &garage.config.s3_api.s3_region); + let signing_hmac = crate::signature::signing_hmac( + &date, + secret_key, + &garage.config.s3_api.s3_region, + "s3", + ) + .ok_or_internal_error("Unable to build signing HMAC")?; - let mut chunker = BodyChunker::new(body, garage.config.block_size); + SignedPayloadStream::new(body, signing_hmac, date, &scope, signature)? + .map_err(Error::from) + .boxed() + } else { + body.boxed() + }; + + let mut chunker = StreamChunker::new(body, garage.config.block_size); let first_block = chunker.next().await?.unwrap_or_default(); // If body is small enough, store it directly in the object table @@ -178,13 +226,13 @@ fn ensure_checksum_matches( Ok(()) } -async fn read_and_put_blocks( +async fn read_and_put_blocks> + Unpin>( garage: &Garage, version: &Version, part_number: u64, first_block: Vec, first_block_hash: Hash, - chunker: &mut BodyChunker, + chunker: &mut StreamChunker, ) -> Result<(u64, GenericArray, Hash), Error> { let mut md5hasher = Md5::new(); let mut sha256hasher = Sha256::new(); @@ -205,8 +253,11 @@ async fn read_and_put_blocks( .rpc_put_block(first_block_hash, first_block); loop { - let (_, _, next_block) = - futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; + let (_, _, next_block) = futures::try_join!( + put_curr_block.map_err(Error::from), + put_curr_version_block.map_err(Error::from), + chunker.next(), + )?; if let Some(block) = next_block { md5hasher.update(&block[..]); sha256hasher.update(&block[..]); @@ -266,32 +317,34 @@ async fn put_block_meta( Ok(()) } -struct BodyChunker { - body: Body, +struct StreamChunker>> { + stream: S, read_all: bool, block_size: usize, buf: VecDeque, } -impl BodyChunker { - fn new(body: Body, block_size: usize) -> Self { +impl> + Unpin> StreamChunker { + fn new(stream: S, block_size: usize) -> Self { Self { - body, + stream, read_all: false, block_size, buf: VecDeque::with_capacity(2 * block_size), } } - async fn next(&mut self) -> Result>, GarageError> { + + async fn next(&mut self) -> Result>, Error> { while !self.read_all && self.buf.len() < self.block_size { - if let Some(block) = self.body.next().await { + if let Some(block) = self.stream.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); - self.buf.extend(&bytes[..]); + self.buf.extend(bytes); } else { self.read_all = true; } } + if self.buf.is_empty() { Ok(None) } else if self.buf.len() <= self.block_size { @@ -368,12 +421,20 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size); + + let body = req.into_body().map_err(Error::from); + let mut chunker = StreamChunker::new(body, garage.config.block_size); let (object, version, first_block) = futures::try_join!( - garage.object_table.get(&bucket_id, &key), - garage.version_table.get(&version_uuid, &EmptyKey), - chunker.next() + garage + .object_table + .get(&bucket_id, &key) + .map_err(Error::from), + garage + .version_table + .get(&version_uuid, &EmptyKey) + .map_err(Error::from), + chunker.next(), )?; // Check object is valid and multipart block can be accepted @@ -444,7 +505,10 @@ pub async fn handle_complete_multipart_upload( content_sha256: Option, ) -> Result, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let body_list_of_parts = parse_complete_multpart_upload_body(&body_xml) diff --git a/src/api/s3_website.rs b/src/api/s3_website.rs index fcf8cba3..c4a43e2c 100644 --- a/src/api/s3_website.rs +++ b/src/api/s3_website.rs @@ -80,7 +80,10 @@ pub async fn handle_put_website( content_sha256: Option, ) -> Result, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let mut bucket = garage .bucket_table diff --git a/src/api/signature.rs b/src/api/signature.rs deleted file mode 100644 index 311e6a9a..00000000 --- a/src/api/signature.rs +++ /dev/null @@ -1,301 +0,0 @@ -use std::collections::HashMap; - -use chrono::{DateTime, Duration, NaiveDateTime, Utc}; -use hmac::{Hmac, Mac, NewMac}; -use hyper::{Body, Method, Request}; -use sha2::{Digest, Sha256}; - -use garage_table::*; -use garage_util::data::{sha256sum, Hash}; - -use garage_model::garage::Garage; -use garage_model::key_table::*; - -use crate::encoding::uri_encode; -use crate::error::*; - -const SHORT_DATE: &str = "%Y%m%d"; -const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; - -type HmacSha256 = Hmac; - -pub async fn check_signature( - garage: &Garage, - request: &Request, -) -> Result<(Key, Option), Error> { - let mut headers = HashMap::new(); - for (key, val) in request.headers() { - headers.insert(key.to_string(), val.to_str()?.to_string()); - } - if let Some(query) = request.uri().query() { - let query_pairs = url::form_urlencoded::parse(query.as_bytes()); - for (key, val) in query_pairs { - headers.insert(key.to_lowercase(), val.to_string()); - } - } - - let authorization = if let Some(authorization) = headers.get("authorization") { - parse_authorization(authorization, &headers)? - } else { - parse_query_authorization(&headers)? - }; - - let date = headers - .get("x-amz-date") - .ok_or_bad_request("Missing X-Amz-Date field")?; - let date: NaiveDateTime = - NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?; - let date: DateTime = DateTime::from_utc(date, Utc); - - if Utc::now() - date > Duration::hours(24) { - return Err(Error::BadRequest("Date is too old".to_string())); - } - - let scope = format!( - "{}/{}/s3/aws4_request", - date.format(SHORT_DATE), - garage.config.s3_api.s3_region - ); - if authorization.scope != scope { - return Err(Error::AuthorizationHeaderMalformed(scope.to_string())); - } - - let key = garage - .key_table - .get(&EmptyKey, &authorization.key_id) - .await? - .filter(|k| !k.state.is_deleted()) - .ok_or_else(|| Error::Forbidden(format!("No such key: {}", authorization.key_id)))?; - let key_p = key.params().unwrap(); - - let canonical_request = canonical_request( - request.method(), - &request.uri().path().to_string(), - &canonical_query_string(request.uri()), - &headers, - &authorization.signed_headers, - &authorization.content_sha256, - ); - let string_to_sign = string_to_sign(&date, &scope, &canonical_request); - - let mut hmac = signing_hmac( - &date, - &key_p.secret_key, - &garage.config.s3_api.s3_region, - "s3", - ) - .ok_or_internal_error("Unable to build signing HMAC")?; - hmac.update(string_to_sign.as_bytes()); - let signature = hex::encode(hmac.finalize().into_bytes()); - - if authorization.signature != signature { - trace!("Canonical request: ``{}``", canonical_request); - trace!("String to sign: ``{}``", string_to_sign); - trace!("Expected: {}, got: {}", signature, authorization.signature); - return Err(Error::Forbidden("Invalid signature".to_string())); - } - - let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" { - None - } else { - let bytes = hex::decode(authorization.content_sha256) - .ok_or_bad_request("Invalid content sha256 hash")?; - Some( - Hash::try_from(&bytes[..]) - .ok_or_else(|| Error::BadRequest("Invalid content sha256 hash".to_string()))?, - ) - }; - - Ok((key, content_sha256)) -} - -struct Authorization { - key_id: String, - scope: String, - signed_headers: String, - signature: String, - content_sha256: String, -} - -fn parse_authorization( - authorization: &str, - headers: &HashMap, -) -> Result { - let first_space = authorization - .find(' ') - .ok_or_bad_request("Authorization field to short")?; - let (auth_kind, rest) = authorization.split_at(first_space); - - if auth_kind != "AWS4-HMAC-SHA256" { - return Err(Error::BadRequest("Unsupported authorization method".into())); - } - - let mut auth_params = HashMap::new(); - for auth_part in rest.split(',') { - let auth_part = auth_part.trim(); - let eq = auth_part - .find('=') - .ok_or_bad_request("Field without value in authorization header")?; - let (key, value) = auth_part.split_at(eq); - auth_params.insert(key.to_string(), value.trim_start_matches('=').to_string()); - } - - let cred = auth_params - .get("Credential") - .ok_or_bad_request("Could not find Credential in Authorization field")?; - let (key_id, scope) = parse_credential(cred)?; - - let content_sha256 = headers - .get("x-amz-content-sha256") - .ok_or_bad_request("Missing X-Amz-Content-Sha256 field")?; - - let auth = Authorization { - key_id, - scope, - signed_headers: auth_params - .get("SignedHeaders") - .ok_or_bad_request("Could not find SignedHeaders in Authorization field")? - .to_string(), - signature: auth_params - .get("Signature") - .ok_or_bad_request("Could not find Signature in Authorization field")? - .to_string(), - content_sha256: content_sha256.to_string(), - }; - Ok(auth) -} - -fn parse_query_authorization(headers: &HashMap) -> Result { - let algo = headers - .get("x-amz-algorithm") - .ok_or_bad_request("X-Amz-Algorithm not found in query parameters")?; - if algo != "AWS4-HMAC-SHA256" { - return Err(Error::BadRequest( - "Unsupported authorization method".to_string(), - )); - } - - let cred = headers - .get("x-amz-credential") - .ok_or_bad_request("X-Amz-Credential not found in query parameters")?; - let (key_id, scope) = parse_credential(cred)?; - let signed_headers = headers - .get("x-amz-signedheaders") - .ok_or_bad_request("X-Amz-SignedHeaders not found in query parameters")?; - let signature = headers - .get("x-amz-signature") - .ok_or_bad_request("X-Amz-Signature not found in query parameters")?; - let content_sha256 = headers - .get("x-amz-content-sha256") - .map(|x| x.as_str()) - .unwrap_or("UNSIGNED-PAYLOAD"); - - Ok(Authorization { - key_id, - scope, - signed_headers: signed_headers.to_string(), - signature: signature.to_string(), - content_sha256: content_sha256.to_string(), - }) -} - -fn parse_credential(cred: &str) -> Result<(String, String), Error> { - let first_slash = cred - .find('/') - .ok_or_bad_request("Credentials does not contain / in authorization field")?; - let (key_id, scope) = cred.split_at(first_slash); - Ok(( - key_id.to_string(), - scope.trim_start_matches('/').to_string(), - )) -} - -fn string_to_sign(datetime: &DateTime, scope_string: &str, canonical_req: &str) -> String { - let mut hasher = Sha256::default(); - hasher.update(canonical_req.as_bytes()); - [ - "AWS4-HMAC-SHA256", - &datetime.format(LONG_DATETIME).to_string(), - scope_string, - &hex::encode(hasher.finalize().as_slice()), - ] - .join("\n") -} - -fn signing_hmac( - datetime: &DateTime, - secret_key: &str, - region: &str, - service: &str, -) -> Result { - let secret = String::from("AWS4") + secret_key; - let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?; - date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes()); - let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?; - region_hmac.update(region.as_bytes()); - let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?; - service_hmac.update(service.as_bytes()); - let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?; - signing_hmac.update(b"aws4_request"); - let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?; - Ok(hmac) -} - -fn canonical_request( - method: &Method, - url_path: &str, - canonical_query_string: &str, - headers: &HashMap, - signed_headers: &str, - content_sha256: &str, -) -> String { - [ - method.as_str(), - url_path, - canonical_query_string, - &canonical_header_string(headers, signed_headers), - "", - signed_headers, - content_sha256, - ] - .join("\n") -} - -fn canonical_header_string(headers: &HashMap, signed_headers: &str) -> String { - let signed_headers_vec = signed_headers.split(';').collect::>(); - let mut items = headers - .iter() - .filter(|(key, _)| signed_headers_vec.contains(&key.as_str())) - .collect::>(); - items.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); - items - .iter() - .map(|(key, value)| key.to_lowercase() + ":" + value.trim()) - .collect::>() - .join("\n") -} - -fn canonical_query_string(uri: &hyper::Uri) -> String { - if let Some(query) = uri.query() { - let query_pairs = url::form_urlencoded::parse(query.as_bytes()); - let mut items = query_pairs - .filter(|(key, _)| key != "X-Amz-Signature") - .map(|(key, value)| uri_encode(&key, true) + "=" + &uri_encode(&value, true)) - .collect::>(); - items.sort(); - items.join("&") - } else { - "".to_string() - } -} - -pub fn verify_signed_content(content_sha256: Option, body: &[u8]) -> Result<(), Error> { - let expected_sha256 = - content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?; - if expected_sha256 != sha256sum(body) { - return Err(Error::BadRequest( - "Request content hash does not match signed hash".to_string(), - )); - } - Ok(()) -} diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs new file mode 100644 index 00000000..ebdee6da --- /dev/null +++ b/src/api/signature/mod.rs @@ -0,0 +1,47 @@ +use chrono::{DateTime, Utc}; +use hmac::{Hmac, Mac, NewMac}; +use sha2::Sha256; + +use garage_util::data::{sha256sum, Hash}; + +use crate::error::*; + +pub mod payload; +pub mod streaming; + +pub const SHORT_DATE: &str = "%Y%m%d"; +pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; + +type HmacSha256 = Hmac; + +pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> { + if expected_sha256 != sha256sum(body) { + return Err(Error::BadRequest( + "Request content hash does not match signed hash".to_string(), + )); + } + Ok(()) +} + +pub fn signing_hmac( + datetime: &DateTime, + secret_key: &str, + region: &str, + service: &str, +) -> Result { + let secret = String::from("AWS4") + secret_key; + let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?; + date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes()); + let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?; + region_hmac.update(region.as_bytes()); + let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?; + service_hmac.update(service.as_bytes()); + let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?; + signing_hmac.update(b"aws4_request"); + let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?; + Ok(hmac) +} + +pub fn compute_scope(datetime: &DateTime, region: &str) -> String { + format!("{}/{}/s3/aws4_request", datetime.format(SHORT_DATE), region,) +} diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs new file mode 100644 index 00000000..b13819a8 --- /dev/null +++ b/src/api/signature/payload.rs @@ -0,0 +1,269 @@ +use std::collections::HashMap; + +use chrono::{DateTime, Duration, NaiveDateTime, Utc}; +use hmac::Mac; +use hyper::{Body, Method, Request}; +use sha2::{Digest, Sha256}; + +use garage_table::*; +use garage_util::data::Hash; + +use garage_model::garage::Garage; +use garage_model::key_table::*; + +use super::signing_hmac; +use super::{LONG_DATETIME, SHORT_DATE}; + +use crate::encoding::uri_encode; +use crate::error::*; + +pub async fn check_payload_signature( + garage: &Garage, + request: &Request, +) -> Result<(Key, Option), Error> { + let mut headers = HashMap::new(); + for (key, val) in request.headers() { + headers.insert(key.to_string(), val.to_str()?.to_string()); + } + if let Some(query) = request.uri().query() { + let query_pairs = url::form_urlencoded::parse(query.as_bytes()); + for (key, val) in query_pairs { + headers.insert(key.to_lowercase(), val.to_string()); + } + } + + let authorization = if let Some(authorization) = headers.get("authorization") { + parse_authorization(authorization, &headers)? + } else { + parse_query_authorization(&headers)? + }; + + let date = headers + .get("x-amz-date") + .ok_or_bad_request("Missing X-Amz-Date field")?; + let date: NaiveDateTime = + NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?; + let date: DateTime = DateTime::from_utc(date, Utc); + + if Utc::now() - date > Duration::hours(24) { + return Err(Error::BadRequest("Date is too old".to_string())); + } + + let scope = format!( + "{}/{}/s3/aws4_request", + date.format(SHORT_DATE), + garage.config.s3_api.s3_region + ); + if authorization.scope != scope { + return Err(Error::AuthorizationHeaderMalformed(scope.to_string())); + } + + let key = garage + .key_table + .get(&EmptyKey, &authorization.key_id) + .await? + .filter(|k| !k.state.is_deleted()) + .ok_or_else(|| Error::Forbidden(format!("No such key: {}", authorization.key_id)))?; + let key_p = key.params().unwrap(); + + let canonical_request = canonical_request( + request.method(), + &request.uri().path().to_string(), + &canonical_query_string(request.uri()), + &headers, + &authorization.signed_headers, + &authorization.content_sha256, + ); + let string_to_sign = string_to_sign(&date, &scope, &canonical_request); + + let mut hmac = signing_hmac( + &date, + &key_p.secret_key, + &garage.config.s3_api.s3_region, + "s3", + ) + .ok_or_internal_error("Unable to build signing HMAC")?; + hmac.update(string_to_sign.as_bytes()); + let signature = hex::encode(hmac.finalize().into_bytes()); + + if authorization.signature != signature { + trace!("Canonical request: ``{}``", canonical_request); + trace!("String to sign: ``{}``", string_to_sign); + trace!("Expected: {}, got: {}", signature, authorization.signature); + return Err(Error::Forbidden("Invalid signature".to_string())); + } + + let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" { + None + } else if authorization.content_sha256 == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" { + let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?; + Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?) + } else { + let bytes = hex::decode(authorization.content_sha256) + .ok_or_bad_request("Invalid content sha256 hash")?; + Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid content sha256 hash")?) + }; + + Ok((key, content_sha256)) +} + +struct Authorization { + key_id: String, + scope: String, + signed_headers: String, + signature: String, + content_sha256: String, +} + +fn parse_authorization( + authorization: &str, + headers: &HashMap, +) -> Result { + let first_space = authorization + .find(' ') + .ok_or_bad_request("Authorization field to short")?; + let (auth_kind, rest) = authorization.split_at(first_space); + + if auth_kind != "AWS4-HMAC-SHA256" { + return Err(Error::BadRequest("Unsupported authorization method".into())); + } + + let mut auth_params = HashMap::new(); + for auth_part in rest.split(',') { + let auth_part = auth_part.trim(); + let eq = auth_part + .find('=') + .ok_or_bad_request("Field without value in authorization header")?; + let (key, value) = auth_part.split_at(eq); + auth_params.insert(key.to_string(), value.trim_start_matches('=').to_string()); + } + + let cred = auth_params + .get("Credential") + .ok_or_bad_request("Could not find Credential in Authorization field")?; + let (key_id, scope) = parse_credential(cred)?; + + let content_sha256 = headers + .get("x-amz-content-sha256") + .ok_or_bad_request("Missing X-Amz-Content-Sha256 field")?; + + let auth = Authorization { + key_id, + scope, + signed_headers: auth_params + .get("SignedHeaders") + .ok_or_bad_request("Could not find SignedHeaders in Authorization field")? + .to_string(), + signature: auth_params + .get("Signature") + .ok_or_bad_request("Could not find Signature in Authorization field")? + .to_string(), + content_sha256: content_sha256.to_string(), + }; + Ok(auth) +} + +fn parse_query_authorization(headers: &HashMap) -> Result { + let algo = headers + .get("x-amz-algorithm") + .ok_or_bad_request("X-Amz-Algorithm not found in query parameters")?; + if algo != "AWS4-HMAC-SHA256" { + return Err(Error::BadRequest( + "Unsupported authorization method".to_string(), + )); + } + + let cred = headers + .get("x-amz-credential") + .ok_or_bad_request("X-Amz-Credential not found in query parameters")?; + let (key_id, scope) = parse_credential(cred)?; + let signed_headers = headers + .get("x-amz-signedheaders") + .ok_or_bad_request("X-Amz-SignedHeaders not found in query parameters")?; + let signature = headers + .get("x-amz-signature") + .ok_or_bad_request("X-Amz-Signature not found in query parameters")?; + let content_sha256 = headers + .get("x-amz-content-sha256") + .map(|x| x.as_str()) + .unwrap_or("UNSIGNED-PAYLOAD"); + + Ok(Authorization { + key_id, + scope, + signed_headers: signed_headers.to_string(), + signature: signature.to_string(), + content_sha256: content_sha256.to_string(), + }) +} + +fn parse_credential(cred: &str) -> Result<(String, String), Error> { + let first_slash = cred + .find('/') + .ok_or_bad_request("Credentials does not contain / in authorization field")?; + let (key_id, scope) = cred.split_at(first_slash); + Ok(( + key_id.to_string(), + scope.trim_start_matches('/').to_string(), + )) +} + +fn string_to_sign(datetime: &DateTime, scope_string: &str, canonical_req: &str) -> String { + let mut hasher = Sha256::default(); + hasher.update(canonical_req.as_bytes()); + [ + "AWS4-HMAC-SHA256", + &datetime.format(LONG_DATETIME).to_string(), + scope_string, + &hex::encode(hasher.finalize().as_slice()), + ] + .join("\n") +} + +fn canonical_request( + method: &Method, + url_path: &str, + canonical_query_string: &str, + headers: &HashMap, + signed_headers: &str, + content_sha256: &str, +) -> String { + [ + method.as_str(), + url_path, + canonical_query_string, + &canonical_header_string(headers, signed_headers), + "", + signed_headers, + content_sha256, + ] + .join("\n") +} + +fn canonical_header_string(headers: &HashMap, signed_headers: &str) -> String { + let signed_headers_vec = signed_headers.split(';').collect::>(); + let mut items = headers + .iter() + .filter(|(key, _)| signed_headers_vec.contains(&key.as_str())) + .collect::>(); + items.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + items + .iter() + .map(|(key, value)| key.to_lowercase() + ":" + value.trim()) + .collect::>() + .join("\n") +} + +fn canonical_query_string(uri: &hyper::Uri) -> String { + if let Some(query) = uri.query() { + let query_pairs = url::form_urlencoded::parse(query.as_bytes()); + let mut items = query_pairs + .filter(|(key, _)| key != "X-Amz-Signature") + .map(|(key, value)| uri_encode(&key, true) + "=" + &uri_encode(&value, true)) + .collect::>(); + items.sort(); + items.join("&") + } else { + "".to_string() + } +} diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs new file mode 100644 index 00000000..b2dc1591 --- /dev/null +++ b/src/api/signature/streaming.rs @@ -0,0 +1,319 @@ +use std::pin::Pin; + +use chrono::{DateTime, Utc}; +use futures::prelude::*; +use futures::task; +use hyper::body::Bytes; + +use garage_util::data::Hash; +use hmac::Mac; + +use super::sha256sum; +use super::HmacSha256; +use super::LONG_DATETIME; + +use crate::error::*; + +/// Result of `sha256("")` +const EMPTY_STRING_HEX_DIGEST: &str = + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; + +fn compute_streaming_payload_signature( + signing_hmac: &HmacSha256, + date: DateTime, + scope: &str, + previous_signature: Hash, + content_sha256: Hash, +) -> Result { + let string_to_sign = [ + "AWS4-HMAC-SHA256-PAYLOAD", + &date.format(LONG_DATETIME).to_string(), + scope, + &hex::encode(previous_signature), + EMPTY_STRING_HEX_DIGEST, + &hex::encode(content_sha256), + ] + .join("\n"); + + let mut hmac = signing_hmac.clone(); + hmac.update(string_to_sign.as_bytes()); + + Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature") +} + +mod payload { + use garage_util::data::Hash; + + pub enum Error { + Parser(nom::error::Error), + BadSignature, + } + + impl Error { + pub fn description(&self) -> &str { + match *self { + Error::Parser(ref e) => e.code.description(), + Error::BadSignature => "Bad signature", + } + } + } + + #[derive(Debug, Clone)] + pub struct Header { + pub size: usize, + pub signature: Hash, + } + + impl Header { + pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + use nom::bytes::streaming::tag; + use nom::character::streaming::hex_digit1; + use nom::combinator::map_res; + use nom::number::streaming::hex_u32; + + macro_rules! try_parse { + ($expr:expr) => { + $expr.map_err(|e| e.map(Error::Parser))? + }; + } + + let (input, size) = try_parse!(hex_u32(input)); + let (input, _) = try_parse!(tag(";")(input)); + + let (input, _) = try_parse!(tag("chunk-signature=")(input)); + let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input)); + let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?; + + let (input, _) = try_parse!(tag("\r\n")(input)); + + let header = Header { + size: size as usize, + signature, + }; + + Ok((input, header)) + } + } +} + +#[derive(Debug)] +pub enum SignedPayloadStreamError { + Stream(Error), + InvalidSignature, + Message(String), +} + +impl SignedPayloadStreamError { + fn message(msg: &str) -> Self { + SignedPayloadStreamError::Message(msg.into()) + } +} + +impl From for Error { + fn from(err: SignedPayloadStreamError) -> Self { + match err { + SignedPayloadStreamError::Stream(e) => e, + SignedPayloadStreamError::InvalidSignature => { + Error::BadRequest("Invalid payload signature".into()) + } + SignedPayloadStreamError::Message(e) => { + Error::BadRequest(format!("Chunk format error: {}", e)) + } + } + } +} + +impl From> for SignedPayloadStreamError { + fn from(err: payload::Error) -> Self { + Self::message(err.description()) + } +} + +impl From> for SignedPayloadStreamError { + fn from(err: nom::error::Error) -> Self { + Self::message(err.code.description()) + } +} + +struct SignedPayload { + header: payload::Header, + data: Bytes, +} + +#[pin_project::pin_project] +pub struct SignedPayloadStream +where + S: Stream>, +{ + #[pin] + stream: S, + buf: bytes::BytesMut, + datetime: DateTime, + scope: String, + signing_hmac: HmacSha256, + previous_signature: Hash, +} + +impl SignedPayloadStream +where + S: Stream>, +{ + pub fn new( + stream: S, + signing_hmac: HmacSha256, + datetime: DateTime, + scope: &str, + seed_signature: Hash, + ) -> Result { + Ok(Self { + stream, + buf: bytes::BytesMut::new(), + datetime, + scope: scope.into(), + signing_hmac, + previous_signature: seed_signature, + }) + } + + fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> { + use nom::bytes::streaming::{tag, take}; + + macro_rules! try_parse { + ($expr:expr) => { + $expr.map_err(nom::Err::convert)? + }; + } + + let (input, header) = try_parse!(payload::Header::parse(input)); + + // 0-sized chunk is the last + if header.size == 0 { + return Ok(( + input, + SignedPayload { + header, + data: Bytes::new(), + }, + )); + } + + let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input)); + let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input)); + + let data = Bytes::from(data.to_vec()); + + Ok((input, SignedPayload { header, data })) + } +} + +impl Stream for SignedPayloadStream +where + S: Stream> + Unpin, +{ + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll> { + use std::task::Poll; + + let mut this = self.project(); + + loop { + let (input, payload) = match Self::parse_next(this.buf) { + Ok(res) => res, + Err(nom::Err::Incomplete(_)) => { + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + continue; + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))) + } + None => { + return Poll::Ready(Some(Err(SignedPayloadStreamError::message( + "Unexpected EOF", + )))); + } + } + } + Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => { + return Poll::Ready(Some(Err(e))) + } + }; + + // 0-sized chunk is the last + if payload.data.is_empty() { + return Poll::Ready(None); + } + + let data_sha256sum = sha256sum(&payload.data); + + let expected_signature = compute_streaming_payload_signature( + this.signing_hmac, + *this.datetime, + this.scope, + *this.previous_signature, + data_sha256sum, + ) + .map_err(|e| { + SignedPayloadStreamError::Message(format!("Could not build signature: {}", e)) + })?; + + if payload.header.signature != expected_signature { + return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature))); + } + + *this.buf = input.into(); + *this.previous_signature = payload.header.signature; + + return Poll::Ready(Some(Ok(payload.data))); + } + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} + +#[cfg(test)] +mod tests { + use futures::prelude::*; + + use super::{SignedPayloadStream, SignedPayloadStreamError}; + + #[tokio::test] + async fn test_interrupted_signed_payload_stream() { + use chrono::{DateTime, Utc}; + + use garage_util::data::Hash; + + let datetime = DateTime::parse_from_rfc3339("2021-12-13T13:12:42+01:00") // TODO UNIX 0 + .unwrap() + .with_timezone(&Utc); + let secret_key = "test"; + let region = "test"; + let scope = crate::signature::compute_scope(&datetime, region); + let signing_hmac = + crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap(); + + let data: &[&[u8]] = &[b"1"]; + let body = futures::stream::iter(data.iter().map(|block| Ok(block.as_ref().into()))); + + let seed_signature = Hash::default(); + + let mut stream = + SignedPayloadStream::new(body, signing_hmac, datetime, &scope, seed_signature).unwrap(); + + assert!(stream.try_next().await.is_err()); + match stream.try_next().await { + Err(SignedPayloadStreamError::Message(msg)) if msg == "Unexpected EOF" => {} + item => panic!( + "Unexpected result, expected early EOF error, got {:?}", + item + ), + } + } +} -- cgit v1.2.3