From 4806f7ff84c595ec6647744577388fe4fab33736 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 5 Jan 2024 18:59:19 +0100 Subject: WIP rewrite with a query manager --- src/imap/command/examined.rs | 2 +- src/imap/command/selected.rs | 2 +- src/imap/mail_view.rs | 26 +++++-- src/imap/mailbox_view.rs | 180 +++++++++++++++++-------------------------- src/mail/mailbox.rs | 2 +- src/mail/query.rs | 114 +++++++++++++++++++++------ src/mail/snapshot.rs | 10 +++ 7 files changed, 193 insertions(+), 143 deletions(-) diff --git a/src/imap/command/examined.rs b/src/imap/command/examined.rs index 0d688c0..ec16973 100644 --- a/src/imap/command/examined.rs +++ b/src/imap/command/examined.rs @@ -125,7 +125,7 @@ impl<'a> ExaminedContext<'a> { } pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> { - self.mailbox.mailbox.force_sync().await?; + self.mailbox.0.mailbox.force_sync().await?; let updates = self.mailbox.update().await?; Ok(( diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index 933f397..35c3eb4 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -152,7 +152,7 @@ impl<'a> SelectedContext<'a> { } pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> { - self.mailbox.mailbox.force_sync().await?; + self.mailbox.0.mailbox.force_sync().await?; let updates = self.mailbox.update().await?; Ok(( diff --git a/src/imap/mail_view.rs b/src/imap/mail_view.rs index c95c733..94215dc 100644 --- a/src/imap/mail_view.rs +++ b/src/imap/mail_view.rs @@ -1,6 +1,6 @@ use std::num::NonZeroU32; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, bail, Result, Context}; use chrono::{Offset, TimeZone, Utc}; use imap_codec::imap_types::core::{IString, NString}; @@ -22,16 +22,31 @@ use crate::imap::imf_view::message_envelope; use crate::imap::mailbox_view::MailIdentifiers; use crate::imap::mime_view; use crate::imap::response::Body; -use crate::mail::mailbox::MailMeta; +use crate::mail::query::QueryResult; pub struct MailView<'a> { - pub ids: &'a MailIdentifiers, - pub meta: &'a MailMeta, - pub flags: &'a Vec, + pub query_result: &'a QueryResult<'a>, pub content: FetchedMail<'a>, } impl<'a> MailView<'a> { + pub fn new(query_result: &'a QueryResult<'a>) -> Result { + Ok(Self { + query_result, + content: match query_result { + QueryResult::FullResult { content, .. } => { + let (_, parsed) = eml_codec::parse_message(content).context("Invalid mail body")?; + FetchedMail::new_from_message(parsed) + }, + QueryResult::PartialResult { metadata, .. } => { + let (_, parsed) = eml_codec::parse_imf(&metadata.headers).context("Invalid mail headers")?; + FetchedMail::Partial(parsed) + } + QueryResult::IndexResult { .. } => FetchedMail::None, + } + }) + } + fn uid(&self) -> MessageDataItem<'static> { MessageDataItem::Uid(self.ids.uid.clone()) } @@ -193,6 +208,7 @@ pub enum SeenFlag { // ------------------- pub enum FetchedMail<'a> { + None, Partial(imf::Imf<'a>), Full(AnyPart<'a>), } diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index 6db1bd2..9cc72c1 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -12,15 +12,18 @@ use imap_codec::imap_types::response::{Code, Data, Status}; use imap_codec::imap_types::search::SearchKey; use imap_codec::imap_types::sequence::{self, SequenceSet}; +use crate::mail::mailbox::Mailbox; +use crate::mail::snapshot::FrozenMailbox; +use crate::mail::query::QueryScope; +use crate::mail::uidindex::{ImapUid, ImapUidvalidity}; +use crate::mail::unique_ident::UniqueIdent; + use crate::imap::attributes::AttributesProxy; use crate::imap::flags; -use crate::imap::mail_view::SeenFlag; +use crate::imap::mail_view::{MailView, SeenFlag}; use crate::imap::response::Body; use crate::imap::search; -use crate::imap::selectors::MailSelectionBuilder; -use crate::mail::mailbox::Mailbox; -use crate::mail::uidindex::{ImapUid, ImapUidvalidity, UidIndex}; -use crate::mail::unique_ident::UniqueIdent; + const DEFAULT_FLAGS: [Flag; 5] = [ Flag::Seen, @@ -37,20 +40,12 @@ const DEFAULT_FLAGS: [Flag; 5] = [ /// To do this, it keeps a variable `known_state` that corresponds to /// what the client knows, and produces IMAP messages to be sent to the /// client that go along updates to `known_state`. -pub struct MailboxView { - pub(crate) mailbox: Arc, - known_state: UidIndex, -} +pub struct MailboxView (pub FrozenMailbox); impl MailboxView { /// Creates a new IMAP view into a mailbox. pub async fn new(mailbox: Arc) -> Self { - let state = mailbox.current_uid_index().await; - - Self { - mailbox, - known_state: state, - } + Self(mailbox.frozen().await) } /// Create an updated view, useful to make a diff @@ -60,11 +55,8 @@ impl MailboxView { /// This does NOT trigger a sync, it bases itself on what is currently /// loaded in RAM by Bayou. pub async fn update(&mut self) -> Result>> { - let old_view: &mut Self = self; - let new_view = Self { - mailbox: old_view.mailbox.clone(), - known_state: old_view.mailbox.current_uid_index().await, - }; + let old_snapshot = self.0.update().await; + let new_snapshot = &self.0.snapshot; let mut data = Vec::::new(); @@ -85,8 +77,8 @@ impl MailboxView { // - notify client of expunged mails let mut n_expunge = 0; - for (i, (_uid, uuid)) in old_view.known_state.idx_by_uid.iter().enumerate() { - if !new_view.known_state.table.contains_key(uuid) { + for (i, (_uid, uuid)) in old_snapshot.idx_by_uid.iter().enumerate() { + if !new_snapshot.table.contains_key(uuid) { data.push(Body::Data(Data::Expunge( NonZeroU32::try_from((i + 1 - n_expunge) as u32).unwrap(), ))); @@ -95,21 +87,21 @@ impl MailboxView { } // - if new mails arrived, notify client of number of existing mails - if new_view.known_state.table.len() != old_view.known_state.table.len() - n_expunge - || new_view.known_state.uidvalidity != old_view.known_state.uidvalidity + if new_snapshot.table.len() != old_snapshot.table.len() - n_expunge + || new_snapshot.uidvalidity != old_snapshot.uidvalidity { - data.push(new_view.exists_status()?); + data.push(self.exists_status()?); } - if new_view.known_state.uidvalidity != old_view.known_state.uidvalidity { + if new_snapshot.uidvalidity != old_snapshot.uidvalidity { // TODO: do we want to push less/more info than this? - data.push(new_view.uidvalidity_status()?); - data.push(new_view.uidnext_status()?); + data.push(self.uidvalidity_status()?); + data.push(self.uidnext_status()?); } else { // - if flags changed for existing mails, tell client - for (i, (_uid, uuid)) in new_view.known_state.idx_by_uid.iter().enumerate() { - let old_mail = old_view.known_state.table.get(uuid); - let new_mail = new_view.known_state.table.get(uuid); + for (i, (_uid, uuid)) in new_snapshot.idx_by_uid.iter().enumerate() { + let old_mail = old_snapshot.table.get(uuid); + let new_mail = new_snapshot.table.get(uuid); if old_mail.is_some() && old_mail != new_mail { if let Some((uid, flags)) = new_mail { data.push(Body::Data(Data::Fetch { @@ -126,7 +118,6 @@ impl MailboxView { } } } - *old_view = new_view; Ok(data) } @@ -152,7 +143,7 @@ impl MailboxView { flags: &[Flag<'a>], is_uid_store: &bool, ) -> Result>> { - self.mailbox.opportunistic_sync().await?; + self.0.sync().await?; let flags = flags.iter().map(|x| x.to_string()).collect::>(); @@ -160,13 +151,13 @@ impl MailboxView { for mi in mails.iter() { match kind { StoreType::Add => { - self.mailbox.add_flags(mi.uuid, &flags[..]).await?; + self.0.mailbox.add_flags(mi.uuid, &flags[..]).await?; } StoreType::Remove => { - self.mailbox.del_flags(mi.uuid, &flags[..]).await?; + self.0.mailbox.del_flags(mi.uuid, &flags[..]).await?; } StoreType::Replace => { - self.mailbox.set_flags(mi.uuid, &flags[..]).await?; + self.0.mailbox.set_flags(mi.uuid, &flags[..]).await?; } } } @@ -176,10 +167,10 @@ impl MailboxView { } pub async fn expunge(&mut self) -> Result>> { - self.mailbox.opportunistic_sync().await?; + self.0.sync().await?; + let state = self.0.peek().await; let deleted_flag = Flag::Deleted.to_string(); - let state = self.mailbox.current_uid_index().await; let msgs = state .table .iter() @@ -187,7 +178,7 @@ impl MailboxView { .map(|(uuid, _)| *uuid); for msg in msgs { - self.mailbox.delete(msg).await?; + self.0.mailbox.delete(msg).await?; } self.update().await @@ -203,7 +194,7 @@ impl MailboxView { let mut new_uuids = vec![]; for mi in mails.iter() { - new_uuids.push(to.copy_from(&self.mailbox, mi.uuid).await?); + new_uuids.push(to.copy_from(&self.0.mailbox, mi.uuid).await?); } let mut ret = vec![]; @@ -229,7 +220,7 @@ impl MailboxView { let mails = self.get_mail_ids(sequence_set, *is_uid_copy)?; for mi in mails.iter() { - to.move_from(&self.mailbox, mi.uuid).await?; + to.move_from(&self.0.mailbox, mi.uuid).await?; } let mut ret = vec![]; @@ -256,82 +247,49 @@ impl MailboxView { attributes: &'b MacroOrMessageDataItemNames<'static>, is_uid_fetch: &bool, ) -> Result>> { + // [1/6] Pre-compute data + // a. what are the uuids of the emails we want? + // b. do we need to fetch the full body? let ap = AttributesProxy::new(attributes, *is_uid_fetch); - - // Prepare data + let query_scope = match ap.need_body() { + true => QueryScope::Full, + _ => QueryScope::Partial, + }; let mids = MailIdentifiersList(self.get_mail_ids(sequence_set, *is_uid_fetch)?); - let mail_count = mids.0.len(); let uuids = mids.uuids(); - let meta = self.mailbox.fetch_meta(&uuids).await?; - let flags = uuids - .iter() - .map(|uuid| { - self.known_state - .table - .get(uuid) - .map(|(_uuid, f)| f) - .ok_or(anyhow!("missing email from the flag table")) - }) - .collect::, _>>()?; - - // Start filling data to build the view - let mut selection = MailSelectionBuilder::new(ap.need_body(), mail_count); - selection - .with_mail_identifiers(&mids.0) - .with_metadata(&meta) - .with_flags(&flags); - // Asynchronously fetch full bodies (if needed) - let btc = selection.bodies_to_collect(); - let future_bodies = btc - .iter() - .map(|bi| async move { - let body = self.mailbox.fetch_full(*bi.msg_uuid, bi.msg_key).await?; - Ok::<_, anyhow::Error>(body) - }) - .collect::>(); - let bodies = future_bodies - .collect::>() - .await - .into_iter() + // [2/6] Fetch the emails + let query = self.0.query(&uuids, query_scope); + let query_result = query.fetch().await?; + + // [3/6] Derive an IMAP-specific view from the results, apply the filters + let views = query_result.iter() + .map(MailView::new) .collect::, _>>()?; - // Add bodies - selection.with_bodies(bodies.as_slice()); - - // Build mail selection views - let views = selection.build()?; - - // Filter views to build the result - // Also identify what must be put as seen - let filtered_view = views + // [4/6] Apply the IMAP transformation to keep only relevant fields + let (flag_mgmt, imap_ret): (Vec<_>, Vec<_>) = views .iter() - .filter_map(|mv| mv.filter(&ap).ok().map(|(body, seen)| (mv, body, seen))) - .collect::>(); + .filter_map(|mv| mv.filter(&ap).ok().map(|(body, seen)| ((mv, seen), body))) + .unzip(); - // Register seen flags - let future_flags = filtered_view + // [5/6] Register seen flags + flag_mgmt .iter() - .filter(|(_mv, _body, seen)| matches!(seen, SeenFlag::MustAdd)) - .map(|(mv, _body, _seen)| async move { + .filter(|(_mv, seen)| matches!(seen, SeenFlag::MustAdd)) + .map(|(mv, _seen)| async move { let seen_flag = Flag::Seen.to_string(); - self.mailbox.add_flags(mv.ids.uuid, &[seen_flag]).await?; + self.0.mailbox.add_flags(*mv.query_result.uuid(), &[seen_flag]).await?; Ok::<_, anyhow::Error>(()) }) - .collect::>(); - - future_flags + .collect::>() .collect::>() .await .into_iter() .collect::>()?; - let command_body = filtered_view - .into_iter() - .map(|(_mv, body, _seen)| body) - .collect::>(); - - Ok(command_body) + // [6/6] Build the final result that will be sent to the client. + Ok(imap_ret) } /// A very naive search implementation... @@ -367,7 +325,8 @@ impl MailboxView { by_uid: bool, ) -> Result> { let mail_vec = self - .known_state + .0 + .snapshot .idx_by_uid .iter() .map(|(uid, uuid)| (*uid, *uuid)) @@ -439,7 +398,7 @@ impl MailboxView { } pub(crate) fn uidvalidity(&self) -> ImapUidvalidity { - self.known_state.uidvalidity + self.0.snapshot.uidvalidity } /// Produce an OK [UIDNEXT _] message corresponding to `known_state` @@ -454,7 +413,7 @@ impl MailboxView { } pub(crate) fn uidnext(&self) -> ImapUid { - self.known_state.uidnext + self.0.snapshot.uidnext } /// Produce an EXISTS message corresponding to the number of mails @@ -464,7 +423,7 @@ impl MailboxView { } pub(crate) fn exists(&self) -> Result { - Ok(u32::try_from(self.known_state.idx_by_uid.len())?) + Ok(u32::try_from(self.0.snapshot.idx_by_uid.len())?) } /// Produce a RECENT message corresponding to the number of @@ -475,7 +434,8 @@ impl MailboxView { pub(crate) fn recent(&self) -> Result { let recent = self - .known_state + .0 + .snapshot .idx_by_flag .get(&"\\Recent".to_string()) .map(|os| os.len()) @@ -490,8 +450,8 @@ impl MailboxView { // 1. Collecting all the possible flags in the mailbox // 1.a Fetch them from our index - let mut known_flags: Vec = self - .known_state + let mut known_flags: Vec = self.0 + .snapshot .idx_by_flag .flags() .filter_map(|f| match flags::from_str(f) { @@ -530,9 +490,9 @@ impl MailboxView { } pub(crate) fn unseen_count(&self) -> usize { - let total = self.known_state.table.len(); - let seen = self - .known_state + let total = self.0.snapshot.table.len(); + let seen = self.0 + .snapshot .idx_by_flag .get(&Flag::Seen.to_string()) .map(|x| x.len()) diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 306fd7d..2a0a24a 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -82,7 +82,7 @@ impl Mailbox { self.mbox.read().await.fetch_full(id, message_key).await } - async fn frozen(self: &std::sync::Arc) -> super::snapshot::FrozenMailbox { + pub async fn frozen(self: &std::sync::Arc) -> super::snapshot::FrozenMailbox { super::snapshot::FrozenMailbox::new(self.clone()).await } diff --git a/src/mail/query.rs b/src/mail/query.rs index 631ad56..5beff37 100644 --- a/src/mail/query.rs +++ b/src/mail/query.rs @@ -10,10 +10,27 @@ use futures::stream::{FuturesUnordered, StreamExt}; pub struct Query<'a,'b> { pub frozen: &'a FrozenMailbox, pub emails: &'b [UniqueIdent], + pub scope: QueryScope, +} + +pub enum QueryScope { + Index, + Partial, + Full, } impl<'a,'b> Query<'a,'b> { - pub fn index(&self) -> Result> { + pub async fn fetch(&self) -> Result> { + match self.scope { + QueryScope::Index => self.index(), + QueryScope::Partial => self.partial().await, + QueryScope::Full => self.full().await, + } + } + + // --- functions below are private *for reasons* + + fn index(&self) -> Result> { self .emails .iter() @@ -23,18 +40,18 @@ impl<'a,'b> Query<'a,'b> { .snapshot .table .get(uuid) - .map(|index| IndexResult { uuid: *uuid, index }) + .map(|index| QueryResult::IndexResult { uuid: *uuid, index }) .ok_or(anyhow!("missing email in index")) }) .collect::, _>>() } - pub async fn partial(&self) -> Result> { + async fn partial(&self) -> Result> { let meta = self.frozen.mailbox.fetch_meta(self.emails).await?; let result = meta .into_iter() .zip(self.index()?) - .map(|(metadata, index)| PartialResult { uuid: index.uuid, index: index.index, metadata }) + .map(|(metadata, index)| index.into_partial(metadata).expect("index to be IndexResult")) .collect::>(); Ok(result) } @@ -43,18 +60,17 @@ impl<'a,'b> Query<'a,'b> { /// AND GENERATE SO MUCH NETWORK TRAFFIC. /// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH /// SOMETHING LIKE AN ITERATOR - pub async fn full(&self) -> Result> { + async fn full(&self) -> Result> { let meta_list = self.partial().await?; meta_list .into_iter() .map(|meta| async move { - let content = self.frozen.mailbox.fetch_full(meta.uuid, &meta.metadata.message_key).await?; - Ok(FullResult { - uuid: meta.uuid, - index: meta.index, - metadata: meta.metadata, - content, - }) + let content = self.frozen.mailbox.fetch_full( + *meta.uuid(), + &meta.metadata().expect("meta to be PartialResult").message_key + ).await?; + + Ok(meta.into_full(content).expect("meta to be PartialResult")) }) .collect::>() .collect::>() @@ -64,18 +80,66 @@ impl<'a,'b> Query<'a,'b> { } } -pub struct IndexResult<'a> { - pub uuid: UniqueIdent, - pub index: &'a IndexEntry, -} -pub struct PartialResult<'a> { - pub uuid: UniqueIdent, - pub index: &'a IndexEntry, - pub metadata: MailMeta, +pub enum QueryResult<'a> { + IndexResult { + uuid: UniqueIdent, + index: &'a IndexEntry, + }, + PartialResult { + uuid: UniqueIdent, + index: &'a IndexEntry, + metadata: MailMeta, + }, + FullResult { + uuid: UniqueIdent, + index: &'a IndexEntry, + metadata: MailMeta, + content: Vec, + } } -pub struct FullResult<'a> { - pub uuid: UniqueIdent, - pub index: &'a IndexEntry, - pub metadata: MailMeta, - pub content: Vec, +impl<'a> QueryResult<'a> { + pub fn uuid(&self) -> &UniqueIdent { + match self { + Self::IndexResult { uuid, .. } => uuid, + Self::PartialResult { uuid, .. } => uuid, + Self::FullResult { uuid, .. } => uuid, + } + } + + pub fn index(&self) -> &IndexEntry { + match self { + Self::IndexResult { index, .. } => index, + Self::PartialResult { index, .. } => index, + Self::FullResult { index, .. } => index, + } + } + + pub fn metadata(&self) -> Option<&MailMeta> { + match self { + Self::IndexResult { .. } => None, + Self::PartialResult { metadata, .. } => Some(metadata), + Self::FullResult { metadata, .. } => Some(metadata), + } + } + + pub fn content(&self) -> Option<&[u8]> { + match self { + Self::FullResult { content, .. } => Some(content), + _ => None, + } + } + + fn into_partial(self, metadata: MailMeta) -> Option { + match self { + Self::IndexResult { uuid, index } => Some(Self::PartialResult { uuid, index, metadata }), + _ => None, + } + } + + fn into_full(self, content: Vec) -> Option { + match self { + Self::PartialResult { uuid, index, metadata } => Some(Self::FullResult { uuid, index, metadata, content }), + _ => None, + } + } } diff --git a/src/mail/snapshot.rs b/src/mail/snapshot.rs index 54bec64..c3145b4 100644 --- a/src/mail/snapshot.rs +++ b/src/mail/snapshot.rs @@ -4,6 +4,8 @@ use anyhow::Result; use super::mailbox::Mailbox; use super::uidindex::UidIndex; +use super::unique_ident::UniqueIdent; +use super::query::{Query, QueryScope}; /// A Frozen Mailbox has a snapshot of the current mailbox /// state that is desynchronized with the real mailbox state. @@ -49,4 +51,12 @@ impl FrozenMailbox { old_snapshot } + + pub fn query<'a, 'b>(&'a self, uuids: &'b [UniqueIdent], scope: QueryScope) -> Query<'a, 'b> { + Query { + frozen: self, + emails: uuids, + scope, + } + } } -- cgit v1.2.3