use std::pin::Pin; use chrono::{DateTime, NaiveDateTime, Utc}; use futures::prelude::*; use futures::task; use garage_model::key_table::Key; use hmac::Mac; use hyper::body::Bytes; use hyper::{Body, Request}; use garage_util::data::Hash; use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME}; use crate::error::*; pub fn parse_streaming_body( api_key: &Key, req: Request, content_sha256: &mut Option, region: &str, service: &str, ) -> Result, Error> { match req.headers().get("x-amz-content-sha256") { Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { let signature = content_sha256 .take() .ok_or_bad_request("No signature provided")?; let secret_key = &api_key .state .as_option() .ok_or_internal_error("Deleted key state")? .secret_key; let date = req .headers() .get("x-amz-date") .ok_or_bad_request("Missing X-Amz-Date field")? .to_str()?; let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) .ok_or_bad_request("Invalid date")?; let date: DateTime = DateTime::from_utc(date, Utc); let scope = compute_scope(&date, region, service); let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service) .ok_or_internal_error("Unable to build signing HMAC")?; Ok(req.map(move |body| { Body::wrap_stream( SignedPayloadStream::new( body.map_err(Error::from), signing_hmac, date, &scope, signature, ) .map_err(Error::from), ) })) } _ => Ok(req), } } /// Result of `sha256("")` const EMPTY_STRING_HEX_DIGEST: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; fn compute_streaming_payload_signature( signing_hmac: &HmacSha256, date: DateTime, scope: &str, previous_signature: Hash, content_sha256: Hash, ) -> Result { let string_to_sign = [ "AWS4-HMAC-SHA256-PAYLOAD", &date.format(LONG_DATETIME).to_string(), scope, &hex::encode(previous_signature), EMPTY_STRING_HEX_DIGEST, &hex::encode(content_sha256), ] .join("\n"); let mut hmac = signing_hmac.clone(); hmac.update(string_to_sign.as_bytes()); Ok(Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")?) } mod payload { use garage_util::data::Hash; pub enum Error { Parser(nom::error::Error), BadSignature, } impl Error { pub fn description(&self) -> &str { match *self { Error::Parser(ref e) => e.code.description(), Error::BadSignature => "Bad signature", } } } #[derive(Debug, Clone)] pub struct Header { pub size: usize, pub signature: Hash, } impl Header { pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { use nom::bytes::streaming::tag; use nom::character::streaming::hex_digit1; use nom::combinator::map_res; use nom::number::streaming::hex_u32; macro_rules! try_parse { ($expr:expr) => { $expr.map_err(|e| e.map(Error::Parser))? }; } let (input, size) = try_parse!(hex_u32(input)); let (input, _) = try_parse!(tag(";")(input)); let (input, _) = try_parse!(tag("chunk-signature=")(input)); let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input)); let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?; let (input, _) = try_parse!(tag("\r\n")(input)); let header = Header { size: size as usize, signature, }; Ok((input, header)) } } } #[derive(Debug)] pub enum SignedPayloadStreamError { Stream(Error), InvalidSignature, Message(String), } impl SignedPayloadStreamError { fn message(msg: &str) -> Self { SignedPayloadStreamError::Message(msg.into()) } } impl From for Error { fn from(err: SignedPayloadStreamError) -> Self { match err { SignedPayloadStreamError::Stream(e) => e, SignedPayloadStreamError::InvalidSignature => { Error::bad_request("Invalid payload signature") } SignedPayloadStreamError::Message(e) => { Error::bad_request(format!("Chunk format error: {}", e)) } } } } impl From> for SignedPayloadStreamError { fn from(err: payload::Error) -> Self { Self::message(err.description()) } } impl From> for SignedPayloadStreamError { fn from(err: nom::error::Error) -> Self { Self::message(err.code.description()) } } struct SignedPayload { header: payload::Header, data: Bytes, } #[pin_project::pin_project] pub struct SignedPayloadStream where S: Stream>, { #[pin] stream: S, buf: bytes::BytesMut, datetime: DateTime, scope: String, signing_hmac: HmacSha256, previous_signature: Hash, } impl SignedPayloadStream where S: Stream>, { pub fn new( stream: S, signing_hmac: HmacSha256, datetime: DateTime, scope: &str, seed_signature: Hash, ) -> Self { Self { stream, buf: bytes::BytesMut::new(), datetime, scope: scope.into(), signing_hmac, previous_signature: seed_signature, } } fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> { use nom::bytes::streaming::{tag, take}; macro_rules! try_parse { ($expr:expr) => { $expr.map_err(nom::Err::convert)? }; } let (input, header) = try_parse!(payload::Header::parse(input)); // 0-sized chunk is the last if header.size == 0 { return Ok(( input, SignedPayload { header, data: Bytes::new(), }, )); } let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input)); let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input)); let data = Bytes::from(data.to_vec()); Ok((input, SignedPayload { header, data })) } } impl Stream for SignedPayloadStream where S: Stream> + Unpin, { type Item = Result; fn poll_next( self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> task::Poll> { use std::task::Poll; let mut this = self.project(); loop { let (input, payload) = match Self::parse_next(this.buf) { Ok(res) => res, Err(nom::Err::Incomplete(_)) => { match futures::ready!(this.stream.as_mut().poll_next(cx)) { Some(Ok(bytes)) => { this.buf.extend(bytes); continue; } Some(Err(e)) => { return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))) } None => { return Poll::Ready(Some(Err(SignedPayloadStreamError::message( "Unexpected EOF", )))); } } } Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => { return Poll::Ready(Some(Err(e))) } }; // 0-sized chunk is the last if payload.data.is_empty() { return Poll::Ready(None); } let data_sha256sum = sha256sum(&payload.data); let expected_signature = compute_streaming_payload_signature( this.signing_hmac, *this.datetime, this.scope, *this.previous_signature, data_sha256sum, ) .map_err(|e| { SignedPayloadStreamError::Message(format!("Could not build signature: {}", e)) })?; if payload.header.signature != expected_signature { return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature))); } *this.buf = input.into(); *this.previous_signature = payload.header.signature; return Poll::Ready(Some(Ok(payload.data))); } } fn size_hint(&self) -> (usize, Option) { self.stream.size_hint() } } #[cfg(test)] mod tests { use futures::prelude::*; use super::{SignedPayloadStream, SignedPayloadStreamError}; #[tokio::test] async fn test_interrupted_signed_payload_stream() { use chrono::{DateTime, Utc}; use garage_util::data::Hash; let datetime = DateTime::parse_from_rfc3339("2021-12-13T13:12:42+01:00") // TODO UNIX 0 .unwrap() .with_timezone(&Utc); let secret_key = "test"; let region = "test"; let scope = crate::signature::compute_scope(&datetime, region, "s3"); let signing_hmac = crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap(); let data: &[&[u8]] = &[b"1"]; let body = futures::stream::iter(data.iter().map(|block| Ok(block.as_ref().into()))); let seed_signature = Hash::default(); let mut stream = SignedPayloadStream::new(body, signing_hmac, datetime, &scope, seed_signature); assert!(stream.try_next().await.is_err()); match stream.try_next().await { Err(SignedPayloadStreamError::Message(msg)) if msg == "Unexpected EOF" => {} item => panic!( "Unexpected result, expected early EOF error, got {:?}", item ), } } }