diff options
author | Quentin <quentin@dufour.io> | 2024-01-19 14:04:03 +0000 |
---|---|---|
committer | Quentin <quentin@dufour.io> | 2024-01-19 14:04:03 +0000 |
commit | 0f227e44e4996e54d2e55ed5c7a07f5458c4db4f (patch) | |
tree | 7f6c4fec623d0d99d3f09e752a360ca0f806429e /src/imap/mod.rs | |
parent | 55e26d24a08519ded6a6898453dcd6db287f45c8 (diff) | |
parent | 23aa313e11f344da07143d60ce446b5f23d5f362 (diff) | |
download | aerogramme-0f227e44e4996e54d2e55ed5c7a07f5458c4db4f.tar.gz aerogramme-0f227e44e4996e54d2e55ed5c7a07f5458c4db4f.zip |
Merge pull request 'Implement IDLE' (#72) from feat/idle into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/aerogramme/pulls/72
Diffstat (limited to 'src/imap/mod.rs')
-rw-r--r-- | src/imap/mod.rs | 275 |
1 files changed, 202 insertions, 73 deletions
diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 61a265a..40c4d4f 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -8,24 +8,30 @@ mod index; mod mail_view; mod mailbox_view; mod mime_view; +mod request; mod response; mod search; mod session; use std::net::SocketAddr; -use anyhow::Result; +use anyhow::{bail, Result}; use futures::stream::{FuturesUnordered, StreamExt}; use tokio::net::TcpListener; +use tokio::sync::mpsc; use tokio::sync::watch; +use imap_codec::imap_types::response::{Code, CommandContinuationRequest, Response, Status}; use imap_codec::imap_types::{core::Text, response::Greeting}; use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions}; use imap_flow::stream::AnyStream; use crate::config::ImapConfig; use crate::imap::capability::ServerCapability; +use crate::imap::request::Request; +use crate::imap::response::{Body, ResponseOrIdle}; +use crate::imap::session::Instance; use crate::login::ArcLoginProvider; /// Server is a thin wrapper to register our Services in BàL @@ -35,8 +41,8 @@ pub struct Server { capabilities: ServerCapability, } +#[derive(Clone)] struct ClientContext { - stream: AnyStream, addr: SocketAddr, login_provider: ArcLoginProvider, must_exit: watch::Receiver<bool>, @@ -74,13 +80,12 @@ impl Server { tracing::info!("IMAP: accepted connection from {}", remote_addr); let client = ClientContext { - stream: AnyStream::new(socket), addr: remote_addr.clone(), login_provider: self.login_provider.clone(), must_exit: must_exit.clone(), server_capabilities: self.capabilities.clone(), }; - let conn = tokio::spawn(client_wrapper(client)); + let conn = tokio::spawn(NetLoop::handler(client, AnyStream::new(socket))); connections.push(conn); } drop(tcp); @@ -92,46 +97,87 @@ impl Server { } } -async fn client_wrapper(ctx: ClientContext) { - let addr = ctx.addr.clone(); - match client(ctx).await { - Ok(()) => { - tracing::debug!("closing successful session for {:?}", addr); - } - Err(e) => { - tracing::error!("closing errored session for {:?}: {}", addr, e); +use std::sync::Arc; +use tokio::sync::mpsc::*; +use tokio::sync::Notify; +use tokio_util::bytes::BytesMut; +enum LoopMode { + Quit, + Interactive, + Idle(BytesMut, Arc<Notify>), +} + +// @FIXME a full refactor of this part of the code will be needed sooner or later +struct NetLoop { + ctx: ClientContext, + server: ServerFlow, + cmd_tx: Sender<Request>, + resp_rx: UnboundedReceiver<ResponseOrIdle>, +} + +impl NetLoop { + async fn handler(ctx: ClientContext, sock: AnyStream) { + let addr = ctx.addr.clone(); + + let nl = match Self::new(ctx, sock).await { + Ok(nl) => { + tracing::debug!(addr=?addr, "netloop successfully initialized"); + nl + } + Err(e) => { + tracing::error!(addr=?addr, err=?e, "netloop can not be initialized, closing session"); + return; + } + }; + + match nl.core().await { + Ok(()) => { + tracing::debug!("closing successful netloop core for {:?}", addr); + } + Err(e) => { + tracing::error!("closing errored netloop core for {:?}: {}", addr, e); + } } } -} -async fn client(mut ctx: ClientContext) -> Result<()> { - // Send greeting - let (mut server, _) = ServerFlow::send_greeting( - ctx.stream, - ServerFlowOptions { - crlf_relaxed: false, - literal_accept_text: Text::unvalidated("OK"), - literal_reject_text: Text::unvalidated("Literal rejected"), - ..ServerFlowOptions::default() - }, - Greeting::ok( - Some(Code::Capability(ctx.server_capabilities.to_vec())), - "Aerogramme", + async fn new(mut ctx: ClientContext, sock: AnyStream) -> Result<Self> { + // Send greeting + let (mut server, _) = ServerFlow::send_greeting( + sock, + ServerFlowOptions { + crlf_relaxed: false, + literal_accept_text: Text::unvalidated("OK"), + literal_reject_text: Text::unvalidated("Literal rejected"), + ..ServerFlowOptions::default() + }, + Greeting::ok( + Some(Code::Capability(ctx.server_capabilities.to_vec())), + "Aerogramme", + ) + .unwrap(), ) - .unwrap(), - ) - .await?; + .await?; - use crate::imap::response::{Body, Response as MyResponse}; - use crate::imap::session::Instance; - use imap_codec::imap_types::command::Command; - use imap_codec::imap_types::response::{Code, Response, Status}; + // Start a mailbox session in background + let (cmd_tx, mut cmd_rx) = mpsc::channel::<Request>(3); + let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<ResponseOrIdle>(); + tokio::spawn(Self::session(ctx.clone(), cmd_rx, resp_tx)); - use tokio::sync::mpsc; - let (cmd_tx, mut cmd_rx) = mpsc::channel::<Command<'static>>(10); - let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<MyResponse<'static>>(); + // Return the object + Ok(NetLoop { + ctx, + server, + cmd_tx, + resp_rx, + }) + } - let bckgrnd = tokio::spawn(async move { + /// Coms with the background session + async fn session( + ctx: ClientContext, + mut cmd_rx: Receiver<Request>, + resp_tx: UnboundedSender<ResponseOrIdle>, + ) -> () { let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities); loop { let cmd = match cmd_rx.recv().await { @@ -140,8 +186,8 @@ async fn client(mut ctx: ClientContext) -> Result<()> { }; tracing::debug!(cmd=?cmd, sock=%ctx.addr, "command"); - let maybe_response = session.command(cmd).await; - tracing::debug!(cmd=?maybe_response.completion, sock=%ctx.addr, "response"); + let maybe_response = session.request(cmd).await; + tracing::debug!(cmd=?maybe_response, sock=%ctx.addr, "response"); match resp_tx.send(maybe_response) { Err(_) => break, @@ -149,67 +195,150 @@ async fn client(mut ctx: ClientContext) -> Result<()> { }; } tracing::info!("runner is quitting"); - }); + } - // Main loop - loop { + async fn core(mut self) -> Result<()> { + let mut mode = LoopMode::Interactive; + loop { + mode = match mode { + LoopMode::Interactive => self.interactive_mode().await?, + LoopMode::Idle(buff, stop) => self.idle_mode(buff, stop).await?, + LoopMode::Quit => break, + } + } + Ok(()) + } + + async fn interactive_mode(&mut self) -> Result<LoopMode> { tokio::select! { // Managing imap_flow stuff - srv_evt = server.progress() => match srv_evt? { + srv_evt = self.server.progress() => match srv_evt? { ServerFlowEvent::ResponseSent { handle: _handle, response } => { match response { - Response::Status(Status::Bye(_)) => break, - _ => tracing::trace!("sent to {} content {:?}", ctx.addr, response), + Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit), + _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response), } }, ServerFlowEvent::CommandReceived { command } => { - match cmd_tx.try_send(command) { + match self.cmd_tx.try_send(Request::ImapCommand(command)) { Ok(_) => (), Err(mpsc::error::TrySendError::Full(_)) => { - server.enqueue_status(Status::bye(None, "Too fast").unwrap()); - tracing::error!("client {:?} is sending commands too fast, closing.", ctx.addr); + self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); + tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); } _ => { - server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); - tracing::error!("session task exited for {:?}, quitting", ctx.addr); + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); } } }, flow => { - server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); - tracing::error!("session task exited for {:?} due to unsupported flow {:?}", ctx.addr, flow); - + self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); + tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow); } }, // Managing response generated by Aerogramme - maybe_msg = resp_rx.recv() => { - let response = match maybe_msg { - None => { - server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); - tracing::error!("session task exited for {:?}, quitting", ctx.addr); - continue + maybe_msg = self.resp_rx.recv() => match maybe_msg { + Some(ResponseOrIdle::Response(response)) => { + for body_elem in response.body.into_iter() { + let _handle = match body_elem { + Body::Data(d) => self.server.enqueue_data(d), + Body::Status(s) => self.server.enqueue_status(s), + }; + } + self.server.enqueue_status(response.completion); + }, + Some(ResponseOrIdle::StartIdle(stop)) => { + let cr = CommandContinuationRequest::basic(None, "Idling")?; + self.server.enqueue_continuation(cr); + self.cmd_tx.try_send(Request::Idle)?; + return Ok(LoopMode::Idle(BytesMut::new(), stop)) + }, + None => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + }, + Some(_) => unreachable!(), + + }, + + // When receiving a CTRL+C + _ = self.ctx.must_exit.changed() => { + self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); + }, + }; + Ok(LoopMode::Interactive) + } + + async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc<Notify>) -> Result<LoopMode> { + // Flush send + loop { + match self.server.progress_send().await? { + Some(..) => continue, + None => break, + } + } + + tokio::select! { + // Receiving IDLE event from background + maybe_msg = self.resp_rx.recv() => match maybe_msg { + // Session decided idle is terminated + Some(ResponseOrIdle::Response(response)) => { + for body_elem in response.body.into_iter() { + let _handle = match body_elem { + Body::Data(d) => self.server.enqueue_data(d), + Body::Status(s) => self.server.enqueue_status(s), + }; + } + self.server.enqueue_status(response.completion); + return Ok(LoopMode::Interactive) + }, + // Session has some information for user + Some(ResponseOrIdle::IdleEvent(elems)) => { + for body_elem in elems.into_iter() { + let _handle = match body_elem { + Body::Data(d) => self.server.enqueue_data(d), + Body::Status(s) => self.server.enqueue_status(s), + }; + } + self.cmd_tx.try_send(Request::Idle)?; + return Ok(LoopMode::Idle(buff, stop)) + }, + + // Session crashed + None => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + return Ok(LoopMode::Interactive) + }, + + // Session can't start idling while already idling, it's a logic error! + Some(ResponseOrIdle::StartIdle(..)) => bail!("can't start idling while already idling!"), + }, + + // User is trying to interact with us + _read_client_bytes = self.server.stream.read(&mut buff) => { + use imap_codec::decode::Decoder; + let codec = imap_codec::IdleDoneCodec::new(); + match codec.decode(&buff) { + Ok(([], imap_codec::imap_types::extensions::idle::IdleDone)) => { + // Session will be informed that it must stop idle + // It will generate the "done" message and change the loop mode + stop.notify_one() }, - Some(r) => r, + Err(_) => (), + _ => bail!("Client sent data after terminating the continuation without waiting for the server. This is an unsupported behavior and bug in Aerogramme, quitting."), }; - for body_elem in response.body.into_iter() { - let _handle = match body_elem { - Body::Data(d) => server.enqueue_data(d), - Body::Status(s) => server.enqueue_status(s), - }; - } - server.enqueue_status(response.completion); + return Ok(LoopMode::Idle(buff, stop)) }, // When receiving a CTRL+C - _ = ctx.must_exit.changed() => { - server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); + _ = self.ctx.must_exit.changed() => { + self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); + return Ok(LoopMode::Interactive) }, }; } - - drop(cmd_tx); - bckgrnd.await?; - Ok(()) } |