From 553a15a82a700792986b23cb89e2a8ec070cc27d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 31 May 2022 17:07:34 +0200 Subject: Implementn basic LMTP server --- Cargo.lock | 5 ++ Cargo.toml | 6 +- src/config.rs | 1 + src/lmtp.rs | 263 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/mail_uuid.rs | 8 +- src/main.rs | 12 +++ src/server.rs | 67 +++++++++++--- 7 files changed, 349 insertions(+), 13 deletions(-) create mode 100644 src/lmtp.rs diff --git a/Cargo.lock b/Cargo.lock index d9d4f69..2e5416c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1106,6 +1106,8 @@ dependencies = [ "async-trait", "base64", "clap", + "duplexify", + "futures", "hex", "im", "itertools", @@ -1122,9 +1124,11 @@ dependencies = [ "rusoto_s3", "rusoto_signature", "serde", + "smtp-message", "smtp-server", "sodiumoxide", "tokio", + "tokio-util", "toml", "zstd", ] @@ -2059,6 +2063,7 @@ checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite 0.2.9", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 16d0b52..00acdca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,9 @@ argon2 = "0.3" async-trait = "0.1" base64 = "0.13" clap = { version = "3.1.18", features = ["derive", "env"] } +duplexify = "1.1.0" hex = "0.4" +futures = "0.3" im = "15" itertools = "0.10" lazy_static = "1.4" @@ -28,10 +30,12 @@ rand = "0.8.5" rmp-serde = "0.15" rpassword = "6.0" sodiumoxide = "0.2" -tokio = "1.17.0" +tokio = { version = "1.18", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } +tokio-util = { version = "0.7", features = [ "compat" ] } toml = "0.5" zstd = { version = "0.9", default-features = false } +smtp-message = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" } smtp-server = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" } k2v-client = { git = "https://git.deuxfleurs.fr/Deuxfleurs/garage.git", branch = "improve-k2v-client" } diff --git a/src/config.rs b/src/config.rs index 3fd0bd4..9ec0ea1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -68,6 +68,7 @@ pub struct LoginLdapConfig { #[derive(Deserialize, Debug, Clone)] pub struct LmtpConfig { pub bind_addr: SocketAddr, + pub hostname: String, } pub fn read_config(config_file: PathBuf) -> Result { diff --git a/src/lmtp.rs b/src/lmtp.rs new file mode 100644 index 0000000..4186d69 --- /dev/null +++ b/src/lmtp.rs @@ -0,0 +1,263 @@ +use std::collections::HashMap; +use std::net::SocketAddr; +use std::{pin::Pin, sync::Arc}; + +use anyhow::{bail, Result}; +use async_trait::async_trait; +use duplexify::Duplex; +use futures::{io, AsyncRead, AsyncReadExt, AsyncWrite}; +use futures::{stream, stream::FuturesUnordered, StreamExt}; +use log::*; +use rusoto_s3::{PutObjectRequest, S3Client, S3}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::select; +use tokio::sync::watch; +use tokio_util::compat::*; + +use smtp_message::{Email, EscapedDataReader, Reply, ReplyCode}; +use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata, Protocol}; + +use crate::config::*; +use crate::cryptoblob::*; +use crate::login::*; +use crate::mail_uuid::*; + +pub struct LmtpServer { + bind_addr: SocketAddr, + hostname: String, + login_provider: Arc, +} + +impl LmtpServer { + pub fn new( + config: LmtpConfig, + login_provider: Arc, + ) -> Arc { + Arc::new(Self { + bind_addr: config.bind_addr, + hostname: config.hostname, + login_provider, + }) + } + + pub async fn run(self: &Arc, mut must_exit: watch::Receiver) -> Result<()> { + let tcp = TcpListener::bind(self.bind_addr).await?; + let mut connections = FuturesUnordered::new(); + + while !*must_exit.borrow() { + let wait_conn_finished = async { + if connections.is_empty() { + futures::future::pending().await + } else { + connections.next().await + } + }; + let (socket, remote_addr) = select! { + a = tcp.accept() => a?, + _ = wait_conn_finished => continue, + _ = must_exit.changed() => continue, + }; + + let conn = tokio::spawn(smtp_server::interact( + socket.compat(), + smtp_server::IsAlreadyTls::No, + Conn { remote_addr }, + self.clone(), + )); + + connections.push(conn); + } + drop(tcp); + + info!("LMTP server shutting down, draining remaining connections..."); + while connections.next().await.is_some() {} + + Ok(()) + } +} + +// ---- + +pub struct Conn { + remote_addr: SocketAddr, +} + +pub struct Message { + to: Vec, +} + +#[async_trait] +impl Config for LmtpServer { + const PROTOCOL: Protocol = Protocol::Lmtp; + + type ConnectionUserMeta = Conn; + type MailUserMeta = Message; + + fn hostname(&self, _conn_meta: &ConnectionMetadata) -> &str { + &self.hostname + } + + async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata) -> Message { + Message { to: vec![] } + } + + async fn tls_accept( + &self, + _io: IO, + _conn_meta: &mut ConnectionMetadata, + ) -> io::Result>, Pin>>> + where + IO: Send + AsyncRead + AsyncWrite, + { + Err(io::Error::new( + io::ErrorKind::InvalidInput, + "TLS not implemented for LMTP server", + )) + } + + async fn filter_from( + &self, + from: Option, + meta: &mut MailMetadata, + _conn_meta: &mut ConnectionMetadata, + ) -> Decision> { + Decision::Accept { + reply: reply::okay_from().convert(), + res: from, + } + } + + async fn filter_to( + &self, + to: Email, + meta: &mut MailMetadata, + _conn_meta: &mut ConnectionMetadata, + ) -> Decision { + let to_str = match to.hostname.as_ref() { + Some(h) => format!("{}@{}", to.localpart, h), + None => to.localpart.to_string(), + }; + match self.login_provider.public_login(&to_str).await { + Ok(creds) => { + meta.user.to.push(creds); + Decision::Accept { + reply: reply::okay_to().convert(), + res: to, + } + } + Err(e) => Decision::Reject { + reply: Reply { + code: ReplyCode::POLICY_REASON, + ecode: None, + text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())], + }, + }, + } + } + + async fn handle_mail<'a, R>( + &self, + reader: &mut EscapedDataReader<'a, R>, + _mail: MailMetadata, + _conn_meta: &mut ConnectionMetadata, + ) -> Decision<()> + where + R: Send + Unpin + AsyncRead, + { + unreachable!(); + } + + async fn handle_mail_multi<'a, 'slife0, 'slife1, 'stream, R>( + &'slife0 self, + reader: &mut EscapedDataReader<'a, R>, + meta: MailMetadata, + conn_meta: &'slife1 mut ConnectionMetadata, + ) -> Pin> + Send + 'stream>> + where + R: Send + Unpin + AsyncRead, + 'slife0: 'stream, + 'slife1: 'stream, + Self: 'stream, + { + let err_response_stream = |meta: MailMetadata, msg: String| { + Box::pin( + stream::iter(meta.user.to.into_iter()).map(move |_| Decision::Reject { + reply: Reply { + code: ReplyCode::POLICY_REASON, + ecode: None, + text: vec![smtp_message::MaybeUtf8::Utf8(msg.clone())], + }, + }), + ) + }; + + let mut text = Vec::new(); + if reader.read_to_end(&mut text).await.is_err() { + return err_response_stream(meta, "io error".into()); + } + reader.complete(); + + let encrypted_message = match EncryptedMessage::new(text) { + Ok(x) => Arc::new(x), + Err(e) => return err_response_stream(meta, e.to_string()), + }; + + Box::pin(stream::iter(meta.user.to.into_iter()).then(move |creds| { + let encrypted_message = encrypted_message.clone(); + async move { + match encrypted_message.deliver_to(creds).await { + Ok(()) => Decision::Accept { + reply: reply::okay_mail().convert(), + res: (), + }, + Err(e) => Decision::Reject { + reply: Reply { + code: ReplyCode::POLICY_REASON, + ecode: None, + text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())], + }, + }, + } + } + })) + } +} + +// ---- + +struct EncryptedMessage { + key: Key, + encrypted_body: Vec, +} + +impl EncryptedMessage { + fn new(body: Vec) -> Result { + let key = gen_key(); + let encrypted_body = seal(&body, &key)?; + Ok(Self { + key, + encrypted_body, + }) + } + + async fn deliver_to(self: Arc, creds: PublicCredentials) -> Result<()> { + let s3_client = creds.storage.s3_client()?; + + let encrypted_key = + sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); + let key_header = base64::encode(&encrypted_key); + + let mut por = PutObjectRequest::default(); + por.bucket = creds.storage.bucket.clone(); + por.key = format!("incoming/{}", gen_uuid().to_string()); + por.metadata = Some( + [("Message-Key".to_string(), key_header)] + .into_iter() + .collect::>(), + ); + por.body = Some(self.encrypted_body.clone().into()); + s3_client.put_object(por).await?; + + Ok(()) + } +} diff --git a/src/mail_uuid.rs b/src/mail_uuid.rs index d0d582f..b784e78 100644 --- a/src/mail_uuid.rs +++ b/src/mail_uuid.rs @@ -71,6 +71,12 @@ impl Serialize for MailUuid { where S: Serializer, { - serializer.serialize_str(&hex::encode(self.0)) + serializer.serialize_str(&self.to_string()) + } +} + +impl ToString for MailUuid { + fn to_string(&self) -> String { + hex::encode(self.0) } } diff --git a/src/main.rs b/src/main.rs index b8231f1..33d3188 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ mod bayou; mod config; mod cryptoblob; +mod lmtp; mod login; mod mail_uuid; mod mailbox; @@ -35,6 +36,11 @@ enum Command { #[clap(short, long, env = "CONFIG_FILE", default_value = "mailrage.toml")] config_file: PathBuf, }, + /// TEST TEST TEST + Test { + #[clap(short, long, env = "CONFIG_FILE", default_value = "mailrage.toml")] + config_file: PathBuf, + }, /// Initializes key pairs for a user and adds a key decryption password FirstLogin { #[clap(flatten)] @@ -125,6 +131,12 @@ async fn main() -> Result<()> { let server = Server::new(config)?; server.run().await?; } + Command::Test { config_file } => { + let config = read_config(config_file)?; + + let server = Server::new(config)?; + server.test().await?; + } Command::FirstLogin { creds, user_secrets, diff --git a/src/server.rs b/src/server.rs index e1ab599..1fd21b4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,18 +1,23 @@ -use anyhow::{bail, Result}; use std::sync::Arc; +use anyhow::{bail, Result}; +use futures::{try_join, StreamExt}; +use log::*; use rusoto_signature::Region; +use tokio::sync::watch; use crate::config::*; +use crate::lmtp::*; use crate::login::{ldap_provider::*, static_provider::*, *}; use crate::mailbox::Mailbox; pub struct Server { - pub login_provider: Box, + pub login_provider: Arc, + pub lmtp_server: Option>, } impl Server { - pub fn new(config: Config) -> Result> { + pub fn new(config: Config) -> Result { let s3_region = Region::Custom { name: config.aws_region.clone(), endpoint: config.s3_endpoint, @@ -21,17 +26,43 @@ impl Server { name: config.aws_region, endpoint: config.k2v_endpoint, }; - let login_provider: Box = match (config.login_static, config.login_ldap) - { - (Some(st), None) => Box::new(StaticLoginProvider::new(st, k2v_region, s3_region)?), - (None, Some(ld)) => Box::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?), - (Some(_), Some(_)) => bail!("A single login provider must be set up in config file"), - (None, None) => bail!("No login provider is set up in config file"), + let login_provider: Arc = + match (config.login_static, config.login_ldap) { + (Some(st), None) => Arc::new(StaticLoginProvider::new(st, k2v_region, s3_region)?), + (None, Some(ld)) => Arc::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?), + (Some(_), Some(_)) => { + bail!("A single login provider must be set up in config file") + } + (None, None) => bail!("No login provider is set up in config file"), + }; + + let lmtp_server = config + .lmtp + .map(|cfg| LmtpServer::new(cfg, login_provider.clone())); + + Ok(Self { + login_provider, + lmtp_server, + }) + } + + pub async fn run(&self) -> Result<()> { + let (exit_signal, provoke_exit) = watch_ctrl_c(); + let exit_on_err = move |err: anyhow::Error| { + error!("Error: {}", err); + let _ = provoke_exit.send(true); }; - Ok(Arc::new(Self { login_provider })) + + try_join!(async { + match self.lmtp_server.as_ref() { + None => Ok(()), + Some(s) => s.run(exit_signal.clone()).await, + } + })?; + Ok(()) } - pub async fn run(self: &Arc) -> Result<()> { + pub async fn test(&self) -> Result<()> { let creds = self.login_provider.login("lx", "plop").await?; let mut mailbox = Mailbox::new(&creds, "TestMailbox".to_string()).await?; @@ -41,3 +72,17 @@ impl Server { Ok(()) } } + +pub fn watch_ctrl_c() -> (watch::Receiver, Arc>) { + let (send_cancel, watch_cancel) = watch::channel(false); + let send_cancel = Arc::new(send_cancel); + let send_cancel_2 = send_cancel.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); + info!("Received CTRL+C, shutting down."); + send_cancel.send(true).unwrap(); + }); + (watch_cancel, send_cancel_2) +} -- cgit v1.2.3