From 28b1f4f14dffc5dcd5152ce931f6c50b17c134db Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 20 Feb 2024 11:42:51 +0100 Subject: Unsollicited responses on APPEND --- src/imap/command/authenticated.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index 26b1946..baf3033 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -14,13 +14,13 @@ use imap_codec::imap_types::mailbox::{ListMailbox, Mailbox as MailboxCodec}; use imap_codec::imap_types::response::{Code, CodeOther, Data}; use imap_codec::imap_types::status::{StatusDataItem, StatusDataItemName}; +use crate::imap::Body; use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::command::{anystate, MailboxName}; use crate::imap::flow; -use crate::imap::mailbox_view::MailboxView; +use crate::imap::mailbox_view::{MailboxView, UpdateParameters}; use crate::imap::response::Response; -use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::*; use crate::mail::user::{User, MAILBOX_HIERARCHY_DELIMITER as MBX_HIER_DELIM_RAW}; use crate::mail::IMF; @@ -558,9 +558,12 @@ impl<'a> AuthenticatedContext<'a> { ) -> Result<(Response<'static>, flow::Transition)> { let append_tag = self.req.tag.clone(); match self.append_internal(mailbox, flags, date, message).await { - Ok((_mb, uidvalidity, uid, _modseq)) => Ok(( + + + Ok((_mb_view, unsollicited, uidvalidity, uid, _modseq)) => Ok(( Response::build() .tag(append_tag) + .set_body(unsollicited) .message("APPEND completed") .code(Code::Other(CodeOther::unvalidated( format!("APPENDUID {} {}", uidvalidity, uid).into_bytes(), @@ -593,13 +596,14 @@ impl<'a> AuthenticatedContext<'a> { )) } + //@FIXME should be refactored and integrated to the mailbox view pub(crate) async fn append_internal( self, mailbox: &MailboxCodec<'a>, flags: &[Flag<'a>], date: &Option, message: &Literal<'a>, - ) -> Result<(Arc, ImapUidvalidity, ImapUid, ModSeq)> { + ) -> Result<(MailboxView, Vec>, ImapUidvalidity, ImapUid, ModSeq)> { let name: &str = MailboxName(mailbox).try_into()?; let mb_opt = self.user.open_mailbox(&name).await?; @@ -607,6 +611,7 @@ impl<'a> AuthenticatedContext<'a> { Some(mb) => mb, None => bail!("Mailbox does not exist"), }; + let mut view = MailboxView::new(mb, self.client_capabilities.condstore.is_enabled()).await; if date.is_some() { tracing::warn!("Cannot set date when appending message"); @@ -617,9 +622,10 @@ impl<'a> AuthenticatedContext<'a> { let flags = flags.iter().map(|x| x.to_string()).collect::>(); // TODO: filter allowed flags? ping @Quentin - let (uidvalidity, uid, modseq) = mb.append(msg, None, &flags[..]).await?; + let (uidvalidity, uid, modseq) = view.internal.mailbox.append(msg, None, &flags[..]).await?; + let unsollicited = view.update(UpdateParameters::default()).await?; - Ok((mb, uidvalidity, uid, modseq)) + Ok((view, unsollicited, uidvalidity, uid, modseq)) } } -- cgit v1.2.3 From 64b474f682a3c519ca1bda279132273a53ca2115 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 20 Feb 2024 13:24:42 +0100 Subject: Unsollicited response on APPEND was wrong, upgrade imap-flow to fix LITERAL+ --- src/imap/command/authenticated.rs | 11 ++++++----- src/imap/mod.rs | 12 ++++++------ 2 files changed, 12 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index baf3033..d2e7815 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -549,6 +549,8 @@ impl<'a> AuthenticatedContext<'a> { )) } + //@FIXME we should write a specific version for the "selected" state + //that returns some unsollicited responses async fn append( self, mailbox: &MailboxCodec<'a>, @@ -560,10 +562,9 @@ impl<'a> AuthenticatedContext<'a> { match self.append_internal(mailbox, flags, date, message).await { - Ok((_mb_view, unsollicited, uidvalidity, uid, _modseq)) => Ok(( + Ok((_mb_view, uidvalidity, uid, _modseq)) => Ok(( Response::build() .tag(append_tag) - .set_body(unsollicited) .message("APPEND completed") .code(Code::Other(CodeOther::unvalidated( format!("APPENDUID {} {}", uidvalidity, uid).into_bytes(), @@ -603,7 +604,7 @@ impl<'a> AuthenticatedContext<'a> { flags: &[Flag<'a>], date: &Option, message: &Literal<'a>, - ) -> Result<(MailboxView, Vec>, ImapUidvalidity, ImapUid, ModSeq)> { + ) -> Result<(MailboxView, ImapUidvalidity, ImapUid, ModSeq)> { let name: &str = MailboxName(mailbox).try_into()?; let mb_opt = self.user.open_mailbox(&name).await?; @@ -623,9 +624,9 @@ impl<'a> AuthenticatedContext<'a> { // TODO: filter allowed flags? ping @Quentin let (uidvalidity, uid, modseq) = view.internal.mailbox.append(msg, None, &flags[..]).await?; - let unsollicited = view.update(UpdateParameters::default()).await?; + //let unsollicited = view.update(UpdateParameters::default()).await?; - Ok((view, unsollicited, uidvalidity, uid, modseq)) + Ok((view, uidvalidity, uid, modseq)) } } diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 58c4dc0..544086e 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -185,15 +185,15 @@ impl NetLoop { } async fn new(ctx: ClientContext, sock: AnyStream) -> Result { + let mut opts = ServerFlowOptions::default(); + opts.crlf_relaxed = false; + opts.literal_accept_text = Text::unvalidated("OK"); + opts.literal_reject_text = Text::unvalidated("Literal rejected"); + // Send greeting let (server, _) = ServerFlow::send_greeting( sock, - ServerFlowOptions { - crlf_relaxed: false, - literal_accept_text: Text::unvalidated("OK"), - literal_reject_text: Text::unvalidated("Literal rejected"), - ..ServerFlowOptions::default() - }, + opts, Greeting::ok( Some(Code::Capability(ctx.server_capabilities.to_vec())), "Aerogramme", -- cgit v1.2.3 From 4d501b6947497d7705c64dcaf093e5e5a09a9235 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 22 Feb 2024 11:35:39 +0100 Subject: Compile streams --- src/imap/mailbox_view.rs | 125 ++++++++++++++++++++++++----------------------- src/imap/search.rs | 21 +------- src/mail/mailbox.rs | 2 +- src/mail/query.rs | 68 ++++++++++++-------------- 4 files changed, 98 insertions(+), 118 deletions(-) (limited to 'src') diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index d57e9a3..5055c40 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use anyhow::{anyhow, Error, Result}; -use futures::stream::{FuturesOrdered, StreamExt}; +use futures::stream::{StreamExt, TryStreamExt}; use imap_codec::imap_types::core::Charset; use imap_codec::imap_types::fetch::MessageDataItem; @@ -362,46 +362,33 @@ impl MailboxView { .iter() .map(|midx| midx.uuid) .collect::>(); - let query_result = self.internal.query(&uuids, query_scope).fetch().await?; - // [3/6] Derive an IMAP-specific view from the results, apply the filters - let views = query_result - .iter() - .zip(mail_idx_list.into_iter()) - .map(|(qr, midx)| MailView::new(qr, midx)) - .collect::, _>>()?; - - // [4/6] Apply the IMAP transformation, bubble up any error - // We get 2 results: - // - The one we send to the client - // - The \Seen flags we must set internally - let (flag_mgmt, imap_ret): (Vec<_>, Vec<_>) = views - .iter() - .map(|mv| mv.filter(&ap).map(|(body, seen)| ((mv, seen), body))) - .collect::, _>>()? - .into_iter() - .unzip(); + let query = self.internal.query(&uuids, query_scope); + //let query_result = self.internal.query(&uuids, query_scope).fetch().await?; - // [5/6] Register the \Seen flags - flag_mgmt - .iter() - .filter(|(_mv, seen)| matches!(seen, SeenFlag::MustAdd)) - .map(|(mv, _seen)| async move { - let seen_flag = Flag::Seen.to_string(); - self.internal - .mailbox - .add_flags(*mv.query_result.uuid(), &[seen_flag]) - .await?; - Ok::<_, anyhow::Error>(()) + let query_stream = query + .fetch() + .zip(futures::stream::iter(mail_idx_list)) + // [3/6] Derive an IMAP-specific view from the results, apply the filters + .map(|(maybe_qr, midx)| match maybe_qr { + Ok(qr) => Ok((MailView::new(&qr, midx)?.filter(&ap)?, midx)), + Err(e) => Err(e), }) - .collect::>() - .collect::>() - .await - .into_iter() - .collect::>()?; + // [4/6] Apply the IMAP transformation + .then(|maybe_ret| async move { + let ((body, seen), midx) = maybe_ret?; + + // [5/6] Register the \Seen flags + if matches!(seen, SeenFlag::MustAdd) { + let seen_flag = Flag::Seen.to_string(); + self.internal.mailbox.add_flags(midx.uuid, &[seen_flag]).await?; + } + + Ok::<_, anyhow::Error>(body) + }); // [6/6] Build the final result that will be sent to the client. - Ok(imap_ret) + query_stream.try_collect().await } /// A naive search implementation... @@ -423,39 +410,55 @@ impl MailboxView { // 3. Filter the selection based on the ID / UID / Flags let (kept_idx, to_fetch) = crit.filter_on_idx(&selection); - // 4. Fetch additional info about the emails + // 4.a Fetch additional info about the emails let query_scope = crit.query_scope(); let uuids = to_fetch.iter().map(|midx| midx.uuid).collect::>(); - let query_result = self.internal.query(&uuids, query_scope).fetch().await?; + let query = self.internal.query(&uuids, query_scope); + + // 4.b We don't want to keep all data in memory, so we do the computing in a stream + let query_stream = query + .fetch() + .zip(futures::stream::iter(&to_fetch)) + // 5.a Build a mailview with the body, might fail with an error + // 5.b If needed, filter the selection based on the body, but keep the errors + // 6. Drop the query+mailbox, keep only the mail index + // Here we release a lot of memory, this is the most important part ^^ + .filter_map(|(maybe_qr, midx)| { + let r = match maybe_qr { + Ok(qr) => match MailView::new(&qr, midx).map(|mv| crit.is_keep_on_query(&mv)) { + Ok(true) => Some(Ok(*midx)), + Ok(_) => None, + Err(e) => Some(Err(e)), + } + Err(e) => Some(Err(e)), + }; + futures::future::ready(r) + }); - // 5. If needed, filter the selection based on the body - let kept_query = crit.filter_on_query(&to_fetch, &query_result)?; - // 6. Format the result according to the client's taste: - // either return UID or ID. - let final_selection = kept_idx.iter().chain(kept_query.iter()); - let selection_fmt = match uid { - true => final_selection.map(|in_idx| in_idx.uid).collect(), - _ => final_selection.map(|in_idx| in_idx.i).collect(), - }; + // 7. Chain both streams (part resolved from index, part resolved from metadata+body) + let main_stream = futures::stream::iter(kept_idx) + .map(Ok) + .chain(query_stream) + .map_ok(|idx| match uid { + true => (idx.uid, idx.modseq), + _ => (idx.i, idx.modseq), + }); - // 7. Add the modseq entry if needed - let is_modseq = crit.is_modseq(); - let maybe_modseq = match is_modseq { - true => { - let final_selection = kept_idx.iter().chain(kept_query.iter()); - final_selection - .map(|in_idx| in_idx.modseq) - .max() - .map(|r| NonZeroU64::try_from(r)) - .transpose()? - } + // 8. Do the actual computation + let internal_result: Vec<_> = main_stream.try_collect().await?; + let (selection, modseqs): (Vec<_>, Vec<_>) = internal_result.into_iter().unzip(); + + // 9. Aggregate the maximum modseq value + let maybe_modseq = match crit.is_modseq() { + true => modseqs.into_iter().max(), _ => None, }; + // 10. Return the final result Ok(( - vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))], - is_modseq, + vec![Body::Data(Data::Search(selection, maybe_modseq))], + maybe_modseq.is_some(), )) } @@ -678,7 +681,7 @@ mod tests { content: rfc822.to_vec(), }; - let mv = MailView::new(&qr, &mail_in_idx)?; + let mv = MailView::new(std::borrow::Cow::Borrowed(&qr), &mail_in_idx)?; let (res_body, _seen) = mv.filter(&ap)?; let fattr = match res_body { diff --git a/src/imap/search.rs b/src/imap/search.rs index d06c3bd..4ff70ee 100644 --- a/src/imap/search.rs +++ b/src/imap/search.rs @@ -1,13 +1,12 @@ use std::num::{NonZeroU32, NonZeroU64}; -use anyhow::Result; use imap_codec::imap_types::core::NonEmptyVec; use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey}; use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet}; use crate::imap::index::MailIndex; use crate::imap::mail_view::MailView; -use crate::mail::query::{QueryResult, QueryScope}; +use crate::mail::query::QueryScope; pub enum SeqType { Undefined, @@ -145,22 +144,6 @@ impl<'a> Criteria<'a> { (to_keep, to_fetch) } - pub fn filter_on_query<'b>( - &self, - midx_list: &[&'b MailIndex<'b>], - query_result: &'b Vec, - ) -> Result>> { - Ok(midx_list - .iter() - .zip(query_result.iter()) - .map(|(midx, qr)| MailView::new(qr, midx)) - .collect::, _>>()? - .into_iter() - .filter(|mail_view| self.is_keep_on_query(mail_view)) - .map(|mail_view| mail_view.in_idx) - .collect()) - } - // ---- /// Here we are doing a partial filtering: we do not have access @@ -213,7 +196,7 @@ impl<'a> Criteria<'a> { /// the email, as body(x) might be false. So we need to check it. But as seqid(x) is true, /// we could simplify the request to just body(x) and truncate the first OR. Today, we are /// not doing that, and thus we reevaluate everything. - fn is_keep_on_query(&self, mail_view: &MailView) -> bool { + pub fn is_keep_on_query(&self, mail_view: &MailView) -> bool { use SearchKey::*; match self.0 { // Combinator logic diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index c20d815..9190883 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -498,7 +498,7 @@ fn dump(uid_index: &Bayou) { /// The metadata of a message that is stored in K2V /// at pk = mail/, sk = -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct MailMeta { /// INTERNALDATE field (milliseconds since epoch) pub internaldate: u64, diff --git a/src/mail/query.rs b/src/mail/query.rs index a183c5a..cbc5fe6 100644 --- a/src/mail/query.rs +++ b/src/mail/query.rs @@ -2,7 +2,8 @@ use super::mailbox::MailMeta; use super::snapshot::FrozenMailbox; use super::unique_ident::UniqueIdent; use anyhow::Result; -use futures::stream::{FuturesOrdered, StreamExt}; +use futures::stream::{Stream, StreamExt, BoxStream}; +use futures::future::FutureExt; /// Query is in charge of fetching efficiently /// requested data for a list of emails @@ -28,64 +29,57 @@ impl QueryScope { } } +//type QueryResultStream = Box>>; + impl<'a, 'b> Query<'a, 'b> { - pub async fn fetch(&self) -> Result> { + pub fn fetch(&self) -> BoxStream> { match self.scope { - QueryScope::Index => Ok(self - .emails - .iter() - .map(|&uuid| QueryResult::IndexResult { uuid }) - .collect()), - QueryScope::Partial => self.partial().await, - QueryScope::Full => self.full().await, + QueryScope::Index => Box::pin(futures::stream::iter(self.emails).map(|&uuid| Ok(QueryResult::IndexResult { uuid }))), + QueryScope::Partial => Box::pin(self.partial()), + QueryScope::Full => Box::pin(self.full()), } } // --- functions below are private *for reasons* + fn partial<'d>(&'d self) -> impl Stream> + 'd + Send { + async move { + let maybe_meta_list: Result> = self.frozen.mailbox.fetch_meta(self.emails).await; + let list_res = maybe_meta_list + .map(|meta_list| meta_list + .into_iter() + .zip(self.emails) + .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata })) + .collect() + ) + .unwrap_or_else(|e| vec![Err(e)]); - async fn partial(&self) -> Result> { - let meta = self.frozen.mailbox.fetch_meta(self.emails).await?; - let result = meta - .into_iter() - .zip(self.emails.iter()) - .map(|(metadata, &uuid)| QueryResult::PartialResult { uuid, metadata }) - .collect::>(); - - Ok(result) + futures::stream::iter(list_res) + }.flatten_stream() } - /// @FIXME WARNING: THIS CAN ALLOCATE A LOT OF MEMORY - /// AND GENERATE SO MUCH NETWORK TRAFFIC. - /// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH - /// SOMETHING LIKE AN ITERATOR - async fn full(&self) -> Result> { - let meta_list = self.partial().await?; - meta_list - .into_iter() - .map(|meta| async move { + fn full<'d>(&'d self) -> impl Stream> + 'd + Send { + self.partial() + .then(move |maybe_meta| async move { + let meta = maybe_meta?; + let content = self .frozen .mailbox .fetch_full( *meta.uuid(), &meta - .metadata() - .expect("meta to be PartialResult") - .message_key, - ) + .metadata() + .expect("meta to be PartialResult") + .message_key, + ) .await?; Ok(meta.into_full(content).expect("meta to be PartialResult")) }) - .collect::>() - .collect::>() - .await - .into_iter() - .collect::, _>>() } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum QueryResult { IndexResult { uuid: UniqueIdent, -- cgit v1.2.3 From 3f204b102abb408a9f0b2adc45065585ddc16a88 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 22 Feb 2024 11:51:58 +0100 Subject: fix test --- src/imap/mailbox_view.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index 5055c40..076f92e 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -681,7 +681,7 @@ mod tests { content: rfc822.to_vec(), }; - let mv = MailView::new(std::borrow::Cow::Borrowed(&qr), &mail_in_idx)?; + let mv = MailView::new(&qr, &mail_in_idx)?; let (res_body, _seen) = mv.filter(&ap)?; let fattr = match res_body { -- cgit v1.2.3 From 2adf73dd8e6e76997d4fb67f9f7b3fe065530722 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 22 Feb 2024 17:30:40 +0100 Subject: Update imap-flow, clean IDLE --- src/imap/capability.rs | 4 +- src/imap/command/authenticated.rs | 4 +- src/imap/command/selected.rs | 14 +-- src/imap/flow.rs | 11 ++- src/imap/mailbox_view.rs | 6 +- src/imap/mime_view.rs | 12 +-- src/imap/mod.rs | 177 +++++++++++++++++++++----------------- src/imap/request.rs | 4 +- src/imap/response.rs | 3 +- src/imap/search.rs | 4 +- src/imap/session.rs | 48 +++++++++-- 11 files changed, 171 insertions(+), 116 deletions(-) (limited to 'src') diff --git a/src/imap/capability.rs b/src/imap/capability.rs index 256d820..c76b51c 100644 --- a/src/imap/capability.rs +++ b/src/imap/capability.rs @@ -1,5 +1,5 @@ use imap_codec::imap_types::command::{FetchModifier, SelectExamineModifier, StoreModifier}; -use imap_codec::imap_types::core::NonEmptyVec; +use imap_codec::imap_types::core::Vec1; use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind}; use imap_codec::imap_types::response::Capability; use std::collections::HashSet; @@ -49,7 +49,7 @@ impl Default for ServerCapability { } impl ServerCapability { - pub fn to_vec(&self) -> NonEmptyVec> { + pub fn to_vec(&self) -> Vec1> { self.0 .iter() .map(|v| v.clone()) diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index d2e7815..129c2fd 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -6,7 +6,7 @@ use anyhow::{anyhow, bail, Result}; use imap_codec::imap_types::command::{ Command, CommandBody, ListReturnItem, SelectExamineModifier, }; -use imap_codec::imap_types::core::{Atom, Literal, NonEmptyVec, QuotedChar}; +use imap_codec::imap_types::core::{Atom, Literal, Vec1, QuotedChar}; use imap_codec::imap_types::datetime::DateTime; use imap_codec::imap_types::extensions::enable::CapabilityEnable; use imap_codec::imap_types::flag::{Flag, FlagNameAttribute}; @@ -584,7 +584,7 @@ impl<'a> AuthenticatedContext<'a> { fn enable( self, - cap_enable: &NonEmptyVec>, + cap_enable: &Vec1>, ) -> Result<(Response<'static>, flow::Transition)> { let mut response_builder = Response::build().to_req(self.req); let capabilities = self.client_capabilities.try_enable(cap_enable.as_ref()); diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index 73f8aec..e871509 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::Result; use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier}; -use imap_codec::imap_types::core::Charset; +use imap_codec::imap_types::core::{Vec1, Charset}; use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames; use imap_codec::imap_types::flag::{Flag, StoreResponse, StoreType}; use imap_codec::imap_types::mailbox::Mailbox as MailboxCodec; @@ -54,11 +54,12 @@ pub async fn dispatch<'a>( ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid) .await } + //@FIXME SearchKey::And is a legacy hack, should be refactored CommandBody::Search { charset, criteria, uid, - } => ctx.search(charset, criteria, uid).await, + } => ctx.search(charset, &SearchKey::And(criteria.clone()), uid).await, CommandBody::Expunge { // UIDPLUS (rfc4315) uid_sequence_set, @@ -88,15 +89,6 @@ pub async fn dispatch<'a>( // UNSELECT extension (rfc3691) CommandBody::Unselect => ctx.unselect().await, - // IDLE extension (rfc2177) - CommandBody::Idle => Ok(( - Response::build() - .to_req(ctx.req) - .message("DUMMY command due to anti-pattern in the code") - .ok()?, - flow::Transition::Idle(ctx.req.tag.clone(), tokio::sync::Notify::new()), - )), - // In selected mode, we fallback to authenticated when needed _ => { authenticated::dispatch(authenticated::AuthenticatedContext { diff --git a/src/imap/flow.rs b/src/imap/flow.rs index 6ddd092..4405e71 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -1,11 +1,12 @@ use std::error::Error as StdError; use std::fmt; use std::sync::Arc; + use tokio::sync::Notify; +use imap_codec::imap_types::core::Tag; use crate::imap::mailbox_view::MailboxView; use crate::mail::user::User; -use imap_codec::imap_types::core::Tag; #[derive(Debug)] pub enum Error { @@ -31,6 +32,14 @@ pub enum State { ), Logout, } +impl State { + pub fn notify(&self) -> Option> { + match self { + Self::Idle(_, _, _, _, anotif) => Some(anotif.clone()), + _ => None, + } + } +} impl fmt::Display for State { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { use State::*; diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index 076f92e..f6395b4 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -6,7 +6,7 @@ use anyhow::{anyhow, Error, Result}; use futures::stream::{StreamExt, TryStreamExt}; -use imap_codec::imap_types::core::Charset; +use imap_codec::imap_types::core::{Charset, Vec1}; use imap_codec::imap_types::fetch::MessageDataItem; use imap_codec::imap_types::flag::{Flag, FlagFetch, FlagPerm, StoreResponse, StoreType}; use imap_codec::imap_types::response::{Code, CodeOther, Data, Status}; @@ -629,7 +629,7 @@ impl MailboxView { mod tests { use super::*; use imap_codec::encode::Encoder; - use imap_codec::imap_types::core::NonEmptyVec; + use imap_codec::imap_types::core::Vec1; use imap_codec::imap_types::fetch::Section; use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName}; use imap_codec::imap_types::response::Response; @@ -749,7 +749,7 @@ mod tests { let test_repr = Response::Data(Data::Fetch { seq: NonZeroU32::new(1).unwrap(), - items: NonEmptyVec::from(MessageDataItem::Body(mime_view::bodystructure( + items: Vec1::from(MessageDataItem::Body(mime_view::bodystructure( &message.child, false, )?)), diff --git a/src/imap/mime_view.rs b/src/imap/mime_view.rs index 8fc043b..8bbbd2d 100644 --- a/src/imap/mime_view.rs +++ b/src/imap/mime_view.rs @@ -8,7 +8,7 @@ use imap_codec::imap_types::body::{ BasicFields, Body as FetchBody, BodyStructure, MultiPartExtensionData, SinglePartExtensionData, SpecificFields, }; -use imap_codec::imap_types::core::{AString, IString, NString, NonEmptyVec}; +use imap_codec::imap_types::core::{AString, IString, NString, Vec1}; use imap_codec::imap_types::fetch::{Part as FetchPart, Section as FetchSection}; use eml_codec::{ @@ -141,8 +141,8 @@ impl<'a> NodeMime<'a> { enum SubsettedSection<'a> { Part, Header, - HeaderFields(&'a NonEmptyVec>), - HeaderFieldsNot(&'a NonEmptyVec>), + HeaderFields(&'a Vec1>), + HeaderFieldsNot(&'a Vec1>), Text, Mime, } @@ -238,7 +238,7 @@ impl<'a> SelectedMime<'a> { /// case-insensitive but otherwise exact. fn header_fields( &self, - fields: &'a NonEmptyVec>, + fields: &'a Vec1>, invert: bool, ) -> Result> { // Build a lowercase ascii hashset with the fields to fetch @@ -398,8 +398,8 @@ impl<'a> NodeMult<'a> { .filter_map(|inner| NodeMime(&inner).structure(is_ext).ok()) .collect::>(); - NonEmptyVec::validate(&inner_bodies)?; - let bodies = NonEmptyVec::unvalidated(inner_bodies); + Vec1::validate(&inner_bodies)?; + let bodies = Vec1::unvalidated(inner_bodies); Ok(BodyStructure::Multi { bodies, diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 544086e..dbf72fe 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -15,7 +15,7 @@ mod session; use std::net::SocketAddr; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result, Context}; use futures::stream::{FuturesUnordered, StreamExt}; use tokio::net::TcpListener; @@ -144,13 +144,6 @@ use tokio_util::bytes::BytesMut; const PIPELINABLE_COMMANDS: usize = 64; -#[derive(Debug)] -enum LoopMode { - Quit, - Interactive, - Idle(BytesMut, Arc), -} - // @FIXME a full refactor of this part of the code will be needed sooner or later struct NetLoop { ctx: ClientContext, @@ -163,7 +156,7 @@ impl NetLoop { async fn handler(ctx: ClientContext, sock: AnyStream) { let addr = ctx.addr.clone(); - let nl = match Self::new(ctx, sock).await { + let mut nl = match Self::new(ctx, sock).await { Ok(nl) => { tracing::debug!(addr=?addr, "netloop successfully initialized"); nl @@ -241,85 +234,111 @@ impl NetLoop { tracing::info!("runner is quitting"); } - async fn core(mut self) -> Result<()> { - tracing::trace!("Starting the core loop"); - let mut mode = LoopMode::Interactive; + async fn core(&mut self) -> Result<()> { + let mut maybe_idle: Option> = None; loop { - tracing::trace!(mode=?mode, "Core loop iter"); - mode = match mode { - LoopMode::Interactive => self.interactive_mode().await?, - LoopMode::Idle(buff, stop) => self.idle_mode(buff, stop).await?, - LoopMode::Quit => break, - } - } - Ok(()) - } - - async fn interactive_mode(&mut self) -> Result { - tokio::select! { - // Managing imap_flow stuff - srv_evt = self.server.progress() => match srv_evt? { - ServerFlowEvent::ResponseSent { handle: _handle, response } => { - match response { - Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit), - _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response), - } - }, - ServerFlowEvent::CommandReceived { command } => { - match self.cmd_tx.try_send(Request::ImapCommand(command)) { - Ok(_) => (), - Err(mpsc::error::TrySendError::Full(_)) => { - self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); - tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); + tokio::select! { + // Managing imap_flow stuff + srv_evt = self.server.progress() => match srv_evt? { + ServerFlowEvent::ResponseSent { handle: _handle, response } => { + match response { + Response::Status(Status::Bye(_)) => return Ok(()), + _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response), } - _ => { - self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); - tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + }, + ServerFlowEvent::CommandReceived { command } => { + match self.cmd_tx.try_send(Request::ImapCommand(command)) { + Ok(_) => (), + Err(mpsc::error::TrySendError::Full(_)) => { + self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); + tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); + } + _ => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + } + } + }, + ServerFlowEvent::IdleCommandReceived { tag } => { + match self.cmd_tx.try_send(Request::IdleStart(tag)) { + Ok(_) => (), + Err(mpsc::error::TrySendError::Full(_)) => { + self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); + tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); + } + _ => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + } } } - }, - flow => { - self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); - tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow); - } - }, - - // Managing response generated by Aerogramme - maybe_msg = self.resp_rx.recv() => match maybe_msg { - Some(ResponseOrIdle::Response(response)) => { - tracing::trace!("Interactive, server has a response for the client"); - for body_elem in response.body.into_iter() { - let _handle = match body_elem { - Body::Data(d) => self.server.enqueue_data(d), - Body::Status(s) => self.server.enqueue_status(s), - }; + ServerFlowEvent::IdleDoneReceived => { + tracing::trace!("client sent DONE and want to stop IDLE"); + maybe_idle.ok_or(anyhow!("Received IDLE done but not idling currently"))?.notify_one(); + maybe_idle = None; + } + flow => { + self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); + tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow); } - self.server.enqueue_status(response.completion); - }, - Some(ResponseOrIdle::StartIdle(stop)) => { - tracing::trace!("Interactive, server agreed to switch in idle mode"); - let cr = CommandContinuationRequest::basic(None, "Idling")?; - self.server.enqueue_continuation(cr); - self.cmd_tx.try_send(Request::Idle)?; - return Ok(LoopMode::Idle(BytesMut::new(), stop)) - }, - None => { - self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); - tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); }, - Some(_) => unreachable!(), - }, + // Managing response generated by Aerogramme + maybe_msg = self.resp_rx.recv() => match maybe_msg { + Some(ResponseOrIdle::Response(response)) => { + tracing::trace!("Interactive, server has a response for the client"); + for body_elem in response.body.into_iter() { + let _handle = match body_elem { + Body::Data(d) => self.server.enqueue_data(d), + Body::Status(s) => self.server.enqueue_status(s), + }; + } + self.server.enqueue_status(response.completion); + }, + Some(ResponseOrIdle::IdleAccept(stop)) => { + tracing::trace!("Interactive, server agreed to switch in idle mode"); + let cr = CommandContinuationRequest::basic(None, "Idling")?; + self.server.idle_accept(cr).or(Err(anyhow!("refused continuation for idle accept")))?; + self.cmd_tx.try_send(Request::IdlePoll)?; + if maybe_idle.is_some() { + bail!("Can't start IDLE if already idling"); + } + maybe_idle = Some(stop); + }, + Some(ResponseOrIdle::IdleEvent(elems)) => { + tracing::trace!("server imap session has some change to communicate to the client"); + for body_elem in elems.into_iter() { + let _handle = match body_elem { + Body::Data(d) => self.server.enqueue_data(d), + Body::Status(s) => self.server.enqueue_status(s), + }; + } + self.cmd_tx.try_send(Request::IdlePoll)?; + }, + Some(ResponseOrIdle::IdleReject(response)) => { + tracing::trace!("inform client that session rejected idle"); + self.server + .idle_reject(response.completion) + .or(Err(anyhow!("wrong reject command")))?; + }, + None => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + }, + Some(_) => unreachable!(), - // When receiving a CTRL+C - _ = self.ctx.must_exit.changed() => { - tracing::trace!("Interactive, CTRL+C, exiting"); - self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); - }, - }; - Ok(LoopMode::Interactive) + }, + + // When receiving a CTRL+C + _ = self.ctx.must_exit.changed() => { + tracing::trace!("Interactive, CTRL+C, exiting"); + self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); + }, + }; + } } + /* async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc) -> Result { // Flush send loop { @@ -398,5 +417,5 @@ impl NetLoop { return Ok(LoopMode::Interactive) }, }; - } + }*/ } diff --git a/src/imap/request.rs b/src/imap/request.rs index 49b4992..cff18a3 100644 --- a/src/imap/request.rs +++ b/src/imap/request.rs @@ -1,7 +1,9 @@ use imap_codec::imap_types::command::Command; +use imap_codec::imap_types::core::Tag; #[derive(Debug)] pub enum Request { ImapCommand(Command<'static>), - Idle, + IdleStart(Tag<'static>), + IdlePoll, } diff --git a/src/imap/response.rs b/src/imap/response.rs index 40e6927..b6a0e98 100644 --- a/src/imap/response.rs +++ b/src/imap/response.rs @@ -118,6 +118,7 @@ impl<'a> Response<'a> { #[derive(Debug)] pub enum ResponseOrIdle { Response(Response<'static>), - StartIdle(Arc), + IdleAccept(Arc), + IdleReject(Response<'static>), IdleEvent(Vec>), } diff --git a/src/imap/search.rs b/src/imap/search.rs index 4ff70ee..37a7e9e 100644 --- a/src/imap/search.rs +++ b/src/imap/search.rs @@ -1,6 +1,6 @@ use std::num::{NonZeroU32, NonZeroU64}; -use imap_codec::imap_types::core::NonEmptyVec; +use imap_codec::imap_types::core::Vec1; use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey}; use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet}; @@ -48,7 +48,7 @@ impl<'a> Criteria<'a> { let mut new_vec = base.0.into_inner(); new_vec.extend_from_slice(ext.0.as_ref()); let seq = SequenceSet( - NonEmptyVec::try_from(new_vec) + Vec1::try_from(new_vec) .expect("merging non empty vec lead to non empty vec"), ); (seq, x) diff --git a/src/imap/session.rs b/src/imap/session.rs index 12bbfee..9165574 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -4,8 +4,8 @@ use crate::imap::flow; use crate::imap::request::Request; use crate::imap::response::{Response, ResponseOrIdle}; use crate::login::ArcLoginProvider; -use anyhow::{anyhow, bail, Result}; -use imap_codec::imap_types::command::Command; +use anyhow::{anyhow, bail, Result, Context}; +use imap_codec::imap_types::{core::Tag, command::Command}; //----- pub struct Instance { @@ -27,13 +27,44 @@ impl Instance { pub async fn request(&mut self, req: Request) -> ResponseOrIdle { match req { - Request::Idle => self.idle().await, + Request::IdleStart(tag) => self.idle_init(tag), + Request::IdlePoll => self.idle_poll().await, Request::ImapCommand(cmd) => self.command(cmd).await, } } - pub async fn idle(&mut self) -> ResponseOrIdle { - match self.idle_happy().await { + pub fn idle_init(&mut self, tag: Tag<'static>) -> ResponseOrIdle { + // Build transition + //@FIXME the notifier should be hidden inside the state and thus not part of the transition! + let transition = flow::Transition::Idle(tag.clone(), tokio::sync::Notify::new()); + + // Try to apply the transition and get the stop notifier + let maybe_stop = self + .state + .apply(transition) + .context("IDLE transition failed") + .and_then(|_| self.state.notify().ok_or(anyhow!("IDLE state has no Notify object"))); + + // Build an appropriate response + match maybe_stop { + Ok(stop) => ResponseOrIdle::IdleAccept(stop), + Err(e) => { + tracing::error!(err=?e, "unable to init idle due to a transition error"); + //ResponseOrIdle::IdleReject(tag) + let no = Response::build() + .tag(tag) + .message( + "Internal error, processing command triggered an illegal IMAP state transition", + ) + .no() + .unwrap(); + ResponseOrIdle::IdleReject(no) + } + } + } + + pub async fn idle_poll(&mut self) -> ResponseOrIdle { + match self.idle_poll_happy().await { Ok(r) => r, Err(e) => { tracing::error!(err=?e, "something bad happened in idle"); @@ -42,7 +73,7 @@ impl Instance { } } - pub async fn idle_happy(&mut self) -> Result { + pub async fn idle_poll_happy(&mut self) -> Result { let (mbx, tag, stop) = match &mut self.state { flow::State::Idle(_, ref mut mbx, _, tag, stop) => (mbx, tag.clone(), stop.clone()), _ => bail!("Invalid session state, can't idle"), @@ -128,10 +159,11 @@ impl Instance { .bad() .unwrap()); } + ResponseOrIdle::Response(resp) - match &self.state { + /*match &self.state { flow::State::Idle(_, _, _, _, n) => ResponseOrIdle::StartIdle(n.clone()), _ => ResponseOrIdle::Response(resp), - } + }*/ } } -- cgit v1.2.3 From 9b26e251e3d6d7b850064a2c1ed5bb5870918f1e Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 22 Feb 2024 17:31:03 +0100 Subject: formatting --- src/imap/command/authenticated.rs | 11 ++++---- src/imap/command/selected.rs | 7 ++++-- src/imap/flow.rs | 2 +- src/imap/mailbox_view.rs | 10 +++++--- src/imap/mod.rs | 2 +- src/imap/session.rs | 10 +++++--- src/mail/query.rs | 53 +++++++++++++++++++++------------------ 7 files changed, 54 insertions(+), 41 deletions(-) (limited to 'src') diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index 129c2fd..eb8833d 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -6,7 +6,7 @@ use anyhow::{anyhow, bail, Result}; use imap_codec::imap_types::command::{ Command, CommandBody, ListReturnItem, SelectExamineModifier, }; -use imap_codec::imap_types::core::{Atom, Literal, Vec1, QuotedChar}; +use imap_codec::imap_types::core::{Atom, Literal, QuotedChar, Vec1}; use imap_codec::imap_types::datetime::DateTime; use imap_codec::imap_types::extensions::enable::CapabilityEnable; use imap_codec::imap_types::flag::{Flag, FlagNameAttribute}; @@ -14,12 +14,12 @@ use imap_codec::imap_types::mailbox::{ListMailbox, Mailbox as MailboxCodec}; use imap_codec::imap_types::response::{Code, CodeOther, Data}; use imap_codec::imap_types::status::{StatusDataItem, StatusDataItemName}; -use crate::imap::Body; use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::command::{anystate, MailboxName}; use crate::imap::flow; use crate::imap::mailbox_view::{MailboxView, UpdateParameters}; use crate::imap::response::Response; +use crate::imap::Body; use crate::mail::uidindex::*; use crate::mail::user::{User, MAILBOX_HIERARCHY_DELIMITER as MBX_HIER_DELIM_RAW}; @@ -560,8 +560,6 @@ impl<'a> AuthenticatedContext<'a> { ) -> Result<(Response<'static>, flow::Transition)> { let append_tag = self.req.tag.clone(); match self.append_internal(mailbox, flags, date, message).await { - - Ok((_mb_view, uidvalidity, uid, _modseq)) => Ok(( Response::build() .tag(append_tag) @@ -623,8 +621,9 @@ impl<'a> AuthenticatedContext<'a> { let flags = flags.iter().map(|x| x.to_string()).collect::>(); // TODO: filter allowed flags? ping @Quentin - let (uidvalidity, uid, modseq) = view.internal.mailbox.append(msg, None, &flags[..]).await?; - //let unsollicited = view.update(UpdateParameters::default()).await?; + let (uidvalidity, uid, modseq) = + view.internal.mailbox.append(msg, None, &flags[..]).await?; + //let unsollicited = view.update(UpdateParameters::default()).await?; Ok((view, uidvalidity, uid, modseq)) } diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index e871509..d000905 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::Result; use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier}; -use imap_codec::imap_types::core::{Vec1, Charset}; +use imap_codec::imap_types::core::{Charset, Vec1}; use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames; use imap_codec::imap_types::flag::{Flag, StoreResponse, StoreType}; use imap_codec::imap_types::mailbox::Mailbox as MailboxCodec; @@ -59,7 +59,10 @@ pub async fn dispatch<'a>( charset, criteria, uid, - } => ctx.search(charset, &SearchKey::And(criteria.clone()), uid).await, + } => { + ctx.search(charset, &SearchKey::And(criteria.clone()), uid) + .await + } CommandBody::Expunge { // UIDPLUS (rfc4315) uid_sequence_set, diff --git a/src/imap/flow.rs b/src/imap/flow.rs index 4405e71..e372d69 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -2,8 +2,8 @@ use std::error::Error as StdError; use std::fmt; use std::sync::Arc; -use tokio::sync::Notify; use imap_codec::imap_types::core::Tag; +use tokio::sync::Notify; use crate::imap::mailbox_view::MailboxView; use crate::mail::user::User; diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index f6395b4..1c53b93 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -370,7 +370,7 @@ impl MailboxView { .fetch() .zip(futures::stream::iter(mail_idx_list)) // [3/6] Derive an IMAP-specific view from the results, apply the filters - .map(|(maybe_qr, midx)| match maybe_qr { + .map(|(maybe_qr, midx)| match maybe_qr { Ok(qr) => Ok((MailView::new(&qr, midx)?.filter(&ap)?, midx)), Err(e) => Err(e), }) @@ -381,7 +381,10 @@ impl MailboxView { // [5/6] Register the \Seen flags if matches!(seen, SeenFlag::MustAdd) { let seen_flag = Flag::Seen.to_string(); - self.internal.mailbox.add_flags(midx.uuid, &[seen_flag]).await?; + self.internal + .mailbox + .add_flags(midx.uuid, &[seen_flag]) + .await?; } Ok::<_, anyhow::Error>(body) @@ -429,13 +432,12 @@ impl MailboxView { Ok(true) => Some(Ok(*midx)), Ok(_) => None, Err(e) => Some(Err(e)), - } + }, Err(e) => Some(Err(e)), }; futures::future::ready(r) }); - // 7. Chain both streams (part resolved from index, part resolved from metadata+body) let main_stream = futures::stream::iter(kept_idx) .map(Ok) diff --git a/src/imap/mod.rs b/src/imap/mod.rs index dbf72fe..02ab9ce 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -15,7 +15,7 @@ mod session; use std::net::SocketAddr; -use anyhow::{anyhow, bail, Result, Context}; +use anyhow::{anyhow, bail, Context, Result}; use futures::stream::{FuturesUnordered, StreamExt}; use tokio::net::TcpListener; diff --git a/src/imap/session.rs b/src/imap/session.rs index 9165574..fa3232a 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -4,8 +4,8 @@ use crate::imap::flow; use crate::imap::request::Request; use crate::imap::response::{Response, ResponseOrIdle}; use crate::login::ArcLoginProvider; -use anyhow::{anyhow, bail, Result, Context}; -use imap_codec::imap_types::{core::Tag, command::Command}; +use anyhow::{anyhow, bail, Context, Result}; +use imap_codec::imap_types::{command::Command, core::Tag}; //----- pub struct Instance { @@ -43,7 +43,11 @@ impl Instance { .state .apply(transition) .context("IDLE transition failed") - .and_then(|_| self.state.notify().ok_or(anyhow!("IDLE state has no Notify object"))); + .and_then(|_| { + self.state + .notify() + .ok_or(anyhow!("IDLE state has no Notify object")) + }); // Build an appropriate response match maybe_stop { diff --git a/src/mail/query.rs b/src/mail/query.rs index cbc5fe6..3e6fe99 100644 --- a/src/mail/query.rs +++ b/src/mail/query.rs @@ -2,8 +2,8 @@ use super::mailbox::MailMeta; use super::snapshot::FrozenMailbox; use super::unique_ident::UniqueIdent; use anyhow::Result; -use futures::stream::{Stream, StreamExt, BoxStream}; use futures::future::FutureExt; +use futures::stream::{BoxStream, Stream, StreamExt}; /// Query is in charge of fetching efficiently /// requested data for a list of emails @@ -34,7 +34,10 @@ impl QueryScope { impl<'a, 'b> Query<'a, 'b> { pub fn fetch(&self) -> BoxStream> { match self.scope { - QueryScope::Index => Box::pin(futures::stream::iter(self.emails).map(|&uuid| Ok(QueryResult::IndexResult { uuid }))), + QueryScope::Index => Box::pin( + futures::stream::iter(self.emails) + .map(|&uuid| Ok(QueryResult::IndexResult { uuid })), + ), QueryScope::Partial => Box::pin(self.partial()), QueryScope::Full => Box::pin(self.full()), } @@ -42,40 +45,42 @@ impl<'a, 'b> Query<'a, 'b> { // --- functions below are private *for reasons* fn partial<'d>(&'d self) -> impl Stream> + 'd + Send { - async move { - let maybe_meta_list: Result> = self.frozen.mailbox.fetch_meta(self.emails).await; + async move { + let maybe_meta_list: Result> = + self.frozen.mailbox.fetch_meta(self.emails).await; let list_res = maybe_meta_list - .map(|meta_list| meta_list - .into_iter() - .zip(self.emails) - .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata })) - .collect() - ) + .map(|meta_list| { + meta_list + .into_iter() + .zip(self.emails) + .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata })) + .collect() + }) .unwrap_or_else(|e| vec![Err(e)]); futures::stream::iter(list_res) - }.flatten_stream() + } + .flatten_stream() } fn full<'d>(&'d self) -> impl Stream> + 'd + Send { - self.partial() - .then(move |maybe_meta| async move { - let meta = maybe_meta?; + self.partial().then(move |maybe_meta| async move { + let meta = maybe_meta?; - let content = self - .frozen - .mailbox - .fetch_full( - *meta.uuid(), - &meta + let content = self + .frozen + .mailbox + .fetch_full( + *meta.uuid(), + &meta .metadata() .expect("meta to be PartialResult") .message_key, - ) - .await?; + ) + .await?; - Ok(meta.into_full(content).expect("meta to be PartialResult")) - }) + Ok(meta.into_full(content).expect("meta to be PartialResult")) + }) } } -- cgit v1.2.3 From 02a8537556236437d905cefe8aa2c5d0a96f129a Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 23 Feb 2024 17:01:51 +0100 Subject: Replace with a single AWS HTTP client --- src/login/ldap_provider.rs | 7 ++++++- src/login/static_provider.rs | 6 ++++-- src/storage/garage.rs | 49 ++++++++++++++++++++++++++++++++++---------- 3 files changed, 48 insertions(+), 14 deletions(-) (limited to 'src') diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs index e73e1dc..42c993d 100644 --- a/src/login/ldap_provider.rs +++ b/src/login/ldap_provider.rs @@ -21,6 +21,7 @@ pub struct LdapLoginProvider { storage_specific: StorageSpecific, in_memory_store: storage::in_memory::MemDb, + garage_store: storage::garage::GarageRoot, } enum BucketSource { @@ -91,7 +92,11 @@ impl LdapLoginProvider { mail_attr: config.mail_attr, crypto_root_attr: config.crypto_root_attr, storage_specific: specific, + //@FIXME should be created outside of the login provider + //Login provider should return only a cryptoroot + a storage URI + //storage URI that should be resolved outside... in_memory_store: storage::in_memory::MemDb::new(), + garage_store: storage::garage::GarageRoot::new(), }) } @@ -114,7 +119,7 @@ impl LdapLoginProvider { BucketSource::Attr(a) => get_attr(user, &a)?, }; - storage::garage::GarageBuilder::new(storage::garage::GarageConf { + self.garage_store.user(storage::garage::GarageConf { region: from_config.aws_region.clone(), s3_endpoint: from_config.s3_endpoint.clone(), k2v_endpoint: from_config.k2v_endpoint.clone(), diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs index 1e1ecbf..e190a91 100644 --- a/src/login/static_provider.rs +++ b/src/login/static_provider.rs @@ -25,6 +25,7 @@ pub struct UserDatabase { pub struct StaticLoginProvider { user_db: watch::Receiver, in_memory_store: storage::in_memory::MemDb, + garage_store: storage::garage::GarageRoot, } pub async fn update_user_list(config: PathBuf, up: watch::Sender) -> Result<()> { @@ -84,6 +85,7 @@ impl StaticLoginProvider { Ok(Self { user_db: rx, in_memory_store: storage::in_memory::MemDb::new(), + garage_store: storage::garage::GarageRoot::new(), }) } } @@ -109,7 +111,7 @@ impl LoginProvider for StaticLoginProvider { let storage: storage::Builder = match &user.config.storage { StaticStorage::InMemory => self.in_memory_store.builder(username).await, StaticStorage::Garage(grgconf) => { - storage::garage::GarageBuilder::new(storage::garage::GarageConf { + self.garage_store.user(storage::garage::GarageConf { region: grgconf.aws_region.clone(), k2v_endpoint: grgconf.k2v_endpoint.clone(), s3_endpoint: grgconf.s3_endpoint.clone(), @@ -140,7 +142,7 @@ impl LoginProvider for StaticLoginProvider { let storage: storage::Builder = match &user.config.storage { StaticStorage::InMemory => self.in_memory_store.builder(&user.username).await, StaticStorage::Garage(grgconf) => { - storage::garage::GarageBuilder::new(storage::garage::GarageConf { + self.garage_store.user(storage::garage::GarageConf { region: grgconf.aws_region.clone(), k2v_endpoint: grgconf.k2v_endpoint.clone(), s3_endpoint: grgconf.s3_endpoint.clone(), diff --git a/src/storage/garage.rs b/src/storage/garage.rs index 709e729..870854a 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -1,7 +1,38 @@ use crate::storage::*; use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectError}; +use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder; +use aws_smithy_runtime_api::client::http::SharedHttpClient; +//use hyper_rustls::HttpsConnector; +//use hyper_util::client::legacy::connect::HttpConnector; + + use serde::Serialize; +pub struct GarageRoot { + aws_http: SharedHttpClient, +} + +impl GarageRoot { + pub fn new() -> Self { + /*let http = hyper_rustls::HttpsConnectorBuilder::new() + .https_or_http() + .with_native_roots() + .enable_http1() + .enable_http2() + .build();*/ + let aws_http = HyperClientBuilder::new().build_https(); + Self { aws_http } + } + + pub fn user(&self, conf: GarageConf) -> anyhow::Result> { + let mut unicity: Vec = vec![]; + unicity.extend_from_slice(file!().as_bytes()); + unicity.append(&mut rmp_serde::to_vec(&conf)?); + + Ok(Arc::new(GarageUser { conf, aws_http: self.aws_http.clone(), unicity })) + } +} + #[derive(Clone, Debug, Serialize)] pub struct GarageConf { pub region: String, @@ -12,23 +43,18 @@ pub struct GarageConf { pub bucket: String, } +//@FIXME we should get rid of this builder +//and allocate a S3 + K2V client only once per user +//(and using a shared HTTP client) #[derive(Clone, Debug)] -pub struct GarageBuilder { +pub struct GarageUser { conf: GarageConf, + aws_http: SharedHttpClient, unicity: Vec, } -impl GarageBuilder { - pub fn new(conf: GarageConf) -> anyhow::Result> { - let mut unicity: Vec = vec![]; - unicity.extend_from_slice(file!().as_bytes()); - unicity.append(&mut rmp_serde::to_vec(&conf)?); - Ok(Arc::new(Self { conf, unicity })) - } -} - #[async_trait] -impl IBuilder for GarageBuilder { +impl IBuilder for GarageUser { async fn build(&self) -> Result { let s3_creds = s3::config::Credentials::new( self.conf.aws_access_key_id.clone(), @@ -41,6 +67,7 @@ impl IBuilder for GarageBuilder { let sdk_config = aws_config::from_env() .region(aws_config::Region::new(self.conf.region.clone())) .credentials_provider(s3_creds) + .http_client(self.aws_http.clone()) .endpoint_url(self.conf.s3_endpoint.clone()) .load() .await; -- cgit v1.2.3 From 2a084df300dc30d40cf3599c1cb7a90132d5a6e8 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 23 Feb 2024 17:31:29 +0100 Subject: Also share HTTPClient for K2V --- src/login/ldap_provider.rs | 2 +- src/login/static_provider.rs | 2 +- src/storage/garage.rs | 38 +++++++++++++++++++++++--------------- 3 files changed, 25 insertions(+), 17 deletions(-) (limited to 'src') diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs index 42c993d..0af5676 100644 --- a/src/login/ldap_provider.rs +++ b/src/login/ldap_provider.rs @@ -96,7 +96,7 @@ impl LdapLoginProvider { //Login provider should return only a cryptoroot + a storage URI //storage URI that should be resolved outside... in_memory_store: storage::in_memory::MemDb::new(), - garage_store: storage::garage::GarageRoot::new(), + garage_store: storage::garage::GarageRoot::new()?, }) } diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs index e190a91..79626df 100644 --- a/src/login/static_provider.rs +++ b/src/login/static_provider.rs @@ -85,7 +85,7 @@ impl StaticLoginProvider { Ok(Self { user_db: rx, in_memory_store: storage::in_memory::MemDb::new(), - garage_store: storage::garage::GarageRoot::new(), + garage_store: storage::garage::GarageRoot::new()?, }) } } diff --git a/src/storage/garage.rs b/src/storage/garage.rs index 870854a..a23bbb2 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -1,27 +1,29 @@ -use crate::storage::*; use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectError}; use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder; use aws_smithy_runtime_api::client::http::SharedHttpClient; -//use hyper_rustls::HttpsConnector; -//use hyper_util::client::legacy::connect::HttpConnector; - - +use hyper_rustls::HttpsConnector; +use hyper_util::rt::TokioExecutor; +use hyper_util::client::legacy::{connect::HttpConnector, Client as HttpClient}; use serde::Serialize; +use crate::storage::*; + pub struct GarageRoot { + k2v_http: HttpClient, k2v_client::Body>, aws_http: SharedHttpClient, } impl GarageRoot { - pub fn new() -> Self { - /*let http = hyper_rustls::HttpsConnectorBuilder::new() - .https_or_http() - .with_native_roots() - .enable_http1() - .enable_http2() - .build();*/ + pub fn new() -> anyhow::Result { + let connector = hyper_rustls::HttpsConnectorBuilder::new() + .with_native_roots()? + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + let k2v_http = HttpClient::builder(TokioExecutor::new()).build(connector); let aws_http = HyperClientBuilder::new().build_https(); - Self { aws_http } + Ok(Self { k2v_http, aws_http }) } pub fn user(&self, conf: GarageConf) -> anyhow::Result> { @@ -29,7 +31,12 @@ impl GarageRoot { unicity.extend_from_slice(file!().as_bytes()); unicity.append(&mut rmp_serde::to_vec(&conf)?); - Ok(Arc::new(GarageUser { conf, aws_http: self.aws_http.clone(), unicity })) + Ok(Arc::new(GarageUser { + conf, + aws_http: self.aws_http.clone(), + k2v_http: self.k2v_http.clone(), + unicity + })) } } @@ -50,6 +57,7 @@ pub struct GarageConf { pub struct GarageUser { conf: GarageConf, aws_http: SharedHttpClient, + k2v_http: HttpClient, k2v_client::Body>, unicity: Vec, } @@ -87,7 +95,7 @@ impl IBuilder for GarageUser { user_agent: None, }; - let k2v_client = match k2v_client::K2vClient::new(k2v_config) { + let k2v_client = match k2v_client::K2vClient::new_with_client(k2v_config, self.k2v_http.clone()) { Err(e) => { tracing::error!("unable to build k2v client: {}", e); return Err(StorageError::Internal); -- cgit v1.2.3 From 0b122582e8374b9de43c8f096534301e18105f0c Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 23 Feb 2024 18:28:04 +0100 Subject: fix code formatting --- src/storage/garage.rs | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) (limited to 'src') diff --git a/src/storage/garage.rs b/src/storage/garage.rs index a23bbb2..7152764 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -2,8 +2,8 @@ use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectEr use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder; use aws_smithy_runtime_api::client::http::SharedHttpClient; use hyper_rustls::HttpsConnector; -use hyper_util::rt::TokioExecutor; use hyper_util::client::legacy::{connect::HttpConnector, Client as HttpClient}; +use hyper_util::rt::TokioExecutor; use serde::Serialize; use crate::storage::*; @@ -15,13 +15,13 @@ pub struct GarageRoot { impl GarageRoot { pub fn new() -> anyhow::Result { - let connector = hyper_rustls::HttpsConnectorBuilder::new() - .with_native_roots()? - .https_or_http() - .enable_http1() - .enable_http2() - .build(); - let k2v_http = HttpClient::builder(TokioExecutor::new()).build(connector); + let connector = hyper_rustls::HttpsConnectorBuilder::new() + .with_native_roots()? + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + let k2v_http = HttpClient::builder(TokioExecutor::new()).build(connector); let aws_http = HyperClientBuilder::new().build_https(); Ok(Self { k2v_http, aws_http }) } @@ -31,11 +31,11 @@ impl GarageRoot { unicity.extend_from_slice(file!().as_bytes()); unicity.append(&mut rmp_serde::to_vec(&conf)?); - Ok(Arc::new(GarageUser { - conf, - aws_http: self.aws_http.clone(), + Ok(Arc::new(GarageUser { + conf, + aws_http: self.aws_http.clone(), k2v_http: self.k2v_http.clone(), - unicity + unicity, })) } } @@ -95,13 +95,14 @@ impl IBuilder for GarageUser { user_agent: None, }; - let k2v_client = match k2v_client::K2vClient::new_with_client(k2v_config, self.k2v_http.clone()) { - Err(e) => { - tracing::error!("unable to build k2v client: {}", e); - return Err(StorageError::Internal); - } - Ok(v) => v, - }; + let k2v_client = + match k2v_client::K2vClient::new_with_client(k2v_config, self.k2v_http.clone()) { + Err(e) => { + tracing::error!("unable to build k2v client: {}", e); + return Err(StorageError::Internal); + } + Ok(v) => v, + }; Ok(Box::new(GarageStore { bucket: self.conf.bucket.clone(), -- cgit v1.2.3