From 2adf73dd8e6e76997d4fb67f9f7b3fe065530722 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 22 Feb 2024 17:30:40 +0100 Subject: Update imap-flow, clean IDLE --- src/imap/capability.rs | 4 +- src/imap/command/authenticated.rs | 4 +- src/imap/command/selected.rs | 14 +-- src/imap/flow.rs | 11 ++- src/imap/mailbox_view.rs | 6 +- src/imap/mime_view.rs | 12 +-- src/imap/mod.rs | 177 +++++++++++++++++++++----------------- src/imap/request.rs | 4 +- src/imap/response.rs | 3 +- src/imap/search.rs | 4 +- src/imap/session.rs | 48 +++++++++-- 11 files changed, 171 insertions(+), 116 deletions(-) (limited to 'src/imap') diff --git a/src/imap/capability.rs b/src/imap/capability.rs index 256d820..c76b51c 100644 --- a/src/imap/capability.rs +++ b/src/imap/capability.rs @@ -1,5 +1,5 @@ use imap_codec::imap_types::command::{FetchModifier, SelectExamineModifier, StoreModifier}; -use imap_codec::imap_types::core::NonEmptyVec; +use imap_codec::imap_types::core::Vec1; use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind}; use imap_codec::imap_types::response::Capability; use std::collections::HashSet; @@ -49,7 +49,7 @@ impl Default for ServerCapability { } impl ServerCapability { - pub fn to_vec(&self) -> NonEmptyVec> { + pub fn to_vec(&self) -> Vec1> { self.0 .iter() .map(|v| v.clone()) diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index d2e7815..129c2fd 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -6,7 +6,7 @@ use anyhow::{anyhow, bail, Result}; use imap_codec::imap_types::command::{ Command, CommandBody, ListReturnItem, SelectExamineModifier, }; -use imap_codec::imap_types::core::{Atom, Literal, NonEmptyVec, QuotedChar}; +use imap_codec::imap_types::core::{Atom, Literal, Vec1, QuotedChar}; use imap_codec::imap_types::datetime::DateTime; use imap_codec::imap_types::extensions::enable::CapabilityEnable; use imap_codec::imap_types::flag::{Flag, FlagNameAttribute}; @@ -584,7 +584,7 @@ impl<'a> AuthenticatedContext<'a> { fn enable( self, - cap_enable: &NonEmptyVec>, + cap_enable: &Vec1>, ) -> Result<(Response<'static>, flow::Transition)> { let mut response_builder = Response::build().to_req(self.req); let capabilities = self.client_capabilities.try_enable(cap_enable.as_ref()); diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index 73f8aec..e871509 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::Result; use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier}; -use imap_codec::imap_types::core::Charset; +use imap_codec::imap_types::core::{Vec1, Charset}; use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames; use imap_codec::imap_types::flag::{Flag, StoreResponse, StoreType}; use imap_codec::imap_types::mailbox::Mailbox as MailboxCodec; @@ -54,11 +54,12 @@ pub async fn dispatch<'a>( ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid) .await } + //@FIXME SearchKey::And is a legacy hack, should be refactored CommandBody::Search { charset, criteria, uid, - } => ctx.search(charset, criteria, uid).await, + } => ctx.search(charset, &SearchKey::And(criteria.clone()), uid).await, CommandBody::Expunge { // UIDPLUS (rfc4315) uid_sequence_set, @@ -88,15 +89,6 @@ pub async fn dispatch<'a>( // UNSELECT extension (rfc3691) CommandBody::Unselect => ctx.unselect().await, - // IDLE extension (rfc2177) - CommandBody::Idle => Ok(( - Response::build() - .to_req(ctx.req) - .message("DUMMY command due to anti-pattern in the code") - .ok()?, - flow::Transition::Idle(ctx.req.tag.clone(), tokio::sync::Notify::new()), - )), - // In selected mode, we fallback to authenticated when needed _ => { authenticated::dispatch(authenticated::AuthenticatedContext { diff --git a/src/imap/flow.rs b/src/imap/flow.rs index 6ddd092..4405e71 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -1,11 +1,12 @@ use std::error::Error as StdError; use std::fmt; use std::sync::Arc; + use tokio::sync::Notify; +use imap_codec::imap_types::core::Tag; use crate::imap::mailbox_view::MailboxView; use crate::mail::user::User; -use imap_codec::imap_types::core::Tag; #[derive(Debug)] pub enum Error { @@ -31,6 +32,14 @@ pub enum State { ), Logout, } +impl State { + pub fn notify(&self) -> Option> { + match self { + Self::Idle(_, _, _, _, anotif) => Some(anotif.clone()), + _ => None, + } + } +} impl fmt::Display for State { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { use State::*; diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index 076f92e..f6395b4 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -6,7 +6,7 @@ use anyhow::{anyhow, Error, Result}; use futures::stream::{StreamExt, TryStreamExt}; -use imap_codec::imap_types::core::Charset; +use imap_codec::imap_types::core::{Charset, Vec1}; use imap_codec::imap_types::fetch::MessageDataItem; use imap_codec::imap_types::flag::{Flag, FlagFetch, FlagPerm, StoreResponse, StoreType}; use imap_codec::imap_types::response::{Code, CodeOther, Data, Status}; @@ -629,7 +629,7 @@ impl MailboxView { mod tests { use super::*; use imap_codec::encode::Encoder; - use imap_codec::imap_types::core::NonEmptyVec; + use imap_codec::imap_types::core::Vec1; use imap_codec::imap_types::fetch::Section; use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName}; use imap_codec::imap_types::response::Response; @@ -749,7 +749,7 @@ mod tests { let test_repr = Response::Data(Data::Fetch { seq: NonZeroU32::new(1).unwrap(), - items: NonEmptyVec::from(MessageDataItem::Body(mime_view::bodystructure( + items: Vec1::from(MessageDataItem::Body(mime_view::bodystructure( &message.child, false, )?)), diff --git a/src/imap/mime_view.rs b/src/imap/mime_view.rs index 8fc043b..8bbbd2d 100644 --- a/src/imap/mime_view.rs +++ b/src/imap/mime_view.rs @@ -8,7 +8,7 @@ use imap_codec::imap_types::body::{ BasicFields, Body as FetchBody, BodyStructure, MultiPartExtensionData, SinglePartExtensionData, SpecificFields, }; -use imap_codec::imap_types::core::{AString, IString, NString, NonEmptyVec}; +use imap_codec::imap_types::core::{AString, IString, NString, Vec1}; use imap_codec::imap_types::fetch::{Part as FetchPart, Section as FetchSection}; use eml_codec::{ @@ -141,8 +141,8 @@ impl<'a> NodeMime<'a> { enum SubsettedSection<'a> { Part, Header, - HeaderFields(&'a NonEmptyVec>), - HeaderFieldsNot(&'a NonEmptyVec>), + HeaderFields(&'a Vec1>), + HeaderFieldsNot(&'a Vec1>), Text, Mime, } @@ -238,7 +238,7 @@ impl<'a> SelectedMime<'a> { /// case-insensitive but otherwise exact. fn header_fields( &self, - fields: &'a NonEmptyVec>, + fields: &'a Vec1>, invert: bool, ) -> Result> { // Build a lowercase ascii hashset with the fields to fetch @@ -398,8 +398,8 @@ impl<'a> NodeMult<'a> { .filter_map(|inner| NodeMime(&inner).structure(is_ext).ok()) .collect::>(); - NonEmptyVec::validate(&inner_bodies)?; - let bodies = NonEmptyVec::unvalidated(inner_bodies); + Vec1::validate(&inner_bodies)?; + let bodies = Vec1::unvalidated(inner_bodies); Ok(BodyStructure::Multi { bodies, diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 544086e..dbf72fe 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -15,7 +15,7 @@ mod session; use std::net::SocketAddr; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result, Context}; use futures::stream::{FuturesUnordered, StreamExt}; use tokio::net::TcpListener; @@ -144,13 +144,6 @@ use tokio_util::bytes::BytesMut; const PIPELINABLE_COMMANDS: usize = 64; -#[derive(Debug)] -enum LoopMode { - Quit, - Interactive, - Idle(BytesMut, Arc), -} - // @FIXME a full refactor of this part of the code will be needed sooner or later struct NetLoop { ctx: ClientContext, @@ -163,7 +156,7 @@ impl NetLoop { async fn handler(ctx: ClientContext, sock: AnyStream) { let addr = ctx.addr.clone(); - let nl = match Self::new(ctx, sock).await { + let mut nl = match Self::new(ctx, sock).await { Ok(nl) => { tracing::debug!(addr=?addr, "netloop successfully initialized"); nl @@ -241,85 +234,111 @@ impl NetLoop { tracing::info!("runner is quitting"); } - async fn core(mut self) -> Result<()> { - tracing::trace!("Starting the core loop"); - let mut mode = LoopMode::Interactive; + async fn core(&mut self) -> Result<()> { + let mut maybe_idle: Option> = None; loop { - tracing::trace!(mode=?mode, "Core loop iter"); - mode = match mode { - LoopMode::Interactive => self.interactive_mode().await?, - LoopMode::Idle(buff, stop) => self.idle_mode(buff, stop).await?, - LoopMode::Quit => break, - } - } - Ok(()) - } - - async fn interactive_mode(&mut self) -> Result { - tokio::select! { - // Managing imap_flow stuff - srv_evt = self.server.progress() => match srv_evt? { - ServerFlowEvent::ResponseSent { handle: _handle, response } => { - match response { - Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit), - _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response), - } - }, - ServerFlowEvent::CommandReceived { command } => { - match self.cmd_tx.try_send(Request::ImapCommand(command)) { - Ok(_) => (), - Err(mpsc::error::TrySendError::Full(_)) => { - self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); - tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); + tokio::select! { + // Managing imap_flow stuff + srv_evt = self.server.progress() => match srv_evt? { + ServerFlowEvent::ResponseSent { handle: _handle, response } => { + match response { + Response::Status(Status::Bye(_)) => return Ok(()), + _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response), } - _ => { - self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); - tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + }, + ServerFlowEvent::CommandReceived { command } => { + match self.cmd_tx.try_send(Request::ImapCommand(command)) { + Ok(_) => (), + Err(mpsc::error::TrySendError::Full(_)) => { + self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); + tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); + } + _ => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + } + } + }, + ServerFlowEvent::IdleCommandReceived { tag } => { + match self.cmd_tx.try_send(Request::IdleStart(tag)) { + Ok(_) => (), + Err(mpsc::error::TrySendError::Full(_)) => { + self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); + tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); + } + _ => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + } } } - }, - flow => { - self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); - tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow); - } - }, - - // Managing response generated by Aerogramme - maybe_msg = self.resp_rx.recv() => match maybe_msg { - Some(ResponseOrIdle::Response(response)) => { - tracing::trace!("Interactive, server has a response for the client"); - for body_elem in response.body.into_iter() { - let _handle = match body_elem { - Body::Data(d) => self.server.enqueue_data(d), - Body::Status(s) => self.server.enqueue_status(s), - }; + ServerFlowEvent::IdleDoneReceived => { + tracing::trace!("client sent DONE and want to stop IDLE"); + maybe_idle.ok_or(anyhow!("Received IDLE done but not idling currently"))?.notify_one(); + maybe_idle = None; + } + flow => { + self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); + tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow); } - self.server.enqueue_status(response.completion); - }, - Some(ResponseOrIdle::StartIdle(stop)) => { - tracing::trace!("Interactive, server agreed to switch in idle mode"); - let cr = CommandContinuationRequest::basic(None, "Idling")?; - self.server.enqueue_continuation(cr); - self.cmd_tx.try_send(Request::Idle)?; - return Ok(LoopMode::Idle(BytesMut::new(), stop)) - }, - None => { - self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); - tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); }, - Some(_) => unreachable!(), - }, + // Managing response generated by Aerogramme + maybe_msg = self.resp_rx.recv() => match maybe_msg { + Some(ResponseOrIdle::Response(response)) => { + tracing::trace!("Interactive, server has a response for the client"); + for body_elem in response.body.into_iter() { + let _handle = match body_elem { + Body::Data(d) => self.server.enqueue_data(d), + Body::Status(s) => self.server.enqueue_status(s), + }; + } + self.server.enqueue_status(response.completion); + }, + Some(ResponseOrIdle::IdleAccept(stop)) => { + tracing::trace!("Interactive, server agreed to switch in idle mode"); + let cr = CommandContinuationRequest::basic(None, "Idling")?; + self.server.idle_accept(cr).or(Err(anyhow!("refused continuation for idle accept")))?; + self.cmd_tx.try_send(Request::IdlePoll)?; + if maybe_idle.is_some() { + bail!("Can't start IDLE if already idling"); + } + maybe_idle = Some(stop); + }, + Some(ResponseOrIdle::IdleEvent(elems)) => { + tracing::trace!("server imap session has some change to communicate to the client"); + for body_elem in elems.into_iter() { + let _handle = match body_elem { + Body::Data(d) => self.server.enqueue_data(d), + Body::Status(s) => self.server.enqueue_status(s), + }; + } + self.cmd_tx.try_send(Request::IdlePoll)?; + }, + Some(ResponseOrIdle::IdleReject(response)) => { + tracing::trace!("inform client that session rejected idle"); + self.server + .idle_reject(response.completion) + .or(Err(anyhow!("wrong reject command")))?; + }, + None => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + }, + Some(_) => unreachable!(), - // When receiving a CTRL+C - _ = self.ctx.must_exit.changed() => { - tracing::trace!("Interactive, CTRL+C, exiting"); - self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); - }, - }; - Ok(LoopMode::Interactive) + }, + + // When receiving a CTRL+C + _ = self.ctx.must_exit.changed() => { + tracing::trace!("Interactive, CTRL+C, exiting"); + self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); + }, + }; + } } + /* async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc) -> Result { // Flush send loop { @@ -398,5 +417,5 @@ impl NetLoop { return Ok(LoopMode::Interactive) }, }; - } + }*/ } diff --git a/src/imap/request.rs b/src/imap/request.rs index 49b4992..cff18a3 100644 --- a/src/imap/request.rs +++ b/src/imap/request.rs @@ -1,7 +1,9 @@ use imap_codec::imap_types::command::Command; +use imap_codec::imap_types::core::Tag; #[derive(Debug)] pub enum Request { ImapCommand(Command<'static>), - Idle, + IdleStart(Tag<'static>), + IdlePoll, } diff --git a/src/imap/response.rs b/src/imap/response.rs index 40e6927..b6a0e98 100644 --- a/src/imap/response.rs +++ b/src/imap/response.rs @@ -118,6 +118,7 @@ impl<'a> Response<'a> { #[derive(Debug)] pub enum ResponseOrIdle { Response(Response<'static>), - StartIdle(Arc), + IdleAccept(Arc), + IdleReject(Response<'static>), IdleEvent(Vec>), } diff --git a/src/imap/search.rs b/src/imap/search.rs index 4ff70ee..37a7e9e 100644 --- a/src/imap/search.rs +++ b/src/imap/search.rs @@ -1,6 +1,6 @@ use std::num::{NonZeroU32, NonZeroU64}; -use imap_codec::imap_types::core::NonEmptyVec; +use imap_codec::imap_types::core::Vec1; use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey}; use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet}; @@ -48,7 +48,7 @@ impl<'a> Criteria<'a> { let mut new_vec = base.0.into_inner(); new_vec.extend_from_slice(ext.0.as_ref()); let seq = SequenceSet( - NonEmptyVec::try_from(new_vec) + Vec1::try_from(new_vec) .expect("merging non empty vec lead to non empty vec"), ); (seq, x) diff --git a/src/imap/session.rs b/src/imap/session.rs index 12bbfee..9165574 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -4,8 +4,8 @@ use crate::imap::flow; use crate::imap::request::Request; use crate::imap::response::{Response, ResponseOrIdle}; use crate::login::ArcLoginProvider; -use anyhow::{anyhow, bail, Result}; -use imap_codec::imap_types::command::Command; +use anyhow::{anyhow, bail, Result, Context}; +use imap_codec::imap_types::{core::Tag, command::Command}; //----- pub struct Instance { @@ -27,13 +27,44 @@ impl Instance { pub async fn request(&mut self, req: Request) -> ResponseOrIdle { match req { - Request::Idle => self.idle().await, + Request::IdleStart(tag) => self.idle_init(tag), + Request::IdlePoll => self.idle_poll().await, Request::ImapCommand(cmd) => self.command(cmd).await, } } - pub async fn idle(&mut self) -> ResponseOrIdle { - match self.idle_happy().await { + pub fn idle_init(&mut self, tag: Tag<'static>) -> ResponseOrIdle { + // Build transition + //@FIXME the notifier should be hidden inside the state and thus not part of the transition! + let transition = flow::Transition::Idle(tag.clone(), tokio::sync::Notify::new()); + + // Try to apply the transition and get the stop notifier + let maybe_stop = self + .state + .apply(transition) + .context("IDLE transition failed") + .and_then(|_| self.state.notify().ok_or(anyhow!("IDLE state has no Notify object"))); + + // Build an appropriate response + match maybe_stop { + Ok(stop) => ResponseOrIdle::IdleAccept(stop), + Err(e) => { + tracing::error!(err=?e, "unable to init idle due to a transition error"); + //ResponseOrIdle::IdleReject(tag) + let no = Response::build() + .tag(tag) + .message( + "Internal error, processing command triggered an illegal IMAP state transition", + ) + .no() + .unwrap(); + ResponseOrIdle::IdleReject(no) + } + } + } + + pub async fn idle_poll(&mut self) -> ResponseOrIdle { + match self.idle_poll_happy().await { Ok(r) => r, Err(e) => { tracing::error!(err=?e, "something bad happened in idle"); @@ -42,7 +73,7 @@ impl Instance { } } - pub async fn idle_happy(&mut self) -> Result { + pub async fn idle_poll_happy(&mut self) -> Result { let (mbx, tag, stop) = match &mut self.state { flow::State::Idle(_, ref mut mbx, _, tag, stop) => (mbx, tag.clone(), stop.clone()), _ => bail!("Invalid session state, can't idle"), @@ -128,10 +159,11 @@ impl Instance { .bad() .unwrap()); } + ResponseOrIdle::Response(resp) - match &self.state { + /*match &self.state { flow::State::Idle(_, _, _, _, n) => ResponseOrIdle::StartIdle(n.clone()), _ => ResponseOrIdle::Response(resp), - } + }*/ } } -- cgit v1.2.3