diff options
Diffstat (limited to 'src/api/common')
-rw-r--r-- | src/api/common/signature/body.rs | 112 | ||||
-rw-r--r-- | src/api/common/signature/checksum.rs | 28 | ||||
-rw-r--r-- | src/api/common/signature/streaming.rs | 6 |
3 files changed, 110 insertions, 36 deletions
diff --git a/src/api/common/signature/body.rs b/src/api/common/signature/body.rs index 877d8d85..d8c15ee5 100644 --- a/src/api/common/signature/body.rs +++ b/src/api/common/signature/body.rs @@ -5,7 +5,13 @@ use futures::stream::BoxStream; use http_body_util::{BodyExt, StreamBody}; use hyper::body::{Bytes, Frame}; use serde::Deserialize; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; +use tokio::task; + +use opentelemetry::{ + trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, + Context, +}; use super::*; @@ -13,14 +19,33 @@ use crate::signature::checksum::*; pub struct ReqBody { // why need mutex to be sync?? - pub stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>, - pub checksummer: Checksummer, - pub expected_checksums: ExpectedChecksums, + pub(crate) stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>, + pub(crate) checksummer: Checksummer, + pub(crate) expected_checksums: ExpectedChecksums, } -pub type StreamingChecksumReceiver = oneshot::Receiver<Result<Checksums, Error>>; +pub type StreamingChecksumReceiver = task::JoinHandle<Result<Checksums, Error>>; impl ReqBody { + pub fn add_expected_checksums(&mut self, more: ExpectedChecksums) { + if more.md5.is_some() { + self.expected_checksums.md5 = more.md5; + } + if more.sha256.is_some() { + self.expected_checksums.sha256 = more.sha256; + } + if more.extra.is_some() { + self.expected_checksums.extra = more.extra; + } + self.checksummer.add_expected(&self.expected_checksums); + } + + pub fn add_md5(&mut self) { + self.checksummer.add_md5(); + } + + // ============ non-streaming ============= + pub async fn json<T: for<'a> Deserialize<'a>>(self) -> Result<T, Error> { let body = self.collect().await?; let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; @@ -42,28 +67,71 @@ impl ReqBody { Ok((bytes, checksums)) } - pub fn streaming(self) -> impl Stream<Item = Result<Bytes, Error>> { - self.streaming_with_checksums(false).0 - } + // ============ streaming ============= pub fn streaming_with_checksums( self, - add_md5: bool, ) -> ( - impl Stream<Item = Result<Bytes, Error>>, + BoxStream<'static, Result<Bytes, Error>>, StreamingChecksumReceiver, ) { - let (tx, rx) = oneshot::channel(); - // TODO: actually calculate checksums!! - let stream: BoxStream<_> = self.stream.into_inner().unwrap(); - ( - stream.map(|x| { - x.and_then(|f| { - f.into_data() - .map_err(|_| Error::bad_request("non-data frame")) - }) - }), - rx, - ) + let Self { + stream, + mut checksummer, + mut expected_checksums, + } = self; + + let (frame_tx, mut frame_rx) = mpsc::channel::<Frame<Bytes>>(1); + + let join_checksums = tokio::spawn(async move { + let tracer = opentelemetry::global::tracer("garage"); + + while let Some(frame) = frame_rx.recv().await { + match frame.into_data() { + Ok(data) => { + checksummer = tokio::task::spawn_blocking(move || { + checksummer.update(&data); + checksummer + }) + .await + .unwrap() + } + Err(frame) => { + let trailers = frame.into_trailers().unwrap(); + if let Some(cv) = request_checksum_value(&trailers)? { + expected_checksums.extra = Some(cv); + } + break; + } + } + } + + let checksums = checksummer.finalize(); + checksums.verify(&expected_checksums)?; + + return Ok(checksums); + }); + + let stream: BoxStream<_> = stream.into_inner().unwrap(); + let stream = stream.filter_map(move |x| { + let frame_tx = frame_tx.clone(); + async move { + match x { + Err(e) => Some(Err(e)), + Ok(frame) => { + if frame.is_data() { + let data = frame.data_ref().unwrap().clone(); + let _ = frame_tx.send(frame).await; + Some(Ok(data)) + } else { + let _ = frame_tx.send(frame).await; + None + } + } + } + } + }); + + (stream.boxed(), join_checksums) } } diff --git a/src/api/common/signature/checksum.rs b/src/api/common/signature/checksum.rs index b184fc65..a9f00423 100644 --- a/src/api/common/signature/checksum.rs +++ b/src/api/common/signature/checksum.rs @@ -32,7 +32,7 @@ pub type Md5Checksum = [u8; 16]; pub type Sha1Checksum = [u8; 20]; pub type Sha256Checksum = [u8; 32]; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct ExpectedChecksums { // base64-encoded md5 (content-md5 header) pub md5: Option<String>, @@ -70,25 +70,35 @@ impl Checksummer { } } - pub fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self { + pub fn init(expected: &ExpectedChecksums, add_md5: bool) -> Self { let mut ret = Self::new(); + ret.add_expected(expected); + if add_md5 { + ret.add_md5(); + } + ret + } - if expected.md5.is_some() || require_md5 { - ret.md5 = Some(Md5::new()); + pub fn add_md5(&mut self) { + self.md5 = Some(Md5::new()); + } + + pub fn add_expected(&mut self, expected: &ExpectedChecksums) { + if expected.md5.is_some() { + self.md5 = Some(Md5::new()); } if expected.sha256.is_some() || matches!(&expected.extra, Some(ChecksumValue::Sha256(_))) { - ret.sha256 = Some(Sha256::new()); + self.sha256 = Some(Sha256::new()); } if matches!(&expected.extra, Some(ChecksumValue::Crc32(_))) { - ret.crc32 = Some(Crc32::new()); + self.crc32 = Some(Crc32::new()); } if matches!(&expected.extra, Some(ChecksumValue::Crc32c(_))) { - ret.crc32c = Some(Crc32c::default()); + self.crc32c = Some(Crc32c::default()); } if matches!(&expected.extra, Some(ChecksumValue::Sha1(_))) { - ret.sha1 = Some(Sha1::new()); + self.sha1 = Some(Sha1::new()); } - ret } pub fn add(mut self, algo: Option<ChecksumAlgorithm>) -> Self { diff --git a/src/api/common/signature/streaming.rs b/src/api/common/signature/streaming.rs index e8f9b3d7..3ffc5b2f 100644 --- a/src/api/common/signature/streaming.rs +++ b/src/api/common/signature/streaming.rs @@ -25,15 +25,11 @@ pub fn parse_streaming_body( service: &str, ) -> Result<Request<ReqBody>, Error> { let expected_checksums = ExpectedChecksums { - md5: match req.headers().get("content-md5") { - Some(x) => Some(x.to_str()?.to_string()), - None => None, - }, sha256: match &checked_signature.content_sha256_header { ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256), _ => None, }, - extra: None, + ..Default::default() }; let mut checksummer = Checksummer::init(&expected_checksums, false); |