diff options
Diffstat (limited to 'src/imap/mod.rs')
-rw-r--r-- | src/imap/mod.rs | 177 |
1 files changed, 98 insertions, 79 deletions
diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 544086e..dbf72fe 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -15,7 +15,7 @@ mod session; use std::net::SocketAddr; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result, Context}; use futures::stream::{FuturesUnordered, StreamExt}; use tokio::net::TcpListener; @@ -144,13 +144,6 @@ use tokio_util::bytes::BytesMut; const PIPELINABLE_COMMANDS: usize = 64; -#[derive(Debug)] -enum LoopMode { - Quit, - Interactive, - Idle(BytesMut, Arc<Notify>), -} - // @FIXME a full refactor of this part of the code will be needed sooner or later struct NetLoop { ctx: ClientContext, @@ -163,7 +156,7 @@ impl NetLoop { async fn handler(ctx: ClientContext, sock: AnyStream) { let addr = ctx.addr.clone(); - let nl = match Self::new(ctx, sock).await { + let mut nl = match Self::new(ctx, sock).await { Ok(nl) => { tracing::debug!(addr=?addr, "netloop successfully initialized"); nl @@ -241,85 +234,111 @@ impl NetLoop { tracing::info!("runner is quitting"); } - async fn core(mut self) -> Result<()> { - tracing::trace!("Starting the core loop"); - let mut mode = LoopMode::Interactive; + async fn core(&mut self) -> Result<()> { + let mut maybe_idle: Option<Arc<Notify>> = None; loop { - tracing::trace!(mode=?mode, "Core loop iter"); - mode = match mode { - LoopMode::Interactive => self.interactive_mode().await?, - LoopMode::Idle(buff, stop) => self.idle_mode(buff, stop).await?, - LoopMode::Quit => break, - } - } - Ok(()) - } - - async fn interactive_mode(&mut self) -> Result<LoopMode> { - tokio::select! { - // Managing imap_flow stuff - srv_evt = self.server.progress() => match srv_evt? { - ServerFlowEvent::ResponseSent { handle: _handle, response } => { - match response { - Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit), - _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response), - } - }, - ServerFlowEvent::CommandReceived { command } => { - match self.cmd_tx.try_send(Request::ImapCommand(command)) { - Ok(_) => (), - Err(mpsc::error::TrySendError::Full(_)) => { - self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); - tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); + tokio::select! { + // Managing imap_flow stuff + srv_evt = self.server.progress() => match srv_evt? { + ServerFlowEvent::ResponseSent { handle: _handle, response } => { + match response { + Response::Status(Status::Bye(_)) => return Ok(()), + _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response), } - _ => { - self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); - tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + }, + ServerFlowEvent::CommandReceived { command } => { + match self.cmd_tx.try_send(Request::ImapCommand(command)) { + Ok(_) => (), + Err(mpsc::error::TrySendError::Full(_)) => { + self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); + tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); + } + _ => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + } + } + }, + ServerFlowEvent::IdleCommandReceived { tag } => { + match self.cmd_tx.try_send(Request::IdleStart(tag)) { + Ok(_) => (), + Err(mpsc::error::TrySendError::Full(_)) => { + self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); + tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); + } + _ => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + } } } - }, - flow => { - self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); - tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow); - } - }, - - // Managing response generated by Aerogramme - maybe_msg = self.resp_rx.recv() => match maybe_msg { - Some(ResponseOrIdle::Response(response)) => { - tracing::trace!("Interactive, server has a response for the client"); - for body_elem in response.body.into_iter() { - let _handle = match body_elem { - Body::Data(d) => self.server.enqueue_data(d), - Body::Status(s) => self.server.enqueue_status(s), - }; + ServerFlowEvent::IdleDoneReceived => { + tracing::trace!("client sent DONE and want to stop IDLE"); + maybe_idle.ok_or(anyhow!("Received IDLE done but not idling currently"))?.notify_one(); + maybe_idle = None; + } + flow => { + self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); + tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow); } - self.server.enqueue_status(response.completion); - }, - Some(ResponseOrIdle::StartIdle(stop)) => { - tracing::trace!("Interactive, server agreed to switch in idle mode"); - let cr = CommandContinuationRequest::basic(None, "Idling")?; - self.server.enqueue_continuation(cr); - self.cmd_tx.try_send(Request::Idle)?; - return Ok(LoopMode::Idle(BytesMut::new(), stop)) - }, - None => { - self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); - tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); }, - Some(_) => unreachable!(), - }, + // Managing response generated by Aerogramme + maybe_msg = self.resp_rx.recv() => match maybe_msg { + Some(ResponseOrIdle::Response(response)) => { + tracing::trace!("Interactive, server has a response for the client"); + for body_elem in response.body.into_iter() { + let _handle = match body_elem { + Body::Data(d) => self.server.enqueue_data(d), + Body::Status(s) => self.server.enqueue_status(s), + }; + } + self.server.enqueue_status(response.completion); + }, + Some(ResponseOrIdle::IdleAccept(stop)) => { + tracing::trace!("Interactive, server agreed to switch in idle mode"); + let cr = CommandContinuationRequest::basic(None, "Idling")?; + self.server.idle_accept(cr).or(Err(anyhow!("refused continuation for idle accept")))?; + self.cmd_tx.try_send(Request::IdlePoll)?; + if maybe_idle.is_some() { + bail!("Can't start IDLE if already idling"); + } + maybe_idle = Some(stop); + }, + Some(ResponseOrIdle::IdleEvent(elems)) => { + tracing::trace!("server imap session has some change to communicate to the client"); + for body_elem in elems.into_iter() { + let _handle = match body_elem { + Body::Data(d) => self.server.enqueue_data(d), + Body::Status(s) => self.server.enqueue_status(s), + }; + } + self.cmd_tx.try_send(Request::IdlePoll)?; + }, + Some(ResponseOrIdle::IdleReject(response)) => { + tracing::trace!("inform client that session rejected idle"); + self.server + .idle_reject(response.completion) + .or(Err(anyhow!("wrong reject command")))?; + }, + None => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + }, + Some(_) => unreachable!(), - // When receiving a CTRL+C - _ = self.ctx.must_exit.changed() => { - tracing::trace!("Interactive, CTRL+C, exiting"); - self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); - }, - }; - Ok(LoopMode::Interactive) + }, + + // When receiving a CTRL+C + _ = self.ctx.must_exit.changed() => { + tracing::trace!("Interactive, CTRL+C, exiting"); + self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); + }, + }; + } } + /* async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc<Notify>) -> Result<LoopMode> { // Flush send loop { @@ -398,5 +417,5 @@ impl NetLoop { return Ok(LoopMode::Interactive) }, }; - } + }*/ } |