diff options
author | Quentin <quentin@dufour.io> | 2024-02-23 17:32:38 +0000 |
---|---|---|
committer | Quentin <quentin@dufour.io> | 2024-02-23 17:32:38 +0000 |
commit | d92ae5220cdaddf941da5a216fbd2c3549ccdec3 (patch) | |
tree | 464814294b6e50811e0c667cd1430c6855006f8a /src/mail/query.rs | |
parent | 0bb7cdf696190200d1885ec822518ac45b685a9b (diff) | |
parent | 1ea3de30995b434c4e59123c1ab634d89a0274b5 (diff) | |
download | aerogramme-d92ae5220cdaddf941da5a216fbd2c3549ccdec3.tar.gz aerogramme-d92ae5220cdaddf941da5a216fbd2c3549ccdec3.zip |
Merge pull request 'Perf measurement & bottleneck fix' (#102) from perf/cpu-ram-bottleneck into main0.2.2
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/aerogramme/pulls/102
Diffstat (limited to 'src/mail/query.rs')
-rw-r--r-- | src/mail/query.rs | 91 |
1 files changed, 45 insertions, 46 deletions
diff --git a/src/mail/query.rs b/src/mail/query.rs index a183c5a..3e6fe99 100644 --- a/src/mail/query.rs +++ b/src/mail/query.rs @@ -2,7 +2,8 @@ use super::mailbox::MailMeta; use super::snapshot::FrozenMailbox; use super::unique_ident::UniqueIdent; use anyhow::Result; -use futures::stream::{FuturesOrdered, StreamExt}; +use futures::future::FutureExt; +use futures::stream::{BoxStream, Stream, StreamExt}; /// Query is in charge of fetching efficiently /// requested data for a list of emails @@ -28,64 +29,62 @@ impl QueryScope { } } +//type QueryResultStream = Box<dyn Stream<Item = Result<QueryResult>>>; + impl<'a, 'b> Query<'a, 'b> { - pub async fn fetch(&self) -> Result<Vec<QueryResult>> { + pub fn fetch(&self) -> BoxStream<Result<QueryResult>> { match self.scope { - QueryScope::Index => Ok(self - .emails - .iter() - .map(|&uuid| QueryResult::IndexResult { uuid }) - .collect()), - QueryScope::Partial => self.partial().await, - QueryScope::Full => self.full().await, + QueryScope::Index => Box::pin( + futures::stream::iter(self.emails) + .map(|&uuid| Ok(QueryResult::IndexResult { uuid })), + ), + QueryScope::Partial => Box::pin(self.partial()), + QueryScope::Full => Box::pin(self.full()), } } // --- functions below are private *for reasons* + fn partial<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send { + async move { + let maybe_meta_list: Result<Vec<MailMeta>> = + self.frozen.mailbox.fetch_meta(self.emails).await; + let list_res = maybe_meta_list + .map(|meta_list| { + meta_list + .into_iter() + .zip(self.emails) + .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata })) + .collect() + }) + .unwrap_or_else(|e| vec![Err(e)]); - async fn partial(&self) -> Result<Vec<QueryResult>> { - let meta = self.frozen.mailbox.fetch_meta(self.emails).await?; - let result = meta - .into_iter() - .zip(self.emails.iter()) - .map(|(metadata, &uuid)| QueryResult::PartialResult { uuid, metadata }) - .collect::<Vec<_>>(); - - Ok(result) + futures::stream::iter(list_res) + } + .flatten_stream() } - /// @FIXME WARNING: THIS CAN ALLOCATE A LOT OF MEMORY - /// AND GENERATE SO MUCH NETWORK TRAFFIC. - /// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH - /// SOMETHING LIKE AN ITERATOR - async fn full(&self) -> Result<Vec<QueryResult>> { - let meta_list = self.partial().await?; - meta_list - .into_iter() - .map(|meta| async move { - let content = self - .frozen - .mailbox - .fetch_full( - *meta.uuid(), - &meta - .metadata() - .expect("meta to be PartialResult") - .message_key, - ) - .await?; + fn full<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send { + self.partial().then(move |maybe_meta| async move { + let meta = maybe_meta?; - Ok(meta.into_full(content).expect("meta to be PartialResult")) - }) - .collect::<FuturesOrdered<_>>() - .collect::<Vec<_>>() - .await - .into_iter() - .collect::<Result<Vec<_>, _>>() + let content = self + .frozen + .mailbox + .fetch_full( + *meta.uuid(), + &meta + .metadata() + .expect("meta to be PartialResult") + .message_key, + ) + .await?; + + Ok(meta.into_full(content).expect("meta to be PartialResult")) + }) } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum QueryResult { IndexResult { uuid: UniqueIdent, |