aboutsummaryrefslogtreecommitdiff
path: root/src/api/common
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/common')
-rw-r--r--src/api/common/signature/body.rs112
-rw-r--r--src/api/common/signature/checksum.rs28
-rw-r--r--src/api/common/signature/streaming.rs6
3 files changed, 110 insertions, 36 deletions
diff --git a/src/api/common/signature/body.rs b/src/api/common/signature/body.rs
index 877d8d85..d8c15ee5 100644
--- a/src/api/common/signature/body.rs
+++ b/src/api/common/signature/body.rs
@@ -5,7 +5,13 @@ use futures::stream::BoxStream;
use http_body_util::{BodyExt, StreamBody};
use hyper::body::{Bytes, Frame};
use serde::Deserialize;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::mpsc;
+use tokio::task;
+
+use opentelemetry::{
+ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
+ Context,
+};
use super::*;
@@ -13,14 +19,33 @@ use crate::signature::checksum::*;
pub struct ReqBody {
// why need mutex to be sync??
- pub stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>,
- pub checksummer: Checksummer,
- pub expected_checksums: ExpectedChecksums,
+ pub(crate) stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>,
+ pub(crate) checksummer: Checksummer,
+ pub(crate) expected_checksums: ExpectedChecksums,
}
-pub type StreamingChecksumReceiver = oneshot::Receiver<Result<Checksums, Error>>;
+pub type StreamingChecksumReceiver = task::JoinHandle<Result<Checksums, Error>>;
impl ReqBody {
+ pub fn add_expected_checksums(&mut self, more: ExpectedChecksums) {
+ if more.md5.is_some() {
+ self.expected_checksums.md5 = more.md5;
+ }
+ if more.sha256.is_some() {
+ self.expected_checksums.sha256 = more.sha256;
+ }
+ if more.extra.is_some() {
+ self.expected_checksums.extra = more.extra;
+ }
+ self.checksummer.add_expected(&self.expected_checksums);
+ }
+
+ pub fn add_md5(&mut self) {
+ self.checksummer.add_md5();
+ }
+
+ // ============ non-streaming =============
+
pub async fn json<T: for<'a> Deserialize<'a>>(self) -> Result<T, Error> {
let body = self.collect().await?;
let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
@@ -42,28 +67,71 @@ impl ReqBody {
Ok((bytes, checksums))
}
- pub fn streaming(self) -> impl Stream<Item = Result<Bytes, Error>> {
- self.streaming_with_checksums(false).0
- }
+ // ============ streaming =============
pub fn streaming_with_checksums(
self,
- add_md5: bool,
) -> (
- impl Stream<Item = Result<Bytes, Error>>,
+ BoxStream<'static, Result<Bytes, Error>>,
StreamingChecksumReceiver,
) {
- let (tx, rx) = oneshot::channel();
- // TODO: actually calculate checksums!!
- let stream: BoxStream<_> = self.stream.into_inner().unwrap();
- (
- stream.map(|x| {
- x.and_then(|f| {
- f.into_data()
- .map_err(|_| Error::bad_request("non-data frame"))
- })
- }),
- rx,
- )
+ let Self {
+ stream,
+ mut checksummer,
+ mut expected_checksums,
+ } = self;
+
+ let (frame_tx, mut frame_rx) = mpsc::channel::<Frame<Bytes>>(1);
+
+ let join_checksums = tokio::spawn(async move {
+ let tracer = opentelemetry::global::tracer("garage");
+
+ while let Some(frame) = frame_rx.recv().await {
+ match frame.into_data() {
+ Ok(data) => {
+ checksummer = tokio::task::spawn_blocking(move || {
+ checksummer.update(&data);
+ checksummer
+ })
+ .await
+ .unwrap()
+ }
+ Err(frame) => {
+ let trailers = frame.into_trailers().unwrap();
+ if let Some(cv) = request_checksum_value(&trailers)? {
+ expected_checksums.extra = Some(cv);
+ }
+ break;
+ }
+ }
+ }
+
+ let checksums = checksummer.finalize();
+ checksums.verify(&expected_checksums)?;
+
+ return Ok(checksums);
+ });
+
+ let stream: BoxStream<_> = stream.into_inner().unwrap();
+ let stream = stream.filter_map(move |x| {
+ let frame_tx = frame_tx.clone();
+ async move {
+ match x {
+ Err(e) => Some(Err(e)),
+ Ok(frame) => {
+ if frame.is_data() {
+ let data = frame.data_ref().unwrap().clone();
+ let _ = frame_tx.send(frame).await;
+ Some(Ok(data))
+ } else {
+ let _ = frame_tx.send(frame).await;
+ None
+ }
+ }
+ }
+ }
+ });
+
+ (stream.boxed(), join_checksums)
}
}
diff --git a/src/api/common/signature/checksum.rs b/src/api/common/signature/checksum.rs
index b184fc65..a9f00423 100644
--- a/src/api/common/signature/checksum.rs
+++ b/src/api/common/signature/checksum.rs
@@ -32,7 +32,7 @@ pub type Md5Checksum = [u8; 16];
pub type Sha1Checksum = [u8; 20];
pub type Sha256Checksum = [u8; 32];
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
pub struct ExpectedChecksums {
// base64-encoded md5 (content-md5 header)
pub md5: Option<String>,
@@ -70,25 +70,35 @@ impl Checksummer {
}
}
- pub fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self {
+ pub fn init(expected: &ExpectedChecksums, add_md5: bool) -> Self {
let mut ret = Self::new();
+ ret.add_expected(expected);
+ if add_md5 {
+ ret.add_md5();
+ }
+ ret
+ }
- if expected.md5.is_some() || require_md5 {
- ret.md5 = Some(Md5::new());
+ pub fn add_md5(&mut self) {
+ self.md5 = Some(Md5::new());
+ }
+
+ pub fn add_expected(&mut self, expected: &ExpectedChecksums) {
+ if expected.md5.is_some() {
+ self.md5 = Some(Md5::new());
}
if expected.sha256.is_some() || matches!(&expected.extra, Some(ChecksumValue::Sha256(_))) {
- ret.sha256 = Some(Sha256::new());
+ self.sha256 = Some(Sha256::new());
}
if matches!(&expected.extra, Some(ChecksumValue::Crc32(_))) {
- ret.crc32 = Some(Crc32::new());
+ self.crc32 = Some(Crc32::new());
}
if matches!(&expected.extra, Some(ChecksumValue::Crc32c(_))) {
- ret.crc32c = Some(Crc32c::default());
+ self.crc32c = Some(Crc32c::default());
}
if matches!(&expected.extra, Some(ChecksumValue::Sha1(_))) {
- ret.sha1 = Some(Sha1::new());
+ self.sha1 = Some(Sha1::new());
}
- ret
}
pub fn add(mut self, algo: Option<ChecksumAlgorithm>) -> Self {
diff --git a/src/api/common/signature/streaming.rs b/src/api/common/signature/streaming.rs
index e8f9b3d7..3ffc5b2f 100644
--- a/src/api/common/signature/streaming.rs
+++ b/src/api/common/signature/streaming.rs
@@ -25,15 +25,11 @@ pub fn parse_streaming_body(
service: &str,
) -> Result<Request<ReqBody>, Error> {
let expected_checksums = ExpectedChecksums {
- md5: match req.headers().get("content-md5") {
- Some(x) => Some(x.to_str()?.to_string()),
- None => None,
- },
sha256: match &checked_signature.content_sha256_header {
ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256),
_ => None,
},
- extra: None,
+ ..Default::default()
};
let mut checksummer = Checksummer::init(&expected_checksums, false);