aboutsummaryrefslogtreecommitdiff
path: root/src/imap
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2022-06-17 18:39:36 +0200
committerQuentin Dufour <quentin@deuxfleurs.fr>2022-06-17 18:39:36 +0200
commit5dd5ae8bcd6f88703bc483d7f8d5882fefad4e7e (patch)
tree312708a97fcfcd273481f41b8bd6c878030573f4 /src/imap
parent41f1b02171cee36706d30cf24329ff12780d47fd (diff)
downloadaerogramme-5dd5ae8bcd6f88703bc483d7f8d5882fefad4e7e.tar.gz
aerogramme-5dd5ae8bcd6f88703bc483d7f8d5882fefad4e7e.zip
WIP Refactor, code is broken
Diffstat (limited to 'src/imap')
-rw-r--r--src/imap/command.rs29
-rw-r--r--src/imap/command/anonymous.rs60
-rw-r--r--src/imap/command/authenticated.rs91
-rw-r--r--src/imap/command/selected.rs10
-rw-r--r--src/imap/flow.rs50
-rw-r--r--src/imap/mod.rs100
-rw-r--r--src/imap/session.rs201
7 files changed, 541 insertions, 0 deletions
diff --git a/src/imap/command.rs b/src/imap/command.rs
new file mode 100644
index 0000000..adf2f90
--- /dev/null
+++ b/src/imap/command.rs
@@ -0,0 +1,29 @@
+use anyhow::{Error, Result};
+use boitalettres::errors::Error as BalError;
+use boitalettres::proto::{Request, Response};
+use imap_codec::types::core::{AString, Tag};
+use imap_codec::types::fetch_attributes::MacroOrFetchAttributes;
+use imap_codec::types::mailbox::{ListMailbox, Mailbox as MailboxCodec};
+use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status};
+use imap_codec::types::sequence::SequenceSet;
+
+use crate::mailbox::Mailbox;
+use crate::session;
+
+pub struct Command<'a> {
+ tag: Tag,
+ session: &'a mut session::Instance,
+}
+
+// @FIXME better handle errors, our conversions are bad due to my fork of BàL
+// @FIXME store the IMAP state in the session as an enum.
+impl<'a> Command<'a> {
+ pub fn new(tag: Tag, session: &'a mut session::Instance) -> Self {
+ Self { tag, session }
+ }
+
+
+
+
+
+}
diff --git a/src/imap/command/anonymous.rs b/src/imap/command/anonymous.rs
new file mode 100644
index 0000000..e4222f7
--- /dev/null
+++ b/src/imap/command/anonymous.rs
@@ -0,0 +1,60 @@
+
+use boitalettres::proto::{Request, Response};
+use crate::login::ArcLoginProvider;
+use crate::imap::Context;
+
+//--- dispatching
+
+pub async fn dispatch(ctx: Context) -> Result<Response> {
+ match ctx.req.body {
+ CommandBody::Capability => anonymous::capability(ctx).await,
+ CommandBody::Login { username, password } => anonymous::login(ctx, username, password).await,
+ _ => Status::no(Some(ctx.req.tag.clone()), None, "This command is not available in the ANONYMOUS state.")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(Error::msg),
+ }
+}
+
+//--- Command controllers
+
+pub async fn capability(ctx: Context) -> Result<Response> {
+ let capabilities = vec![Capability::Imap4Rev1, Capability::Idle];
+ let res = vec![
+ ImapRes::Data(Data::Capability(capabilities)),
+ ImapRes::Status(
+ Status::ok(Some(ctx.req.tag.clone()), None, "Server capabilities")
+ .map_err(Error::msg)?,
+ ),
+ ];
+ Ok(res)
+}
+
+pub async fn login(ctx: Context, username: AString, password: AString) -> Result<Response> {
+ let (u, p) = (String::try_from(username)?, String::try_from(password)?);
+ tracing::info!(user = %u, "command.login");
+
+ let creds = match ctx.login_provider.login(&u, &p).await {
+ Err(e) => {
+ tracing::debug!(error=%e, "authentication failed");
+ return Ok(vec![ImapRes::Status(
+ Status::no(Some(ctx.req.tag.clone()), None, "Authentication failed")
+ .map_err(Error::msg)?,
+ )]);
+ }
+ Ok(c) => c,
+ };
+
+ let user = flow::User {
+ creds,
+ name: u.clone(),
+ };
+ ctx.state.authenticate(user)?;
+
+ tracing::info!(username=%u, "connected");
+ Ok(vec![
+ //@FIXME we could send a capability status here too
+ ImapRes::Status(
+ Status::ok(Some(ctx.req.tag.clone()), None, "completed").map_err(Error::msg)?,
+ ),
+ ])
+}
diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs
new file mode 100644
index 0000000..188fc56
--- /dev/null
+++ b/src/imap/command/authenticated.rs
@@ -0,0 +1,91 @@
+pub async fn dispatch(ctx: Context) -> Result<Response> {
+ match req.body {
+ CommandBody::Capability => anonymous::capability().await, // we use the same implem for now
+ CommandBody::Lsub { reference, mailbox_wildcard, } => authenticated::lsub(reference, mailbox_wildcard).await,
+ CommandBody::List { reference, mailbox_wildcard, } => authenticated::list(reference, mailbox_wildcard).await,
+ CommandBody::Select { mailbox } => authenticated::select(user, mailbox).await.and_then(|(mailbox, response)| {
+ self.state.select(mailbox);
+ Ok(response)
+ }),
+ _ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the AUTHENTICATED state.")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(Error::msg),
+ },
+}
+
+ pub async fn lsub(
+ &self,
+ reference: MailboxCodec,
+ mailbox_wildcard: ListMailbox,
+ ) -> Result<Response> {
+ Ok(vec![ImapRes::Status(
+ Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?,
+ )])
+ }
+
+ pub async fn list(
+ &self,
+ reference: MailboxCodec,
+ mailbox_wildcard: ListMailbox,
+ ) -> Result<Response> {
+ Ok(vec![ImapRes::Status(
+ Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?,
+ )])
+ }
+
+ /*
+ * TRACE BEGIN ---
+
+
+ Example: C: A142 SELECT INBOX
+ S: * 172 EXISTS
+ S: * 1 RECENT
+ S: * OK [UNSEEN 12] Message 12 is first unseen
+ S: * OK [UIDVALIDITY 3857529045] UIDs valid
+ S: * OK [UIDNEXT 4392] Predicted next UID
+ S: * FLAGS (\Answered \Flagged \Deleted \Seen \Draft)
+ S: * OK [PERMANENTFLAGS (\Deleted \Seen \*)] Limited
+ S: A142 OK [READ-WRITE] SELECT completed
+
+ * TRACE END ---
+ */
+ pub async fn select(&mut self, mailbox: MailboxCodec) -> Result<Response> {
+ let name = String::try_from(mailbox)?;
+ let user = match self.session.user.as_ref() {
+ Some(u) => u,
+ _ => {
+ return Ok(vec![ImapRes::Status(
+ Status::no(Some(self.tag.clone()), None, "Not implemented")
+ .map_err(Error::msg)?,
+ )])
+ }
+ };
+
+ let mut mb = Mailbox::new(&user.creds, name.clone())?;
+ tracing::info!(username=%user.name, mailbox=%name, "mailbox.selected");
+
+ let sum = mb.summary().await?;
+ tracing::trace!(summary=%sum, "mailbox.summary");
+
+ let body = vec![Data::Exists(sum.exists.try_into()?), Data::Recent(0)];
+
+ self.session.selected = Some(mb);
+
+ let r_unseen = Status::ok(None, Some(Code::Unseen(0)), "").map_err(Error::msg)?;
+ let r_permanentflags = Status::ok(None, Some(Code::
+
+ Ok(vec![
+ ImapRes::Data(Data::Exists(0)),
+ ImapRes::Data(Data::Recent(0)),
+ ImapRes::Data(Data::Flags(vec![]),
+ ImapRes::Status(),
+ ImapRes::Status(),
+ ImapRes::Status()
+ Status::ok(
+ Some(self.tag.clone()),
+ Some(Code::ReadWrite),
+ "Select completed",
+ )
+ .map_err(Error::msg)?,
+ )])
+ }
diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs
new file mode 100644
index 0000000..b320ec2
--- /dev/null
+++ b/src/imap/command/selected.rs
@@ -0,0 +1,10 @@
+ pub async fn fetch(
+ &self,
+ sequence_set: SequenceSet,
+ attributes: MacroOrFetchAttributes,
+ uid: bool,
+ ) -> Result<Response> {
+ Ok(vec![ImapRes::Status(
+ Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?,
+ )])
+ }
diff --git a/src/imap/flow.rs b/src/imap/flow.rs
new file mode 100644
index 0000000..8ba75bc
--- /dev/null
+++ b/src/imap/flow.rs
@@ -0,0 +1,50 @@
+use crate::mailbox::Mailbox;
+
+pub struct User {
+ pub name: String,
+ pub creds: Credentials,
+}
+
+pub enum State {
+ NotAuthenticated,
+ Authenticated(User),
+ Selected(User, Mailbox),
+ Logout
+}
+pub enum Error {
+ ForbiddenTransition,
+}
+
+// See RFC3501 section 3.
+// https://datatracker.ietf.org/doc/html/rfc3501#page-13
+impl State {
+ pub fn authenticate(&mut self, user: User) -> Result<(), Error> {
+ self = match state {
+ State::NotAuthenticated => State::Authenticated(user),
+ _ => return Err(ForbiddenTransition),
+ };
+ Ok(())
+ }
+
+ pub fn logout(&mut self) -> Self {
+ self = State::Logout;
+ Ok(())
+ }
+
+ pub fn select(&mut self, mailbox: Mailbox) -> Result<(), Error> {
+ self = match state {
+ State::Authenticated(user) => State::Selected(user, mailbox),
+ _ => return Err(ForbiddenTransition),
+ };
+ Ok(())
+ }
+
+ pub fn unselect(state: State) -> Result<(), Error> {
+ self = match state {
+ State::Selected(user, _) => State::Authenticated(user),
+ _ => return Err(ForbiddenTransition),
+ };
+ Ok(())
+ }
+}
+
diff --git a/src/imap/mod.rs b/src/imap/mod.rs
new file mode 100644
index 0000000..45480ce
--- /dev/null
+++ b/src/imap/mod.rs
@@ -0,0 +1,100 @@
+mod session;
+mod flow;
+mod command;
+
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use anyhow::Result;
+use boitalettres::errors::Error as BalError;
+use boitalettres::proto::{Request, Response};
+use boitalettres::server::accept::addr::AddrStream;
+use boitalettres::server::Server as ImapServer;
+use futures::future::BoxFuture;
+use futures::future::FutureExt;
+use tower::Service;
+
+use crate::LoginProvider;
+
+/// Server is a thin wrapper to register our Services in BàL
+pub struct Server(ImapServer<AddrIncoming, service::Instance>);
+pub async fn new(
+ config: ImapConfig,
+ login: Arc<dyn LoginProvider + Send + Sync>,
+) -> Result<Server> {
+
+ //@FIXME add a configuration parameter
+ let incoming = AddrIncoming::new(config.bind_addr).await?;
+ let imap = ImapServer::new(incoming).serve(service::Instance::new(login.clone()));
+
+ tracing::info!("IMAP activated, will listen on {:#}", self.imap.incoming.local_addr);
+ Server(imap)
+}
+impl Server {
+ pub async fn run(&self, mut must_exit: watch::Receiver<bool>) -> Result<()> {
+ tracing::info!("IMAP started!");
+ tokio::select! {
+ s = self => s?,
+ _ = must_exit.changed() => tracing::info!("Stopped IMAP server"),
+ }
+
+ Ok(())
+ }
+}
+
+//---
+
+/// Instance is the main Tokio Tower service that we register in BàL.
+/// It receives new connection demands and spawn a dedicated service.
+struct Instance {
+ login_provider: Arc<dyn LoginProvider + Send + Sync>,
+}
+impl Instance {
+ pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self {
+ Self { login_provider }
+ }
+}
+impl<'a> Service<&'a AddrStream> for Instance {
+ type Response = Connection;
+ type Error = anyhow::Error;
+ type Future = BoxFuture<'static, Result<Self::Response>>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, addr: &'a AddrStream) -> Self::Future {
+ tracing::info!(remote_addr = %addr.remote_addr, local_addr = %addr.local_addr, "accept");
+ let lp = self.login_provider.clone();
+ async { Ok(Connection::new(lp)) }.boxed()
+ }
+}
+
+//---
+
+/// Connection is the per-connection Tokio Tower service we register in BàL.
+/// It handles a single TCP connection, and thus has a business logic.
+struct Connection {
+ session: session::Manager,
+}
+impl Connection {
+ pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self {
+ Self {
+ session: session::Manager::new(login_provider),
+ }
+ }
+}
+impl Service<Request> for Connection {
+ type Response = Response;
+ type Error = BalError;
+ type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, req: Request) -> Self::Future {
+ tracing::debug!("Got request: {:#?}", req);
+ self.session.process(req)
+ }
+}
diff --git a/src/imap/session.rs b/src/imap/session.rs
new file mode 100644
index 0000000..2035634
--- /dev/null
+++ b/src/imap/session.rs
@@ -0,0 +1,201 @@
+use std::sync::Arc;
+
+use anyhow::Error;
+use boitalettres::errors::Error as BalError;
+use boitalettres::proto::{Request, Response};
+use futures::future::BoxFuture;
+use futures::future::FutureExt;
+use imap_codec::types::command::CommandBody;
+use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status};
+use tokio::sync::mpsc::error::TrySendError;
+use tokio::sync::{mpsc, oneshot};
+
+use crate::command::{anonymous,authenticated,selected};
+use crate::login::Credentials;
+use crate::mailbox::Mailbox;
+use crate::LoginProvider;
+
+/* This constant configures backpressure in the system,
+ * or more specifically, how many pipelined messages are allowed
+ * before refusing them
+ */
+const MAX_PIPELINED_COMMANDS: usize = 10;
+
+struct Message {
+ req: Request,
+ tx: oneshot::Sender<Result<Response, BalError>>,
+}
+
+//-----
+
+pub struct Manager {
+ tx: mpsc::Sender<Message>,
+}
+
+impl Manager {
+ pub fn new(login_provider: ArcLoginProvider) -> Self {
+ let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS);
+ tokio::spawn(async move {
+ let mut instance = Instance::new(login_provider, rx);
+ instance.start().await;
+ });
+ Self { tx }
+ }
+
+ pub fn process(&self, req: Request) -> BoxFuture<'static, Result<Response, BalError>> {
+ let (tx, rx) = oneshot::channel();
+ let tag = req.tag.clone();
+ let msg = Message { req, tx };
+
+ // We use try_send on a bounded channel to protect the daemons from DoS.
+ // Pipelining requests in IMAP are a special case: they should not occure often
+ // and in a limited number (like 3 requests). Someone filling the channel
+ // will probably be malicious so we "rate limit" them.
+ match self.tx.try_send(msg) {
+ Ok(()) => (),
+ Err(TrySendError::Full(_)) => {
+ return async {
+ Status::bad(Some(tag), None, "Too fast! Send less pipelined requests!")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(|e| BalError::Text(e.to_string()))
+ }
+ .boxed()
+ }
+ Err(TrySendError::Closed(_)) => {
+ return async {
+ Status::bad(Some(tag), None, "The session task has exited")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(|e| BalError::Text(e.to_string()))
+ }
+ .boxed()
+ }
+ };
+
+ // @FIXME add a timeout, handle a session that fails.
+ async {
+ match rx.await {
+ Ok(r) => r,
+ Err(e) => {
+ tracing::warn!("Got error {:#?}", e);
+ Status::bad(Some(tag), None, "No response from the session handler")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(|e| BalError::Text(e.to_string()))
+ }
+ }
+ }
+ .boxed()
+ }
+}
+
+//-----
+
+pub struct Context<'a> {
+ req: &'a Request,
+ state: &'a mut flow::State,
+ login: ArcLoginProvider,
+}
+
+pub struct Instance {
+ rx: mpsc::Receiver<Message>,
+
+ pub login_provider: ArcLoginProvider,
+ pub state: flow::State,
+}
+impl Instance {
+ fn new(
+ login_provider: ArcLoginProvider,
+ rx: mpsc::Receiver<Message>,
+ ) -> Self {
+ Self {
+ login_provider,
+ rx,
+ state: flow::State::NotAuthenticated,
+ }
+ }
+
+ //@FIXME add a function that compute the runner's name from its local info
+ // to ease debug
+ // fn name(&self) -> String { }
+
+ async fn start(&mut self) {
+ //@FIXME add more info about the runner
+ tracing::debug!("starting runner");
+
+ while let Some(msg) = self.rx.recv().await {
+ let ctx = Context { req: &msg.req, state: &mut self.state, login: self.login_provider };
+
+ // Command behavior is modulated by the state.
+ // To prevent state error, we handle the same command in separate code path depending
+ // on the State.
+ let cmd_res = match self.state {
+ flow::State::NotAuthenticated => anonymous::dispatch(ctx).await,
+ flow::State::Authenticated(user) => authenticated::dispatch(ctx).await,
+ flow::State::Selected(user, mailbox) => selected::dispatch(ctx).await,
+ flow::State::Logout => Status::bad(Some(ctx.req.tag.clone()), None, "No commands are allowed in the LOGOUT state.")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(Error::msg),
+ };
+
+/*
+
+ match req.body {
+ CommandBody::Capability => anonymous::capability().await,
+ CommandBody::Login { username, password } => anonymous::login(self.login_provider, username, password).await.and_then(|(user, response)| {
+ self.state.authenticate(user)?;
+ Ok(response)
+ },
+ _ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the ANONYMOUS state.")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(Error::msg),
+
+ },
+ flow::State::Authenticated(user) => match req.body {
+ CommandBody::Capability => anonymous::capability().await, // we use the same implem for now
+ CommandBody::Lsub { reference, mailbox_wildcard, } => authenticated::lsub(reference, mailbox_wildcard).await,
+ CommandBody::List { reference, mailbox_wildcard, } => authenticated::list(reference, mailbox_wildcard).await,
+ CommandBody::Select { mailbox } => authenticated::select(user, mailbox).await.and_then(|(mailbox, response)| {
+ self.state.select(mailbox);
+ Ok(response)
+ }),
+ _ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the AUTHENTICATED state.")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(Error::msg),
+ },
+ flow::State::Selected(user, mailbox) => match req.body {
+ CommandBody::Capability => anonymous::capability().await, // we use the same implem for now
+ CommandBody::Fetch { sequence_set, attributes, uid, } => selected::fetch(sequence_set, attributes, uid).await,
+ _ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the SELECTED state.")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(Error::msg),
+ },
+ flow::State::Logout => Status::bad(Some(msg.req.tag.clone()), None, "No commands are allowed in the LOGOUT state.")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(Error::msg),
+ }
+ */
+
+ let imap_res = match cmd_res {
+ Ok(new_state, imap_res) => {
+ self.state = new_state;
+ Ok(imap_res)
+ },
+ Err(e) if Ok(be) = e.downcast::<BalError>() => Err(be),
+ Err(e) => {
+ tracing::warn!(error=%e, "internal.error");
+ Ok(Status::bad(Some(msg.req.tag.clone()), None, "Internal error")
+ .map(|s| vec![ImapRes::Status(s)])
+ .map_err(|e| BalError::Text(e.to_string())))
+ }
+ };
+
+ //@FIXME I think we should quit this thread on error and having our manager watch it,
+ // and then abort the session as it is corrupted.
+ msg.tx.send(imap_res).unwrap_or_else(|e| {
+ tracing::warn!("failed to send imap response to manager: {:#?}", e)
+ });
+ }
+
+ //@FIXME add more info about the runner
+ tracing::debug!("exiting runner");
+ }
+}