From ca4c2e7505f28acad688705d45cc5c5dca1799c3 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 20 Jun 2022 18:09:20 +0200 Subject: WIP refactor --- src/imap/command.rs | 29 -------- src/imap/command/anonymous.rs | 23 ++++--- src/imap/command/authenticated.rs | 138 ++++++++++++++++++++------------------ src/imap/command/mod.rs | 3 + src/imap/command/selected.rs | 43 ++++++++++-- src/imap/flow.rs | 17 +++-- src/imap/mod.rs | 26 +++---- src/imap/session.rs | 92 +++++++------------------ src/server.rs | 13 ++-- 9 files changed, 181 insertions(+), 203 deletions(-) delete mode 100644 src/imap/command.rs create mode 100644 src/imap/command/mod.rs (limited to 'src') diff --git a/src/imap/command.rs b/src/imap/command.rs deleted file mode 100644 index adf2f90..0000000 --- a/src/imap/command.rs +++ /dev/null @@ -1,29 +0,0 @@ -use anyhow::{Error, Result}; -use boitalettres::errors::Error as BalError; -use boitalettres::proto::{Request, Response}; -use imap_codec::types::core::{AString, Tag}; -use imap_codec::types::fetch_attributes::MacroOrFetchAttributes; -use imap_codec::types::mailbox::{ListMailbox, Mailbox as MailboxCodec}; -use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status}; -use imap_codec::types::sequence::SequenceSet; - -use crate::mailbox::Mailbox; -use crate::session; - -pub struct Command<'a> { - tag: Tag, - session: &'a mut session::Instance, -} - -// @FIXME better handle errors, our conversions are bad due to my fork of BàL -// @FIXME store the IMAP state in the session as an enum. -impl<'a> Command<'a> { - pub fn new(tag: Tag, session: &'a mut session::Instance) -> Self { - Self { tag, session } - } - - - - - -} diff --git a/src/imap/command/anonymous.rs b/src/imap/command/anonymous.rs index e4222f7..55e701b 100644 --- a/src/imap/command/anonymous.rs +++ b/src/imap/command/anonymous.rs @@ -1,23 +1,28 @@ -use boitalettres::proto::{Request, Response}; -use crate::login::ArcLoginProvider; -use crate::imap::Context; +use anyhow::{Result, Error}; +use boitalettres::proto::Response; +use imap_codec::types::command::CommandBody; +use imap_codec::types::core::AString; +use imap_codec::types::response::{Capability, Data, Response as ImapRes, Status}; + +use crate::imap::flow; +use crate::imap::session::InnerContext; //--- dispatching -pub async fn dispatch(ctx: Context) -> Result { +pub async fn dispatch<'a>(ctx: &'a InnerContext<'a>) -> Result { match ctx.req.body { - CommandBody::Capability => anonymous::capability(ctx).await, - CommandBody::Login { username, password } => anonymous::login(ctx, username, password).await, + CommandBody::Capability => capability(ctx).await, + CommandBody::Login { username, password } => login(ctx, username, password).await, _ => Status::no(Some(ctx.req.tag.clone()), None, "This command is not available in the ANONYMOUS state.") .map(|s| vec![ImapRes::Status(s)]) .map_err(Error::msg), } } -//--- Command controllers +//--- Command controllers, private -pub async fn capability(ctx: Context) -> Result { +async fn capability<'a>(ctx: InnerContext<'a>) -> Result { let capabilities = vec![Capability::Imap4Rev1, Capability::Idle]; let res = vec![ ImapRes::Data(Data::Capability(capabilities)), @@ -29,7 +34,7 @@ pub async fn capability(ctx: Context) -> Result { Ok(res) } -pub async fn login(ctx: Context, username: AString, password: AString) -> Result { +async fn login<'a>(ctx: InnerContext<'a>, username: AString, password: AString) -> Result { let (u, p) = (String::try_from(username)?, String::try_from(password)?); tracing::info!(user = %u, "command.login"); diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index 188fc56..49bfa9c 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -1,91 +1,101 @@ -pub async fn dispatch(ctx: Context) -> Result { - match req.body { - CommandBody::Capability => anonymous::capability().await, // we use the same implem for now - CommandBody::Lsub { reference, mailbox_wildcard, } => authenticated::lsub(reference, mailbox_wildcard).await, - CommandBody::List { reference, mailbox_wildcard, } => authenticated::list(reference, mailbox_wildcard).await, - CommandBody::Select { mailbox } => authenticated::select(user, mailbox).await.and_then(|(mailbox, response)| { - self.state.select(mailbox); - Ok(response) - }), - _ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the AUTHENTICATED state.") - .map(|s| vec![ImapRes::Status(s)]) - .map_err(Error::msg), - }, + +use anyhow::{Result, Error}; +use boitalettres::proto::Response; +use imap_codec::types::command::CommandBody; +use imap_codec::types::core::Tag; +use imap_codec::types::mailbox::{ListMailbox, Mailbox as MailboxCodec}; +use imap_codec::types::response::{Code, Data, Response as ImapRes, Status}; + +use crate::imap::command::anonymous; +use crate::imap::session::InnerContext; +use crate::imap::flow::User; +use crate::mailbox::Mailbox; + +pub async fn dispatch<'a>(inner: &'a InnerContext<'a>, user: &'a User) -> Result { + let ctx = StateContext { inner, user, tag: &inner.req.tag }; + + match ctx.req.body.as_ref() { + CommandBody::Lsub { reference, mailbox_wildcard, } => ctx.lsub(reference, mailbox_wildcard).await, + CommandBody::List { reference, mailbox_wildcard, } => ctx.list(reference, mailbox_wildcard).await, + CommandBody::Select { mailbox } => ctx.select(mailbox).await, + _ => anonymous::dispatch(ctx.inner).await, + } +} + +// --- PRIVATE --- + +struct StateContext<'a> { + inner: InnerContext<'a>, + user: &'a User, + tag: &'a Tag, } - pub async fn lsub( +impl<'a> StateContext<'a> { + async fn lsub( &self, reference: MailboxCodec, mailbox_wildcard: ListMailbox, - ) -> Result { + ) -> Result { Ok(vec![ImapRes::Status( - Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?, - )]) + Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?, + )]) } - pub async fn list( + async fn list( &self, reference: MailboxCodec, mailbox_wildcard: ListMailbox, - ) -> Result { - Ok(vec![ImapRes::Status( - Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?, - )]) + ) -> Result { + Ok(vec![ + ImapRes::Status(Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?), + ]) } /* - * TRACE BEGIN --- - - - Example: C: A142 SELECT INBOX - S: * 172 EXISTS - S: * 1 RECENT - S: * OK [UNSEEN 12] Message 12 is first unseen - S: * OK [UIDVALIDITY 3857529045] UIDs valid - S: * OK [UIDNEXT 4392] Predicted next UID - S: * FLAGS (\Answered \Flagged \Deleted \Seen \Draft) - S: * OK [PERMANENTFLAGS (\Deleted \Seen \*)] Limited - S: A142 OK [READ-WRITE] SELECT completed - - * TRACE END --- - */ - pub async fn select(&mut self, mailbox: MailboxCodec) -> Result { + * TRACE BEGIN --- + + + Example: C: A142 SELECT INBOX + S: * 172 EXISTS + S: * 1 RECENT + S: * OK [UNSEEN 12] Message 12 is first unseen + S: * OK [UIDVALIDITY 3857529045] UIDs valid + S: * OK [UIDNEXT 4392] Predicted next UID + S: * FLAGS (\Answered \Flagged \Deleted \Seen \Draft) + S: * OK [PERMANENTFLAGS (\Deleted \Seen \*)] Limited + S: A142 OK [READ-WRITE] SELECT completed + + * TRACE END --- + */ + async fn select(&self, mailbox: MailboxCodec) -> Result { let name = String::try_from(mailbox)?; - let user = match self.session.user.as_ref() { - Some(u) => u, - _ => { - return Ok(vec![ImapRes::Status( - Status::no(Some(self.tag.clone()), None, "Not implemented") - .map_err(Error::msg)?, - )]) - } - }; - let mut mb = Mailbox::new(&user.creds, name.clone())?; - tracing::info!(username=%user.name, mailbox=%name, "mailbox.selected"); + let mut mb = Mailbox::new(self.user.creds, name.clone())?; + tracing::info!(username=%self.user.name, mailbox=%name, "mailbox.selected"); let sum = mb.summary().await?; tracing::trace!(summary=%sum, "mailbox.summary"); let body = vec![Data::Exists(sum.exists.try_into()?), Data::Recent(0)]; - self.session.selected = Some(mb); + self.inner.state.select(mb)?; let r_unseen = Status::ok(None, Some(Code::Unseen(0)), "").map_err(Error::msg)?; - let r_permanentflags = Status::ok(None, Some(Code:: + //let r_permanentflags = Status::ok(None, Some(Code:: Ok(vec![ - ImapRes::Data(Data::Exists(0)), - ImapRes::Data(Data::Recent(0)), - ImapRes::Data(Data::Flags(vec![]), - ImapRes::Status(), - ImapRes::Status(), - ImapRes::Status() - Status::ok( - Some(self.tag.clone()), - Some(Code::ReadWrite), - "Select completed", - ) - .map_err(Error::msg)?, - )]) + ImapRes::Data(Data::Exists(0)), + ImapRes::Data(Data::Recent(0)), + ImapRes::Data(Data::Flags(vec![])), + /*ImapRes::Status(), + ImapRes::Status(), + ImapRes::Status(),*/ + Status::ok( + Some(self.tag.clone()), + Some(Code::ReadWrite), + "Select completed", + ) + .map_err(Error::msg)?, + ]) } +} diff --git a/src/imap/command/mod.rs b/src/imap/command/mod.rs new file mode 100644 index 0000000..c4fa4d8 --- /dev/null +++ b/src/imap/command/mod.rs @@ -0,0 +1,3 @@ +pub mod anonymous; +pub mod authenticated; +pub mod selected; diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index b320ec2..61e9c1a 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -1,10 +1,45 @@ + +use anyhow::{Result, Error}; +use boitalettres::proto::Response; +use imap_codec::types::command::CommandBody; +use imap_codec::types::core::Tag; +use imap_codec::types::fetch_attributes::MacroOrFetchAttributes; +use imap_codec::types::response::{Response as ImapRes, Status}; +use imap_codec::types::sequence::SequenceSet; + +use crate::imap::command::authenticated; +use crate::imap::session::InnerContext; +use crate::imap::flow::User; +use crate::mailbox::Mailbox; + +pub async fn dispatch<'a>(inner: &'a InnerContext<'a>, user: &'a User, mailbox: &'a Mailbox) -> Result { + let ctx = StateContext { inner, user, mailbox, tag: &inner.req.tag }; + + match ctx.inner.req.body { + CommandBody::Fetch { sequence_set, attributes, uid, } => ctx.fetch(sequence_set, attributes, uid).await, + _ => authenticated::dispatch(inner, user).await, + } +} + +// --- PRIVATE --- + +struct StateContext<'a> { + inner: InnerContext<'a>, + user: &'a User, + mailbox: &'a Mailbox, + tag: &'a Tag, +} + + +impl<'a> StateContext<'a> { pub async fn fetch( &self, sequence_set: SequenceSet, attributes: MacroOrFetchAttributes, uid: bool, - ) -> Result { - Ok(vec![ImapRes::Status( - Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?, - )]) + ) -> Result { + Ok(vec![ + ImapRes::Status(Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?), + ]) } +} diff --git a/src/imap/flow.rs b/src/imap/flow.rs index 8ba75bc..6d8b581 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -1,3 +1,6 @@ + + +use crate::login::Credentials; use crate::mailbox::Mailbox; pub struct User { @@ -19,9 +22,9 @@ pub enum Error { // https://datatracker.ietf.org/doc/html/rfc3501#page-13 impl State { pub fn authenticate(&mut self, user: User) -> Result<(), Error> { - self = match state { + self = match self { State::NotAuthenticated => State::Authenticated(user), - _ => return Err(ForbiddenTransition), + _ => return Err(Error::ForbiddenTransition), }; Ok(()) } @@ -32,17 +35,17 @@ impl State { } pub fn select(&mut self, mailbox: Mailbox) -> Result<(), Error> { - self = match state { + self = match self { State::Authenticated(user) => State::Selected(user, mailbox), - _ => return Err(ForbiddenTransition), + _ => return Err(Error::ForbiddenTransition), }; Ok(()) } - pub fn unselect(state: State) -> Result<(), Error> { - self = match state { + pub fn unselect(&mut self) -> Result<(), Error> { + self = match self { State::Selected(user, _) => State::Authenticated(user), - _ => return Err(ForbiddenTransition), + _ => return Err(Error::ForbiddenTransition), }; Ok(()) } diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 45480ce..7e042d5 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -2,39 +2,41 @@ mod session; mod flow; mod command; -use std::sync::Arc; use std::task::{Context, Poll}; use anyhow::Result; use boitalettres::errors::Error as BalError; use boitalettres::proto::{Request, Response}; +use boitalettres::server::accept::addr::AddrIncoming; use boitalettres::server::accept::addr::AddrStream; use boitalettres::server::Server as ImapServer; use futures::future::BoxFuture; use futures::future::FutureExt; +use tokio::sync::watch; use tower::Service; -use crate::LoginProvider; +use crate::login::ArcLoginProvider; +use crate::config::ImapConfig; /// Server is a thin wrapper to register our Services in BàL -pub struct Server(ImapServer); +pub struct Server(ImapServer); pub async fn new( config: ImapConfig, - login: Arc, + login: ArcLoginProvider, ) -> Result { //@FIXME add a configuration parameter let incoming = AddrIncoming::new(config.bind_addr).await?; - let imap = ImapServer::new(incoming).serve(service::Instance::new(login.clone())); + tracing::info!("IMAP activated, will listen on {:#}", imap.incoming.local_addr); - tracing::info!("IMAP activated, will listen on {:#}", self.imap.incoming.local_addr); - Server(imap) + let imap = ImapServer::new(incoming).serve(Instance::new(login.clone())); + Ok(Server(imap)) } impl Server { - pub async fn run(&self, mut must_exit: watch::Receiver) -> Result<()> { + pub async fn run(self, mut must_exit: watch::Receiver) -> Result<()> { tracing::info!("IMAP started!"); tokio::select! { - s = self => s?, + s = self.0 => s?, _ = must_exit.changed() => tracing::info!("Stopped IMAP server"), } @@ -47,10 +49,10 @@ impl Server { /// Instance is the main Tokio Tower service that we register in BàL. /// It receives new connection demands and spawn a dedicated service. struct Instance { - login_provider: Arc, + login_provider: ArcLoginProvider, } impl Instance { - pub fn new(login_provider: Arc) -> Self { + pub fn new(login_provider: ArcLoginProvider) -> Self { Self { login_provider } } } @@ -78,7 +80,7 @@ struct Connection { session: session::Manager, } impl Connection { - pub fn new(login_provider: Arc) -> Self { + pub fn new(login_provider: ArcLoginProvider) -> Self { Self { session: session::Manager::new(login_provider), } diff --git a/src/imap/session.rs b/src/imap/session.rs index 2035634..fccd4bf 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -1,19 +1,15 @@ -use std::sync::Arc; - use anyhow::Error; use boitalettres::errors::Error as BalError; use boitalettres::proto::{Request, Response}; use futures::future::BoxFuture; use futures::future::FutureExt; -use imap_codec::types::command::CommandBody; -use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status}; +use imap_codec::types::response::{Response as ImapRes, Status}; use tokio::sync::mpsc::error::TrySendError; use tokio::sync::{mpsc, oneshot}; -use crate::command::{anonymous,authenticated,selected}; -use crate::login::Credentials; -use crate::mailbox::Mailbox; -use crate::LoginProvider; +use crate::imap::command::{anonymous,authenticated,selected}; +use crate::imap::flow; +use crate::login::ArcLoginProvider; /* This constant configures backpressure in the system, * or more specifically, how many pipelined messages are allowed @@ -89,10 +85,10 @@ impl Manager { //----- -pub struct Context<'a> { +pub struct InnerContext<'a> { req: &'a Request, - state: &'a mut flow::State, - login: ArcLoginProvider, + state: &'a flow::State, + login: &'a ArcLoginProvider, } pub struct Instance { @@ -105,7 +101,7 @@ impl Instance { fn new( login_provider: ArcLoginProvider, rx: mpsc::Receiver, - ) -> Self { + ) -> Self { Self { login_provider, rx, @@ -122,80 +118,38 @@ impl Instance { tracing::debug!("starting runner"); while let Some(msg) = self.rx.recv().await { - let ctx = Context { req: &msg.req, state: &mut self.state, login: self.login_provider }; + let ctx = InnerContext { req: &msg.req, state: &self.state, login: &self.login_provider }; // Command behavior is modulated by the state. // To prevent state error, we handle the same command in separate code path depending // on the State. - let cmd_res = match self.state { - flow::State::NotAuthenticated => anonymous::dispatch(ctx).await, - flow::State::Authenticated(user) => authenticated::dispatch(ctx).await, - flow::State::Selected(user, mailbox) => selected::dispatch(ctx).await, + let cmd_res = match ctx.state { + flow::State::NotAuthenticated => anonymous::dispatch(&ctx).await, + flow::State::Authenticated(user) => authenticated::dispatch(&ctx, &user).await, + flow::State::Selected(user, mailbox) => selected::dispatch(&ctx, &user, &mailbox).await, flow::State::Logout => Status::bad(Some(ctx.req.tag.clone()), None, "No commands are allowed in the LOGOUT state.") .map(|s| vec![ImapRes::Status(s)]) .map_err(Error::msg), }; -/* - - match req.body { - CommandBody::Capability => anonymous::capability().await, - CommandBody::Login { username, password } => anonymous::login(self.login_provider, username, password).await.and_then(|(user, response)| { - self.state.authenticate(user)?; - Ok(response) - }, - _ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the ANONYMOUS state.") - .map(|s| vec![ImapRes::Status(s)]) - .map_err(Error::msg), - - }, - flow::State::Authenticated(user) => match req.body { - CommandBody::Capability => anonymous::capability().await, // we use the same implem for now - CommandBody::Lsub { reference, mailbox_wildcard, } => authenticated::lsub(reference, mailbox_wildcard).await, - CommandBody::List { reference, mailbox_wildcard, } => authenticated::list(reference, mailbox_wildcard).await, - CommandBody::Select { mailbox } => authenticated::select(user, mailbox).await.and_then(|(mailbox, response)| { - self.state.select(mailbox); - Ok(response) - }), - _ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the AUTHENTICATED state.") - .map(|s| vec![ImapRes::Status(s)]) - .map_err(Error::msg), - }, - flow::State::Selected(user, mailbox) => match req.body { - CommandBody::Capability => anonymous::capability().await, // we use the same implem for now - CommandBody::Fetch { sequence_set, attributes, uid, } => selected::fetch(sequence_set, attributes, uid).await, - _ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the SELECTED state.") - .map(|s| vec![ImapRes::Status(s)]) - .map_err(Error::msg), - }, - flow::State::Logout => Status::bad(Some(msg.req.tag.clone()), None, "No commands are allowed in the LOGOUT state.") - .map(|s| vec![ImapRes::Status(s)]) - .map_err(Error::msg), - } - */ - - let imap_res = match cmd_res { - Ok(new_state, imap_res) => { - self.state = new_state; - Ok(imap_res) - }, - Err(e) if Ok(be) = e.downcast::() => Err(be), + let imap_res = cmd_res.or_else(|e| match e.downcast::() { + Ok(be) => Err(be), Err(e) => { tracing::warn!(error=%e, "internal.error"); - Ok(Status::bad(Some(msg.req.tag.clone()), None, "Internal error") - .map(|s| vec![ImapRes::Status(s)]) - .map_err(|e| BalError::Text(e.to_string()))) + Status::bad(Some(msg.req.tag.clone()), None, "Internal error") + .map(|s| vec![ImapRes::Status(s)]) + .map_err(|e| BalError::Text(e.to_string())) } - }; + }); //@FIXME I think we should quit this thread on error and having our manager watch it, // and then abort the session as it is corrupted. msg.tx.send(imap_res).unwrap_or_else(|e| { tracing::warn!("failed to send imap response to manager: {:#?}", e) }); - } - - //@FIXME add more info about the runner - tracing::debug!("exiting runner"); } + + //@FIXME add more info about the runner + tracing::debug!("exiting runner"); +} } diff --git a/src/server.rs b/src/server.rs index acf9cf2..908ed11 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,21 +1,16 @@ use std::sync::Arc; -use boitalettres::server::accept::addr::AddrIncoming; -use boitalettres::server::accept::addr::AddrStream; -use boitalettres::server::Server as ImapServer; - use anyhow::{bail, Result}; use futures::{try_join, StreamExt}; use log::*; use rusoto_signature::Region; use tokio::sync::watch; -use tower::Service; use crate::config::*; use crate::lmtp::*; use crate::login::{ldap_provider::*, static_provider::*, *}; -use crate::mailbox::Mailbox; use crate::imap; +use crate::login::ArcLoginProvider; pub struct Server { lmtp_server: Option>, @@ -60,7 +55,7 @@ impl Server { } } -fn build(config: Config) -> Result<(Arc, Option, Option> { +fn build(config: Config) -> Result<(ArcLoginProvider, Option, Option)> { let s3_region = Region::Custom { name: config.aws_region.clone(), endpoint: config.s3_endpoint, @@ -70,7 +65,7 @@ fn build(config: Config) -> Result<(Arc, Option endpoint: config.k2v_endpoint, }; - let lp: Arc = match (config.login_static, config.login_ldap) { + let lp: ArcLoginProvider = match (config.login_static, config.login_ldap) { (Some(st), None) => Arc::new(StaticLoginProvider::new(st, k2v_region, s3_region)?), (None, Some(ld)) => Arc::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?), (Some(_), Some(_)) => { @@ -79,7 +74,7 @@ fn build(config: Config) -> Result<(Arc, Option (None, None) => bail!("No login provider is set up in config file"), }; - Ok(lp, self.lmtp_config, self.imap_config) + Ok(lp, config.lmtp_config, config.imap_config) } pub fn watch_ctrl_c() -> (watch::Receiver, Arc>) { -- cgit v1.2.3