diff options
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/Cargo.toml | 2 | ||||
-rw-r--r-- | src/api/api_server.rs | 6 | ||||
-rw-r--r-- | src/api/s3_bucket.rs | 13 | ||||
-rw-r--r-- | src/api/s3_delete.rs | 5 | ||||
-rw-r--r-- | src/api/s3_put.rs | 110 | ||||
-rw-r--r-- | src/api/s3_website.rs | 5 | ||||
-rw-r--r-- | src/api/signature/mod.rs | 47 | ||||
-rw-r--r-- | src/api/signature/payload.rs (renamed from src/api/signature.rs) | 52 | ||||
-rw-r--r-- | src/api/signature/streaming.rs | 319 |
9 files changed, 484 insertions, 75 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index ca4950a1..e93e5ec5 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -28,10 +28,12 @@ hmac = "0.10" idna = "0.2" log = "0.4" md-5 = "0.9" +nom = "7.1" sha2 = "0.9" futures = "0.3" futures-util = "0.3" +pin-project = "1.0" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } http = "0.2" diff --git a/src/api/api_server.rs b/src/api/api_server.rs index ea1990d9..c4606226 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -15,7 +15,7 @@ use garage_model::garage::Garage; use garage_model::key_table::Key; use crate::error::*; -use crate::signature::check_signature; +use crate::signature::payload::check_payload_signature; use crate::helpers::*; use crate::s3_bucket::*; @@ -90,7 +90,7 @@ async fn handler( } async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> { - let (api_key, content_sha256) = check_signature(&garage, &req).await?; + let (api_key, content_sha256) = check_payload_signature(&garage, &req).await?; let authority = req .headers() @@ -176,7 +176,7 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon .await } Endpoint::PutObject { key, .. } => { - handle_put(garage, req, bucket_id, &key, content_sha256).await + handle_put(garage, req, bucket_id, &key, &api_key, content_sha256).await } Endpoint::AbortMultipartUpload { key, upload_id, .. } => { handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await diff --git a/src/api/s3_bucket.rs b/src/api/s3_bucket.rs index 494224c8..8a5407d3 100644 --- a/src/api/s3_bucket.rs +++ b/src/api/s3_bucket.rs @@ -120,7 +120,10 @@ pub async fn handle_create_bucket( bucket_name: String, ) -> Result<Response<Body>, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let cmd = parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?; @@ -320,7 +323,7 @@ mod tests { assert_eq!( parse_create_bucket_xml( br#" - <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> + <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> </CreateBucketConfiguration > "# ), @@ -329,8 +332,8 @@ mod tests { assert_eq!( parse_create_bucket_xml( br#" - <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> - <LocationConstraint>Europe</LocationConstraint> + <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> + <LocationConstraint>Europe</LocationConstraint> </CreateBucketConfiguration > "# ), @@ -339,7 +342,7 @@ mod tests { assert_eq!( parse_create_bucket_xml( br#" - <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> + <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> </Crea > "# ), diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs index 9e267490..b243d982 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3_delete.rs @@ -81,7 +81,10 @@ pub async fn handle_delete_objects( content_sha256: Option<Hash>, ) -> Result<Response<Body>, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?; diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 37658172..4e85664b 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -1,8 +1,10 @@ use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; -use futures::stream::*; -use hyper::{Body, Request, Response}; +use chrono::{DateTime, NaiveDateTime, Utc}; +use futures::{prelude::*, TryFutureExt}; +use hyper::body::{Body, Bytes}; +use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; @@ -14,19 +16,23 @@ use garage_util::time::*; use garage_model::block::INLINE_THRESHOLD; use garage_model::block_ref_table::*; use garage_model::garage::Garage; +use garage_model::key_table::Key; use garage_model::object_table::*; use garage_model::version_table::*; use crate::error::*; use crate::s3_xml; -use crate::signature::verify_signed_content; +use crate::signature::streaming::SignedPayloadStream; +use crate::signature::LONG_DATETIME; +use crate::signature::{compute_scope, verify_signed_content}; pub async fn handle_put( garage: Arc<Garage>, req: Request<Body>, bucket_id: Uuid, key: &str, - content_sha256: Option<Hash>, + api_key: &Key, + mut content_sha256: Option<Hash>, ) -> Result<Response<Body>, Error> { // Generate identity of new version let version_uuid = gen_uuid(); @@ -40,11 +46,53 @@ pub async fn handle_put( Some(x) => Some(x.to_str()?.to_string()), None => None, }; + let payload_seed_signature = match req.headers().get("x-amz-content-sha256") { + Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { + let content_sha256 = content_sha256 + .take() + .ok_or_bad_request("No signature provided")?; + Some(content_sha256) + } + _ => None, + }; // Parse body of uploaded file - let body = req.into_body(); + let (head, body) = req.into_parts(); + let body = body.map_err(Error::from); + + let body = if let Some(signature) = payload_seed_signature { + let secret_key = &api_key + .state + .as_option() + .ok_or_internal_error("Deleted key state")? + .secret_key; + + let date = head + .headers + .get("x-amz-date") + .ok_or_bad_request("Missing X-Amz-Date field")? + .to_str()?; + let date: NaiveDateTime = + NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?; + let date: DateTime<Utc> = DateTime::from_utc(date, Utc); + + let scope = compute_scope(&date, &garage.config.s3_api.s3_region); + let signing_hmac = crate::signature::signing_hmac( + &date, + secret_key, + &garage.config.s3_api.s3_region, + "s3", + ) + .ok_or_internal_error("Unable to build signing HMAC")?; - let mut chunker = BodyChunker::new(body, garage.config.block_size); + SignedPayloadStream::new(body, signing_hmac, date, &scope, signature)? + .map_err(Error::from) + .boxed() + } else { + body.boxed() + }; + + let mut chunker = StreamChunker::new(body, garage.config.block_size); let first_block = chunker.next().await?.unwrap_or_default(); // If body is small enough, store it directly in the object table @@ -178,13 +226,13 @@ fn ensure_checksum_matches( Ok(()) } -async fn read_and_put_blocks( +async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage: &Garage, version: &Version, part_number: u64, first_block: Vec<u8>, first_block_hash: Hash, - chunker: &mut BodyChunker, + chunker: &mut StreamChunker<S>, ) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> { let mut md5hasher = Md5::new(); let mut sha256hasher = Sha256::new(); @@ -205,8 +253,11 @@ async fn read_and_put_blocks( .rpc_put_block(first_block_hash, first_block); loop { - let (_, _, next_block) = - futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; + let (_, _, next_block) = futures::try_join!( + put_curr_block.map_err(Error::from), + put_curr_version_block.map_err(Error::from), + chunker.next(), + )?; if let Some(block) = next_block { md5hasher.update(&block[..]); sha256hasher.update(&block[..]); @@ -266,32 +317,34 @@ async fn put_block_meta( Ok(()) } -struct BodyChunker { - body: Body, +struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> { + stream: S, read_all: bool, block_size: usize, buf: VecDeque<u8>, } -impl BodyChunker { - fn new(body: Body, block_size: usize) -> Self { +impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { + fn new(stream: S, block_size: usize) -> Self { Self { - body, + stream, read_all: false, block_size, buf: VecDeque::with_capacity(2 * block_size), } } - async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> { + + async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> { while !self.read_all && self.buf.len() < self.block_size { - if let Some(block) = self.body.next().await { + if let Some(block) = self.stream.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); - self.buf.extend(&bytes[..]); + self.buf.extend(bytes); } else { self.read_all = true; } } + if self.buf.is_empty() { Ok(None) } else if self.buf.len() <= self.block_size { @@ -368,12 +421,20 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size); + + let body = req.into_body().map_err(Error::from); + let mut chunker = StreamChunker::new(body, garage.config.block_size); let (object, version, first_block) = futures::try_join!( - garage.object_table.get(&bucket_id, &key), - garage.version_table.get(&version_uuid, &EmptyKey), - chunker.next() + garage + .object_table + .get(&bucket_id, &key) + .map_err(Error::from), + garage + .version_table + .get(&version_uuid, &EmptyKey) + .map_err(Error::from), + chunker.next(), )?; // Check object is valid and multipart block can be accepted @@ -444,7 +505,10 @@ pub async fn handle_complete_multipart_upload( content_sha256: Option<Hash>, ) -> Result<Response<Body>, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let body_list_of_parts = parse_complete_multpart_upload_body(&body_xml) diff --git a/src/api/s3_website.rs b/src/api/s3_website.rs index fcf8cba3..c4a43e2c 100644 --- a/src/api/s3_website.rs +++ b/src/api/s3_website.rs @@ -80,7 +80,10 @@ pub async fn handle_put_website( content_sha256: Option<Hash>, ) -> Result<Response<Body>, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let mut bucket = garage .bucket_table diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs new file mode 100644 index 00000000..ebdee6da --- /dev/null +++ b/src/api/signature/mod.rs @@ -0,0 +1,47 @@ +use chrono::{DateTime, Utc}; +use hmac::{Hmac, Mac, NewMac}; +use sha2::Sha256; + +use garage_util::data::{sha256sum, Hash}; + +use crate::error::*; + +pub mod payload; +pub mod streaming; + +pub const SHORT_DATE: &str = "%Y%m%d"; +pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; + +type HmacSha256 = Hmac<Sha256>; + +pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> { + if expected_sha256 != sha256sum(body) { + return Err(Error::BadRequest( + "Request content hash does not match signed hash".to_string(), + )); + } + Ok(()) +} + +pub fn signing_hmac( + datetime: &DateTime<Utc>, + secret_key: &str, + region: &str, + service: &str, +) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> { + let secret = String::from("AWS4") + secret_key; + let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?; + date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes()); + let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?; + region_hmac.update(region.as_bytes()); + let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?; + service_hmac.update(service.as_bytes()); + let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?; + signing_hmac.update(b"aws4_request"); + let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?; + Ok(hmac) +} + +pub fn compute_scope(datetime: &DateTime<Utc>, region: &str) -> String { + format!("{}/{}/s3/aws4_request", datetime.format(SHORT_DATE), region,) +} diff --git a/src/api/signature.rs b/src/api/signature/payload.rs index 311e6a9a..b13819a8 100644 --- a/src/api/signature.rs +++ b/src/api/signature/payload.rs @@ -1,25 +1,23 @@ use std::collections::HashMap; use chrono::{DateTime, Duration, NaiveDateTime, Utc}; -use hmac::{Hmac, Mac, NewMac}; +use hmac::Mac; use hyper::{Body, Method, Request}; use sha2::{Digest, Sha256}; use garage_table::*; -use garage_util::data::{sha256sum, Hash}; +use garage_util::data::Hash; use garage_model::garage::Garage; use garage_model::key_table::*; +use super::signing_hmac; +use super::{LONG_DATETIME, SHORT_DATE}; + use crate::encoding::uri_encode; use crate::error::*; -const SHORT_DATE: &str = "%Y%m%d"; -const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; - -type HmacSha256 = Hmac<Sha256>; - -pub async fn check_signature( +pub async fn check_payload_signature( garage: &Garage, request: &Request<Body>, ) -> Result<(Key, Option<Hash>), Error> { @@ -97,13 +95,13 @@ pub async fn check_signature( let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" { None + } else if authorization.content_sha256 == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" { + let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?; + Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?) } else { let bytes = hex::decode(authorization.content_sha256) .ok_or_bad_request("Invalid content sha256 hash")?; - Some( - Hash::try_from(&bytes[..]) - .ok_or_else(|| Error::BadRequest("Invalid content sha256 hash".to_string()))?, - ) + Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid content sha256 hash")?) }; Ok((key, content_sha256)) @@ -222,25 +220,6 @@ fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_req: & .join("\n") } -fn signing_hmac( - datetime: &DateTime<Utc>, - secret_key: &str, - region: &str, - service: &str, -) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> { - let secret = String::from("AWS4") + secret_key; - let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?; - date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes()); - let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?; - region_hmac.update(region.as_bytes()); - let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?; - service_hmac.update(service.as_bytes()); - let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?; - signing_hmac.update(b"aws4_request"); - let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?; - Ok(hmac) -} - fn canonical_request( method: &Method, url_path: &str, @@ -288,14 +267,3 @@ fn canonical_query_string(uri: &hyper::Uri) -> String { "".to_string() } } - -pub fn verify_signed_content(content_sha256: Option<Hash>, body: &[u8]) -> Result<(), Error> { - let expected_sha256 = - content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?; - if expected_sha256 != sha256sum(body) { - return Err(Error::BadRequest( - "Request content hash does not match signed hash".to_string(), - )); - } - Ok(()) -} diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs new file mode 100644 index 00000000..b2dc1591 --- /dev/null +++ b/src/api/signature/streaming.rs @@ -0,0 +1,319 @@ +use std::pin::Pin; + +use chrono::{DateTime, Utc}; +use futures::prelude::*; +use futures::task; +use hyper::body::Bytes; + +use garage_util::data::Hash; +use hmac::Mac; + +use super::sha256sum; +use super::HmacSha256; +use super::LONG_DATETIME; + +use crate::error::*; + +/// Result of `sha256("")` +const EMPTY_STRING_HEX_DIGEST: &str = + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; + +fn compute_streaming_payload_signature( + signing_hmac: &HmacSha256, + date: DateTime<Utc>, + scope: &str, + previous_signature: Hash, + content_sha256: Hash, +) -> Result<Hash, Error> { + let string_to_sign = [ + "AWS4-HMAC-SHA256-PAYLOAD", + &date.format(LONG_DATETIME).to_string(), + scope, + &hex::encode(previous_signature), + EMPTY_STRING_HEX_DIGEST, + &hex::encode(content_sha256), + ] + .join("\n"); + + let mut hmac = signing_hmac.clone(); + hmac.update(string_to_sign.as_bytes()); + + Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature") +} + +mod payload { + use garage_util::data::Hash; + + pub enum Error<I> { + Parser(nom::error::Error<I>), + BadSignature, + } + + impl<I> Error<I> { + pub fn description(&self) -> &str { + match *self { + Error::Parser(ref e) => e.code.description(), + Error::BadSignature => "Bad signature", + } + } + } + + #[derive(Debug, Clone)] + pub struct Header { + pub size: usize, + pub signature: Hash, + } + + impl Header { + pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + use nom::bytes::streaming::tag; + use nom::character::streaming::hex_digit1; + use nom::combinator::map_res; + use nom::number::streaming::hex_u32; + + macro_rules! try_parse { + ($expr:expr) => { + $expr.map_err(|e| e.map(Error::Parser))? + }; + } + + let (input, size) = try_parse!(hex_u32(input)); + let (input, _) = try_parse!(tag(";")(input)); + + let (input, _) = try_parse!(tag("chunk-signature=")(input)); + let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input)); + let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?; + + let (input, _) = try_parse!(tag("\r\n")(input)); + + let header = Header { + size: size as usize, + signature, + }; + + Ok((input, header)) + } + } +} + +#[derive(Debug)] +pub enum SignedPayloadStreamError { + Stream(Error), + InvalidSignature, + Message(String), +} + +impl SignedPayloadStreamError { + fn message(msg: &str) -> Self { + SignedPayloadStreamError::Message(msg.into()) + } +} + +impl From<SignedPayloadStreamError> for Error { + fn from(err: SignedPayloadStreamError) -> Self { + match err { + SignedPayloadStreamError::Stream(e) => e, + SignedPayloadStreamError::InvalidSignature => { + Error::BadRequest("Invalid payload signature".into()) + } + SignedPayloadStreamError::Message(e) => { + Error::BadRequest(format!("Chunk format error: {}", e)) + } + } + } +} + +impl<I> From<payload::Error<I>> for SignedPayloadStreamError { + fn from(err: payload::Error<I>) -> Self { + Self::message(err.description()) + } +} + +impl<I> From<nom::error::Error<I>> for SignedPayloadStreamError { + fn from(err: nom::error::Error<I>) -> Self { + Self::message(err.code.description()) + } +} + +struct SignedPayload { + header: payload::Header, + data: Bytes, +} + +#[pin_project::pin_project] +pub struct SignedPayloadStream<S> +where + S: Stream<Item = Result<Bytes, Error>>, +{ + #[pin] + stream: S, + buf: bytes::BytesMut, + datetime: DateTime<Utc>, + scope: String, + signing_hmac: HmacSha256, + previous_signature: Hash, +} + +impl<S> SignedPayloadStream<S> +where + S: Stream<Item = Result<Bytes, Error>>, +{ + pub fn new( + stream: S, + signing_hmac: HmacSha256, + datetime: DateTime<Utc>, + scope: &str, + seed_signature: Hash, + ) -> Result<Self, Error> { + Ok(Self { + stream, + buf: bytes::BytesMut::new(), + datetime, + scope: scope.into(), + signing_hmac, + previous_signature: seed_signature, + }) + } + + fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> { + use nom::bytes::streaming::{tag, take}; + + macro_rules! try_parse { + ($expr:expr) => { + $expr.map_err(nom::Err::convert)? + }; + } + + let (input, header) = try_parse!(payload::Header::parse(input)); + + // 0-sized chunk is the last + if header.size == 0 { + return Ok(( + input, + SignedPayload { + header, + data: Bytes::new(), + }, + )); + } + + let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input)); + let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input)); + + let data = Bytes::from(data.to_vec()); + + Ok((input, SignedPayload { header, data })) + } +} + +impl<S> Stream for SignedPayloadStream<S> +where + S: Stream<Item = Result<Bytes, Error>> + Unpin, +{ + type Item = Result<Bytes, SignedPayloadStreamError>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll<Option<Self::Item>> { + use std::task::Poll; + + let mut this = self.project(); + + loop { + let (input, payload) = match Self::parse_next(this.buf) { + Ok(res) => res, + Err(nom::Err::Incomplete(_)) => { + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + continue; + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))) + } + None => { + return Poll::Ready(Some(Err(SignedPayloadStreamError::message( + "Unexpected EOF", + )))); + } + } + } + Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => { + return Poll::Ready(Some(Err(e))) + } + }; + + // 0-sized chunk is the last + if payload.data.is_empty() { + return Poll::Ready(None); + } + + let data_sha256sum = sha256sum(&payload.data); + + let expected_signature = compute_streaming_payload_signature( + this.signing_hmac, + *this.datetime, + this.scope, + *this.previous_signature, + data_sha256sum, + ) + .map_err(|e| { + SignedPayloadStreamError::Message(format!("Could not build signature: {}", e)) + })?; + + if payload.header.signature != expected_signature { + return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature))); + } + + *this.buf = input.into(); + *this.previous_signature = payload.header.signature; + + return Poll::Ready(Some(Ok(payload.data))); + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.stream.size_hint() + } +} + +#[cfg(test)] +mod tests { + use futures::prelude::*; + + use super::{SignedPayloadStream, SignedPayloadStreamError}; + + #[tokio::test] + async fn test_interrupted_signed_payload_stream() { + use chrono::{DateTime, Utc}; + + use garage_util::data::Hash; + + let datetime = DateTime::parse_from_rfc3339("2021-12-13T13:12:42+01:00") // TODO UNIX 0 + .unwrap() + .with_timezone(&Utc); + let secret_key = "test"; + let region = "test"; + let scope = crate::signature::compute_scope(&datetime, region); + let signing_hmac = + crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap(); + + let data: &[&[u8]] = &[b"1"]; + let body = futures::stream::iter(data.iter().map(|block| Ok(block.as_ref().into()))); + + let seed_signature = Hash::default(); + + let mut stream = + SignedPayloadStream::new(body, signing_hmac, datetime, &scope, seed_signature).unwrap(); + + assert!(stream.try_next().await.is_err()); + match stream.try_next().await { + Err(SignedPayloadStreamError::Message(msg)) if msg == "Unexpected EOF" => {} + item => panic!( + "Unexpected result, expected early EOF error, got {:?}", + item + ), + } + } +} |