aboutsummaryrefslogtreecommitdiff
path: root/src/imap
diff options
context:
space:
mode:
Diffstat (limited to 'src/imap')
-rw-r--r--src/imap/attributes.rs50
-rw-r--r--src/imap/capability.rs14
-rw-r--r--src/imap/command/authenticated.rs6
-rw-r--r--src/imap/command/examined.rs164
-rw-r--r--src/imap/command/mod.rs1
-rw-r--r--src/imap/command/selected.rs110
-rw-r--r--src/imap/flow.rs80
-rw-r--r--src/imap/mailbox_view.rs80
-rw-r--r--src/imap/mod.rs275
-rw-r--r--src/imap/request.rs7
-rw-r--r--src/imap/response.rs11
-rw-r--r--src/imap/search.rs29
-rw-r--r--src/imap/session.rs69
13 files changed, 533 insertions, 363 deletions
diff --git a/src/imap/attributes.rs b/src/imap/attributes.rs
index d094f1a..89446a8 100644
--- a/src/imap/attributes.rs
+++ b/src/imap/attributes.rs
@@ -1,5 +1,5 @@
-use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName, Section};
use imap_codec::imap_types::command::FetchModifier;
+use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName, Section};
/// Internal decisions based on fetched attributes
/// passed by the client
@@ -8,7 +8,11 @@ pub struct AttributesProxy {
pub attrs: Vec<MessageDataItemName<'static>>,
}
impl AttributesProxy {
- pub fn new(attrs: &MacroOrMessageDataItemNames<'static>, modifiers: &[FetchModifier], is_uid_fetch: bool) -> Self {
+ pub fn new(
+ attrs: &MacroOrMessageDataItemNames<'static>,
+ modifiers: &[FetchModifier],
+ is_uid_fetch: bool,
+ ) -> Self {
// Expand macros
let mut fetch_attrs = match attrs {
MacroOrMessageDataItemNames::Macro(m) => {
@@ -44,32 +48,30 @@ impl AttributesProxy {
}
pub fn is_enabling_condstore(&self) -> bool {
- self.attrs.iter().any(|x| {
- matches!(x, MessageDataItemName::ModSeq)
- })
+ self.attrs
+ .iter()
+ .any(|x| matches!(x, MessageDataItemName::ModSeq))
}
pub fn need_body(&self) -> bool {
- self.attrs.iter().any(|x| {
- match x {
- MessageDataItemName::Body
- | MessageDataItemName::Rfc822
- | MessageDataItemName::Rfc822Text
- | MessageDataItemName::BodyStructure => true,
+ self.attrs.iter().any(|x| match x {
+ MessageDataItemName::Body
+ | MessageDataItemName::Rfc822
+ | MessageDataItemName::Rfc822Text
+ | MessageDataItemName::BodyStructure => true,
- MessageDataItemName::BodyExt {
- section: Some(section),
- partial: _,
- peek: _,
- } => match section {
- Section::Header(None)
- | Section::HeaderFields(None, _)
- | Section::HeaderFieldsNot(None, _) => false,
- _ => true,
- },
- MessageDataItemName::BodyExt { .. } => true,
- _ => false,
- }
+ MessageDataItemName::BodyExt {
+ section: Some(section),
+ partial: _,
+ peek: _,
+ } => match section {
+ Section::Header(None)
+ | Section::HeaderFields(None, _)
+ | Section::HeaderFieldsNot(None, _) => false,
+ _ => true,
+ },
+ MessageDataItemName::BodyExt { .. } => true,
+ _ => false,
})
}
}
diff --git a/src/imap/capability.rs b/src/imap/capability.rs
index 6533ccb..7959832 100644
--- a/src/imap/capability.rs
+++ b/src/imap/capability.rs
@@ -1,4 +1,4 @@
-use imap_codec::imap_types::command::{FetchModifier, StoreModifier, SelectExamineModifier};
+use imap_codec::imap_types::command::{FetchModifier, SelectExamineModifier, StoreModifier};
use imap_codec::imap_types::core::NonEmptyVec;
use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind};
use imap_codec::imap_types::response::Capability;
@@ -30,6 +30,7 @@ impl Default for ServerCapability {
Capability::Enable,
Capability::Move,
Capability::LiteralPlus,
+ Capability::Idle,
capability_unselect(),
capability_condstore(),
//capability_qresync(),
@@ -72,7 +73,6 @@ impl ClientStatus {
}
}
-
pub struct ClientCapability {
pub condstore: ClientStatus,
pub utf8kind: Option<Utf8Kind>,
@@ -100,13 +100,19 @@ impl ClientCapability {
}
pub fn fetch_modifiers_enable(&mut self, mods: &[FetchModifier]) {
- if mods.iter().any(|x| matches!(x, FetchModifier::ChangedSince(..))) {
+ if mods
+ .iter()
+ .any(|x| matches!(x, FetchModifier::ChangedSince(..)))
+ {
self.enable_condstore()
}
}
pub fn store_modifiers_enable(&mut self, mods: &[StoreModifier]) {
- if mods.iter().any(|x| matches!(x, StoreModifier::UnchangedSince(..))) {
+ if mods
+ .iter()
+ .any(|x| matches!(x, StoreModifier::UnchangedSince(..)))
+ {
self.enable_condstore()
}
}
diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs
index 9b6bb24..3fd132f 100644
--- a/src/imap/command/authenticated.rs
+++ b/src/imap/command/authenticated.rs
@@ -405,7 +405,7 @@ impl<'a> AuthenticatedContext<'a> {
it is therefore correct to not return it even if there are unseen messages
RFC9051 (imap4rev2) says that OK [UNSEEN] responses are deprecated after SELECT and EXAMINE
For Aerogramme, we just don't send the OK [UNSEEN], it's correct to do in both specifications.
-
+
20 select "INBOX.achats"
* FLAGS (\Answered \Flagged \Deleted \Seen \Draft $Forwarded JUNK $label1)
@@ -453,7 +453,7 @@ impl<'a> AuthenticatedContext<'a> {
.code(Code::ReadWrite)
.set_body(data)
.ok()?,
- flow::Transition::Select(mb),
+ flow::Transition::Select(mb, flow::MailboxPerm::ReadWrite),
))
}
@@ -491,7 +491,7 @@ impl<'a> AuthenticatedContext<'a> {
.code(Code::ReadOnly)
.set_body(data)
.ok()?,
- flow::Transition::Examine(mb),
+ flow::Transition::Select(mb, flow::MailboxPerm::ReadOnly),
))
}
diff --git a/src/imap/command/examined.rs b/src/imap/command/examined.rs
deleted file mode 100644
index 9fc0990..0000000
--- a/src/imap/command/examined.rs
+++ /dev/null
@@ -1,164 +0,0 @@
-use std::sync::Arc;
-use std::num::NonZeroU64;
-
-use anyhow::Result;
-use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier};
-use imap_codec::imap_types::core::Charset;
-use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames;
-use imap_codec::imap_types::search::SearchKey;
-use imap_codec::imap_types::sequence::SequenceSet;
-
-use crate::imap::attributes::AttributesProxy;
-use crate::imap::capability::{ClientCapability, ServerCapability};
-use crate::imap::command::{anystate, authenticated};
-use crate::imap::flow;
-use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
-use crate::imap::response::Response;
-use crate::mail::user::User;
-
-pub struct ExaminedContext<'a> {
- pub req: &'a Command<'static>,
- pub user: &'a Arc<User>,
- pub mailbox: &'a mut MailboxView,
- pub server_capabilities: &'a ServerCapability,
- pub client_capabilities: &'a mut ClientCapability,
-}
-
-pub async fn dispatch(ctx: ExaminedContext<'_>) -> Result<(Response<'static>, flow::Transition)> {
- match &ctx.req.body {
- // Any State
- // noop is specific to this state
- CommandBody::Capability => {
- anystate::capability(ctx.req.tag.clone(), ctx.server_capabilities)
- }
- CommandBody::Logout => anystate::logout(),
-
- // Specific to the EXAMINE state (specialization of the SELECTED state)
- // ~3 commands -> close, fetch, search + NOOP
- CommandBody::Close => ctx.close("CLOSE").await,
- CommandBody::Fetch {
- sequence_set,
- macro_or_item_names,
- modifiers,
- uid,
- } => ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid).await,
- CommandBody::Search {
- charset,
- criteria,
- uid,
- } => ctx.search(charset, criteria, uid).await,
- CommandBody::Noop | CommandBody::Check => ctx.noop().await,
- CommandBody::Expunge { .. } | CommandBody::Store { .. } => Ok((
- Response::build()
- .to_req(ctx.req)
- .message("Forbidden command: can't write in read-only mode (EXAMINE)")
- .no()?,
- flow::Transition::None,
- )),
-
- // UNSELECT extension (rfc3691)
- CommandBody::Unselect => ctx.close("UNSELECT").await,
-
- // In examined mode, we fallback to authenticated when needed
- _ => {
- authenticated::dispatch(authenticated::AuthenticatedContext {
- req: ctx.req,
- server_capabilities: ctx.server_capabilities,
- client_capabilities: ctx.client_capabilities,
- user: ctx.user,
- })
- .await
- }
- }
-}
-
-// --- PRIVATE ---
-
-impl<'a> ExaminedContext<'a> {
- /// CLOSE in examined state is not the same as in selected state
- /// (in selected state it also does an EXPUNGE, here it doesn't)
- async fn close(self, kind: &str) -> Result<(Response<'static>, flow::Transition)> {
- Ok((
- Response::build()
- .to_req(self.req)
- .message(format!("{} completed", kind))
- .ok()?,
- flow::Transition::Unselect,
- ))
- }
-
- pub async fn fetch(
- self,
- sequence_set: &SequenceSet,
- attributes: &'a MacroOrMessageDataItemNames<'static>,
- modifiers: &[FetchModifier],
- uid: &bool,
- ) -> Result<(Response<'static>, flow::Transition)> {
- let ap = AttributesProxy::new(attributes, modifiers, *uid);
- let mut changed_since: Option<NonZeroU64> = None;
- modifiers.iter().for_each(|m| match m {
- FetchModifier::ChangedSince(val) => {
- changed_since = Some(*val);
- },
- });
-
- match self.mailbox.fetch(sequence_set, &ap, changed_since, uid).await {
- Ok(resp) => {
- // Capabilities enabling logic only on successful command
- // (according to my understanding of the spec)
- self.client_capabilities.attributes_enable(&ap);
- self.client_capabilities.fetch_modifiers_enable(modifiers);
-
- Ok((
- Response::build()
- .to_req(self.req)
- .message("FETCH completed")
- .set_body(resp)
- .ok()?,
- flow::Transition::None,
- ))
- },
- Err(e) => Ok((
- Response::build()
- .to_req(self.req)
- .message(e.to_string())
- .no()?,
- flow::Transition::None,
- )),
- }
- }
-
- pub async fn search(
- self,
- charset: &Option<Charset<'a>>,
- criteria: &SearchKey<'a>,
- uid: &bool,
- ) -> Result<(Response<'static>, flow::Transition)> {
- let (found, enable_condstore) = self.mailbox.search(charset, criteria, *uid).await?;
- if enable_condstore {
- self.client_capabilities.enable_condstore();
- }
- Ok((
- Response::build()
- .to_req(self.req)
- .set_body(found)
- .message("SEARCH completed")
- .ok()?,
- flow::Transition::None,
- ))
- }
-
- pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> {
- self.mailbox.internal.mailbox.force_sync().await?;
-
- let updates = self.mailbox.update(UpdateParameters::default()).await?;
- Ok((
- Response::build()
- .to_req(self.req)
- .message("NOOP completed.")
- .set_body(updates)
- .ok()?,
- flow::Transition::None,
- ))
- }
-}
diff --git a/src/imap/command/mod.rs b/src/imap/command/mod.rs
index dc95746..073040e 100644
--- a/src/imap/command/mod.rs
+++ b/src/imap/command/mod.rs
@@ -1,7 +1,6 @@
pub mod anonymous;
pub mod anystate;
pub mod authenticated;
-pub mod examined;
pub mod selected;
use crate::mail::user::INBOX;
diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs
index c13b71a..98b3b00 100644
--- a/src/imap/command/selected.rs
+++ b/src/imap/command/selected.rs
@@ -1,5 +1,5 @@
-use std::sync::Arc;
use std::num::NonZeroU64;
+use std::sync::Arc;
use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier};
@@ -11,12 +11,12 @@ use imap_codec::imap_types::response::{Code, CodeOther};
use imap_codec::imap_types::search::SearchKey;
use imap_codec::imap_types::sequence::SequenceSet;
+use crate::imap::attributes::AttributesProxy;
use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anystate, authenticated, MailboxName};
use crate::imap::flow;
use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
use crate::imap::response::Response;
-use crate::imap::attributes::AttributesProxy;
use crate::mail::user::User;
pub struct SelectedContext<'a> {
@@ -25,6 +25,7 @@ pub struct SelectedContext<'a> {
pub mailbox: &'a mut MailboxView,
pub server_capabilities: &'a ServerCapability,
pub client_capabilities: &'a mut ClientCapability,
+ pub perm: &'a flow::MailboxPerm,
}
pub async fn dispatch<'a>(
@@ -39,14 +40,20 @@ pub async fn dispatch<'a>(
CommandBody::Logout => anystate::logout(),
// Specific to this state (7 commands + NOOP)
- CommandBody::Close => ctx.close().await,
+ CommandBody::Close => match ctx.perm {
+ flow::MailboxPerm::ReadWrite => ctx.close().await,
+ flow::MailboxPerm::ReadOnly => ctx.examine_close().await,
+ },
CommandBody::Noop | CommandBody::Check => ctx.noop().await,
CommandBody::Fetch {
sequence_set,
macro_or_item_names,
modifiers,
uid,
- } => ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid).await,
+ } => {
+ ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid)
+ .await
+ }
CommandBody::Search {
charset,
criteria,
@@ -60,7 +67,10 @@ pub async fn dispatch<'a>(
flags,
modifiers,
uid,
- } => ctx.store(sequence_set, kind, response, flags, modifiers, uid).await,
+ } => {
+ ctx.store(sequence_set, kind, response, flags, modifiers, uid)
+ .await
+ }
CommandBody::Copy {
sequence_set,
mailbox,
@@ -75,6 +85,15 @@ pub async fn dispatch<'a>(
// UNSELECT extension (rfc3691)
CommandBody::Unselect => ctx.unselect().await,
+ // IDLE extension (rfc2177)
+ CommandBody::Idle => Ok((
+ Response::build()
+ .to_req(ctx.req)
+ .message("DUMMY command due to anti-pattern in the code")
+ .ok()?,
+ flow::Transition::Idle(ctx.req.tag.clone(), tokio::sync::Notify::new()),
+ )),
+
// In selected mode, we fallback to authenticated when needed
_ => {
authenticated::dispatch(authenticated::AuthenticatedContext {
@@ -102,6 +121,18 @@ impl<'a> SelectedContext<'a> {
))
}
+ /// CLOSE in examined state is not the same as in selected state
+ /// (in selected state it also does an EXPUNGE, here it doesn't)
+ async fn examine_close(self) -> Result<(Response<'static>, flow::Transition)> {
+ Ok((
+ Response::build()
+ .to_req(self.req)
+ .message("CLOSE completed")
+ .ok()?,
+ flow::Transition::Unselect,
+ ))
+ }
+
async fn unselect(self) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build()
@@ -124,10 +155,14 @@ impl<'a> SelectedContext<'a> {
modifiers.iter().for_each(|m| match m {
FetchModifier::ChangedSince(val) => {
changed_since = Some(*val);
- },
+ }
});
- match self.mailbox.fetch(sequence_set, &ap, changed_since, uid).await {
+ match self
+ .mailbox
+ .fetch(sequence_set, &ap, changed_since, uid)
+ .await
+ {
Ok(resp) => {
// Capabilities enabling logic only on successful command
// (according to my understanding of the spec)
@@ -143,7 +178,7 @@ impl<'a> SelectedContext<'a> {
.ok()?,
flow::Transition::None,
))
- },
+ }
Err(e) => Ok((
Response::build()
.to_req(self.req)
@@ -189,6 +224,10 @@ impl<'a> SelectedContext<'a> {
}
async fn expunge(self) -> Result<(Response<'static>, flow::Transition)> {
+ if let Some(failed) = self.fail_read_only() {
+ return Ok((failed, flow::Transition::None));
+ }
+
let tag = self.req.tag.clone();
let data = self.mailbox.expunge().await?;
@@ -211,11 +250,15 @@ impl<'a> SelectedContext<'a> {
modifiers: &[StoreModifier],
uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> {
+ if let Some(failed) = self.fail_read_only() {
+ return Ok((failed, flow::Transition::None));
+ }
+
let mut unchanged_since: Option<NonZeroU64> = None;
modifiers.iter().for_each(|m| match m {
StoreModifier::UnchangedSince(val) => {
unchanged_since = Some(*val);
- },
+ }
});
let (data, modified) = self
@@ -224,25 +267,30 @@ impl<'a> SelectedContext<'a> {
.await?;
let mut ok_resp = Response::build()
- .to_req(self.req)
- .message("STORE completed")
- .set_body(data);
-
+ .to_req(self.req)
+ .message("STORE completed")
+ .set_body(data);
match modified[..] {
[] => (),
[_head, ..] => {
- let modified_str = format!("MODIFIED {}", modified.into_iter().map(|x| x.to_string()).collect::<Vec<_>>().join(","));
- ok_resp = ok_resp.code(Code::Other(CodeOther::unvalidated(modified_str.into_bytes())));
- },
+ let modified_str = format!(
+ "MODIFIED {}",
+ modified
+ .into_iter()
+ .map(|x| x.to_string())
+ .collect::<Vec<_>>()
+ .join(",")
+ );
+ ok_resp = ok_resp.code(Code::Other(CodeOther::unvalidated(
+ modified_str.into_bytes(),
+ )));
+ }
};
-
self.client_capabilities.store_modifiers_enable(modifiers);
- Ok((ok_resp.ok()?,
- flow::Transition::None,
- ))
+ Ok((ok_resp.ok()?, flow::Transition::None))
}
async fn copy(
@@ -251,6 +299,11 @@ impl<'a> SelectedContext<'a> {
mailbox: &MailboxCodec<'a>,
uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> {
+ //@FIXME Could copy be valid in EXAMINE mode?
+ if let Some(failed) = self.fail_read_only() {
+ return Ok((failed, flow::Transition::None));
+ }
+
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?;
@@ -303,6 +356,10 @@ impl<'a> SelectedContext<'a> {
mailbox: &MailboxCodec<'a>,
uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> {
+ if let Some(failed) = self.fail_read_only() {
+ return Ok((failed, flow::Transition::None));
+ }
+
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?;
@@ -350,4 +407,17 @@ impl<'a> SelectedContext<'a> {
flow::Transition::None,
))
}
+
+ fn fail_read_only(&self) -> Option<Response<'static>> {
+ match self.perm {
+ flow::MailboxPerm::ReadWrite => None,
+ flow::MailboxPerm::ReadOnly => Some(
+ Response::build()
+ .to_req(self.req)
+ .message("Write command are forbidden while exmining mailbox")
+ .no()
+ .unwrap(),
+ ),
+ }
+ }
}
diff --git a/src/imap/flow.rs b/src/imap/flow.rs
index 95810c1..6ddd092 100644
--- a/src/imap/flow.rs
+++ b/src/imap/flow.rs
@@ -1,9 +1,11 @@
use std::error::Error as StdError;
use std::fmt;
use std::sync::Arc;
+use tokio::sync::Notify;
use crate::imap::mailbox_view::MailboxView;
use crate::mail::user::User;
+use imap_codec::imap_types::core::Tag;
#[derive(Debug)]
pub enum Error {
@@ -19,44 +21,84 @@ impl StdError for Error {}
pub enum State {
NotAuthenticated,
Authenticated(Arc<User>),
- Selected(Arc<User>, MailboxView),
- // Examined is like Selected, but indicates that the mailbox is read-only
- Examined(Arc<User>, MailboxView),
+ Selected(Arc<User>, MailboxView, MailboxPerm),
+ Idle(
+ Arc<User>,
+ MailboxView,
+ MailboxPerm,
+ Tag<'static>,
+ Arc<Notify>,
+ ),
Logout,
}
+impl fmt::Display for State {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ use State::*;
+ match self {
+ NotAuthenticated => write!(f, "NotAuthenticated"),
+ Authenticated(..) => write!(f, "Authenticated"),
+ Selected(..) => write!(f, "Selected"),
+ Idle(..) => write!(f, "Idle"),
+ Logout => write!(f, "Logout"),
+ }
+ }
+}
+
+#[derive(Clone)]
+pub enum MailboxPerm {
+ ReadOnly,
+ ReadWrite,
+}
pub enum Transition {
None,
Authenticate(Arc<User>),
- Examine(MailboxView),
- Select(MailboxView),
+ Select(MailboxView, MailboxPerm),
+ Idle(Tag<'static>, Notify),
+ UnIdle,
Unselect,
Logout,
}
+impl fmt::Display for Transition {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ use Transition::*;
+ match self {
+ None => write!(f, "None"),
+ Authenticate(..) => write!(f, "Authenticated"),
+ Select(..) => write!(f, "Selected"),
+ Idle(..) => write!(f, "Idle"),
+ UnIdle => write!(f, "UnIdle"),
+ Unselect => write!(f, "Unselect"),
+ Logout => write!(f, "Logout"),
+ }
+ }
+}
// See RFC3501 section 3.
// https://datatracker.ietf.org/doc/html/rfc3501#page-13
impl State {
pub fn apply(&mut self, tr: Transition) -> Result<(), Error> {
- let new_state = match (&self, tr) {
- (_s, Transition::None) => return Ok(()),
+ tracing::debug!(state=%self, transition=%tr, "try change state");
+
+ let new_state = match (std::mem::replace(self, State::Logout), tr) {
+ (s, Transition::None) => s,
(State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u),
- (
- State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _),
- Transition::Select(m),
- ) => State::Selected(u.clone(), m),
- (
- State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _),
- Transition::Examine(m),
- ) => State::Examined(u.clone(), m),
- (State::Selected(u, _) | State::Examined(u, _), Transition::Unselect) => {
- State::Authenticated(u.clone())
+ (State::Authenticated(u) | State::Selected(u, _, _), Transition::Select(m, p)) => {
+ State::Selected(u, m, p)
}
+ (State::Selected(u, _, _), Transition::Unselect) => State::Authenticated(u.clone()),
+ (State::Selected(u, m, p), Transition::Idle(t, s)) => {
+ State::Idle(u, m, p, t, Arc::new(s))
+ }
+ (State::Idle(u, m, p, _, _), Transition::UnIdle) => State::Selected(u, m, p),
(_, Transition::Logout) => State::Logout,
- _ => return Err(Error::ForbiddenTransition),
+ (s, t) => {
+ tracing::error!(state=%s, transition=%t, "forbidden transition");
+ return Err(Error::ForbiddenTransition);
+ }
};
-
*self = new_state;
+ tracing::debug!(state=%self, "transition succeeded");
Ok(())
}
diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs
index 07fa3ad..0efa987 100644
--- a/src/imap/mailbox_view.rs
+++ b/src/imap/mailbox_view.rs
@@ -1,6 +1,6 @@
+use std::collections::HashSet;
use std::num::{NonZeroU32, NonZeroU64};
use std::sync::Arc;
-use std::collections::HashSet;
use anyhow::{anyhow, Error, Result};
@@ -13,11 +13,11 @@ use imap_codec::imap_types::response::{Code, CodeOther, Data, Status};
use imap_codec::imap_types::search::SearchKey;
use imap_codec::imap_types::sequence::SequenceSet;
-use crate::mail::unique_ident::UniqueIdent;
use crate::mail::mailbox::Mailbox;
use crate::mail::query::QueryScope;
use crate::mail::snapshot::FrozenMailbox;
use crate::mail::uidindex::{ImapUid, ImapUidvalidity, ModSeq};
+use crate::mail::unique_ident::UniqueIdent;
use crate::imap::attributes::AttributesProxy;
use crate::imap::flags;
@@ -64,7 +64,7 @@ pub struct MailboxView {
impl MailboxView {
/// Creates a new IMAP view into a mailbox.
pub async fn new(mailbox: Arc<Mailbox>, is_cond: bool) -> Self {
- Self {
+ Self {
internal: mailbox.frozen().await,
is_condstore: is_cond,
}
@@ -130,11 +130,9 @@ impl MailboxView {
let new_mail = new_snapshot.table.get(uuid);
if old_mail.is_some() && old_mail != new_mail {
if let Some((uid, modseq, flags)) = new_mail {
- let mut items = vec![
- MessageDataItem::Flags(
- flags.iter().filter_map(|f| flags::from_str(f)).collect(),
- ),
- ];
+ let mut items = vec![MessageDataItem::Flags(
+ flags.iter().filter_map(|f| flags::from_str(f)).collect(),
+ )];
if params.with_uid {
items.push(MessageDataItem::Uid(*uid));
@@ -169,7 +167,7 @@ impl MailboxView {
data.push(self.highestmodseq_status()?);
}
/*self.unseen_first_status()?
- .map(|unseen_status| data.push(unseen_status));*/
+ .map(|unseen_status| data.push(unseen_status));*/
Ok(data)
}
@@ -188,8 +186,8 @@ impl MailboxView {
let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>();
let idx = self.index()?;
- let (editable, in_conflict) = idx
- .fetch_unchanged_since(sequence_set, unchanged_since, *is_uid_store)?;
+ let (editable, in_conflict) =
+ idx.fetch_unchanged_since(sequence_set, unchanged_since, *is_uid_store)?;
for mi in editable.iter() {
match kind {
@@ -215,15 +213,30 @@ impl MailboxView {
_ => in_conflict.into_iter().map(|midx| midx.i).collect(),
};
- let summary = self.update(UpdateParameters {
- with_uid: *is_uid_store,
- with_modseq: unchanged_since.is_some(),
- silence,
- }).await?;
+ let summary = self
+ .update(UpdateParameters {
+ with_uid: *is_uid_store,
+ with_modseq: unchanged_since.is_some(),
+ silence,
+ })
+ .await?;
Ok((summary, conflict_id_or_uid))
}
+ pub async fn idle_sync(&mut self) -> Result<Vec<Body<'static>>> {
+ self.internal
+ .mailbox
+ .notify()
+ .await
+ .upgrade()
+ .ok_or(anyhow!("test"))?
+ .notified()
+ .await;
+ self.internal.mailbox.opportunistic_sync().await?;
+ self.update(UpdateParameters::default()).await
+ }
+
pub async fn expunge(&mut self) -> Result<Vec<Body<'static>>> {
self.internal.sync().await?;
let state = self.internal.peek().await;
@@ -294,10 +307,12 @@ impl MailboxView {
ret.push((mi.uid, dest_uid));
}
- let update = self.update(UpdateParameters {
- with_uid: *is_uid_copy,
- ..UpdateParameters::default()
- }).await?;
+ let update = self
+ .update(UpdateParameters {
+ with_uid: *is_uid_copy,
+ ..UpdateParameters::default()
+ })
+ .await?;
Ok((to_state.uidvalidity, ret, update))
}
@@ -321,11 +336,7 @@ impl MailboxView {
};
tracing::debug!("Query scope {:?}", query_scope);
let idx = self.index()?;
- let mail_idx_list = idx.fetch_changed_since(
- sequence_set,
- changed_since,
- *is_uid_fetch
- )?;
+ let mail_idx_list = idx.fetch_changed_since(sequence_set, changed_since, *is_uid_fetch)?;
// [2/6] Fetch the emails
let uuids = mail_idx_list
@@ -414,12 +425,19 @@ impl MailboxView {
let maybe_modseq = match is_modseq {
true => {
let final_selection = kept_idx.iter().chain(kept_query.iter());
- final_selection.map(|in_idx| in_idx.modseq).max().map(|r| NonZeroU64::try_from(r)).transpose()?
- },
+ final_selection
+ .map(|in_idx| in_idx.modseq)
+ .max()
+ .map(|r| NonZeroU64::try_from(r))
+ .transpose()?
+ }
_ => None,
};
- Ok((vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))], is_modseq))
+ Ok((
+ vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))],
+ is_modseq,
+ ))
}
// ----
@@ -463,8 +481,10 @@ impl MailboxView {
pub(crate) fn highestmodseq_status(&self) -> Result<Body<'static>> {
Ok(Body::Status(Status::ok(
- None,
- Some(Code::Other(CodeOther::unvalidated(format!("HIGHESTMODSEQ {}", self.highestmodseq()).into_bytes()))),
+ None,
+ Some(Code::Other(CodeOther::unvalidated(
+ format!("HIGHESTMODSEQ {}", self.highestmodseq()).into_bytes(),
+ ))),
"Highest",
)?))
}
diff --git a/src/imap/mod.rs b/src/imap/mod.rs
index 61a265a..40c4d4f 100644
--- a/src/imap/mod.rs
+++ b/src/imap/mod.rs
@@ -8,24 +8,30 @@ mod index;
mod mail_view;
mod mailbox_view;
mod mime_view;
+mod request;
mod response;
mod search;
mod session;
use std::net::SocketAddr;
-use anyhow::Result;
+use anyhow::{bail, Result};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::net::TcpListener;
+use tokio::sync::mpsc;
use tokio::sync::watch;
+use imap_codec::imap_types::response::{Code, CommandContinuationRequest, Response, Status};
use imap_codec::imap_types::{core::Text, response::Greeting};
use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions};
use imap_flow::stream::AnyStream;
use crate::config::ImapConfig;
use crate::imap::capability::ServerCapability;
+use crate::imap::request::Request;
+use crate::imap::response::{Body, ResponseOrIdle};
+use crate::imap::session::Instance;
use crate::login::ArcLoginProvider;
/// Server is a thin wrapper to register our Services in BàL
@@ -35,8 +41,8 @@ pub struct Server {
capabilities: ServerCapability,
}
+#[derive(Clone)]
struct ClientContext {
- stream: AnyStream,
addr: SocketAddr,
login_provider: ArcLoginProvider,
must_exit: watch::Receiver<bool>,
@@ -74,13 +80,12 @@ impl Server {
tracing::info!("IMAP: accepted connection from {}", remote_addr);
let client = ClientContext {
- stream: AnyStream::new(socket),
addr: remote_addr.clone(),
login_provider: self.login_provider.clone(),
must_exit: must_exit.clone(),
server_capabilities: self.capabilities.clone(),
};
- let conn = tokio::spawn(client_wrapper(client));
+ let conn = tokio::spawn(NetLoop::handler(client, AnyStream::new(socket)));
connections.push(conn);
}
drop(tcp);
@@ -92,46 +97,87 @@ impl Server {
}
}
-async fn client_wrapper(ctx: ClientContext) {
- let addr = ctx.addr.clone();
- match client(ctx).await {
- Ok(()) => {
- tracing::debug!("closing successful session for {:?}", addr);
- }
- Err(e) => {
- tracing::error!("closing errored session for {:?}: {}", addr, e);
+use std::sync::Arc;
+use tokio::sync::mpsc::*;
+use tokio::sync::Notify;
+use tokio_util::bytes::BytesMut;
+enum LoopMode {
+ Quit,
+ Interactive,
+ Idle(BytesMut, Arc<Notify>),
+}
+
+// @FIXME a full refactor of this part of the code will be needed sooner or later
+struct NetLoop {
+ ctx: ClientContext,
+ server: ServerFlow,
+ cmd_tx: Sender<Request>,
+ resp_rx: UnboundedReceiver<ResponseOrIdle>,
+}
+
+impl NetLoop {
+ async fn handler(ctx: ClientContext, sock: AnyStream) {
+ let addr = ctx.addr.clone();
+
+ let nl = match Self::new(ctx, sock).await {
+ Ok(nl) => {
+ tracing::debug!(addr=?addr, "netloop successfully initialized");
+ nl
+ }
+ Err(e) => {
+ tracing::error!(addr=?addr, err=?e, "netloop can not be initialized, closing session");
+ return;
+ }
+ };
+
+ match nl.core().await {
+ Ok(()) => {
+ tracing::debug!("closing successful netloop core for {:?}", addr);
+ }
+ Err(e) => {
+ tracing::error!("closing errored netloop core for {:?}: {}", addr, e);
+ }
}
}
-}
-async fn client(mut ctx: ClientContext) -> Result<()> {
- // Send greeting
- let (mut server, _) = ServerFlow::send_greeting(
- ctx.stream,
- ServerFlowOptions {
- crlf_relaxed: false,
- literal_accept_text: Text::unvalidated("OK"),
- literal_reject_text: Text::unvalidated("Literal rejected"),
- ..ServerFlowOptions::default()
- },
- Greeting::ok(
- Some(Code::Capability(ctx.server_capabilities.to_vec())),
- "Aerogramme",
+ async fn new(mut ctx: ClientContext, sock: AnyStream) -> Result<Self> {
+ // Send greeting
+ let (mut server, _) = ServerFlow::send_greeting(
+ sock,
+ ServerFlowOptions {
+ crlf_relaxed: false,
+ literal_accept_text: Text::unvalidated("OK"),
+ literal_reject_text: Text::unvalidated("Literal rejected"),
+ ..ServerFlowOptions::default()
+ },
+ Greeting::ok(
+ Some(Code::Capability(ctx.server_capabilities.to_vec())),
+ "Aerogramme",
+ )
+ .unwrap(),
)
- .unwrap(),
- )
- .await?;
+ .await?;
- use crate::imap::response::{Body, Response as MyResponse};
- use crate::imap::session::Instance;
- use imap_codec::imap_types::command::Command;
- use imap_codec::imap_types::response::{Code, Response, Status};
+ // Start a mailbox session in background
+ let (cmd_tx, mut cmd_rx) = mpsc::channel::<Request>(3);
+ let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<ResponseOrIdle>();
+ tokio::spawn(Self::session(ctx.clone(), cmd_rx, resp_tx));
- use tokio::sync::mpsc;
- let (cmd_tx, mut cmd_rx) = mpsc::channel::<Command<'static>>(10);
- let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<MyResponse<'static>>();
+ // Return the object
+ Ok(NetLoop {
+ ctx,
+ server,
+ cmd_tx,
+ resp_rx,
+ })
+ }
- let bckgrnd = tokio::spawn(async move {
+ /// Coms with the background session
+ async fn session(
+ ctx: ClientContext,
+ mut cmd_rx: Receiver<Request>,
+ resp_tx: UnboundedSender<ResponseOrIdle>,
+ ) -> () {
let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities);
loop {
let cmd = match cmd_rx.recv().await {
@@ -140,8 +186,8 @@ async fn client(mut ctx: ClientContext) -> Result<()> {
};
tracing::debug!(cmd=?cmd, sock=%ctx.addr, "command");
- let maybe_response = session.command(cmd).await;
- tracing::debug!(cmd=?maybe_response.completion, sock=%ctx.addr, "response");
+ let maybe_response = session.request(cmd).await;
+ tracing::debug!(cmd=?maybe_response, sock=%ctx.addr, "response");
match resp_tx.send(maybe_response) {
Err(_) => break,
@@ -149,67 +195,150 @@ async fn client(mut ctx: ClientContext) -> Result<()> {
};
}
tracing::info!("runner is quitting");
- });
+ }
- // Main loop
- loop {
+ async fn core(mut self) -> Result<()> {
+ let mut mode = LoopMode::Interactive;
+ loop {
+ mode = match mode {
+ LoopMode::Interactive => self.interactive_mode().await?,
+ LoopMode::Idle(buff, stop) => self.idle_mode(buff, stop).await?,
+ LoopMode::Quit => break,
+ }
+ }
+ Ok(())
+ }
+
+ async fn interactive_mode(&mut self) -> Result<LoopMode> {
tokio::select! {
// Managing imap_flow stuff
- srv_evt = server.progress() => match srv_evt? {
+ srv_evt = self.server.progress() => match srv_evt? {
ServerFlowEvent::ResponseSent { handle: _handle, response } => {
match response {
- Response::Status(Status::Bye(_)) => break,
- _ => tracing::trace!("sent to {} content {:?}", ctx.addr, response),
+ Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit),
+ _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response),
}
},
ServerFlowEvent::CommandReceived { command } => {
- match cmd_tx.try_send(command) {
+ match self.cmd_tx.try_send(Request::ImapCommand(command)) {
Ok(_) => (),
Err(mpsc::error::TrySendError::Full(_)) => {
- server.enqueue_status(Status::bye(None, "Too fast").unwrap());
- tracing::error!("client {:?} is sending commands too fast, closing.", ctx.addr);
+ self.server.enqueue_status(Status::bye(None, "Too fast").unwrap());
+ tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr);
}
_ => {
- server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
- tracing::error!("session task exited for {:?}, quitting", ctx.addr);
+ self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
}
}
},
flow => {
- server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
- tracing::error!("session task exited for {:?} due to unsupported flow {:?}", ctx.addr, flow);
-
+ self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
+ tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow);
}
},
// Managing response generated by Aerogramme
- maybe_msg = resp_rx.recv() => {
- let response = match maybe_msg {
- None => {
- server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
- tracing::error!("session task exited for {:?}, quitting", ctx.addr);
- continue
+ maybe_msg = self.resp_rx.recv() => match maybe_msg {
+ Some(ResponseOrIdle::Response(response)) => {
+ for body_elem in response.body.into_iter() {
+ let _handle = match body_elem {
+ Body::Data(d) => self.server.enqueue_data(d),
+ Body::Status(s) => self.server.enqueue_status(s),
+ };
+ }
+ self.server.enqueue_status(response.completion);
+ },
+ Some(ResponseOrIdle::StartIdle(stop)) => {
+ let cr = CommandContinuationRequest::basic(None, "Idling")?;
+ self.server.enqueue_continuation(cr);
+ self.cmd_tx.try_send(Request::Idle)?;
+ return Ok(LoopMode::Idle(BytesMut::new(), stop))
+ },
+ None => {
+ self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
+ },
+ Some(_) => unreachable!(),
+
+ },
+
+ // When receiving a CTRL+C
+ _ = self.ctx.must_exit.changed() => {
+ self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
+ },
+ };
+ Ok(LoopMode::Interactive)
+ }
+
+ async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc<Notify>) -> Result<LoopMode> {
+ // Flush send
+ loop {
+ match self.server.progress_send().await? {
+ Some(..) => continue,
+ None => break,
+ }
+ }
+
+ tokio::select! {
+ // Receiving IDLE event from background
+ maybe_msg = self.resp_rx.recv() => match maybe_msg {
+ // Session decided idle is terminated
+ Some(ResponseOrIdle::Response(response)) => {
+ for body_elem in response.body.into_iter() {
+ let _handle = match body_elem {
+ Body::Data(d) => self.server.enqueue_data(d),
+ Body::Status(s) => self.server.enqueue_status(s),
+ };
+ }
+ self.server.enqueue_status(response.completion);
+ return Ok(LoopMode::Interactive)
+ },
+ // Session has some information for user
+ Some(ResponseOrIdle::IdleEvent(elems)) => {
+ for body_elem in elems.into_iter() {
+ let _handle = match body_elem {
+ Body::Data(d) => self.server.enqueue_data(d),
+ Body::Status(s) => self.server.enqueue_status(s),
+ };
+ }
+ self.cmd_tx.try_send(Request::Idle)?;
+ return Ok(LoopMode::Idle(buff, stop))
+ },
+
+ // Session crashed
+ None => {
+ self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
+ return Ok(LoopMode::Interactive)
+ },
+
+ // Session can't start idling while already idling, it's a logic error!
+ Some(ResponseOrIdle::StartIdle(..)) => bail!("can't start idling while already idling!"),
+ },
+
+ // User is trying to interact with us
+ _read_client_bytes = self.server.stream.read(&mut buff) => {
+ use imap_codec::decode::Decoder;
+ let codec = imap_codec::IdleDoneCodec::new();
+ match codec.decode(&buff) {
+ Ok(([], imap_codec::imap_types::extensions::idle::IdleDone)) => {
+ // Session will be informed that it must stop idle
+ // It will generate the "done" message and change the loop mode
+ stop.notify_one()
},
- Some(r) => r,
+ Err(_) => (),
+ _ => bail!("Client sent data after terminating the continuation without waiting for the server. This is an unsupported behavior and bug in Aerogramme, quitting."),
};
- for body_elem in response.body.into_iter() {
- let _handle = match body_elem {
- Body::Data(d) => server.enqueue_data(d),
- Body::Status(s) => server.enqueue_status(s),
- };
- }
- server.enqueue_status(response.completion);
+ return Ok(LoopMode::Idle(buff, stop))
},
// When receiving a CTRL+C
- _ = ctx.must_exit.changed() => {
- server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
+ _ = self.ctx.must_exit.changed() => {
+ self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
+ return Ok(LoopMode::Interactive)
},
};
}
-
- drop(cmd_tx);
- bckgrnd.await?;
- Ok(())
}
diff --git a/src/imap/request.rs b/src/imap/request.rs
new file mode 100644
index 0000000..49b4992
--- /dev/null
+++ b/src/imap/request.rs
@@ -0,0 +1,7 @@
+use imap_codec::imap_types::command::Command;
+
+#[derive(Debug)]
+pub enum Request {
+ ImapCommand(Command<'static>),
+ Idle,
+}
diff --git a/src/imap/response.rs b/src/imap/response.rs
index d20e58e..40e6927 100644
--- a/src/imap/response.rs
+++ b/src/imap/response.rs
@@ -2,7 +2,10 @@ use anyhow::Result;
use imap_codec::imap_types::command::Command;
use imap_codec::imap_types::core::Tag;
use imap_codec::imap_types::response::{Code, Data, Status};
+use std::sync::Arc;
+use tokio::sync::Notify;
+#[derive(Debug)]
pub enum Body<'a> {
Data(Data<'a>),
Status(Status<'a>),
@@ -88,6 +91,7 @@ impl<'a> ResponseBuilder<'a> {
}
}
+#[derive(Debug)]
pub struct Response<'a> {
pub body: Vec<Body<'a>>,
pub completion: Status<'a>,
@@ -110,3 +114,10 @@ impl<'a> Response<'a> {
})
}
}
+
+#[derive(Debug)]
+pub enum ResponseOrIdle {
+ Response(Response<'static>),
+ StartIdle(Arc<Notify>),
+ IdleEvent(Vec<Body<'static>>),
+}
diff --git a/src/imap/search.rs b/src/imap/search.rs
index 61cbad5..d06c3bd 100644
--- a/src/imap/search.rs
+++ b/src/imap/search.rs
@@ -2,7 +2,7 @@ use std::num::{NonZeroU32, NonZeroU64};
use anyhow::Result;
use imap_codec::imap_types::core::NonEmptyVec;
-use imap_codec::imap_types::search::{SearchKey, MetadataItemSearch};
+use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey};
use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet};
use crate::imap::index::MailIndex;
@@ -115,12 +115,15 @@ impl<'a> Criteria<'a> {
pub fn is_modseq(&self) -> bool {
use SearchKey::*;
match self.0 {
- And(and_list) => and_list.as_ref().iter().any(|child| Criteria(child).is_modseq()),
+ And(and_list) => and_list
+ .as_ref()
+ .iter()
+ .any(|child| Criteria(child).is_modseq()),
Or(left, right) => Criteria(left).is_modseq() || Criteria(right).is_modseq(),
Not(child) => Criteria(child).is_modseq(),
ModSeq { .. } => true,
_ => false,
- }
+ }
}
/// Returns emails that we now for sure we want to keep
@@ -187,7 +190,10 @@ impl<'a> Criteria<'a> {
// Sequence logic
maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, midx).into(),
maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, midx).into(),
- ModSeq { metadata_item , modseq } => is_keep_modseq(metadata_item, modseq, midx).into(),
+ ModSeq {
+ metadata_item,
+ modseq,
+ } => is_keep_modseq(metadata_item, modseq, midx).into(),
// All the stuff we can't evaluate yet
Bcc(_) | Cc(_) | From(_) | Header(..) | SentBefore(_) | SentOn(_) | SentSince(_)
@@ -225,7 +231,10 @@ impl<'a> Criteria<'a> {
//@FIXME Reevaluating our previous logic...
maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, &mail_view.in_idx),
maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, &mail_view.in_idx),
- ModSeq { metadata_item , modseq } => is_keep_modseq(metadata_item, modseq, &mail_view.in_idx).into(),
+ ModSeq {
+ metadata_item,
+ modseq,
+ } => is_keep_modseq(metadata_item, modseq, &mail_view.in_idx).into(),
// Filter on mail meta
Before(search_naive) => match mail_view.stored_naive_date() {
@@ -331,7 +340,7 @@ fn approx_sequence_set_size(seq_set: &SequenceSet) -> u64 {
}
// This is wrong as sequence UID can have holes,
-// as we don't know the number of messages in the mailbox also
+// as we don't know the number of messages in the mailbox also
// we gave to guess
fn approx_sequence_size(seq: &Sequence) -> u64 {
match seq {
@@ -473,9 +482,13 @@ fn is_keep_seq(sk: &SearchKey, midx: &MailIndex) -> bool {
}
}
-fn is_keep_modseq(filter: &Option<MetadataItemSearch>, modseq: &NonZeroU64, midx: &MailIndex) -> bool {
+fn is_keep_modseq(
+ filter: &Option<MetadataItemSearch>,
+ modseq: &NonZeroU64,
+ midx: &MailIndex,
+) -> bool {
if filter.is_some() {
tracing::warn!(filter=?filter, "Ignoring search metadata filter as it's not supported yet");
}
- modseq <= &midx.modseq
+ modseq <= &midx.modseq
}
diff --git a/src/imap/session.rs b/src/imap/session.rs
index 6b26478..12bbfee 100644
--- a/src/imap/session.rs
+++ b/src/imap/session.rs
@@ -1,8 +1,10 @@
use crate::imap::capability::{ClientCapability, ServerCapability};
-use crate::imap::command::{anonymous, authenticated, examined, selected};
+use crate::imap::command::{anonymous, authenticated, selected};
use crate::imap::flow;
-use crate::imap::response::Response;
+use crate::imap::request::Request;
+use crate::imap::response::{Response, ResponseOrIdle};
use crate::login::ArcLoginProvider;
+use anyhow::{anyhow, bail, Result};
use imap_codec::imap_types::command::Command;
//-----
@@ -23,7 +25,45 @@ impl Instance {
}
}
- pub async fn command(&mut self, cmd: Command<'static>) -> Response<'static> {
+ pub async fn request(&mut self, req: Request) -> ResponseOrIdle {
+ match req {
+ Request::Idle => self.idle().await,
+ Request::ImapCommand(cmd) => self.command(cmd).await,
+ }
+ }
+
+ pub async fn idle(&mut self) -> ResponseOrIdle {
+ match self.idle_happy().await {
+ Ok(r) => r,
+ Err(e) => {
+ tracing::error!(err=?e, "something bad happened in idle");
+ ResponseOrIdle::Response(Response::bye().unwrap())
+ }
+ }
+ }
+
+ pub async fn idle_happy(&mut self) -> Result<ResponseOrIdle> {
+ let (mbx, tag, stop) = match &mut self.state {
+ flow::State::Idle(_, ref mut mbx, _, tag, stop) => (mbx, tag.clone(), stop.clone()),
+ _ => bail!("Invalid session state, can't idle"),
+ };
+
+ tokio::select! {
+ _ = stop.notified() => {
+ self.state.apply(flow::Transition::UnIdle)?;
+ return Ok(ResponseOrIdle::Response(Response::build()
+ .tag(tag.clone())
+ .message("IDLE completed")
+ .ok()?))
+ },
+ change = mbx.idle_sync() => {
+ tracing::debug!("idle event");
+ return Ok(ResponseOrIdle::IdleEvent(change?));
+ }
+ }
+ }
+
+ pub async fn command(&mut self, cmd: Command<'static>) -> ResponseOrIdle {
// Command behavior is modulated by the state.
// To prevent state error, we handle the same command in separate code paths.
let (resp, tr) = match &mut self.state {
@@ -44,26 +84,18 @@ impl Instance {
};
authenticated::dispatch(ctx).await
}
- flow::State::Selected(ref user, ref mut mailbox) => {
+ flow::State::Selected(ref user, ref mut mailbox, ref perm) => {
let ctx = selected::SelectedContext {
req: &cmd,
server_capabilities: &self.server_capabilities,
client_capabilities: &mut self.client_capabilities,
user,
mailbox,
+ perm,
};
selected::dispatch(ctx).await
}
- flow::State::Examined(ref user, ref mut mailbox) => {
- let ctx = examined::ExaminedContext {
- req: &cmd,
- server_capabilities: &self.server_capabilities,
- client_capabilities: &mut self.client_capabilities,
- user,
- mailbox,
- };
- examined::dispatch(ctx).await
- }
+ flow::State::Idle(..) => Err(anyhow!("can not receive command while idling")),
flow::State::Logout => Response::build()
.tag(cmd.tag.clone())
.message("No commands are allowed in the LOGOUT state.")
@@ -88,15 +120,18 @@ impl Instance {
e,
cmd
);
- return Response::build()
+ return ResponseOrIdle::Response(Response::build()
.to_req(&cmd)
.message(
"Internal error, processing command triggered an illegal IMAP state transition",
)
.bad()
- .unwrap();
+ .unwrap());
}
- resp
+ match &self.state {
+ flow::State::Idle(_, _, _, _, n) => ResponseOrIdle::StartIdle(n.clone()),
+ _ => ResponseOrIdle::Response(resp),
+ }
}
}