aboutsummaryrefslogtreecommitdiff
path: root/aero-proto/src/lmtp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'aero-proto/src/lmtp.rs')
-rw-r--r--aero-proto/src/lmtp.rs219
1 files changed, 219 insertions, 0 deletions
diff --git a/aero-proto/src/lmtp.rs b/aero-proto/src/lmtp.rs
new file mode 100644
index 0000000..a82a783
--- /dev/null
+++ b/aero-proto/src/lmtp.rs
@@ -0,0 +1,219 @@
+use std::net::SocketAddr;
+use std::{pin::Pin, sync::Arc};
+
+use anyhow::Result;
+use async_trait::async_trait;
+use duplexify::Duplex;
+use futures::{io, AsyncRead, AsyncReadExt, AsyncWrite};
+use futures::{
+ stream,
+ stream::{FuturesOrdered, FuturesUnordered},
+ StreamExt,
+};
+use smtp_message::{DataUnescaper, Email, EscapedDataReader, Reply, ReplyCode};
+use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata};
+use tokio::net::TcpListener;
+use tokio::select;
+use tokio::sync::watch;
+use tokio_util::compat::*;
+
+use aero_collections::mail::incoming::EncryptedMessage;
+use aero_user::config::*;
+use aero_user::login::*;
+
+pub struct LmtpServer {
+ bind_addr: SocketAddr,
+ hostname: String,
+ login_provider: Arc<dyn LoginProvider + Send + Sync>,
+}
+
+impl LmtpServer {
+ pub fn new(
+ config: LmtpConfig,
+ login_provider: Arc<dyn LoginProvider + Send + Sync>,
+ ) -> Arc<Self> {
+ Arc::new(Self {
+ bind_addr: config.bind_addr,
+ hostname: config.hostname,
+ login_provider,
+ })
+ }
+
+ pub async fn run(self: &Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<()> {
+ let tcp = TcpListener::bind(self.bind_addr).await?;
+ tracing::info!("LMTP server listening on {:#}", self.bind_addr);
+
+ let mut connections = FuturesUnordered::new();
+
+ while !*must_exit.borrow() {
+ let wait_conn_finished = async {
+ if connections.is_empty() {
+ futures::future::pending().await
+ } else {
+ connections.next().await
+ }
+ };
+ let (socket, remote_addr) = select! {
+ a = tcp.accept() => a?,
+ _ = wait_conn_finished => continue,
+ _ = must_exit.changed() => continue,
+ };
+ tracing::info!("LMTP: accepted connection from {}", remote_addr);
+
+ let conn = tokio::spawn(smtp_server::interact(
+ socket.compat(),
+ smtp_server::IsAlreadyTls::No,
+ (),
+ self.clone(),
+ ));
+
+ connections.push(conn);
+ }
+ drop(tcp);
+
+ tracing::info!("LMTP server shutting down, draining remaining connections...");
+ while connections.next().await.is_some() {}
+
+ Ok(())
+ }
+}
+
+// ----
+
+pub struct Message {
+ to: Vec<PublicCredentials>,
+}
+
+#[async_trait]
+impl Config for LmtpServer {
+ type Protocol = smtp_server::protocol::Lmtp;
+
+ type ConnectionUserMeta = ();
+ type MailUserMeta = Message;
+
+ fn hostname(&self, _conn_meta: &ConnectionMetadata<()>) -> &str {
+ &self.hostname
+ }
+
+ async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata<()>) -> Message {
+ Message { to: vec![] }
+ }
+
+ async fn tls_accept<IO>(
+ &self,
+ _io: IO,
+ _conn_meta: &mut ConnectionMetadata<()>,
+ ) -> io::Result<Duplex<Pin<Box<dyn Send + AsyncRead>>, Pin<Box<dyn Send + AsyncWrite>>>>
+ where
+ IO: Send + AsyncRead + AsyncWrite,
+ {
+ Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "TLS not implemented for LMTP server",
+ ))
+ }
+
+ async fn filter_from(
+ &self,
+ from: Option<Email>,
+ _meta: &mut MailMetadata<Message>,
+ _conn_meta: &mut ConnectionMetadata<()>,
+ ) -> Decision<Option<Email>> {
+ Decision::Accept {
+ reply: reply::okay_from().convert(),
+ res: from,
+ }
+ }
+
+ async fn filter_to(
+ &self,
+ to: Email,
+ meta: &mut MailMetadata<Message>,
+ _conn_meta: &mut ConnectionMetadata<()>,
+ ) -> Decision<Email> {
+ let to_str = match to.hostname.as_ref() {
+ Some(h) => format!("{}@{}", to.localpart, h),
+ None => to.localpart.to_string(),
+ };
+ match self.login_provider.public_login(&to_str).await {
+ Ok(creds) => {
+ meta.user.to.push(creds);
+ Decision::Accept {
+ reply: reply::okay_to().convert(),
+ res: to,
+ }
+ }
+ Err(e) => Decision::Reject {
+ reply: Reply {
+ code: ReplyCode::POLICY_REASON,
+ ecode: None,
+ text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())],
+ },
+ },
+ }
+ }
+
+ async fn handle_mail<'resp, R>(
+ &'resp self,
+ reader: &mut EscapedDataReader<'_, R>,
+ meta: MailMetadata<Message>,
+ _conn_meta: &'resp mut ConnectionMetadata<()>,
+ ) -> Pin<Box<dyn futures::Stream<Item = Decision<()>> + Send + 'resp>>
+ where
+ R: Send + Unpin + AsyncRead,
+ {
+ let err_response_stream = |meta: MailMetadata<Message>, msg: String| {
+ Box::pin(
+ stream::iter(meta.user.to.into_iter()).map(move |_| Decision::Reject {
+ reply: Reply {
+ code: ReplyCode::POLICY_REASON,
+ ecode: None,
+ text: vec![smtp_message::MaybeUtf8::Utf8(msg.clone())],
+ },
+ }),
+ )
+ };
+
+ let mut text = Vec::new();
+ if let Err(e) = reader.read_to_end(&mut text).await {
+ return err_response_stream(meta, format!("io error: {}", e));
+ }
+ reader.complete();
+ let raw_size = text.len();
+
+ // Unescape email, shrink it also to remove last dot
+ let unesc_res = DataUnescaper::new(true).unescape(&mut text);
+ text.truncate(unesc_res.written);
+ tracing::debug!(prev_sz = raw_size, new_sz = text.len(), "unescaped");
+
+ let encrypted_message = match EncryptedMessage::new(text) {
+ Ok(x) => Arc::new(x),
+ Err(e) => return err_response_stream(meta, e.to_string()),
+ };
+
+ Box::pin(
+ meta.user
+ .to
+ .into_iter()
+ .map(move |creds| {
+ let encrypted_message = encrypted_message.clone();
+ async move {
+ match encrypted_message.deliver_to(creds).await {
+ Ok(()) => Decision::Accept {
+ reply: reply::okay_mail().convert(),
+ res: (),
+ },
+ Err(e) => Decision::Reject {
+ reply: Reply {
+ code: ReplyCode::POLICY_REASON,
+ ecode: None,
+ text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())],
+ },
+ },
+ }
+ }
+ })
+ .collect::<FuturesOrdered<_>>(),
+ )
+ }
+}