From 335750a29a83edba9bce2fb7e1452001e4962d1f Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 5 Jan 2024 15:36:40 +0100 Subject: MOVE command is optimized --- src/mail/mailbox.rs | 3 --- src/mail/mod.rs | 2 ++ src/mail/query.rs | 0 src/mail/snapshot.rs | 11 +++++++++++ 4 files changed, 13 insertions(+), 3 deletions(-) create mode 100644 src/mail/query.rs create mode 100644 src/mail/snapshot.rs (limited to 'src/mail') diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index e424ba3..b011110 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -149,7 +149,6 @@ impl Mailbox { /// Move an email from an other Mailbox to this mailbox /// (use this when possible, as it allows for a certain number of storage optimizations) - #[allow(dead_code)] pub async fn move_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> { if self.id == from.id { bail!("Cannot copy move same mailbox"); @@ -403,8 +402,6 @@ impl MailboxInternal { Ok(new_id) } - #[allow(dead_code)] - // 2023-05-15 will probably be used later async fn move_from(&mut self, from: &mut MailboxInternal, id: UniqueIdent) -> Result<()> { self.copy_internal(from, id, id).await?; from.delete(id).await?; diff --git a/src/mail/mod.rs b/src/mail/mod.rs index bbe4033..7371b53 100644 --- a/src/mail/mod.rs +++ b/src/mail/mod.rs @@ -3,6 +3,8 @@ use std::io::Write; pub mod incoming; pub mod mailbox; +pub mod snapshot; +pub mod query; pub mod uidindex; pub mod unique_ident; pub mod user; diff --git a/src/mail/query.rs b/src/mail/query.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/mail/snapshot.rs b/src/mail/snapshot.rs new file mode 100644 index 0000000..7256d50 --- /dev/null +++ b/src/mail/snapshot.rs @@ -0,0 +1,11 @@ +use std::sync::Arc; +use super::mailbox::Mailbox; +use super::uidindex::UidIndex; + +pub struct Snapshot { + pub mailbox: Arc, + pub snapshot: UidIndex, +} + +impl Snapshot { +} -- cgit v1.2.3 From adf4d33f226a745330a3bb802fe9b96f263a0895 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 5 Jan 2024 17:46:16 +0100 Subject: added some utility structures --- src/mail/mailbox.rs | 4 +++ src/mail/query.rs | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/mail/snapshot.rs | 45 +++++++++++++++++++++++++++-- src/mail/uidindex.rs | 3 +- 4 files changed, 130 insertions(+), 3 deletions(-) (limited to 'src/mail') diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index b011110..306fd7d 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -82,6 +82,10 @@ impl Mailbox { self.mbox.read().await.fetch_full(id, message_key).await } + async fn frozen(self: &std::sync::Arc) -> super::snapshot::FrozenMailbox { + super::snapshot::FrozenMailbox::new(self.clone()).await + } + // ---- Functions for changing the mailbox ---- /// Add flags to message diff --git a/src/mail/query.rs b/src/mail/query.rs index e69de29..631ad56 100644 --- a/src/mail/query.rs +++ b/src/mail/query.rs @@ -0,0 +1,81 @@ +use anyhow::{Result, anyhow}; +use super::mailbox::MailMeta; +use super::snapshot::FrozenMailbox; +use super::unique_ident::UniqueIdent; +use super::uidindex::IndexEntry; +use futures::stream::{FuturesUnordered, StreamExt}; + +/// Query is in charge of fetching efficiently +/// requested data for a list of emails +pub struct Query<'a,'b> { + pub frozen: &'a FrozenMailbox, + pub emails: &'b [UniqueIdent], +} + +impl<'a,'b> Query<'a,'b> { + pub fn index(&self) -> Result> { + self + .emails + .iter() + .map(|uuid| { + self + .frozen + .snapshot + .table + .get(uuid) + .map(|index| IndexResult { uuid: *uuid, index }) + .ok_or(anyhow!("missing email in index")) + }) + .collect::, _>>() + } + + pub async fn partial(&self) -> Result> { + let meta = self.frozen.mailbox.fetch_meta(self.emails).await?; + let result = meta + .into_iter() + .zip(self.index()?) + .map(|(metadata, index)| PartialResult { uuid: index.uuid, index: index.index, metadata }) + .collect::>(); + Ok(result) + } + + /// @FIXME WARNING: THIS CAN ALLOCATE A LOT OF MEMORY + /// AND GENERATE SO MUCH NETWORK TRAFFIC. + /// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH + /// SOMETHING LIKE AN ITERATOR + pub async fn full(&self) -> Result> { + let meta_list = self.partial().await?; + meta_list + .into_iter() + .map(|meta| async move { + let content = self.frozen.mailbox.fetch_full(meta.uuid, &meta.metadata.message_key).await?; + Ok(FullResult { + uuid: meta.uuid, + index: meta.index, + metadata: meta.metadata, + content, + }) + }) + .collect::>() + .collect::>() + .await + .into_iter() + .collect::, _>>() + } +} + +pub struct IndexResult<'a> { + pub uuid: UniqueIdent, + pub index: &'a IndexEntry, +} +pub struct PartialResult<'a> { + pub uuid: UniqueIdent, + pub index: &'a IndexEntry, + pub metadata: MailMeta, +} +pub struct FullResult<'a> { + pub uuid: UniqueIdent, + pub index: &'a IndexEntry, + pub metadata: MailMeta, + pub content: Vec, +} diff --git a/src/mail/snapshot.rs b/src/mail/snapshot.rs index 7256d50..54bec64 100644 --- a/src/mail/snapshot.rs +++ b/src/mail/snapshot.rs @@ -1,11 +1,52 @@ use std::sync::Arc; + +use anyhow::Result; + use super::mailbox::Mailbox; use super::uidindex::UidIndex; -pub struct Snapshot { +/// A Frozen Mailbox has a snapshot of the current mailbox +/// state that is desynchronized with the real mailbox state. +/// It's up to the user to choose when their snapshot must be updated +/// to give useful information to their clients +/// +/// +pub struct FrozenMailbox { pub mailbox: Arc, pub snapshot: UidIndex, } -impl Snapshot { +impl FrozenMailbox { + /// Create a snapshot from a mailbox, the mailbox + the snapshot + /// becomes the "Frozen Mailbox". + pub async fn new(mailbox: Arc) -> Self { + let state = mailbox.current_uid_index().await; + + Self { + mailbox, + snapshot: state, + } + } + + /// Force the synchronization of the inner mailbox + /// but do not update the local snapshot + pub async fn sync(&self) -> Result<()> { + self.mailbox.opportunistic_sync().await + } + + /// Peek snapshot without updating the frozen mailbox + /// Can be useful if you want to plan some writes + /// while sending a diff to the client later + pub async fn peek(&self) -> UidIndex { + self.mailbox.current_uid_index().await + } + + /// Update the FrozenMailbox local snapshot. + /// Returns the old snapshot, so you can build a diff + pub async fn update(&mut self) -> UidIndex { + let old_snapshot = self.snapshot.clone(); + self.snapshot = self.mailbox.current_uid_index().await; + + old_snapshot + } } diff --git a/src/mail/uidindex.rs b/src/mail/uidindex.rs index 956b194..01f8c9c 100644 --- a/src/mail/uidindex.rs +++ b/src/mail/uidindex.rs @@ -9,6 +9,7 @@ use crate::mail::unique_ident::UniqueIdent; pub type ImapUid = NonZeroU32; pub type ImapUidvalidity = NonZeroU32; pub type Flag = String; +pub type IndexEntry = (ImapUid, Vec); /// A UidIndex handles the mutable part of a mailbox /// It is built by running the event log on it @@ -18,7 +19,7 @@ pub type Flag = String; #[derive(Clone)] pub struct UidIndex { // Source of trust - pub table: OrdMap)>, + pub table: OrdMap, // Indexes optimized for queries pub idx_by_uid: OrdMap, -- cgit v1.2.3 From 4806f7ff84c595ec6647744577388fe4fab33736 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 5 Jan 2024 18:59:19 +0100 Subject: WIP rewrite with a query manager --- src/mail/mailbox.rs | 2 +- src/mail/query.rs | 114 ++++++++++++++++++++++++++++++++++++++++----------- src/mail/snapshot.rs | 10 +++++ 3 files changed, 100 insertions(+), 26 deletions(-) (limited to 'src/mail') diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 306fd7d..2a0a24a 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -82,7 +82,7 @@ impl Mailbox { self.mbox.read().await.fetch_full(id, message_key).await } - async fn frozen(self: &std::sync::Arc) -> super::snapshot::FrozenMailbox { + pub async fn frozen(self: &std::sync::Arc) -> super::snapshot::FrozenMailbox { super::snapshot::FrozenMailbox::new(self.clone()).await } diff --git a/src/mail/query.rs b/src/mail/query.rs index 631ad56..5beff37 100644 --- a/src/mail/query.rs +++ b/src/mail/query.rs @@ -10,10 +10,27 @@ use futures::stream::{FuturesUnordered, StreamExt}; pub struct Query<'a,'b> { pub frozen: &'a FrozenMailbox, pub emails: &'b [UniqueIdent], + pub scope: QueryScope, +} + +pub enum QueryScope { + Index, + Partial, + Full, } impl<'a,'b> Query<'a,'b> { - pub fn index(&self) -> Result> { + pub async fn fetch(&self) -> Result> { + match self.scope { + QueryScope::Index => self.index(), + QueryScope::Partial => self.partial().await, + QueryScope::Full => self.full().await, + } + } + + // --- functions below are private *for reasons* + + fn index(&self) -> Result> { self .emails .iter() @@ -23,18 +40,18 @@ impl<'a,'b> Query<'a,'b> { .snapshot .table .get(uuid) - .map(|index| IndexResult { uuid: *uuid, index }) + .map(|index| QueryResult::IndexResult { uuid: *uuid, index }) .ok_or(anyhow!("missing email in index")) }) .collect::, _>>() } - pub async fn partial(&self) -> Result> { + async fn partial(&self) -> Result> { let meta = self.frozen.mailbox.fetch_meta(self.emails).await?; let result = meta .into_iter() .zip(self.index()?) - .map(|(metadata, index)| PartialResult { uuid: index.uuid, index: index.index, metadata }) + .map(|(metadata, index)| index.into_partial(metadata).expect("index to be IndexResult")) .collect::>(); Ok(result) } @@ -43,18 +60,17 @@ impl<'a,'b> Query<'a,'b> { /// AND GENERATE SO MUCH NETWORK TRAFFIC. /// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH /// SOMETHING LIKE AN ITERATOR - pub async fn full(&self) -> Result> { + async fn full(&self) -> Result> { let meta_list = self.partial().await?; meta_list .into_iter() .map(|meta| async move { - let content = self.frozen.mailbox.fetch_full(meta.uuid, &meta.metadata.message_key).await?; - Ok(FullResult { - uuid: meta.uuid, - index: meta.index, - metadata: meta.metadata, - content, - }) + let content = self.frozen.mailbox.fetch_full( + *meta.uuid(), + &meta.metadata().expect("meta to be PartialResult").message_key + ).await?; + + Ok(meta.into_full(content).expect("meta to be PartialResult")) }) .collect::>() .collect::>() @@ -64,18 +80,66 @@ impl<'a,'b> Query<'a,'b> { } } -pub struct IndexResult<'a> { - pub uuid: UniqueIdent, - pub index: &'a IndexEntry, -} -pub struct PartialResult<'a> { - pub uuid: UniqueIdent, - pub index: &'a IndexEntry, - pub metadata: MailMeta, +pub enum QueryResult<'a> { + IndexResult { + uuid: UniqueIdent, + index: &'a IndexEntry, + }, + PartialResult { + uuid: UniqueIdent, + index: &'a IndexEntry, + metadata: MailMeta, + }, + FullResult { + uuid: UniqueIdent, + index: &'a IndexEntry, + metadata: MailMeta, + content: Vec, + } } -pub struct FullResult<'a> { - pub uuid: UniqueIdent, - pub index: &'a IndexEntry, - pub metadata: MailMeta, - pub content: Vec, +impl<'a> QueryResult<'a> { + pub fn uuid(&self) -> &UniqueIdent { + match self { + Self::IndexResult { uuid, .. } => uuid, + Self::PartialResult { uuid, .. } => uuid, + Self::FullResult { uuid, .. } => uuid, + } + } + + pub fn index(&self) -> &IndexEntry { + match self { + Self::IndexResult { index, .. } => index, + Self::PartialResult { index, .. } => index, + Self::FullResult { index, .. } => index, + } + } + + pub fn metadata(&self) -> Option<&MailMeta> { + match self { + Self::IndexResult { .. } => None, + Self::PartialResult { metadata, .. } => Some(metadata), + Self::FullResult { metadata, .. } => Some(metadata), + } + } + + pub fn content(&self) -> Option<&[u8]> { + match self { + Self::FullResult { content, .. } => Some(content), + _ => None, + } + } + + fn into_partial(self, metadata: MailMeta) -> Option { + match self { + Self::IndexResult { uuid, index } => Some(Self::PartialResult { uuid, index, metadata }), + _ => None, + } + } + + fn into_full(self, content: Vec) -> Option { + match self { + Self::PartialResult { uuid, index, metadata } => Some(Self::FullResult { uuid, index, metadata, content }), + _ => None, + } + } } diff --git a/src/mail/snapshot.rs b/src/mail/snapshot.rs index 54bec64..c3145b4 100644 --- a/src/mail/snapshot.rs +++ b/src/mail/snapshot.rs @@ -4,6 +4,8 @@ use anyhow::Result; use super::mailbox::Mailbox; use super::uidindex::UidIndex; +use super::unique_ident::UniqueIdent; +use super::query::{Query, QueryScope}; /// A Frozen Mailbox has a snapshot of the current mailbox /// state that is desynchronized with the real mailbox state. @@ -49,4 +51,12 @@ impl FrozenMailbox { old_snapshot } + + pub fn query<'a, 'b>(&'a self, uuids: &'b [UniqueIdent], scope: QueryScope) -> Query<'a, 'b> { + Query { + frozen: self, + emails: uuids, + scope, + } + } } -- cgit v1.2.3 From a84ba4d42fcdb38be514178eb9fced777ba76055 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Sat, 6 Jan 2024 11:07:53 +0100 Subject: Mailbox View made more readable --- src/mail/query.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'src/mail') diff --git a/src/mail/query.rs b/src/mail/query.rs index 5beff37..70feb89 100644 --- a/src/mail/query.rs +++ b/src/mail/query.rs @@ -13,6 +13,7 @@ pub struct Query<'a,'b> { pub scope: QueryScope, } +#[allow(dead_code)] pub enum QueryScope { Index, Partial, @@ -106,6 +107,7 @@ impl<'a> QueryResult<'a> { } } + #[allow(dead_code)] pub fn index(&self) -> &IndexEntry { match self { Self::IndexResult { index, .. } => index, @@ -114,7 +116,7 @@ impl<'a> QueryResult<'a> { } } - pub fn metadata(&self) -> Option<&MailMeta> { + pub fn metadata(&'a self) -> Option<&'a MailMeta> { match self { Self::IndexResult { .. } => None, Self::PartialResult { metadata, .. } => Some(metadata), @@ -122,7 +124,8 @@ impl<'a> QueryResult<'a> { } } - pub fn content(&self) -> Option<&[u8]> { + #[allow(dead_code)] + pub fn content(&'a self) -> Option<&'a [u8]> { match self { Self::FullResult { content, .. } => Some(content), _ => None, -- cgit v1.2.3 From 1ca6cd5de0656910213425e1d8f05256af820f21 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Sat, 6 Jan 2024 11:33:40 +0100 Subject: search is re-enabled --- src/mail/query.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'src/mail') diff --git a/src/mail/query.rs b/src/mail/query.rs index 70feb89..7b26cb9 100644 --- a/src/mail/query.rs +++ b/src/mail/query.rs @@ -21,7 +21,7 @@ pub enum QueryScope { } impl<'a,'b> Query<'a,'b> { - pub async fn fetch(&self) -> Result> { + pub async fn fetch(&self) -> Result>> { match self.scope { QueryScope::Index => self.index(), QueryScope::Partial => self.partial().await, @@ -31,7 +31,7 @@ impl<'a,'b> Query<'a,'b> { // --- functions below are private *for reasons* - fn index(&self) -> Result> { + fn index(&self) -> Result>> { self .emails .iter() @@ -47,7 +47,7 @@ impl<'a,'b> Query<'a,'b> { .collect::, _>>() } - async fn partial(&self) -> Result> { + async fn partial(&self) -> Result>> { let meta = self.frozen.mailbox.fetch_meta(self.emails).await?; let result = meta .into_iter() @@ -61,7 +61,7 @@ impl<'a,'b> Query<'a,'b> { /// AND GENERATE SO MUCH NETWORK TRAFFIC. /// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH /// SOMETHING LIKE AN ITERATOR - async fn full(&self) -> Result> { + async fn full(&self) -> Result>> { let meta_list = self.partial().await?; meta_list .into_iter() -- cgit v1.2.3 From 53dbf82cbce3cb17cbcffd09558677faf8702f54 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Sat, 6 Jan 2024 11:33:56 +0100 Subject: Format code again --- src/mail/mod.rs | 2 +- src/mail/query.rs | 56 ++++++++++++++++++++++++++++++++++++---------------- src/mail/snapshot.rs | 16 +++++++-------- 3 files changed, 48 insertions(+), 26 deletions(-) (limited to 'src/mail') diff --git a/src/mail/mod.rs b/src/mail/mod.rs index 7371b53..1836052 100644 --- a/src/mail/mod.rs +++ b/src/mail/mod.rs @@ -3,8 +3,8 @@ use std::io::Write; pub mod incoming; pub mod mailbox; -pub mod snapshot; pub mod query; +pub mod snapshot; pub mod uidindex; pub mod unique_ident; pub mod user; diff --git a/src/mail/query.rs b/src/mail/query.rs index 7b26cb9..8de73e6 100644 --- a/src/mail/query.rs +++ b/src/mail/query.rs @@ -1,13 +1,13 @@ -use anyhow::{Result, anyhow}; use super::mailbox::MailMeta; use super::snapshot::FrozenMailbox; -use super::unique_ident::UniqueIdent; use super::uidindex::IndexEntry; +use super::unique_ident::UniqueIdent; +use anyhow::{anyhow, Result}; use futures::stream::{FuturesUnordered, StreamExt}; /// Query is in charge of fetching efficiently /// requested data for a list of emails -pub struct Query<'a,'b> { +pub struct Query<'a, 'b> { pub frozen: &'a FrozenMailbox, pub emails: &'b [UniqueIdent], pub scope: QueryScope, @@ -20,7 +20,7 @@ pub enum QueryScope { Full, } -impl<'a,'b> Query<'a,'b> { +impl<'a, 'b> Query<'a, 'b> { pub async fn fetch(&self) -> Result>> { match self.scope { QueryScope::Index => self.index(), @@ -32,12 +32,10 @@ impl<'a,'b> Query<'a,'b> { // --- functions below are private *for reasons* fn index(&self) -> Result>> { - self - .emails + self.emails .iter() .map(|uuid| { - self - .frozen + self.frozen .snapshot .table .get(uuid) @@ -52,7 +50,11 @@ impl<'a,'b> Query<'a,'b> { let result = meta .into_iter() .zip(self.index()?) - .map(|(metadata, index)| index.into_partial(metadata).expect("index to be IndexResult")) + .map(|(metadata, index)| { + index + .into_partial(metadata) + .expect("index to be IndexResult") + }) .collect::>(); Ok(result) } @@ -65,11 +67,18 @@ impl<'a,'b> Query<'a,'b> { let meta_list = self.partial().await?; meta_list .into_iter() - .map(|meta| async move { - let content = self.frozen.mailbox.fetch_full( - *meta.uuid(), - &meta.metadata().expect("meta to be PartialResult").message_key - ).await?; + .map(|meta| async move { + let content = self + .frozen + .mailbox + .fetch_full( + *meta.uuid(), + &meta + .metadata() + .expect("meta to be PartialResult") + .message_key, + ) + .await?; Ok(meta.into_full(content).expect("meta to be PartialResult")) }) @@ -96,7 +105,7 @@ pub enum QueryResult<'a> { index: &'a IndexEntry, metadata: MailMeta, content: Vec, - } + }, } impl<'a> QueryResult<'a> { pub fn uuid(&self) -> &UniqueIdent { @@ -134,14 +143,27 @@ impl<'a> QueryResult<'a> { fn into_partial(self, metadata: MailMeta) -> Option { match self { - Self::IndexResult { uuid, index } => Some(Self::PartialResult { uuid, index, metadata }), + Self::IndexResult { uuid, index } => Some(Self::PartialResult { + uuid, + index, + metadata, + }), _ => None, } } fn into_full(self, content: Vec) -> Option { match self { - Self::PartialResult { uuid, index, metadata } => Some(Self::FullResult { uuid, index, metadata, content }), + Self::PartialResult { + uuid, + index, + metadata, + } => Some(Self::FullResult { + uuid, + index, + metadata, + content, + }), _ => None, } } diff --git a/src/mail/snapshot.rs b/src/mail/snapshot.rs index c3145b4..0834f09 100644 --- a/src/mail/snapshot.rs +++ b/src/mail/snapshot.rs @@ -3,16 +3,16 @@ use std::sync::Arc; use anyhow::Result; use super::mailbox::Mailbox; +use super::query::{Query, QueryScope}; use super::uidindex::UidIndex; use super::unique_ident::UniqueIdent; -use super::query::{Query, QueryScope}; /// A Frozen Mailbox has a snapshot of the current mailbox /// state that is desynchronized with the real mailbox state. /// It's up to the user to choose when their snapshot must be updated /// to give useful information to their clients /// -/// +/// pub struct FrozenMailbox { pub mailbox: Arc, pub snapshot: UidIndex, @@ -46,17 +46,17 @@ impl FrozenMailbox { /// Update the FrozenMailbox local snapshot. /// Returns the old snapshot, so you can build a diff pub async fn update(&mut self) -> UidIndex { - let old_snapshot = self.snapshot.clone(); - self.snapshot = self.mailbox.current_uid_index().await; + let old_snapshot = self.snapshot.clone(); + self.snapshot = self.mailbox.current_uid_index().await; - old_snapshot + old_snapshot } pub fn query<'a, 'b>(&'a self, uuids: &'b [UniqueIdent], scope: QueryScope) -> Query<'a, 'b> { Query { - frozen: self, - emails: uuids, - scope, + frozen: self, + emails: uuids, + scope, } } } -- cgit v1.2.3