aboutsummaryrefslogblamecommitdiff
path: root/src/api/common/signature/streaming.rs
blob: 3ffc5b2fd7a141d6f21eedabf67624649b77a7d3 (plain) (tree)
1
2
3
4
5
6
7
8
9
                  
                     
 
                                                     

                        
              
                                                          
                   

                            
 
             
 

                                  
                                                
 
                                        
 
                            
                                   
                                             

                      
                                      
                                                    



                                                                                     
                                    



                                                                            

                                                                              
                                                
                                                              
                                                                                                


                                   




                                                                                              










































                                                                                                                

                                                
                                                                           
 
                                                           





                                                                                                                       

                           







                                                                                                


         





                                       
                                          
                              
                                         










                                                        
























                                                                                                  




                                    










                                                                














                                                                             
                                
                                
                                            

         

                                                                                              








                                                                                                             












                                                                                                
                                                    
                                                




                                           




















































                                                                                                             


                
                                




                         
                            
                                       
                                                          


         

                                                     
                           

                                                                    
                                                                               
                         
                                                              
                                                                                        




                         
                                                           




                                                 
                                                              




                                                     












                                             


                           
                                    





                                               

                                    

 
                                 


                                               
                                                                                   
                      

                                                    

                                    
                 

         




                                                                                







                                                                 




                                                                               


                                            















                                                                                                






                                                                                                       
                                                                          


         
                                            


                                                       
                                                                









                                              
















                                                                                                                               
                                                 




















                                                                                                                               
                                                 

                                                                                                       
                                         
 
                                                                 
 





                                                                                                                    
 
                                                                                        



























                                                                                                                               
 


                                                                 











                                                       
                                                                               











                                                                                                       
                                                                                     



                                                                                                     
                                                                                                     


                                                     









                                                                   


                                                          
                                                                                                 






                                                                                        
use std::pin::Pin;
use std::sync::Mutex;

use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use futures::prelude::*;
use futures::task;
use hmac::Mac;
use hyper::body::{Bytes, Frame, Incoming as IncomingBody};
use hyper::Request;

use garage_util::data::Hash;

use super::*;

use crate::helpers::body_stream;
use crate::signature::checksum::*;
use crate::signature::payload::CheckedSignature;

pub use crate::signature::body::ReqBody;

pub fn parse_streaming_body(
	req: Request<IncomingBody>,
	checked_signature: &CheckedSignature,
	region: &str,
	service: &str,
) -> Result<Request<ReqBody>, Error> {
	let expected_checksums = ExpectedChecksums {
		sha256: match &checked_signature.content_sha256_header {
			ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256),
			_ => None,
		},
		..Default::default()
	};

	let mut checksummer = Checksummer::init(&expected_checksums, false);

	match checked_signature.content_sha256_header {
		ContentSha256Header::StreamingPayload { signed, trailer } => {
			if !signed && !trailer {
				return Err(Error::bad_request(
					"STREAMING-UNSIGNED-PAYLOAD is not a valid combination",
				));
			}

			if trailer {
				let algo = request_trailer_checksum_algorithm(req.headers())?;
				checksummer = checksummer.add(algo);
			}

			let sign_params = if signed {
				let signature = checked_signature
					.signature_header
					.clone()
					.ok_or_bad_request("No signature provided")?;
				let signature = hex::decode(signature)
					.ok()
					.and_then(|bytes| Hash::try_from(&bytes))
					.ok_or_bad_request("Invalid signature")?;

				let secret_key = checked_signature
					.key
					.as_ref()
					.ok_or_bad_request("Cannot sign streaming payload without signing key")?
					.state
					.as_option()
					.ok_or_internal_error("Deleted key state")?
					.secret_key
					.to_string();

				let date = req
					.headers()
					.get(X_AMZ_DATE)
					.ok_or_bad_request("Missing X-Amz-Date field")?
					.to_str()?;
				let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
					.ok_or_bad_request("Invalid date")?;
				let date: DateTime<Utc> = Utc.from_utc_datetime(&date);

				let scope = compute_scope(&date, region, service);
				let signing_hmac =
					crate::signature::signing_hmac(&date, &secret_key, region, service)
						.ok_or_internal_error("Unable to build signing HMAC")?;

				Some(SignParams {
					datetime: date,
					scope,
					signing_hmac,
					previous_signature: signature,
				})
			} else {
				None
			};

			Ok(req.map(move |body| {
				let stream = body_stream::<_, Error>(body);

				let signed_payload_stream =
					StreamingPayloadStream::new(stream, sign_params, trailer).map_err(Error::from);
				ReqBody {
					stream: Mutex::new(signed_payload_stream.boxed()),
					checksummer,
					expected_checksums,
				}
			}))
		}
		_ => Ok(req.map(|body| {
			let stream = http_body_util::BodyStream::new(body).map_err(Error::from);
			ReqBody {
				stream: Mutex::new(stream.boxed()),
				checksummer,
				expected_checksums,
			}
		})),
	}
}

