From 5dd5ae8bcd6f88703bc483d7f8d5882fefad4e7e Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 17 Jun 2022 18:39:36 +0200 Subject: WIP Refactor, code is broken --- .gitignore | 2 + src/command.rs | 141 -------------------------- src/config.rs | 6 ++ src/imap/command.rs | 29 ++++++ src/imap/command/anonymous.rs | 60 ++++++++++++ src/imap/command/authenticated.rs | 91 +++++++++++++++++ src/imap/command/selected.rs | 10 ++ src/imap/flow.rs | 50 ++++++++++ src/imap/mod.rs | 100 +++++++++++++++++++ src/imap/session.rs | 201 ++++++++++++++++++++++++++++++++++++++ src/login/mod.rs | 5 + src/main.rs | 4 +- src/server.rs | 33 +++---- src/service.rs | 62 ------------ src/session.rs | 166 ------------------------------- 15 files changed, 567 insertions(+), 393 deletions(-) delete mode 100644 src/command.rs create mode 100644 src/imap/command.rs create mode 100644 src/imap/command/anonymous.rs create mode 100644 src/imap/command/authenticated.rs create mode 100644 src/imap/command/selected.rs create mode 100644 src/imap/flow.rs create mode 100644 src/imap/mod.rs create mode 100644 src/imap/session.rs delete mode 100644 src/service.rs delete mode 100644 src/session.rs diff --git a/.gitignore b/.gitignore index c21e41a..79358d8 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ .vimrc env.sh mailrage.toml +*.swo +*.swp diff --git a/src/command.rs b/src/command.rs deleted file mode 100644 index 77b780a..0000000 --- a/src/command.rs +++ /dev/null @@ -1,141 +0,0 @@ -use anyhow::{Error, Result}; -use boitalettres::errors::Error as BalError; -use boitalettres::proto::{Request, Response}; -use imap_codec::types::core::{AString, Tag}; -use imap_codec::types::fetch_attributes::MacroOrFetchAttributes; -use imap_codec::types::mailbox::{ListMailbox, Mailbox as MailboxCodec}; -use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status}; -use imap_codec::types::sequence::SequenceSet; - -use crate::mailbox::Mailbox; -use crate::session; - -pub struct Command<'a> { - tag: Tag, - session: &'a mut session::Instance, -} - -impl<'a> Command<'a> { - pub fn new(tag: Tag, session: &'a mut session::Instance) -> Self { - Self { tag, session } - } - - pub async fn capability(&self) -> Result { - let capabilities = vec![Capability::Imap4Rev1, Capability::Idle]; - let res = vec![ - ImapRes::Data(Data::Capability(capabilities)), - ImapRes::Status( - Status::ok(Some(self.tag.clone()), None, "Server capabilities") - .map_err(Error::msg)?, - ), - ]; - Ok(res) - } - - pub async fn login(&mut self, username: AString, password: AString) -> Result { - let (u, p) = (String::try_from(username)?, String::try_from(password)?); - tracing::info!(user = %u, "command.login"); - - let creds = match self.session.login_provider.login(&u, &p).await { - Err(e) => { - tracing::debug!(error=%e, "authentication failed"); - return Ok(vec![ImapRes::Status( - Status::no(Some(self.tag.clone()), None, "Authentication failed") - .map_err(Error::msg)?, - )]); - } - Ok(c) => c, - }; - - self.session.user = Some(session::User { - creds, - name: u.clone(), - }); - - tracing::info!(username=%u, "connected"); - Ok(vec![ - //@FIXME we could send a capability status here too - ImapRes::Status( - Status::ok(Some(self.tag.clone()), None, "completed").map_err(Error::msg)?, - ), - ]) - } - - pub async fn lsub( - &self, - reference: MailboxCodec, - mailbox_wildcard: ListMailbox, - ) -> Result { - Ok(vec![ImapRes::Status( - Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?, - )]) - } - - pub async fn list( - &self, - reference: MailboxCodec, - mailbox_wildcard: ListMailbox, - ) -> Result { - Ok(vec![ImapRes::Status( - Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?, - )]) - } - - /* - * TRACE BEGIN --- - - - Example: C: A142 SELECT INBOX - S: * 172 EXISTS - S: * 1 RECENT - S: * OK [UNSEEN 12] Message 12 is first unseen - S: * OK [UIDVALIDITY 3857529045] UIDs valid - S: * OK [UIDNEXT 4392] Predicted next UID - S: * FLAGS (\Answered \Flagged \Deleted \Seen \Draft) - S: * OK [PERMANENTFLAGS (\Deleted \Seen \*)] Limited - S: A142 OK [READ-WRITE] SELECT completed - - * TRACE END --- - */ - pub async fn select(&mut self, mailbox: MailboxCodec) -> Result { - let name = String::try_from(mailbox)?; - let user = match self.session.user.as_ref() { - Some(u) => u, - _ => { - return Ok(vec![ImapRes::Status( - Status::no(Some(self.tag.clone()), None, "Not implemented") - .map_err(Error::msg)?, - )]) - } - }; - - let mut mb = Mailbox::new(&user.creds, name.clone())?; - tracing::info!(username=%user.name, mailbox=%name, "mailbox.selected"); - - let sum = mb.summary().await?; - tracing::trace!(summary=%sum, "mailbox.summary"); - - let body = vec![Data::Exists(sum.exists.try_into()?), Data::Recent(0)]; - - self.session.selected = Some(mb); - Ok(vec![ImapRes::Status( - Status::ok( - Some(self.tag.clone()), - Some(Code::ReadWrite), - "Select completed", - ) - .map_err(Error::msg)?, - )]) - } - - pub async fn fetch( - &self, - sequence_set: SequenceSet, - attributes: MacroOrFetchAttributes, - uid: bool, - ) -> Result { - Ok(vec![ImapRes::Status( - Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?, - )]) - } -} diff --git a/src/config.rs b/src/config.rs index 5afcabd..074c192 100644 --- a/src/config.rs +++ b/src/config.rs @@ -16,6 +16,7 @@ pub struct Config { pub login_ldap: Option, pub lmtp: Option, + pub imap: Option, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -71,6 +72,11 @@ pub struct LmtpConfig { pub hostname: String, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ImapConfig { + pub bind_addr: SocketAddr, +} + pub fn read_config(config_file: PathBuf) -> Result { let mut file = std::fs::OpenOptions::new() .read(true) diff --git a/src/imap/command.rs b/src/imap/command.rs new file mode 100644 index 0000000..adf2f90 --- /dev/null +++ b/src/imap/command.rs @@ -0,0 +1,29 @@ +use anyhow::{Error, Result}; +use boitalettres::errors::Error as BalError; +use boitalettres::proto::{Request, Response}; +use imap_codec::types::core::{AString, Tag}; +use imap_codec::types::fetch_attributes::MacroOrFetchAttributes; +use imap_codec::types::mailbox::{ListMailbox, Mailbox as MailboxCodec}; +use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status}; +use imap_codec::types::sequence::SequenceSet; + +use crate::mailbox::Mailbox; +use crate::session; + +pub struct Command<'a> { + tag: Tag, + session: &'a mut session::Instance, +} + +// @FIXME better handle errors, our conversions are bad due to my fork of BàL +// @FIXME store the IMAP state in the session as an enum. +impl<'a> Command<'a> { + pub fn new(tag: Tag, session: &'a mut session::Instance) -> Self { + Self { tag, session } + } + + + + + +} diff --git a/src/imap/command/anonymous.rs b/src/imap/command/anonymous.rs new file mode 100644 index 0000000..e4222f7 --- /dev/null +++ b/src/imap/command/anonymous.rs @@ -0,0 +1,60 @@ + +use boitalettres::proto::{Request, Response}; +use crate::login::ArcLoginProvider; +use crate::imap::Context; + +//--- dispatching + +pub async fn dispatch(ctx: Context) -> Result { + match ctx.req.body { + CommandBody::Capability => anonymous::capability(ctx).await, + CommandBody::Login { username, password } => anonymous::login(ctx, username, password).await, + _ => Status::no(Some(ctx.req.tag.clone()), None, "This command is not available in the ANONYMOUS state.") + .map(|s| vec![ImapRes::Status(s)]) + .map_err(Error::msg), + } +} + +//--- Command controllers + +pub async fn capability(ctx: Context) -> Result { + let capabilities = vec![Capability::Imap4Rev1, Capability::Idle]; + let res = vec![ + ImapRes::Data(Data::Capability(capabilities)), + ImapRes::Status( + Status::ok(Some(ctx.req.tag.clone()), None, "Server capabilities") + .map_err(Error::msg)?, + ), + ]; + Ok(res) +} + +pub async fn login(ctx: Context, username: AString, password: AString) -> Result { + let (u, p) = (String::try_from(username)?, String::try_from(password)?); + tracing::info!(user = %u, "command.login"); + + let creds = match ctx.login_provider.login(&u, &p).await { + Err(e) => { + tracing::debug!(error=%e, "authentication failed"); + return Ok(vec![ImapRes::Status( + Status::no(Some(ctx.req.tag.clone()), None, "Authentication failed") + .map_err(Error::msg)?, + )]); + } + Ok(c) => c, + }; + + let user = flow::User { + creds, + name: u.clone(), + }; + ctx.state.authenticate(user)?; + + tracing::info!(username=%u, "connected"); + Ok(vec![ + //@FIXME we could send a capability status here too + ImapRes::Status( + Status::ok(Some(ctx.req.tag.clone()), None, "completed").map_err(Error::msg)?, + ), + ]) +} diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs new file mode 100644 index 0000000..188fc56 --- /dev/null +++ b/src/imap/command/authenticated.rs @@ -0,0 +1,91 @@ +pub async fn dispatch(ctx: Context) -> Result { + match req.body { + CommandBody::Capability => anonymous::capability().await, // we use the same implem for now + CommandBody::Lsub { reference, mailbox_wildcard, } => authenticated::lsub(reference, mailbox_wildcard).await, + CommandBody::List { reference, mailbox_wildcard, } => authenticated::list(reference, mailbox_wildcard).await, + CommandBody::Select { mailbox } => authenticated::select(user, mailbox).await.and_then(|(mailbox, response)| { + self.state.select(mailbox); + Ok(response) + }), + _ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the AUTHENTICATED state.") + .map(|s| vec![ImapRes::Status(s)]) + .map_err(Error::msg), + }, +} + + pub async fn lsub( + &self, + reference: MailboxCodec, + mailbox_wildcard: ListMailbox, + ) -> Result { + Ok(vec![ImapRes::Status( + Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?, + )]) + } + + pub async fn list( + &self, + reference: MailboxCodec, + mailbox_wildcard: ListMailbox, + ) -> Result { + Ok(vec![ImapRes::Status( + Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?, + )]) + } + + /* + * TRACE BEGIN --- + + + Example: C: A142 SELECT INBOX + S: * 172 EXISTS + S: * 1 RECENT + S: * OK [UNSEEN 12] Message 12 is first unseen + S: * OK [UIDVALIDITY 3857529045] UIDs valid + S: * OK [UIDNEXT 4392] Predicted next UID + S: * FLAGS (\Answered \Flagged \Deleted \Seen \Draft) + S: * OK [PERMANENTFLAGS (\Deleted \Seen \*)] Limited + S: A142 OK [READ-WRITE] SELECT completed + + * TRACE END --- + */ + pub async fn select(&mut self, mailbox: MailboxCodec) -> Result { + let name = String::try_from(mailbox)?; + let user = match self.session.user.as_ref() { + Some(u) => u, + _ => { + return Ok(vec![ImapRes::Status( + Status::no(Some(self.tag.clone()), None, "Not implemented") + .map_err(Error::msg)?, + )]) + } + }; + + let mut mb = Mailbox::new(&user.creds, name.clone())?; + tracing::info!(username=%user.name, mailbox=%name, "mailbox.selected"); + + let sum = mb.summary().await?; + tracing::trace!(summary=%sum, "mailbox.summary"); + + let body = vec![Data::Exists(sum.exists.try_into()?), Data::Recent(0)]; + + self.session.selected = Some(mb); + + let r_unseen = Status::ok(None, Some(Code::Unseen(0)), "").map_err(Error::msg)?; + let r_permanentflags = Status::ok(None, Some(Code:: + + Ok(vec![ + ImapRes::Data(Data::Exists(0)), + ImapRes::Data(Data::Recent(0)), + ImapRes::Data(Data::Flags(vec![]), + ImapRes::Status(), + ImapRes::Status(), + ImapRes::Status() + Status::ok( + Some(self.tag.clone()), + Some(Code::ReadWrite), + "Select completed", + ) + .map_err(Error::msg)?, + )]) + } diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs new file mode 100644 index 0000000..b320ec2 --- /dev/null +++ b/src/imap/command/selected.rs @@ -0,0 +1,10 @@ + pub async fn fetch( + &self, + sequence_set: SequenceSet, + attributes: MacroOrFetchAttributes, + uid: bool, + ) -> Result { + Ok(vec![ImapRes::Status( + Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?, + )]) + } diff --git a/src/imap/flow.rs b/src/imap/flow.rs new file mode 100644 index 0000000..8ba75bc --- /dev/null +++ b/src/imap/flow.rs @@ -0,0 +1,50 @@ +use crate::mailbox::Mailbox; + +pub struct User { + pub name: String, + pub creds: Credentials, +} + +pub enum State { + NotAuthenticated, + Authenticated(User), + Selected(User, Mailbox), + Logout +} +pub enum Error { + ForbiddenTransition, +} + +// See RFC3501 section 3. +// https://datatracker.ietf.org/doc/html/rfc3501#page-13 +impl State { + pub fn authenticate(&mut self, user: User) -> Result<(), Error> { + self = match state { + State::NotAuthenticated => State::Authenticated(user), + _ => return Err(ForbiddenTransition), + }; + Ok(()) + } + + pub fn logout(&mut self) -> Self { + self = State::Logout; + Ok(()) + } + + pub fn select(&mut self, mailbox: Mailbox) -> Result<(), Error> { + self = match state { + State::Authenticated(user) => State::Selected(user, mailbox), + _ => return Err(ForbiddenTransition), + }; + Ok(()) + } + + pub fn unselect(state: State) -> Result<(), Error> { + self = match state { + State::Selected(user, _) => State::Authenticated(user), + _ => return Err(ForbiddenTransition), + }; + Ok(()) + } +} + diff --git a/src/imap/mod.rs b/src/imap/mod.rs new file mode 100644 index 0000000..45480ce --- /dev/null +++ b/src/imap/mod.rs @@ -0,0 +1,100 @@ +mod session; +mod flow; +mod command; + +use std::sync::Arc; +use std::task::{Context, Poll}; + +use anyhow::Result; +use boitalettres::errors::Error as BalError; +use boitalettres::proto::{Request, Response}; +use boitalettres::server::accept::addr::AddrStream; +use boitalettres::server::Server as ImapServer; +use futures::future::BoxFuture; +use futures::future::FutureExt; +use tower::Service; + +use crate::LoginProvider; + +/// Server is a thin wrapper to register our Services in BàL +pub struct Server(ImapServer); +pub async fn new( + config: ImapConfig, + login: Arc, +) -> Result { + + //@FIXME add a configuration parameter + let incoming = AddrIncoming::new(config.bind_addr).await?; + let imap = ImapServer::new(incoming).serve(service::Instance::new(login.clone())); + + tracing::info!("IMAP activated, will listen on {:#}", self.imap.incoming.local_addr); + Server(imap) +} +impl Server { + pub async fn run(&self, mut must_exit: watch::Receiver) -> Result<()> { + tracing::info!("IMAP started!"); + tokio::select! { + s = self => s?, + _ = must_exit.changed() => tracing::info!("Stopped IMAP server"), + } + + Ok(()) + } +} + +//--- + +/// Instance is the main Tokio Tower service that we register in BàL. +/// It receives new connection demands and spawn a dedicated service. +struct Instance { + login_provider: Arc, +} +impl Instance { + pub fn new(login_provider: Arc) -> Self { + Self { login_provider } + } +} +impl<'a> Service<&'a AddrStream> for Instance { + type Response = Connection; + type Error = anyhow::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, addr: &'a AddrStream) -> Self::Future { + tracing::info!(remote_addr = %addr.remote_addr, local_addr = %addr.local_addr, "accept"); + let lp = self.login_provider.clone(); + async { Ok(Connection::new(lp)) }.boxed() + } +} + +//--- + +/// Connection is the per-connection Tokio Tower service we register in BàL. +/// It handles a single TCP connection, and thus has a business logic. +struct Connection { + session: session::Manager, +} +impl Connection { + pub fn new(login_provider: Arc) -> Self { + Self { + session: session::Manager::new(login_provider), + } + } +} +impl Service for Connection { + type Response = Response; + type Error = BalError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request) -> Self::Future { + tracing::debug!("Got request: {:#?}", req); + self.session.process(req) + } +} diff --git a/src/imap/session.rs b/src/imap/session.rs new file mode 100644 index 0000000..2035634 --- /dev/null +++ b/src/imap/session.rs @@ -0,0 +1,201 @@ +use std::sync::Arc; + +use anyhow::Error; +use boitalettres::errors::Error as BalError; +use boitalettres::proto::{Request, Response}; +use futures::future::BoxFuture; +use futures::future::FutureExt; +use imap_codec::types::command::CommandBody; +use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status}; +use tokio::sync::mpsc::error::TrySendError; +use tokio::sync::{mpsc, oneshot}; + +use crate::command::{anonymous,authenticated,selected}; +use crate::login::Credentials; +use crate::mailbox::Mailbox; +use crate::LoginProvider; + +/* This constant configures backpressure in the system, + * or more specifically, how many pipelined messages are allowed + * before refusing them + */ +const MAX_PIPELINED_COMMANDS: usize = 10; + +struct Message { + req: Request, + tx: oneshot::Sender>, +} + +//----- + +pub struct Manager { + tx: mpsc::Sender, +} + +impl Manager { + pub fn new(login_provider: ArcLoginProvider) -> Self { + let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS); + tokio::spawn(async move { + let mut instance = Instance::new(login_provider, rx); + instance.start().await; + }); + Self { tx } + } + + pub fn process(&self, req: Request) -> BoxFuture<'static, Result> { + let (tx, rx) = oneshot::channel(); + let tag = req.tag.clone(); + let msg = Message { req, tx }; + + // We use try_send on a bounded channel to protect the daemons from DoS. + // Pipelining requests in IMAP are a special case: they should not occure often + // and in a limited number (like 3 requests). Someone filling the channel + // will probably be malicious so we "rate limit" them. + match self.tx.try_send(msg) { + Ok(()) => (), + Err(TrySendError::Full(_)) => { + return async { + Status::bad(Some(tag), None, "Too fast! Send less pipelined requests!") + .map(|s| vec![ImapRes::Status(s)]) + .map_err(|e| BalError::Text(e.to_string())) + } + .boxed() + } + Err(TrySendError::Closed(_)) => { + return async { + Status::bad(Some(tag), None, "The session task has exited") + .map(|s| vec![ImapRes::Status(s)]) + .map_err(|e| BalError::Text(e.to_string())) + } + .boxed() + } + }; + + // @FIXME add a timeout, handle a session that fails. + async { + match rx.await { + Ok(r) => r, + Err(e) => { + tracing::warn!("Got error {:#?}", e); + Status::bad(Some(tag), None, "No response from the session handler") + .map(|s| vec![ImapRes::Status(s)]) + .map_err(|e| BalError::Text(e.to_string())) + } + } + } + .boxed() + } +} + +//----- + +pub struct Context<'a> { + req: &'a Request, + state: &'a mut flow::State, + login: ArcLoginProvider, +} + +pub struct Instance { + rx: mpsc::Receiver, + + pub login_provider: ArcLoginProvider, + pub state: flow::State, +} +impl Instance { + fn new( + login_provider: ArcLoginProvider, + rx: mpsc::Receiver, + ) -> Self { + Self { + login_provider, + rx, + state: flow::State::NotAuthenticated, + } + } + + //@FIXME add a function that compute the runner's name from its local info + // to ease debug + // fn name(&self) -> String { } + + async fn start(&mut self) { + //@FIXME add more info about the runner + tracing::debug!("starting runner"); + + while let Some(msg) = self.rx.recv().await { + let ctx = Context { req: &msg.req, state: &mut self.state, login: self.login_provider }; + + // Command behavior is modulated by the state. + // To prevent state error, we handle the same command in separate code path depending + // on the State. + let cmd_res = match self.state { + flow::State::NotAuthenticated => anonymous::dispatch(ctx).await, + flow::State::Authenticated(user) => authenticated::dispatch(ctx).await, + flow::State::Selected(user, mailbox) => selected::dispatch(ctx).await, + flow::State::Logout => Status::bad(Some(ctx.req.tag.clone()), None, "No commands are allowed in the LOGOUT state.") + .map(|s| vec![ImapRes::Status(s)]) + .map_err(Error::msg), + }; + +/* + + match req.body { + CommandBody::Capability => anonymous::capability().await, + CommandBody::Login { username, password } => anonymous::login(self.login_provider, username, password).await.and_then(|(user, response)| { + self.state.authenticate(user)?; + Ok(response) + }, + _ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the ANONYMOUS state.") + .map(|s| vec![ImapRes::Status(s)]) + .map_err(Error::msg), + + }, + flow::State::Authenticated(user) => match req.body { + CommandBody::Capability => anonymous::capability().await, // we use the same implem for now + CommandBody::Lsub { reference, mailbox_wildcard, } => authenticated::lsub(reference, mailbox_wildcard).await, + CommandBody::List { reference, mailbox_wildcard, } => authenticated::list(reference, mailbox_wildcard).await, + CommandBody::Select { mailbox } => authenticated::select(user, mailbox).await.and_then(|(mailbox, response)| { + self.state.select(mailbox); + Ok(response) + }), + _ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the AUTHENTICATED state.") + .map(|s| vec![ImapRes::Status(s)]) + .map_err(Error::msg), + }, + flow::State::Selected(user, mailbox) => match req.body { + CommandBody::Capability => anonymous::capability().await, // we use the same implem for now + CommandBody::Fetch { sequence_set, attributes, uid, } => selected::fetch(sequence_set, attributes, uid).await, + _ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the SELECTED state.") + .map(|s| vec![ImapRes::Status(s)]) + .map_err(Error::msg), + }, + flow::State::Logout => Status::bad(Some(msg.req.tag.clone()), None, "No commands are allowed in the LOGOUT state.") + .map(|s| vec![ImapRes::Status(s)]) + .map_err(Error::msg), + } + */ + + let imap_res = match cmd_res { + Ok(new_state, imap_res) => { + self.state = new_state; + Ok(imap_res) + }, + Err(e) if Ok(be) = e.downcast::() => Err(be), + Err(e) => { + tracing::warn!(error=%e, "internal.error"); + Ok(Status::bad(Some(msg.req.tag.clone()), None, "Internal error") + .map(|s| vec![ImapRes::Status(s)]) + .map_err(|e| BalError::Text(e.to_string()))) + } + }; + + //@FIXME I think we should quit this thread on error and having our manager watch it, + // and then abort the session as it is corrupted. + msg.tx.send(imap_res).unwrap_or_else(|e| { + tracing::warn!("failed to send imap response to manager: {:#?}", e) + }); + } + + //@FIXME add more info about the runner + tracing::debug!("exiting runner"); + } +} diff --git a/src/login/mod.rs b/src/login/mod.rs index c0e9032..1d5d634 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -2,6 +2,7 @@ pub mod ldap_provider; pub mod static_provider; use std::collections::BTreeMap; +use std::sync::Arc; use anyhow::{anyhow, bail, Context, Result}; use async_trait::async_trait; @@ -29,6 +30,10 @@ pub trait LoginProvider { async fn public_login(&self, email: &str) -> Result; } +/// ArcLoginProvider is simply an alias on a structure that is used +/// in many places in the code +pub type ArcLoginProvider = Arc; + /// The struct Credentials represent all of the necessary information to interact /// with a user account's data after they are logged in. #[derive(Clone, Debug)] diff --git a/src/main.rs b/src/main.rs index 11a0a89..9270817 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,12 @@ mod bayou; -mod command; mod config; mod cryptoblob; +mod imap; mod lmtp; mod login; mod mail_ident; mod mailbox; mod server; -mod service; -mod session; mod time; mod uidindex; diff --git a/src/server.rs b/src/server.rs index 5e9eb26..acf9cf2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -15,31 +15,24 @@ use crate::config::*; use crate::lmtp::*; use crate::login::{ldap_provider::*, static_provider::*, *}; use crate::mailbox::Mailbox; -use crate::service; +use crate::imap; pub struct Server { lmtp_server: Option>, - imap_server: ImapServer, + imap_server: Option, } impl Server { pub async fn new(config: Config) -> Result { - let lmtp_config = config.lmtp.clone(); //@FIXME - let login = authenticator(config)?; + let (login, lmtp_conf, imap_conf) = build(config)?; - let lmtp = lmtp_config.map(|cfg| LmtpServer::new(cfg, login.clone())); + let lmtp_server = lmtp_conf.map(|cfg| LmtpServer::new(cfg, login.clone())); + let imap_server = imap_conf.map(|cfg| imap::new(cfg, login.clone())); - let incoming = AddrIncoming::new("127.0.0.1:4567").await?; - let imap = ImapServer::new(incoming).serve(service::Instance::new(login.clone())); - - Ok(Self { - lmtp_server: lmtp, - imap_server: imap, - }) + Ok(Self { lmtp_server, imap_server }) } pub async fn run(self) -> Result<()> { - //tracing::info!("Starting server on {:#}", self.imap.incoming.local_addr); tracing::info!("Starting Aerogramme..."); let (exit_signal, provoke_exit) = watch_ctrl_c(); @@ -55,14 +48,11 @@ impl Server { Some(s) => s.run(exit_signal.clone()).await, } }, - //@FIXME handle ctrl + c async { - let mut must_exit = exit_signal.clone(); - tokio::select! { - s = self.imap_server => s?, - _ = must_exit.changed() => tracing::info!("IMAP server received CTRL+C, exiting."), + match self.imap_server.as_ref() { + None => Ok(()), + Some(s) => s.run(exit_signal.clone()).await, } - Ok(()) } )?; @@ -70,7 +60,7 @@ impl Server { } } -fn authenticator(config: Config) -> Result> { +fn build(config: Config) -> Result<(Arc, Option, Option> { let s3_region = Region::Custom { name: config.aws_region.clone(), endpoint: config.s3_endpoint, @@ -88,7 +78,8 @@ fn authenticator(config: Config) -> Result> } (None, None) => bail!("No login provider is set up in config file"), }; - Ok(lp) + + Ok(lp, self.lmtp_config, self.imap_config) } pub fn watch_ctrl_c() -> (watch::Receiver, Arc>) { diff --git a/src/service.rs b/src/service.rs deleted file mode 100644 index ce272a3..0000000 --- a/src/service.rs +++ /dev/null @@ -1,62 +0,0 @@ -use std::sync::Arc; -use std::task::{Context, Poll}; - -use anyhow::Result; -use boitalettres::errors::Error as BalError; -use boitalettres::proto::{Request, Response}; -use boitalettres::server::accept::addr::AddrStream; -use futures::future::BoxFuture; -use futures::future::FutureExt; -use tower::Service; - -use crate::session; -use crate::LoginProvider; - -pub struct Instance { - login_provider: Arc, -} -impl Instance { - pub fn new(login_provider: Arc) -> Self { - Self { login_provider } - } -} -impl<'a> Service<&'a AddrStream> for Instance { - type Response = Connection; - type Error = anyhow::Error; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, addr: &'a AddrStream) -> Self::Future { - tracing::info!(remote_addr = %addr.remote_addr, local_addr = %addr.local_addr, "accept"); - let lp = self.login_provider.clone(); - async { Ok(Connection::new(lp)) }.boxed() - } -} - -pub struct Connection { - session: session::Manager, -} -impl Connection { - pub fn new(login_provider: Arc) -> Self { - Self { - session: session::Manager::new(login_provider), - } - } -} -impl Service for Connection { - type Response = Response; - type Error = BalError; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request) -> Self::Future { - tracing::debug!("Got request: {:#?}", req); - self.session.process(req) - } -} diff --git a/src/session.rs b/src/session.rs deleted file mode 100644 index c74d3cd..0000000 --- a/src/session.rs +++ /dev/null @@ -1,166 +0,0 @@ -use std::sync::Arc; - -use anyhow::Error; -use boitalettres::errors::Error as BalError; -use boitalettres::proto::{Request, Response}; -use futures::future::BoxFuture; -use futures::future::FutureExt; -use imap_codec::types::command::CommandBody; -use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status}; -use tokio::sync::mpsc::error::TrySendError; -use tokio::sync::{mpsc, oneshot}; - -use crate::command; -use crate::login::Credentials; -use crate::mailbox::Mailbox; -use crate::LoginProvider; - -/* This constant configures backpressure in the system, - * or more specifically, how many pipelined messages are allowed - * before refusing them - */ -const MAX_PIPELINED_COMMANDS: usize = 10; - -struct Message { - req: Request, - tx: oneshot::Sender>, -} - -pub struct Manager { - tx: mpsc::Sender, -} - -//@FIXME we should garbage collect the Instance when the Manager is destroyed. -impl Manager { - pub fn new(login_provider: Arc) -> Self { - let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS); - tokio::spawn(async move { - let mut instance = Instance::new(login_provider, rx); - instance.start().await; - }); - Self { tx } - } - - pub fn process(&self, req: Request) -> BoxFuture<'static, Result> { - let (tx, rx) = oneshot::channel(); - let tag = req.tag.clone(); - let msg = Message { req, tx }; - - // We use try_send on a bounded channel to protect the daemons from DoS. - // Pipelining requests in IMAP are a special case: they should not occure often - // and in a limited number (like 3 requests). Someone filling the channel - // will probably be malicious so we "rate limit" them. - match self.tx.try_send(msg) { - Ok(()) => (), - Err(TrySendError::Full(_)) => { - return async { - Status::bad(Some(tag), None, "Too fast! Send less pipelined requests!") - .map(|s| vec![ImapRes::Status(s)]) - .map_err(|e| BalError::Text(e.to_string())) - } - .boxed() - } - Err(TrySendError::Closed(_)) => { - return async { - Status::bad(Some(tag), None, "The session task has exited") - .map(|s| vec![ImapRes::Status(s)]) - .map_err(|e| BalError::Text(e.to_string())) - } - .boxed() - } - }; - - // @FIXME add a timeout, handle a session that fails. - async { - match rx.await { - Ok(r) => r, - Err(e) => { - tracing::warn!("Got error {:#?}", e); - Status::bad(Some(tag), None, "No response from the session handler") - .map(|s| vec![ImapRes::Status(s)]) - .map_err(|e| BalError::Text(e.to_string())) - } - } - } - .boxed() - } -} - -pub struct User { - pub name: String, - pub creds: Credentials, -} - -pub struct Instance { - rx: mpsc::Receiver, - - pub login_provider: Arc, - pub selected: Option, - pub user: Option, -} -impl Instance { - fn new( - login_provider: Arc, - rx: mpsc::Receiver, - ) -> Self { - Self { - login_provider, - rx, - selected: None, - user: None, - } - } - - //@FIXME add a function that compute the runner's name from its local info - // to ease debug - // fn name(&self) -> String { } - - async fn start(&mut self) { - //@FIXME add more info about the runner - tracing::debug!("starting runner"); - - while let Some(msg) = self.rx.recv().await { - let mut cmd = command::Command::new(msg.req.tag.clone(), self); - let res = match msg.req.body { - CommandBody::Capability => cmd.capability().await, - CommandBody::Login { username, password } => cmd.login(username, password).await, - CommandBody::Lsub { - reference, - mailbox_wildcard, - } => cmd.lsub(reference, mailbox_wildcard).await, - CommandBody::List { - reference, - mailbox_wildcard, - } => cmd.list(reference, mailbox_wildcard).await, - CommandBody::Select { mailbox } => cmd.select(mailbox).await, - CommandBody::Fetch { - sequence_set, - attributes, - uid, - } => cmd.fetch(sequence_set, attributes, uid).await, - _ => Status::bad(Some(msg.req.tag.clone()), None, "Unknown command") - .map(|s| vec![ImapRes::Status(s)]) - .map_err(Error::msg), - }; - - let wrapped_res = res.or_else(|e| match e.downcast::() { - Ok(be) => Err(be), - Err(ae) => { - tracing::warn!(error=%ae, "internal.error"); - Status::bad(Some(msg.req.tag.clone()), None, "Internal error") - .map(|s| vec![ImapRes::Status(s)]) - .map_err(|e| BalError::Text(e.to_string())) - } - }); - - //@FIXME I think we should quit this thread on error and having our manager watch it, - // and then abort the session as it is corrupted. - msg.tx.send(wrapped_res).unwrap_or_else(|e| { - tracing::warn!("failed to send imap response to manager: {:#?}", e) - }); - } - - //@FIXME add more info about the runner - tracing::debug!("exiting runner"); - } -} -- cgit v1.2.3