aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/Cargo.toml2
-rw-r--r--src/api/api_server.rs6
-rw-r--r--src/api/s3_bucket.rs13
-rw-r--r--src/api/s3_delete.rs5
-rw-r--r--src/api/s3_put.rs110
-rw-r--r--src/api/s3_website.rs5
-rw-r--r--src/api/signature/mod.rs47
-rw-r--r--src/api/signature/payload.rs (renamed from src/api/signature.rs)52
-rw-r--r--src/api/signature/streaming.rs319
9 files changed, 484 insertions, 75 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index ca4950a1..e93e5ec5 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -28,10 +28,12 @@ hmac = "0.10"
idna = "0.2"
log = "0.4"
md-5 = "0.9"
+nom = "7.1"
sha2 = "0.9"
futures = "0.3"
futures-util = "0.3"
+pin-project = "1.0"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
http = "0.2"
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index ea1990d9..c4606226 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -15,7 +15,7 @@ use garage_model::garage::Garage;
use garage_model::key_table::Key;
use crate::error::*;
-use crate::signature::check_signature;
+use crate::signature::payload::check_payload_signature;
use crate::helpers::*;
use crate::s3_bucket::*;
@@ -90,7 +90,7 @@ async fn handler(
}
async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> {
- let (api_key, content_sha256) = check_signature(&garage, &req).await?;
+ let (api_key, content_sha256) = check_payload_signature(&garage, &req).await?;
let authority = req
.headers()
@@ -176,7 +176,7 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
.await
}
Endpoint::PutObject { key, .. } => {
- handle_put(garage, req, bucket_id, &key, content_sha256).await
+ handle_put(garage, req, bucket_id, &key, &api_key, content_sha256).await
}
Endpoint::AbortMultipartUpload { key, upload_id, .. } => {
handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
diff --git a/src/api/s3_bucket.rs b/src/api/s3_bucket.rs
index 494224c8..8a5407d3 100644
--- a/src/api/s3_bucket.rs
+++ b/src/api/s3_bucket.rs
@@ -120,7 +120,10 @@ pub async fn handle_create_bucket(
bucket_name: String,
) -> Result<Response<Body>, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?;
- verify_signed_content(content_sha256, &body[..])?;
+
+ if let Some(content_sha256) = content_sha256 {
+ verify_signed_content(content_sha256, &body[..])?;
+ }
let cmd =
parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?;
@@ -320,7 +323,7 @@ mod tests {
assert_eq!(
parse_create_bucket_xml(
br#"
- <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
</CreateBucketConfiguration >
"#
),
@@ -329,8 +332,8 @@ mod tests {
assert_eq!(
parse_create_bucket_xml(
br#"
- <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
- <LocationConstraint>Europe</LocationConstraint>
+ <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <LocationConstraint>Europe</LocationConstraint>
</CreateBucketConfiguration >
"#
),
@@ -339,7 +342,7 @@ mod tests {
assert_eq!(
parse_create_bucket_xml(
br#"
- <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
</Crea >
"#
),
diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs
index 9e267490..b243d982 100644
--- a/src/api/s3_delete.rs
+++ b/src/api/s3_delete.rs
@@ -81,7 +81,10 @@ pub async fn handle_delete_objects(
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?;
- verify_signed_content(content_sha256, &body[..])?;
+
+ if let Some(content_sha256) = content_sha256 {
+ verify_signed_content(content_sha256, &body[..])?;
+ }
let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?;
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index 37658172..4e85664b 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -1,8 +1,10 @@
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
-use futures::stream::*;
-use hyper::{Body, Request, Response};
+use chrono::{DateTime, NaiveDateTime, Utc};
+use futures::{prelude::*, TryFutureExt};
+use hyper::body::{Body, Bytes};
+use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
@@ -14,19 +16,23 @@ use garage_util::time::*;
use garage_model::block::INLINE_THRESHOLD;
use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
+use garage_model::key_table::Key;
use garage_model::object_table::*;
use garage_model::version_table::*;
use crate::error::*;
use crate::s3_xml;
-use crate::signature::verify_signed_content;
+use crate::signature::streaming::SignedPayloadStream;
+use crate::signature::LONG_DATETIME;
+use crate::signature::{compute_scope, verify_signed_content};
pub async fn handle_put(
garage: Arc<Garage>,
req: Request<Body>,
bucket_id: Uuid,
key: &str,
- content_sha256: Option<Hash>,
+ api_key: &Key,
+ mut content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
// Generate identity of new version
let version_uuid = gen_uuid();
@@ -40,11 +46,53 @@ pub async fn handle_put(
Some(x) => Some(x.to_str()?.to_string()),
None => None,
};
+ let payload_seed_signature = match req.headers().get("x-amz-content-sha256") {
+ Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
+ let content_sha256 = content_sha256
+ .take()
+ .ok_or_bad_request("No signature provided")?;
+ Some(content_sha256)
+ }
+ _ => None,
+ };
// Parse body of uploaded file
- let body = req.into_body();
+ let (head, body) = req.into_parts();
+ let body = body.map_err(Error::from);
+
+ let body = if let Some(signature) = payload_seed_signature {
+ let secret_key = &api_key
+ .state
+ .as_option()
+ .ok_or_internal_error("Deleted key state")?
+ .secret_key;
+
+ let date = head
+ .headers
+ .get("x-amz-date")
+ .ok_or_bad_request("Missing X-Amz-Date field")?
+ .to_str()?;
+ let date: NaiveDateTime =
+ NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
+ let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
+
+ let scope = compute_scope(&date, &garage.config.s3_api.s3_region);
+ let signing_hmac = crate::signature::signing_hmac(
+ &date,
+ secret_key,
+ &garage.config.s3_api.s3_region,
+ "s3",
+ )
+ .ok_or_internal_error("Unable to build signing HMAC")?;
- let mut chunker = BodyChunker::new(body, garage.config.block_size);
+ SignedPayloadStream::new(body, signing_hmac, date, &scope, signature)?
+ .map_err(Error::from)
+ .boxed()
+ } else {
+ body.boxed()
+ };
+
+ let mut chunker = StreamChunker::new(body, garage.config.block_size);
let first_block = chunker.next().await?.unwrap_or_default();
// If body is small enough, store it directly in the object table
@@ -178,13 +226,13 @@ fn ensure_checksum_matches(
Ok(())
}
-async fn read_and_put_blocks(
+async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: &Garage,
version: &Version,
part_number: u64,
first_block: Vec<u8>,
first_block_hash: Hash,
- chunker: &mut BodyChunker,
+ chunker: &mut StreamChunker<S>,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
let mut md5hasher = Md5::new();
let mut sha256hasher = Sha256::new();
@@ -205,8 +253,11 @@ async fn read_and_put_blocks(
.rpc_put_block(first_block_hash, first_block);
loop {
- let (_, _, next_block) =
- futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
+ let (_, _, next_block) = futures::try_join!(
+ put_curr_block.map_err(Error::from),
+ put_curr_version_block.map_err(Error::from),
+ chunker.next(),
+ )?;
if let Some(block) = next_block {
md5hasher.update(&block[..]);
sha256hasher.update(&block[..]);
@@ -266,32 +317,34 @@ async fn put_block_meta(
Ok(())
}
-struct BodyChunker {
- body: Body,
+struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
+ stream: S,
read_all: bool,
block_size: usize,
buf: VecDeque<u8>,
}
-impl BodyChunker {
- fn new(body: Body, block_size: usize) -> Self {
+impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
+ fn new(stream: S, block_size: usize) -> Self {
Self {
- body,
+ stream,
read_all: false,
block_size,
buf: VecDeque::with_capacity(2 * block_size),
}
}
- async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
+
+ async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
while !self.read_all && self.buf.len() < self.block_size {
- if let Some(block) = self.body.next().await {
+ if let Some(block) = self.stream.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
- self.buf.extend(&bytes[..]);
+ self.buf.extend(bytes);
} else {
self.read_all = true;
}
}
+
if self.buf.is_empty() {
Ok(None)
} else if self.buf.len() <= self.block_size {
@@ -368,12 +421,20 @@ pub async fn handle_put_part(
// Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string();
- let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size);
+
+ let body = req.into_body().map_err(Error::from);
+ let mut chunker = StreamChunker::new(body, garage.config.block_size);
let (object, version, first_block) = futures::try_join!(
- garage.object_table.get(&bucket_id, &key),
- garage.version_table.get(&version_uuid, &EmptyKey),
- chunker.next()
+ garage
+ .object_table
+ .get(&bucket_id, &key)
+ .map_err(Error::from),
+ garage
+ .version_table
+ .get(&version_uuid, &EmptyKey)
+ .map_err(Error::from),
+ chunker.next(),
)?;
// Check object is valid and multipart block can be accepted
@@ -444,7 +505,10 @@ pub async fn handle_complete_multipart_upload(
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?;
- verify_signed_content(content_sha256, &body[..])?;
+
+ if let Some(content_sha256) = content_sha256 {
+ verify_signed_content(content_sha256, &body[..])?;
+ }
let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
let body_list_of_parts = parse_complete_multpart_upload_body(&body_xml)
diff --git a/src/api/s3_website.rs b/src/api/s3_website.rs
index fcf8cba3..c4a43e2c 100644
--- a/src/api/s3_website.rs
+++ b/src/api/s3_website.rs
@@ -80,7 +80,10 @@ pub async fn handle_put_website(
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?;
- verify_signed_content(content_sha256, &body[..])?;
+
+ if let Some(content_sha256) = content_sha256 {
+ verify_signed_content(content_sha256, &body[..])?;
+ }
let mut bucket = garage
.bucket_table
diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs
new file mode 100644
index 00000000..ebdee6da
--- /dev/null
+++ b/src/api/signature/mod.rs
@@ -0,0 +1,47 @@
+use chrono::{DateTime, Utc};
+use hmac::{Hmac, Mac, NewMac};
+use sha2::Sha256;
+
+use garage_util::data::{sha256sum, Hash};
+
+use crate::error::*;
+
+pub mod payload;
+pub mod streaming;
+
+pub const SHORT_DATE: &str = "%Y%m%d";
+pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ";
+
+type HmacSha256 = Hmac<Sha256>;
+
+pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> {
+ if expected_sha256 != sha256sum(body) {
+ return Err(Error::BadRequest(
+ "Request content hash does not match signed hash".to_string(),
+ ));
+ }
+ Ok(())
+}
+
+pub fn signing_hmac(
+ datetime: &DateTime<Utc>,
+ secret_key: &str,
+ region: &str,
+ service: &str,
+) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> {
+ let secret = String::from("AWS4") + secret_key;
+ let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?;
+ date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes());
+ let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?;
+ region_hmac.update(region.as_bytes());
+ let mut service_hmac = HmacSha256::new_varkey(&region_hmac.finalize().into_bytes())?;
+ service_hmac.update(service.as_bytes());
+ let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?;
+ signing_hmac.update(b"aws4_request");
+ let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?;
+ Ok(hmac)
+}
+
+pub fn compute_scope(datetime: &DateTime<Utc>, region: &str) -> String {
+ format!("{}/{}/s3/aws4_request", datetime.format(SHORT_DATE), region,)
+}
diff --git a/src/api/signature.rs b/src/api/signature/payload.rs
index 311e6a9a..b13819a8 100644
--- a/src/api/signature.rs
+++ b/src/api/signature/payload.rs
@@ -1,25 +1,23 @@
use std::collections::HashMap;
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
-use hmac::{Hmac, Mac, NewMac};
+use hmac::Mac;
use hyper::{Body, Method, Request};
use sha2::{Digest, Sha256};
use garage_table::*;
-use garage_util::data::{sha256sum, Hash};
+use garage_util::data::Hash;
use garage_model::garage::Garage;
use garage_model::key_table::*;
+use super::signing_hmac;
+use super::{LONG_DATETIME, SHORT_DATE};
+
use crate::encoding::uri_encode;
use crate::error::*;
-const SHORT_DATE: &str = "%Y%m%d";
-const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ";
-
-type HmacSha256 = Hmac<Sha256>;
-
-pub async fn check_signature(
+pub async fn check_payload_signature(
garage: &Garage,
request: &Request<Body>,
) -> Result<(Key, Option<Hash>), Error> {
@@ -97,13 +95,13 @@ pub async fn check_signature(
let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" {
None
+ } else if authorization.content_sha256 == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" {
+ let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?;
+ Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?)
} else {
let bytes = hex::decode(authorization.content_sha256)
.ok_or_bad_request("Invalid content sha256 hash")?;
- Some(
- Hash::try_from(&bytes[..])
- .ok_or_else(|| Error::BadRequest("Invalid content sha256 hash".to_string()))?,
- )
+ Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid content sha256 hash")?)
};
Ok((key, content_sha256))
@@ -222,25 +220,6 @@ fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_req: &
.join("\n")
}
-fn signing_hmac(
- datetime: &DateTime<Utc>,
- secret_key: &str,
- region: &str,
- service: &str,
-) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> {
- let secret = String::from("AWS4") + secret_key;
- let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?;
- date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes());
- let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?;
- region_hmac.update(region.as_bytes());
- let mut service_hmac = HmacSha256::new_varkey(&region_hmac.finalize().into_bytes())?;
- service_hmac.update(service.as_bytes());
- let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?;
- signing_hmac.update(b"aws4_request");
- let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?;
- Ok(hmac)
-}
-
fn canonical_request(
method: &Method,
url_path: &str,
@@ -288,14 +267,3 @@ fn canonical_query_string(uri: &hyper::Uri) -> String {
"".to_string()
}
}
-
-pub fn verify_signed_content(content_sha256: Option<Hash>, body: &[u8]) -> Result<(), Error> {
- let expected_sha256 =
- content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?;
- if expected_sha256 != sha256sum(body) {
- return Err(Error::BadRequest(
- "Request content hash does not match signed hash".to_string(),
- ));
- }
- Ok(())
-}
diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs
new file mode 100644
index 00000000..b2dc1591
--- /dev/null
+++ b/src/api/signature/streaming.rs
@@ -0,0 +1,319 @@
+use std::pin::Pin;
+
+use chrono::{DateTime, Utc};
+use futures::prelude::*;
+use futures::task;
+use hyper::body::Bytes;
+
+use garage_util::data::Hash;
+use hmac::Mac;
+
+use super::sha256sum;
+use super::HmacSha256;
+use super::LONG_DATETIME;
+
+use crate::error::*;
+
+/// Result of `sha256("")`
+const EMPTY_STRING_HEX_DIGEST: &str =
+ "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
+
+fn compute_streaming_payload_signature(
+ signing_hmac: &HmacSha256,
+ date: DateTime<Utc>,
+ scope: &str,
+ previous_signature: Hash,
+ content_sha256: Hash,
+) -> Result<Hash, Error> {
+ let string_to_sign = [
+ "AWS4-HMAC-SHA256-PAYLOAD",
+ &date.format(LONG_DATETIME).to_string(),
+ scope,
+ &hex::encode(previous_signature),
+ EMPTY_STRING_HEX_DIGEST,
+ &hex::encode(content_sha256),
+ ]
+ .join("\n");
+
+ let mut hmac = signing_hmac.clone();
+ hmac.update(string_to_sign.as_bytes());
+
+ Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")
+}
+
+mod payload {
+ use garage_util::data::Hash;
+
+ pub enum Error<I> {
+ Parser(nom::error::Error<I>),
+ BadSignature,
+ }
+
+ impl<I> Error<I> {
+ pub fn description(&self) -> &str {
+ match *self {
+ Error::Parser(ref e) => e.code.description(),
+ Error::BadSignature => "Bad signature",
+ }
+ }
+ }
+
+ #[derive(Debug, Clone)]
+ pub struct Header {
+ pub size: usize,
+ pub signature: Hash,
+ }
+
+ impl Header {
+ pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
+ use nom::bytes::streaming::tag;
+ use nom::character::streaming::hex_digit1;
+ use nom::combinator::map_res;
+ use nom::number::streaming::hex_u32;
+
+ macro_rules! try_parse {
+ ($expr:expr) => {
+ $expr.map_err(|e| e.map(Error::Parser))?
+ };
+ }
+
+ let (input, size) = try_parse!(hex_u32(input));
+ let (input, _) = try_parse!(tag(";")(input));
+
+ let (input, _) = try_parse!(tag("chunk-signature=")(input));
+ let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input));
+ let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?;
+
+ let (input, _) = try_parse!(tag("\r\n")(input));
+
+ let header = Header {
+ size: size as usize,
+ signature,
+ };
+
+ Ok((input, header))
+ }
+ }
+}
+
+#[derive(Debug)]
+pub enum SignedPayloadStreamError {
+ Stream(Error),
+ InvalidSignature,
+ Message(String),
+}
+
+impl SignedPayloadStreamError {
+ fn message(msg: &str) -> Self {
+ SignedPayloadStreamError::Message(msg.into())
+ }
+}
+
+impl From<SignedPayloadStreamError> for Error {
+ fn from(err: SignedPayloadStreamError) -> Self {
+ match err {
+ SignedPayloadStreamError::Stream(e) => e,
+ SignedPayloadStreamError::InvalidSignature => {
+ Error::BadRequest("Invalid payload signature".into())
+ }
+ SignedPayloadStreamError::Message(e) => {
+ Error::BadRequest(format!("Chunk format error: {}", e))
+ }
+ }
+ }
+}
+
+impl<I> From<payload::Error<I>> for SignedPayloadStreamError {
+ fn from(err: payload::Error<I>) -> Self {
+ Self::message(err.description())
+ }
+}
+
+impl<I> From<nom::error::Error<I>> for SignedPayloadStreamError {
+ fn from(err: nom::error::Error<I>) -> Self {
+ Self::message(err.code.description())
+ }
+}
+
+struct SignedPayload {
+ header: payload::Header,
+ data: Bytes,
+}
+
+#[pin_project::pin_project]
+pub struct SignedPayloadStream<S>
+where
+ S: Stream<Item = Result<Bytes, Error>>,
+{
+ #[pin]
+ stream: S,
+ buf: bytes::BytesMut,
+ datetime: DateTime<Utc>,
+ scope: String,
+ signing_hmac: HmacSha256,
+ previous_signature: Hash,
+}
+
+impl<S> SignedPayloadStream<S>
+where
+ S: Stream<Item = Result<Bytes, Error>>,
+{
+ pub fn new(
+ stream: S,
+ signing_hmac: HmacSha256,
+ datetime: DateTime<Utc>,
+ scope: &str,
+ seed_signature: Hash,
+ ) -> Result<Self, Error> {
+ Ok(Self {
+ stream,
+ buf: bytes::BytesMut::new(),
+ datetime,
+ scope: scope.into(),
+ signing_hmac,
+ previous_signature: seed_signature,
+ })
+ }
+
+ fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> {
+ use nom::bytes::streaming::{tag, take};
+
+ macro_rules! try_parse {
+ ($expr:expr) => {
+ $expr.map_err(nom::Err::convert)?
+ };
+ }
+
+ let (input, header) = try_parse!(payload::Header::parse(input));
+
+ // 0-sized chunk is the last
+ if header.size == 0 {
+ return Ok((
+ input,
+ SignedPayload {
+ header,
+ data: Bytes::new(),
+ },
+ ));
+ }
+
+ let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input));
+ let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input));
+
+ let data = Bytes::from(data.to_vec());
+
+ Ok((input, SignedPayload { header, data }))
+ }
+}
+
+impl<S> Stream for SignedPayloadStream<S>
+where
+ S: Stream<Item = Result<Bytes, Error>> + Unpin,
+{
+ type Item = Result<Bytes, SignedPayloadStreamError>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> task::Poll<Option<Self::Item>> {
+ use std::task::Poll;
+
+ let mut this = self.project();
+
+ loop {
+ let (input, payload) = match Self::parse_next(this.buf) {
+ Ok(res) => res,
+ Err(nom::Err::Incomplete(_)) => {
+ match futures::ready!(this.stream.as_mut().poll_next(cx)) {
+ Some(Ok(bytes)) => {
+ this.buf.extend(bytes);
+ continue;
+ }
+ Some(Err(e)) => {
+ return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e))))
+ }
+ None => {
+ return Poll::Ready(Some(Err(SignedPayloadStreamError::message(
+ "Unexpected EOF",
+ ))));
+ }
+ }
+ }
+ Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => {
+ return Poll::Ready(Some(Err(e)))
+ }
+ };
+
+ // 0-sized chunk is the last
+ if payload.data.is_empty() {
+ return Poll::Ready(None);
+ }
+
+ let data_sha256sum = sha256sum(&payload.data);
+
+ let expected_signature = compute_streaming_payload_signature(
+ this.signing_hmac,
+ *this.datetime,
+ this.scope,
+ *this.previous_signature,
+ data_sha256sum,
+ )
+ .map_err(|e| {
+ SignedPayloadStreamError::Message(format!("Could not build signature: {}", e))
+ })?;
+
+ if payload.header.signature != expected_signature {
+ return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature)));
+ }
+
+ *this.buf = input.into();
+ *this.previous_signature = payload.header.signature;
+
+ return Poll::Ready(Some(Ok(payload.data)));
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.stream.size_hint()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use futures::prelude::*;
+
+ use super::{SignedPayloadStream, SignedPayloadStreamError};
+
+ #[tokio::test]
+ async fn test_interrupted_signed_payload_stream() {
+ use chrono::{DateTime, Utc};
+
+ use garage_util::data::Hash;
+
+ let datetime = DateTime::parse_from_rfc3339("2021-12-13T13:12:42+01:00") // TODO UNIX 0
+ .unwrap()
+ .with_timezone(&Utc);
+ let secret_key = "test";
+ let region = "test";
+ let scope = crate::signature::compute_scope(&datetime, region);
+ let signing_hmac =
+ crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap();
+
+ let data: &[&[u8]] = &[b"1"];
+ let body = futures::stream::iter(data.iter().map(|block| Ok(block.as_ref().into())));
+
+ let seed_signature = Hash::default();
+
+ let mut stream =
+ SignedPayloadStream::new(body, signing_hmac, datetime, &scope, seed_signature).unwrap();
+
+ assert!(stream.try_next().await.is_err());
+ match stream.try_next().await {
+ Err(SignedPayloadStreamError::Message(msg)) if msg == "Unexpected EOF" => {}
+ item => panic!(
+ "Unexpected result, expected early EOF error, got {:?}",
+ item
+ ),
+ }
+ }
+}