diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2024-02-22 11:35:39 +0100 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2024-02-22 11:35:39 +0100 |
commit | 4d501b6947497d7705c64dcaf093e5e5a09a9235 (patch) | |
tree | 841c86683256802e928bf6ca162b2ee18bb348f1 /src/imap | |
parent | de5717a020ed741ef206ebb649bac2cd678572fa (diff) | |
download | aerogramme-4d501b6947497d7705c64dcaf093e5e5a09a9235.tar.gz aerogramme-4d501b6947497d7705c64dcaf093e5e5a09a9235.zip |
Compile streams
Diffstat (limited to 'src/imap')
-rw-r--r-- | src/imap/mailbox_view.rs | 125 | ||||
-rw-r--r-- | src/imap/search.rs | 21 |
2 files changed, 66 insertions, 80 deletions
diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index d57e9a3..5055c40 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use anyhow::{anyhow, Error, Result}; -use futures::stream::{FuturesOrdered, StreamExt}; +use futures::stream::{StreamExt, TryStreamExt}; use imap_codec::imap_types::core::Charset; use imap_codec::imap_types::fetch::MessageDataItem; @@ -362,46 +362,33 @@ impl MailboxView { .iter() .map(|midx| midx.uuid) .collect::<Vec<_>>(); - let query_result = self.internal.query(&uuids, query_scope).fetch().await?; - // [3/6] Derive an IMAP-specific view from the results, apply the filters - let views = query_result - .iter() - .zip(mail_idx_list.into_iter()) - .map(|(qr, midx)| MailView::new(qr, midx)) - .collect::<Result<Vec<_>, _>>()?; - - // [4/6] Apply the IMAP transformation, bubble up any error - // We get 2 results: - // - The one we send to the client - // - The \Seen flags we must set internally - let (flag_mgmt, imap_ret): (Vec<_>, Vec<_>) = views - .iter() - .map(|mv| mv.filter(&ap).map(|(body, seen)| ((mv, seen), body))) - .collect::<Result<Vec<_>, _>>()? - .into_iter() - .unzip(); + let query = self.internal.query(&uuids, query_scope); + //let query_result = self.internal.query(&uuids, query_scope).fetch().await?; - // [5/6] Register the \Seen flags - flag_mgmt - .iter() - .filter(|(_mv, seen)| matches!(seen, SeenFlag::MustAdd)) - .map(|(mv, _seen)| async move { - let seen_flag = Flag::Seen.to_string(); - self.internal - .mailbox - .add_flags(*mv.query_result.uuid(), &[seen_flag]) - .await?; - Ok::<_, anyhow::Error>(()) + let query_stream = query + .fetch() + .zip(futures::stream::iter(mail_idx_list)) + // [3/6] Derive an IMAP-specific view from the results, apply the filters + .map(|(maybe_qr, midx)| match maybe_qr { + Ok(qr) => Ok((MailView::new(&qr, midx)?.filter(&ap)?, midx)), + Err(e) => Err(e), }) - .collect::<FuturesOrdered<_>>() - .collect::<Vec<_>>() - .await - .into_iter() - .collect::<Result<_, _>>()?; + // [4/6] Apply the IMAP transformation + .then(|maybe_ret| async move { + let ((body, seen), midx) = maybe_ret?; + + // [5/6] Register the \Seen flags + if matches!(seen, SeenFlag::MustAdd) { + let seen_flag = Flag::Seen.to_string(); + self.internal.mailbox.add_flags(midx.uuid, &[seen_flag]).await?; + } + + Ok::<_, anyhow::Error>(body) + }); // [6/6] Build the final result that will be sent to the client. - Ok(imap_ret) + query_stream.try_collect().await } /// A naive search implementation... @@ -423,39 +410,55 @@ impl MailboxView { // 3. Filter the selection based on the ID / UID / Flags let (kept_idx, to_fetch) = crit.filter_on_idx(&selection); - // 4. Fetch additional info about the emails + // 4.a Fetch additional info about the emails let query_scope = crit.query_scope(); let uuids = to_fetch.iter().map(|midx| midx.uuid).collect::<Vec<_>>(); - let query_result = self.internal.query(&uuids, query_scope).fetch().await?; + let query = self.internal.query(&uuids, query_scope); + + // 4.b We don't want to keep all data in memory, so we do the computing in a stream + let query_stream = query + .fetch() + .zip(futures::stream::iter(&to_fetch)) + // 5.a Build a mailview with the body, might fail with an error + // 5.b If needed, filter the selection based on the body, but keep the errors + // 6. Drop the query+mailbox, keep only the mail index + // Here we release a lot of memory, this is the most important part ^^ + .filter_map(|(maybe_qr, midx)| { + let r = match maybe_qr { + Ok(qr) => match MailView::new(&qr, midx).map(|mv| crit.is_keep_on_query(&mv)) { + Ok(true) => Some(Ok(*midx)), + Ok(_) => None, + Err(e) => Some(Err(e)), + } + Err(e) => Some(Err(e)), + }; + futures::future::ready(r) + }); - // 5. If needed, filter the selection based on the body - let kept_query = crit.filter_on_query(&to_fetch, &query_result)?; - // 6. Format the result according to the client's taste: - // either return UID or ID. - let final_selection = kept_idx.iter().chain(kept_query.iter()); - let selection_fmt = match uid { - true => final_selection.map(|in_idx| in_idx.uid).collect(), - _ => final_selection.map(|in_idx| in_idx.i).collect(), - }; + // 7. Chain both streams (part resolved from index, part resolved from metadata+body) + let main_stream = futures::stream::iter(kept_idx) + .map(Ok) + .chain(query_stream) + .map_ok(|idx| match uid { + true => (idx.uid, idx.modseq), + _ => (idx.i, idx.modseq), + }); - // 7. Add the modseq entry if needed - let is_modseq = crit.is_modseq(); - let maybe_modseq = match is_modseq { - true => { - let final_selection = kept_idx.iter().chain(kept_query.iter()); - final_selection - .map(|in_idx| in_idx.modseq) - .max() - .map(|r| NonZeroU64::try_from(r)) - .transpose()? - } + // 8. Do the actual computation + let internal_result: Vec<_> = main_stream.try_collect().await?; + let (selection, modseqs): (Vec<_>, Vec<_>) = internal_result.into_iter().unzip(); + + // 9. Aggregate the maximum modseq value + let maybe_modseq = match crit.is_modseq() { + true => modseqs.into_iter().max(), _ => None, }; + // 10. Return the final result Ok(( - vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))], - is_modseq, + vec![Body::Data(Data::Search(selection, maybe_modseq))], + maybe_modseq.is_some(), )) } @@ -678,7 +681,7 @@ mod tests { content: rfc822.to_vec(), }; - let mv = MailView::new(&qr, &mail_in_idx)?; + let mv = MailView::new(std::borrow::Cow::Borrowed(&qr), &mail_in_idx)?; let (res_body, _seen) = mv.filter(&ap)?; let fattr = match res_body { diff --git a/src/imap/search.rs b/src/imap/search.rs index d06c3bd..4ff70ee 100644 --- a/src/imap/search.rs +++ b/src/imap/search.rs @@ -1,13 +1,12 @@ use std::num::{NonZeroU32, NonZeroU64}; -use anyhow::Result; use imap_codec::imap_types::core::NonEmptyVec; use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey}; use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet}; use crate::imap::index::MailIndex; use crate::imap::mail_view::MailView; -use crate::mail::query::{QueryResult, QueryScope}; +use crate::mail::query::QueryScope; pub enum SeqType { Undefined, @@ -145,22 +144,6 @@ impl<'a> Criteria<'a> { (to_keep, to_fetch) } - pub fn filter_on_query<'b>( - &self, - midx_list: &[&'b MailIndex<'b>], - query_result: &'b Vec<QueryResult>, - ) -> Result<Vec<&'b MailIndex<'b>>> { - Ok(midx_list - .iter() - .zip(query_result.iter()) - .map(|(midx, qr)| MailView::new(qr, midx)) - .collect::<Result<Vec<_>, _>>()? - .into_iter() - .filter(|mail_view| self.is_keep_on_query(mail_view)) - .map(|mail_view| mail_view.in_idx) - .collect()) - } - // ---- /// Here we are doing a partial filtering: we do not have access @@ -213,7 +196,7 @@ impl<'a> Criteria<'a> { /// the email, as body(x) might be false. So we need to check it. But as seqid(x) is true, /// we could simplify the request to just body(x) and truncate the first OR. Today, we are /// not doing that, and thus we reevaluate everything. - fn is_keep_on_query(&self, mail_view: &MailView) -> bool { + pub fn is_keep_on_query(&self, mail_view: &MailView) -> bool { use SearchKey::*; match self.0 { // Combinator logic |