aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-05-31 17:07:34 +0200
committerAlex Auvolat <alex@adnab.me>2022-05-31 17:07:34 +0200
commit553a15a82a700792986b23cb89e2a8ec070cc27d (patch)
tree8ea039322b39064fa8c66d9647f84b79f64f1921
parent01d82c14ca61e7c4de1e72c5f94456610464c064 (diff)
downloadaerogramme-553a15a82a700792986b23cb89e2a8ec070cc27d.tar.gz
aerogramme-553a15a82a700792986b23cb89e2a8ec070cc27d.zip
Implementn basic LMTP server
-rw-r--r--Cargo.lock5
-rw-r--r--Cargo.toml6
-rw-r--r--src/config.rs1
-rw-r--r--src/lmtp.rs263
-rw-r--r--src/mail_uuid.rs8
-rw-r--r--src/main.rs12
-rw-r--r--src/server.rs67
7 files changed, 349 insertions, 13 deletions
diff --git a/Cargo.lock b/Cargo.lock
index d9d4f69..2e5416c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1106,6 +1106,8 @@ dependencies = [
"async-trait",
"base64",
"clap",
+ "duplexify",
+ "futures",
"hex",
"im",
"itertools",
@@ -1122,9 +1124,11 @@ dependencies = [
"rusoto_s3",
"rusoto_signature",
"serde",
+ "smtp-message",
"smtp-server",
"sodiumoxide",
"tokio",
+ "tokio-util",
"toml",
"zstd",
]
@@ -2059,6 +2063,7 @@ checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c"
dependencies = [
"bytes",
"futures-core",
+ "futures-io",
"futures-sink",
"pin-project-lite 0.2.9",
"tokio",
diff --git a/Cargo.toml b/Cargo.toml
index 16d0b52..00acdca 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,7 +12,9 @@ argon2 = "0.3"
async-trait = "0.1"
base64 = "0.13"
clap = { version = "3.1.18", features = ["derive", "env"] }
+duplexify = "1.1.0"
hex = "0.4"
+futures = "0.3"
im = "15"
itertools = "0.10"
lazy_static = "1.4"
@@ -28,10 +30,12 @@ rand = "0.8.5"
rmp-serde = "0.15"
rpassword = "6.0"
sodiumoxide = "0.2"
-tokio = "1.17.0"
+tokio = { version = "1.18", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+tokio-util = { version = "0.7", features = [ "compat" ] }
toml = "0.5"
zstd = { version = "0.9", default-features = false }
+smtp-message = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" }
smtp-server = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" }
k2v-client = { git = "https://git.deuxfleurs.fr/Deuxfleurs/garage.git", branch = "improve-k2v-client" }
diff --git a/src/config.rs b/src/config.rs
index 3fd0bd4..9ec0ea1 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -68,6 +68,7 @@ pub struct LoginLdapConfig {
#[derive(Deserialize, Debug, Clone)]
pub struct LmtpConfig {
pub bind_addr: SocketAddr,
+ pub hostname: String,
}
pub fn read_config(config_file: PathBuf) -> Result<Config> {
diff --git a/src/lmtp.rs b/src/lmtp.rs
new file mode 100644
index 0000000..4186d69
--- /dev/null
+++ b/src/lmtp.rs
@@ -0,0 +1,263 @@
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::{pin::Pin, sync::Arc};
+
+use anyhow::{bail, Result};
+use async_trait::async_trait;
+use duplexify::Duplex;
+use futures::{io, AsyncRead, AsyncReadExt, AsyncWrite};
+use futures::{stream, stream::FuturesUnordered, StreamExt};
+use log::*;
+use rusoto_s3::{PutObjectRequest, S3Client, S3};
+use tokio::net::{TcpListener, TcpStream};
+use tokio::select;
+use tokio::sync::watch;
+use tokio_util::compat::*;
+
+use smtp_message::{Email, EscapedDataReader, Reply, ReplyCode};
+use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata, Protocol};
+
+use crate::config::*;
+use crate::cryptoblob::*;
+use crate::login::*;
+use crate::mail_uuid::*;
+
+pub struct LmtpServer {
+ bind_addr: SocketAddr,
+ hostname: String,
+ login_provider: Arc<dyn LoginProvider + Send + Sync>,
+}
+
+impl LmtpServer {
+ pub fn new(
+ config: LmtpConfig,
+ login_provider: Arc<dyn LoginProvider + Send + Sync>,
+ ) -> Arc<Self> {
+ Arc::new(Self {
+ bind_addr: config.bind_addr,
+ hostname: config.hostname,
+ login_provider,
+ })
+ }
+
+ pub async fn run(self: &Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<()> {
+ let tcp = TcpListener::bind(self.bind_addr).await?;
+ let mut connections = FuturesUnordered::new();
+
+ while !*must_exit.borrow() {
+ let wait_conn_finished = async {
+ if connections.is_empty() {
+ futures::future::pending().await
+ } else {
+ connections.next().await
+ }
+ };
+ let (socket, remote_addr) = select! {
+ a = tcp.accept() => a?,
+ _ = wait_conn_finished => continue,
+ _ = must_exit.changed() => continue,
+ };
+
+ let conn = tokio::spawn(smtp_server::interact(
+ socket.compat(),
+ smtp_server::IsAlreadyTls::No,
+ Conn { remote_addr },
+ self.clone(),
+ ));
+
+ connections.push(conn);
+ }
+ drop(tcp);
+
+ info!("LMTP server shutting down, draining remaining connections...");
+ while connections.next().await.is_some() {}
+
+ Ok(())
+ }
+}
+
+// ----
+
+pub struct Conn {
+ remote_addr: SocketAddr,
+}
+
+pub struct Message {
+ to: Vec<PublicCredentials>,
+}
+
+#[async_trait]
+impl Config for LmtpServer {
+ const PROTOCOL: Protocol = Protocol::Lmtp;
+
+ type ConnectionUserMeta = Conn;
+ type MailUserMeta = Message;
+
+ fn hostname(&self, _conn_meta: &ConnectionMetadata<Conn>) -> &str {
+ &self.hostname
+ }
+
+ async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata<Conn>) -> Message {
+ Message { to: vec![] }
+ }
+
+ async fn tls_accept<IO>(
+ &self,
+ _io: IO,
+ _conn_meta: &mut ConnectionMetadata<Conn>,
+ ) -> io::Result<Duplex<Pin<Box<dyn Send + AsyncRead>>, Pin<Box<dyn Send + AsyncWrite>>>>
+ where
+ IO: Send + AsyncRead + AsyncWrite,
+ {
+ Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "TLS not implemented for LMTP server",
+ ))
+ }
+
+ async fn filter_from(
+ &self,
+ from: Option<Email>,
+ meta: &mut MailMetadata<Message>,
+ _conn_meta: &mut ConnectionMetadata<Conn>,
+ ) -> Decision<Option<Email>> {
+ Decision::Accept {
+ reply: reply::okay_from().convert(),
+ res: from,
+ }
+ }
+
+ async fn filter_to(
+ &self,
+ to: Email,
+ meta: &mut MailMetadata<Message>,
+ _conn_meta: &mut ConnectionMetadata<Conn>,
+ ) -> Decision<Email> {
+ let to_str = match to.hostname.as_ref() {
+ Some(h) => format!("{}@{}", to.localpart, h),
+ None => to.localpart.to_string(),
+ };
+ match self.login_provider.public_login(&to_str).await {
+ Ok(creds) => {
+ meta.user.to.push(creds);
+ Decision::Accept {
+ reply: reply::okay_to().convert(),
+ res: to,
+ }
+ }
+ Err(e) => Decision::Reject {
+ reply: Reply {
+ code: ReplyCode::POLICY_REASON,
+ ecode: None,
+ text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())],
+ },
+ },
+ }
+ }
+
+ async fn handle_mail<'a, R>(
+ &self,
+ reader: &mut EscapedDataReader<'a, R>,
+ _mail: MailMetadata<Message>,
+ _conn_meta: &mut ConnectionMetadata<Conn>,
+ ) -> Decision<()>
+ where
+ R: Send + Unpin + AsyncRead,
+ {
+ unreachable!();
+ }
+
+ async fn handle_mail_multi<'a, 'slife0, 'slife1, 'stream, R>(
+ &'slife0 self,
+ reader: &mut EscapedDataReader<'a, R>,
+ meta: MailMetadata<Message>,
+ conn_meta: &'slife1 mut ConnectionMetadata<Conn>,
+ ) -> Pin<Box<dyn futures::Stream<Item = Decision<()>> + Send + 'stream>>
+ where
+ R: Send + Unpin + AsyncRead,
+ 'slife0: 'stream,
+ 'slife1: 'stream,
+ Self: 'stream,
+ {
+ let err_response_stream = |meta: MailMetadata<Message>, msg: String| {
+ Box::pin(
+ stream::iter(meta.user.to.into_iter()).map(move |_| Decision::Reject {
+ reply: Reply {
+ code: ReplyCode::POLICY_REASON,
+ ecode: None,
+ text: vec![smtp_message::MaybeUtf8::Utf8(msg.clone())],
+ },
+ }),
+ )
+ };
+
+ let mut text = Vec::new();
+ if reader.read_to_end(&mut text).await.is_err() {
+ return err_response_stream(meta, "io error".into());
+ }
+ reader.complete();
+
+ let encrypted_message = match EncryptedMessage::new(text) {
+ Ok(x) => Arc::new(x),
+ Err(e) => return err_response_stream(meta, e.to_string()),
+ };
+
+ Box::pin(stream::iter(meta.user.to.into_iter()).then(move |creds| {
+ let encrypted_message = encrypted_message.clone();
+ async move {
+ match encrypted_message.deliver_to(creds).await {
+ Ok(()) => Decision::Accept {
+ reply: reply::okay_mail().convert(),
+ res: (),
+ },
+ Err(e) => Decision::Reject {
+ reply: Reply {
+ code: ReplyCode::POLICY_REASON,
+ ecode: None,
+ text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())],
+ },
+ },
+ }
+ }
+ }))
+ }
+}
+
+// ----
+
+struct EncryptedMessage {
+ key: Key,
+ encrypted_body: Vec<u8>,
+}
+
+impl EncryptedMessage {
+ fn new(body: Vec<u8>) -> Result<Self> {
+ let key = gen_key();
+ let encrypted_body = seal(&body, &key)?;
+ Ok(Self {
+ key,
+ encrypted_body,
+ })
+ }
+
+ async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
+ let s3_client = creds.storage.s3_client()?;
+
+ let encrypted_key =
+ sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
+ let key_header = base64::encode(&encrypted_key);
+
+ let mut por = PutObjectRequest::default();
+ por.bucket = creds.storage.bucket.clone();
+ por.key = format!("incoming/{}", gen_uuid().to_string());
+ por.metadata = Some(
+ [("Message-Key".to_string(), key_header)]
+ .into_iter()
+ .collect::<HashMap<_, _>>(),
+ );
+ por.body = Some(self.encrypted_body.clone().into());
+ s3_client.put_object(por).await?;
+
+ Ok(())
+ }
+}
diff --git a/src/mail_uuid.rs b/src/mail_uuid.rs
index d0d582f..b784e78 100644
--- a/src/mail_uuid.rs
+++ b/src/mail_uuid.rs
@@ -71,6 +71,12 @@ impl Serialize for MailUuid {
where
S: Serializer,
{
- serializer.serialize_str(&hex::encode(self.0))
+ serializer.serialize_str(&self.to_string())
+ }
+}
+
+impl ToString for MailUuid {
+ fn to_string(&self) -> String {
+ hex::encode(self.0)
}
}
diff --git a/src/main.rs b/src/main.rs
index b8231f1..33d3188 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,7 @@
mod bayou;
mod config;
mod cryptoblob;
+mod lmtp;
mod login;
mod mail_uuid;
mod mailbox;
@@ -35,6 +36,11 @@ enum Command {
#[clap(short, long, env = "CONFIG_FILE", default_value = "mailrage.toml")]
config_file: PathBuf,
},
+ /// TEST TEST TEST
+ Test {
+ #[clap(short, long, env = "CONFIG_FILE", default_value = "mailrage.toml")]
+ config_file: PathBuf,
+ },
/// Initializes key pairs for a user and adds a key decryption password
FirstLogin {
#[clap(flatten)]
@@ -125,6 +131,12 @@ async fn main() -> Result<()> {
let server = Server::new(config)?;
server.run().await?;
}
+ Command::Test { config_file } => {
+ let config = read_config(config_file)?;
+
+ let server = Server::new(config)?;
+ server.test().await?;
+ }
Command::FirstLogin {
creds,
user_secrets,
diff --git a/src/server.rs b/src/server.rs
index e1ab599..1fd21b4 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,18 +1,23 @@
-use anyhow::{bail, Result};
use std::sync::Arc;
+use anyhow::{bail, Result};
+use futures::{try_join, StreamExt};
+use log::*;
use rusoto_signature::Region;
+use tokio::sync::watch;
use crate::config::*;
+use crate::lmtp::*;
use crate::login::{ldap_provider::*, static_provider::*, *};
use crate::mailbox::Mailbox;
pub struct Server {
- pub login_provider: Box<dyn LoginProvider>,
+ pub login_provider: Arc<dyn LoginProvider + Send + Sync>,
+ pub lmtp_server: Option<Arc<LmtpServer>>,
}
impl Server {
- pub fn new(config: Config) -> Result<Arc<Self>> {
+ pub fn new(config: Config) -> Result<Self> {
let s3_region = Region::Custom {
name: config.aws_region.clone(),
endpoint: config.s3_endpoint,
@@ -21,17 +26,43 @@ impl Server {
name: config.aws_region,
endpoint: config.k2v_endpoint,
};
- let login_provider: Box<dyn LoginProvider> = match (config.login_static, config.login_ldap)
- {
- (Some(st), None) => Box::new(StaticLoginProvider::new(st, k2v_region, s3_region)?),
- (None, Some(ld)) => Box::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?),
- (Some(_), Some(_)) => bail!("A single login provider must be set up in config file"),
- (None, None) => bail!("No login provider is set up in config file"),
+ let login_provider: Arc<dyn LoginProvider + Send + Sync> =
+ match (config.login_static, config.login_ldap) {
+ (Some(st), None) => Arc::new(StaticLoginProvider::new(st, k2v_region, s3_region)?),
+ (None, Some(ld)) => Arc::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?),
+ (Some(_), Some(_)) => {
+ bail!("A single login provider must be set up in config file")
+ }
+ (None, None) => bail!("No login provider is set up in config file"),
+ };
+
+ let lmtp_server = config
+ .lmtp
+ .map(|cfg| LmtpServer::new(cfg, login_provider.clone()));
+
+ Ok(Self {
+ login_provider,
+ lmtp_server,
+ })
+ }
+
+ pub async fn run(&self) -> Result<()> {
+ let (exit_signal, provoke_exit) = watch_ctrl_c();
+ let exit_on_err = move |err: anyhow::Error| {
+ error!("Error: {}", err);
+ let _ = provoke_exit.send(true);
};
- Ok(Arc::new(Self { login_provider }))
+
+ try_join!(async {
+ match self.lmtp_server.as_ref() {
+ None => Ok(()),
+ Some(s) => s.run(exit_signal.clone()).await,
+ }
+ })?;
+ Ok(())
}
- pub async fn run(self: &Arc<Self>) -> Result<()> {
+ pub async fn test(&self) -> Result<()> {
let creds = self.login_provider.login("lx", "plop").await?;
let mut mailbox = Mailbox::new(&creds, "TestMailbox".to_string()).await?;
@@ -41,3 +72,17 @@ impl Server {
Ok(())
}
}
+
+pub fn watch_ctrl_c() -> (watch::Receiver<bool>, Arc<watch::Sender<bool>>) {
+ let (send_cancel, watch_cancel) = watch::channel(false);
+ let send_cancel = Arc::new(send_cancel);
+ let send_cancel_2 = send_cancel.clone();
+ tokio::spawn(async move {
+ tokio::signal::ctrl_c()
+ .await
+ .expect("failed to install CTRL+C signal handler");
+ info!("Received CTRL+C, shutting down.");
+ send_cancel.send(true).unwrap();
+ });
+ (watch_cancel, send_cancel_2)
+}