aboutsummaryrefslogtreecommitdiff
path: root/src/api/common/signature/body.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/common/signature/body.rs')
-rw-r--r--src/api/common/signature/body.rs69
1 files changed, 69 insertions, 0 deletions
diff --git a/src/api/common/signature/body.rs b/src/api/common/signature/body.rs
new file mode 100644
index 00000000..877d8d85
--- /dev/null
+++ b/src/api/common/signature/body.rs
@@ -0,0 +1,69 @@
+use std::sync::Mutex;
+
+use futures::prelude::*;
+use futures::stream::BoxStream;
+use http_body_util::{BodyExt, StreamBody};
+use hyper::body::{Bytes, Frame};
+use serde::Deserialize;
+use tokio::sync::{mpsc, oneshot};
+
+use super::*;
+
+use crate::signature::checksum::*;
+
+pub struct ReqBody {
+ // why need mutex to be sync??
+ pub stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>,
+ pub checksummer: Checksummer,
+ pub expected_checksums: ExpectedChecksums,
+}
+
+pub type StreamingChecksumReceiver = oneshot::Receiver<Result<Checksums, Error>>;
+
+impl ReqBody {
+ pub async fn json<T: for<'a> Deserialize<'a>>(self) -> Result<T, Error> {
+ let body = self.collect().await?;
+ let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ Ok(resp)
+ }
+
+ pub async fn collect(self) -> Result<Bytes, Error> {
+ self.collect_with_checksums().await.map(|(b, _)| b)
+ }
+
+ pub async fn collect_with_checksums(mut self) -> Result<(Bytes, Checksums), Error> {
+ let stream: BoxStream<_> = self.stream.into_inner().unwrap();
+ let bytes = BodyExt::collect(StreamBody::new(stream)).await?.to_bytes();
+
+ self.checksummer.update(&bytes);
+ let checksums = self.checksummer.finalize();
+ checksums.verify(&self.expected_checksums)?;
+
+ Ok((bytes, checksums))
+ }
+
+ pub fn streaming(self) -> impl Stream<Item = Result<Bytes, Error>> {
+ self.streaming_with_checksums(false).0
+ }
+
+ pub fn streaming_with_checksums(
+ self,
+ add_md5: bool,
+ ) -> (
+ impl Stream<Item = Result<Bytes, Error>>,
+ StreamingChecksumReceiver,
+ ) {
+ let (tx, rx) = oneshot::channel();
+ // TODO: actually calculate checksums!!
+ let stream: BoxStream<_> = self.stream.into_inner().unwrap();
+ (
+ stream.map(|x| {
+ x.and_then(|f| {
+ f.into_data()
+ .map_err(|_| Error::bad_request("non-data frame"))
+ })
+ }),
+ rx,
+ )
+ }
+}