aboutsummaryrefslogtreecommitdiff
path: root/src/imap/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/imap/mod.rs')
-rw-r--r--src/imap/mod.rs275
1 files changed, 202 insertions, 73 deletions
diff --git a/src/imap/mod.rs b/src/imap/mod.rs
index 61a265a..40c4d4f 100644
--- a/src/imap/mod.rs
+++ b/src/imap/mod.rs
@@ -8,24 +8,30 @@ mod index;
mod mail_view;
mod mailbox_view;
mod mime_view;
+mod request;
mod response;
mod search;
mod session;
use std::net::SocketAddr;
-use anyhow::Result;
+use anyhow::{bail, Result};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::net::TcpListener;
+use tokio::sync::mpsc;
use tokio::sync::watch;
+use imap_codec::imap_types::response::{Code, CommandContinuationRequest, Response, Status};
use imap_codec::imap_types::{core::Text, response::Greeting};
use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions};
use imap_flow::stream::AnyStream;
use crate::config::ImapConfig;
use crate::imap::capability::ServerCapability;
+use crate::imap::request::Request;
+use crate::imap::response::{Body, ResponseOrIdle};
+use crate::imap::session::Instance;
use crate::login::ArcLoginProvider;
/// Server is a thin wrapper to register our Services in BàL
@@ -35,8 +41,8 @@ pub struct Server {
capabilities: ServerCapability,
}
+#[derive(Clone)]
struct ClientContext {
- stream: AnyStream,
addr: SocketAddr,
login_provider: ArcLoginProvider,
must_exit: watch::Receiver<bool>,
@@ -74,13 +80,12 @@ impl Server {
tracing::info!("IMAP: accepted connection from {}", remote_addr);
let client = ClientContext {
- stream: AnyStream::new(socket),
addr: remote_addr.clone(),
login_provider: self.login_provider.clone(),
must_exit: must_exit.clone(),
server_capabilities: self.capabilities.clone(),
};
- let conn = tokio::spawn(client_wrapper(client));
+ let conn = tokio::spawn(NetLoop::handler(client, AnyStream::new(socket)));
connections.push(conn);
}
drop(tcp);
@@ -92,46 +97,87 @@ impl Server {
}
}
-async fn client_wrapper(ctx: ClientContext) {
- let addr = ctx.addr.clone();
- match client(ctx).await {
- Ok(()) => {
- tracing::debug!("closing successful session for {:?}", addr);
- }
- Err(e) => {
- tracing::error!("closing errored session for {:?}: {}", addr, e);
+use std::sync::Arc;
+use tokio::sync::mpsc::*;
+use tokio::sync::Notify;
+use tokio_util::bytes::BytesMut;
+enum LoopMode {
+ Quit,
+ Interactive,
+ Idle(BytesMut, Arc<Notify>),
+}
+
+// @FIXME a full refactor of this part of the code will be needed sooner or later
+struct NetLoop {
+ ctx: ClientContext,
+ server: ServerFlow,
+ cmd_tx: Sender<Request>,
+ resp_rx: UnboundedReceiver<ResponseOrIdle>,
+}
+
+impl NetLoop {
+ async fn handler(ctx: ClientContext, sock: AnyStream) {
+ let addr = ctx.addr.clone();
+
+ let nl = match Self::new(ctx, sock).await {
+ Ok(nl) => {
+ tracing::debug!(addr=?addr, "netloop successfully initialized");
+ nl
+ }
+ Err(e) => {
+ tracing::error!(addr=?addr, err=?e, "netloop can not be initialized, closing session");
+ return;
+ }
+ };
+
+ match nl.core().await {
+ Ok(()) => {
+ tracing::debug!("closing successful netloop core for {:?}", addr);
+ }
+ Err(e) => {
+ tracing::error!("closing errored netloop core for {:?}: {}", addr, e);
+ }
}
}
-}
-async fn client(mut ctx: ClientContext) -> Result<()> {
- // Send greeting
- let (mut server, _) = ServerFlow::send_greeting(
- ctx.stream,
- ServerFlowOptions {
- crlf_relaxed: false,
- literal_accept_text: Text::unvalidated("OK"),
- literal_reject_text: Text::unvalidated("Literal rejected"),
- ..ServerFlowOptions::default()
- },
- Greeting::ok(
- Some(Code::Capability(ctx.server_capabilities.to_vec())),
- "Aerogramme",
+ async fn new(mut ctx: ClientContext, sock: AnyStream) -> Result<Self> {
+ // Send greeting
+ let (mut server, _) = ServerFlow::send_greeting(
+ sock,
+ ServerFlowOptions {
+ crlf_relaxed: false,
+ literal_accept_text: Text::unvalidated("OK"),
+ literal_reject_text: Text::unvalidated("Literal rejected"),
+ ..ServerFlowOptions::default()
+ },
+ Greeting::ok(
+ Some(Code::Capability(ctx.server_capabilities.to_vec())),
+ "Aerogramme",
+ )
+ .unwrap(),
)
- .unwrap(),
- )
- .await?;
+ .await?;
- use crate::imap::response::{Body, Response as MyResponse};
- use crate::imap::session::Instance;
- use imap_codec::imap_types::command::Command;
- use imap_codec::imap_types::response::{Code, Response, Status};
+ // Start a mailbox session in background
+ let (cmd_tx, mut cmd_rx) = mpsc::channel::<Request>(3);
+ let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<ResponseOrIdle>();
+ tokio::spawn(Self::session(ctx.clone(), cmd_rx, resp_tx));
- use tokio::sync::mpsc;
- let (cmd_tx, mut cmd_rx) = mpsc::channel::<Command<'static>>(10);
- let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<MyResponse<'static>>();
+ // Return the object
+ Ok(NetLoop {
+ ctx,
+ server,
+ cmd_tx,
+ resp_rx,
+ })
+ }
- let bckgrnd = tokio::spawn(async move {
+ /// Coms with the background session
+ async fn session(
+ ctx: ClientContext,
+ mut cmd_rx: Receiver<Request>,
+ resp_tx: UnboundedSender<ResponseOrIdle>,
+ ) -> () {
let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities);
loop {
let cmd = match cmd_rx.recv().await {
@@ -140,8 +186,8 @@ async fn client(mut ctx: ClientContext) -> Result<()> {
};
tracing::debug!(cmd=?cmd, sock=%ctx.addr, "command");
- let maybe_response = session.command(cmd).await;
- tracing::debug!(cmd=?maybe_response.completion, sock=%ctx.addr, "response");
+ let maybe_response = session.request(cmd).await;
+ tracing::debug!(cmd=?maybe_response, sock=%ctx.addr, "response");
match resp_tx.send(maybe_response) {
Err(_) => break,
@@ -149,67 +195,150 @@ async fn client(mut ctx: ClientContext) -> Result<()> {
};
}
tracing::info!("runner is quitting");
- });
+ }
- // Main loop
- loop {
+ async fn core(mut self) -> Result<()> {
+ let mut mode = LoopMode::Interactive;
+ loop {
+ mode = match mode {
+ LoopMode::Interactive => self.interactive_mode().await?,
+ LoopMode::Idle(buff, stop) => self.idle_mode(buff, stop).await?,
+ LoopMode::Quit => break,
+ }
+ }
+ Ok(())
+ }
+
+ async fn interactive_mode(&mut self) -> Result<LoopMode> {
tokio::select! {
// Managing imap_flow stuff
- srv_evt = server.progress() => match srv_evt? {
+ srv_evt = self.server.progress() => match srv_evt? {
ServerFlowEvent::ResponseSent { handle: _handle, response } => {
match response {
- Response::Status(Status::Bye(_)) => break,
- _ => tracing::trace!("sent to {} content {:?}", ctx.addr, response),
+ Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit),
+ _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response),
}
},
ServerFlowEvent::CommandReceived { command } => {
- match cmd_tx.try_send(command) {
+ match self.cmd_tx.try_send(Request::ImapCommand(command)) {
Ok(_) => (),
Err(mpsc::error::TrySendError::Full(_)) => {
- server.enqueue_status(Status::bye(None, "Too fast").unwrap());
- tracing::error!("client {:?} is sending commands too fast, closing.", ctx.addr);
+ self.server.enqueue_status(Status::bye(None, "Too fast").unwrap());
+ tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr);
}
_ => {
- server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
- tracing::error!("session task exited for {:?}, quitting", ctx.addr);
+ self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
}
}
},
flow => {
- server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
- tracing::error!("session task exited for {:?} due to unsupported flow {:?}", ctx.addr, flow);
-
+ self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
+ tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow);
}
},
// Managing response generated by Aerogramme
- maybe_msg = resp_rx.recv() => {
- let response = match maybe_msg {
- None => {
- server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
- tracing::error!("session task exited for {:?}, quitting", ctx.addr);
- continue
+ maybe_msg = self.resp_rx.recv() => match maybe_msg {
+ Some(ResponseOrIdle::Response(response)) => {
+ for body_elem in response.body.into_iter() {
+ let _handle = match body_elem {
+ Body::Data(d) => self.server.enqueue_data(d),
+ Body::Status(s) => self.server.enqueue_status(s),
+ };
+ }
+ self.server.enqueue_status(response.completion);
+ },
+ Some(ResponseOrIdle::StartIdle(stop)) => {
+ let cr = CommandContinuationRequest::basic(None, "Idling")?;
+ self.server.enqueue_continuation(cr);
+ self.cmd_tx.try_send(Request::Idle)?;
+ return Ok(LoopMode::Idle(BytesMut::new(), stop))
+ },
+ None => {
+ self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
+ },
+ Some(_) => unreachable!(),
+
+ },
+
+ // When receiving a CTRL+C
+ _ = self.ctx.must_exit.changed() => {
+ self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
+ },
+ };
+ Ok(LoopMode::Interactive)
+ }
+
+ async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc<Notify>) -> Result<LoopMode> {
+ // Flush send
+ loop {
+ match self.server.progress_send().await? {
+ Some(..) => continue,
+ None => break,
+ }
+ }
+
+ tokio::select! {
+ // Receiving IDLE event from background
+ maybe_msg = self.resp_rx.recv() => match maybe_msg {
+ // Session decided idle is terminated
+ Some(ResponseOrIdle::Response(response)) => {
+ for body_elem in response.body.into_iter() {
+ let _handle = match body_elem {
+ Body::Data(d) => self.server.enqueue_data(d),
+ Body::Status(s) => self.server.enqueue_status(s),
+ };
+ }
+ self.server.enqueue_status(response.completion);
+ return Ok(LoopMode::Interactive)
+ },
+ // Session has some information for user
+ Some(ResponseOrIdle::IdleEvent(elems)) => {
+ for body_elem in elems.into_iter() {
+ let _handle = match body_elem {
+ Body::Data(d) => self.server.enqueue_data(d),
+ Body::Status(s) => self.server.enqueue_status(s),
+ };
+ }
+ self.cmd_tx.try_send(Request::Idle)?;
+ return Ok(LoopMode::Idle(buff, stop))
+ },
+
+ // Session crashed
+ None => {
+ self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
+ return Ok(LoopMode::Interactive)
+ },
+
+ // Session can't start idling while already idling, it's a logic error!
+ Some(ResponseOrIdle::StartIdle(..)) => bail!("can't start idling while already idling!"),
+ },
+
+ // User is trying to interact with us
+ _read_client_bytes = self.server.stream.read(&mut buff) => {
+ use imap_codec::decode::Decoder;
+ let codec = imap_codec::IdleDoneCodec::new();
+ match codec.decode(&buff) {
+ Ok(([], imap_codec::imap_types::extensions::idle::IdleDone)) => {
+ // Session will be informed that it must stop idle
+ // It will generate the "done" message and change the loop mode
+ stop.notify_one()
},
- Some(r) => r,
+ Err(_) => (),
+ _ => bail!("Client sent data after terminating the continuation without waiting for the server. This is an unsupported behavior and bug in Aerogramme, quitting."),
};
- for body_elem in response.body.into_iter() {
- let _handle = match body_elem {
- Body::Data(d) => server.enqueue_data(d),
- Body::Status(s) => server.enqueue_status(s),
- };
- }
- server.enqueue_status(response.completion);
+ return Ok(LoopMode::Idle(buff, stop))
},
// When receiving a CTRL+C
- _ = ctx.must_exit.changed() => {
- server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
+ _ = self.ctx.must_exit.changed() => {
+ self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
+ return Ok(LoopMode::Interactive)
},
};
}
-
- drop(cmd_tx);
- bckgrnd.await?;
- Ok(())
}