aboutsummaryrefslogtreecommitdiff
path: root/src/imap/session.rs
diff options
context:
space:
mode:
authorQuentin <quentin@dufour.io>2024-01-02 22:44:29 +0000
committerQuentin <quentin@dufour.io>2024-01-02 22:44:29 +0000
commitb9a0c1e6eced036eb71e8221a4f236f72832fec2 (patch)
treec498a7a2a5833f2c6f27d4ba97894747f9d454c0 /src/imap/session.rs
parent6ff3c6f71efd802da422a371e6168ae528fb2ddc (diff)
parentc9a33c080d39d4a2b269e3c8f166a708b6606da5 (diff)
downloadaerogramme-b9a0c1e6eced036eb71e8221a4f236f72832fec2.tar.gz
aerogramme-b9a0c1e6eced036eb71e8221a4f236f72832fec2.zip
Merge pull request 'Implement imap-flow' (#34) from refactor/imap-flow into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/aerogramme/pulls/34
Diffstat (limited to 'src/imap/session.rs')
-rw-r--r--src/imap/session.rs224
1 files changed, 65 insertions, 159 deletions
diff --git a/src/imap/session.rs b/src/imap/session.rs
index 15141d3..5c67f8e 100644
--- a/src/imap/session.rs
+++ b/src/imap/session.rs
@@ -1,180 +1,86 @@
-use anyhow::Error;
-use boitalettres::errors::Error as BalError;
-use boitalettres::proto::{Request, Response};
-use futures::future::BoxFuture;
-use futures::future::FutureExt;
-
-use tokio::sync::mpsc::error::TrySendError;
-use tokio::sync::{mpsc, oneshot};
-
use crate::imap::command::{anonymous, authenticated, examined, selected};
use crate::imap::flow;
+use crate::imap::response::Response;
use crate::login::ArcLoginProvider;
-
-/* This constant configures backpressure in the system,
- * or more specifically, how many pipelined messages are allowed
- * before refusing them
- */
-const MAX_PIPELINED_COMMANDS: usize = 10;
-
-struct Message {
- req: Request,
- tx: oneshot::Sender<Result<Response, BalError>>,
-}
-
-//-----
-
-pub struct Manager {
- tx: mpsc::Sender<Message>,
-}
-
-impl Manager {
- pub fn new(login_provider: ArcLoginProvider) -> Self {
- let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS);
- tokio::spawn(async move {
- let instance = Instance::new(login_provider, rx);
- instance.start().await;
- });
- Self { tx }
- }
-
- pub fn process(&self, req: Request) -> BoxFuture<'static, Result<Response, BalError>> {
- let (tx, rx) = oneshot::channel();
- let msg = Message { req, tx };
-
- // We use try_send on a bounded channel to protect the daemons from DoS.
- // Pipelining requests in IMAP are a special case: they should not occure often
- // and in a limited number (like 3 requests). Someone filling the channel
- // will probably be malicious so we "rate limit" them.
- match self.tx.try_send(msg) {
- Ok(()) => (),
- Err(TrySendError::Full(_)) => {
- return async { Response::bad("Too fast! Send less pipelined requests.") }.boxed()
- }
- Err(TrySendError::Closed(_)) => {
- return async { Err(BalError::Text("Terminated session".to_string())) }.boxed()
- }
- };
-
- // @FIXME add a timeout, handle a session that fails.
- async {
- match rx.await {
- Ok(r) => r,
- Err(e) => {
- tracing::warn!("Got error {:#?}", e);
- Response::bad("No response from the session handler")
- }
- }
- }
- .boxed()
- }
-}
+use imap_codec::imap_types::command::Command;
//-----
-
pub struct Instance {
- rx: mpsc::Receiver<Message>,
-
pub login_provider: ArcLoginProvider,
pub state: flow::State,
}
impl Instance {
- fn new(login_provider: ArcLoginProvider, rx: mpsc::Receiver<Message>) -> Self {
+ pub fn new(login_provider: ArcLoginProvider) -> Self {
Self {
login_provider,
- rx,
state: flow::State::NotAuthenticated,
}
}
- //@FIXME add a function that compute the runner's name from its local info
- // to ease debug
- // fn name(&self) -> String { }
-
- async fn start(mut self) {
- //@FIXME add more info about the runner
- tracing::debug!("starting runner");
-
- while let Some(msg) = self.rx.recv().await {
- // Command behavior is modulated by the state.
- // To prevent state error, we handle the same command in separate code paths.
- let ctrl = match &mut self.state {
- flow::State::NotAuthenticated => {
- let ctx = anonymous::AnonymousContext {
- req: &msg.req,
- login_provider: Some(&self.login_provider),
- };
- anonymous::dispatch(ctx).await
- }
- flow::State::Authenticated(ref user) => {
- let ctx = authenticated::AuthenticatedContext {
- req: &msg.req,
- user,
- };
- authenticated::dispatch(ctx).await
- }
- flow::State::Selected(ref user, ref mut mailbox) => {
- let ctx = selected::SelectedContext {
- req: &msg.req,
- user,
- mailbox,
- };
- selected::dispatch(ctx).await
- }
- flow::State::Examined(ref user, ref mut mailbox) => {
- let ctx = examined::ExaminedContext {
- req: &msg.req,
- user,
- mailbox,
- };
- examined::dispatch(ctx).await
- }
- flow::State::Logout => {
- Response::bad("No commands are allowed in the LOGOUT state.")
- .map(|r| (r, flow::Transition::None))
- .map_err(Error::msg)
- }
- };
-
- // Process result
- let res = match ctrl {
- Ok((res, tr)) => {
- //@FIXME remove unwrap
- self.state = match self.state.apply(tr) {
- Ok(new_state) => new_state,
- Err(e) => {
- tracing::error!("Invalid transition: {}, exiting", e);
- break;
- }
- };
-
- //@FIXME enrich here the command with some global status
-
- Ok(res)
- }
- // Cast from anyhow::Error to Bal::Error
- // @FIXME proper error handling would be great
- Err(e) => match e.downcast::<BalError>() {
- Ok(be) => Err(be),
- Err(e) => {
- tracing::warn!(error=%e, "internal.error");
- Response::bad("Internal error")
- }
- },
- };
-
- //@FIXME I think we should quit this thread on error and having our manager watch it,
- // and then abort the session as it is corrupted.
- msg.tx.send(res).unwrap_or_else(|e| {
- tracing::warn!("failed to send imap response to manager: {:#?}", e)
- });
-
- if let flow::State::Logout = &self.state {
- break;
+ pub async fn command(&mut self, cmd: Command<'static>) -> Response<'static> {
+ // Command behavior is modulated by the state.
+ // To prevent state error, we handle the same command in separate code paths.
+ let (resp, tr) = match &mut self.state {
+ flow::State::NotAuthenticated => {
+ let ctx = anonymous::AnonymousContext {
+ req: &cmd,
+ login_provider: &self.login_provider,
+ };
+ anonymous::dispatch(ctx).await
+ }
+ flow::State::Authenticated(ref user) => {
+ let ctx = authenticated::AuthenticatedContext { req: &cmd, user };
+ authenticated::dispatch(ctx).await
+ }
+ flow::State::Selected(ref user, ref mut mailbox) => {
+ let ctx = selected::SelectedContext {
+ req: &cmd,
+ user,
+ mailbox,
+ };
+ selected::dispatch(ctx).await
}
+ flow::State::Examined(ref user, ref mut mailbox) => {
+ let ctx = examined::ExaminedContext {
+ req: &cmd,
+ user,
+ mailbox,
+ };
+ examined::dispatch(ctx).await
+ }
+ flow::State::Logout => Response::build()
+ .tag(cmd.tag.clone())
+ .message("No commands are allowed in the LOGOUT state.")
+ .bad()
+ .map(|r| (r, flow::Transition::None)),
+ }
+ .unwrap_or_else(|err| {
+ tracing::error!("Command error {:?} occured while processing {:?}", err, cmd);
+ (
+ Response::build()
+ .to_req(&cmd)
+ .message("Internal error while processing command")
+ .bad()
+ .unwrap(),
+ flow::Transition::None,
+ )
+ });
+
+ if let Err(e) = self.state.apply(tr) {
+ tracing::error!(
+ "Transition error {:?} occured while processing on command {:?}",
+ e,
+ cmd
+ );
+ return Response::build()
+ .to_req(&cmd)
+ .message(
+ "Internal error, processing command triggered an illegal IMAP state transition",
+ )
+ .bad()
+ .unwrap();
}
- //@FIXME add more info about the runner
- tracing::debug!("exiting runner");
+ resp
}
}