/// Computes the signature expected for one data chunk of a signed streaming
/// payload.
///
/// The string to sign is made of six newline-separated fields: the payload
/// algorithm name, the formatted `date`, the `scope`, the hex-encoded
/// `previous_signature`, the hex digest of the empty string, and the
/// hex-encoded `content_sha256` of the chunk data. It is authenticated with a
/// clone of `signing_hmac`.
///
/// Returns a `StreamingPayloadError::Message` if the HMAC output cannot be
/// turned into a `Hash`.
fn compute_streaming_payload_signature(
	signing_hmac: &HmacSha256,
	date: DateTime<Utc>,
	scope: &str,
	previous_signature: Hash,
	content_sha256: Hash,
) -> Result<Hash, StreamingPayloadError> {
	let timestamp = date.format(LONG_DATETIME).to_string();
	let previous_hex = hex::encode(previous_signature);
	let content_hex = hex::encode(content_sha256);

	let string_to_sign = format!(
		"{}\n{}\n{}\n{}\n{}\n{}",
		AWS4_HMAC_SHA256_PAYLOAD,
		timestamp,
		scope,
		previous_hex,
		EMPTY_STRING_HEX_DIGEST,
		content_hex,
	);

	// Work on a copy so the shared signing key state is left untouched.
	let mut mac = signing_hmac.clone();
	mac.update(string_to_sign.as_bytes());
	let digest = mac.finalize().into_bytes();

	match Hash::try_from(&digest) {
		Some(signature) => Ok(signature),
		None => Err(StreamingPayloadError::Message(
			"Could not build signature".into(),
		)),
	}
}

/// Computes the signature expected for the trailing-header block of a signed
/// streaming payload.
///
/// The string to sign is made of five newline-separated fields: the trailer
/// algorithm name `AWS4-HMAC-SHA256-TRAILER`, the formatted `date`, the
/// `scope`, the hex-encoded `previous_signature` (signature of the final
/// chunk) and the hex-encoded `trailer_sha256`. It is authenticated with a
/// clone of `signing_hmac`.
///
/// Returns a `StreamingPayloadError::Message` if the HMAC output cannot be
/// turned into a `Hash`.
fn compute_streaming_trailer_signature(
	signing_hmac: &HmacSha256,
	date: DateTime<Utc>,
	scope: &str,
	previous_signature: Hash,
	trailer_sha256: Hash,
) -> Result<Hash, StreamingPayloadError> {
	// The trailer is signed under its own algorithm name, distinct from the
	// one used for data chunks; using the chunk name here would make every
	// trailer signature sent by a conforming client fail verification.
	const AWS4_HMAC_SHA256_TRAILER: &str = "AWS4-HMAC-SHA256-TRAILER";

	let string_to_sign = [
		AWS4_HMAC_SHA256_TRAILER,
		&date.format(LONG_DATETIME).to_string(),
		scope,
		&hex::encode(previous_signature),
		&hex::encode(trailer_sha256),
	]
	.join("\n");

	// Work on a copy so the shared signing key state is left untouched.
	let mut hmac = signing_hmac.clone();
	hmac.update(string_to_sign.as_bytes());

	Hash::try_from(&hmac.finalize().into_bytes())
		.ok_or_else(|| StreamingPayloadError::Message("Could not build signature".into()))
}

/// Parsers for the framing of a streaming payload: chunk headers and the
/// trailing-header block that may follow the final, 0-sized chunk.
///
/// All parsers use nom's *streaming* combinators, so they report
/// `nom::Err::Incomplete` when the input ends before a decision can be made.
mod payload {
	use garage_util::data::Hash;

	use nom::bytes::streaming::{tag, take_while};
	use nom::character::streaming::hex_digit1;
	use nom::combinator::map_res;
	use nom::number::streaming::hex_u32;

