From d53cf1d220ef08c0b9368cfe91bee7660b7f5a3b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 31 May 2022 15:30:32 +0200 Subject: Implement public_login --- src/config.rs | 2 + src/login/ldap_provider.rs | 120 +++++++++++++++++++++++++++----------- src/login/mod.rs | 12 +++- src/login/static_provider.rs | 134 ++++++++++++++++++++++++++++++------------- 4 files changed, 194 insertions(+), 74 deletions(-) (limited to 'src') diff --git a/src/config.rs b/src/config.rs index b77288b..a1de5ba 100644 --- a/src/config.rs +++ b/src/config.rs @@ -23,6 +23,8 @@ pub struct LoginStaticConfig { #[derive(Deserialize, Debug, Clone)] pub struct LoginStaticUser { + #[serde(default)] + pub email_addresses: Vec, pub password: String, pub aws_access_key_id: String, diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs index c9d23a0..9310e55 100644 --- a/src/login/ldap_provider.rs +++ b/src/login/ldap_provider.rs @@ -84,11 +84,30 @@ impl LdapLoginProvider { bucket_source, }) } + + fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result { + let aws_access_key_id = get_attr(user, &self.aws_access_key_id_attr)?; + let aws_secret_access_key = get_attr(user, &self.aws_secret_access_key_attr)?; + let bucket = match &self.bucket_source { + BucketSource::Constant(b) => b.clone(), + BucketSource::Attr(a) => get_attr(user, a)?, + }; + + Ok(StorageCredentials { + k2v_region: self.k2v_region.clone(), + s3_region: self.s3_region.clone(), + aws_access_key_id, + aws_secret_access_key, + bucket, + }) + } } #[async_trait] impl LoginProvider for LdapLoginProvider { async fn login(&self, username: &str, password: &str) -> Result { + check_identifier(username)?; + let (conn, mut ldap) = LdapConnAsync::new(&self.ldap_server).await?; ldap3::drive!(conn); @@ -97,13 +116,6 @@ impl LoginProvider for LdapLoginProvider { ldap.simple_bind(dn, pw).await?.success()?; } - let username_is_ok = username - .chars() - .all(|c| c.is_alphanumeric() || "-+_.@".contains(c)); - if !username_is_ok { - bail!("Invalid username, must contain only a-z A-Z 0-9 - + _ . @"); - } - let (matches, _res) = ldap .search( &self.search_base, @@ -137,32 +149,9 @@ impl LoginProvider for LdapLoginProvider { .context("Invalid password")?; debug!("Ldap login with user name {} successfull", username); - let get_attr = |attr: &str| -> Result { - Ok(user - .attrs - .get(attr) - .ok_or(anyhow!("Missing attr: {}", attr))? - .iter() - .next() - .ok_or(anyhow!("No value for attr: {}", attr))? - .clone()) - }; - let aws_access_key_id = get_attr(&self.aws_access_key_id_attr)?; - let aws_secret_access_key = get_attr(&self.aws_secret_access_key_attr)?; - let bucket = match &self.bucket_source { - BucketSource::Constant(b) => b.clone(), - BucketSource::Attr(a) => get_attr(a)?, - }; + let storage = self.storage_creds_from_ldap_user(&user)?; - let storage = StorageCredentials { - k2v_region: self.k2v_region.clone(), - s3_region: self.s3_region.clone(), - aws_access_key_id, - aws_secret_access_key, - bucket, - }; - - let user_secret = get_attr(&self.user_secret_attr)?; + let user_secret = get_attr(&user, &self.user_secret_attr)?; let alternate_user_secrets = match &self.alternate_user_secrets_attr { None => vec![], Some(a) => user.attrs.get(a).cloned().unwrap_or_default(), @@ -178,4 +167,71 @@ impl LoginProvider for LdapLoginProvider { Ok(Credentials { storage, keys }) } + + async fn public_login(&self, email: &str) -> Result { + check_identifier(email)?; + + let (dn, pw) = match self.bind_dn_and_pw.as_ref() { + Some(x) => x, + None => bail!("Missing bind_dn and bind_password in LDAP login provider config"), + }; + + let (conn, mut ldap) = LdapConnAsync::new(&self.ldap_server).await?; + ldap3::drive!(conn); + ldap.simple_bind(dn, pw).await?.success()?; + + let (matches, _res) = ldap + .search( + &self.search_base, + Scope::Subtree, + &format!( + "(&(objectClass=inetOrgPerson)({}={}))", + self.mail_attr, email + ), + &self.attrs_to_retrieve, + ) + .await? + .success()?; + + if matches.is_empty() { + bail!("No such user account"); + } + if matches.len() > 1 { + bail!("Multiple matching user accounts"); + } + let user = SearchEntry::construct(matches.into_iter().next().unwrap()); + debug!("Found matching LDAP user for email {}: {}", email, user.dn); + + let storage = self.storage_creds_from_ldap_user(&user)?; + drop(ldap); + + let k2v_client = storage.k2v_client()?; + let (_, public_key) = CryptoKeys::load_salt_and_public(&k2v_client).await?; + + Ok(PublicCredentials { + storage, + public_key, + }) + } +} + +fn get_attr(user: &SearchEntry, attr: &str) -> Result { + Ok(user + .attrs + .get(attr) + .ok_or(anyhow!("Missing attr: {}", attr))? + .iter() + .next() + .ok_or(anyhow!("No value for attr: {}", attr))? + .clone()) +} + +fn check_identifier(id: &str) -> Result<()> { + let is_ok = id + .chars() + .all(|c| c.is_alphanumeric() || "-+_.@".contains(c)); + if !is_ok { + bail!("Invalid username/email address, must contain only a-z A-Z 0-9 - + _ . @"); + } + Ok(()) } diff --git a/src/login/mod.rs b/src/login/mod.rs index 2640a58..c0e9032 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -24,6 +24,9 @@ pub trait LoginProvider { /// The login method takes an account's password as an input to decypher /// decryption keys and obtain full access to the user's account. async fn login(&self, username: &str, password: &str) -> Result; + /// The public_login method takes an account's email address and returns + /// public credentials for adding mails to the user's inbox. + async fn public_login(&self, email: &str) -> Result; } /// The struct Credentials represent all of the necessary information to interact @@ -36,6 +39,13 @@ pub struct Credentials { pub keys: CryptoKeys, } +#[derive(Clone, Debug)] +pub struct PublicCredentials { + /// The storage credentials are used to authenticate access to the underlying storage (S3, K2V) + pub storage: StorageCredentials, + pub public_key: PublicKey, +} + /// The struct StorageCredentials contains access key to an S3 and K2V bucket #[derive(Clone, Debug)] pub struct StorageCredentials { @@ -396,7 +406,7 @@ impl CryptoKeys { Ok((salt_ct, public_ct)) } - async fn load_salt_and_public(k2v: &K2vClient) -> Result<([u8; 32], PublicKey)> { + pub async fn load_salt_and_public(k2v: &K2vClient) -> Result<([u8; 32], PublicKey)> { let mut params = k2v .read_batch(&[ k2v_read_single_key("keys", "salt", false), diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs index cc6ffb6..aa5e499 100644 --- a/src/login/static_provider.rs +++ b/src/login/static_provider.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::sync::Arc; use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; @@ -10,16 +11,34 @@ use crate::login::*; pub struct StaticLoginProvider { default_bucket: Option, - users: HashMap, + users: HashMap>, + users_by_email: HashMap>, + k2v_region: Region, s3_region: Region, } impl StaticLoginProvider { pub fn new(config: LoginStaticConfig, k2v_region: Region, s3_region: Region) -> Result { + let users = config + .users + .into_iter() + .map(|(k, v)| (k, Arc::new(v))) + .collect::>(); + let mut users_by_email = HashMap::new(); + for (_, u) in users.iter() { + for m in u.email_addresses.iter() { + if users_by_email.contains_key(m) { + bail!("Several users have same email address: {}", m); + } + users_by_email.insert(m.clone(), u.clone()); + } + } + Ok(Self { default_bucket: config.default_bucket, - users: config.users, + users, + users_by_email, k2v_region, s3_region, }) @@ -29,49 +48,82 @@ impl StaticLoginProvider { #[async_trait] impl LoginProvider for StaticLoginProvider { async fn login(&self, username: &str, password: &str) -> Result { - match self.users.get(username) { + let user = match self.users.get(username) { None => bail!("User {} does not exist", username), - Some(u) => { - if !verify_password(password, &u.password)? { - bail!("Wrong password"); - } - let bucket = u - .bucket - .clone() - .or_else(|| self.default_bucket.clone()) - .ok_or(anyhow!( - "No bucket configured and no default bucket specieid" - ))?; - - let storage = StorageCredentials { - k2v_region: self.k2v_region.clone(), - s3_region: self.s3_region.clone(), - aws_access_key_id: u.aws_access_key_id.clone(), - aws_secret_access_key: u.aws_secret_access_key.clone(), - bucket, - }; + Some(u) => u, + }; - let keys = match (&u.master_key, &u.secret_key) { - (Some(m), Some(s)) => { - let master_key = Key::from_slice(&base64::decode(m)?) - .ok_or(anyhow!("Invalid master key"))?; - let secret_key = SecretKey::from_slice(&base64::decode(s)?) - .ok_or(anyhow!("Invalid secret key"))?; - CryptoKeys::open_without_password(&storage, &master_key, &secret_key).await? - } - (None, None) => { - let user_secrets = UserSecrets { - user_secret: u.user_secret.clone(), - alternate_user_secrets: u.alternate_user_secrets.clone(), - }; - CryptoKeys::open(&storage, &user_secrets, password).await? - } - _ => bail!("Either both master and secret key or none of them must be specified for user"), - }; + if !verify_password(password, &user.password)? { + bail!("Wrong password"); + } + let bucket = user + .bucket + .clone() + .or_else(|| self.default_bucket.clone()) + .ok_or(anyhow!( + "No bucket configured and no default bucket specieid" + ))?; - Ok(Credentials { storage, keys }) + let storage = StorageCredentials { + k2v_region: self.k2v_region.clone(), + s3_region: self.s3_region.clone(), + aws_access_key_id: user.aws_access_key_id.clone(), + aws_secret_access_key: user.aws_secret_access_key.clone(), + bucket, + }; + + let keys = match (&user.master_key, &user.secret_key) { + (Some(m), Some(s)) => { + let master_key = + Key::from_slice(&base64::decode(m)?).ok_or(anyhow!("Invalid master key"))?; + let secret_key = SecretKey::from_slice(&base64::decode(s)?) + .ok_or(anyhow!("Invalid secret key"))?; + CryptoKeys::open_without_password(&storage, &master_key, &secret_key).await? } - } + (None, None) => { + let user_secrets = UserSecrets { + user_secret: user.user_secret.clone(), + alternate_user_secrets: user.alternate_user_secrets.clone(), + }; + CryptoKeys::open(&storage, &user_secrets, password).await? + } + _ => bail!( + "Either both master and secret key or none of them must be specified for user" + ), + }; + + Ok(Credentials { storage, keys }) + } + + async fn public_login(&self, email: &str) -> Result { + let user = match self.users_by_email.get(email) { + None => bail!("No user for email address {}", email), + Some(u) => u, + }; + + let bucket = user + .bucket + .clone() + .or_else(|| self.default_bucket.clone()) + .ok_or(anyhow!( + "No bucket configured and no default bucket specieid" + ))?; + + let storage = StorageCredentials { + k2v_region: self.k2v_region.clone(), + s3_region: self.s3_region.clone(), + aws_access_key_id: user.aws_access_key_id.clone(), + aws_secret_access_key: user.aws_secret_access_key.clone(), + bucket, + }; + + let k2v_client = storage.k2v_client()?; + let (_, public_key) = CryptoKeys::load_salt_and_public(&k2v_client).await?; + + Ok(PublicCredentials { + storage, + public_key, + }) } } -- cgit v1.2.3 From 01d82c14ca61e7c4de1e72c5f94456610464c064 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 31 May 2022 15:49:10 +0200 Subject: UUID generator; import kannader smtp server --- src/config.rs | 8 ++++++ src/mail_uuid.rs | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/mailbox.rs | 6 ++--- src/main.rs | 1 + src/uidindex.rs | 36 ++------------------------- 5 files changed, 89 insertions(+), 38 deletions(-) create mode 100644 src/mail_uuid.rs (limited to 'src') diff --git a/src/config.rs b/src/config.rs index a1de5ba..3fd0bd4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::io::Read; +use std::net::SocketAddr; use std::path::PathBuf; use anyhow::Result; @@ -13,6 +14,8 @@ pub struct Config { pub login_static: Option, pub login_ldap: Option, + + pub lmtp: Option, } #[derive(Deserialize, Debug, Clone)] @@ -62,6 +65,11 @@ pub struct LoginLdapConfig { pub bucket_attr: Option, } +#[derive(Deserialize, Debug, Clone)] +pub struct LmtpConfig { + pub bind_addr: SocketAddr, +} + pub fn read_config(config_file: PathBuf) -> Result { let mut file = std::fs::OpenOptions::new() .read(true) diff --git a/src/mail_uuid.rs b/src/mail_uuid.rs new file mode 100644 index 0000000..d0d582f --- /dev/null +++ b/src/mail_uuid.rs @@ -0,0 +1,76 @@ +use std::sync::atomic::{AtomicU64, Ordering}; + +use lazy_static::lazy_static; +use rand::prelude::*; +use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; + +use crate::time::now_msec; + +/// A Mail UUID is composed of two components: +/// - a process identifier, 128 bits, itself composed of: +/// - the timestamp of when the process started, 64 bits +/// - a 64-bit random number +/// - a sequence number, 64 bits +#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Debug)] +pub struct MailUuid(pub [u8; 24]); + +struct UuidGenerator { + pid: u128, + sn: AtomicU64, +} + +impl UuidGenerator { + fn new() -> Self { + let time = now_msec() as u128; + let rand = thread_rng().gen::() as u128; + Self { + pid: (time << 64) | rand, + sn: AtomicU64::new(0), + } + } + + fn gen(&self) -> MailUuid { + let sn = self.sn.fetch_add(1, Ordering::Relaxed); + let mut res = [0u8; 24]; + res[0..16].copy_from_slice(&u128::to_be_bytes(self.pid)); + res[16..24].copy_from_slice(&u64::to_be_bytes(sn)); + MailUuid(res) + } +} + +lazy_static! { + static ref GENERATOR: UuidGenerator = UuidGenerator::new(); +} + +pub fn gen_uuid() -> MailUuid { + GENERATOR.gen() +} + +// -- serde -- + +impl<'de> Deserialize<'de> for MailUuid { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let v = String::deserialize(d)?; + let bytes = hex::decode(v).map_err(|_| D::Error::custom("invalid hex"))?; + + if bytes.len() != 24 { + return Err(D::Error::custom("bad length")); + } + + let mut tmp = [0u8; 24]; + tmp[..].copy_from_slice(&bytes); + Ok(Self(tmp)) + } +} + +impl Serialize for MailUuid { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&hex::encode(self.0)) + } +} diff --git a/src/mailbox.rs b/src/mailbox.rs index a20ca15..49d8e56 100644 --- a/src/mailbox.rs +++ b/src/mailbox.rs @@ -1,11 +1,11 @@ use anyhow::Result; use k2v_client::K2vClient; -use rand::prelude::*; use rusoto_s3::S3Client; use crate::bayou::Bayou; use crate::cryptoblob::Key; use crate::login::Credentials; +use crate::mail_uuid::*; use crate::uidindex::*; pub struct Mailbox { @@ -38,12 +38,10 @@ impl Mailbox { dump(&self.uid_index); - let mut rand_id = [0u8; 24]; - rand_id[..16].copy_from_slice(&u128::to_be_bytes(thread_rng().gen())); let add_mail_op = self .uid_index .state() - .op_mail_add(MailUuid(rand_id), vec!["\\Unseen".into()]); + .op_mail_add(gen_uuid(), vec!["\\Unseen".into()]); self.uid_index.push(add_mail_op).await?; dump(&self.uid_index); diff --git a/src/main.rs b/src/main.rs index ada94fc..b8231f1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ mod bayou; mod config; mod cryptoblob; mod login; +mod mail_uuid; mod mailbox; mod server; mod time; diff --git a/src/uidindex.rs b/src/uidindex.rs index 1e30190..ecd52ff 100644 --- a/src/uidindex.rs +++ b/src/uidindex.rs @@ -1,17 +1,12 @@ use im::OrdMap; -use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::bayou::*; +use crate::mail_uuid::MailUuid; type ImapUid = u32; type ImapUidvalidity = u32; -/// A Mail UUID is composed of two components: -/// - a process identifier, 128 bits -/// - a sequence number, 64 bits -#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Debug)] -pub struct MailUuid(pub [u8; 24]); - #[derive(Clone)] pub struct UidIndex { pub mail_uid: OrdMap, @@ -176,30 +171,3 @@ impl Serialize for UidIndex { val.serialize(serializer) } } - -impl<'de> Deserialize<'de> for MailUuid { - fn deserialize(d: D) -> Result - where - D: Deserializer<'de>, - { - let v = String::deserialize(d)?; - let bytes = hex::decode(v).map_err(|_| D::Error::custom("invalid hex"))?; - - if bytes.len() != 24 { - return Err(D::Error::custom("bad length")); - } - - let mut tmp = [0u8; 24]; - tmp[..].copy_from_slice(&bytes); - Ok(Self(tmp)) - } -} - -impl Serialize for MailUuid { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_str(&hex::encode(self.0)) - } -} -- cgit v1.2.3 From 553a15a82a700792986b23cb89e2a8ec070cc27d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 31 May 2022 17:07:34 +0200 Subject: Implementn basic LMTP server --- src/config.rs | 1 + src/lmtp.rs | 263 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/mail_uuid.rs | 8 +- src/main.rs | 12 +++ src/server.rs | 67 +++++++++++--- 5 files changed, 339 insertions(+), 12 deletions(-) create mode 100644 src/lmtp.rs (limited to 'src') diff --git a/src/config.rs b/src/config.rs index 3fd0bd4..9ec0ea1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -68,6 +68,7 @@ pub struct LoginLdapConfig { #[derive(Deserialize, Debug, Clone)] pub struct LmtpConfig { pub bind_addr: SocketAddr, + pub hostname: String, } pub fn read_config(config_file: PathBuf) -> Result { diff --git a/src/lmtp.rs b/src/lmtp.rs new file mode 100644 index 0000000..4186d69 --- /dev/null +++ b/src/lmtp.rs @@ -0,0 +1,263 @@ +use std::collections::HashMap; +use std::net::SocketAddr; +use std::{pin::Pin, sync::Arc}; + +use anyhow::{bail, Result}; +use async_trait::async_trait; +use duplexify::Duplex; +use futures::{io, AsyncRead, AsyncReadExt, AsyncWrite}; +use futures::{stream, stream::FuturesUnordered, StreamExt}; +use log::*; +use rusoto_s3::{PutObjectRequest, S3Client, S3}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::select; +use tokio::sync::watch; +use tokio_util::compat::*; + +use smtp_message::{Email, EscapedDataReader, Reply, ReplyCode}; +use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata, Protocol}; + +use crate::config::*; +use crate::cryptoblob::*; +use crate::login::*; +use crate::mail_uuid::*; + +pub struct LmtpServer { + bind_addr: SocketAddr, + hostname: String, + login_provider: Arc, +} + +impl LmtpServer { + pub fn new( + config: LmtpConfig, + login_provider: Arc, + ) -> Arc { + Arc::new(Self { + bind_addr: config.bind_addr, + hostname: config.hostname, + login_provider, + }) + } + + pub async fn run(self: &Arc, mut must_exit: watch::Receiver) -> Result<()> { + let tcp = TcpListener::bind(self.bind_addr).await?; + let mut connections = FuturesUnordered::new(); + + while !*must_exit.borrow() { + let wait_conn_finished = async { + if connections.is_empty() { + futures::future::pending().await + } else { + connections.next().await + } + }; + let (socket, remote_addr) = select! { + a = tcp.accept() => a?, + _ = wait_conn_finished => continue, + _ = must_exit.changed() => continue, + }; + + let conn = tokio::spawn(smtp_server::interact( + socket.compat(), + smtp_server::IsAlreadyTls::No, + Conn { remote_addr }, + self.clone(), + )); + + connections.push(conn); + } + drop(tcp); + + info!("LMTP server shutting down, draining remaining connections..."); + while connections.next().await.is_some() {} + + Ok(()) + } +} + +// ---- + +pub struct Conn { + remote_addr: SocketAddr, +} + +pub struct Message { + to: Vec, +} + +#[async_trait] +impl Config for LmtpServer { + const PROTOCOL: Protocol = Protocol::Lmtp; + + type ConnectionUserMeta = Conn; + type MailUserMeta = Message; + + fn hostname(&self, _conn_meta: &ConnectionMetadata) -> &str { + &self.hostname + } + + async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata) -> Message { + Message { to: vec![] } + } + + async fn tls_accept( + &self, + _io: IO, + _conn_meta: &mut ConnectionMetadata, + ) -> io::Result>, Pin>>> + where + IO: Send + AsyncRead + AsyncWrite, + { + Err(io::Error::new( + io::ErrorKind::InvalidInput, + "TLS not implemented for LMTP server", + )) + } + + async fn filter_from( + &self, + from: Option, + meta: &mut MailMetadata, + _conn_meta: &mut ConnectionMetadata, + ) -> Decision> { + Decision::Accept { + reply: reply::okay_from().convert(), + res: from, + } + } + + async fn filter_to( + &self, + to: Email, + meta: &mut MailMetadata, + _conn_meta: &mut ConnectionMetadata, + ) -> Decision { + let to_str = match to.hostname.as_ref() { + Some(h) => format!("{}@{}", to.localpart, h), + None => to.localpart.to_string(), + }; + match self.login_provider.public_login(&to_str).await { + Ok(creds) => { + meta.user.to.push(creds); + Decision::Accept { + reply: reply::okay_to().convert(), + res: to, + } + } + Err(e) => Decision::Reject { + reply: Reply { + code: ReplyCode::POLICY_REASON, + ecode: None, + text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())], + }, + }, + } + } + + async fn handle_mail<'a, R>( + &self, + reader: &mut EscapedDataReader<'a, R>, + _mail: MailMetadata, + _conn_meta: &mut ConnectionMetadata, + ) -> Decision<()> + where + R: Send + Unpin + AsyncRead, + { + unreachable!(); + } + + async fn handle_mail_multi<'a, 'slife0, 'slife1, 'stream, R>( + &'slife0 self, + reader: &mut EscapedDataReader<'a, R>, + meta: MailMetadata, + conn_meta: &'slife1 mut ConnectionMetadata, + ) -> Pin> + Send + 'stream>> + where + R: Send + Unpin + AsyncRead, + 'slife0: 'stream, + 'slife1: 'stream, + Self: 'stream, + { + let err_response_stream = |meta: MailMetadata, msg: String| { + Box::pin( + stream::iter(meta.user.to.into_iter()).map(move |_| Decision::Reject { + reply: Reply { + code: ReplyCode::POLICY_REASON, + ecode: None, + text: vec![smtp_message::MaybeUtf8::Utf8(msg.clone())], + }, + }), + ) + }; + + let mut text = Vec::new(); + if reader.read_to_end(&mut text).await.is_err() { + return err_response_stream(meta, "io error".into()); + } + reader.complete(); + + let encrypted_message = match EncryptedMessage::new(text) { + Ok(x) => Arc::new(x), + Err(e) => return err_response_stream(meta, e.to_string()), + }; + + Box::pin(stream::iter(meta.user.to.into_iter()).then(move |creds| { + let encrypted_message = encrypted_message.clone(); + async move { + match encrypted_message.deliver_to(creds).await { + Ok(()) => Decision::Accept { + reply: reply::okay_mail().convert(), + res: (), + }, + Err(e) => Decision::Reject { + reply: Reply { + code: ReplyCode::POLICY_REASON, + ecode: None, + text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())], + }, + }, + } + } + })) + } +} + +// ---- + +struct EncryptedMessage { + key: Key, + encrypted_body: Vec, +} + +impl EncryptedMessage { + fn new(body: Vec) -> Result { + let key = gen_key(); + let encrypted_body = seal(&body, &key)?; + Ok(Self { + key, + encrypted_body, + }) + } + + async fn deliver_to(self: Arc, creds: PublicCredentials) -> Result<()> { + let s3_client = creds.storage.s3_client()?; + + let encrypted_key = + sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); + let key_header = base64::encode(&encrypted_key); + + let mut por = PutObjectRequest::default(); + por.bucket = creds.storage.bucket.clone(); + por.key = format!("incoming/{}", gen_uuid().to_string()); + por.metadata = Some( + [("Message-Key".to_string(), key_header)] + .into_iter() + .collect::>(), + ); + por.body = Some(self.encrypted_body.clone().into()); + s3_client.put_object(por).await?; + + Ok(()) + } +} diff --git a/src/mail_uuid.rs b/src/mail_uuid.rs index d0d582f..b784e78 100644 --- a/src/mail_uuid.rs +++ b/src/mail_uuid.rs @@ -71,6 +71,12 @@ impl Serialize for MailUuid { where S: Serializer, { - serializer.serialize_str(&hex::encode(self.0)) + serializer.serialize_str(&self.to_string()) + } +} + +impl ToString for MailUuid { + fn to_string(&self) -> String { + hex::encode(self.0) } } diff --git a/src/main.rs b/src/main.rs index b8231f1..33d3188 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ mod bayou; mod config; mod cryptoblob; +mod lmtp; mod login; mod mail_uuid; mod mailbox; @@ -35,6 +36,11 @@ enum Command { #[clap(short, long, env = "CONFIG_FILE", default_value = "mailrage.toml")] config_file: PathBuf, }, + /// TEST TEST TEST + Test { + #[clap(short, long, env = "CONFIG_FILE", default_value = "mailrage.toml")] + config_file: PathBuf, + }, /// Initializes key pairs for a user and adds a key decryption password FirstLogin { #[clap(flatten)] @@ -125,6 +131,12 @@ async fn main() -> Result<()> { let server = Server::new(config)?; server.run().await?; } + Command::Test { config_file } => { + let config = read_config(config_file)?; + + let server = Server::new(config)?; + server.test().await?; + } Command::FirstLogin { creds, user_secrets, diff --git a/src/server.rs b/src/server.rs index e1ab599..1fd21b4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,18 +1,23 @@ -use anyhow::{bail, Result}; use std::sync::Arc; +use anyhow::{bail, Result}; +use futures::{try_join, StreamExt}; +use log::*; use rusoto_signature::Region; +use tokio::sync::watch; use crate::config::*; +use crate::lmtp::*; use crate::login::{ldap_provider::*, static_provider::*, *}; use crate::mailbox::Mailbox; pub struct Server { - pub login_provider: Box, + pub login_provider: Arc, + pub lmtp_server: Option>, } impl Server { - pub fn new(config: Config) -> Result> { + pub fn new(config: Config) -> Result { let s3_region = Region::Custom { name: config.aws_region.clone(), endpoint: config.s3_endpoint, @@ -21,17 +26,43 @@ impl Server { name: config.aws_region, endpoint: config.k2v_endpoint, }; - let login_provider: Box = match (config.login_static, config.login_ldap) - { - (Some(st), None) => Box::new(StaticLoginProvider::new(st, k2v_region, s3_region)?), - (None, Some(ld)) => Box::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?), - (Some(_), Some(_)) => bail!("A single login provider must be set up in config file"), - (None, None) => bail!("No login provider is set up in config file"), + let login_provider: Arc = + match (config.login_static, config.login_ldap) { + (Some(st), None) => Arc::new(StaticLoginProvider::new(st, k2v_region, s3_region)?), + (None, Some(ld)) => Arc::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?), + (Some(_), Some(_)) => { + bail!("A single login provider must be set up in config file") + } + (None, None) => bail!("No login provider is set up in config file"), + }; + + let lmtp_server = config + .lmtp + .map(|cfg| LmtpServer::new(cfg, login_provider.clone())); + + Ok(Self { + login_provider, + lmtp_server, + }) + } + + pub async fn run(&self) -> Result<()> { + let (exit_signal, provoke_exit) = watch_ctrl_c(); + let exit_on_err = move |err: anyhow::Error| { + error!("Error: {}", err); + let _ = provoke_exit.send(true); }; - Ok(Arc::new(Self { login_provider })) + + try_join!(async { + match self.lmtp_server.as_ref() { + None => Ok(()), + Some(s) => s.run(exit_signal.clone()).await, + } + })?; + Ok(()) } - pub async fn run(self: &Arc) -> Result<()> { + pub async fn test(&self) -> Result<()> { let creds = self.login_provider.login("lx", "plop").await?; let mut mailbox = Mailbox::new(&creds, "TestMailbox".to_string()).await?; @@ -41,3 +72,17 @@ impl Server { Ok(()) } } + +pub fn watch_ctrl_c() -> (watch::Receiver, Arc>) { + let (send_cancel, watch_cancel) = watch::channel(false); + let send_cancel = Arc::new(send_cancel); + let send_cancel_2 = send_cancel.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); + info!("Received CTRL+C, shutting down."); + send_cancel.send(true).unwrap(); + }); + (watch_cancel, send_cancel_2) +} -- cgit v1.2.3 From dd62efa24c66eaec87b0cbb99fc48d6bf8441801 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 1 Jun 2022 00:02:27 +0200 Subject: Implement FromStr for MailUuid --- src/mail_uuid.rs | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) (limited to 'src') diff --git a/src/mail_uuid.rs b/src/mail_uuid.rs index b784e78..647238f 100644 --- a/src/mail_uuid.rs +++ b/src/mail_uuid.rs @@ -1,4 +1,5 @@ use std::sync::atomic::{AtomicU64, Ordering}; +use std::str::FromStr; use lazy_static::lazy_static; use rand::prelude::*; @@ -54,15 +55,7 @@ impl<'de> Deserialize<'de> for MailUuid { D: Deserializer<'de>, { let v = String::deserialize(d)?; - let bytes = hex::decode(v).map_err(|_| D::Error::custom("invalid hex"))?; - - if bytes.len() != 24 { - return Err(D::Error::custom("bad length")); - } - - let mut tmp = [0u8; 24]; - tmp[..].copy_from_slice(&bytes); - Ok(Self(tmp)) + MailUuid::from_str(&v).map_err(D::Error::custom) } } @@ -80,3 +73,19 @@ impl ToString for MailUuid { hex::encode(self.0) } } + +impl FromStr for MailUuid { + type Err = &'static str; + + fn from_str(s: &str) -> Result { + let bytes = hex::decode(s).map_err(|_| "invalid hex")?; + + if bytes.len() != 24 { + return Err("bad length"); + } + + let mut tmp = [0u8; 24]; + tmp[..].copy_from_slice(&bytes); + Ok(MailUuid(tmp)) + } +} -- cgit v1.2.3 From 0700e27127e4644dbd323b9a22d994209143fa2a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 1 Jun 2022 00:06:26 +0200 Subject: Implement ToString and FromStr for bayou timestamp --- src/bayou.rs | 38 +++++++++++++++++++++++--------------- src/mail_uuid.rs | 2 +- 2 files changed, 24 insertions(+), 16 deletions(-) (limited to 'src') diff --git a/src/bayou.rs b/src/bayou.rs index c9ae67f..7a76222 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -1,3 +1,4 @@ +use std::str::FromStr; use std::time::{Duration, Instant}; use anyhow::{anyhow, bail, Result}; @@ -123,7 +124,7 @@ impl Bayou { .collect(); // 3. List all operations starting from checkpoint - let ts_ser = self.checkpoint.0.serialize(); + let ts_ser = self.checkpoint.0.to_string(); debug!("(sync) looking up operations starting at {}", ts_ser); let ops_map = self .k2v @@ -148,8 +149,9 @@ impl Bayou { let mut ops = vec![]; for (tsstr, val) in ops_map { - let ts = Timestamp::parse(&tsstr) - .ok_or(anyhow!("Invalid operation timestamp: {}", tsstr))?; + let ts = tsstr + .parse::() + .map_err(|_| anyhow!("Invalid operation timestamp: {}", tsstr))?; if val.value.len() != 1 { bail!("Invalid operation, has {} values", val.value.len()); } @@ -251,7 +253,7 @@ impl Bayou { self.k2v .insert_item( &self.path, - &ts.serialize(), + &ts.to_string(), seal_serialize(&op, &self.key)?, None, ) @@ -316,7 +318,7 @@ impl Bayou { let ts_cp = self.history[i_cp].0; debug!( "(cp) we could checkpoint at time {} (index {} in history)", - ts_cp.serialize(), + ts_cp.to_string(), i_cp ); @@ -330,13 +332,13 @@ impl Bayou { { debug!( "(cp) last checkpoint is too recent: {}, not checkpointing", - last_cp.0.serialize() + last_cp.0.to_string() ); return Ok(()); } } - debug!("(cp) saving checkpoint at {}", ts_cp.serialize()); + debug!("(cp) saving checkpoint at {}", ts_cp.to_string()); // Calculate state at time of checkpoint let mut last_known_state = (0, &self.checkpoint.1); @@ -356,7 +358,7 @@ impl Bayou { let mut por = PutObjectRequest::default(); por.bucket = self.bucket.clone(); - por.key = format!("{}/checkpoint/{}", self.path, ts_cp.serialize()); + por.key = format!("{}/checkpoint/{}", self.path, ts_cp.to_string()); por.body = Some(cryptoblob.into()); self.s3.put_object(por).await?; @@ -375,7 +377,7 @@ impl Bayou { } // Delete corresponding range of operations - let ts_ser = existing_checkpoints[last_to_keep].0.serialize(); + let ts_ser = existing_checkpoints[last_to_keep].0.to_string(); self.k2v .delete_batch(&[BatchDeleteOp { partition_key: &self.path, @@ -414,7 +416,7 @@ impl Bayou { for object in checkpoints_res.contents.unwrap_or_default() { if let Some(key) = object.key { if let Some(ckid) = key.strip_prefix(&prefix) { - if let Some(ts) = Timestamp::parse(ckid) { + if let Ok(ts) = ckid.parse::() { checkpoints.push((ts, key)); } } @@ -451,20 +453,26 @@ impl Timestamp { pub fn zero() -> Self { Self { msec: 0, rand: 0 } } +} - pub fn serialize(&self) -> String { +impl ToString for Timestamp { + fn to_string(&self) -> String { let mut bytes = [0u8; 16]; bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec)); bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand)); hex::encode(&bytes) } +} - pub fn parse(v: &str) -> Option { - let bytes = hex::decode(v).ok()?; +impl FromStr for Timestamp { + type Err = &'static str; + + fn from_str(s: &str) -> Result { + let bytes = hex::decode(s).map_err(|_| "invalid hex")?; if bytes.len() != 16 { - return None; + return Err("bad length"); } - Some(Self { + Ok(Self { msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()), rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()), }) diff --git a/src/mail_uuid.rs b/src/mail_uuid.rs index 647238f..ab76bce 100644 --- a/src/mail_uuid.rs +++ b/src/mail_uuid.rs @@ -1,5 +1,5 @@ -use std::sync::atomic::{AtomicU64, Ordering}; use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; use lazy_static::lazy_static; use rand::prelude::*; -- cgit v1.2.3