aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2024-01-02 20:23:33 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2024-01-02 20:23:33 +0100
commit0d667a30301bec47c03314ff0e449a220ad3b913 (patch)
treeb3f5ccbbe5e5a615fc278900328bbcaccc9d8c6f /src
parent9a8d4c651e5993f09f54cf7c1eacf7a4839ea9db (diff)
downloadaerogramme-0d667a30301bec47c03314ff0e449a220ad3b913.tar.gz
aerogramme-0d667a30301bec47c03314ff0e449a220ad3b913.zip
compile with imap-flow
Diffstat (limited to 'src')
-rw-r--r--src/imap/command/anonymous.rs22
-rw-r--r--src/imap/command/anystate.rs6
-rw-r--r--src/imap/command/authenticated.rs40
-rw-r--r--src/imap/command/examined.rs16
-rw-r--r--src/imap/command/selected.rs24
-rw-r--r--src/imap/flow.rs24
-rw-r--r--src/imap/mailbox_view.rs28
-rw-r--r--src/imap/mod.rs225
-rw-r--r--src/imap/response.rs6
-rw-r--r--src/imap/session.rs226
-rw-r--r--src/server.rs4
11 files changed, 312 insertions, 309 deletions
diff --git a/src/imap/command/anonymous.rs b/src/imap/command/anonymous.rs
index 4de5fbd..fbd10e9 100644
--- a/src/imap/command/anonymous.rs
+++ b/src/imap/command/anonymous.rs
@@ -1,7 +1,6 @@
use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody};
-use imap_codec::imap_types::core::{AString, NonEmptyVec};
-use imap_codec::imap_types::response::{Capability, Data};
+use imap_codec::imap_types::core::AString;
use imap_codec::imap_types::secret::Secret;
use crate::imap::command::anystate;
@@ -13,16 +12,16 @@ use crate::mail::user::User;
//--- dispatching
pub struct AnonymousContext<'a> {
- pub req: &'a Command<'a>,
+ pub req: &'a Command<'static>,
pub login_provider: &'a ArcLoginProvider,
}
-pub async fn dispatch<'a>(ctx: AnonymousContext<'a>) -> Result<(Response<'a>, flow::Transition)> {
+pub async fn dispatch(ctx: AnonymousContext<'_>) -> Result<(Response<'static>, flow::Transition)> {
match &ctx.req.body {
// Any State
CommandBody::Noop => anystate::noop_nothing(ctx.req.tag.clone()),
CommandBody::Capability => anystate::capability(ctx.req.tag.clone()),
- CommandBody::Logout => Ok((Response::bye()?, flow::Transition::Logout)),
+ CommandBody::Logout => anystate::logout(),
// Specific to anonymous context (3 commands)
CommandBody::Login { username, password } => ctx.login(username, password).await,
@@ -39,22 +38,11 @@ pub async fn dispatch<'a>(ctx: AnonymousContext<'a>) -> Result<(Response<'a>, fl
//--- Command controllers, private
impl<'a> AnonymousContext<'a> {
- async fn capability(self) -> Result<(Response<'a>, flow::Transition)> {
- let capabilities: NonEmptyVec<Capability> =
- (vec![Capability::Imap4Rev1, Capability::Idle]).try_into()?;
- let res = Response::build()
- .to_req(self.req)
- .message("Server capabilities")
- .data(Data::Capability(capabilities))
- .ok()?;
- Ok((res, flow::Transition::None))
- }
-
async fn login(
self,
username: &AString<'a>,
password: &Secret<AString<'a>>,
- ) -> Result<(Response<'a>, flow::Transition)> {
+ ) -> Result<(Response<'static>, flow::Transition)> {
let (u, p) = (
std::str::from_utf8(username.as_ref())?,
std::str::from_utf8(password.declassify().as_ref())?,
diff --git a/src/imap/command/anystate.rs b/src/imap/command/anystate.rs
index ea3bc16..42fe645 100644
--- a/src/imap/command/anystate.rs
+++ b/src/imap/command/anystate.rs
@@ -5,7 +5,7 @@ use imap_codec::imap_types::response::{Capability, Data};
use crate::imap::flow;
use crate::imap::response::Response;
-pub(crate) fn capability<'a>(tag: Tag<'a>) -> Result<(Response<'a>, flow::Transition)> {
+pub(crate) fn capability(tag: Tag<'static>) -> Result<(Response<'static>, flow::Transition)> {
let capabilities: NonEmptyVec<Capability> =
(vec![Capability::Imap4Rev1, Capability::Idle]).try_into()?;
let res = Response::build()
@@ -17,7 +17,7 @@ pub(crate) fn capability<'a>(tag: Tag<'a>) -> Result<(Response<'a>, flow::Transi
Ok((res, flow::Transition::None))
}
-pub(crate) fn noop_nothing<'a>(tag: Tag<'a>) -> Result<(Response<'a>, flow::Transition)> {
+pub(crate) fn noop_nothing(tag: Tag<'static>) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build().tag(tag).message("Noop completed.").ok()?,
flow::Transition::None,
@@ -41,7 +41,7 @@ pub(crate) fn not_implemented<'a>(
))
}
-pub(crate) fn wrong_state<'a>(tag: Tag<'a>) -> Result<(Response<'a>, flow::Transition)> {
+pub(crate) fn wrong_state(tag: Tag<'static>) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build()
.tag(tag)
diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs
index c9f9ff7..74ebbfa 100644
--- a/src/imap/command/authenticated.rs
+++ b/src/imap/command/authenticated.rs
@@ -21,18 +21,18 @@ use crate::mail::user::{User, MAILBOX_HIERARCHY_DELIMITER as MBX_HIER_DELIM_RAW}
use crate::mail::IMF;
pub struct AuthenticatedContext<'a> {
- pub req: &'a Command<'a>,
+ pub req: &'a Command<'static>,
pub user: &'a Arc<User>,
}
pub async fn dispatch<'a>(
ctx: AuthenticatedContext<'a>,
-) -> Result<(Response<'a>, flow::Transition)> {
+) -> Result<(Response<'static>, flow::Transition)> {
match &ctx.req.body {
// Any state
CommandBody::Noop => anystate::noop_nothing(ctx.req.tag.clone()),
CommandBody::Capability => anystate::capability(ctx.req.tag.clone()),
- CommandBody::Logout => Ok((Response::bye()?, flow::Transition::Logout)),
+ CommandBody::Logout => anystate::logout(),
// Specific to this state (11 commands)
CommandBody::Create { mailbox } => ctx.create(mailbox).await,
@@ -68,7 +68,10 @@ pub async fn dispatch<'a>(
// --- PRIVATE ---
impl<'a> AuthenticatedContext<'a> {
- async fn create(self, mailbox: &MailboxCodec<'a>) -> Result<(Response<'a>, flow::Transition)> {
+ async fn create(
+ self,
+ mailbox: &MailboxCodec<'a>,
+ ) -> Result<(Response<'static>, flow::Transition)> {
let name = match mailbox {
MailboxCodec::Inbox => {
return Ok((
@@ -100,7 +103,10 @@ impl<'a> AuthenticatedContext<'a> {
}
}
- async fn delete(self, mailbox: &MailboxCodec<'a>) -> Result<(Response<'a>, flow::Transition)> {
+ async fn delete(
+ self,
+ mailbox: &MailboxCodec<'a>,
+ ) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
match self.user.delete_mailbox(&name).await {
@@ -125,7 +131,7 @@ impl<'a> AuthenticatedContext<'a> {
self,
from: &MailboxCodec<'a>,
to: &MailboxCodec<'a>,
- ) -> Result<(Response<'a>, flow::Transition)> {
+ ) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(from).try_into()?;
let new_name: &str = MailboxName(to).try_into()?;
@@ -152,7 +158,7 @@ impl<'a> AuthenticatedContext<'a> {
reference: &MailboxCodec<'a>,
mailbox_wildcard: &ListMailbox<'a>,
is_lsub: bool,
- ) -> Result<(Response<'a>, flow::Transition)> {
+ ) -> Result<(Response<'static>, flow::Transition)> {
let mbx_hier_delim: QuotedChar = QuotedChar::unvalidated(MBX_HIER_DELIM_RAW);
let reference: &str = MailboxName(reference).try_into()?;
@@ -259,9 +265,9 @@ impl<'a> AuthenticatedContext<'a> {
async fn status(
self,
- mailbox: &MailboxCodec<'a>,
+ mailbox: &MailboxCodec<'static>,
attributes: &[StatusDataItemName],
- ) -> Result<(Response<'a>, flow::Transition)> {
+ ) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(name).await?;
let mb = match mb_opt {
@@ -316,7 +322,7 @@ impl<'a> AuthenticatedContext<'a> {
async fn subscribe(
self,
mailbox: &MailboxCodec<'a>,
- ) -> Result<(Response<'a>, flow::Transition)> {
+ ) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
if self.user.has_mailbox(&name).await? {
@@ -341,7 +347,7 @@ impl<'a> AuthenticatedContext<'a> {
async fn unsubscribe(
self,
mailbox: &MailboxCodec<'a>,
- ) -> Result<(Response<'a>, flow::Transition)> {
+ ) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
if self.user.has_mailbox(&name).await? {
@@ -399,7 +405,10 @@ impl<'a> AuthenticatedContext<'a> {
* TRACE END ---
*/
- async fn select(self, mailbox: &MailboxCodec<'a>) -> Result<(Response<'a>, flow::Transition)> {
+ async fn select(
+ self,
+ mailbox: &MailboxCodec<'a>,
+ ) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?;
@@ -430,7 +439,10 @@ impl<'a> AuthenticatedContext<'a> {
))
}
- async fn examine(self, mailbox: &MailboxCodec<'a>) -> Result<(Response<'a>, flow::Transition)> {
+ async fn examine(
+ self,
+ mailbox: &MailboxCodec<'a>,
+ ) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?;
@@ -468,7 +480,7 @@ impl<'a> AuthenticatedContext<'a> {
flags: &[Flag<'a>],
date: &Option<DateTime>,
message: &Literal<'a>,
- ) -> Result<(Response<'a>, flow::Transition)> {
+ ) -> Result<(Response<'static>, flow::Transition)> {
let append_tag = self.req.tag.clone();
match self.append_internal(mailbox, flags, date, message).await {
Ok((_mb, uidvalidity, uid)) => Ok((
diff --git a/src/imap/command/examined.rs b/src/imap/command/examined.rs
index 7f9c39c..eec85cd 100644
--- a/src/imap/command/examined.rs
+++ b/src/imap/command/examined.rs
@@ -14,17 +14,17 @@ use crate::imap::response::Response;
use crate::mail::user::User;
pub struct ExaminedContext<'a> {
- pub req: &'a Command<'a>,
+ pub req: &'a Command<'static>,
pub user: &'a Arc<User>,
pub mailbox: &'a mut MailboxView,
}
-pub async fn dispatch<'a>(ctx: ExaminedContext<'a>) -> Result<(Response<'a>, flow::Transition)> {
+pub async fn dispatch(ctx: ExaminedContext<'_>) -> Result<(Response<'static>, flow::Transition)> {
match &ctx.req.body {
// Any State
// noop is specific to this state
CommandBody::Capability => anystate::capability(ctx.req.tag.clone()),
- CommandBody::Logout => Ok((Response::bye()?, flow::Transition::Logout)),
+ CommandBody::Logout => anystate::logout(),
// Specific to the EXAMINE state (specialization of the SELECTED state)
// ~3 commands -> close, fetch, search + NOOP
@@ -58,7 +58,7 @@ pub async fn dispatch<'a>(ctx: ExaminedContext<'a>) -> Result<(Response<'a>, flo
impl<'a> ExaminedContext<'a> {
/// CLOSE in examined state is not the same as in selected state
/// (in selected state it also does an EXPUNGE, here it doesn't)
- async fn close(self) -> Result<(Response<'a>, flow::Transition)> {
+ async fn close(self) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build()
.to_req(self.req)
@@ -71,9 +71,9 @@ impl<'a> ExaminedContext<'a> {
pub async fn fetch(
self,
sequence_set: &SequenceSet,
- attributes: &'a MacroOrMessageDataItemNames<'a>,
+ attributes: &'a MacroOrMessageDataItemNames<'static>,
uid: &bool,
- ) -> Result<(Response<'a>, flow::Transition)> {
+ ) -> Result<(Response<'static>, flow::Transition)> {
match self.mailbox.fetch(sequence_set, attributes, uid).await {
Ok(resp) => Ok((
Response::build()
@@ -98,7 +98,7 @@ impl<'a> ExaminedContext<'a> {
_charset: &Option<Charset<'a>>,
_criteria: &SearchKey<'a>,
_uid: &bool,
- ) -> Result<(Response<'a>, flow::Transition)> {
+ ) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build()
.to_req(self.req)
@@ -108,7 +108,7 @@ impl<'a> ExaminedContext<'a> {
))
}
- pub async fn noop(self) -> Result<(Response<'a>, flow::Transition)> {
+ pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> {
self.mailbox.mailbox.force_sync().await?;
let updates = self.mailbox.update().await?;
diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs
index cd5d221..d5dcd61 100644
--- a/src/imap/command/selected.rs
+++ b/src/imap/command/selected.rs
@@ -18,17 +18,19 @@ use crate::imap::response::Response;
use crate::mail::user::User;
pub struct SelectedContext<'a> {
- pub req: &'a Command<'a>,
+ pub req: &'a Command<'static>,
pub user: &'a Arc<User>,
pub mailbox: &'a mut MailboxView,
}
-pub async fn dispatch<'a>(ctx: SelectedContext<'a>) -> Result<(Response<'a>, flow::Transition)> {
+pub async fn dispatch<'a>(
+ ctx: SelectedContext<'a>,
+) -> Result<(Response<'static>, flow::Transition)> {
match &ctx.req.body {
// Any State
// noop is specific to this state
CommandBody::Capability => anystate::capability(ctx.req.tag.clone()),
- CommandBody::Logout => Ok((Response::bye()?, flow::Transition::Logout)),
+ CommandBody::Logout => anystate::logout(),
// Specific to this state (7 commands + NOOP)
CommandBody::Close => ctx.close().await,
@@ -65,7 +67,7 @@ pub async fn dispatch<'a>(ctx: SelectedContext<'a>) -> Result<(Response<'a>, flo
// --- PRIVATE ---
impl<'a> SelectedContext<'a> {
- async fn close(self) -> Result<(Response<'a>, flow::Transition)> {
+ async fn close(self) -> Result<(Response<'static>, flow::Transition)> {
// We expunge messages,
// but we don't send the untagged EXPUNGE responses
let tag = self.req.tag.clone();
@@ -79,9 +81,9 @@ impl<'a> SelectedContext<'a> {
pub async fn fetch(
self,
sequence_set: &SequenceSet,
- attributes: &'a MacroOrMessageDataItemNames<'a>,
+ attributes: &'a MacroOrMessageDataItemNames<'static>,
uid: &bool,
- ) -> Result<(Response<'a>, flow::Transition)> {
+ ) -> Result<(Response<'static>, flow::Transition)> {
match self.mailbox.fetch(sequence_set, attributes, uid).await {
Ok(resp) => Ok((
Response::build()
@@ -106,7 +108,7 @@ impl<'a> SelectedContext<'a> {
_charset: &Option<Charset<'a>>,
_criteria: &SearchKey<'a>,
_uid: &bool,
- ) -> Result<(Response<'a>, flow::Transition)> {
+ ) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build()
.to_req(self.req)
@@ -116,7 +118,7 @@ impl<'a> SelectedContext<'a> {
))
}
- pub async fn noop(self) -> Result<(Response<'a>, flow::Transition)> {
+ pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> {
self.mailbox.mailbox.force_sync().await?;
let updates = self.mailbox.update().await?;
@@ -130,7 +132,7 @@ impl<'a> SelectedContext<'a> {
))
}
- async fn expunge(self) -> Result<(Response<'a>, flow::Transition)> {
+ async fn expunge(self) -> Result<(Response<'static>, flow::Transition)> {
let tag = self.req.tag.clone();
let data = self.mailbox.expunge().await?;
@@ -151,7 +153,7 @@ impl<'a> SelectedContext<'a> {
response: &StoreResponse,
flags: &[Flag<'a>],
uid: &bool,
- ) -> Result<(Response<'a>, flow::Transition)> {
+ ) -> Result<(Response<'static>, flow::Transition)> {
let data = self
.mailbox
.store(sequence_set, kind, response, flags, uid)
@@ -172,7 +174,7 @@ impl<'a> SelectedContext<'a> {
sequence_set: &SequenceSet,
mailbox: &MailboxCodec<'a>,
uid: &bool,
- ) -> Result<(Response<'a>, flow::Transition)> {
+ ) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?;
diff --git a/src/imap/flow.rs b/src/imap/flow.rs
index eb94bb5..95810c1 100644
--- a/src/imap/flow.rs
+++ b/src/imap/flow.rs
@@ -37,23 +37,27 @@ pub enum Transition {
// See RFC3501 section 3.
// https://datatracker.ietf.org/doc/html/rfc3501#page-13
impl State {
- pub fn apply(self, tr: Transition) -> Result<Self, Error> {
- match (self, tr) {
- (s, Transition::None) => Ok(s),
- (State::NotAuthenticated, Transition::Authenticate(u)) => Ok(State::Authenticated(u)),
+ pub fn apply(&mut self, tr: Transition) -> Result<(), Error> {
+ let new_state = match (&self, tr) {
+ (_s, Transition::None) => return Ok(()),
+ (State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u),
(
State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _),
Transition::Select(m),
- ) => Ok(State::Selected(u, m)),
+ ) => State::Selected(u.clone(), m),
(
State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _),
Transition::Examine(m),
- ) => Ok(State::Examined(u, m)),
+ ) => State::Examined(u.clone(), m),
(State::Selected(u, _) | State::Examined(u, _), Transition::Unselect) => {
- Ok(State::Authenticated(u))
+ State::Authenticated(u.clone())
}
- (_, Transition::Logout) => Ok(State::Logout),
- _ => Err(Error::ForbiddenTransition),
- }
+ (_, Transition::Logout) => State::Logout,
+ _ => return Err(Error::ForbiddenTransition),
+ };
+
+ *self = new_state;
+
+ Ok(())
}
}
diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs
index 2e5444b..fd58de7 100644
--- a/src/imap/mailbox_view.rs
+++ b/src/imap/mailbox_view.rs
@@ -75,14 +75,26 @@ impl<'a> FetchedMail<'a> {
}
}
-pub struct AttributesProxy<'a> {
- attrs: Vec<MessageDataItemName<'a>>,
+pub struct AttributesProxy {
+ attrs: Vec<MessageDataItemName<'static>>,
}
-impl<'a> AttributesProxy<'a> {
- fn new(attrs: &'a MacroOrMessageDataItemNames<'a>, is_uid_fetch: bool) -> Self {
+impl AttributesProxy {
+ fn new(attrs: &MacroOrMessageDataItemNames<'static>, is_uid_fetch: bool) -> Self {
// Expand macros
let mut fetch_attrs = match attrs {
- MacroOrMessageDataItemNames::Macro(m) => m.expand(),
+ MacroOrMessageDataItemNames::Macro(m) => {
+ use imap_codec::imap_types::fetch::Macro;
+ use MessageDataItemName::*;
+ match m {
+ Macro::All => vec![Flags, InternalDate, Rfc822Size, Envelope],
+ Macro::Fast => vec![Flags, InternalDate, Rfc822Size],
+ Macro::Full => vec![Flags, InternalDate, Rfc822Size, Envelope, Body],
+ _ => {
+ tracing::error!("unimplemented macro");
+ vec![]
+ }
+ }
+ }
MacroOrMessageDataItemNames::MessageDataItemNames(a) => a.clone(),
};
@@ -248,7 +260,7 @@ impl<'a> MailView<'a> {
Ok(MessageDataItem::InternalDate(DateTime::unvalidated(dt)))
}
- fn filter<'b>(&self, ap: &AttributesProxy<'b>) -> Result<(Body<'b>, SeenFlag)> {
+ fn filter<'b>(&self, ap: &AttributesProxy) -> Result<(Body<'static>, SeenFlag)> {
let mut seen = SeenFlag::DoNothing;
let res_attrs = ap
.attrs
@@ -593,9 +605,9 @@ impl MailboxView {
pub async fn fetch<'b>(
&self,
sequence_set: &SequenceSet,
- attributes: &'b MacroOrMessageDataItemNames<'b>,
+ attributes: &'b MacroOrMessageDataItemNames<'static>,
is_uid_fetch: &bool,
- ) -> Result<Vec<Body<'b>>> {
+ ) -> Result<Vec<Body<'static>>> {
let ap = AttributesProxy::new(attributes, *is_uid_fetch);
// Prepare data
diff --git a/src/imap/mod.rs b/src/imap/mod.rs
index 589231b..31eeaa8 100644
--- a/src/imap/mod.rs
+++ b/src/imap/mod.rs
@@ -4,104 +4,183 @@ mod mailbox_view;
mod response;
mod session;
-use std::task::{Context, Poll};
+use std::net::SocketAddr;
use anyhow::Result;
-//use boitalettres::errors::Error as BalError;
-//use boitalettres::proto::{Request, Response};
-//use boitalettres::server::accept::addr::AddrIncoming;
-//use boitalettres::server::accept::addr::AddrStream;
-//use boitalettres::server::Server as ImapServer;
-use futures::future::BoxFuture;
-use futures::future::FutureExt;
+use futures::stream::{FuturesUnordered, StreamExt};
+
+use tokio::net::TcpListener;
use tokio::sync::watch;
+use imap_codec::imap_types::response::Greeting;
+use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions};
+use imap_flow::stream::AnyStream;
+
use crate::config::ImapConfig;
use crate::login::ArcLoginProvider;
/// Server is a thin wrapper to register our Services in BàL
-pub struct Server {}
-
-pub async fn new(config: ImapConfig, login: ArcLoginProvider) -> Result<Server> {
- unimplemented!();
- /* let incoming = AddrIncoming::new(config.bind_addr).await?;
- tracing::info!("IMAP activated, will listen on {:#}", incoming.local_addr);
-
- let imap = ImapServer::new(incoming).serve(Instance::new(login.clone()));
- Ok(Server(imap))*/
-}
-
-impl Server {
- pub async fn run(self, mut must_exit: watch::Receiver<bool>) -> Result<()> {
- tracing::info!("IMAP started!");
- unimplemented!();
- /*tokio::select! {
- s = self.0 => s?,
- _ = must_exit.changed() => tracing::info!("Stopped IMAP server"),
- }
-
- Ok(())*/
- }
+pub struct Server {
+ bind_addr: SocketAddr,
+ login_provider: ArcLoginProvider,
}
-//---
-/*
-/// Instance is the main Tokio Tower service that we register in BàL.
-/// It receives new connection demands and spawn a dedicated service.
-struct Instance {
+struct ClientContext {
+ stream: AnyStream,
+ addr: SocketAddr,
login_provider: ArcLoginProvider,
+ must_exit: watch::Receiver<bool>,
}
-impl Instance {
- pub fn new(login_provider: ArcLoginProvider) -> Self {
- Self { login_provider }
+pub fn new(config: ImapConfig, login: ArcLoginProvider) -> Server {
+ Server {
+ bind_addr: config.bind_addr,
+ login_provider: login,
}
}
-impl<'a> Service<&'a AddrStream> for Instance {
- type Response = Connection;
- type Error = anyhow::Error;
- type Future = BoxFuture<'static, Result<Self::Response>>;
+impl Server {
+ pub async fn run(self: Self, mut must_exit: watch::Receiver<bool>) -> Result<()> {
+ let tcp = TcpListener::bind(self.bind_addr).await?;
+ tracing::info!("IMAP server listening on {:#}", self.bind_addr);
+
+ let mut connections = FuturesUnordered::new();
+
+ while !*must_exit.borrow() {
+ let wait_conn_finished = async {
+ if connections.is_empty() {
+ futures::future::pending().await
+ } else {
+ connections.next().await
+ }
+ };
+ let (socket, remote_addr) = tokio::select! {
+ a = tcp.accept() => a?,
+ _ = wait_conn_finished => continue,
+ _ = must_exit.changed() => continue,
+ };
+ tracing::info!("IMAP: accepted connection from {}", remote_addr);
+
+ let client = ClientContext {
+ stream: AnyStream::new(socket),
+ addr: remote_addr.clone(),
+ login_provider: self.login_provider.clone(),
+ must_exit: must_exit.clone(),
+ };
+ let conn = tokio::spawn(client_wrapper(client));
+ connections.push(conn);
+ }
+ drop(tcp);
- fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
+ tracing::info!("IMAP server shutting down, draining remaining connections...");
+ while connections.next().await.is_some() {}
- fn call(&mut self, addr: &'a AddrStream) -> Self::Future {
- tracing::info!(remote_addr = %addr.remote_addr, local_addr = %addr.local_addr, "accept");
- let lp = self.login_provider.clone();
- async { Ok(Connection::new(lp)) }.boxed()
+ Ok(())
}
}
-//---
-
-/// Connection is the per-connection Tokio Tower service we register in BàL.
-/// It handles a single TCP connection, and thus has a business logic.
-struct Connection {
- session: session::Manager,
-}
-
-impl Connection {
- pub fn new(login_provider: ArcLoginProvider) -> Self {
- Self {
- session: session::Manager::new(login_provider),
+async fn client_wrapper(ctx: ClientContext) {
+ let addr = ctx.addr.clone();
+ match client(ctx).await {
+ Ok(()) => {
+ tracing::info!("closing successful session for {:?}", addr);
+ }
+ Err(e) => {
+ tracing::error!("closing errored session for {:?}: {}", addr, e);
}
}
}
-impl Service<Request> for Connection {
- type Response = Response;
- type Error = BalError;
- type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
-
- fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
+async fn client(mut ctx: ClientContext) -> Result<()> {
+ // Send greeting
+ let (mut server, _) = ServerFlow::send_greeting(
+ ctx.stream,
+ ServerFlowOptions::default(),
+ Greeting::ok(None, "Aerogramme").unwrap(),
+ )
+ .await?;
+
+ use crate::imap::response::{Body, Response as MyResponse};
+ use crate::imap::session::Instance;
+ use imap_codec::imap_types::command::Command;
+ use imap_codec::imap_types::response::{Response, Status};
+
+ use tokio::sync::mpsc;
+ let (cmd_tx, mut cmd_rx) = mpsc::channel::<Command<'static>>(10);
+ let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<MyResponse<'static>>();
+
+ let bckgrnd = tokio::spawn(async move {
+ let mut session = Instance::new(ctx.login_provider);
+ loop {
+ let cmd = match cmd_rx.recv().await {
+ None => break,
+ Some(cmd_recv) => cmd_recv,
+ };
+
+ let maybe_response = session.command(cmd).await;
+
+ match resp_tx.send(maybe_response) {
+ Err(_) => break,
+ Ok(_) => (),
+ };
+ }
+ tracing::info!("runner is quitting");
+ });
+
+ // Main loop
+ loop {
+ tokio::select! {
+ // Managing imap_flow stuff
+ srv_evt = server.progress() => match srv_evt? {
+ ServerFlowEvent::ResponseSent { handle: _handle, response } => {
+ match response {
+ Response::Status(Status::Bye(_)) => break,
+ _ => tracing::trace!("sent to {} content {:?}", ctx.addr, response),
+ }
+ },
+ ServerFlowEvent::CommandReceived { command } => {
+ match cmd_tx.try_send(command) {
+ Ok(_) => (),
+ Err(mpsc::error::TrySendError::Full(_)) => {
+ server.enqueue_status(Status::bye(None, "Too fast").unwrap());
+ tracing::error!("client {:?} is sending commands too fast, closing.", ctx.addr);
+ }
+ _ => {
+ server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", ctx.addr);
+ }
+ }
+ },
+ },
+
+ // Managing response generated by Aerogramme
+ maybe_msg = resp_rx.recv() => {
+ let response = match maybe_msg {
+ None => {
+ server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", ctx.addr);
+ continue
+ },
+ Some(r) => r,
+ };
+
+ for body_elem in response.body.into_iter() {
+ let _handle = match body_elem {
+ Body::Data(d) => server.enqueue_data(d),
+ Body::Status(s) => server.enqueue_status(s),
+ };
+ }
+ server.enqueue_status(response.completion);
+ },
+
+ // When receiving a CTRL+C
+ _ = ctx.must_exit.changed() => {
+ server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
+ },
+ };
}
- fn call(&mut self, req: Request) -> Self::Future {
- tracing::debug!("Got request: {:#?}", req.command);
- self.session.process(req)
- }
+ drop(cmd_tx);
+ bckgrnd.await?;
+ Ok(())
}
-*/
diff --git a/src/imap/response.rs b/src/imap/response.rs
index 012c8ed..d20e58e 100644
--- a/src/imap/response.rs
+++ b/src/imap/response.rs
@@ -47,11 +47,13 @@ impl<'a> ResponseBuilder<'a> {
self
}
+ #[allow(dead_code)]
pub fn info(mut self, status: Status<'a>) -> Self {
self.body.push(Body::Status(status));
self
}
+ #[allow(dead_code)]
pub fn many_info(mut self, status: Vec<Status<'a>>) -> Self {
for d in status.into_iter() {
self = self.info(d);
@@ -87,8 +89,8 @@ impl<'a> ResponseBuilder<'a> {
}
pub struct Response<'a> {
- body: Vec<Body<'a>>,
- completion: Status<'a>,
+ pub body: Vec<Body<'a>>,
+ pub completion: Status<'a>,
}
impl<'a> Response<'a> {
diff --git a/src/imap/session.rs b/src/imap/session.rs
index e2af18b..5c67f8e 100644
--- a/src/imap/session.rs
+++ b/src/imap/session.rs
@@ -1,182 +1,86 @@
-use anyhow::Error;
-//use boitalettres::errors::Error as BalError;
-//use boitalettres::proto::{Request, Response};
-use futures::future::BoxFuture;
-use futures::future::FutureExt;
-
-use tokio::sync::mpsc::error::TrySendError;
-use tokio::sync::{mpsc, oneshot};
-
use crate::imap::command::{anonymous, authenticated, examined, selected};
use crate::imap::flow;
+use crate::imap::response::Response;
use crate::login::ArcLoginProvider;
+use imap_codec::imap_types::command::Command;
-/*
-/* This constant configures backpressure in the system,
- * or more specifically, how many pipelined messages are allowed
- * before refusing them
- */
-const MAX_PIPELINED_COMMANDS: usize = 10;
-
-struct Message {
- req: Request,
- tx: oneshot::Sender<Result<Response, BalError>>,
-}
-
-//-----
-
-pub struct Manager {
- tx: mpsc::Sender<Message>,
-}
-
-impl Manager {
- pub fn new(login_provider: ArcLoginProvider) -> Self {
- let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS);
- tokio::spawn(async move {
- let instance = Instance::new(login_provider, rx);
- instance.start().await;
- });
- Self { tx }
- }
-
- pub fn process(&self, req: Request) -> BoxFuture<'static, Result<Response, BalError>> {
- let (tx, rx) = oneshot::channel();
- let msg = Message { req, tx };
-
- // We use try_send on a bounded channel to protect the daemons from DoS.
- // Pipelining requests in IMAP are a special case: they should not occure often
- // and in a limited number (like 3 requests). Someone filling the channel
- // will probably be malicious so we "rate limit" them.
- match self.tx.try_send(msg) {
- Ok(()) => (),
- Err(TrySendError::Full(_)) => {
- return async { Response::bad("Too fast! Send less pipelined requests.") }.boxed()
- }
- Err(TrySendError::Closed(_)) => {
- return async { Err(BalError::Text("Terminated session".to_string())) }.boxed()
- }
- };
-
- // @FIXME add a timeout, handle a session that fails.
- async {
- match rx.await {
- Ok(r) => r,
- Err(e) => {
- tracing::warn!("Got error {:#?}", e);
- Response::bad("No response from the session handler")
- }
- }
- }
- .boxed()
- }
-}
-*/
//-----
-/*
pub struct Instance {
- rx: mpsc::Receiver<Message>,
-
pub login_provider: ArcLoginProvider,
pub state: flow::State,
}
impl Instance {
- fn new(login_provider: ArcLoginProvider, rx: mpsc::Receiver<Message>) -> Self {
+ pub fn new(login_provider: ArcLoginProvider) -> Self {
Self {
login_provider,
- rx,
state: flow::State::NotAuthenticated,
}
}
- //@FIXME add a function that compute the runner's name from its local info
- // to ease debug
- // fn name(&self) -> String { }
-
- async fn start(mut self) {
- //@FIXME add more info about the runner
- tracing::debug!("starting runner");
-
- while let Some(msg) = self.rx.recv().await {
- // Command behavior is modulated by the state.
- // To prevent state error, we handle the same command in separate code paths.
- let ctrl = match &mut self.state {
- flow::State::NotAuthenticated => {
- let ctx = anonymous::AnonymousContext {
- req: &msg.req,
- login_provider: Some(&self.login_provider),
- };
- anonymous::dispatch(ctx).await
- }
- flow::State::Authenticated(ref user) => {
- let ctx = authenticated::AuthenticatedContext {
- req: &msg.req,
- user,
- };
- authenticated::dispatch(ctx).await
- }
- flow::State::Selected(ref user, ref mut mailbox) => {
- let ctx = selected::SelectedContext {
- req: &msg.req,
- user,
- mailbox,
- };
- selected::dispatch(ctx).await
- }
- flow::State::Examined(ref user, ref mut mailbox) => {
- let ctx = examined::ExaminedContext {
- req: &msg.req,
- user,
- mailbox,
- };
- examined::dispatch(ctx).await
- }
- flow::State::Logout => {
- Response::bad("No commands are allowed in the LOGOUT state.")
- .map(|r| (r, flow::Transition::None))
- .map_err(Error::msg)
- }
- };
-
- // Process result
- let res = match ctrl {
- Ok((res, tr)) => {
- //@FIXME remove unwrap
- self.state = match self.state.apply(tr) {
- Ok(new_state) => new_state,
- Err(e) => {
- tracing::error!("Invalid transition: {}, exiting", e);
- break;
- }
- };
-
- //@FIXME enrich here the command with some global status
-
- Ok(res)
- }
- // Cast from anyhow::Error to Bal::Error
- // @FIXME proper error handling would be great
- Err(e) => match e.downcast::<BalError>() {
- Ok(be) => Err(be),
- Err(e) => {
- tracing::warn!(error=%e, "internal.error");
- Response::bad("Internal error")
- }
- },
- };
-
- //@FIXME I think we should quit this thread on error and having our manager watch it,
- // and then abort the session as it is corrupted.
- msg.tx.send(res).unwrap_or_else(|e| {
- tracing::warn!("failed to send imap response to manager: {:#?}", e)
- });
-
- if let flow::State::Logout = &self.state {
- break;
+ pub async fn command(&mut self, cmd: Command<'static>) -> Response<'static> {
+ // Command behavior is modulated by the state.
+ // To prevent state error, we handle the same command in separate code paths.
+ let (resp, tr) = match &mut self.state {
+ flow::State::NotAuthenticated => {
+ let ctx = anonymous::AnonymousContext {
+ req: &cmd,
+ login_provider: &self.login_provider,
+ };
+ anonymous::dispatch(ctx).await
+ }
+ flow::State::Authenticated(ref user) => {
+ let ctx = authenticated::AuthenticatedContext { req: &cmd, user };
+ authenticated::dispatch(ctx).await
+ }
+ flow::State::Selected(ref user, ref mut mailbox) => {
+ let ctx = selected::SelectedContext {
+ req: &cmd,
+ user,
+ mailbox,
+ };
+ selected::dispatch(ctx).await
}
+ flow::State::Examined(ref user, ref mut mailbox) => {
+ let ctx = examined::ExaminedContext {
+ req: &cmd,
+ user,
+ mailbox,
+ };
+ examined::dispatch(ctx).await
+ }
+ flow::State::Logout => Response::build()
+ .tag(cmd.tag.clone())
+ .message("No commands are allowed in the LOGOUT state.")
+ .bad()
+ .map(|r| (r, flow::Transition::None)),
+ }
+ .unwrap_or_else(|err| {
+ tracing::error!("Command error {:?} occured while processing {:?}", err, cmd);
+ (
+ Response::build()
+ .to_req(&cmd)
+ .message("Internal error while processing command")
+ .bad()
+ .unwrap(),
+ flow::Transition::None,
+ )
+ });
+
+ if let Err(e) = self.state.apply(tr) {
+ tracing::error!(
+ "Transition error {:?} occured while processing on command {:?}",
+ e,
+ cmd
+ );
+ return Response::build()
+ .to_req(&cmd)
+ .message(
+ "Internal error, processing command triggered an illegal IMAP state transition",
+ )
+ .bad()
+ .unwrap();
}
- //@FIXME add more info about the runner
- tracing::debug!("exiting runner");
+ resp
}
}
-*/
diff --git a/src/server.rs b/src/server.rs
index 8bfde98..bd2fd5d 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -25,7 +25,7 @@ impl Server {
let login = Arc::new(StaticLoginProvider::new(config.users).await?);
let lmtp_server = None;
- let imap_server = Some(imap::new(config.imap, login.clone()).await?);
+ let imap_server = Some(imap::new(config.imap, login.clone()));
Ok(Self {
lmtp_server,
imap_server,
@@ -42,7 +42,7 @@ impl Server {
};
let lmtp_server = Some(LmtpServer::new(config.lmtp, login.clone()));
- let imap_server = Some(imap::new(config.imap, login.clone()).await?);
+ let imap_server = Some(imap::new(config.imap, login.clone()));
Ok(Self {
lmtp_server,