use std::pin::Pin; use std::sync::Mutex; use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use futures::prelude::*; use futures::task; use hmac::Mac; use hyper::body::{Bytes, Frame, Incoming as IncomingBody}; use hyper::Request; use garage_util::data::Hash; use super::*; use crate::helpers::body_stream; use crate::signature::checksum::*; use crate::signature::payload::CheckedSignature; pub use crate::signature::body::ReqBody; pub fn parse_streaming_body( req: Request, checked_signature: &CheckedSignature, region: &str, service: &str, ) -> Result, Error> { let expected_checksums = ExpectedChecksums { md5: match req.headers().get("content-md5") { Some(x) => Some(x.to_str()?.to_string()), None => None, }, sha256: match &checked_signature.content_sha256_header { ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256), _ => None, }, extra: None, }; let mut checksummer = Checksummer::init(&expected_checksums, false); match checked_signature.content_sha256_header { ContentSha256Header::StreamingPayload { signed, trailer } => { if !signed && !trailer { return Err(Error::bad_request( "STREAMING-UNSIGNED-PAYLOAD is not a valid combination", )); } if trailer { let algo = request_trailer_checksum_algorithm(req.headers())?; checksummer = checksummer.add(algo); } let sign_params = if signed { let signature = checked_signature .signature_header .clone() .ok_or_bad_request("No signature provided")?; let signature = hex::decode(signature) .ok() .and_then(|bytes| Hash::try_from(&bytes)) .ok_or_bad_request("Invalid signature")?; let secret_key = checked_signature .key .as_ref() .ok_or_bad_request("Cannot sign streaming payload without signing key")? .state .as_option() .ok_or_internal_error("Deleted key state")? .secret_key .to_string(); let date = req .headers() .get(X_AMZ_DATE) .ok_or_bad_request("Missing X-Amz-Date field")? .to_str()?; let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) .ok_or_bad_request("Invalid date")?; let date: DateTime = Utc.from_utc_datetime(&date); let scope = compute_scope(&date, region, service); let signing_hmac = crate::signature::signing_hmac(&date, &secret_key, region, service) .ok_or_internal_error("Unable to build signing HMAC")?; Some(SignParams { datetime: date, scope, signing_hmac, previous_signature: signature, }) } else { None }; Ok(req.map(move |body| { let stream = body_stream::<_, Error>(body); let signed_payload_stream = StreamingPayloadStream::new(stream, sign_params, trailer).map_err(Error::from); ReqBody { stream: Mutex::new(signed_payload_stream.boxed()), checksummer, expected_checksums, } })) } _ => Ok(req.map(|body| { let stream = http_body_util::BodyStream::new(body).map_err(Error::from); ReqBody { stream: Mutex::new(stream.boxed()), checksummer, expected_checksums, } })), } } fn compute_streaming_payload_signature( signing_hmac: &HmacSha256, date: DateTime, scope: &str, previous_signature: Hash, content_sha256: Hash, ) -> Result { let string_to_sign = [ AWS4_HMAC_SHA256_PAYLOAD, &date.format(LONG_DATETIME).to_string(), scope, &hex::encode(previous_signature), EMPTY_STRING_HEX_DIGEST, &hex::encode(content_sha256), ] .join("\n"); let mut hmac = signing_hmac.clone(); hmac.update(string_to_sign.as_bytes()); Hash::try_from(&hmac.finalize().into_bytes()) .ok_or_else(|| StreamingPayloadError::Message("Could not build signature".into())) } fn compute_streaming_trailer_signature( signing_hmac: &HmacSha256, date: DateTime, scope: &str, previous_signature: Hash, trailer_sha256: Hash, ) -> Result { let string_to_sign = [ AWS4_HMAC_SHA256_PAYLOAD, &date.format(LONG_DATETIME).to_string(), scope, &hex::encode(previous_signature), &hex::encode(trailer_sha256), ] .join("\n"); let mut hmac = signing_hmac.clone(); hmac.update(string_to_sign.as_bytes()); Hash::try_from(&hmac.finalize().into_bytes()) .ok_or_else(|| StreamingPayloadError::Message("Could not build signature".into())) } mod payload { use garage_util::data::Hash; use nom::bytes::streaming::{tag, take_while}; use nom::character::streaming::hex_digit1; use nom::combinator::map_res; use nom::number::streaming::hex_u32; macro_rules! try_parse { ($expr:expr) => { $expr.map_err(|e| e.map(Error::Parser))? }; } pub enum Error { Parser(nom::error::Error), BadSignature, } impl Error { pub fn description(&self) -> &str { match *self { Error::Parser(ref e) => e.code.description(), Error::BadSignature => "Bad signature", } } } #[derive(Debug, Clone)] pub struct ChunkHeader { pub size: usize, pub signature: Option, } impl ChunkHeader { pub fn parse_signed(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { let (input, size) = try_parse!(hex_u32(input)); let (input, _) = try_parse!(tag(";")(input)); let (input, _) = try_parse!(tag("chunk-signature=")(input)); let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input)); let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?; let (input, _) = try_parse!(tag("\r\n")(input)); let header = ChunkHeader { size: size as usize, signature: Some(signature), }; Ok((input, header)) } pub fn parse_unsigned(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { let (input, size) = try_parse!(hex_u32(input)); let (input, _) = try_parse!(tag("\r\n")(input)); let header = ChunkHeader { size: size as usize, signature: None, }; Ok((input, header)) } } #[derive(Debug, Clone)] pub struct TrailerChunk { pub header_name: Vec, pub header_value: Vec, pub signature: Option, } impl TrailerChunk { fn parse_content(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { let (input, header_name) = try_parse!(take_while( |c: u8| c.is_ascii_alphanumeric() || c == b'-' )(input)); let (input, _) = try_parse!(tag(b":")(input)); let (input, header_value) = try_parse!(take_while( |c: u8| c.is_ascii_alphanumeric() || b"+/=".contains(&c) )(input)); let (input, _) = try_parse!(tag(b"\n")(input)); Ok(( input, TrailerChunk { header_name: header_name.to_vec(), header_value: header_value.to_vec(), signature: None, }, )) } pub fn parse_signed(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { let (input, trailer) = Self::parse_content(input)?; let (input, _) = try_parse!(tag(b"\r\n\r\n")(input)); Ok((input, trailer)) } pub fn parse_unsigned(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { let (input, trailer) = Self::parse_content(input)?; let (input, _) = try_parse!(tag(b"\r\n")(input)); let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input)); let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?; let (input, _) = try_parse!(tag(b"\r\n")(input)); Ok(( input, TrailerChunk { signature: Some(signature), ..trailer }, )) } } } #[derive(Debug)] pub enum StreamingPayloadError { Stream(Error), InvalidSignature, Message(String), } impl StreamingPayloadError { fn message(msg: &str) -> Self { StreamingPayloadError::Message(msg.into()) } } impl From for Error { fn from(err: StreamingPayloadError) -> Self { match err { StreamingPayloadError::Stream(e) => e, StreamingPayloadError::InvalidSignature => { Error::bad_request("Invalid payload signature") } StreamingPayloadError::Message(e) => { Error::bad_request(format!("Chunk format error: {}", e)) } } } } impl From> for StreamingPayloadError { fn from(err: payload::Error) -> Self { Self::message(err.description()) } } impl From> for StreamingPayloadError { fn from(err: nom::error::Error) -> Self { Self::message(err.code.description()) } } enum StreamingPayloadChunk { Chunk { header: payload::ChunkHeader, data: Bytes, }, Trailer(payload::TrailerChunk), } struct SignParams { datetime: DateTime, scope: String, signing_hmac: HmacSha256, previous_signature: Hash, } #[pin_project::pin_project] pub struct StreamingPayloadStream where S: Stream>, { #[pin] stream: S, buf: bytes::BytesMut, signing: Option, has_trailer: bool, } impl StreamingPayloadStream where S: Stream>, { fn new(stream: S, signing: Option, has_trailer: bool) -> Self { Self { stream, buf: bytes::BytesMut::new(), signing, has_trailer, } } fn parse_next( input: &[u8], is_signed: bool, has_trailer: bool, ) -> nom::IResult<&[u8], StreamingPayloadChunk, StreamingPayloadError> { use nom::bytes::streaming::{tag, take}; macro_rules! try_parse { ($expr:expr) => { $expr.map_err(nom::Err::convert)? }; } let (input, header) = if is_signed { try_parse!(payload::ChunkHeader::parse_signed(input)) } else { try_parse!(payload::ChunkHeader::parse_unsigned(input)) }; // 0-sized chunk is the last if header.size == 0 { if has_trailer { let (input, trailer) = if is_signed { try_parse!(payload::TrailerChunk::parse_signed(input)) } else { try_parse!(payload::TrailerChunk::parse_unsigned(input)) }; return Ok((input, StreamingPayloadChunk::Trailer(trailer))); } else { return Ok(( input, StreamingPayloadChunk::Chunk { header, data: Bytes::new(), }, )); } } let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input)); let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input)); let data = Bytes::from(data.to_vec()); Ok((input, StreamingPayloadChunk::Chunk { header, data })) } } impl Stream for StreamingPayloadStream where S: Stream> + Unpin, { type Item = Result, StreamingPayloadError>; fn poll_next( self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> task::Poll> { use std::task::Poll; let mut this = self.project(); loop { let (input, payload) = match Self::parse_next(this.buf, this.signing.is_some(), *this.has_trailer) { Ok(res) => res, Err(nom::Err::Incomplete(_)) => { match futures::ready!(this.stream.as_mut().poll_next(cx)) { Some(Ok(bytes)) => { this.buf.extend(bytes); continue; } Some(Err(e)) => { return Poll::Ready(Some(Err(StreamingPayloadError::Stream(e)))) } None => { return Poll::Ready(Some(Err(StreamingPayloadError::message( "Unexpected EOF", )))); } } } Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => { return Poll::Ready(Some(Err(e))) } }; match payload { StreamingPayloadChunk::Chunk { data, header } => { if let Some(signing) = this.signing.as_mut() { let data_sha256sum = sha256sum(&data); let expected_signature = compute_streaming_payload_signature( &signing.signing_hmac, signing.datetime, &signing.scope, signing.previous_signature, data_sha256sum, )?; if header.signature.unwrap() != expected_signature { return Poll::Ready(Some(Err(StreamingPayloadError::InvalidSignature))); } signing.previous_signature = header.signature.unwrap(); } *this.buf = input.into(); // 0-sized chunk is the last if data.is_empty() { // if there was a trailer, it would have been returned by the parser assert!(!*this.has_trailer); return Poll::Ready(None); } return Poll::Ready(Some(Ok(Frame::data(data)))); } StreamingPayloadChunk::Trailer(trailer) => { if let Some(signing) = this.signing.as_mut() { let data = [ &trailer.header_name[..], &b":"[..], &trailer.header_value[..], &b"\n"[..], ] .concat(); let trailer_sha256sum = sha256sum(&data); let expected_signature = compute_streaming_trailer_signature( &signing.signing_hmac, signing.datetime, &signing.scope, signing.previous_signature, trailer_sha256sum, )?; if trailer.signature.unwrap() != expected_signature { return Poll::Ready(Some(Err(StreamingPayloadError::InvalidSignature))); } } *this.buf = input.into(); // TODO: handle trailer return Poll::Ready(None); } } } } fn size_hint(&self) -> (usize, Option) { self.stream.size_hint() } } #[cfg(test)] mod tests { use futures::prelude::*; use super::{SignParams, StreamingPayloadError, StreamingPayloadStream}; #[tokio::test] async fn test_interrupted_signed_payload_stream() { use chrono::{DateTime, Utc}; use garage_util::data::Hash; let datetime = DateTime::parse_from_rfc3339("2021-12-13T13:12:42+01:00") // TODO UNIX 0 .unwrap() .with_timezone(&Utc); let secret_key = "test"; let region = "test"; let scope = crate::signature::compute_scope(&datetime, region, "s3"); let signing_hmac = crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap(); let data: &[&[u8]] = &[b"1"]; let body = futures::stream::iter(data.iter().map(|block| Ok(block.to_vec().into()))); let seed_signature = Hash::default(); let mut stream = StreamingPayloadStream::new( body, Some(SignParams { signing_hmac, datetime, scope, previous_signature: seed_signature, }), false, ); assert!(stream.try_next().await.is_err()); match stream.try_next().await { Err(StreamingPayloadError::Message(msg)) if msg == "Unexpected EOF" => {} item => panic!( "Unexpected result, expected early EOF error, got {:?}", item ), } } }