From 4d501b6947497d7705c64dcaf093e5e5a09a9235 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 22 Feb 2024 11:35:39 +0100 Subject: Compile streams --- src/mail/mailbox.rs | 2 +- src/mail/query.rs | 68 ++++++++++++++++++++++++----------------------------- 2 files changed, 32 insertions(+), 38 deletions(-) (limited to 'src/mail') diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index c20d815..9190883 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -498,7 +498,7 @@ fn dump(uid_index: &Bayou) { /// The metadata of a message that is stored in K2V /// at pk = mail/, sk = -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct MailMeta { /// INTERNALDATE field (milliseconds since epoch) pub internaldate: u64, diff --git a/src/mail/query.rs b/src/mail/query.rs index a183c5a..cbc5fe6 100644 --- a/src/mail/query.rs +++ b/src/mail/query.rs @@ -2,7 +2,8 @@ use super::mailbox::MailMeta; use super::snapshot::FrozenMailbox; use super::unique_ident::UniqueIdent; use anyhow::Result; -use futures::stream::{FuturesOrdered, StreamExt}; +use futures::stream::{Stream, StreamExt, BoxStream}; +use futures::future::FutureExt; /// Query is in charge of fetching efficiently /// requested data for a list of emails @@ -28,64 +29,57 @@ impl QueryScope { } } +//type QueryResultStream = Box>>; + impl<'a, 'b> Query<'a, 'b> { - pub async fn fetch(&self) -> Result> { + pub fn fetch(&self) -> BoxStream> { match self.scope { - QueryScope::Index => Ok(self - .emails - .iter() - .map(|&uuid| QueryResult::IndexResult { uuid }) - .collect()), - QueryScope::Partial => self.partial().await, - QueryScope::Full => self.full().await, + QueryScope::Index => Box::pin(futures::stream::iter(self.emails).map(|&uuid| Ok(QueryResult::IndexResult { uuid }))), + QueryScope::Partial => Box::pin(self.partial()), + QueryScope::Full => Box::pin(self.full()), } } // --- functions below are private *for reasons* + fn partial<'d>(&'d self) -> impl Stream> + 'd + Send { + async move { + let maybe_meta_list: Result> = self.frozen.mailbox.fetch_meta(self.emails).await; + let list_res = maybe_meta_list + .map(|meta_list| meta_list + .into_iter() + .zip(self.emails) + .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata })) + .collect() + ) + .unwrap_or_else(|e| vec![Err(e)]); - async fn partial(&self) -> Result> { - let meta = self.frozen.mailbox.fetch_meta(self.emails).await?; - let result = meta - .into_iter() - .zip(self.emails.iter()) - .map(|(metadata, &uuid)| QueryResult::PartialResult { uuid, metadata }) - .collect::>(); - - Ok(result) + futures::stream::iter(list_res) + }.flatten_stream() } - /// @FIXME WARNING: THIS CAN ALLOCATE A LOT OF MEMORY - /// AND GENERATE SO MUCH NETWORK TRAFFIC. - /// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH - /// SOMETHING LIKE AN ITERATOR - async fn full(&self) -> Result> { - let meta_list = self.partial().await?; - meta_list - .into_iter() - .map(|meta| async move { + fn full<'d>(&'d self) -> impl Stream> + 'd + Send { + self.partial() + .then(move |maybe_meta| async move { + let meta = maybe_meta?; + let content = self .frozen .mailbox .fetch_full( *meta.uuid(), &meta - .metadata() - .expect("meta to be PartialResult") - .message_key, - ) + .metadata() + .expect("meta to be PartialResult") + .message_key, + ) .await?; Ok(meta.into_full(content).expect("meta to be PartialResult")) }) - .collect::>() - .collect::>() - .await - .into_iter() - .collect::, _>>() } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum QueryResult { IndexResult { uuid: UniqueIdent, -- cgit v1.2.3