diff options
author | Quentin <quentin@dufour.io> | 2024-01-02 22:44:29 +0000 |
---|---|---|
committer | Quentin <quentin@dufour.io> | 2024-01-02 22:44:29 +0000 |
commit | b9a0c1e6eced036eb71e8221a4f236f72832fec2 (patch) | |
tree | c498a7a2a5833f2c6f27d4ba97894747f9d454c0 /src/imap/mod.rs | |
parent | 6ff3c6f71efd802da422a371e6168ae528fb2ddc (diff) | |
parent | c9a33c080d39d4a2b269e3c8f166a708b6606da5 (diff) | |
download | aerogramme-b9a0c1e6eced036eb71e8221a4f236f72832fec2.tar.gz aerogramme-b9a0c1e6eced036eb71e8221a4f236f72832fec2.zip |
Merge pull request 'Implement imap-flow' (#34) from refactor/imap-flow into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/aerogramme/pulls/34
Diffstat (limited to 'src/imap/mod.rs')
-rw-r--r-- | src/imap/mod.rs | 223 |
1 files changed, 152 insertions, 71 deletions
diff --git a/src/imap/mod.rs b/src/imap/mod.rs index f85bcc6..31eeaa8 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -1,105 +1,186 @@ mod command; mod flow; mod mailbox_view; +mod response; mod session; -use std::task::{Context, Poll}; +use std::net::SocketAddr; use anyhow::Result; -use boitalettres::errors::Error as BalError; -use boitalettres::proto::{Request, Response}; -use boitalettres::server::accept::addr::AddrIncoming; -use boitalettres::server::accept::addr::AddrStream; -use boitalettres::server::Server as ImapServer; -use futures::future::BoxFuture; -use futures::future::FutureExt; +use futures::stream::{FuturesUnordered, StreamExt}; + +use tokio::net::TcpListener; use tokio::sync::watch; -use tower::Service; + +use imap_codec::imap_types::response::Greeting; +use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions}; +use imap_flow::stream::AnyStream; use crate::config::ImapConfig; use crate::login::ArcLoginProvider; /// Server is a thin wrapper to register our Services in BàL -pub struct Server(ImapServer<AddrIncoming, Instance>); - -pub async fn new(config: ImapConfig, login: ArcLoginProvider) -> Result<Server> { - //@FIXME add a configuration parameter - let incoming = AddrIncoming::new(config.bind_addr).await?; - tracing::info!("IMAP activated, will listen on {:#}", incoming.local_addr); - - let imap = ImapServer::new(incoming).serve(Instance::new(login.clone())); - Ok(Server(imap)) -} - -impl Server { - pub async fn run(self, mut must_exit: watch::Receiver<bool>) -> Result<()> { - tracing::info!("IMAP started!"); - tokio::select! { - s = self.0 => s?, - _ = must_exit.changed() => tracing::info!("Stopped IMAP server"), - } - - Ok(()) - } +pub struct Server { + bind_addr: SocketAddr, + login_provider: ArcLoginProvider, } -//--- - -/// Instance is the main Tokio Tower service that we register in BàL. -/// It receives new connection demands and spawn a dedicated service. -struct Instance { +struct ClientContext { + stream: AnyStream, + addr: SocketAddr, login_provider: ArcLoginProvider, + must_exit: watch::Receiver<bool>, } -impl Instance { - pub fn new(login_provider: ArcLoginProvider) -> Self { - Self { login_provider } +pub fn new(config: ImapConfig, login: ArcLoginProvider) -> Server { + Server { + bind_addr: config.bind_addr, + login_provider: login, } } -impl<'a> Service<&'a AddrStream> for Instance { - type Response = Connection; - type Error = anyhow::Error; - type Future = BoxFuture<'static, Result<Self::Response>>; +impl Server { + pub async fn run(self: Self, mut must_exit: watch::Receiver<bool>) -> Result<()> { + let tcp = TcpListener::bind(self.bind_addr).await?; + tracing::info!("IMAP server listening on {:#}", self.bind_addr); + + let mut connections = FuturesUnordered::new(); + + while !*must_exit.borrow() { + let wait_conn_finished = async { + if connections.is_empty() { + futures::future::pending().await + } else { + connections.next().await + } + }; + let (socket, remote_addr) = tokio::select! { + a = tcp.accept() => a?, + _ = wait_conn_finished => continue, + _ = must_exit.changed() => continue, + }; + tracing::info!("IMAP: accepted connection from {}", remote_addr); + + let client = ClientContext { + stream: AnyStream::new(socket), + addr: remote_addr.clone(), + login_provider: self.login_provider.clone(), + must_exit: must_exit.clone(), + }; + let conn = tokio::spawn(client_wrapper(client)); + connections.push(conn); + } + drop(tcp); - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } + tracing::info!("IMAP server shutting down, draining remaining connections..."); + while connections.next().await.is_some() {} - fn call(&mut self, addr: &'a AddrStream) -> Self::Future { - tracing::info!(remote_addr = %addr.remote_addr, local_addr = %addr.local_addr, "accept"); - let lp = self.login_provider.clone(); - async { Ok(Connection::new(lp)) }.boxed() + Ok(()) } } -//--- - -/// Connection is the per-connection Tokio Tower service we register in BàL. -/// It handles a single TCP connection, and thus has a business logic. -struct Connection { - session: session::Manager, -} - -impl Connection { - pub fn new(login_provider: ArcLoginProvider) -> Self { - Self { - session: session::Manager::new(login_provider), +async fn client_wrapper(ctx: ClientContext) { + let addr = ctx.addr.clone(); + match client(ctx).await { + Ok(()) => { + tracing::info!("closing successful session for {:?}", addr); + } + Err(e) => { + tracing::error!("closing errored session for {:?}: {}", addr, e); } } } -impl Service<Request> for Connection { - type Response = Response; - type Error = BalError; - type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; +async fn client(mut ctx: ClientContext) -> Result<()> { + // Send greeting + let (mut server, _) = ServerFlow::send_greeting( + ctx.stream, + ServerFlowOptions::default(), + Greeting::ok(None, "Aerogramme").unwrap(), + ) + .await?; + + use crate::imap::response::{Body, Response as MyResponse}; + use crate::imap::session::Instance; + use imap_codec::imap_types::command::Command; + use imap_codec::imap_types::response::{Response, Status}; + + use tokio::sync::mpsc; + let (cmd_tx, mut cmd_rx) = mpsc::channel::<Command<'static>>(10); + let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<MyResponse<'static>>(); + + let bckgrnd = tokio::spawn(async move { + let mut session = Instance::new(ctx.login_provider); + loop { + let cmd = match cmd_rx.recv().await { + None => break, + Some(cmd_recv) => cmd_recv, + }; + + let maybe_response = session.command(cmd).await; + + match resp_tx.send(maybe_response) { + Err(_) => break, + Ok(_) => (), + }; + } + tracing::info!("runner is quitting"); + }); - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) + // Main loop + loop { + tokio::select! { + // Managing imap_flow stuff + srv_evt = server.progress() => match srv_evt? { + ServerFlowEvent::ResponseSent { handle: _handle, response } => { + match response { + Response::Status(Status::Bye(_)) => break, + _ => tracing::trace!("sent to {} content {:?}", ctx.addr, response), + } + }, + ServerFlowEvent::CommandReceived { command } => { + match cmd_tx.try_send(command) { + Ok(_) => (), + Err(mpsc::error::TrySendError::Full(_)) => { + server.enqueue_status(Status::bye(None, "Too fast").unwrap()); + tracing::error!("client {:?} is sending commands too fast, closing.", ctx.addr); + } + _ => { + server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", ctx.addr); + } + } + }, + }, + + // Managing response generated by Aerogramme + maybe_msg = resp_rx.recv() => { + let response = match maybe_msg { + None => { + server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", ctx.addr); + continue + }, + Some(r) => r, + }; + + for body_elem in response.body.into_iter() { + let _handle = match body_elem { + Body::Data(d) => server.enqueue_data(d), + Body::Status(s) => server.enqueue_status(s), + }; + } + server.enqueue_status(response.completion); + }, + + // When receiving a CTRL+C + _ = ctx.must_exit.changed() => { + server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); + }, + }; } - fn call(&mut self, req: Request) -> Self::Future { - tracing::debug!("Got request: {:#?}", req.command); - self.session.process(req) - } + drop(cmd_tx); + bckgrnd.await?; + Ok(()) } |