aboutsummaryrefslogtreecommitdiff
path: root/src/imap
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2024-02-22 11:35:39 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2024-02-22 11:35:39 +0100
commit4d501b6947497d7705c64dcaf093e5e5a09a9235 (patch)
tree841c86683256802e928bf6ca162b2ee18bb348f1 /src/imap
parentde5717a020ed741ef206ebb649bac2cd678572fa (diff)
downloadaerogramme-4d501b6947497d7705c64dcaf093e5e5a09a9235.tar.gz
aerogramme-4d501b6947497d7705c64dcaf093e5e5a09a9235.zip
Compile streams
Diffstat (limited to 'src/imap')
-rw-r--r--src/imap/mailbox_view.rs125
-rw-r--r--src/imap/search.rs21
2 files changed, 66 insertions, 80 deletions
diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs
index d57e9a3..5055c40 100644
--- a/src/imap/mailbox_view.rs
+++ b/src/imap/mailbox_view.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Error, Result};
-use futures::stream::{FuturesOrdered, StreamExt};
+use futures::stream::{StreamExt, TryStreamExt};
use imap_codec::imap_types::core::Charset;
use imap_codec::imap_types::fetch::MessageDataItem;
@@ -362,46 +362,33 @@ impl MailboxView {
.iter()
.map(|midx| midx.uuid)
.collect::<Vec<_>>();
- let query_result = self.internal.query(&uuids, query_scope).fetch().await?;
- // [3/6] Derive an IMAP-specific view from the results, apply the filters
- let views = query_result
- .iter()
- .zip(mail_idx_list.into_iter())
- .map(|(qr, midx)| MailView::new(qr, midx))
- .collect::<Result<Vec<_>, _>>()?;
-
- // [4/6] Apply the IMAP transformation, bubble up any error
- // We get 2 results:
- // - The one we send to the client
- // - The \Seen flags we must set internally
- let (flag_mgmt, imap_ret): (Vec<_>, Vec<_>) = views
- .iter()
- .map(|mv| mv.filter(&ap).map(|(body, seen)| ((mv, seen), body)))
- .collect::<Result<Vec<_>, _>>()?
- .into_iter()
- .unzip();
+ let query = self.internal.query(&uuids, query_scope);
+ //let query_result = self.internal.query(&uuids, query_scope).fetch().await?;
- // [5/6] Register the \Seen flags
- flag_mgmt
- .iter()
- .filter(|(_mv, seen)| matches!(seen, SeenFlag::MustAdd))
- .map(|(mv, _seen)| async move {
- let seen_flag = Flag::Seen.to_string();
- self.internal
- .mailbox
- .add_flags(*mv.query_result.uuid(), &[seen_flag])
- .await?;
- Ok::<_, anyhow::Error>(())
+ let query_stream = query
+ .fetch()
+ .zip(futures::stream::iter(mail_idx_list))
+ // [3/6] Derive an IMAP-specific view from the results, apply the filters
+ .map(|(maybe_qr, midx)| match maybe_qr {
+ Ok(qr) => Ok((MailView::new(&qr, midx)?.filter(&ap)?, midx)),
+ Err(e) => Err(e),
})
- .collect::<FuturesOrdered<_>>()
- .collect::<Vec<_>>()
- .await
- .into_iter()
- .collect::<Result<_, _>>()?;
+ // [4/6] Apply the IMAP transformation
+ .then(|maybe_ret| async move {
+ let ((body, seen), midx) = maybe_ret?;
+
+ // [5/6] Register the \Seen flags
+ if matches!(seen, SeenFlag::MustAdd) {
+ let seen_flag = Flag::Seen.to_string();
+ self.internal.mailbox.add_flags(midx.uuid, &[seen_flag]).await?;
+ }
+
+ Ok::<_, anyhow::Error>(body)
+ });
// [6/6] Build the final result that will be sent to the client.
- Ok(imap_ret)
+ query_stream.try_collect().await
}
/// A naive search implementation...
@@ -423,39 +410,55 @@ impl MailboxView {
// 3. Filter the selection based on the ID / UID / Flags
let (kept_idx, to_fetch) = crit.filter_on_idx(&selection);
- // 4. Fetch additional info about the emails
+ // 4.a Fetch additional info about the emails
let query_scope = crit.query_scope();
let uuids = to_fetch.iter().map(|midx| midx.uuid).collect::<Vec<_>>();
- let query_result = self.internal.query(&uuids, query_scope).fetch().await?;
+ let query = self.internal.query(&uuids, query_scope);
+
+ // 4.b We don't want to keep all data in memory, so we do the computing in a stream
+ let query_stream = query
+ .fetch()
+ .zip(futures::stream::iter(&to_fetch))
+ // 5.a Build a mailview with the body, might fail with an error
+ // 5.b If needed, filter the selection based on the body, but keep the errors
+ // 6. Drop the query+mailbox, keep only the mail index
+ // Here we release a lot of memory, this is the most important part ^^
+ .filter_map(|(maybe_qr, midx)| {
+ let r = match maybe_qr {
+ Ok(qr) => match MailView::new(&qr, midx).map(|mv| crit.is_keep_on_query(&mv)) {
+ Ok(true) => Some(Ok(*midx)),
+ Ok(_) => None,
+ Err(e) => Some(Err(e)),
+ }
+ Err(e) => Some(Err(e)),
+ };
+ futures::future::ready(r)
+ });
- // 5. If needed, filter the selection based on the body
- let kept_query = crit.filter_on_query(&to_fetch, &query_result)?;
- // 6. Format the result according to the client's taste:
- // either return UID or ID.
- let final_selection = kept_idx.iter().chain(kept_query.iter());
- let selection_fmt = match uid {
- true => final_selection.map(|in_idx| in_idx.uid).collect(),
- _ => final_selection.map(|in_idx| in_idx.i).collect(),
- };
+ // 7. Chain both streams (part resolved from index, part resolved from metadata+body)
+ let main_stream = futures::stream::iter(kept_idx)
+ .map(Ok)
+ .chain(query_stream)
+ .map_ok(|idx| match uid {
+ true => (idx.uid, idx.modseq),
+ _ => (idx.i, idx.modseq),
+ });
- // 7. Add the modseq entry if needed
- let is_modseq = crit.is_modseq();
- let maybe_modseq = match is_modseq {
- true => {
- let final_selection = kept_idx.iter().chain(kept_query.iter());
- final_selection
- .map(|in_idx| in_idx.modseq)
- .max()
- .map(|r| NonZeroU64::try_from(r))
- .transpose()?
- }
+ // 8. Do the actual computation
+ let internal_result: Vec<_> = main_stream.try_collect().await?;
+ let (selection, modseqs): (Vec<_>, Vec<_>) = internal_result.into_iter().unzip();
+
+ // 9. Aggregate the maximum modseq value
+ let maybe_modseq = match crit.is_modseq() {
+ true => modseqs.into_iter().max(),
_ => None,
};
+ // 10. Return the final result
Ok((
- vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))],
- is_modseq,
+ vec![Body::Data(Data::Search(selection, maybe_modseq))],
+ maybe_modseq.is_some(),
))
}
@@ -678,7 +681,7 @@ mod tests {
content: rfc822.to_vec(),
};
- let mv = MailView::new(&qr, &mail_in_idx)?;
+ let mv = MailView::new(std::borrow::Cow::Borrowed(&qr), &mail_in_idx)?;
let (res_body, _seen) = mv.filter(&ap)?;
let fattr = match res_body {
diff --git a/src/imap/search.rs b/src/imap/search.rs
index d06c3bd..4ff70ee 100644
--- a/src/imap/search.rs
+++ b/src/imap/search.rs
@@ -1,13 +1,12 @@
use std::num::{NonZeroU32, NonZeroU64};
-use anyhow::Result;
use imap_codec::imap_types::core::NonEmptyVec;
use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey};
use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet};
use crate::imap::index::MailIndex;
use crate::imap::mail_view::MailView;
-use crate::mail::query::{QueryResult, QueryScope};
+use crate::mail::query::QueryScope;
pub enum SeqType {
Undefined,
@@ -145,22 +144,6 @@ impl<'a> Criteria<'a> {
(to_keep, to_fetch)
}
- pub fn filter_on_query<'b>(
- &self,
- midx_list: &[&'b MailIndex<'b>],
- query_result: &'b Vec<QueryResult>,
- ) -> Result<Vec<&'b MailIndex<'b>>> {
- Ok(midx_list
- .iter()
- .zip(query_result.iter())
- .map(|(midx, qr)| MailView::new(qr, midx))
- .collect::<Result<Vec<_>, _>>()?
- .into_iter()
- .filter(|mail_view| self.is_keep_on_query(mail_view))
- .map(|mail_view| mail_view.in_idx)
- .collect())
- }
-
// ----
/// Here we are doing a partial filtering: we do not have access
@@ -213,7 +196,7 @@ impl<'a> Criteria<'a> {
/// the email, as body(x) might be false. So we need to check it. But as seqid(x) is true,
/// we could simplify the request to just body(x) and truncate the first OR. Today, we are
/// not doing that, and thus we reevaluate everything.
- fn is_keep_on_query(&self, mail_view: &MailView) -> bool {
+ pub fn is_keep_on_query(&self, mail_view: &MailView) -> bool {
use SearchKey::*;
match self.0 {
// Combinator logic