From 3d23f0c936516ed89f2888fb44babb3994e8d579 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 17 Jan 2024 08:22:15 +0100 Subject: WIP refactor idle --- src/imap/command/authenticated.rs | 4 +- src/imap/command/selected.rs | 52 +++++++++- src/imap/flow.rs | 36 ++++--- src/imap/mod.rs | 205 +++++++++++++++++++++++--------------- src/imap/request.rs | 8 ++ src/imap/response.rs | 8 ++ src/imap/session.rs | 46 ++++++--- 7 files changed, 247 insertions(+), 112 deletions(-) create mode 100644 src/imap/request.rs (limited to 'src') diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index 9b6bb24..e17699a 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -453,7 +453,7 @@ impl<'a> AuthenticatedContext<'a> { .code(Code::ReadWrite) .set_body(data) .ok()?, - flow::Transition::Select(mb), + flow::Transition::Select(mb, flow::MailboxPerm::ReadWrite), )) } @@ -491,7 +491,7 @@ impl<'a> AuthenticatedContext<'a> { .code(Code::ReadOnly) .set_body(data) .ok()?, - flow::Transition::Examine(mb), + flow::Transition::Select(mb, flow::MailboxPerm::ReadOnly), )) } diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index c13b71a..c9c5337 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -25,6 +25,7 @@ pub struct SelectedContext<'a> { pub mailbox: &'a mut MailboxView, pub server_capabilities: &'a ServerCapability, pub client_capabilities: &'a mut ClientCapability, + pub perm: &'a flow::MailboxPerm, } pub async fn dispatch<'a>( @@ -39,7 +40,10 @@ pub async fn dispatch<'a>( CommandBody::Logout => anystate::logout(), // Specific to this state (7 commands + NOOP) - CommandBody::Close => ctx.close().await, + CommandBody::Close => match ctx.perm { + flow::MailboxPerm::ReadWrite => ctx.close().await, + flow::MailboxPerm::ReadOnly => ctx.examine_close().await, + }, CommandBody::Noop | CommandBody::Check => ctx.noop().await, CommandBody::Fetch { sequence_set, @@ -75,6 +79,11 @@ pub async fn dispatch<'a>( // UNSELECT extension (rfc3691) CommandBody::Unselect => ctx.unselect().await, + // IDLE extension (rfc2177) + CommandBody::Idle => { + unimplemented!() + } + // In selected mode, we fallback to authenticated when needed _ => { authenticated::dispatch(authenticated::AuthenticatedContext { @@ -102,6 +111,18 @@ impl<'a> SelectedContext<'a> { )) } + /// CLOSE in examined state is not the same as in selected state + /// (in selected state it also does an EXPUNGE, here it doesn't) + async fn examine_close(self) -> Result<(Response<'static>, flow::Transition)> { + Ok(( + Response::build() + .to_req(self.req) + .message("CLOSE completed") + .ok()?, + flow::Transition::Unselect, + )) + } + async fn unselect(self) -> Result<(Response<'static>, flow::Transition)> { Ok(( Response::build() @@ -189,6 +210,10 @@ impl<'a> SelectedContext<'a> { } async fn expunge(self) -> Result<(Response<'static>, flow::Transition)> { + if let Some(failed) = self.fail_read_only() { + return Ok((failed, flow::Transition::None)) + } + let tag = self.req.tag.clone(); let data = self.mailbox.expunge().await?; @@ -211,6 +236,10 @@ impl<'a> SelectedContext<'a> { modifiers: &[StoreModifier], uid: &bool, ) -> Result<(Response<'static>, flow::Transition)> { + if let Some(failed) = self.fail_read_only() { + return Ok((failed, flow::Transition::None)) + } + let mut unchanged_since: Option = None; modifiers.iter().for_each(|m| match m { StoreModifier::UnchangedSince(val) => { @@ -251,6 +280,11 @@ impl<'a> SelectedContext<'a> { mailbox: &MailboxCodec<'a>, uid: &bool, ) -> Result<(Response<'static>, flow::Transition)> { + //@FIXME Could copy be valid in EXAMINE mode? + if let Some(failed) = self.fail_read_only() { + return Ok((failed, flow::Transition::None)) + } + let name: &str = MailboxName(mailbox).try_into()?; let mb_opt = self.user.open_mailbox(&name).await?; @@ -303,6 +337,10 @@ impl<'a> SelectedContext<'a> { mailbox: &MailboxCodec<'a>, uid: &bool, ) -> Result<(Response<'static>, flow::Transition)> { + if let Some(failed) = self.fail_read_only() { + return Ok((failed, flow::Transition::None)) + } + let name: &str = MailboxName(mailbox).try_into()?; let mb_opt = self.user.open_mailbox(&name).await?; @@ -350,4 +388,16 @@ impl<'a> SelectedContext<'a> { flow::Transition::None, )) } + + fn fail_read_only(&self) -> Option> { + match self.perm { + flow::MailboxPerm::ReadWrite => None, + flow::MailboxPerm::ReadOnly => { + Some(Response::build() + .to_req(self.req) + .message("Write command are forbidden while exmining mailbox") + .no().unwrap()) + }, + } + } } diff --git a/src/imap/flow.rs b/src/imap/flow.rs index 95810c1..ff348ca 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -19,17 +19,23 @@ impl StdError for Error {} pub enum State { NotAuthenticated, Authenticated(Arc), - Selected(Arc, MailboxView), - // Examined is like Selected, but indicates that the mailbox is read-only - Examined(Arc, MailboxView), + Selected(Arc, MailboxView, MailboxPerm), + Idle(Arc, MailboxView, MailboxPerm), Logout, } +#[derive(Clone)] +pub enum MailboxPerm { + ReadOnly, + ReadWrite, +} + pub enum Transition { None, Authenticate(Arc), - Examine(MailboxView), - Select(MailboxView), + Select(MailboxView, MailboxPerm), + Idle, + UnIdle, Unselect, Logout, } @@ -38,20 +44,22 @@ pub enum Transition { // https://datatracker.ietf.org/doc/html/rfc3501#page-13 impl State { pub fn apply(&mut self, tr: Transition) -> Result<(), Error> { - let new_state = match (&self, tr) { + let new_state = match (std::mem::replace(self, State::NotAuthenticated), tr) { (_s, Transition::None) => return Ok(()), (State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u), ( - State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _), - Transition::Select(m), - ) => State::Selected(u.clone(), m), - ( - State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _), - Transition::Examine(m), - ) => State::Examined(u.clone(), m), - (State::Selected(u, _) | State::Examined(u, _), Transition::Unselect) => { + State::Authenticated(u) | State::Selected(u, _, _), + Transition::Select(m, p), + ) => State::Selected(u, m, p), + (State::Selected(u, _, _) , Transition::Unselect) => { State::Authenticated(u.clone()) } + (State::Selected(u, m, p), Transition::Idle) => { + State::Idle(u, m, p) + }, + (State::Idle(u, m, p), Transition::UnIdle) => { + State::Selected(u, m, p) + }, (_, Transition::Logout) => State::Logout, _ => return Err(Error::ForbiddenTransition), }; diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 61a265a..baa15f7 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -8,22 +8,28 @@ mod index; mod mail_view; mod mailbox_view; mod mime_view; +mod request; mod response; mod search; mod session; use std::net::SocketAddr; -use anyhow::Result; +use anyhow::{Result, bail}; use futures::stream::{FuturesUnordered, StreamExt}; use tokio::net::TcpListener; use tokio::sync::watch; +use tokio::sync::mpsc; use imap_codec::imap_types::{core::Text, response::Greeting}; use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions}; use imap_flow::stream::AnyStream; +use imap_codec::imap_types::response::{Code, Response, CommandContinuationRequest, Status}; +use crate::imap::response::{Body, ResponseOrIdle}; +use crate::imap::session::Instance; +use crate::imap::request::Request; use crate::config::ImapConfig; use crate::imap::capability::ServerCapability; use crate::login::ArcLoginProvider; @@ -35,8 +41,8 @@ pub struct Server { capabilities: ServerCapability, } +#[derive(Clone)] struct ClientContext { - stream: AnyStream, addr: SocketAddr, login_provider: ArcLoginProvider, must_exit: watch::Receiver, @@ -74,13 +80,12 @@ impl Server { tracing::info!("IMAP: accepted connection from {}", remote_addr); let client = ClientContext { - stream: AnyStream::new(socket), addr: remote_addr.clone(), login_provider: self.login_provider.clone(), must_exit: must_exit.clone(), server_capabilities: self.capabilities.clone(), }; - let conn = tokio::spawn(client_wrapper(client)); + let conn = tokio::spawn(NetLoop::handler(client, AnyStream::new(socket))); connections.push(conn); } drop(tcp); @@ -92,46 +97,74 @@ impl Server { } } -async fn client_wrapper(ctx: ClientContext) { - let addr = ctx.addr.clone(); - match client(ctx).await { - Ok(()) => { - tracing::debug!("closing successful session for {:?}", addr); - } - Err(e) => { - tracing::error!("closing errored session for {:?}: {}", addr, e); +use tokio::sync::mpsc::*; +enum LoopMode { + Quit, + Interactive, + IdleUntil(tokio::sync::Notify), +} + +struct NetLoop { + ctx: ClientContext, + server: ServerFlow, + cmd_tx: Sender, + resp_rx: UnboundedReceiver, +} + +impl NetLoop { + async fn handler(ctx: ClientContext, sock: AnyStream) { + let addr = ctx.addr.clone(); + + let nl = match Self::new(ctx, sock).await { + Ok(nl) => { + tracing::debug!(addr=?addr, "netloop successfully initialized"); + nl + }, + Err(e) => { + tracing::error!(addr=?addr, err=?e, "netloop can not be initialized, closing session"); + return + } + }; + + match nl.core().await { + Ok(()) => { + tracing::debug!("closing successful netloop core for {:?}", addr); + } + Err(e) => { + tracing::error!("closing errored netloop core for {:?}: {}", addr, e); + } } } -} -async fn client(mut ctx: ClientContext) -> Result<()> { - // Send greeting - let (mut server, _) = ServerFlow::send_greeting( - ctx.stream, - ServerFlowOptions { - crlf_relaxed: false, - literal_accept_text: Text::unvalidated("OK"), - literal_reject_text: Text::unvalidated("Literal rejected"), - ..ServerFlowOptions::default() - }, - Greeting::ok( - Some(Code::Capability(ctx.server_capabilities.to_vec())), - "Aerogramme", - ) - .unwrap(), - ) - .await?; - - use crate::imap::response::{Body, Response as MyResponse}; - use crate::imap::session::Instance; - use imap_codec::imap_types::command::Command; - use imap_codec::imap_types::response::{Code, Response, Status}; - - use tokio::sync::mpsc; - let (cmd_tx, mut cmd_rx) = mpsc::channel::>(10); - let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::>(); - - let bckgrnd = tokio::spawn(async move { + async fn new(mut ctx: ClientContext, sock: AnyStream) -> Result { + // Send greeting + let (mut server, _) = ServerFlow::send_greeting( + sock, + ServerFlowOptions { + crlf_relaxed: false, + literal_accept_text: Text::unvalidated("OK"), + literal_reject_text: Text::unvalidated("Literal rejected"), + ..ServerFlowOptions::default() + }, + Greeting::ok( + Some(Code::Capability(ctx.server_capabilities.to_vec())), + "Aerogramme", + ) + .unwrap(), + ) + .await?; + + // Start a mailbox session in background + let (cmd_tx, mut cmd_rx) = mpsc::channel::(3); + let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::(); + tokio::spawn(Self::session(ctx.clone(), cmd_rx, resp_tx)); + + // Return the object + Ok(NetLoop { ctx, server, cmd_tx, resp_rx }) + } + + /// Coms with the background session + async fn session(ctx: ClientContext, mut cmd_rx: Receiver, resp_tx: UnboundedSender) -> () { let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities); loop { let cmd = match cmd_rx.recv().await { @@ -140,8 +173,8 @@ async fn client(mut ctx: ClientContext) -> Result<()> { }; tracing::debug!(cmd=?cmd, sock=%ctx.addr, "command"); - let maybe_response = session.command(cmd).await; - tracing::debug!(cmd=?maybe_response.completion, sock=%ctx.addr, "response"); + let maybe_response = session.request(cmd).await; + tracing::debug!(cmd=?maybe_response, sock=%ctx.addr, "response"); match resp_tx.send(maybe_response) { Err(_) => break, @@ -149,67 +182,81 @@ async fn client(mut ctx: ClientContext) -> Result<()> { }; } tracing::info!("runner is quitting"); - }); + } + + async fn core(mut self) -> Result<()> { + let mut mode = LoopMode::Interactive; + loop { + mode = match mode { + LoopMode::Interactive => self.interactive_mode().await?, + LoopMode::IdleUntil(notif) => self.idle_mode(notif).await?, + LoopMode::Quit => break, + } + } + Ok(()) + } + - // Main loop - loop { + async fn interactive_mode(&mut self) -> Result { tokio::select! { // Managing imap_flow stuff - srv_evt = server.progress() => match srv_evt? { + srv_evt = self.server.progress() => match srv_evt? { ServerFlowEvent::ResponseSent { handle: _handle, response } => { match response { - Response::Status(Status::Bye(_)) => break, - _ => tracing::trace!("sent to {} content {:?}", ctx.addr, response), + Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit), + _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response), } }, ServerFlowEvent::CommandReceived { command } => { - match cmd_tx.try_send(command) { + match self.cmd_tx.try_send(Request::ImapCommand(command)) { Ok(_) => (), Err(mpsc::error::TrySendError::Full(_)) => { - server.enqueue_status(Status::bye(None, "Too fast").unwrap()); - tracing::error!("client {:?} is sending commands too fast, closing.", ctx.addr); + self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); + tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); } _ => { - server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); - tracing::error!("session task exited for {:?}, quitting", ctx.addr); + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); } } }, flow => { - server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); - tracing::error!("session task exited for {:?} due to unsupported flow {:?}", ctx.addr, flow); - + self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); + tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow); } }, // Managing response generated by Aerogramme - maybe_msg = resp_rx.recv() => { - let response = match maybe_msg { - None => { - server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); - tracing::error!("session task exited for {:?}, quitting", ctx.addr); - continue - }, - Some(r) => r, - }; - - for body_elem in response.body.into_iter() { - let _handle = match body_elem { - Body::Data(d) => server.enqueue_data(d), - Body::Status(s) => server.enqueue_status(s), - }; - } - server.enqueue_status(response.completion); + maybe_msg = self.resp_rx.recv() => match maybe_msg { + Some(ResponseOrIdle::Response(response)) => { + for body_elem in response.body.into_iter() { + let _handle = match body_elem { + Body::Data(d) => self.server.enqueue_data(d), + Body::Status(s) => self.server.enqueue_status(s), + }; + } + self.server.enqueue_status(response.completion); + }, + Some(ResponseOrIdle::Idle) => { + let cr = CommandContinuationRequest::basic(None, "idling")?; + self.server.enqueue_continuation(cr); + return Ok(LoopMode::IdleUntil(tokio::sync::Notify::new())) + }, + None => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + }, }, // When receiving a CTRL+C - _ = ctx.must_exit.changed() => { - server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); + _ = self.ctx.must_exit.changed() => { + self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); }, }; + Ok(LoopMode::Interactive) } - drop(cmd_tx); - bckgrnd.await?; - Ok(()) + async fn idle_mode(&mut self, notif: tokio::sync::Notify) -> Result { + Ok(LoopMode::IdleUntil(notif)) + } } diff --git a/src/imap/request.rs b/src/imap/request.rs new file mode 100644 index 0000000..c458276 --- /dev/null +++ b/src/imap/request.rs @@ -0,0 +1,8 @@ +use imap_codec::imap_types::command::Command; +use tokio::sync::Notify; + +#[derive(Debug)] +pub enum Request { + ImapCommand(Command<'static>), + IdleUntil(Notify), +} diff --git a/src/imap/response.rs b/src/imap/response.rs index d20e58e..a9978e1 100644 --- a/src/imap/response.rs +++ b/src/imap/response.rs @@ -3,6 +3,7 @@ use imap_codec::imap_types::command::Command; use imap_codec::imap_types::core::Tag; use imap_codec::imap_types::response::{Code, Data, Status}; +#[derive(Debug)] pub enum Body<'a> { Data(Data<'a>), Status(Status<'a>), @@ -88,6 +89,7 @@ impl<'a> ResponseBuilder<'a> { } } +#[derive(Debug)] pub struct Response<'a> { pub body: Vec>, pub completion: Status<'a>, @@ -110,3 +112,9 @@ impl<'a> Response<'a> { }) } } + +#[derive(Debug)] +pub enum ResponseOrIdle { + Response(Response<'static>), + Idle, +} diff --git a/src/imap/session.rs b/src/imap/session.rs index 6b26478..d86e6ff 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -1,7 +1,9 @@ +use anyhow::anyhow; use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::command::{anonymous, authenticated, examined, selected}; use crate::imap::flow; -use crate::imap::response::Response; +use crate::imap::request::Request; +use crate::imap::response::{Response, ResponseOrIdle}; use crate::login::ArcLoginProvider; use imap_codec::imap_types::command::Command; @@ -23,7 +25,24 @@ impl Instance { } } - pub async fn command(&mut self, cmd: Command<'static>) -> Response<'static> { + pub async fn request(&mut self, req: Request) -> ResponseOrIdle { + match req { + Request::IdleUntil(stop) => ResponseOrIdle::Response(self.idle(stop).await), + Request::ImapCommand(cmd) => self.command(cmd).await, + } + } + + pub async fn idle(&mut self, stop: tokio::sync::Notify) -> Response<'static> { + let (user, mbx) = match &mut self.state { + flow::State::Idle(ref user, ref mut mailbox, ref perm) => (user, mailbox), + _ => unreachable!(), + }; + + unimplemented!(); + } + + + pub async fn command(&mut self, cmd: Command<'static>) -> ResponseOrIdle { // Command behavior is modulated by the state. // To prevent state error, we handle the same command in separate code paths. let (resp, tr) = match &mut self.state { @@ -44,26 +63,18 @@ impl Instance { }; authenticated::dispatch(ctx).await } - flow::State::Selected(ref user, ref mut mailbox) => { + flow::State::Selected(ref user, ref mut mailbox, ref perm) => { let ctx = selected::SelectedContext { req: &cmd, server_capabilities: &self.server_capabilities, client_capabilities: &mut self.client_capabilities, user, mailbox, + perm, }; selected::dispatch(ctx).await } - flow::State::Examined(ref user, ref mut mailbox) => { - let ctx = examined::ExaminedContext { - req: &cmd, - server_capabilities: &self.server_capabilities, - client_capabilities: &mut self.client_capabilities, - user, - mailbox, - }; - examined::dispatch(ctx).await - } + flow::State::Idle(..) => Err(anyhow!("can not receive command while idling")), flow::State::Logout => Response::build() .tag(cmd.tag.clone()) .message("No commands are allowed in the LOGOUT state.") @@ -88,15 +99,18 @@ impl Instance { e, cmd ); - return Response::build() + return ResponseOrIdle::Response(Response::build() .to_req(&cmd) .message( "Internal error, processing command triggered an illegal IMAP state transition", ) .bad() - .unwrap(); + .unwrap()); } - resp + match self.state { + flow::State::Idle(..) => ResponseOrIdle::Idle, + _ => ResponseOrIdle::Response(resp), + } } } -- cgit v1.2.3