diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2024-02-22 11:35:39 +0100 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2024-02-22 11:35:39 +0100 |
commit | 4d501b6947497d7705c64dcaf093e5e5a09a9235 (patch) | |
tree | 841c86683256802e928bf6ca162b2ee18bb348f1 /src/mail | |
parent | de5717a020ed741ef206ebb649bac2cd678572fa (diff) | |
download | aerogramme-4d501b6947497d7705c64dcaf093e5e5a09a9235.tar.gz aerogramme-4d501b6947497d7705c64dcaf093e5e5a09a9235.zip |
Compile streams
Diffstat (limited to 'src/mail')
-rw-r--r-- | src/mail/mailbox.rs | 2 | ||||
-rw-r--r-- | src/mail/query.rs | 68 |
2 files changed, 32 insertions, 38 deletions
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index c20d815..9190883 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -498,7 +498,7 @@ fn dump(uid_index: &Bayou<UidIndex>) { /// The metadata of a message that is stored in K2V /// at pk = mail/<mailbox uuid>, sk = <message uuid> -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct MailMeta { /// INTERNALDATE field (milliseconds since epoch) pub internaldate: u64, diff --git a/src/mail/query.rs b/src/mail/query.rs index a183c5a..cbc5fe6 100644 --- a/src/mail/query.rs +++ b/src/mail/query.rs @@ -2,7 +2,8 @@ use super::mailbox::MailMeta; use super::snapshot::FrozenMailbox; use super::unique_ident::UniqueIdent; use anyhow::Result; -use futures::stream::{FuturesOrdered, StreamExt}; +use futures::stream::{Stream, StreamExt, BoxStream}; +use futures::future::FutureExt; /// Query is in charge of fetching efficiently /// requested data for a list of emails @@ -28,64 +29,57 @@ impl QueryScope { } } +//type QueryResultStream = Box<dyn Stream<Item = Result<QueryResult>>>; + impl<'a, 'b> Query<'a, 'b> { - pub async fn fetch(&self) -> Result<Vec<QueryResult>> { + pub fn fetch(&self) -> BoxStream<Result<QueryResult>> { match self.scope { - QueryScope::Index => Ok(self - .emails - .iter() - .map(|&uuid| QueryResult::IndexResult { uuid }) - .collect()), - QueryScope::Partial => self.partial().await, - QueryScope::Full => self.full().await, + QueryScope::Index => Box::pin(futures::stream::iter(self.emails).map(|&uuid| Ok(QueryResult::IndexResult { uuid }))), + QueryScope::Partial => Box::pin(self.partial()), + QueryScope::Full => Box::pin(self.full()), } } // --- functions below are private *for reasons* + fn partial<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send { + async move { + let maybe_meta_list: Result<Vec<MailMeta>> = self.frozen.mailbox.fetch_meta(self.emails).await; + let list_res = maybe_meta_list + .map(|meta_list| meta_list + .into_iter() + .zip(self.emails) + .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata })) + .collect() + ) + .unwrap_or_else(|e| vec![Err(e)]); - async fn partial(&self) -> Result<Vec<QueryResult>> { - let meta = self.frozen.mailbox.fetch_meta(self.emails).await?; - let result = meta - .into_iter() - .zip(self.emails.iter()) - .map(|(metadata, &uuid)| QueryResult::PartialResult { uuid, metadata }) - .collect::<Vec<_>>(); - - Ok(result) + futures::stream::iter(list_res) + }.flatten_stream() } - /// @FIXME WARNING: THIS CAN ALLOCATE A LOT OF MEMORY - /// AND GENERATE SO MUCH NETWORK TRAFFIC. - /// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH - /// SOMETHING LIKE AN ITERATOR - async fn full(&self) -> Result<Vec<QueryResult>> { - let meta_list = self.partial().await?; - meta_list - .into_iter() - .map(|meta| async move { + fn full<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send { + self.partial() + .then(move |maybe_meta| async move { + let meta = maybe_meta?; + let content = self .frozen .mailbox .fetch_full( *meta.uuid(), &meta - .metadata() - .expect("meta to be PartialResult") - .message_key, - ) + .metadata() + .expect("meta to be PartialResult") + .message_key, + ) .await?; Ok(meta.into_full(content).expect("meta to be PartialResult")) }) - .collect::<FuturesOrdered<_>>() - .collect::<Vec<_>>() - .await - .into_iter() - .collect::<Result<Vec<_>, _>>() } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum QueryResult { IndexResult { uuid: UniqueIdent, |