aboutsummaryrefslogtreecommitdiff
path: root/src/api/common/signature/streaming.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/common/signature/streaming.rs')
-rw-r--r--src/api/common/signature/streaming.rs58
1 files changed, 42 insertions, 16 deletions
diff --git a/src/api/common/signature/streaming.rs b/src/api/common/signature/streaming.rs
index 75f3bf80..70b6e004 100644
--- a/src/api/common/signature/streaming.rs
+++ b/src/api/common/signature/streaming.rs
@@ -5,7 +5,7 @@ use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use futures::prelude::*;
use futures::task;
use hmac::Mac;
-use http::header::{HeaderValue, CONTENT_ENCODING};
+use http::header::{HeaderMap, HeaderValue, CONTENT_ENCODING};
use hyper::body::{Bytes, Frame, Incoming as IncomingBody};
use hyper::Request;
@@ -64,10 +64,16 @@ pub fn parse_streaming_body(
}
// If trailer header is announced, add the calculation of the requested checksum
- if trailer {
- let algo = request_trailer_checksum_algorithm(req.headers())?;
+ let trailer_algorithm = if trailer {
+ let algo = Some(
+ request_trailer_checksum_algorithm(req.headers())?
+ .ok_or_bad_request("Missing x-amz-trailer header")?,
+ );
checksummer = checksummer.add(algo);
- }
+ algo
+ } else {
+ None
+ };
// For signed variants, determine signing parameters
let sign_params = if signed {
@@ -123,6 +129,7 @@ pub fn parse_streaming_body(
stream: Mutex::new(signed_payload_stream.boxed()),
checksummer,
expected_checksums,
+ trailer_algorithm,
}
}))
}
@@ -132,6 +139,7 @@ pub fn parse_streaming_body(
stream: Mutex::new(stream.boxed()),
checksummer,
expected_checksums,
+ trailer_algorithm: None,
}
})),
}
@@ -185,6 +193,8 @@ fn compute_streaming_trailer_signature(
}
mod payload {
+ use http::{HeaderName, HeaderValue};
+
use garage_util::data::Hash;
use nom::bytes::streaming::{tag, take_while};
@@ -252,19 +262,21 @@ mod payload {
#[derive(Debug, Clone)]
pub struct TrailerChunk {
- pub header_name: Vec<u8>,
- pub header_value: Vec<u8>,
+ pub header_name: HeaderName,
+ pub header_value: HeaderValue,
pub signature: Option<Hash>,
}
impl TrailerChunk {
fn parse_content(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
- let (input, header_name) = try_parse!(take_while(
- |c: u8| c.is_ascii_alphanumeric() || c == b'-'
+ let (input, header_name) = try_parse!(map_res(
+ take_while(|c: u8| c.is_ascii_alphanumeric() || c == b'-'),
+ HeaderName::from_bytes
)(input));
let (input, _) = try_parse!(tag(b":")(input));
- let (input, header_value) = try_parse!(take_while(
- |c: u8| c.is_ascii_alphanumeric() || b"+/=".contains(&c)
+ let (input, header_value) = try_parse!(map_res(
+ take_while(|c: u8| c.is_ascii_alphanumeric() || b"+/=".contains(&c)),
+ HeaderValue::from_bytes
)(input));
// Possible '\n' after the header value, depends on clients
@@ -276,8 +288,8 @@ mod payload {
Ok((
input,
TrailerChunk {
- header_name: header_name.to_vec(),
- header_value: header_value.to_vec(),
+ header_name,
+ header_value,
signature: None,
},
))
@@ -371,6 +383,7 @@ where
buf: bytes::BytesMut,
signing: Option<SignParams>,
has_trailer: bool,
+ done: bool,
}
impl<S> StreamingPayloadStream<S>
@@ -383,6 +396,7 @@ where
buf: bytes::BytesMut::new(),
signing,
has_trailer,
+ done: false,
}
}
@@ -448,6 +462,10 @@ where
let mut this = self.project();
+ if *this.done {
+ return Poll::Ready(None);
+ }
+
loop {
let (input, payload) =
match Self::parse_next(this.buf, this.signing.is_some(), *this.has_trailer) {
@@ -499,17 +517,23 @@ where
if data.is_empty() {
// if there was a trailer, it would have been returned by the parser
assert!(!*this.has_trailer);
+ *this.done = true;
return Poll::Ready(None);
}
return Poll::Ready(Some(Ok(Frame::data(data))));
}
StreamingPayloadChunk::Trailer(trailer) => {
+ trace!(
+ "In StreamingPayloadStream::poll_next: got trailer {:?}",
+ trailer
+ );
+
if let Some(signing) = this.signing.as_mut() {
let data = [
- &trailer.header_name[..],
+ trailer.header_name.as_ref(),
&b":"[..],
- &trailer.header_value[..],
+ trailer.header_value.as_ref(),
&b"\n"[..],
]
.concat();
@@ -529,10 +553,12 @@ where
}
*this.buf = input.into();
+ *this.done = true;
- // TODO: handle trailer
+ let mut trailers_map = HeaderMap::new();
+ trailers_map.insert(trailer.header_name, trailer.header_value);
- return Poll::Ready(None);
+ return Poll::Ready(Some(Ok(Frame::trailers(trailers_map))));
}
}
}