From 0d667a30301bec47c03314ff0e449a220ad3b913 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 2 Jan 2024 20:23:33 +0100 Subject: compile with imap-flow --- src/imap/mod.rs | 225 ++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 152 insertions(+), 73 deletions(-) (limited to 'src/imap/mod.rs') diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 589231b..31eeaa8 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -4,104 +4,183 @@ mod mailbox_view; mod response; mod session; -use std::task::{Context, Poll}; +use std::net::SocketAddr; use anyhow::Result; -//use boitalettres::errors::Error as BalError; -//use boitalettres::proto::{Request, Response}; -//use boitalettres::server::accept::addr::AddrIncoming; -//use boitalettres::server::accept::addr::AddrStream; -//use boitalettres::server::Server as ImapServer; -use futures::future::BoxFuture; -use futures::future::FutureExt; +use futures::stream::{FuturesUnordered, StreamExt}; + +use tokio::net::TcpListener; use tokio::sync::watch; +use imap_codec::imap_types::response::Greeting; +use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions}; +use imap_flow::stream::AnyStream; + use crate::config::ImapConfig; use crate::login::ArcLoginProvider; /// Server is a thin wrapper to register our Services in BàL -pub struct Server {} - -pub async fn new(config: ImapConfig, login: ArcLoginProvider) -> Result { - unimplemented!(); - /* let incoming = AddrIncoming::new(config.bind_addr).await?; - tracing::info!("IMAP activated, will listen on {:#}", incoming.local_addr); - - let imap = ImapServer::new(incoming).serve(Instance::new(login.clone())); - Ok(Server(imap))*/ -} - -impl Server { - pub async fn run(self, mut must_exit: watch::Receiver) -> Result<()> { - tracing::info!("IMAP started!"); - unimplemented!(); - /*tokio::select! { - s = self.0 => s?, - _ = must_exit.changed() => tracing::info!("Stopped IMAP server"), - } - - Ok(())*/ - } +pub struct Server { + bind_addr: SocketAddr, + login_provider: ArcLoginProvider, } -//--- -/* -/// Instance is the main Tokio Tower service that we register in BàL. -/// It receives new connection demands and spawn a dedicated service. -struct Instance { +struct ClientContext { + stream: AnyStream, + addr: SocketAddr, login_provider: ArcLoginProvider, + must_exit: watch::Receiver, } -impl Instance { - pub fn new(login_provider: ArcLoginProvider) -> Self { - Self { login_provider } +pub fn new(config: ImapConfig, login: ArcLoginProvider) -> Server { + Server { + bind_addr: config.bind_addr, + login_provider: login, } } -impl<'a> Service<&'a AddrStream> for Instance { - type Response = Connection; - type Error = anyhow::Error; - type Future = BoxFuture<'static, Result>; +impl Server { + pub async fn run(self: Self, mut must_exit: watch::Receiver) -> Result<()> { + let tcp = TcpListener::bind(self.bind_addr).await?; + tracing::info!("IMAP server listening on {:#}", self.bind_addr); + + let mut connections = FuturesUnordered::new(); + + while !*must_exit.borrow() { + let wait_conn_finished = async { + if connections.is_empty() { + futures::future::pending().await + } else { + connections.next().await + } + }; + let (socket, remote_addr) = tokio::select! { + a = tcp.accept() => a?, + _ = wait_conn_finished => continue, + _ = must_exit.changed() => continue, + }; + tracing::info!("IMAP: accepted connection from {}", remote_addr); + + let client = ClientContext { + stream: AnyStream::new(socket), + addr: remote_addr.clone(), + login_provider: self.login_provider.clone(), + must_exit: must_exit.clone(), + }; + let conn = tokio::spawn(client_wrapper(client)); + connections.push(conn); + } + drop(tcp); - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + tracing::info!("IMAP server shutting down, draining remaining connections..."); + while connections.next().await.is_some() {} - fn call(&mut self, addr: &'a AddrStream) -> Self::Future { - tracing::info!(remote_addr = %addr.remote_addr, local_addr = %addr.local_addr, "accept"); - let lp = self.login_provider.clone(); - async { Ok(Connection::new(lp)) }.boxed() + Ok(()) } } -//--- - -/// Connection is the per-connection Tokio Tower service we register in BàL. -/// It handles a single TCP connection, and thus has a business logic. -struct Connection { - session: session::Manager, -} - -impl Connection { - pub fn new(login_provider: ArcLoginProvider) -> Self { - Self { - session: session::Manager::new(login_provider), +async fn client_wrapper(ctx: ClientContext) { + let addr = ctx.addr.clone(); + match client(ctx).await { + Ok(()) => { + tracing::info!("closing successful session for {:?}", addr); + } + Err(e) => { + tracing::error!("closing errored session for {:?}: {}", addr, e); } } } -impl Service for Connection { - type Response = Response; - type Error = BalError; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) +async fn client(mut ctx: ClientContext) -> Result<()> { + // Send greeting + let (mut server, _) = ServerFlow::send_greeting( + ctx.stream, + ServerFlowOptions::default(), + Greeting::ok(None, "Aerogramme").unwrap(), + ) + .await?; + + use crate::imap::response::{Body, Response as MyResponse}; + use crate::imap::session::Instance; + use imap_codec::imap_types::command::Command; + use imap_codec::imap_types::response::{Response, Status}; + + use tokio::sync::mpsc; + let (cmd_tx, mut cmd_rx) = mpsc::channel::>(10); + let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::>(); + + let bckgrnd = tokio::spawn(async move { + let mut session = Instance::new(ctx.login_provider); + loop { + let cmd = match cmd_rx.recv().await { + None => break, + Some(cmd_recv) => cmd_recv, + }; + + let maybe_response = session.command(cmd).await; + + match resp_tx.send(maybe_response) { + Err(_) => break, + Ok(_) => (), + }; + } + tracing::info!("runner is quitting"); + }); + + // Main loop + loop { + tokio::select! { + // Managing imap_flow stuff + srv_evt = server.progress() => match srv_evt? { + ServerFlowEvent::ResponseSent { handle: _handle, response } => { + match response { + Response::Status(Status::Bye(_)) => break, + _ => tracing::trace!("sent to {} content {:?}", ctx.addr, response), + } + }, + ServerFlowEvent::CommandReceived { command } => { + match cmd_tx.try_send(command) { + Ok(_) => (), + Err(mpsc::error::TrySendError::Full(_)) => { + server.enqueue_status(Status::bye(None, "Too fast").unwrap()); + tracing::error!("client {:?} is sending commands too fast, closing.", ctx.addr); + } + _ => { + server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", ctx.addr); + } + } + }, + }, + + // Managing response generated by Aerogramme + maybe_msg = resp_rx.recv() => { + let response = match maybe_msg { + None => { + server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", ctx.addr); + continue + }, + Some(r) => r, + }; + + for body_elem in response.body.into_iter() { + let _handle = match body_elem { + Body::Data(d) => server.enqueue_data(d), + Body::Status(s) => server.enqueue_status(s), + }; + } + server.enqueue_status(response.completion); + }, + + // When receiving a CTRL+C + _ = ctx.must_exit.changed() => { + server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); + }, + }; } - fn call(&mut self, req: Request) -> Self::Future { - tracing::debug!("Got request: {:#?}", req.command); - self.session.process(req) - } + drop(cmd_tx); + bckgrnd.await?; + Ok(()) } -*/ -- cgit v1.2.3