	/// Runs a nom parser, wraps its error into [`Error::Parser`] and
	/// propagates it with `?`.
	macro_rules! try_parse {
		($expr:expr) => {
			$expr.map_err(|e| e.map(Error::Parser))?
		};
	}

	/// Error raised by the parsers of this module.
	pub enum Error<I> {
		/// Error reported by a nom combinator.
		Parser(nom::error::Error<I>),
		/// A signature was syntactically valid hex but could not be
		/// converted into a `Hash`.
		BadSignature,
	}

	impl<I> Error<I> {
		/// Short, static description of the error.
		pub fn description(&self) -> &str {
			match *self {
				Error::Parser(ref e) => e.code.description(),
				Error::BadSignature => "Bad signature",
			}
		}
	}

	/// Header line that precedes the data of each chunk.
	#[derive(Debug, Clone)]
	pub struct ChunkHeader {
		/// Number of data bytes that follow the header.
		pub size: usize,
		/// Chunk signature; `Some` only when parsed by [`ChunkHeader::parse_signed`].
		pub signature: Option<Hash>,
	}

	impl ChunkHeader {
		/// Parses `<hex size>;chunk-signature=<hex signature>\r\n`.
		pub fn parse_signed(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
			let (input, size) = try_parse!(hex_u32(input));
			let (input, _) = try_parse!(tag(";")(input));

			let (input, _) = try_parse!(tag("chunk-signature=")(input));
			let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input));
			let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?;

			let (input, _) = try_parse!(tag("\r\n")(input));

			let header = ChunkHeader {
				size: size as usize,
				signature: Some(signature),
			};

			Ok((input, header))
		}

		/// Parses `<hex size>\r\n`.
		pub fn parse_unsigned(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
			let (input, size) = try_parse!(hex_u32(input));
			let (input, _) = try_parse!(tag("\r\n")(input));

			let header = ChunkHeader {
				size: size as usize,
				signature: None,
			};

			Ok((input, header))
		}
	}

	/// Trailing header sent after the final, 0-sized chunk.
	#[derive(Debug, Clone)]
	pub struct TrailerChunk {
		/// Name of the trailing header (ASCII alphanumerics and `-`).
		pub header_name: Vec<u8>,
		/// Value of the trailing header (ASCII alphanumerics and `+`, `/`, `=`).
		pub header_value: Vec<u8>,
		/// Trailer signature; `Some` only when parsed by [`TrailerChunk::parse_signed`].
		pub signature: Option<Hash>,
	}

	impl TrailerChunk {
		/// Parses the `<name>:<value>\n` part shared by signed and unsigned
		/// trailers. The returned trailer has no signature.
		fn parse_content(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
			let (input, header_name) = try_parse!(take_while(
				|c: u8| c.is_ascii_alphanumeric() || c == b'-'
			)(input));
			let (input, _) = try_parse!(tag(b":")(input));
			let (input, header_value) = try_parse!(take_while(
				|c: u8| c.is_ascii_alphanumeric() || b"+/=".contains(&c)
			)(input));
			let (input, _) = try_parse!(tag(b"\n")(input));

			Ok((
				input,
				TrailerChunk {
					header_name: header_name.to_vec(),
					header_value: header_value.to_vec(),
					signature: None,
				},
			))
		}

		/// Parses a signed trailer:
		/// `<name>:<value>\n\r\nx-amz-trailer-signature:<hex signature>\r\n`.
		///
		/// The returned trailer always carries `Some(signature)`, which is
		/// what the signature verification of a signed stream relies on.
		pub fn parse_signed(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
			let (input, trailer) = Self::parse_content(input)?;

			let (input, _) = try_parse!(tag(b"\r\n")(input));

			// The signature is transmitted as its own trailing header line.
			let (input, _) = try_parse!(tag(b"x-amz-trailer-signature:")(input));
			let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input));
			let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?;
			let (input, _) = try_parse!(tag(b"\r\n")(input));

			Ok((
				input,
				TrailerChunk {
					signature: Some(signature),
					..trailer
				},
			))
		}

		/// Parses an unsigned trailer: `<name>:<value>\n\r\n\r\n`.
		///
		/// The returned trailer has no signature.
		pub fn parse_unsigned(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
			let (input, trailer) = Self::parse_content(input)?;

			let (input, _) = try_parse!(tag(b"\r\n\r\n")(input));

			Ok((input, trailer))
		}
	}
}

