use std::pin::Pin;
use chrono::{DateTime, NaiveDateTime, Utc};
use futures::prelude::*;
use futures::task;
use garage_model::key_table::Key;
use hmac::Mac;
use hyper::body::Bytes;
use hyper::{Body, Request};
use garage_util::data::Hash;
use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME};
use crate::error::*;
pub fn parse_streaming_body(
api_key: &Key,
req: Request
,
content_sha256: &mut Option,
region: &str,
service: &str,
) -> Result, Error> {
match req.headers().get("x-amz-content-sha256") {
Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
let signature = content_sha256
.take()
.ok_or_bad_request("No signature provided")?;
let secret_key = &api_key
.state
.as_option()
.ok_or_internal_error("Deleted key state")?
.secret_key;
let date = req
.headers()
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")?
.to_str()?;
let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
.ok_or_bad_request("Invalid date")?;
let date: DateTime = DateTime::from_utc(date, Utc);
let scope = compute_scope(&date, region, service);
let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service)
.ok_or_internal_error("Unable to build signing HMAC")?;
Ok(req.map(move |body| {
Body::wrap_stream(
SignedPayloadStream::new(
body.map_err(Error::from),
signing_hmac,
date,
&scope,
signature,
)
.map_err(Error::from),
)
}))
}
_ => Ok(req),
}
}
/// Lowercase hex encoding of the SHA-256 digest of the empty string, i.e. `sha256("")`.
///
/// Inserted as a fixed line of the string-to-sign that
/// `compute_streaming_payload_signature` builds for each payload chunk.
const EMPTY_STRING_HEX_DIGEST: &str =
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
fn compute_streaming_payload_signature(
signing_hmac: &HmacSha256,
date: DateTime,
scope: &str,
previous_signature: Hash,
content_sha256: Hash,
) -> Result {
let string_to_sign = [
"AWS4-HMAC-SHA256-PAYLOAD",
&date.format(LONG_DATETIME).to_string(),
scope,
&hex::encode(previous_signature),
EMPTY_STRING_HEX_DIGEST,
&hex::encode(content_sha256),
]
.join("\n");
let mut hmac = signing_hmac.clone();
hmac.update(string_to_sign.as_bytes());
Ok(Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")?)
}
mod payload {
use garage_util::data::Hash;
pub enum Error {
Parser(nom::error::Error),
BadSignature,
}
impl Error {
pub fn description(&self) -> &str {
match *self {
Error::Parser(ref e) => e.code.description(),
Error::BadSignature => "Bad signature",
}
}
}
#[derive(Debug, Clone)]
pub struct Header {
pub size: usize,
pub signature: Hash,
}
impl Header {
pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
use nom::bytes::streaming::tag;
use nom::character::streaming::hex_digit1;
use nom::combinator::map_res;
use nom::number::streaming::hex_u32;
macro_rules! try_parse {
($expr:expr) => {
$expr.map_err(|e| e.map(Error::Parser))?
};
}
let (input, size) = try_parse!(hex_u32(input));
let (input, _) = try_parse!(tag(";")(input));
let (input, _) = try_parse!(tag("chunk-signature=")(input));
let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input));
let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?;
let (input, _) = try_parse!(tag("\r\n")(input));
let header = Header {
size: size as usize,
signature,
};
Ok((input, header))
}
}
}
/// Errors raised while reading and verifying a chunk-signed payload stream.
///
/// Each variant is mapped onto the crate's `Error` by the `From` impl in this file.
#[derive(Debug)]
pub enum SignedPayloadStreamError {
/// An error already expressed as the crate's `Error`; passed through unchanged on conversion.
/// NOTE(review): presumably produced by the wrapped body stream — the producing code is outside this view.
Stream(Error),
/// Converted into a bad-request error with the text "Invalid payload signature".
InvalidSignature,
/// A chunk could not be parsed; holds the parser's description and is
/// converted into a bad-request error prefixed with "Chunk format error: ".
Message(String),
}
impl SignedPayloadStreamError {
fn message(msg: &str) -> Self {
SignedPayloadStreamError::Message(msg.into())
}
}
impl From for Error {
fn from(err: SignedPayloadStreamError) -> Self {
match err {
SignedPayloadStreamError::Stream(e) => e,
SignedPayloadStreamError::InvalidSignature => {
Error::bad_request("Invalid payload signature")
}
SignedPayloadStreamError::Message(e) => {
Error::bad_request(format!("Chunk format error: {}", e))
}
}
}
}
impl From> for SignedPayloadStreamError {
fn from(err: payload::Error) -> Self {
Self::message(err.description())
}
}
impl From> for SignedPayloadStreamError {
fn from(err: nom::error::Error) -> Self {
Self::message(err.code.description())
}
}
/// One chunk read from a signed payload stream: its parsed header and its data bytes.
struct SignedPayload {
// Size and signature parsed from the chunk's header line.
header: payload::Header,
// Chunk data; `parse_next` leaves it empty for the terminating zero-size chunk.
data: Bytes,
}
#[pin_project::pin_project]
pub struct SignedPayloadStream
where
S: Stream>,
{
#[pin]
stream: S,
buf: bytes::BytesMut,
datetime: DateTime,
scope: String,
signing_hmac: HmacSha256,
previous_signature: Hash,
}
impl SignedPayloadStream
where
S: Stream>,
{
pub fn new(
stream: S,
signing_hmac: HmacSha256,
datetime: DateTime,
scope: &str,
seed_signature: Hash,
) -> Self {
Self {
stream,
buf: bytes::BytesMut::new(),
datetime,
scope: scope.into(),
signing_hmac,
previous_signature: seed_signature,
}
}
fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> {
use nom::bytes::streaming::{tag, take};
macro_rules! try_parse {
($expr:expr) => {
$expr.map_err(nom::Err::convert)?
};
}
let (input, header) = try_parse!(payload::Header::parse(input));
// 0-sized chunk is the last
if header.size == 0 {
return Ok((
input,
SignedPayload {
header,
data: Bytes::new(),
},
));
}
let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input));
let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input));
let data = Bytes::from(data.to_vec());
Ok((input, SignedPayload { header, data }))
}
}
impl Stream for SignedPayloadStream
where
S: Stream> + Unpin,
{
type Item = Result;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll