aboutsummaryrefslogtreecommitdiff
path: root/src/imap/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/imap/mod.rs')
-rw-r--r--src/imap/mod.rs177
1 files changed, 98 insertions, 79 deletions
diff --git a/src/imap/mod.rs b/src/imap/mod.rs
index 544086e..dbf72fe 100644
--- a/src/imap/mod.rs
+++ b/src/imap/mod.rs
@@ -15,7 +15,7 @@ mod session;
use std::net::SocketAddr;
-use anyhow::{bail, Result};
+use anyhow::{anyhow, bail, Result, Context};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::net::TcpListener;
@@ -144,13 +144,6 @@ use tokio_util::bytes::BytesMut;
const PIPELINABLE_COMMANDS: usize = 64;
-#[derive(Debug)]
-enum LoopMode {
- Quit,
- Interactive,
- Idle(BytesMut, Arc<Notify>),
-}
-
// @FIXME a full refactor of this part of the code will be needed sooner or later
struct NetLoop {
ctx: ClientContext,
@@ -163,7 +156,7 @@ impl NetLoop {
async fn handler(ctx: ClientContext, sock: AnyStream) {
let addr = ctx.addr.clone();
- let nl = match Self::new(ctx, sock).await {
+ let mut nl = match Self::new(ctx, sock).await {
Ok(nl) => {
tracing::debug!(addr=?addr, "netloop successfully initialized");
nl
@@ -241,85 +234,111 @@ impl NetLoop {
tracing::info!("runner is quitting");
}
- async fn core(mut self) -> Result<()> {
- tracing::trace!("Starting the core loop");
- let mut mode = LoopMode::Interactive;
+ async fn core(&mut self) -> Result<()> {
+ let mut maybe_idle: Option<Arc<Notify>> = None;
loop {
- tracing::trace!(mode=?mode, "Core loop iter");
- mode = match mode {
- LoopMode::Interactive => self.interactive_mode().await?,
- LoopMode::Idle(buff, stop) => self.idle_mode(buff, stop).await?,
- LoopMode::Quit => break,
- }
- }
- Ok(())
- }
-
- async fn interactive_mode(&mut self) -> Result<LoopMode> {
- tokio::select! {
- // Managing imap_flow stuff
- srv_evt = self.server.progress() => match srv_evt? {
- ServerFlowEvent::ResponseSent { handle: _handle, response } => {
- match response {
- Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit),
- _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response),
- }
- },
- ServerFlowEvent::CommandReceived { command } => {
- match self.cmd_tx.try_send(Request::ImapCommand(command)) {
- Ok(_) => (),
- Err(mpsc::error::TrySendError::Full(_)) => {
- self.server.enqueue_status(Status::bye(None, "Too fast").unwrap());
- tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr);
+ tokio::select! {
+ // Managing imap_flow stuff
+ srv_evt = self.server.progress() => match srv_evt? {
+ ServerFlowEvent::ResponseSent { handle: _handle, response } => {
+ match response {
+ Response::Status(Status::Bye(_)) => return Ok(()),
+ _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response),
}
- _ => {
- self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
- tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
+ },
+ ServerFlowEvent::CommandReceived { command } => {
+ match self.cmd_tx.try_send(Request::ImapCommand(command)) {
+ Ok(_) => (),
+ Err(mpsc::error::TrySendError::Full(_)) => {
+ self.server.enqueue_status(Status::bye(None, "Too fast").unwrap());
+ tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr);
+ }
+ _ => {
+ self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
+ }
+ }
+ },
+ ServerFlowEvent::IdleCommandReceived { tag } => {
+ match self.cmd_tx.try_send(Request::IdleStart(tag)) {
+ Ok(_) => (),
+ Err(mpsc::error::TrySendError::Full(_)) => {
+ self.server.enqueue_status(Status::bye(None, "Too fast").unwrap());
+ tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr);
+ }
+ _ => {
+ self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
+ }
}
}
- },
- flow => {
- self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
- tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow);
- }
- },
-
- // Managing response generated by Aerogramme
- maybe_msg = self.resp_rx.recv() => match maybe_msg {
- Some(ResponseOrIdle::Response(response)) => {
- tracing::trace!("Interactive, server has a response for the client");
- for body_elem in response.body.into_iter() {
- let _handle = match body_elem {
- Body::Data(d) => self.server.enqueue_data(d),
- Body::Status(s) => self.server.enqueue_status(s),
- };
+ ServerFlowEvent::IdleDoneReceived => {
+ tracing::trace!("client sent DONE and want to stop IDLE");
+ maybe_idle.ok_or(anyhow!("Received IDLE done but not idling currently"))?.notify_one();
+ maybe_idle = None;
+ }
+ flow => {
+ self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
+ tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow);
}
- self.server.enqueue_status(response.completion);
- },
- Some(ResponseOrIdle::StartIdle(stop)) => {
- tracing::trace!("Interactive, server agreed to switch in idle mode");
- let cr = CommandContinuationRequest::basic(None, "Idling")?;
- self.server.enqueue_continuation(cr);
- self.cmd_tx.try_send(Request::Idle)?;
- return Ok(LoopMode::Idle(BytesMut::new(), stop))
- },
- None => {
- self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
- tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
},
- Some(_) => unreachable!(),
- },
+ // Managing response generated by Aerogramme
+ maybe_msg = self.resp_rx.recv() => match maybe_msg {
+ Some(ResponseOrIdle::Response(response)) => {
+ tracing::trace!("Interactive, server has a response for the client");
+ for body_elem in response.body.into_iter() {
+ let _handle = match body_elem {
+ Body::Data(d) => self.server.enqueue_data(d),
+ Body::Status(s) => self.server.enqueue_status(s),
+ };
+ }
+ self.server.enqueue_status(response.completion);
+ },
+ Some(ResponseOrIdle::IdleAccept(stop)) => {
+ tracing::trace!("Interactive, server agreed to switch in idle mode");
+ let cr = CommandContinuationRequest::basic(None, "Idling")?;
+ self.server.idle_accept(cr).or(Err(anyhow!("refused continuation for idle accept")))?;
+ self.cmd_tx.try_send(Request::IdlePoll)?;
+ if maybe_idle.is_some() {
+ bail!("Can't start IDLE if already idling");
+ }
+ maybe_idle = Some(stop);
+ },
+ Some(ResponseOrIdle::IdleEvent(elems)) => {
+ tracing::trace!("server imap session has some change to communicate to the client");
+ for body_elem in elems.into_iter() {
+ let _handle = match body_elem {
+ Body::Data(d) => self.server.enqueue_data(d),
+ Body::Status(s) => self.server.enqueue_status(s),
+ };
+ }
+ self.cmd_tx.try_send(Request::IdlePoll)?;
+ },
+ Some(ResponseOrIdle::IdleReject(response)) => {
+ tracing::trace!("inform client that session rejected idle");
+ self.server
+ .idle_reject(response.completion)
+ .or(Err(anyhow!("wrong reject command")))?;
+ },
+ None => {
+ self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
+ tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
+ },
+ Some(_) => unreachable!(),
- // When receiving a CTRL+C
- _ = self.ctx.must_exit.changed() => {
- tracing::trace!("Interactive, CTRL+C, exiting");
- self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
- },
- };
- Ok(LoopMode::Interactive)
+ },
+
+ // When receiving a CTRL+C
+ _ = self.ctx.must_exit.changed() => {
+ tracing::trace!("Interactive, CTRL+C, exiting");
+ self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
+ },
+ };
+ }
}
+ /*
async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc<Notify>) -> Result<LoopMode> {
// Flush send
loop {
@@ -398,5 +417,5 @@ impl NetLoop {
return Ok(LoopMode::Interactive)
},
};
- }
+ }*/
}