1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
use std::sync::Mutex;
use futures::prelude::*;
use futures::stream::BoxStream;
use http_body_util::{BodyExt, StreamBody};
use hyper::body::{Bytes, Frame};
use serde::Deserialize;
use tokio::sync::{mpsc, oneshot};
use super::*;
use crate::signature::checksum::*;
pub struct ReqBody {
// why need mutex to be sync??
pub stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>,
pub checksummer: Checksummer,
pub expected_checksums: ExpectedChecksums,
}
pub type StreamingChecksumReceiver = oneshot::Receiver<Result<Checksums, Error>>;
impl ReqBody {
pub async fn json<T: for<'a> Deserialize<'a>>(self) -> Result<T, Error> {
let body = self.collect().await?;
let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
Ok(resp)
}
pub async fn collect(self) -> Result<Bytes, Error> {
self.collect_with_checksums().await.map(|(b, _)| b)
}
pub async fn collect_with_checksums(mut self) -> Result<(Bytes, Checksums), Error> {
let stream: BoxStream<_> = self.stream.into_inner().unwrap();
let bytes = BodyExt::collect(StreamBody::new(stream)).await?.to_bytes();
self.checksummer.update(&bytes);
let checksums = self.checksummer.finalize();
checksums.verify(&self.expected_checksums)?;
Ok((bytes, checksums))
}
pub fn streaming(self) -> impl Stream<Item = Result<Bytes, Error>> {
self.streaming_with_checksums(false).0
}
pub fn streaming_with_checksums(
self,
add_md5: bool,
) -> (
impl Stream<Item = Result<Bytes, Error>>,
StreamingChecksumReceiver,
) {
let (tx, rx) = oneshot::channel();
// TODO: actually calculate checksums!!
let stream: BoxStream<_> = self.stream.into_inner().unwrap();
(
stream.map(|x| {
x.and_then(|f| {
f.into_data()
.map_err(|_| Error::bad_request("non-data frame"))
})
}),
rx,
)
}
}
|