diff options
Diffstat (limited to 'src/mail/query.rs')
-rw-r--r-- | src/mail/query.rs | 91 |
1 files changed, 45 insertions, 46 deletions
diff --git a/src/mail/query.rs b/src/mail/query.rs index a183c5a..3e6fe99 100644 --- a/src/mail/query.rs +++ b/src/mail/query.rs @@ -2,7 +2,8 @@ use super::mailbox::MailMeta; use super::snapshot::FrozenMailbox; use super::unique_ident::UniqueIdent; use anyhow::Result; -use futures::stream::{FuturesOrdered, StreamExt}; +use futures::future::FutureExt; +use futures::stream::{BoxStream, Stream, StreamExt}; /// Query is in charge of fetching efficiently /// requested data for a list of emails @@ -28,64 +29,62 @@ impl QueryScope { } } +//type QueryResultStream = Box<dyn Stream<Item = Result<QueryResult>>>; + impl<'a, 'b> Query<'a, 'b> { - pub async fn fetch(&self) -> Result<Vec<QueryResult>> { + pub fn fetch(&self) -> BoxStream<Result<QueryResult>> { match self.scope { - QueryScope::Index => Ok(self - .emails - .iter() - .map(|&uuid| QueryResult::IndexResult { uuid }) - .collect()), - QueryScope::Partial => self.partial().await, - QueryScope::Full => self.full().await, + QueryScope::Index => Box::pin( + futures::stream::iter(self.emails) + .map(|&uuid| Ok(QueryResult::IndexResult { uuid })), + ), + QueryScope::Partial => Box::pin(self.partial()), + QueryScope::Full => Box::pin(self.full()), } } // --- functions below are private *for reasons* + fn partial<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send { + async move { + let maybe_meta_list: Result<Vec<MailMeta>> = + self.frozen.mailbox.fetch_meta(self.emails).await; + let list_res = maybe_meta_list + .map(|meta_list| { + meta_list + .into_iter() + .zip(self.emails) + .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata })) + .collect() + }) + .unwrap_or_else(|e| vec![Err(e)]); - async fn partial(&self) -> Result<Vec<QueryResult>> { - let meta = self.frozen.mailbox.fetch_meta(self.emails).await?; - let result = meta - .into_iter() - .zip(self.emails.iter()) - .map(|(metadata, &uuid)| QueryResult::PartialResult { uuid, metadata }) - .collect::<Vec<_>>(); - - Ok(result) + futures::stream::iter(list_res) + } + .flatten_stream() } - /// @FIXME WARNING: THIS CAN ALLOCATE A LOT OF MEMORY - /// AND GENERATE SO MUCH NETWORK TRAFFIC. - /// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH - /// SOMETHING LIKE AN ITERATOR - async fn full(&self) -> Result<Vec<QueryResult>> { - let meta_list = self.partial().await?; - meta_list - .into_iter() - .map(|meta| async move { - let content = self - .frozen - .mailbox - .fetch_full( - *meta.uuid(), - &meta - .metadata() - .expect("meta to be PartialResult") - .message_key, - ) - .await?; + fn full<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send { + self.partial().then(move |maybe_meta| async move { + let meta = maybe_meta?; - Ok(meta.into_full(content).expect("meta to be PartialResult")) - }) - .collect::<FuturesOrdered<_>>() - .collect::<Vec<_>>() - .await - .into_iter() - .collect::<Result<Vec<_>, _>>() + let content = self + .frozen + .mailbox + .fetch_full( + *meta.uuid(), + &meta + .metadata() + .expect("meta to be PartialResult") + .message_key, + ) + .await?; + + Ok(meta.into_full(content).expect("meta to be PartialResult")) + }) } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum QueryResult { IndexResult { uuid: UniqueIdent, |