use std::net::SocketAddr; use std::{pin::Pin, sync::Arc}; use anyhow::Result; use async_trait::async_trait; use duplexify::Duplex; use futures::{io, AsyncRead, AsyncReadExt, AsyncWrite}; use futures::{ stream, stream::{FuturesOrdered, FuturesUnordered}, StreamExt, }; use log::*; use tokio::net::TcpListener; use tokio::select; use tokio::sync::watch; use tokio_util::compat::*; use smtp_message::{Email, EscapedDataReader, Reply, ReplyCode}; use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata}; use crate::config::*; use crate::login::*; use crate::mail::incoming::EncryptedMessage; pub struct LmtpServer { bind_addr: SocketAddr, hostname: String, login_provider: Arc<dyn LoginProvider + Send + Sync>, } impl LmtpServer { pub fn new( config: LmtpConfig, login_provider: Arc<dyn LoginProvider + Send + Sync>, ) -> Arc<Self> { Arc::new(Self { bind_addr: config.bind_addr, hostname: config.hostname, login_provider, }) } pub async fn run(self: &Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<()> { let tcp = TcpListener::bind(self.bind_addr).await?; info!("LMTP server listening on {:#}", self.bind_addr); let mut connections = FuturesUnordered::new(); while !*must_exit.borrow() { let wait_conn_finished = async { if connections.is_empty() { futures::future::pending().await } else { connections.next().await } }; let (socket, remote_addr) = select! { a = tcp.accept() => a?, _ = wait_conn_finished => continue, _ = must_exit.changed() => continue, }; info!("LMTP: accepted connection from {}", remote_addr); let conn = tokio::spawn(smtp_server::interact( socket.compat(), smtp_server::IsAlreadyTls::No, (), self.clone(), )); connections.push(conn); } drop(tcp); info!("LMTP server shutting down, draining remaining connections..."); while connections.next().await.is_some() {} Ok(()) } } // ---- pub struct Message { to: Vec<PublicCredentials>, } #[async_trait] impl Config for LmtpServer { type Protocol = smtp_server::protocol::Lmtp; type ConnectionUserMeta = (); type MailUserMeta = Message; fn hostname(&self, _conn_meta: &ConnectionMetadata<()>) -> &str { &self.hostname } async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata<()>) -> Message { Message { to: vec![] } } async fn tls_accept<IO>( &self, _io: IO, _conn_meta: &mut ConnectionMetadata<()>, ) -> io::Result<Duplex<Pin<Box<dyn Send + AsyncRead>>, Pin<Box<dyn Send + AsyncWrite>>>> where IO: Send + AsyncRead + AsyncWrite, { Err(io::Error::new( io::ErrorKind::InvalidInput, "TLS not implemented for LMTP server", )) } async fn filter_from( &self, from: Option<Email>, _meta: &mut MailMetadata<Message>, _conn_meta: &mut ConnectionMetadata<()>, ) -> Decision<Option<Email>> { Decision::Accept { reply: reply::okay_from().convert(), res: from, } } async fn filter_to( &self, to: Email, meta: &mut MailMetadata<Message>, _conn_meta: &mut ConnectionMetadata<()>, ) -> Decision<Email> { let to_str = match to.hostname.as_ref() { Some(h) => format!("{}@{}", to.localpart, h), None => to.localpart.to_string(), }; match self.login_provider.public_login(&to_str).await { Ok(creds) => { meta.user.to.push(creds); Decision::Accept { reply: reply::okay_to().convert(), res: to, } } Err(e) => Decision::Reject { reply: Reply { code: ReplyCode::POLICY_REASON, ecode: None, text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())], }, }, } } async fn handle_mail<'resp, R>( &'resp self, reader: &mut EscapedDataReader<'_, R>, meta: MailMetadata<Message>, _conn_meta: &'resp mut ConnectionMetadata<()>, ) -> Pin<Box<dyn futures::Stream<Item = Decision<()>> + Send + 'resp>> where R: Send + Unpin + AsyncRead, { let err_response_stream = |meta: MailMetadata<Message>, msg: String| { Box::pin( stream::iter(meta.user.to.into_iter()).map(move |_| Decision::Reject { reply: Reply { code: ReplyCode::POLICY_REASON, ecode: None, text: vec![smtp_message::MaybeUtf8::Utf8(msg.clone())], }, }), ) }; let mut text = Vec::new(); if let Err(e) = reader.read_to_end(&mut text).await { return err_response_stream(meta, format!("io error: {}", e)); } reader.complete(); let encrypted_message = match EncryptedMessage::new(text) { Ok(x) => Arc::new(x), Err(e) => return err_response_stream(meta, e.to_string()), }; Box::pin( meta.user .to .into_iter() .map(move |creds| { let encrypted_message = encrypted_message.clone(); async move { match encrypted_message.deliver_to(creds).await { Ok(()) => Decision::Accept { reply: reply::okay_mail().convert(), res: (), }, Err(e) => Decision::Reject { reply: Reply { code: ReplyCode::POLICY_REASON, ecode: None, text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())], }, }, } } }) .collect::<FuturesOrdered<_>>(), ) } }