diff options
Diffstat (limited to 'src/mail')
-rw-r--r-- | src/mail/mailbox.rs | 7 | ||||
-rw-r--r-- | src/mail/mod.rs | 2 | ||||
-rw-r--r-- | src/mail/query.rs | 170 | ||||
-rw-r--r-- | src/mail/snapshot.rs | 62 | ||||
-rw-r--r-- | src/mail/uidindex.rs | 3 |
5 files changed, 240 insertions, 4 deletions
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index e424ba3..2a0a24a 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -82,6 +82,10 @@ impl Mailbox { self.mbox.read().await.fetch_full(id, message_key).await } + pub async fn frozen(self: &std::sync::Arc<Self>) -> super::snapshot::FrozenMailbox { + super::snapshot::FrozenMailbox::new(self.clone()).await + } + // ---- Functions for changing the mailbox ---- /// Add flags to message @@ -149,7 +153,6 @@ impl Mailbox { /// Move an email from an other Mailbox to this mailbox /// (use this when possible, as it allows for a certain number of storage optimizations) - #[allow(dead_code)] pub async fn move_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> { if self.id == from.id { bail!("Cannot copy move same mailbox"); @@ -403,8 +406,6 @@ impl MailboxInternal { Ok(new_id) } - #[allow(dead_code)] - // 2023-05-15 will probably be used later async fn move_from(&mut self, from: &mut MailboxInternal, id: UniqueIdent) -> Result<()> { self.copy_internal(from, id, id).await?; from.delete(id).await?; diff --git a/src/mail/mod.rs b/src/mail/mod.rs index bbe4033..1836052 100644 --- a/src/mail/mod.rs +++ b/src/mail/mod.rs @@ -3,6 +3,8 @@ use std::io::Write; pub mod incoming; pub mod mailbox; +pub mod query; +pub mod snapshot; pub mod uidindex; pub mod unique_ident; pub mod user; diff --git a/src/mail/query.rs b/src/mail/query.rs new file mode 100644 index 0000000..8de73e6 --- /dev/null +++ b/src/mail/query.rs @@ -0,0 +1,170 @@ +use super::mailbox::MailMeta; +use super::snapshot::FrozenMailbox; +use super::uidindex::IndexEntry; +use super::unique_ident::UniqueIdent; +use anyhow::{anyhow, Result}; +use futures::stream::{FuturesUnordered, StreamExt}; + +/// Query is in charge of fetching efficiently +/// requested data for a list of emails +pub struct Query<'a, 'b> { + pub frozen: &'a FrozenMailbox, + pub emails: &'b [UniqueIdent], + pub scope: QueryScope, +} + +#[allow(dead_code)] +pub enum QueryScope { + Index, + Partial, + Full, +} + +impl<'a, 'b> Query<'a, 'b> { + pub async fn fetch(&self) -> Result<Vec<QueryResult<'a>>> { + match self.scope { + QueryScope::Index => self.index(), + QueryScope::Partial => self.partial().await, + QueryScope::Full => self.full().await, + } + } + + // --- functions below are private *for reasons* + + fn index(&self) -> Result<Vec<QueryResult<'a>>> { + self.emails + .iter() + .map(|uuid| { + self.frozen + .snapshot + .table + .get(uuid) + .map(|index| QueryResult::IndexResult { uuid: *uuid, index }) + .ok_or(anyhow!("missing email in index")) + }) + .collect::<Result<Vec<_>, _>>() + } + + async fn partial(&self) -> Result<Vec<QueryResult<'a>>> { + let meta = self.frozen.mailbox.fetch_meta(self.emails).await?; + let result = meta + .into_iter() + .zip(self.index()?) + .map(|(metadata, index)| { + index + .into_partial(metadata) + .expect("index to be IndexResult") + }) + .collect::<Vec<_>>(); + Ok(result) + } + + /// @FIXME WARNING: THIS CAN ALLOCATE A LOT OF MEMORY + /// AND GENERATE SO MUCH NETWORK TRAFFIC. + /// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH + /// SOMETHING LIKE AN ITERATOR + async fn full(&self) -> Result<Vec<QueryResult<'a>>> { + let meta_list = self.partial().await?; + meta_list + .into_iter() + .map(|meta| async move { + let content = self + .frozen + .mailbox + .fetch_full( + *meta.uuid(), + &meta + .metadata() + .expect("meta to be PartialResult") + .message_key, + ) + .await?; + + Ok(meta.into_full(content).expect("meta to be PartialResult")) + }) + .collect::<FuturesUnordered<_>>() + .collect::<Vec<_>>() + .await + .into_iter() + .collect::<Result<Vec<_>, _>>() + } +} + +pub enum QueryResult<'a> { + IndexResult { + uuid: UniqueIdent, + index: &'a IndexEntry, + }, + PartialResult { + uuid: UniqueIdent, + index: &'a IndexEntry, + metadata: MailMeta, + }, + FullResult { + uuid: UniqueIdent, + index: &'a IndexEntry, + metadata: MailMeta, + content: Vec<u8>, + }, +} +impl<'a> QueryResult<'a> { + pub fn uuid(&self) -> &UniqueIdent { + match self { + Self::IndexResult { uuid, .. } => uuid, + Self::PartialResult { uuid, .. } => uuid, + Self::FullResult { uuid, .. } => uuid, + } + } + + #[allow(dead_code)] + pub fn index(&self) -> &IndexEntry { + match self { + Self::IndexResult { index, .. } => index, + Self::PartialResult { index, .. } => index, + Self::FullResult { index, .. } => index, + } + } + + pub fn metadata(&'a self) -> Option<&'a MailMeta> { + match self { + Self::IndexResult { .. } => None, + Self::PartialResult { metadata, .. } => Some(metadata), + Self::FullResult { metadata, .. } => Some(metadata), + } + } + + #[allow(dead_code)] + pub fn content(&'a self) -> Option<&'a [u8]> { + match self { + Self::FullResult { content, .. } => Some(content), + _ => None, + } + } + + fn into_partial(self, metadata: MailMeta) -> Option<Self> { + match self { + Self::IndexResult { uuid, index } => Some(Self::PartialResult { + uuid, + index, + metadata, + }), + _ => None, + } + } + + fn into_full(self, content: Vec<u8>) -> Option<Self> { + match self { + Self::PartialResult { + uuid, + index, + metadata, + } => Some(Self::FullResult { + uuid, + index, + metadata, + content, + }), + _ => None, + } + } +} diff --git a/src/mail/snapshot.rs b/src/mail/snapshot.rs new file mode 100644 index 0000000..0834f09 --- /dev/null +++ b/src/mail/snapshot.rs @@ -0,0 +1,62 @@ +use std::sync::Arc; + +use anyhow::Result; + +use super::mailbox::Mailbox; +use super::query::{Query, QueryScope}; +use super::uidindex::UidIndex; +use super::unique_ident::UniqueIdent; + +/// A Frozen Mailbox has a snapshot of the current mailbox +/// state that is desynchronized with the real mailbox state. +/// It's up to the user to choose when their snapshot must be updated +/// to give useful information to their clients +/// +/// +pub struct FrozenMailbox { + pub mailbox: Arc<Mailbox>, + pub snapshot: UidIndex, +} + +impl FrozenMailbox { + /// Create a snapshot from a mailbox, the mailbox + the snapshot + /// becomes the "Frozen Mailbox". + pub async fn new(mailbox: Arc<Mailbox>) -> Self { + let state = mailbox.current_uid_index().await; + + Self { + mailbox, + snapshot: state, + } + } + + /// Force the synchronization of the inner mailbox + /// but do not update the local snapshot + pub async fn sync(&self) -> Result<()> { + self.mailbox.opportunistic_sync().await + } + + /// Peek snapshot without updating the frozen mailbox + /// Can be useful if you want to plan some writes + /// while sending a diff to the client later + pub async fn peek(&self) -> UidIndex { + self.mailbox.current_uid_index().await + } + + /// Update the FrozenMailbox local snapshot. + /// Returns the old snapshot, so you can build a diff + pub async fn update(&mut self) -> UidIndex { + let old_snapshot = self.snapshot.clone(); + self.snapshot = self.mailbox.current_uid_index().await; + + old_snapshot + } + + pub fn query<'a, 'b>(&'a self, uuids: &'b [UniqueIdent], scope: QueryScope) -> Query<'a, 'b> { + Query { + frozen: self, + emails: uuids, + scope, + } + } +} diff --git a/src/mail/uidindex.rs b/src/mail/uidindex.rs index 956b194..01f8c9c 100644 --- a/src/mail/uidindex.rs +++ b/src/mail/uidindex.rs @@ -9,6 +9,7 @@ use crate::mail::unique_ident::UniqueIdent; pub type ImapUid = NonZeroU32; pub type ImapUidvalidity = NonZeroU32; pub type Flag = String; +pub type IndexEntry = (ImapUid, Vec<Flag>); /// A UidIndex handles the mutable part of a mailbox /// It is built by running the event log on it @@ -18,7 +19,7 @@ pub type Flag = String; #[derive(Clone)] pub struct UidIndex { // Source of trust - pub table: OrdMap<UniqueIdent, (ImapUid, Vec<Flag>)>, + pub table: OrdMap<UniqueIdent, IndexEntry>, // Indexes optimized for queries pub idx_by_uid: OrdMap<ImapUid, UniqueIdent>, |