/// Errors produced while decoding a streaming payload.
#[derive(Debug)]
pub enum StreamingPayloadError {
	/// Error reported by the underlying body stream.
	Stream(Error),
	/// A chunk or trailer signature did not match the locally computed one.
	InvalidSignature,
	/// Any other failure (parse error, premature end of stream, ...),
	/// described by a human-readable message.
	Message(String),
}

impl StreamingPayloadError {
	fn message(msg: &str) -> Self {
		StreamingPayloadError::Message(msg.into())
	}
}

impl From<StreamingPayloadError> for Error {
	fn from(err: StreamingPayloadError) -> Self {
		match err {
			StreamingPayloadError::Stream(e) => e,
			StreamingPayloadError::InvalidSignature => {
				Error::bad_request("Invalid payload signature")
			}
			StreamingPayloadError::Message(e) => {
				Error::bad_request(format!("Chunk format error: {}", e))
			}
		}
	}
}

impl<I> From<payload::Error<I>> for StreamingPayloadError {
	fn from(err: payload::Error<I>) -> Self {
		Self::message(err.description())
	}
}

impl<I> From<nom::error::Error<I>> for StreamingPayloadError {
	fn from(err: nom::error::Error<I>) -> Self {
		Self::message(err.code.description())
	}
}

/// One unit decoded from the buffered input by `StreamingPayloadStream::parse_next`.
enum StreamingPayloadChunk {
	/// A chunk: its parsed header and its data. The data is empty for the
	/// final, 0-sized chunk of a stream without trailer.
	Chunk {
		header: payload::ChunkHeader,
		data: Bytes,
	},
	/// The trailing header that follows the 0-sized chunk when a trailer is expected.
	Trailer(payload::TrailerChunk),
}

/// State needed to verify the signatures of a signed streaming payload.
struct SignParams {
	/// Request date, formatted into every string to sign.
	datetime: DateTime<Utc>,
	/// Scope string placed in every string to sign.
	scope: String,
	/// Keyed HMAC, cloned for each signature computation.
	signing_hmac: HmacSha256,
	/// Signature the next chunk is chained to: initially the signature of
	/// the request, then the signature of the last verified chunk.
	previous_signature: Hash,
}

/// Stream adapter that decodes a chunked streaming payload read from `stream`
/// and yields the data of each chunk, verifying chunk signatures when
/// `signing` is set.
#[pin_project::pin_project]
pub struct StreamingPayloadStream<S>
where
	S: Stream<Item = Result<Bytes, Error>>,
{
	/// Underlying stream of raw body bytes.
	#[pin]
	stream: S,
	/// Bytes received from `stream` that have not been parsed yet.
	buf: bytes::BytesMut,
	/// Signature verification state; `None` for unsigned payloads.
	signing: Option<SignParams>,
	/// Whether a trailing header is expected after the 0-sized chunk.
	has_trailer: bool,
}

impl<S> StreamingPayloadStream<S>
where
	S: Stream<Item = Result<Bytes, Error>>,
{
	fn new(stream: S, signing: Option<SignParams>, has_trailer: bool) -> Self {
		Self {
			stream,
			buf: bytes::BytesMut::new(),
			signing,
			has_trailer,
		}
	}

	fn parse_next(
		input: &[u8],
		is_signed: bool,
		has_trailer: bool,
	) -> nom::IResult<&[u8], StreamingPayloadChunk, StreamingPayloadError> {
		use nom::bytes::streaming::{tag, take};

		macro_rules! try_parse {
			($expr:expr) => {
				$expr.map_err(nom::Err::convert)?
			};
		}

		let (input, header) = if is_signed {
			try_parse!(payload::ChunkHeader::parse_signed(input))
		} else {
			try_parse!(payload::ChunkHeader::parse_unsigned(input))
		};

		// 0-sized chunk is the last
		if header.size == 0 {
			if has_trailer {
				let (input, trailer) = if is_signed {
					try_parse!(payload::TrailerChunk::parse_signed(input))
				} else {
					try_parse!(payload::TrailerChunk::parse_unsigned(input))
				};
				return Ok((input, StreamingPayloadChunk::Trailer(trailer)));
			} else {
				return Ok((
					input,
					StreamingPayloadChunk::Chunk {
						header,
						data: Bytes::new(),
					},
				));
			}
		}

		let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input));
		let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input));

		let data = Bytes::from(data.to_vec());

		Ok((input, StreamingPayloadChunk::Chunk { header, data }))
	}
}

