aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2024-01-17 08:22:15 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2024-01-17 08:22:15 +0100
commit3d23f0c936516ed89f2888fb44babb3994e8d579 (patch)
tree79315188155cfe91d5cc5c98b3c6ead17d8f2e15
parent55e26d24a08519ded6a6898453dcd6db287f45c8 (diff)
downloadaerogramme-3d23f0c936516ed89f2888fb44babb3994e8d579.tar.gz
aerogramme-3d23f0c936516ed89f2888fb44babb3994e8d579.zip
WIP refactor idle
-rw-r--r--src/imap/command/authenticated.rs4
-rw-r--r--src/imap/command/selected.rs52
-rw-r--r--src/imap/flow.rs36
-rw-r--r--src/imap/mod.rs205
-rw-r--r--src/imap/request.rs8
-rw-r--r--src/imap/response.rs8
-rw-r--r--src/imap/session.rs46
7 files changed, 247 insertions, 112 deletions
diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs
index 9b6bb24..e17699a 100644
--- a/src/imap/command/authenticated.rs
+++ b/src/imap/command/authenticated.rs
@@ -453,7 +453,7 @@ impl<'a> AuthenticatedContext<'a> {
.code(Code::ReadWrite)
.set_body(data)
.ok()?,
- flow::Transition::Select(mb),
+ flow::Transition::Select(mb, flow::MailboxPerm::ReadWrite),
))
}
@@ -491,7 +491,7 @@ impl<'a> AuthenticatedContext<'a> {
.code(Code::ReadOnly)
.set_body(data)
.ok()?,
- flow::Transition::Examine(mb),
+ flow::Transition::Select(mb, flow::MailboxPerm::ReadOnly),
))
}
diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs
index c13b71a..c9c5337 100644
--- a/src/imap/command/selected.rs
+++ b/src/imap/command/selected.rs
@@ -25,6 +25,7 @@ pub struct SelectedContext<'a> {
pub mailbox: &'a mut MailboxView,
pub server_capabilities: &'a ServerCapability,
pub client_capabilities: &'a mut ClientCapability,
+ pub perm: &'a flow::MailboxPerm,
}
pub async fn dispatch<'a>(
@@ -39,7 +40,10 @@ pub async fn dispatch<'a>(
CommandBody::Logout => anystate::logout(),
// Specific to this state (7 commands + NOOP)
- CommandBody::Close => ctx.close().await,
+ CommandBody::Close => match ctx.perm {
+ flow::MailboxPerm::ReadWrite => ctx.close().await,
+ flow::MailboxPerm::ReadOnly => ctx.examine_close().await,
+ },
CommandBody::Noop | CommandBody::Check => ctx.noop().await,
CommandBody::Fetch {
sequence_set,
@@ -75,6 +79,11 @@ pub async fn dispatch<'a>(
// UNSELECT extension (rfc3691)
CommandBody::Unselect => ctx.unselect().await,
+ // IDLE extension (rfc2177)
+ CommandBody::Idle => {
+ unimplemented!()
+ }
+
// In selected mode, we fallback to authenticated when needed
_ => {
authenticated::dispatch(authenticated::AuthenticatedContext {
@@ -102,6 +111,18 @@ impl<'a> SelectedContext<'a> {
))
}
+ /// CLOSE in examined state is not the same as in selected state
+ /// (in selected state it also does an EXPUNGE, here it doesn't)
+ async fn examine_close(self) -> Result<(Response<'static>, flow::Transition)> {
+ Ok((
+ Response::build()
+ .to_req(self.req)
+ .message("CLOSE completed")
+ .ok()?,
+ flow::Transition::Unselect,
+ ))
+ }
+
async fn unselect(self) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build()
@@ -189,6 +210,10 @@ impl<'a> SelectedContext<'a> {
}
async fn expunge(self) -> Result<(Response<'static>, flow::Transition)> {
+ if let Some(failed) = self.fail_read_only() {
+ return Ok((failed, flow::Transition::None))
+ }
+
let tag = self.req.tag.clone();
let data = self.mailbox.expunge().await?;
@@ -211,6 +236,10 @@ impl<'a> SelectedContext<'a> {
modifiers: &[StoreModifier],
uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> {
+ if let Some(failed) = self.fail_read_only() {
+ return Ok((failed, flow::Transition::None))
+ }
+
let mut unchanged_since: Option<NonZeroU64> = None;
modifiers.iter().for_each(|m| match m {
StoreModifier::UnchangedSince(val) => {
@@ -251,6 +280,11 @@ impl<'a> SelectedContext<'a> {
mailbox: &MailboxCodec<'a>,
uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> {
+ //@FIXME Could copy be valid in EXAMINE mode?
+ if let Some(failed) = self.fail_read_only() {
+ return Ok((failed, flow::Transition::None))
+ }
+
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?;
@@ -303,6 +337,10 @@ impl<'a> SelectedContext<'a> {
mailbox: &MailboxCodec<'a>,
uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> {
+ if let Some(failed) = self.fail_read_only() {
+ return Ok((failed, flow::Transition::None))
+ }
+
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?;
@@ -350,4 +388,16 @@ impl<'a> SelectedContext<'a> {
flow::Transition::None,
))
}
+
+ fn fail_read_only(&self) -> Option<Response<'static>> {
+ match self.perm {
+ flow::MailboxPerm::ReadWrite => None,
+ flow::MailboxPerm::ReadOnly => {
+ Some(Response::build()
+ .to_req(self.req)
+ .message("Write command are forbidden while exmining mailbox")
+ .no().unwrap())
+ },
+ }
+ }
}
diff --git a/src/imap/flow.rs b/src/imap/flow.rs
index 95810c1..ff348ca 100644
--- a/src/imap/flow.rs
+++ b/src/imap/flow.rs
@@ -19,17 +19,23 @@ impl StdError for Error {}
pub enum State {
NotAuthenticated,
Authenticated(Arc<User>),
- Selected(Arc<User>, MailboxView),
- // Examined is like Selected, but indicates that the mailbox is read-only
- Examined(Arc<User>, MailboxView),
+ Selected(Arc<User>, MailboxView, MailboxPerm),
+ Idle(Arc<User>, MailboxView, MailboxPerm),
Logout,
}
+#[derive(Clone)]
+pub enum MailboxPerm {
+ ReadOnly,
+ ReadWrite,
+}
+
pub enum Transition {
None,
Authenticate(Arc<User>),
- Examine(MailboxView),
- Select(MailboxView),
+ Select(MailboxView, MailboxPerm),
+ Idle,
+ UnIdle,
Unselect,
Logout,
}
@@ -38,20 +44,22 @@ pub enum Transition {
// https://datatracker.ietf.org/doc/html/rfc3501#page-13
impl State {
pub fn apply(&mut self, tr: Transition) -> Result<(), Error> {
- let new_state = match (&self, tr) {
+ let new_state = match (std::mem::replace(self, State::NotAuthenticated), tr) {
(_s, Transition::None) => return Ok(()),
(State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u),
(
- State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _),
- Transition::Select(m),
- ) => State::Selected(u.clone(), m),
- (
- State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _),
- Transition::Examine(m),
- ) => State::Examined(u.clone(), m),
- (State::Selected(u, _) | State::Examined(u, _), Transition::Unselect) => {
+ State::Authenticated(u) | State::Selected(u, _, _),
+ Transition::Select(m, p),
+ ) => State::Selected(u, m, p),
+ (State::Selected(u, _, _) , Transition::Unselect) => {
State::Authenticated(u.clone())
}
+ (State::Selected(u, m, p), Transition::Idle) => {
+ State::Idle(u, m, p)
+ },
+ (State::Idle(u, m, p), Transition::UnIdle) => {
+ State::Selected(u, m, p)
+ },
(_, Transition::Logout) => State::Logout,
_ => return Err(Error::ForbiddenTransition),
};
diff --git a/src/imap/mod.rs b/src/imap/mod.rs
index 61a265a..baa15f7 100644
--- a/src/imap/mod.rs
+++ b/src/imap/mod.rs
@@ -8,22 +8,28 @@ mod index;
mod mail_view;
mod mailbox_view;
mod mime_view;
+mod request;
mod response;
mod search;
mod session;
use std::net::SocketAddr;
-use anyhow::Result;
+use anyhow::{Result, bail};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::net::TcpListener;
use tokio::sync::watch;
+use tokio::sync::mpsc;
use imap_codec::imap_types::{core::Text, response::Greeting};
use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions};
use imap_flow::stream::AnyStream;
+use imap_codec::imap_types::response::{Code, Response, CommandContinuationRequest, Status};
+use crate::imap::response::{Body, ResponseOrIdle};
+use crate::imap::session::Instance;
+use crate::imap::request::Request;
use crate::config::ImapConfig;
use crate::imap::capability::ServerCapability;
use crate::login::ArcLoginProvider;
@@ -35,8 +41,8 @@ pub struct Server {
capabilities: ServerCapability,
}
+#[derive(Clone)]
struct ClientContext {
- stream: AnyStream,
addr: SocketAddr,
login_provider: ArcLoginProvider,
must_exit: watch::Receiver<bool>,
@@ -74,13 +80,12 @@ impl Server {
tracing::info!("IMAP: accepted connection from {}", remote_addr);
let client = ClientContext {
- stream: AnyStream::new(socket),
addr: remote_addr.clone(),
login_provider: self.login_provider.clone(),
must_exit: must_exit.clone(),
server_capabilities: self.capabilities.clone(),
};
- let conn = tokio::spawn(client_wrapper(client));
+ let conn = tokio::spawn(NetLoop::handler(client, AnyStream::new(socket)));
connections.push(conn);
}
drop(tcp);
@@ -92,46 +97,74 @@ impl Server {
}
}
-async fn client_wrapper(ctx: ClientContext) {
- let addr = ctx.addr.clone();
- match client(ctx).await {
- Ok(()) => {
- tracing::debug!("closing successful session for {:?}", addr);
- }
- Err(e) => {
- tracing::error!("closing errored session for {:?}: {}", addr, e);
+use tokio::sync::mpsc::*;
+enum LoopMode {
+ Quit,
+ Interactive,
+ IdleUntil(tokio::sync::Notify),
+}
+
+struct NetLoop {
+ ctx: ClientContext,
+ server: ServerFlow,
+ cmd_tx: Sender<Request>,
+ resp_rx: UnboundedReceiver<ResponseOrIdle>,
+}
+
+impl NetLoop {
+ async fn handler(ctx: ClientContext, sock: AnyStream) {
+ let addr = ctx.addr.clone();
+
+ let nl = match Self::new(ctx, sock).await {
+ Ok(nl) => {
+ tracing::debug!(addr=?addr, "netloop successfully initialized");
+ nl
+ },
+ Err(e) => {
+ tracing::error!(addr=?addr, err=?e, "netloop can not be initialized, closing session");
+ return
+ }
+ };
+
+ match nl.core().await {
+ Ok(()) => {
+ tracing::debug!("closing successful netloop core for {:?}", addr);
+ }
+ Err(e) => {
+ tracing::error!("closing errored netloop core for {:?}: {}", addr, e);
+ }
}
}
-}
-async fn client(mut ctx: ClientContext) -> Result<()> {
- // Send greeting
- let (mut server, _) = ServerFlow::send_greeting(
- ctx.stream,
- ServerFlowOptions {
- crlf_relaxed: false,
- literal_accept_text: Text::unvalidated("OK"),
- literal_reject_text: Text::unvalidated("Literal rejected"),
- ..ServerFlowOptions::default()
- },
- Greeting::ok(
- Some(Code::Capability(ctx.server_capabilities.to_vec())),
- "Aerogramme",
- )
- .unwrap(),
- )
- .await?;
-
- use crate::imap::response::{Body, Response as MyResponse};
- use crate::imap::session::Instance;
- use imap_codec::imap_types::command::Command;
- use imap_codec::imap_types::response::{Code, Response, Status};
-
- use tokio::sync::mpsc;
- let (cmd_tx, mut cmd_rx) = mpsc::channel::<Command<'static>>(10);
- let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<MyResponse<'static>>();
-
- let bckgrnd = tokio::spawn(async move {
+ async fn new(mut ctx: ClientContext, sock: AnyStream) -> Result<Self> {
+ // Send greeting
+ let (mut server, _) = ServerFlow::send_greeting(
+ sock,
+ ServerFlowOptions {
+ crlf_relaxed: false,
+ literal_accept_text: Text::unvalidated("OK"),
+ literal_reject_text: Text::unvalidated("Literal rejected"),
+ ..ServerFlowOptions::default()
+ },
+ Greeting::ok(
+ Some(Code::Capability(ctx.server_capabilities.to_vec())),
+ "Aerogramme",
+ )
+ .unwrap(),
+ )
+ .await?;
+
+ // Start a mailbox session in background
+ let (cmd_tx, mut cmd_rx) = mpsc::channel::<Request>(3);
+ let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<ResponseOrIdle>();
+ tokio::spawn(Self::session(ctx.clone(), cmd_rx, resp_tx));
+
+ // Return the object
+ Ok(NetLoop { ctx, server, cmd_tx, resp_rx })
+ }
+
+ /// Coms with the background session
+ async fn session(ctx: ClientContext, mut cmd_rx: Receiver<Request>, resp_tx: UnboundedSender<ResponseOrIdle>) -> () {
let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities);
loop {
let cmd = match cmd_rx.recv().await {
@@ -140,8 +173,8 @@ async fn client(mut ctx: ClientContext) -> Result<()> {
};
tracing::debug!(cmd=?cmd, sock=%ctx.addr, "command");
- let maybe_response = session.command(cmd).await;
- tracing::debug!(cmd=?maybe_response.completion, sock=%ctx.addr, "response");
+ let maybe_response = session.request(cmd).await;
+ tracing::debug!(cmd=?maybe_response, sock=%ctx.addr, "response");
match resp_tx.send(maybe_response) {
Err(_) => break,
@@ -149,67 +182,81 @@ async fn client(mut ctx: ClientContext) -> Result<()> {
};
}
tracing::info!("runner is quitting");
- });
+ }
+
+ async fn core(mut self) -> Result<()> {
+ let mut mode = LoopMode::Interactive;
+ loop {
+ mode = match mode {
+ LoopMode::Interactive => self.interactive_mode().await?,
+ LoopMode::IdleUntil(notif) => self.idle_mode(notif).await?,
+ LoopMode::Quit => break,
+ }
+ }
+ Ok(())
+ }
+
- // Main loop
- loop {
+ async fn interactive_mode(&mut self) -> Result<LoopMode> {
tokio::select! {
// Managing imap_flow stuff
- srv_evt = server.progress() => match srv_evt? {
+ srv_evt = self.server.progress() => match srv_evt? {
ServerFlowEvent::ResponseSent { handle: _handle, response } => {
match response {
- Response::Status(Status::Bye(_)) => break,
- _ => tracing::trace!("sent to {} content {:?}", ctx.addr, response),
+ Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit),
+ _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response),
}
},
ServerFlowEvent::CommandReceived { command } => {
- match cmd_tx.try_send(command) {
+ match self.cmd_tx.try_send(Request::ImapCommand(command)) {
Ok(_) => (),
Err(mpsc::error::TrySendError::Full(_)) => {
- server.enqueue_status(Status::bye(None, "Too fast").unwrap());
- tracing::error!("client {:?} is sending commands too fast, closing.", ctx.addr);
+ self.server.enqueue_status(Status::bye(None, "Too fast").unwrap());
+ tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr);
}
_ => {
- server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
- tracing::error!("session task exited for {:?}, quitting", ctx.addr);
+ self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
}
}
},
flow => {
- server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
- tracing::error!("session task exited for {:?} due to unsupported flow {:?}", ctx.addr, flow);
-
+ self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
+ tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow);
}
},
// Managing response generated by Aerogramme
- maybe_msg = resp_rx.recv() => {
- let response = match maybe_msg {
- None => {
- server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
- tracing::error!("session task exited for {:?}, quitting", ctx.addr);
- continue
- },
- Some(r) => r,
- };
-
- for body_elem in response.body.into_iter() {
- let _handle = match body_elem {
- Body::Data(d) => server.enqueue_data(d),
- Body::Status(s) => server.enqueue_status(s),
- };
- }
- server.enqueue_status(response.completion);
+ maybe_msg = self.resp_rx.recv() => match maybe_msg {
+ Some(ResponseOrIdle::Response(response)) => {
+ for body_elem in response.body.into_iter() {
+ let _handle = match body_elem {
+ Body::Data(d) => self.server.enqueue_data(d),
+ Body::Status(s) => self.server.enqueue_status(s),
+ };
+ }
+ self.server.enqueue_status(response.completion);
+ },
+ Some(ResponseOrIdle::Idle) => {
+ let cr = CommandContinuationRequest::basic(None, "idling")?;
+ self.server.enqueue_continuation(cr);
+ return Ok(LoopMode::IdleUntil(tokio::sync::Notify::new()))
+ },
+ None => {
+ self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
+ },
},
// When receiving a CTRL+C
- _ = ctx.must_exit.changed() => {
- server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
+ _ = self.ctx.must_exit.changed() => {
+ self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
},
};
+ Ok(LoopMode::Interactive)
}
- drop(cmd_tx);
- bckgrnd.await?;
- Ok(())
+ async fn idle_mode(&mut self, notif: tokio::sync::Notify) -> Result<LoopMode> {
+ Ok(LoopMode::IdleUntil(notif))
+ }
}
diff --git a/src/imap/request.rs b/src/imap/request.rs
new file mode 100644
index 0000000..c458276
--- /dev/null
+++ b/src/imap/request.rs
@@ -0,0 +1,8 @@
+use imap_codec::imap_types::command::Command;
+use tokio::sync::Notify;
+
+#[derive(Debug)]
+pub enum Request {
+ ImapCommand(Command<'static>),
+ IdleUntil(Notify),
+}
diff --git a/src/imap/response.rs b/src/imap/response.rs
index d20e58e..a9978e1 100644
--- a/src/imap/response.rs
+++ b/src/imap/response.rs
@@ -3,6 +3,7 @@ use imap_codec::imap_types::command::Command;
use imap_codec::imap_types::core::Tag;
use imap_codec::imap_types::response::{Code, Data, Status};
+#[derive(Debug)]
pub enum Body<'a> {
Data(Data<'a>),
Status(Status<'a>),
@@ -88,6 +89,7 @@ impl<'a> ResponseBuilder<'a> {
}
}
+#[derive(Debug)]
pub struct Response<'a> {
pub body: Vec<Body<'a>>,
pub completion: Status<'a>,
@@ -110,3 +112,9 @@ impl<'a> Response<'a> {
})
}
}
+
+#[derive(Debug)]
+pub enum ResponseOrIdle {
+ Response(Response<'static>),
+ Idle,
+}
diff --git a/src/imap/session.rs b/src/imap/session.rs
index 6b26478..d86e6ff 100644
--- a/src/imap/session.rs
+++ b/src/imap/session.rs
@@ -1,7 +1,9 @@
+use anyhow::anyhow;
use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anonymous, authenticated, examined, selected};
use crate::imap::flow;
-use crate::imap::response::Response;
+use crate::imap::request::Request;
+use crate::imap::response::{Response, ResponseOrIdle};
use crate::login::ArcLoginProvider;
use imap_codec::imap_types::command::Command;
@@ -23,7 +25,24 @@ impl Instance {
}
}
- pub async fn command(&mut self, cmd: Command<'static>) -> Response<'static> {
+ pub async fn request(&mut self, req: Request) -> ResponseOrIdle {
+ match req {
+ Request::IdleUntil(stop) => ResponseOrIdle::Response(self.idle(stop).await),
+ Request::ImapCommand(cmd) => self.command(cmd).await,
+ }
+ }
+
+ pub async fn idle(&mut self, stop: tokio::sync::Notify) -> Response<'static> {
+ let (user, mbx) = match &mut self.state {
+ flow::State::Idle(ref user, ref mut mailbox, ref perm) => (user, mailbox),
+ _ => unreachable!(),
+ };
+
+ unimplemented!();
+ }
+
+
+ pub async fn command(&mut self, cmd: Command<'static>) -> ResponseOrIdle {
// Command behavior is modulated by the state.
// To prevent state error, we handle the same command in separate code paths.
let (resp, tr) = match &mut self.state {
@@ -44,26 +63,18 @@ impl Instance {
};
authenticated::dispatch(ctx).await
}
- flow::State::Selected(ref user, ref mut mailbox) => {
+ flow::State::Selected(ref user, ref mut mailbox, ref perm) => {
let ctx = selected::SelectedContext {
req: &cmd,
server_capabilities: &self.server_capabilities,
client_capabilities: &mut self.client_capabilities,
user,
mailbox,
+ perm,
};
selected::dispatch(ctx).await
}
- flow::State::Examined(ref user, ref mut mailbox) => {
- let ctx = examined::ExaminedContext {
- req: &cmd,
- server_capabilities: &self.server_capabilities,
- client_capabilities: &mut self.client_capabilities,
- user,
- mailbox,
- };
- examined::dispatch(ctx).await
- }
+ flow::State::Idle(..) => Err(anyhow!("can not receive command while idling")),
flow::State::Logout => Response::build()
.tag(cmd.tag.clone())
.message("No commands are allowed in the LOGOUT state.")
@@ -88,15 +99,18 @@ impl Instance {
e,
cmd
);
- return Response::build()
+ return ResponseOrIdle::Response(Response::build()
.to_req(&cmd)
.message(
"Internal error, processing command triggered an illegal IMAP state transition",
)
.bad()
- .unwrap();
+ .unwrap());
}
- resp
+ match self.state {
+ flow::State::Idle(..) => ResponseOrIdle::Idle,
+ _ => ResponseOrIdle::Response(resp),
+ }
}
}