From 4d501b6947497d7705c64dcaf093e5e5a09a9235 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 22 Feb 2024 11:35:39 +0100 Subject: Compile streams --- src/imap/mailbox_view.rs | 125 ++++++++++++++++++++++++----------------------- 1 file changed, 64 insertions(+), 61 deletions(-) (limited to 'src/imap/mailbox_view.rs') diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index d57e9a3..5055c40 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use anyhow::{anyhow, Error, Result}; -use futures::stream::{FuturesOrdered, StreamExt}; +use futures::stream::{StreamExt, TryStreamExt}; use imap_codec::imap_types::core::Charset; use imap_codec::imap_types::fetch::MessageDataItem; @@ -362,46 +362,33 @@ impl MailboxView { .iter() .map(|midx| midx.uuid) .collect::>(); - let query_result = self.internal.query(&uuids, query_scope).fetch().await?; - // [3/6] Derive an IMAP-specific view from the results, apply the filters - let views = query_result - .iter() - .zip(mail_idx_list.into_iter()) - .map(|(qr, midx)| MailView::new(qr, midx)) - .collect::, _>>()?; - - // [4/6] Apply the IMAP transformation, bubble up any error - // We get 2 results: - // - The one we send to the client - // - The \Seen flags we must set internally - let (flag_mgmt, imap_ret): (Vec<_>, Vec<_>) = views - .iter() - .map(|mv| mv.filter(&ap).map(|(body, seen)| ((mv, seen), body))) - .collect::, _>>()? - .into_iter() - .unzip(); + let query = self.internal.query(&uuids, query_scope); + //let query_result = self.internal.query(&uuids, query_scope).fetch().await?; - // [5/6] Register the \Seen flags - flag_mgmt - .iter() - .filter(|(_mv, seen)| matches!(seen, SeenFlag::MustAdd)) - .map(|(mv, _seen)| async move { - let seen_flag = Flag::Seen.to_string(); - self.internal - .mailbox - .add_flags(*mv.query_result.uuid(), &[seen_flag]) - .await?; - Ok::<_, anyhow::Error>(()) + let query_stream = query + .fetch() + .zip(futures::stream::iter(mail_idx_list)) + // [3/6] Derive an IMAP-specific view from the results, apply the filters + .map(|(maybe_qr, midx)| match maybe_qr { + Ok(qr) => Ok((MailView::new(&qr, midx)?.filter(&ap)?, midx)), + Err(e) => Err(e), }) - .collect::>() - .collect::>() - .await - .into_iter() - .collect::>()?; + // [4/6] Apply the IMAP transformation + .then(|maybe_ret| async move { + let ((body, seen), midx) = maybe_ret?; + + // [5/6] Register the \Seen flags + if matches!(seen, SeenFlag::MustAdd) { + let seen_flag = Flag::Seen.to_string(); + self.internal.mailbox.add_flags(midx.uuid, &[seen_flag]).await?; + } + + Ok::<_, anyhow::Error>(body) + }); // [6/6] Build the final result that will be sent to the client. - Ok(imap_ret) + query_stream.try_collect().await } /// A naive search implementation... @@ -423,39 +410,55 @@ impl MailboxView { // 3. Filter the selection based on the ID / UID / Flags let (kept_idx, to_fetch) = crit.filter_on_idx(&selection); - // 4. Fetch additional info about the emails + // 4.a Fetch additional info about the emails let query_scope = crit.query_scope(); let uuids = to_fetch.iter().map(|midx| midx.uuid).collect::>(); - let query_result = self.internal.query(&uuids, query_scope).fetch().await?; + let query = self.internal.query(&uuids, query_scope); + + // 4.b We don't want to keep all data in memory, so we do the computing in a stream + let query_stream = query + .fetch() + .zip(futures::stream::iter(&to_fetch)) + // 5.a Build a mailview with the body, might fail with an error + // 5.b If needed, filter the selection based on the body, but keep the errors + // 6. Drop the query+mailbox, keep only the mail index + // Here we release a lot of memory, this is the most important part ^^ + .filter_map(|(maybe_qr, midx)| { + let r = match maybe_qr { + Ok(qr) => match MailView::new(&qr, midx).map(|mv| crit.is_keep_on_query(&mv)) { + Ok(true) => Some(Ok(*midx)), + Ok(_) => None, + Err(e) => Some(Err(e)), + } + Err(e) => Some(Err(e)), + }; + futures::future::ready(r) + }); - // 5. If needed, filter the selection based on the body - let kept_query = crit.filter_on_query(&to_fetch, &query_result)?; - // 6. Format the result according to the client's taste: - // either return UID or ID. - let final_selection = kept_idx.iter().chain(kept_query.iter()); - let selection_fmt = match uid { - true => final_selection.map(|in_idx| in_idx.uid).collect(), - _ => final_selection.map(|in_idx| in_idx.i).collect(), - }; + // 7. Chain both streams (part resolved from index, part resolved from metadata+body) + let main_stream = futures::stream::iter(kept_idx) + .map(Ok) + .chain(query_stream) + .map_ok(|idx| match uid { + true => (idx.uid, idx.modseq), + _ => (idx.i, idx.modseq), + }); - // 7. Add the modseq entry if needed - let is_modseq = crit.is_modseq(); - let maybe_modseq = match is_modseq { - true => { - let final_selection = kept_idx.iter().chain(kept_query.iter()); - final_selection - .map(|in_idx| in_idx.modseq) - .max() - .map(|r| NonZeroU64::try_from(r)) - .transpose()? - } + // 8. Do the actual computation + let internal_result: Vec<_> = main_stream.try_collect().await?; + let (selection, modseqs): (Vec<_>, Vec<_>) = internal_result.into_iter().unzip(); + + // 9. Aggregate the maximum modseq value + let maybe_modseq = match crit.is_modseq() { + true => modseqs.into_iter().max(), _ => None, }; + // 10. Return the final result Ok(( - vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))], - is_modseq, + vec![Body::Data(Data::Search(selection, maybe_modseq))], + maybe_modseq.is_some(), )) } @@ -678,7 +681,7 @@ mod tests { content: rfc822.to_vec(), }; - let mv = MailView::new(&qr, &mail_in_idx)?; + let mv = MailView::new(std::borrow::Cow::Borrowed(&qr), &mail_in_idx)?; let (res_body, _seen) = mv.filter(&ap)?; let fattr = match res_body { -- cgit v1.2.3