impl<S> Stream for StreamingPayloadStream<S>
where
	S: Stream<Item = Result<Bytes, Error>> + Unpin,
{
	type Item = Result<Frame<Bytes>, StreamingPayloadError>;

	fn poll_next(
		self: Pin<&mut Self>,
		cx: &mut task::Context<'_>,
	) -> task::Poll<Option<Self::Item>> {
		use std::task::Poll;

		let mut this = self.project();

		loop {
			let (input, payload) =
				match Self::parse_next(this.buf, this.signing.is_some(), *this.has_trailer) {
					Ok(res) => res,
					Err(nom::Err::Incomplete(_)) => {
						match futures::ready!(this.stream.as_mut().poll_next(cx)) {
							Some(Ok(bytes)) => {
								this.buf.extend(bytes);
								continue;
							}
							Some(Err(e)) => {
								return Poll::Ready(Some(Err(StreamingPayloadError::Stream(e))))
							}
							None => {
								return Poll::Ready(Some(Err(StreamingPayloadError::message(
									"Unexpected EOF",
								))));
							}
						}
					}
					Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => {
						return Poll::Ready(Some(Err(e)))
					}
				};

			match payload {
				StreamingPayloadChunk::Chunk { data, header } => {
					if let Some(signing) = this.signing.as_mut() {
						let data_sha256sum = sha256sum(&data);

						let expected_signature = compute_streaming_payload_signature(
							&signing.signing_hmac,
							signing.datetime,
							&signing.scope,
							signing.previous_signature,
							data_sha256sum,
						)?;

						if header.signature.unwrap() != expected_signature {
							return Poll::Ready(Some(Err(StreamingPayloadError::InvalidSignature)));
						}

						signing.previous_signature = header.signature.unwrap();
					}

					*this.buf = input.into();

					// 0-sized chunk is the last
					if data.is_empty() {
						// if there was a trailer, it would have been returned by the parser
						assert!(!*this.has_trailer);
						return Poll::Ready(None);
					}

					return Poll::Ready(Some(Ok(Frame::data(data))));
				}
				StreamingPayloadChunk::Trailer(trailer) => {
					if let Some(signing) = this.signing.as_mut() {
						let data = [
							&trailer.header_name[..],
							&b":"[..],
							&trailer.header_value[..],
							&b"\n"[..],
						]
						.concat();
						let trailer_sha256sum = sha256sum(&data);

						let expected_signature = compute_streaming_trailer_signature(
							&signing.signing_hmac,
							signing.datetime,
							&signing.scope,
							signing.previous_signature,
							trailer_sha256sum,
						)?;

						if trailer.signature.unwrap() != expected_signature {
							return Poll::Ready(Some(Err(StreamingPayloadError::InvalidSignature)));
						}
					}

					*this.buf = input.into();

					// TODO: handle trailer

					return Poll::Ready(None);
				}
			}
		}
	}

	fn size_hint(&self) -> (usize, Option<usize>) {
		self.stream.size_hint()
	}
}

#[cfg(test)]
mod tests {
	use futures::prelude::*;

	use super::{SignParams, StreamingPayloadError, StreamingPayloadStream};

	/// A signed stream whose input stops in the middle of a chunk header must
	/// report an error, and keep reporting "Unexpected EOF" when polled again.
	#[tokio::test]
	async fn test_interrupted_signed_payload_stream() {
		use chrono::{DateTime, Utc};

		use garage_util::data::Hash;

		let region = "test";
		let secret_key = "test";
		let datetime = DateTime::parse_from_rfc3339("2021-12-13T13:12:42+01:00") // TODO UNIX 0
			.unwrap()
			.with_timezone(&Utc);
		let signing_hmac =
			crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap();
		let scope = crate::signature::compute_scope(&datetime, region, "s3");

		// The whole body is a single byte: a chunk header cut short.
		let blocks: &[&[u8]] = &[b"1"];
		let body = futures::stream::iter(blocks.iter().map(|block| Ok(block.to_vec().into())));

		let sign_params = SignParams {
			signing_hmac,
			datetime,
			scope,
			previous_signature: Hash::default(),
		};
		let mut stream = StreamingPayloadStream::new(body, Some(sign_params), false);

		let first = stream.try_next().await;
		assert!(first.is_err());

		let second = stream.try_next().await;
		match second {
			Err(StreamingPayloadError::Message(msg)) if msg == "Unexpected EOF" => {}
			item => panic!(
				"Unexpected result, expected early EOF error, got {:?}",
				item
			),
		}
	}
}